Skip to content

Commit

Permalink
Make it possible for apps to provide a custom connection name
Browse files Browse the repository at this point in the history
Closes #155, references #154.
  • Loading branch information
michaelklishin committed Mar 11, 2019
1 parent 90b9046 commit 1a8ad99
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 18 deletions.
40 changes: 38 additions & 2 deletions RMQClient/RMQConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
waiterFactory:(nonnull id<RMQWaiterFactory>)waiterFactory
heartbeatSender:(nonnull id<RMQHeartbeatSender>)heartbeatSender;

/// @brief Connection tuning, customisable TLS, all recovery options.
/// @brief Connection tuning, customisable config, all recovery options.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
tlsOptions:(nonnull RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(nonnull NSString *)connectionName
channelMax:(nonnull NSNumber *)channelMax
frameMax:(nonnull NSNumber *)frameMax
heartbeat:(nonnull NSNumber *)heartbeat
Expand All @@ -96,6 +96,42 @@
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose;

/// @brief Connection tuning, customisable TLS, key recovery options.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
tlsOptions:(nonnull RMQTLSOptions *)tlsOptions
delegate:(nullable id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose;

/// @brief Connection tuning, customisable TLS and connection name, key recovery options.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
tlsOptions:(nonnull RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(nullable NSString *)connectionName
delegate:(nullable id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose;

/// @brief Connection tuning, customisable TLS, all recovery options.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
userProvidedConnectionName:(nullable NSString *)connectionName
delegate:(nullable id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose;

/// @brief Connection configuration.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
userProvidedConnectionName:(nonnull NSString *)connectionName
delegate:(nullable id<RMQConnectionDelegate>)delegate;

/// @brief TLS, connection configuration and delegate.
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
tlsOptions:(nonnull RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(nonnull NSString *)connectionName
delegate:(nullable id<RMQConnectionDelegate>)delegate;

/// @brief Allows setting of timeouts
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
connectTimeout:(nonnull NSNumber*)connectTimeout
Expand Down
188 changes: 186 additions & 2 deletions RMQClient/RMQConnection.m
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ - (instancetype)initWithTransport:(id<RMQTransport>)transport

- (nonnull instancetype)initWithUri:(NSString *)uri
tlsOptions:(RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(NSString *)connectionName
channelMax:(nonnull NSNumber *)channelMax
frameMax:(nonnull NSNumber *)frameMax
heartbeat:(nonnull NSNumber *)heartbeat
Expand Down Expand Up @@ -165,26 +166,90 @@ - (nonnull instancetype)initWithUri:(NSString *)uri
RMQSemaphoreWaiterFactory *waiterFactory = [RMQSemaphoreWaiterFactory new];
RMQGCDHeartbeatSender *heartbeatSender = [[RMQGCDHeartbeatSender alloc] initWithTransport:transport
clock:[RMQTickingClock new]];
RMQProcessInfoNameGenerator *nameGenerator = [RMQProcessInfoNameGenerator new];
RMQGCDSerialQueue *commandQueue = [[RMQGCDSerialQueue alloc]
initWithName:[nameGenerator generateWithPrefix:@"connection-commands"]];
RMQConnectionRecover *recovery = [[RMQConnectionRecover alloc] initWithInterval:recoveryInterval
attemptLimit:recoveryAttempts
onlyErrors:!shouldRecoverFromConnectionClose
heartbeatSender:heartbeatSender
commandQueue:commandQueue
delegate:delegateProxy];

RMQCredentials *credentials = [[RMQCredentials alloc] initWithUsername:rmqURI.username
password:rmqURI.password];
RMQConnectionConfig *config = [[RMQConnectionConfig alloc] initWithCredentials:credentials
channelMax:channelMax
frameMax:frameMax
heartbeat:heartbeat
vhost:rmqURI.vhost
authMechanism:tlsOptions.authMechanism
userProvidedConnectionName:connectionName
recovery:recovery];
return [self initWithTransport:transport
config:config
handshakeTimeout:syncTimeout
channelAllocator:allocator
frameHandler:allocator
delegate:delegateProxy
commandQueue:commandQueue
waiterFactory:waiterFactory
heartbeatSender:heartbeatSender];
}

- (nonnull instancetype)initWithUri:(NSString *)uri
userProvidedConnectionName:(nonnull NSString *)connectionName
channelMax:(nonnull NSNumber *)channelMax
frameMax:(nonnull NSNumber *)frameMax
heartbeat:(nonnull NSNumber *)heartbeat
connectTimeout:(nonnull NSNumber*)connectTimeout
readTimeout:(nonnull NSNumber*)readTimeout
writeTimeout:(nonnull NSNumber*)writeTimeout
syncTimeout:(nonnull NSNumber *)syncTimeout
delegate:(id<RMQConnectionDelegate>)delegate
delegateQueue:(dispatch_queue_t)delegateQueue
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose {
NSError *error = NULL;
RMQURI *rmqURI = [RMQURI parse:uri error:&error];

RMQTCPSocketTransport *transport = [[RMQTCPSocketTransport alloc] initWithHost:rmqURI.host
port:rmqURI.portNumber
tlsOptions:[RMQTLSOptions fromURI:uri]
connectTimeout:connectTimeout
readTimeout:readTimeout
writeTimeout:writeTimeout];
RMQMultipleChannelAllocator *allocator = [[RMQMultipleChannelAllocator alloc]
initWithMaxCapacity:[channelMax unsignedIntegerValue]
channelSyncTimeout:syncTimeout];
RMQQueuingConnectionDelegateProxy *delegateProxy = [[RMQQueuingConnectionDelegateProxy alloc]
initWithDelegate:delegate
queue:delegateQueue];
RMQSemaphoreWaiterFactory *waiterFactory = [RMQSemaphoreWaiterFactory new];
RMQGCDHeartbeatSender *heartbeatSender = [[RMQGCDHeartbeatSender alloc] initWithTransport:transport
clock:[RMQTickingClock new]];


RMQProcessInfoNameGenerator *nameGenerator = [RMQProcessInfoNameGenerator new];
RMQGCDSerialQueue *commandQueue = [[RMQGCDSerialQueue alloc]
initWithName:[nameGenerator generateWithPrefix:@"connection-commands"]];
initWithName:[nameGenerator generateWithPrefix:@"connection-commands"]];
RMQConnectionRecover *recovery = [[RMQConnectionRecover alloc] initWithInterval:recoveryInterval
attemptLimit:recoveryAttempts
onlyErrors:!shouldRecoverFromConnectionClose
heartbeatSender:heartbeatSender
commandQueue:commandQueue
delegate:delegateProxy];

RMQCredentials *credentials = [[RMQCredentials alloc] initWithUsername:rmqURI.username
password:rmqURI.password];
RMQConnectionConfig *config = [[RMQConnectionConfig alloc] initWithCredentials:credentials
channelMax:channelMax
frameMax:frameMax
heartbeat:heartbeat
vhost:rmqURI.vhost
authMechanism:tlsOptions.authMechanism
authMechanism:@"PLAIN"
userProvidedConnectionName:connectionName
recovery:recovery];
return [self initWithTransport:transport
config:config
Expand All @@ -197,11 +262,127 @@ - (nonnull instancetype)initWithUri:(NSString *)uri
heartbeatSender:heartbeatSender];
}

- (nonnull instancetype)initWithUri:(NSString *)uri
userProvidedConnectionName:(NSString *)connectionName
delegate:(id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose {
return [self initWithUri:uri
tlsOptions:[RMQTLSOptions fromURI:uri]
userProvidedConnectionName:connectionName
delegate:delegate
recoverAfter:recoveryInterval
recoveryAttempts:recoveryAttempts
recoverFromConnectionClose:shouldRecoverFromConnectionClose];
}

- (nonnull instancetype)initWithUri:(NSString *)uri
userProvidedConnectionName:(NSString *)connectionName
delegate:(id<RMQConnectionDelegate>)delegate {
return [self initWithUri:uri
tlsOptions:[RMQTLSOptions fromURI:uri]
userProvidedConnectionName:connectionName
delegate:delegate
recoverAfter:RMQDefaultRecoveryInterval
recoveryAttempts:@(NSUIntegerMax)
recoverFromConnectionClose:YES];
}

- (nonnull instancetype)initWithUri:(NSString *)uri
tlsOptions:(RMQTLSOptions *)tlsOptions
delegate:(id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose {
return [self initWithUri:uri
tlsOptions:tlsOptions
userProvidedConnectionName:NULL
delegate:delegate
recoverAfter:recoveryInterval
recoveryAttempts:recoveryAttempts
recoverFromConnectionClose:shouldRecoverFromConnectionClose];
}

- (nonnull instancetype)initWithUri:(NSString *)uri
tlsOptions:(RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(NSString *)connectionName
delegate:(id<RMQConnectionDelegate>)delegate
recoverAfter:(nonnull NSNumber *)recoveryInterval
recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose {
NSError *error = NULL;
RMQURI *rmqURI = [RMQURI parse:uri error:&error];

RMQTCPSocketTransport *transport = [[RMQTCPSocketTransport alloc] initWithHost:rmqURI.host
port:rmqURI.portNumber
tlsOptions:[RMQTLSOptions fromURI:uri]
connectTimeout:RMQDefaultConnectTimeout
readTimeout:RMQDefaultReadTimeout
writeTimeout:RMQDefaultWriteTimeout];
RMQMultipleChannelAllocator *allocator = [[RMQMultipleChannelAllocator alloc]
initWithMaxCapacity:[@(RMQChannelMaxDefault) unsignedIntegerValue]
channelSyncTimeout:RMQDefaultSyncTimeout];
RMQQueuingConnectionDelegateProxy *delegateProxy =
[[RMQQueuingConnectionDelegateProxy alloc]
initWithDelegate:delegate
queue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)];
RMQSemaphoreWaiterFactory *waiterFactory = [RMQSemaphoreWaiterFactory new];
RMQGCDHeartbeatSender *heartbeatSender = [[RMQGCDHeartbeatSender alloc] initWithTransport:transport
clock:[RMQTickingClock new]];


RMQProcessInfoNameGenerator *nameGenerator = [RMQProcessInfoNameGenerator new];
RMQGCDSerialQueue *commandQueue = [[RMQGCDSerialQueue alloc]
initWithName:[nameGenerator generateWithPrefix:@"connection-commands"]];
RMQConnectionRecover *recovery = [[RMQConnectionRecover alloc] initWithInterval:recoveryInterval
attemptLimit:recoveryAttempts
onlyErrors:!shouldRecoverFromConnectionClose
heartbeatSender:heartbeatSender
commandQueue:commandQueue
delegate:delegateProxy];

RMQCredentials *credentials = [[RMQCredentials alloc] initWithUsername:rmqURI.username
password:rmqURI.password];
RMQConnectionConfig *config = [[RMQConnectionConfig alloc] initWithCredentials:credentials
channelMax:@(RMQChannelMaxDefault)
frameMax:@(RMQFrameMax)
heartbeat:RMQDefaultHeartbeatTimeout
vhost:rmqURI.vhost
authMechanism:tlsOptions.authMechanism
userProvidedConnectionName:connectionName
recovery:recovery];

return [self initWithTransport:transport
config:config
handshakeTimeout:RMQDefaultHeartbeatTimeout
channelAllocator:allocator
frameHandler:allocator
delegate:delegateProxy
commandQueue:commandQueue
waiterFactory:waiterFactory
heartbeatSender:heartbeatSender];
}

- (instancetype)initWithUri:(NSString *)uri
tlsOptions:(RMQTLSOptions *)tlsOptions
userProvidedConnectionName:(nonnull NSString *)connectionName
delegate:(id<RMQConnectionDelegate>)delegate {
return [self initWithUri:uri
tlsOptions:tlsOptions
userProvidedConnectionName:connectionName
delegate:delegate
recoverAfter:RMQDefaultRecoveryInterval
recoveryAttempts:@(NSUIntegerMax)
recoverFromConnectionClose:YES];
}

- (instancetype)initWithUri:(NSString *)uri
delegate:(id<RMQConnectionDelegate>)delegate
recoverAfter:(NSNumber *)recoveryInterval {
return [self initWithUri:uri
tlsOptions:[RMQTLSOptions fromURI:uri]
userProvidedConnectionName:NULL
channelMax:@(RMQChannelMaxDefault)
frameMax:@(RMQFrameMax)
heartbeat:RMQDefaultHeartbeatTimeout
Expand All @@ -220,6 +401,7 @@ - (instancetype)initWithUri:(NSString *)uri
delegate:(id<RMQConnectionDelegate>)delegate {
return [self initWithUri:uri
tlsOptions:[RMQTLSOptions fromURI:uri]
userProvidedConnectionName:NULL
channelMax:@(RMQChannelMaxDefault)
frameMax:@(RMQFrameMax)
heartbeat:RMQDefaultHeartbeatTimeout
Expand All @@ -241,6 +423,7 @@ - (instancetype)initWithUri:(NSString *)uri
delegate:(id<RMQConnectionDelegate>)delegate {
return [self initWithUri:uri
tlsOptions:[RMQTLSOptions fromURI:uri]
userProvidedConnectionName:NULL
channelMax:@(RMQChannelMaxDefault)
frameMax:@(RMQFrameMax)
heartbeat:RMQDefaultHeartbeatTimeout
Expand Down Expand Up @@ -268,6 +451,7 @@ - (instancetype)initWithUri:(NSString *)uri
delegateQueue:(dispatch_queue_t)delegateQueue {
return [self initWithUri:uri
tlsOptions:tlsOptions
userProvidedConnectionName:NULL
channelMax:channelMax
frameMax:frameMax
heartbeat:heartbeat
Expand Down
Loading

0 comments on commit 1a8ad99

Please sign in to comment.