From 760dba689d3ad71ef272bd146abb565ac4d66d3b Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Mon, 14 Oct 2024 16:53:57 +0800 Subject: [PATCH] feat: PPT-525 Added scoped signals --- spec/controllers/notifications_spec.cr | 109 +++++++++++++++++- spec/controllers/root_spec.cr | 38 +++++- src/placeos-rest-api/controllers/metadata.cr | 18 +-- .../controllers/notifications.cr | 7 +- src/placeos-rest-api/controllers/root.cr | 9 +- src/placeos-rest-api/controllers/systems.cr | 2 +- src/placeos-rest-api/controllers/users.cr | 2 +- src/placeos-rest-api/controllers/zones.cr | 2 +- 8 files changed, 161 insertions(+), 26 deletions(-) diff --git a/spec/controllers/notifications_spec.cr b/spec/controllers/notifications_spec.cr index 09d8e650..db2c29a9 100644 --- a/spec/controllers/notifications_spec.cr +++ b/spec/controllers/notifications_spec.cr @@ -20,7 +20,7 @@ module PlaceOS::Api result.status_code.should eq 202 end - it "should receive valid payload when google sends change notification" do + it "should receive valid payload when google sends change notification to unscoped destination" do PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! subscription_channel = "4ba78bf0-6a47-11e2-bcfd-0800200c9a66/event" @@ -63,6 +63,50 @@ module PlaceOS::Api subs.terminate end end + + it "should receive valid payload when google sends change notification to scoped destination" do + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/4ba78bf0-6a47-11e2-bcfd-0800200c9a66/event" + + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + result = client.post("#{PushNotifications.base_route}/google", headers: HTTP::Headers{ + "Host" => "localhost", + "Content-Type" => "application/json", + "Content-Length" => "0", + "X-Goog-Channel-ID" => "4ba78bf0-6a47-11e2-bcfd-0800200c9a66", + "X-Goog-Channel-Token" => "398348u3tu83ut8uu38", + "X-Goog-Channel-Expiration" => "Fri, 26 May 2023 01:13:52 GMT", + "X-Goog-Resource-ID" => "ret08u3rv24htgh289g", + "X-Goog-Resource-URI" => "https://www.googleapis.com/calendar/v3/calendars/my_calendar@gmail.com/events", + "X-Goog-Resource-State" => "exists", + "X-Goog-Message-Number" => "1", + }) + result.status_code.should eq 202 + + begin + select + when message = channel.receive + { + "event_type": "updated", + "resource_id": "ret08u3rv24htgh289g", + "resource_uri": "https://www.googleapis.com/calendar/v3/calendars/my_calendar@gmail.com/events", + "subscription_id": "4ba78bf0-6a47-11e2-bcfd-0800200c9a66", + "client_secret": "398348u3tu83ut8uu38", + "expiration_time": 1685063632, + }.to_json.should eq(message) + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end end describe "microsoft push notifications" do @@ -78,7 +122,64 @@ module PlaceOS::Api result.body.should eq(token) end - it "should receive valid payload when microsoft sends change notification" do + it "should receive valid payload when microsoft sends change notification on scoped channel" do + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" + + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + payload = <<-'JSON' + { + "value": [ + { + "subscriptionId": "f37536ac-b308-4bc7-b239-b2b51cd2ff24", + "subscriptionExpirationDateTime": "2023-05-26T23:29:18.2277768+00:00", + "changeType": "created", + "resource": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "resourceData": { + "@odata.type": "#Microsoft.Graph.Event", + "@odata.id": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "@odata.etag": "W/\"DwAAABYAAAAXxlVK8zI/TZLFIn9D86hXAAAEB/jr\"", + "id": "AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=" + }, + "clientState": "secretClientState", + "tenantId": "7f1d0cb7-93b9-405a-8dad-c21703b7af18" + } + ] + } + JSON + result = client.post("#{PushNotifications.base_route}/office365", body: payload, headers: HTTP::Headers{ + "Host" => "localhost", + "Content-Type" => "application/json", + }) + + result.status_code.should eq 202 + + begin + select + when message = channel.receive + { + "event_type": "created", + "resource_id": "AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "resource_uri": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "subscription_id": "f37536ac-b308-4bc7-b239-b2b51cd2ff24", + "client_secret": "secretClientState", + "expiration_time": 1685143758, + }.to_json.should eq(message) + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end + + it "should receive valid payload when microsoft sends change notification on unscoped channel" do PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! subscription_channel = "f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" @@ -136,8 +237,8 @@ module PlaceOS::Api end it "should receive valid payload when microsoft sends lifecycle notification" do - PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! - subscription_channel = "f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new diff --git a/spec/controllers/root_spec.cr b/spec/controllers/root_spec.cr index 291c2c70..b5b14b74 100644 --- a/spec/controllers/root_spec.cr +++ b/spec/controllers/root_spec.cr @@ -52,7 +52,34 @@ module PlaceOS::Api end describe "POST /signal" do - it "writes an arbitrary payload to a redis subscription" do + it "writes an arbitrary payload to a redis subscription on scoped channel" do + user, headers = Spec::Authentication.authenticated + subscription_channel = "#{user.authority.as(Model::Authority).id}/test" + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + params = HTTP::Params{"channel" => "test"} + result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: headers) + result.status_code.should eq 200 + + begin + select + when message = channel.receive + message.should eq "hello" + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end + + it "writes an arbitrary payload to a redis subscription on un-scoped channel" do + _, headers = Spec::Authentication.authenticated subscription_channel = "test" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new @@ -62,7 +89,7 @@ module PlaceOS::Api end params = HTTP::Params{"channel" => subscription_channel} - result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: Spec::Authentication.headers) + result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: headers) result.status_code.should eq 200 begin @@ -83,19 +110,20 @@ module PlaceOS::Api end context "guest users" do - _, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) - it "prevented access to non-guest channels " do + _, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) + result = client.post(File.join(Root.base_route, "signal?channel=dummy"), body: "hello", headers: guest_header) result.status_code.should eq 403 end it "allowed access to guest channels" do + guest, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) subscription_channel = "/guest/dummy" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new - _subscription = subs.channel subscription_channel do |_, message| + _subscription = subs.channel "#{guest.authority.as(Model::Authority).id}#{subscription_channel}" do |_, message| channel.send(message) end diff --git a/src/placeos-rest-api/controllers/metadata.cr b/src/placeos-rest-api/controllers/metadata.cr index 9a3cc935..2bd2e582 100644 --- a/src/placeos-rest-api/controllers/metadata.cr +++ b/src/placeos-rest-api/controllers/metadata.cr @@ -103,17 +103,21 @@ module PlaceOS::Api mutate(parent_id, meta, merge: false) end - SIGNAL_CHANNEL = "placeos/metadata/changed" + UNSCOPED_SIGNAL_CHANNEL = "placeos/metadata/changed" + SCOPED_SIGNAL_CHANNEL = "placeos/%s/metadata/changed" - protected def self.signal_metadata(action : Symbol, metadata) : Nil + protected def self.signal_metadata(authority : String, action : Symbol, metadata) : Nil payload = { action: action, metadata: metadata, }.to_json - Log.info { "signalling #{SIGNAL_CHANNEL} with #{payload.bytesize} bytes" } + Log.info { "signalling #{UNSCOPED_SIGNAL_CHANNEL} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(UNSCOPED_SIGNAL_CHANNEL, payload) - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(SIGNAL_CHANNEL, payload) + signal_channel = sprintf(SCOPED_SIGNAL_CHANNEL, authority) + Log.info { "signalling #{signal_channel} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(signal_channel, payload) end # Find (otherwise create) then update (or patch) the Metadata. @@ -126,7 +130,7 @@ module PlaceOS::Api metadata payload = metadata.interface - spawn { self.class.signal_metadata(:update, payload) } + spawn { self.class.signal_metadata(current_authority.not_nil!.id.to_s, :update, payload) } payload end @@ -142,11 +146,11 @@ module PlaceOS::Api spawn do if metadata_name.empty? - self.class.signal_metadata(:destroy_all, { + self.class.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, { parent_id: parent_id, }) else - self.class.signal_metadata(:destroy, { + self.class.signal_metadata(current_authority.not_nil!.id.to_s, :destroy, { parent_id: parent_id, name: metadata_name, }) diff --git a/src/placeos-rest-api/controllers/notifications.cr b/src/placeos-rest-api/controllers/notifications.cr index f7e292a0..eef8ed50 100644 --- a/src/placeos-rest-api/controllers/notifications.cr +++ b/src/placeos-rest-api/controllers/notifications.cr @@ -50,10 +50,11 @@ module PlaceOS::Api private def signal(notification) notification.notifications.each do |entry| payload = entry.to_payload - path = "placeos/#{entry.subscription_id}/event" - Log.info { "signalling #{path} with #{payload.bytesize} bytes" } - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + ["placeos/#{entry.subscription_id}/event", "placeos/#{current_authority.not_nil!.id}/#{entry.subscription_id}/event"].each do |path| + Log.info { "signalling #{path} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + end end end end diff --git a/src/placeos-rest-api/controllers/root.cr b/src/placeos-rest-api/controllers/root.cr index bb0ccdae..b5536085 100644 --- a/src/placeos-rest-api/controllers/root.cr +++ b/src/placeos-rest-api/controllers/root.cr @@ -208,10 +208,11 @@ module PlaceOS::Api "" end - path = Path["placeos/"].join(channel).to_s - Log.info { "signalling #{path} with #{payload.bytesize} bytes" } - - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + [Path["placeos/"].join(channel).to_s, Path["placeos/#{current_authority.not_nil!.id}/"].join(channel).to_s].each do |path| + # path = Path["placeos/#{current_authority.not_nil!.id}/"].join(channel).to_s + Log.info { "signalling #{path} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + end end # maps the database tables to indexes in elasticsearch diff --git a/src/placeos-rest-api/controllers/systems.cr b/src/placeos-rest-api/controllers/systems.cr index 2a7b2252..eb103276 100644 --- a/src/placeos-rest-api/controllers/systems.cr +++ b/src/placeos-rest-api/controllers/systems.cr @@ -309,7 +309,7 @@ module PlaceOS::Api def destroy : Nil cs_id = current_control_system.id current_control_system.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: cs_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: cs_id}) } end # Return all zones for this system diff --git a/src/placeos-rest-api/controllers/users.cr b/src/placeos-rest-api/controllers/users.cr index 87b9bad3..d0a50e80 100644 --- a/src/placeos-rest-api/controllers/users.cr +++ b/src/placeos-rest-api/controllers/users.cr @@ -263,7 +263,7 @@ module PlaceOS::Api else user_id = user.id user.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: user_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: user_id}) } end end diff --git a/src/placeos-rest-api/controllers/zones.cr b/src/placeos-rest-api/controllers/zones.cr index 83801cfa..ad9c40dd 100644 --- a/src/placeos-rest-api/controllers/zones.cr +++ b/src/placeos-rest-api/controllers/zones.cr @@ -160,7 +160,7 @@ module PlaceOS::Api def destroy : Nil zone_id = current_zone.id current_zone.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: zone_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: zone_id}) } end # return metadata associcated with the selected zone