diff --git a/dart/iot_sender/lib/iot_mqtt_listener.dart b/dart/iot_sender/lib/iot_mqtt_listener.dart index 3b25b60a..9dfc0451 100644 --- a/dart/iot_sender/lib/iot_mqtt_listener.dart +++ b/dart/iot_sender/lib/iot_mqtt_listener.dart @@ -9,7 +9,6 @@ import 'package:at_client/at_client.dart'; //import 'package:at_commons/at_commons.dart'; import 'package:iot_sender/models/send_hr02_receiver.dart'; import 'package:iot_sender/models/hro2_data_owner.dart'; -import 'package:iot_sender/models/hro2_device.dart'; final client = MqttServerClient('localhost', ''); final AtSignLogger logger = AtSignLogger('iotListen'); @@ -19,7 +18,8 @@ Random random = Random(); int fakeO2IntMinValue = 950; int fakeO2IntMaxValue = 995; // fakeO2 value in int, convert to double by dividing by 10 when publishing -int currentFakeO2IntValue = random.nextInt(fakeO2IntMaxValue - fakeO2IntMinValue) + fakeO2IntMinValue; +int currentFakeO2IntValue = + random.nextInt(fakeO2IntMaxValue - fakeO2IntMinValue) + fakeO2IntMinValue; int getNextFakeO2IntValue() { // get random int in range -5..+5 @@ -40,8 +40,12 @@ bool _sendO2 = true; int _putCounterHR = 0; int _putCounterO2 = 0; -Future iotListen(AtClientManager atClientManager, NotificationService notificationService, String fromAtsign, - String ownerAtsign, String deviceName) async { +Future iotListen( + AtClientManager atClientManager, + NotificationService notificationService, + String fromAtsign, + String ownerAtsign, + String deviceName) async { client.logging(on: false); client.setProtocolV311(); client.keepAlivePeriod = 20; @@ -71,14 +75,17 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti logger.info('Mosquitto client connected'); } else { /// Use status here rather than state if you also want the broker return code. - logger.severe('ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}'); + logger.severe( + 'ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}'); client.disconnect(); exit(-1); } - logger.info('calling atClient.put for HeartRate to ensure AtClient connection goes through authorization exchange'); + logger.info( + 'calling atClient.put for HeartRate to ensure AtClient connection goes through authorization exchange'); - logger.info('Initial put complete, AtClient connection should now be authorized'); + logger.info( + 'Initial put complete, AtClient connection should now be authorized'); /// Ok, lets try a subscription logger.info('Subscribing to the mqtt/mwc_hr topic'); @@ -98,14 +105,17 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti client.updates!.listen((List>? c) async { final recMess = c![0].payload as MqttPublishMessage; - final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message); + final pt = + MqttPublishPayload.bytesToStringAsString(recMess.payload.message); // Get list of dataOwners // - List dataOwnerAtsigns = await getDataOwners(atClient, ownerAtsign, deviceName); + List dataOwnerAtsigns = + await getDataOwners(atClient, ownerAtsign, deviceName); List toAtsigns = []; for (HrO2DataOwner owner in dataOwnerAtsigns) { - var addtoAtsigns = await getReceivers(atClient, owner.dataOwnerAtsign, owner.hrO2Device.sensorName); + var addtoAtsigns = await getReceivers( + atClient, owner.dataOwnerAtsign, owner.hrO2Device.sensorName); toAtsigns.addAll(addtoAtsigns); } @@ -122,7 +132,11 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti lastHeartRateDoubleValue = heartRateDoubleValue; for (var receiver in toAtsigns) { if (receiver.sendHR) { - await shareHeartRate(heartRateDoubleValue, fromAtsign, receiver.receiverAtsign, receiver.receiverShortname, + await shareHeartRate( + heartRateDoubleValue, + fromAtsign, + receiver.receiverAtsign, + receiver.receiverShortname, notificationService); } @@ -130,7 +144,11 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti // get random int between 0 and 101, then subtract 50 to get a number in range -50..+50 currentFakeO2IntValue = getNextFakeO2IntValue(); double fakeO2DoubleValue = currentFakeO2IntValue / 10; - await shareO2Sat(fakeO2DoubleValue, fromAtsign, receiver.receiverAtsign, receiver.receiverShortname, + await shareO2Sat( + fakeO2DoubleValue, + fromAtsign, + receiver.receiverAtsign, + receiver.receiverShortname, notificationService); } } @@ -144,7 +162,11 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti for (var receiver in toAtsigns) { if (receiver.sendO2) { await shareO2Sat( - o2SatDoubleValue, fromAtsign, receiver.receiverAtsign, receiver.receiverShortname, notificationService); + o2SatDoubleValue, + fromAtsign, + receiver.receiverAtsign, + receiver.receiverShortname, + notificationService); } } } @@ -161,17 +183,18 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti bpm = double.parse(beatBpmSpo[1]); spo = double.parse(beatBpmSpo[2]); } catch (e) { - logger.severe('Error in message sent to mqtt/mwc_beat_hr_o2 format HR,O2 and this was received: $pt'); + logger.severe( + 'Error in message sent to mqtt/mwc_beat_hr_o2 format HR,O2 and this was received: $pt'); } if (hrDetect) { for (var receiver in toAtsigns) { if (receiver.sendHR) { - await shareHeartRate( - bpm, fromAtsign, receiver.receiverAtsign, receiver.receiverShortname, notificationService); + await shareHeartRate(bpm, fromAtsign, receiver.receiverAtsign, + receiver.receiverShortname, notificationService); } if (receiver.sendO2) { - await shareO2Sat( - spo, fromAtsign, receiver.receiverAtsign, receiver.receiverShortname, notificationService); + await shareO2Sat(spo, fromAtsign, receiver.receiverAtsign, + receiver.receiverShortname, notificationService); } } } @@ -180,7 +203,8 @@ Future iotListen(AtClientManager atClientManager, NotificationService noti }); } -Future> getDataOwners(AtClient atClient, String ownerAtsign, String sensorName) async { +Future> getDataOwners( + AtClient atClient, String ownerAtsign, String sensorName) async { String dataOwnersJson = ''; //Using a Set first dedupes the list we eventually send back Set dataOwners = {}; @@ -212,7 +236,8 @@ Future> getDataOwners(AtClient atClient, String ownerAtsign, return (dataOwners.toList()); } -Future> getReceivers(AtClient atClient, String ownerAtsign, String sensorName) async { +Future> getReceivers( + AtClient atClient, String ownerAtsign, String sensorName) async { String receiversString = ''; const String libraryNamespace = 'iot_receiver'; @@ -245,8 +270,8 @@ Future> getReceivers(AtClient atClient, String ownerAtsig return toAtsigns; } -Future shareHeartRate(double heartRate, String atsign, String toAtsign, String sendToShortname, - NotificationService notificationService) async { +Future shareHeartRate(double heartRate, String atsign, String toAtsign, + String sendToShortname, NotificationService notificationService) async { if (!_sendHR) { return; } @@ -258,7 +283,9 @@ Future shareHeartRate(double heartRate, String atsign, String toAtsign, St logger.info('calling atClient.put for HeartRate #$thisHRPutNo'); try { await notificationService.notify( - NotificationParams.forText('HR:$heartRateAsString:$sendToShortname', toAtsign, shouldEncrypt: true), + NotificationParams.forText( + 'HR:$heartRateAsString:$sendToShortname', toAtsign, + shouldEncrypt: true), checkForFinalDeliveryStatus: false, onSuccess: (notification) { logger.info('SUCCESS:$notification'); }, onError: (notification) { @@ -272,8 +299,8 @@ Future shareHeartRate(double heartRate, String atsign, String toAtsign, St logger.info('atClient.put #$thisHRPutNo complete'); } -Future shareO2Sat(double o2Sat, String atsign, String toAtsign, String sendToShortname, - NotificationService notificationService) async { +Future shareO2Sat(double o2Sat, String atsign, String toAtsign, + String sendToShortname, NotificationService notificationService) async { if (!_sendO2) { return; } @@ -312,10 +339,12 @@ void onSubscribed(String topic) { /// The unsolicited disconnect callback void onDisconnected() { logger.info('OnDisconnected client callback - Client disconnection'); - if (client.connectionStatus!.disconnectionOrigin == MqttDisconnectionOrigin.solicited) { + if (client.connectionStatus!.disconnectionOrigin == + MqttDisconnectionOrigin.solicited) { logger.info('OnDisconnected callback is solicited, this is correct'); } else { - logger.severe('OnDisconnected callback is unsolicited or none, this is incorrect - exiting'); + logger.severe( + 'OnDisconnected callback is unsolicited or none, this is incorrect - exiting'); exit(-1); } } diff --git a/flutter/iot_receiver/lib/screens/data_owners_screen.dart b/flutter/iot_receiver/lib/screens/data_owners_screen.dart index 5c424f51..256b5459 100644 --- a/flutter/iot_receiver/lib/screens/data_owners_screen.dart +++ b/flutter/iot_receiver/lib/screens/data_owners_screen.dart @@ -21,6 +21,7 @@ class _DataOwnersScreenState extends State { @override Widget build(BuildContext context) { + _hrO2DataService.getDataOwners(); return Scaffold( appBar: NewGradientAppBar( title: const AutoSizeText( diff --git a/flutter/iot_receiver/lib/screens/receivers_screen.dart b/flutter/iot_receiver/lib/screens/receivers_screen.dart index d7fddad5..ec88e344 100644 --- a/flutter/iot_receiver/lib/screens/receivers_screen.dart +++ b/flutter/iot_receiver/lib/screens/receivers_screen.dart @@ -21,6 +21,8 @@ class _ReceiversScreenState extends State { @override Widget build(BuildContext context) { + _hrO2DataService.getReceivers(); + return Scaffold( appBar: NewGradientAppBar( title: const AutoSizeText( diff --git a/flutter/iot_receiver/lib/services/hro2_data_service.dart b/flutter/iot_receiver/lib/services/hro2_data_service.dart index 042a96f3..15c921ba 100644 --- a/flutter/iot_receiver/lib/services/hro2_data_service.dart +++ b/flutter/iot_receiver/lib/services/hro2_data_service.dart @@ -9,6 +9,7 @@ class Hro2DataService { static final Hro2DataService _singleton = Hro2DataService._internal(); final _logger = AtSignLogger('HrO2DataService'); + final _atClient = AtClientManager.getInstance().atClient; Hro2DataService._internal(); @@ -17,26 +18,48 @@ class Hro2DataService { } Future delete(AtKey atKey) async { - return AtClientManager.getInstance().atClient.delete(atKey); + return _atClient.delete(atKey); + } + + Future deleteAllForKey(String key) async { + var atKeys = await _atClient.getAtKeys(regex: key); + for (var atKey in atKeys) { + await delete(atKey); + } + return true; } Future deleteAllData() async { - AtKey deviceAtKey = AtKey()..key = AppConstants.deviceListKey; - var deviceKeyDeleted = await delete(deviceAtKey); - _logger.info('deleteAllData.deviceKeyDeleted $deviceKeyDeleted'); - AtKey receiverAtKey = AtKey()..key = AppConstants.receiverListKey; - var receiverKeyDeleted = await delete(receiverAtKey); - _logger.info('deleteAllData.receiverKeyDeleted $receiverKeyDeleted'); - AtKey dataOwnerAtKey = AtKey()..key = AppConstants.dataOwnerListKey; - var dataOwnerKeyDeleted = await delete(dataOwnerAtKey); - _logger.info('deleteAllData.dataOwnerKeyDeleted $dataOwnerKeyDeleted'); - return (deviceKeyDeleted && receiverKeyDeleted && dataOwnerKeyDeleted); - } - - ///Returns `AtFollowsValue` for [atKey]. + List keyStrings = AppConstants() as List; + for (var keyString in keyStrings) { + deleteAllForKey(keyString); + } + return true; + } + + Future> getDevices() async { + List hrO2DeviceList = []; + var atClient = _atClient; + var keys = await atClient.getAtKeys(regex: AppConstants.deviceKey); + for (var element in keys) { + var receiverData = await atClient.get(element); + _logger.info('getDevices got ${receiverData.value}'); + try { + HrO2Device hrO2Device = + HrO2Device.fromJson(jsonDecode(receiverData.value)); + hrO2DeviceList.add(hrO2Device); + } catch (error) { + // found some dirty data, consider deleting + _logger.severe('getDevices error $error for ${element.key}'); + // await atClient.delete(element); + } + } + return hrO2DeviceList; + } + Future> getDeviceList() async { AtKey atKey = AtKey()..key = AppConstants.deviceListKey; - var data = await AtClientManager.getInstance().atClient.get(atKey); + var data = await _atClient.get(atKey); _logger.info('getDeviceList got ${data.value}'); List hrO2DeviceList; hrO2DeviceList = (json.decode(data.value) as List) @@ -49,8 +72,7 @@ class Hro2DataService { Future putDeviceList(List hrO2DeviceList) async { AtKey atKey = AtKey()..key = AppConstants.deviceListKey; var value = jsonEncode(hrO2DeviceList); - var response = - await AtClientManager.getInstance().atClient.put(atKey, value); + var response = await _atClient.put(atKey, value); _logger.info('putDeviceList success = $response'); return response; } @@ -63,15 +85,34 @@ class Hro2DataService { deviceList.add(hrO2Device); AtKey atKey = AtKey()..key = AppConstants.deviceListKey; var value = jsonEncode(deviceList); - var response = - await AtClientManager.getInstance().atClient.put(atKey, value); + var response = await _atClient.put(atKey, value); _logger.info('addDeviceToList success = $response'); return response; } + Future> getReceivers() async { + List hrO2ReceiverList = []; + var atClient = _atClient; + var keys = await atClient.getAtKeys(regex: AppConstants.deviceReceiverKey); + for (var element in keys) { + var receiverData = await atClient.get(element); + _logger.info('getReceivers got ${receiverData.value}'); + try { + HrO2Receiver hrO2Receiver = + HrO2Receiver.fromJson(jsonDecode(receiverData.value)); + hrO2ReceiverList.add(hrO2Receiver); + } catch (error) { + // found some dirty data, consider deleting + _logger.severe('getReceivers error $error for ${element.key}'); + // await atClient.delete(element); + } + } + return hrO2ReceiverList; + } + Future> getReceiverList() async { AtKey atKey = AtKey()..key = AppConstants.receiverListKey; - var data = await AtClientManager.getInstance().atClient.get(atKey); + var data = await _atClient.get(atKey); _logger.info('getReceiverList got ${data.value}'); List hrO2ReceiverList; hrO2ReceiverList = (json.decode(data.value) as List) @@ -84,8 +125,7 @@ class Hro2DataService { Future putReceiverList(List hrO2ReceiverList) async { AtKey atKey = AtKey()..key = AppConstants.receiverListKey; var value = jsonEncode(hrO2ReceiverList); - var response = - await AtClientManager.getInstance().atClient.put(atKey, value); + var response = await _atClient.put(atKey, value); _logger.info('putReceiverList success = $response'); return response; } @@ -118,9 +158,29 @@ class Hro2DataService { return receiverSuccess && deviceReceiverSuccess; } + Future> getDataOwners() async { + List hrO2DataOwnerList = []; + var keys = + await _atClient.getAtKeys(regex: AppConstants.deviceDataOwnerKey); + for (var element in keys) { + var dataOwnerData = await _atClient.get(element); + _logger.info('getDataOwners got ${dataOwnerData.value}'); + try { + HrO2DataOwner hrO2DataOwner = + HrO2DataOwner.fromJson(jsonDecode(dataOwnerData.value)); + hrO2DataOwnerList.add(hrO2DataOwner); + } catch (error) { + // found some dirty data, consider deleting + _logger.severe('getDataOwners error $error for ${element.key}'); + // await atClient.delete(element); + } + } + return hrO2DataOwnerList; + } + Future> getDataOwnerList() async { AtKey atKey = AtKey()..key = AppConstants.dataOwnerListKey; - var data = await AtClientManager.getInstance().atClient.get(atKey); + var data = await _atClient.get(atKey); _logger.info('getDataOwnerList got ${data.value}'); List hrO2DataOwnerList; hrO2DataOwnerList = (json.decode(data.value) as List) @@ -134,7 +194,7 @@ class Hro2DataService { AtKey atKey = AtKey() ..key = AppConstants.deviceDataOwnerKey ..sharedBy = "@mwcmanager"; - var data = await AtClientManager.getInstance().atClient.get(atKey); + var data = await _atClient.get(atKey); _logger.info('getDeviceDataOwner got ${data.value}'); HrO2DataOwner hrO2DataOwner = HrO2DataOwner.fromJson(json.decode(data.value)); @@ -147,8 +207,7 @@ class Hro2DataService { Future putDataOwnerList(List hrO2DataOwnerList) async { AtKey atKey = AtKey()..key = AppConstants.dataOwnerListKey; var value = jsonEncode(hrO2DataOwnerList); - var response = - await AtClientManager.getInstance().atClient.put(atKey, value); + var response = await _atClient.put(atKey, value); _logger.info('putDataOwnerList success = $response'); return response; } @@ -174,7 +233,7 @@ class Hro2DataService { .atClient .put(dataOwnerKey, sharedDataOwnerJson); _logger.info('shareDeviceSuccess success = $sharedDataOwnerSuccess'); - // Share the same object to the device + // Share the list to the device dataOwnerKey.sharedWith = hrO2DataOwner.hrO2Device.deviceAtsign; var deviceDataOwnerSuccess = await AtClientManager.getInstance() .atClient @@ -189,6 +248,7 @@ class Hro2DataService { class AppConstants { static const String libraryNamespace = 'iot_receiver'; static const String deviceListKey = 'device_list.$libraryNamespace'; + static const String deviceKey = 'device.$libraryNamespace'; static const String receiverListKey = 'receiver_list.$libraryNamespace'; static const String dataOwnerListKey = 'data_owner_list.$libraryNamespace'; static const String deviceDataOwnerKey =