Skip to content

Commit

Permalink
feat: PPT-864 fix and capture online and last seen activity (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
naqvis authored Sep 13, 2023
1 parent 679a490 commit ebb769f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
8 changes: 4 additions & 4 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ shards:

action-controller:
git: https://github.com/spider-gazelle/action-controller.git
version: 7.2.2
version: 7.2.3

active-model:
git: https://github.com/spider-gazelle/active-model.git
Expand Down Expand Up @@ -183,7 +183,7 @@ shards:

office365:
git: https://github.com/placeos/office365.git
version: 1.22.0
version: 1.23.0

open_api:
git: https://github.com/elbywan/open_api.cr.git
Expand All @@ -203,7 +203,7 @@ shards:

opentelemetry-instrumentation:
git: https://github.com/wyhaines/opentelemetry-instrumentation.cr.git
version: 0.5.3+git.commit.5c0323d0046719bae4a7a325ca5ce0e7405bf803
version: 0.5.3+git.commit.cd3994b22d9f7a0d68752698974d3873a1b2fce2

opentelemetry-sdk: # Overridden
git: https://github.com/wyhaines/opentelemetry-sdk.cr.git
Expand Down Expand Up @@ -263,7 +263,7 @@ shards:

placeos-models:
git: https://github.com/placeos/models.git
version: 9.18.8
version: 9.20.0

placeos-resource:
git: https://github.com/place-labs/resource.git
Expand Down
32 changes: 28 additions & 4 deletions src/placeos-rest-api/controllers/edges/connection_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module PlaceOS::Api
edge_lock.synchronize do
if existing_socket = edge_sockets[edge_id]?
existing_socket.on_close { }
existing_socket.close
existing_socket.close rescue nil
end

edge_sockets[edge_id] = socket
Expand All @@ -37,15 +37,33 @@ module PlaceOS::Api
else
node_found = core_discovery.find(edge_id)
add_core(edge_id, current_node: node_found)
ping_tasks[edge_id] = Tasker.every(30.seconds) do
ping_tasks[edge_id] = Tasker.every(10.seconds) do
socket.ping rescue nil
core_sockets[edge_id].ping rescue nil
nil
end

socket.on_ping do |string|
core_sockets[edge_id].ping(string)
rescue e
Log.error(exception: e) { {edge_id: edge_id, message: "while forwarding ping to core socket"} }
end

socket.on_pong do |string|
Model::Edge.update(edge_id, {last_seen: Time.utc, online: true})
begin
core_sockets[edge_id].pong(string)
rescue e
Log.error(exception: e) { {edge_id: edge_id, message: "while forwarding pong to core socket"} }
end
end
end
end

socket.on_close { edge_lock.synchronize { remove(edge_id) if socket == edge_sockets[edge_id]? } }
socket.on_close do
Model::Edge.update(edge_id, {last_seen: Time.utc, online: false})
edge_lock.synchronize { remove(edge_id) if socket == edge_sockets[edge_id]? }
end
rescue e
Log.error(exception: e) { {edge_id: edge_id, message: "while adding edge socket"} }
remove(edge_id)
Expand Down Expand Up @@ -115,7 +133,13 @@ module PlaceOS::Api
Log.debug { {message: "from core", packet: message} }
edge_sockets[edge_id].send(message)
}
core_socket.on_binary { |bytes| edge_sockets[edge_id].stream &.write(bytes) }
core_socket.on_binary do |bytes|
Log.debug { {message: "Got binary packet from core", size: bytes.size, edge_active: !edge_socket.closed?} }
edge_socket.stream &.write(bytes)
end

core_socket.on_ping { |string| edge_socket.ping(string) }
core_socket.on_pong { |string| edge_socket.pong(string) }

# Link edge to core
link_edge(edge_socket, edge_id)
Expand Down

0 comments on commit ebb769f

Please sign in to comment.