Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Websocket uptake in at_lookup #732

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/at_lookup/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## 4.0.0
- feat: Introduce websocket support in at_lookup_impl
## 3.0.49
- build[deps]: Upgraded the following packages:
- at_commons to v5.0.0
Expand Down
3 changes: 1 addition & 2 deletions packages/at_lookup/lib/at_lookup.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ library at_lookup;

export 'src/at_lookup.dart';
export 'src/at_lookup_impl.dart';
export 'src/connection/outbound_connection.dart';
export 'src/connection/outbound_connection_impl.dart';
export 'src/exception/at_lookup_exception.dart';
export 'src/monitor_client.dart';
export 'src/cache/secondary_address_finder.dart';
export 'src/cache/cacheable_secondary_address_finder.dart';
export 'src/util/secure_socket_util.dart';
export 'src/connection/at_connection_factory.dart';
122 changes: 56 additions & 66 deletions packages/at_lookup/lib/src/at_lookup_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@ import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:at_chops/at_chops.dart';
import 'package:at_commons/at_builders.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_lookup/at_lookup.dart';
import 'package:at_lookup/src/connection/outbound_message_listener.dart';
import 'package:at_lookup/src/connection/at_connection.dart';
import 'package:at_lookup/src/connection/at_message_listener.dart';
import 'package:at_utils/at_logger.dart';
import 'package:crypto/crypto.dart';
import 'package:crypton/crypton.dart';
import 'package:mutex/mutex.dart';
import 'package:at_chops/at_chops.dart';

class AtLookupImpl implements AtLookUp {
final logger = AtSignLogger('AtLookup');

/// Listener for reading verb responses from the remote server
late OutboundMessageListener messageListener;
late AtMessageListener messageListener;

OutboundConnection? _connection;
AtConnection? _connection; // Represents Socket or WebSocket connection

OutboundConnection? get connection => _connection;
AtConnection? get connection => _connection;

late AtConnectionFactory atConnectionFactory;

@override
late SecondaryAddressFinder secondaryAddressFinder;
Expand All @@ -40,16 +43,10 @@ class AtLookupImpl implements AtLookUp {
String? cramSecret;

// ignore: prefer_typing_uninitialized_variables
var outboundConnectionTimeout;
var atConnectionTimeout;

late SecureSocketConfig _secureSocketConfig;

late final AtLookupSecureSocketFactory socketFactory;

late final AtLookupSecureSocketListenerFactory socketListenerFactory;

late AtLookupOutboundConnectionFactory outboundConnectionFactory;

/// Represents the client configurations.
late Map<String, dynamic> _clientConfig;

Expand All @@ -61,9 +58,10 @@ class AtLookupImpl implements AtLookUp {
SecondaryAddressFinder? secondaryAddressFinder,
SecureSocketConfig? secureSocketConfig,
Map<String, dynamic>? clientConfig,
AtLookupSecureSocketFactory? secureSocketFactory,
AtLookupSecureSocketListenerFactory? socketListenerFactory,
AtLookupOutboundConnectionFactory? outboundConnectionFactory}) {
AtConnectionFactory? atConnectionFactory}) {
// Default to secure socket factory
this.atConnectionFactory =
atConnectionFactory ?? AtLookupSecureSocketFactory();
_currentAtSign = atSign;
_rootDomain = rootDomain;
_rootPort = rootPort;
Expand All @@ -73,11 +71,6 @@ class AtLookupImpl implements AtLookUp {
// Stores the client configurations.
// If client configurations are not available, defaults to empty map
_clientConfig = clientConfig ?? {};
socketFactory = secureSocketFactory ?? AtLookupSecureSocketFactory();
this.socketListenerFactory =
socketListenerFactory ?? AtLookupSecureSocketListenerFactory();
this.outboundConnectionFactory =
outboundConnectionFactory ?? AtLookupOutboundConnectionFactory();
}

@Deprecated('use CacheableSecondaryAddressFinder')
Expand Down Expand Up @@ -246,16 +239,17 @@ class AtLookupImpl implements AtLookUp {
await _connection!.close();
}
logger.info('Creating new connection');
//1. find secondary url for atsign from lookup library

// 1. Find secondary URL for the atsign from the lookup library
SecondaryAddress secondaryAddress =
await secondaryAddressFinder.findSecondary(_currentAtSign);
var host = secondaryAddress.host;
var port = secondaryAddress.port;
//2. create a connection to secondary server
await createOutBoundConnection(
host, port.toString(), _currentAtSign, _secureSocketConfig);
//3. listen to server response
messageListener = socketListenerFactory.createListener(_connection!);

// 2. Create a connection to the secondary server
await createAtConnection(host, port.toString(), _secureSocketConfig);

// 3. Listen to server response
messageListener.listen();
logger.info('New connection created OK');
}
Expand Down Expand Up @@ -436,7 +430,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand All @@ -458,13 +452,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -475,7 +469,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand Down Expand Up @@ -505,13 +499,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -524,32 +518,41 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _cramAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {

if (!_connection!.metaData.isAuthenticated) {
// Use the connection and message listener dynamically
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
.buildCommand());

var fromResponse = await messageListener.read(
transientWaitTimeMillis: 4000, maxWaitMilliSeconds: 10000);
logger.info('from result:$fromResponse');

if (fromResponse.isEmpty) {
return false;
}

fromResponse = fromResponse.trim().replaceAll('data:', '');

var digestInput = '$secret$fromResponse';
var bytes = utf8.encode(digestInput);
var digest = sha512.convert(bytes);

await _sendCommand('cram:$digest\n');
var cramResponse = await messageListener.read(
transientWaitTimeMillis: 4000, maxWaitMilliSeconds: 10000);

if (cramResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException('Auth failed');
}
}
return _connection!.getMetaData()!.isAuthenticated;

return _connection!.metaData.isAuthenticated;
} finally {
_cramAuthenticationMutex.release();
}
Expand Down Expand Up @@ -624,23 +627,30 @@ class AtLookupImpl implements AtLookUp {
}

bool _isAuthRequired() {
return !isConnectionAvailable() ||
!(_connection!.getMetaData()!.isAuthenticated);
return !isConnectionAvailable() || !(_connection!.metaData.isAuthenticated);
}

Future<bool> createOutBoundConnection(String host, String port,
String toAtSign, SecureSocketConfig secureSocketConfig) async {
Future<bool> createAtConnection(
String host, String port, SecureSocketConfig secureSocketConfig) async {
try {
SecureSocket secureSocket =
await socketFactory.createSocket(host, port, secureSocketConfig);
_connection =
outboundConnectionFactory.createOutboundConnection(secureSocket);
if (outboundConnectionTimeout != null) {
_connection!.setIdleTime(outboundConnectionTimeout);
// Create the socket connection using the factory
final underlying = await atConnectionFactory.createUnderlying(
host, port, secureSocketConfig);

// Create at connection and listener using the factory's methods
AtConnection atConnection =
atConnectionFactory.createConnection(underlying);
messageListener = atConnectionFactory.createListener(atConnection);

_connection = atConnection;

// Set idle time if applicable
if (atConnectionTimeout != null) {
atConnection.setIdleTime(atConnectionTimeout);
}
} on SocketException {
throw SecondaryConnectException(
'unable to connect to secondary $toAtSign on $host:$port');
'Unable to connect to secondary $_currentAtSign on $host:$port');
}
return true;
}
Expand Down Expand Up @@ -682,23 +692,3 @@ class AtLookupImpl implements AtLookUp {
@override
String? enrollmentId;
}

class AtLookupSecureSocketFactory {
Future<SecureSocket> createSocket(
String host, String port, SecureSocketConfig socketConfig) async {
return await SecureSocketUtil.createSecureSocket(host, port, socketConfig);
}
}

class AtLookupSecureSocketListenerFactory {
OutboundMessageListener createListener(
OutboundConnection outboundConnection) {
return OutboundMessageListener(outboundConnection);
}
}

class AtLookupOutboundConnectionFactory {
OutboundConnection createOutboundConnection(SecureSocket secureSocket) {
return OutboundConnectionImpl(secureSocket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ class SecondaryAddressCacheEntry {
class SecondaryUrlFinder {
final String _rootDomain;
final int _rootPort;
late final AtLookupSecureSocketFactory _socketFactory;
late final AtConnectionFactory _socketFactory;

SecondaryUrlFinder(this._rootDomain, this._rootPort,
{AtLookupSecureSocketFactory? socketFactory}) {
{AtConnectionFactory? socketFactory}) {
_socketFactory = socketFactory ?? AtLookupSecureSocketFactory();
}

Expand Down Expand Up @@ -188,11 +188,11 @@ class SecondaryUrlFinder {
var prompt = false;
var once = true;

socket = await _socketFactory.createSocket(
socket = await _socketFactory.createUnderlying(
_rootDomain, '$_rootPort', SecureSocketConfig());
_logger.finer('findSecondaryUrl: connection to root server established');
// listen to the received data event stream
socket.listen((List<int> event) async {
socket!.listen((List<int> event) async {
_logger.finest('root socket listener received: $event');
answer = utf8.decode(event);

Expand Down
56 changes: 43 additions & 13 deletions packages/at_lookup/lib/src/connection/at_connection.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,55 @@
import 'dart:io';
import 'dart:async';

abstract class AtConnection {
/// Write a data to the underlying socket of the connection
abstract class AtConnection<T> {
/// The underlying connection
T get underlying;

/// Metadata for the connection
final AtConnectionMetaData metaData = AtConnectionMetaData();

/// The idle timeout in milliseconds (default: 10 minutes)
int idleTimeMillis = 600000;

AtConnection() {
metaData.created = DateTime.now().toUtc();
}

/// Writes data to the underlying socket of the connection.
/// @param - data - Data to write to the socket
/// @throws [AtIOException] for any exception during the operation
void write(String data);
FutureOr<void> write(String data);

/// Retrieves the socket of underlying connection
Socket getSocket();

/// closes the underlying connection
/// Closes the underlying connection.
Future<void> close();

/// Returns true if the connection is invalid
bool isInValid();
/// Returns true if the connection is invalid.
bool isInValid() {
return _isIdle() || metaData.isClosed || metaData.isStale;
}

/// Updates the idle time for the connection (Socket or WebSocket).
void setIdleTime(int? idleTimeMillis) {
if (idleTimeMillis != null) {
this.idleTimeMillis = idleTimeMillis;
}
}

/// Checks if the connection has been idle for longer than the specified timeout.
bool _isIdle() {
return _getIdleTimeMillis() > idleTimeMillis;
}

/// Gets the connection metadata
AtConnectionMetaData? getMetaData();
/// Calculates the idle time in milliseconds.
int _getIdleTimeMillis() {
var lastAccessedTime = metaData.lastAccessed;
lastAccessedTime ??= metaData.created;
var currentTime = DateTime.now().toUtc();
return currentTime.difference(lastAccessedTime!).inMilliseconds;
}
}

abstract class AtConnectionMetaData {
/// Metadata for [AtConnection].
class AtConnectionMetaData {
bool isAuthenticated = false;
DateTime? lastAccessed;
DateTime? created;
Expand Down
Loading
Loading