Skip to content

Commit

Permalink
Added remove pipes support
Browse files Browse the repository at this point in the history
  • Loading branch information
guyluz11 committed Jan 21, 2024
1 parent 2ba7b1d commit 6c94c3b
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 26 deletions.
15 changes: 7 additions & 8 deletions lib/domain/connections_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ part 'package:cybearjinni/infrastructure/connection_service/app_connection_servi
part 'package:cybearjinni/infrastructure/connection_service/hub_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/demo_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/none_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/remote_pipes_connection_service.dart';

enum ConnectionType {
appAsHub,
Expand All @@ -41,33 +42,31 @@ abstract interface class ConnectionsService {

static ConnectionType _currentConnectionType = ConnectionType.appAsHub;

static void setCurrentConnectionType(ConnectionType? connectionType) {
static void setCurrentConnectionType(ConnectionType connectionType) {
if (connectionType == _currentConnectionType) {
return;
}
_instance?.dispose();

_currentConnectionType = connectionType;

switch (connectionType) {
case ConnectionType.appAsHub:
_instance = _AppConnectionService();
_currentConnectionType = ConnectionType.appAsHub;
case ConnectionType.hub:
_currentConnectionType = ConnectionType.hub;
_instance = _HubConnectionService();
case ConnectionType.demo:
case ConnectionType.remotePipes:
_instance = _RemotePipesConnectionService();
case ConnectionType.demo:
_instance = _DemoConnectionService();
_currentConnectionType = ConnectionType.demo;
case ConnectionType.none:
case null:
_instance = _NoneConnectionService();
_currentConnectionType = ConnectionType.none;
}
}

static ConnectionType getCurrentConnectionType() => _currentConnectionType;

Future<bool> connect();
Future<bool> connect({String? address});

Future searchDevices();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ class _AppConnectionService implements ConnectionsService {
IcSynchronizer().getVendors();

@override
Future<bool> connect() async => true;
Future<bool> connect({String? address}) async => true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ class _DemoConnectionService implements ConnectionsService {
IcSynchronizer().getVendors();

@override
Future<bool> connect() async => true;
Future<bool> connect({String? address}) async => true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class _HubConnectionService implements ConnectionsService {
IcSynchronizer().getVendors();

@override
Future<bool> connect() async {
Future<bool> connect({String? address}) async {
searchDevices();
if (hubIp == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ class _NoneConnectionService implements ConnectionsService {
Future<List<VendorEntityInformation>> getVendors() async => [];

@override
Future<bool> connect() async => true;
Future<bool> connect({String? address}) async => true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
part of 'package:cybearjinni/domain/connections_service.dart';

class _RemotePipesConnectionService implements ConnectionsService {
/// Port to connect to the cbj hub, will change according to the current
/// running environment
int hubPort = 50051;

String? address;
String? networkBssid;
String? networkName;

ClientChannel? channel;
CbjHubClient? stub;

StreamController<MapEntry<String, DeviceEntityBase>>? entitiesStream;
StreamController<MapEntry<String, AreaEntity>>? areasStream;

BehaviorSubject<RequestsAndStatusFromHub> hubMessagesToApp =
BehaviorSubject<RequestsAndStatusFromHub>();

// StreamController<ClientStatusRequests> appMessagesToHub =
// StreamController<ClientStatusRequests>();

BehaviorSubject<ClientStatusRequests> appMessagesToHub = BehaviorSubject();

@override
Future dispose() async {
entitiesStream?.close();
}

@override
Future<HashMap<String, DeviceEntityBase>> get getEntities async {
appMessagesToHub.sink.add(
ClientStatusRequests(sendingType: SendingType.allEntities.name),
);
final HashMap<String, DeviceEntityBase> entities = HashMap();

await for (final RequestsAndStatusFromHub message
in hubMessagesToApp.stream) {
final SendingType sendingType =
SendingTypeExtension.fromString(message.sendingType);
if (sendingType != SendingType.allEntities) {
continue;
}

try {
final Map<String, String> entitiesMap = Map<String, String>.from(
jsonDecode(message.allRemoteCommands) as Map<String, dynamic>,
);
entities.addEntries(
entitiesMap.entries.map(
(e) => MapEntry(
e.key,
DeviceHelper.convertJsonStringToDomain(e.value),
),
),
);
} catch (e) {
logger.e('Error converting entities\n$e');
}
break;
}

return entities;
}

@override
Future<HashMap<String, AreaEntity>> get getAreas async {
appMessagesToHub.sink.add(
ClientStatusRequests(sendingType: SendingType.allAreas.name),
);

final HashMap<String, AreaEntity> areas = HashMap();

await for (final RequestsAndStatusFromHub message
in hubMessagesToApp.stream) {
final SendingType sendingType =
SendingTypeExtension.fromString(message.sendingType);
if (sendingType != SendingType.allAreas) {
continue;
}

try {
final Map<String, String> entitiesMap = Map<String, String>.from(
jsonDecode(message.allRemoteCommands) as Map<String, dynamic>,
);
areas.addEntries(
entitiesMap.entries.map(
(e) => MapEntry(
e.key,
AreaEntityDtos.fromJson(
jsonDecode(e.value) as Map<String, dynamic>,
).toDomain(),
),
),
);
} catch (e) {
logger.e('Error converting areas\n$e');
}
break;
}

return areas;
}

@override
Future<HashMap<String, SceneCbjEntity>> get getScenes async {
appMessagesToHub.sink.add(
ClientStatusRequests(sendingType: SendingType.allScenes.name),
);

final HashMap<String, SceneCbjEntity> scenesMap = HashMap();

await for (final RequestsAndStatusFromHub message
in hubMessagesToApp.stream) {
final SendingType sendingType =
SendingTypeExtension.fromString(message.sendingType);
if (sendingType != SendingType.allScenes) {
continue;
}

try {
final Map<String, String> entities = Map<String, String>.from(
jsonDecode(message.allRemoteCommands) as Map<String, dynamic>,
);
scenesMap.addEntries(
entities.entries.map(
(e) => MapEntry(
e.key,
SceneCbjDtos.fromJson(jsonDecode(e.value) as Map<String, dynamic>)
.toDomain(),
),
),
);
} catch (e) {
logger.e('Error converting scenes\n$e');
}
break;
}

return scenesMap;
}

@override
Future searchDevices() async {}

@override
void setEntityState(RequestActionObject action) {
appMessagesToHub.sink.add(
ClientStatusRequests(
sendingType: SendingType.setEntitiesAction.name,
allRemoteCommands: action.toInfrastructure().toJsonString(),
),
);
}

@override
Stream<MapEntry<String, DeviceEntityBase>> watchEntities() {
entitiesStream?.close();

entitiesStream = StreamController.broadcast();
return entitiesStream!.stream;
}

@override
Stream<MapEntry<String, AreaEntity>> watchAreas() {
areasStream?.close();

areasStream = StreamController.broadcast();
return areasStream!.stream;
}

@override
Future setNewArea(AreaEntity area) async {}

@override
Future setEtitiesToArea(String areaId, HashSet entities) async {}

@override
Future addScene(SceneCbjEntity scene) async {}

@override
Future activateScene(String id) async {}

@override
Future loginVendor(VendorLoginEntity value) async {}

@override
Future<List<VendorEntityInformation>> getVendors() async =>
IcSynchronizer().getVendors();

@override
Future<bool> connect({String? address}) async {
this.address = address;
if (this.address == null) {
return false;
}

connectHelper();

return true;
}

/// Connect directly to the Hub if possible
Future connectHelper() async {
if (address == null) {
return;
}

try {
channel = ClientChannel(
address!,
port: hubPort,
options:
const ChannelOptions(credentials: ChannelCredentials.insecure()),
);

channel!.onConnectionStateChanged.listen((event) {
logger.i('gRPC connection state $event');
});

stub = CbjHubClient(channel!);

final ResponseStream<RequestsAndStatusFromHub> response =
stub!.clientTransferEntities(
appMessagesToHub.stream,
);

// appMessagesToHub.sink
// .add(ClientStatusRequests(sendingType: SendingType.firstConnection));

hubMessagesToApp.addStream(response);
await Future.delayed(const Duration(seconds: 3));
} catch (e) {
logger.e('Caught error while stream with hub\n$e');
await channel?.shutdown();
}
}
}
2 changes: 1 addition & 1 deletion lib/infrastructure/core/injection.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import 'package:cbj_integrations_controller/integrations_controller.dart';
import 'package:cybearjinni/infrastructure/core/injection.config.dart';
import 'package:cybearjinni/infrastructure/core/logger.dart';
import 'package:get_it/get_it.dart';
import 'package:injectable/injectable.dart';
Expand Down
2 changes: 0 additions & 2 deletions lib/main.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import 'package:cbj_integrations_controller/integrations_controller.dart';
import 'package:cybearjinni/infrastructure/core/injection.dart';
import 'package:cybearjinni/presentation/core/app_widget.dart';
import 'package:cybearjinni/presentation/core/routes/app_router.dart';
Expand All @@ -8,7 +7,6 @@ import 'package:flutter/material.dart';

Future<Unit> main() async {
configureDependencies(EnvApp.dev);
configureInjection(Env.devPc);

WidgetsFlutterBinding.ensureInitialized();

Expand Down
2 changes: 1 addition & 1 deletion lib/presentation/pages/plus_button.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import 'package:iconify_flutter/icons/simple_icons.dart';
class PlusButtonPage extends StatelessWidget {
Future _logout(BuildContext context) async {
context.router.replace(const ConnectToHubRoute());
ConnectionsService.setCurrentConnectionType(null);
ConnectionsService.setCurrentConnectionType(ConnectionType.none);
}

@override
Expand Down
13 changes: 3 additions & 10 deletions lib/presentation/pages/remote_pipes_page.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'package:adaptive_action_sheet/adaptive_action_sheet.dart';
import 'package:auto_route/auto_route.dart';
import 'package:cbj_integrations_controller/integrations_controller.dart';
import 'package:cybearjinni/domain/connections_service.dart';
import 'package:cybearjinni/presentation/atoms/atoms.dart';
import 'package:cybearjinni/presentation/core/snack_bar_service.dart';
import 'package:cybearjinni/presentation/molecules/molecules.dart';
Expand Down Expand Up @@ -71,15 +71,9 @@ class _RemotePipesWidgetState extends State<RemotePipesWidget> {
if (remotePipesDomainName == null || remotePipesDomainName!.isEmpty) {
return;
}

final RemotePipesEntity remotePipesEntity =
RemotePipesEntity.empty().copyWith(
domainName: RemotePipesDomain(remotePipesDomainName!),
);

ConnectionsService.setCurrentConnectionType(ConnectionType.remotePipes);
ConnectionsService.instance.connect(address: remotePipesDomainName);
context.router.pop();
await IRemotePipesRepository.instance
.setRemotePipesDomainName(remotePipesEntity);
}

@override
Expand All @@ -91,7 +85,6 @@ class _RemotePipesWidgetState extends State<RemotePipesWidget> {
children: [
const TextAtom(
'Please insert the Remote Pipes domain',
style: TextStyle(color: Colors.black, fontSize: 25),
),
const SizedBox(
height: 60,
Expand Down
Loading

0 comments on commit 6c94c3b

Please sign in to comment.