Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added WebSocket support in AtLookup #709

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b586311
changed version of websocket
purnimavenkatasubbu Sep 26, 2024
caa7591
websocket changes
purnimavenkatasubbu Oct 7, 2024
ffc02f5
Merge remote-tracking branch 'origin/trunk' into ws_version2
gkc Oct 7, 2024
63fc299
fixed closing ws connection
purnimavenkatasubbu Oct 15, 2024
df4658d
changes
purnimavenkatasubbu Oct 16, 2024
ab88f5e
Merge branch 'trunk' of https://github.com/atsign-foundation/at_libra…
purnimavenkatasubbu Oct 16, 2024
d88a139
Merge branch 'trunk' of https://github.com/atsign-foundation/at_libra…
purnimavenkatasubbu Oct 22, 2024
417234c
refactored code
purnimavenkatasubbu Oct 22, 2024
0eca375
Merge branch 'trunk' of https://github.com/atsign-foundation/at_libra…
purnimavenkatasubbu Oct 23, 2024
b78716a
refactored code based on factories
purnimavenkatasubbu Nov 4, 2024
3f453af
removed commented code
purnimavenkatasubbu Nov 4, 2024
0452d85
Merge branch 'trunk' of https://github.com/atsign-foundation/at_libra…
purnimavenkatasubbu Nov 5, 2024
a72cf1a
addressed review comments
purnimavenkatasubbu Nov 5, 2024
a7c8e64
updated unit tests
purnimavenkatasubbu Nov 12, 2024
aee94bb
fixed failing test
purnimavenkatasubbu Nov 12, 2024
a67171b
fix: Updated mock references to prevent null pointer errors
sitaram-kalluri Nov 13, 2024
49b840b
fix: Add unit test - A test to verify the connections are invalidated…
sitaram-kalluri Nov 13, 2024
4f23462
fixed failing unit test
purnimavenkatasubbu Nov 13, 2024
95fc90c
pubspec
purnimavenkatasubbu Nov 13, 2024
a635041
correct atsign and removed commented code
purnimavenkatasubbu Nov 13, 2024
65f6141
removed useWebSocket flag and added unit tests
purnimavenkatasubbu Nov 18, 2024
61e2913
renamed variables and minor changes
purnimavenkatasubbu Nov 18, 2024
b4eef12
addressed review comments
purnimavenkatasubbu Nov 19, 2024
ab72ec6
Merge branch 'trunk' of https://github.com/atsign-foundation/at_libra…
purnimavenkatasubbu Nov 21, 2024
be8058b
addressed review commits
purnimavenkatasubbu Nov 21, 2024
b32009b
removed commented code
purnimavenkatasubbu Nov 22, 2024
9ea9415
review comments
purnimavenkatasubbu Nov 25, 2024
5dc5a47
removed commented code
purnimavenkatasubbu Nov 25, 2024
3e05187
addressed more review comments
purnimavenkatasubbu Dec 4, 2024
8e5aaee
changing heirarchy and removeing outboundconnection
purnimavenkatasubbu Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/at_lookup/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## 4.0.0
- feat: Introduce websocket support in at_lookup_impl
## 3.0.49
- build[deps]: Upgraded the following packages:
- at_commons to v5.0.0
Expand Down
3 changes: 1 addition & 2 deletions packages/at_lookup/lib/at_lookup.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ library at_lookup;

export 'src/at_lookup.dart';
export 'src/at_lookup_impl.dart';
export 'src/connection/outbound_connection.dart';
export 'src/connection/outbound_connection_impl.dart';
export 'src/exception/at_lookup_exception.dart';
export 'src/monitor_client.dart';
export 'src/cache/secondary_address_finder.dart';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we have modified SecondaryUrlFinder constructor and exported this class, the change should be treated as breaking and have a major release

Copy link
Member Author

@purnimavenkatasubbu purnimavenkatasubbu Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated be8058b

export 'src/cache/cacheable_secondary_address_finder.dart';
export 'src/util/secure_socket_util.dart';
export 'src/connection/at_lookup_connection_factory.dart';
121 changes: 55 additions & 66 deletions packages/at_lookup/lib/src/at_lookup_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@ import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:at_chops/at_chops.dart';
import 'package:at_commons/at_builders.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_lookup/at_lookup.dart';
import 'package:at_lookup/src/connection/outbound_message_listener.dart';
import 'package:at_lookup/src/connection/at_connection.dart';
import 'package:at_lookup/src/connection/at_message_listener.dart';
import 'package:at_utils/at_logger.dart';
import 'package:crypto/crypto.dart';
import 'package:crypton/crypton.dart';
import 'package:mutex/mutex.dart';
import 'package:at_chops/at_chops.dart';

class AtLookupImpl implements AtLookUp {
final logger = AtSignLogger('AtLookup');

/// Listener for reading verb responses from the remote server
late OutboundMessageListener messageListener;
late AtMessageListener messageListener;

OutboundConnection? _connection;
AtConnection? _connection; // Represents Socket or WebSocket connection

OutboundConnection? get connection => _connection;
AtConnection? get connection => _connection;

late AtLookupConnectionFactory atConnectionFactory;

@override
late SecondaryAddressFinder secondaryAddressFinder;
Expand All @@ -40,16 +43,10 @@ class AtLookupImpl implements AtLookUp {
String? cramSecret;

// ignore: prefer_typing_uninitialized_variables
var outboundConnectionTimeout;
var atConnectionTimeout;

late SecureSocketConfig _secureSocketConfig;

late final AtLookupSecureSocketFactory socketFactory;

late final AtLookupSecureSocketListenerFactory socketListenerFactory;

late AtLookupOutboundConnectionFactory outboundConnectionFactory;

/// Represents the client configurations.
late Map<String, dynamic> _clientConfig;

Expand All @@ -61,9 +58,9 @@ class AtLookupImpl implements AtLookUp {
SecondaryAddressFinder? secondaryAddressFinder,
SecureSocketConfig? secureSocketConfig,
Map<String, dynamic>? clientConfig,
AtLookupSecureSocketFactory? secureSocketFactory,
AtLookupSecureSocketListenerFactory? socketListenerFactory,
AtLookupOutboundConnectionFactory? outboundConnectionFactory}) {
AtLookupConnectionFactory? atConnectionFactory}) {
// Default to secure socket factory
this.atConnectionFactory = atConnectionFactory ?? AtLookupSecureSocketFactory();
_currentAtSign = atSign;
_rootDomain = rootDomain;
_rootPort = rootPort;
Expand All @@ -73,11 +70,6 @@ class AtLookupImpl implements AtLookUp {
// Stores the client configurations.
// If client configurations are not available, defaults to empty map
_clientConfig = clientConfig ?? {};
socketFactory = secureSocketFactory ?? AtLookupSecureSocketFactory();
this.socketListenerFactory =
socketListenerFactory ?? AtLookupSecureSocketListenerFactory();
this.outboundConnectionFactory =
outboundConnectionFactory ?? AtLookupOutboundConnectionFactory();
}

@Deprecated('use CacheableSecondaryAddressFinder')
Expand Down Expand Up @@ -246,16 +238,17 @@ class AtLookupImpl implements AtLookUp {
await _connection!.close();
}
logger.info('Creating new connection');
//1. find secondary url for atsign from lookup library

// 1. Find secondary URL for the atsign from the lookup library
SecondaryAddress secondaryAddress =
await secondaryAddressFinder.findSecondary(_currentAtSign);
var host = secondaryAddress.host;
var port = secondaryAddress.port;
//2. create a connection to secondary server
await createOutBoundConnection(
host, port.toString(), _currentAtSign, _secureSocketConfig);
//3. listen to server response
messageListener = socketListenerFactory.createListener(_connection!);

// 2. Create a connection to the secondary server
await createAtConnection(host, port.toString(), _secureSocketConfig);

// 3. Listen to server response
messageListener.listen();
logger.info('New connection created OK');
}
Expand Down Expand Up @@ -436,7 +429,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand All @@ -458,13 +451,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -475,7 +468,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand Down Expand Up @@ -505,13 +498,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -524,32 +517,41 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _cramAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {

if (!_connection!.metaData.isAuthenticated) {
// Use the connection and message listener dynamically
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
.buildCommand());

var fromResponse = await messageListener.read(
transientWaitTimeMillis: 4000, maxWaitMilliSeconds: 10000);
logger.info('from result:$fromResponse');

if (fromResponse.isEmpty) {
return false;
}

fromResponse = fromResponse.trim().replaceAll('data:', '');

var digestInput = '$secret$fromResponse';
var bytes = utf8.encode(digestInput);
var digest = sha512.convert(bytes);

await _sendCommand('cram:$digest\n');
var cramResponse = await messageListener.read(
transientWaitTimeMillis: 4000, maxWaitMilliSeconds: 10000);

if (cramResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException('Auth failed');
}
}
return _connection!.getMetaData()!.isAuthenticated;

return _connection!.metaData.isAuthenticated;
} finally {
_cramAuthenticationMutex.release();
}
Expand Down Expand Up @@ -624,23 +626,30 @@ class AtLookupImpl implements AtLookUp {
}

bool _isAuthRequired() {
return !isConnectionAvailable() ||
!(_connection!.getMetaData()!.isAuthenticated);
return !isConnectionAvailable() || !(_connection!.metaData.isAuthenticated);
}

Future<bool> createOutBoundConnection(String host, String port,
String toAtSign, SecureSocketConfig secureSocketConfig) async {
Future<bool> createAtConnection(
String host, String port, SecureSocketConfig secureSocketConfig) async {
try {
SecureSocket secureSocket =
await socketFactory.createSocket(host, port, secureSocketConfig);
_connection =
outboundConnectionFactory.createOutboundConnection(secureSocket);
if (outboundConnectionTimeout != null) {
_connection!.setIdleTime(outboundConnectionTimeout);
// Create the socket connection using the factory
final underlying = await atConnectionFactory.createUnderlying(
host, port, secureSocketConfig);

// Create at connection and listener using the factory's methods
AtConnection atConnection =
atConnectionFactory.createConnection(underlying);
messageListener = atConnectionFactory.createListener(atConnection);

_connection = atConnection;

// Set idle time if applicable
if (atConnectionTimeout != null) {
atConnection.setIdleTime(atConnectionTimeout);
}
} on SocketException {
throw SecondaryConnectException(
'unable to connect to secondary $toAtSign on $host:$port');
'Unable to connect to secondary $_currentAtSign on $host:$port');
}
return true;
}
Expand Down Expand Up @@ -682,23 +691,3 @@ class AtLookupImpl implements AtLookUp {
@override
String? enrollmentId;
}

class AtLookupSecureSocketFactory {
Future<SecureSocket> createSocket(
String host, String port, SecureSocketConfig socketConfig) async {
return await SecureSocketUtil.createSecureSocket(host, port, socketConfig);
}
}

class AtLookupSecureSocketListenerFactory {
OutboundMessageListener createListener(
OutboundConnection outboundConnection) {
return OutboundMessageListener(outboundConnection);
}
}

class AtLookupOutboundConnectionFactory {
OutboundConnection createOutboundConnection(SecureSocket secureSocket) {
return OutboundConnectionImpl(secureSocket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ class SecondaryAddressCacheEntry {
class SecondaryUrlFinder {
final String _rootDomain;
final int _rootPort;
late final AtLookupSecureSocketFactory _socketFactory;
late final AtLookupConnectionFactory _socketFactory;

SecondaryUrlFinder(this._rootDomain, this._rootPort,
{AtLookupSecureSocketFactory? socketFactory}) {
{AtLookupConnectionFactory? socketFactory}) {
_socketFactory = socketFactory ?? AtLookupSecureSocketFactory();
}

Expand Down Expand Up @@ -188,11 +188,11 @@ class SecondaryUrlFinder {
var prompt = false;
var once = true;

socket = await _socketFactory.createSocket(
socket = await _socketFactory.createUnderlying(
_rootDomain, '$_rootPort', SecureSocketConfig());
_logger.finer('findSecondaryUrl: connection to root server established');
// listen to the received data event stream
socket.listen((List<int> event) async {
socket!.listen((List<int> event) async {
_logger.finest('root socket listener received: $event');
answer = utf8.decode(event);

Expand Down
56 changes: 43 additions & 13 deletions packages/at_lookup/lib/src/connection/at_connection.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,55 @@
import 'dart:io';
import 'dart:async';

abstract class AtConnection {
/// Write a data to the underlying socket of the connection
abstract class AtConnection<T> {
/// The underlying connection
T get underlying;

/// Metadata for the connection
final AtConnectionMetaData metaData = AtConnectionMetaData();

/// The idle timeout in milliseconds (default: 10 minutes)
int idleTimeMillis = 600000;

AtConnection() {
metaData.created = DateTime.now().toUtc();
}

/// Writes data to the underlying socket of the connection.
/// @param - data - Data to write to the socket
/// @throws [AtIOException] for any exception during the operation
void write(String data);
FutureOr<void> write(String data);

/// Retrieves the socket of underlying connection
Socket getSocket();

/// closes the underlying connection
/// Closes the underlying connection.
Future<void> close();

/// Returns true if the connection is invalid
bool isInValid();
/// Returns true if the connection is invalid.
bool isInValid() {
return _isIdle() || metaData.isClosed || metaData.isStale;
}

/// Updates the idle time for the connection (Socket or WebSocket).
void setIdleTime(int? idleTimeMillis) {
if (idleTimeMillis != null) {
this.idleTimeMillis = idleTimeMillis;
}
}

/// Checks if the connection has been idle for longer than the specified timeout.
bool _isIdle() {
return _getIdleTimeMillis() > idleTimeMillis;
}

/// Gets the connection metadata
AtConnectionMetaData? getMetaData();
/// Calculates the idle time in milliseconds.
int _getIdleTimeMillis() {
var lastAccessedTime = metaData.lastAccessed;
lastAccessedTime ??= metaData.created;
var currentTime = DateTime.now().toUtc();
return currentTime.difference(lastAccessedTime!).inMilliseconds;
}
}

abstract class AtConnectionMetaData {
/// Metadata for [AtConnection].
class AtConnectionMetaData {
bool isAuthenticated = false;
DateTime? lastAccessed;
DateTime? created;
Expand Down
Loading
Loading