Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream media using LiveKit #1745

Merged
merged 14 commits into from
Oct 15, 2024
22 changes: 22 additions & 0 deletions backend/api/EventHandlers/MqttEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public override void Subscribe()
MqttService.MqttIsarPressureReceived += OnIsarPressureUpdate;
MqttService.MqttIsarPoseReceived += OnIsarPoseUpdate;
MqttService.MqttIsarCloudHealthReceived += OnIsarCloudHealthUpdate;
MqttService.MqttIsarMediaConfigReceived += OnIsarMediaConfigUpdate;
}

public override void Unsubscribe()
Expand All @@ -71,6 +72,7 @@ public override void Unsubscribe()
MqttService.MqttIsarPressureReceived -= OnIsarPressureUpdate;
MqttService.MqttIsarPoseReceived -= OnIsarPoseUpdate;
MqttService.MqttIsarCloudHealthReceived -= OnIsarCloudHealthUpdate;
MqttService.MqttIsarMediaConfigReceived -= OnIsarMediaConfigUpdate;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await stoppingToken; }
Expand Down Expand Up @@ -485,5 +487,25 @@ private async void OnIsarCloudHealthUpdate(object? sender, MqttReceivedArgs mqtt

TeamsMessageService.TriggerTeamsMessageReceived(new TeamsMessageEventArgs(message));
}

private async void OnIsarMediaConfigUpdate(object? sender, MqttReceivedArgs mqttArgs)
{
var isarTelemetyUpdate = (IsarMediaConfigMessage)mqttArgs.Message;

var robot = await RobotService.ReadByIsarId(isarTelemetyUpdate.IsarId);
if (robot == null)
{
_logger.LogInformation("Received message from unknown ISAR instance {Id} with robot name {Name}", isarTelemetyUpdate.IsarId, isarTelemetyUpdate.RobotName);
sondreo marked this conversation as resolved.
Show resolved Hide resolved
return;
}
await SignalRService.SendMessageAsync("Media stream config received", robot.CurrentInstallation,
new MediaConfig
{
Url = isarTelemetyUpdate.Url,
Token = isarTelemetyUpdate.Token,
RobotId = robot.Id,
MediaConnectionType = isarTelemetyUpdate.MediaConnectionType
});
}
}
}
25 changes: 25 additions & 0 deletions backend/api/MQTT/MessageModels/IsarMediaConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.Text.Json.Serialization;
using Api.Services.Models;

namespace Api.Mqtt.MessageModels
{
#nullable disable
public class IsarMediaConfigMessage : MqttMessage
{
[JsonPropertyName("robot_name")]
public string RobotName { get; set; }

[JsonPropertyName("isar_id")]
public string IsarId { get; set; }

[JsonPropertyName("url")]
public string Url { get; set; }

[JsonPropertyName("token")]
public string Token { get; set; }

[JsonPropertyName("mediaConnectionType")]
public MediaConnectionType MediaConnectionType { get; set; }

}
}
5 changes: 5 additions & 0 deletions backend/api/MQTT/MqttService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public MqttService(ILogger<MqttService> logger, IConfiguration config)
public static event EventHandler<MqttReceivedArgs>? MqttIsarPressureReceived;
public static event EventHandler<MqttReceivedArgs>? MqttIsarPoseReceived;
public static event EventHandler<MqttReceivedArgs>? MqttIsarCloudHealthReceived;
public static event EventHandler<MqttReceivedArgs>? MqttIsarMediaConfigReceived;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Expand Down Expand Up @@ -152,6 +153,9 @@ private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs messageRe
case Type type when type == typeof(IsarCloudHealthMessage):
OnIsarTopicReceived<IsarCloudHealthMessage>(content);
break;
case Type type when type == typeof(IsarMediaConfigMessage):
OnIsarTopicReceived<IsarMediaConfigMessage>(content);
break;
default:
_logger.LogWarning(
"No callback defined for MQTT message type '{type}'",
Expand Down Expand Up @@ -301,6 +305,7 @@ private void OnIsarTopicReceived<T>(string content) where T : MqttMessage
_ when type == typeof(IsarPressureMessage) => MqttIsarPressureReceived,
_ when type == typeof(IsarPoseMessage) => MqttIsarPoseReceived,
_ when type == typeof(IsarCloudHealthMessage) => MqttIsarCloudHealthReceived,
_ when type == typeof(IsarMediaConfigMessage) => MqttIsarMediaConfigReceived,
_
=> throw new NotImplementedException(
$"No event defined for message type '{typeof(T).Name}'"
Expand Down
3 changes: 3 additions & 0 deletions backend/api/MQTT/MqttTopics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public static class MqttTopics
},
{
"isar/+/cloud_health", typeof(IsarCloudHealthMessage)
},
{
"isar/+/media_config", typeof(IsarMediaConfigMessage)
}
};

Expand Down
20 changes: 20 additions & 0 deletions backend/api/Services/Models/MediaConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Text.Json.Serialization;
namespace Api.Services.Models
{
public struct MediaConfig
{
[JsonPropertyName("url")]
public string? Url { get; set; }

[JsonPropertyName("token")]
public string? Token { get; set; }

[JsonPropertyName("robotId")]
public string? RobotId { get; set; }

[JsonPropertyName("mediaConnectionType")]
public MediaConnectionType MediaConnectionType { get; set; }
}

public enum MediaConnectionType { LiveKit };
}
3 changes: 2 additions & 1 deletion backend/api/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"isar/+/battery",
"isar/+/pressure",
"isar/+/pose",
"isar/+/cloud_health"
"isar/+/cloud_health",
"isar/+/media_config"
],
"MaxRetryAttempts": 5,
"ShouldFailOnMaxRetries": false
Expand Down
3 changes: 2 additions & 1 deletion backend/api/appsettings.Local.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"isar/+/battery",
"isar/+/pressure",
"isar/+/pose",
"isar/+/cloud_health"
"isar/+/cloud_health",
"isar/+/media_config"
],
"MaxRetryAttempts": 5,
"ShouldFailOnMaxRetries": false
Expand Down
3 changes: 2 additions & 1 deletion backend/api/appsettings.Production.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"isar/+/battery",
"isar/+/pressure",
"isar/+/pose",
"isar/+/cloud_health"
"isar/+/cloud_health",
"isar/+/media_config"
],
"MaxRetryAttempts": 15,
"ShouldFailOnMaxRetries": true
Expand Down
3 changes: 2 additions & 1 deletion backend/api/appsettings.Staging.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"isar/+/battery",
"isar/+/pressure",
"isar/+/pose",
"isar/+/cloud_health"
"isar/+/cloud_health",
"isar/+/media_config"
],
"MaxRetryAttempts": 15,
"ShouldFailOnMaxRetries": true
Expand Down
3 changes: 2 additions & 1 deletion backend/api/appsettings.Test.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"isar/+/battery",
"isar/+/pressure",
"isar/+/pose",
"isar/+/cloud_health"
"isar/+/cloud_health",
"isar/+/media_config"
],
"MaxRetryAttempts": 15,
"ShouldFailOnMaxRetries": true
Expand Down
Loading