Skip to content

Commit

Permalink
Add update robot semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
oysand committed Nov 29, 2024
1 parent 7ba31c5 commit 3d2bac2
Showing 1 changed file with 86 additions and 10 deletions.
96 changes: 86 additions & 10 deletions backend/api/EventHandlers/MqttEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class MqttEventHandler : EventHandlerBase

private readonly IServiceScopeFactory _scopeFactory;

private readonly Semaphore _updateRobotSemaphore = new(1, 1);

public MqttEventHandler(ILogger<MqttEventHandler> logger, IServiceScopeFactory scopeFactory)
{
_logger = logger;
Expand Down Expand Up @@ -100,20 +102,38 @@ private async void OnIsarStatus(object? sender, MqttReceivedArgs mqttArgs)
}
_logger.LogInformation("OnIsarStatus: Robot {robotName} has status {robotStatus} and current area {areaName}", preUpdatedRobot.Name, preUpdatedRobot.Status, preUpdatedRobot.CurrentArea?.Name);

_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot status");

var updatedRobot = await RobotService.UpdateRobotStatus(robot.Id, isarStatus.Status);

_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot status");

_logger.LogInformation("Updated status for robot {Name} to {Status}", updatedRobot.Name, updatedRobot.Status);


_logger.LogInformation("OnIsarStatus: Robot {robotName} has status {robotStatus} and current area {areaName}", updatedRobot.Name, updatedRobot.Status, updatedRobot.CurrentArea?.Name);

if (isarStatus.Status == RobotStatus.Available)
{
try { await RobotService.UpdateCurrentMissionId(robot.Id, null); }
try
{
_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot current mission id");

await RobotService.UpdateCurrentMissionId(robot.Id, null);
}
catch (RobotNotFoundException)
{
_logger.LogError("Robot {robotName} not found when updating current mission id to null", robot.Name);
return;
}
finally
{
_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot current mission id");
}
MissionScheduling.TriggerRobotAvailable(new RobotAvailableEventArgs(robot.Id));
}
else if (isarStatus.Status == RobotStatus.Offline)
Expand Down Expand Up @@ -176,20 +196,30 @@ private async void OnIsarRobotInfo(object? sender, MqttReceivedArgs mqttArgs)
return;
}

List<string> updatedFields = [];
try
{
_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot");

if (isarRobotInfo.VideoStreamQueries is not null) UpdateVideoStreamsIfChanged(isarRobotInfo.VideoStreamQueries, ref robot, ref updatedFields);
if (isarRobotInfo.Host is not null) UpdateHostIfChanged(isarRobotInfo.Host, ref robot, ref updatedFields);
List<string> updatedFields = [];

UpdatePortIfChanged(isarRobotInfo.Port, ref robot, ref updatedFields);
if (isarRobotInfo.VideoStreamQueries is not null) UpdateVideoStreamsIfChanged(isarRobotInfo.VideoStreamQueries, ref robot, ref updatedFields);
if (isarRobotInfo.Host is not null) UpdateHostIfChanged(isarRobotInfo.Host, ref robot, ref updatedFields);

if (isarRobotInfo.CurrentInstallation is not null) UpdateCurrentInstallationIfChanged(installation, ref robot, ref updatedFields);
if (isarRobotInfo.Capabilities is not null) UpdateRobotCapabilitiesIfChanged(isarRobotInfo.Capabilities, ref robot, ref updatedFields);
if (updatedFields.Count < 1) return;
UpdatePortIfChanged(isarRobotInfo.Port, ref robot, ref updatedFields);

robot = await RobotService.Update(robot);
if (isarRobotInfo.CurrentInstallation is not null) UpdateCurrentInstallationIfChanged(installation, ref robot, ref updatedFields);
if (isarRobotInfo.Capabilities is not null) UpdateRobotCapabilitiesIfChanged(isarRobotInfo.Capabilities, ref robot, ref updatedFields);
if (updatedFields.Count < 1) return;

_logger.LogInformation("Updated robot '{Id}' ('{RobotName}') in database: {Updates}", robot.Id, robot.Name, updatedFields);
robot = await RobotService.Update(robot);
_logger.LogInformation("Updated robot '{Id}' ('{RobotName}') in database: {Updates}", robot.Id, robot.Name, updatedFields);
}
finally
{
_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot");
}

}
catch (DbUpdateException e) { _logger.LogError(e, "Could not add robot to db"); }
Expand Down Expand Up @@ -279,18 +309,29 @@ private async void OnIsarMissionUpdate(object? sender, MqttReceivedArgs mqttArgs
{
try
{
_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot current area for localization mission successful");

var robotWithUpdatedArea = await RobotService.UpdateCurrentArea(flotillaMissionRun.Robot.Id, flotillaMissionRun.Area.Id);
}
catch (RobotNotFoundException)
{
_logger.LogError("Could not find robot '{RobotName}' with ID '{Id}'", flotillaMissionRun.Robot.Name, flotillaMissionRun.Robot.Id);
return;
}
finally
{
_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot current area for localization mission successful");
}
}
else if (flotillaMissionRun.Tasks.All((task) => task.Status == Database.Models.TaskStatus.Cancelled || task.Status == Database.Models.TaskStatus.Failed) || flotillaMissionRun.Status == MissionStatus.Aborted)
{
try
{
_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot current area for localization mission unsuccessful");

await RobotService.UpdateCurrentArea(flotillaMissionRun.Robot.Id, null);

_logger.LogError("Localization mission run {MissionRunId} was unsuccessful on {RobotId}, scheduled missions will be aborted", flotillaMissionRun.Id, flotillaMissionRun.Robot.Id);
Expand All @@ -302,6 +343,11 @@ private async void OnIsarMissionUpdate(object? sender, MqttReceivedArgs mqttArgs
_logger.LogError("Could not find robot '{RobotName}' with ID '{Id}'", flotillaMissionRun.Robot.Name, flotillaMissionRun.Robot.Id);
return;
}
finally
{
_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot current area for localization mission unsuccessful");
}

SignalRService.ReportGeneralFailToSignalR(flotillaMissionRun.Robot, "Failed Localization Mission", $"Failed localization mission for robot {flotillaMissionRun.Robot.Name}.");
_logger.LogError("Localization mission for robot '{RobotName}' failed.", isarMission.RobotName);
Expand Down Expand Up @@ -333,13 +379,21 @@ private async void OnIsarMissionUpdate(object? sender, MqttReceivedArgs mqttArgs
{
try
{
_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating robot current area");

await RobotService.UpdateCurrentArea(robot.Id, null);
}
catch (RobotNotFoundException)
{
_logger.LogError("Could not find robot '{RobotName}' with ID '{Id}'", robot.Name, robot.Id);
return;
}
finally
{
_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating robot current area");
}
}

_logger.LogInformation("Robot '{Id}' ('{Name}') - completed mission run {MissionRunId}", robot.IsarId, robot.Name, updatedFlotillaMissionRun.Id);
Expand Down Expand Up @@ -402,7 +456,15 @@ private async void OnIsarTaskUpdate(object? sender, MqttReceivedArgs mqttArgs)
private async void OnIsarBatteryUpdate(object? sender, MqttReceivedArgs mqttArgs)
{
var batteryStatus = (IsarBatteryMessage)mqttArgs.Message;

_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating battery");

var robot = await BatteryTimeseriesService.AddBatteryEntry(batteryStatus.BatteryLevel, batteryStatus.IsarId);

_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating battery");

if (robot == null) return;
robot.BatteryLevel = batteryStatus.BatteryLevel;

Expand All @@ -421,7 +483,15 @@ private async void OnIsarBatteryUpdate(object? sender, MqttReceivedArgs mqttArgs
private async void OnIsarPressureUpdate(object? sender, MqttReceivedArgs mqttArgs)
{
var pressureStatus = (IsarPressureMessage)mqttArgs.Message;

_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating pressure");

var robot = await PressureTimeseriesService.AddPressureEntry(pressureStatus.PressureLevel, pressureStatus.IsarId);

_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating pressure");

if (robot == null) return;
robot.PressureLevel = pressureStatus.PressureLevel;

Expand All @@ -442,7 +512,13 @@ private async void OnIsarPoseUpdate(object? sender, MqttReceivedArgs mqttArgs)
var poseStatus = (IsarPoseMessage)mqttArgs.Message;
var pose = new Pose(poseStatus.Pose);

_updateRobotSemaphore.WaitOne();
_logger.LogDebug("Semaphore acquired for updating pose");

await PoseTimeseriesService.AddPoseEntry(pose, poseStatus.IsarId);

_updateRobotSemaphore.Release();
_logger.LogDebug("Semaphore released after updating pose");
}

private async void OnIsarCloudHealthUpdate(object? sender, MqttReceivedArgs mqttArgs)
Expand Down

0 comments on commit 3d2bac2

Please sign in to comment.