Skip to content

Commit

Permalink
Add OAuth2 support for authentication with Solace PubSub+ Broker
Browse files Browse the repository at this point in the history
  • Loading branch information
mayur-solace committed Jul 15, 2024
1 parent 07ba525 commit 60f2f5c
Show file tree
Hide file tree
Showing 51 changed files with 7,194 additions and 39 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.solace.spring.boot</groupId>
<artifactId>solace-spring-boot-bom</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
</parent>

<groupId>com.solace.spring.cloud</groupId>
Expand Down Expand Up @@ -34,7 +34,7 @@

<solace.spring.cloud.stream-starter.version>5.2.0-SNAPSHOT</solace.spring.cloud.stream-starter.version>

<solace.integration.test.support.version>1.0.2</solace.integration.test.support.version>
<solace.integration.test.support.version>1.1.2</solace.integration.test.support.version>
<solace.integration.test.support.fetch_checkout.skip>false</solace.integration.test.support.fetch_checkout.skip>
<solace.integration.test.support.install.skip>true</solace.integration.test.support.install.skip>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,6 @@ public class SolaceSessionEventHandler extends DefaultSolaceOAuth2SessionEventHa
private final SessionHealthIndicator sessionHealthIndicator;
private static final Log logger = LogFactory.getLog(SolaceSessionEventHandler.class);

/*public SolaceSessionEventHandler(SessionHealthIndicator sessionHealthIndicator) {
this.sessionHealthIndicator = sessionHealthIndicator;
}*/

/**
* Constructs a new DefaultSolaceOAuth2SessionEventHandler with the provided JCSMP properties and
* OAuth2 token provider.
*
* @param jcsmpProperties The JCSMP properties.
* @param solaceSessionOAuth2TokenProvider The OAuth2 token provider.
*/
public SolaceSessionEventHandler(JCSMPProperties jcsmpProperties,
SolaceSessionOAuth2TokenProvider solaceSessionOAuth2TokenProvider,
SessionHealthIndicator sessionHealthIndicator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testRejectFail(@Values(ints = {1, 255}) int numMessages,

logger.info(String.format("Disabling egress for queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue((String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME),
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false), null);
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false), null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand All @@ -214,7 +214,7 @@ public void testRejectFail(@Values(ints = {1, 255}) int numMessages,

logger.info(String.format("Enabling egress for queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue((String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME),
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true), null);
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true), null, null);
retryAssert(() -> assertTrue(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand Down Expand Up @@ -277,7 +277,7 @@ public void testRejectWithErrorQueueFail(@Values(ints = {1, 255}) int numMessage
logger.info(String.format("Disabling ingress for error queue %s",
errorQueueInfrastructure.getErrorQueueName()));
sempV2Api.config().updateMsgVpnQueue((String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME),
errorQueueInfrastructure.getErrorQueueName(), new ConfigMsgVpnQueue().ingressEnabled(false), null);
errorQueueInfrastructure.getErrorQueueName(), new ConfigMsgVpnQueue().ingressEnabled(false), null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, errorQueueInfrastructure.getErrorQueueName(), null)
.getData()
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testRequeueFail(@Values(ints = {1, 255}) int numMessages,

logger.info(String.format("Disabling egress for queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue((String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME),
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false), null);
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false), null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand All @@ -370,7 +370,7 @@ public void testRequeueFail(@Values(ints = {1, 255}) int numMessages,

logger.info(String.format("Enabling egress for queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue((String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME),
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true), null);
queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true), null, null);
retryAssert(() -> assertTrue(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testHandleErrorAsyncRetry(boolean isDurable, JCSMPSession jcsmpSessi

logger.info(String.format("Shutting down ingress for queue %s", errorQueue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, errorQueue.getName(),
new ConfigMsgVpnQueue().ingressEnabled(false), null);
new ConfigMsgVpnQueue().ingressEnabled(false), null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, errorQueue.getName(), null)
.getData()
Expand All @@ -183,7 +183,7 @@ public void testHandleErrorAsyncRetry(boolean isDurable, JCSMPSession jcsmpSessi
if (key.getErrorQueueDeliveryAttempt() == errorQueueInfrastructure.getMaxDeliveryAttempts()) {
logger.info(String.format("Starting ingress for queue %s", errorQueue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, errorQueue.getName(),
new ConfigMsgVpnQueue().ingressEnabled(true), null);
new ConfigMsgVpnQueue().ingressEnabled(true), null, null);
retryAssert(() -> assertTrue(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, errorQueue.getName(), null)
.getData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void testReceiveInterruptedByFlowReconnect(@Values(booleans = {false, tru

logger.info(String.format("Disabling egress to queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false),
null);
null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand All @@ -784,7 +784,7 @@ public void testReceiveInterruptedByFlowReconnect(@Values(booleans = {false, tru

logger.info(String.format("Enabling egress to queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true),
null);
null, null);
retryAssert(() -> assertTrue(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand Down Expand Up @@ -871,7 +871,7 @@ public void testAcknowledgeAfterFlowReconnect(JCSMPSession jcsmpSession, Queue q

logger.info(String.format("Disabling egress to queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().egressEnabled(false),
null);
null, null);
retryAssert(() -> assertFalse(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand All @@ -881,7 +881,7 @@ public void testAcknowledgeAfterFlowReconnect(JCSMPSession jcsmpSession, Queue q

logger.info(String.format("Enabling egress to queue %s", queue.getName()));
sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().egressEnabled(true),
null);
null, null);
retryAssert(() -> assertTrue(sempV2Api.monitor()
.getMsgVpnQueue(vpnName, queue.getName(), null)
.getData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@
<artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public SolaceMessageChannelBinderConfiguration(JCSMPProperties jcsmpProperties,

@PostConstruct
private void initSession() throws JCSMPException {
JCSMPProperties jcsmpProperties = (JCSMPProperties) this.jcsmpProperties.clone();
jcsmpProperties.setProperty(JCSMPProperties.CLIENT_INFO_PROVIDER, new SolaceBinderClientInfoProvider());
JCSMPProperties solaceJcsmpProperties = (JCSMPProperties) this.jcsmpProperties.clone();
solaceJcsmpProperties.setProperty(JCSMPProperties.CLIENT_INFO_PROVIDER, new SolaceBinderClientInfoProvider());
try {
if (solaceSessionEventHandler != null) {
if (logger.isDebugEnabled()) {
logger.debug("Registering Solace Session Event handler on session");
}

SpringJCSMPFactory springJCSMPFactory = new SpringJCSMPFactory(jcsmpProperties, solaceSessionOAuth2TokenProvider);
SpringJCSMPFactory springJCSMPFactory = new SpringJCSMPFactory(solaceJcsmpProperties, solaceSessionOAuth2TokenProvider);
context = springJCSMPFactory.createContext(new ContextProperties());
jcsmpSession = springJCSMPFactory.createSession(context, solaceSessionEventHandler);
} else {
SpringJCSMPFactory springJCSMPFactory = new SpringJCSMPFactory(jcsmpProperties, solaceSessionOAuth2TokenProvider);
SpringJCSMPFactory springJCSMPFactory = new SpringJCSMPFactory(solaceJcsmpProperties, solaceSessionOAuth2TokenProvider);
jcsmpSession = springJCSMPFactory.createSession();
}
logger.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.solace.it.util.semp;

import org.osgi.annotation.versioning.ProviderType;

public class SempClientException extends RuntimeException {

public SempClientException(String message) {
super(message);
}

public SempClientException(String message, Throwable cause) {
super(message, cause);
}

public SempClientException(Throwable cause) {
super(cause);
}


/**
* A class for raising errors when client authentication fails.
*
* @since 1.0
*/
@ProviderType
public static class AuthenticationException extends SempClientException {

private static final long serialVersionUID = -4840876322728337412L;

/**
* Creates an instance of {@code AuthenticationException} when client authentication fails with
* an additional message.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @since 1.0
*/
public AuthenticationException(String message) {
super(message);
}

/**
* Creates an instance of {@code AuthenticationException} when client authentication fails with
* an additional message and a {@code Throwable}.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @param t the cause that is saved for later retrieval by the {@link #getCause()} method.
* A {@code null} value is permitted, and indicates that the cause is
* non-existent or unknown.
* @since 1.0
*/
public AuthenticationException(String message, Throwable t) {
super(message, t);
}
}

/**
* A class for raising errors when client authorizations fails, client authorizations unsupported
* or not enabled for the service
*
* @since 1.0
*/
@ProviderType
public static class AuthorizationException extends SempClientException {

private static final long serialVersionUID = -2315053666285971354L;

/**
* Creates an instance of {@code AuthorizationException} when client authorizations fails with
* an additional message.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @since 1.0
*/
public AuthorizationException(String message) {
super(message);
}

/**
* Creates an instance of {@code AuthorizationException} when client authorizations fails with
* an additional message and a {@code Throwable}.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @param t the cause that is saved for later retrieval by the {@link #getCause()} method.
* A {@code null} value is permitted, and indicates that the cause is
* non-existent or unknown.
* @since 1.0
*/
public AuthorizationException(String message, Throwable t) {
super(message, t);
}
}

/**
* A class for raising errors when a remote resource like a queue, vpn is not found on a broker.
*
* @since 1.0
*/
@ProviderType
public static class MissingResourceException extends SempClientException {

private static final long serialVersionUID = 3777381415318250678L;

/**
* Creates an instance of {@code MissingResourceException} with a detailed message of missing a
* resource situation.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @since 1.0
*/
public MissingResourceException(String message) {
super(message);
}

/**
* Creates an instance of {@code MissingResourceException} with a detailed message of missing a
* * resource situation and a {@code Throwable}.
*
* @param message the detailed message. The detailed message is saved for later retrieval by the
* {@link #getMessage()} method.
* @param t the cause that is saved for later retrieval by the {@link #getCause()} method.
* A {@code null} value is permitted, and indicates that the cause is
* non-existent or unknown.
* @since 1.0
*/
public MissingResourceException(String message, Throwable t) {
super(message, t);
}

}
}
Loading

0 comments on commit 60f2f5c

Please sign in to comment.