Skip to content

Commit

Permalink
Merge pull request #128 from DwayneJengSage/develop
Browse files Browse the repository at this point in the history
Fix S3EventNotificationCallback
  • Loading branch information
DwayneJengSage authored Feb 8, 2023
2 parents f726947 + cfff8b0 commit ba0daf7
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.amazonaws.services.s3.event.S3EventNotification;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -21,6 +23,7 @@ public class S3EventNotificationCallback implements PollSqsCallback {
private static final Logger LOG = LoggerFactory.getLogger(S3EventNotificationCallback.class);
private static final String S3_EVENT_SOURCE = "aws:s3";
private static final String S3_OBJECT_CREATED_EVENT_PREFIX = "ObjectCreated:";
private static final String SNS_KEY_MESSAGE = "Message";

private static final ObjectMapper OBJECT_MAPPER;

Expand All @@ -38,8 +41,29 @@ public final void setBridgeHelper(BridgeHelper bridgeHelper) {
}

@Override
public void callback(String messageBody) throws Exception {
S3EventNotification notification = OBJECT_MAPPER.readValue(messageBody, S3EventNotification.class);
public void callback(String messageBody) {
JsonNode wrapperNode;
try {
wrapperNode = OBJECT_MAPPER.readTree(messageBody);
} catch (JsonProcessingException ex) {
// Malformed JSON. Log a warning and squelch.
LOG.warn("SNS notification is malformed JSON: " + messageBody);
return;
}
if (wrapperNode == null || !wrapperNode.hasNonNull(SNS_KEY_MESSAGE)) {
// Wrapper node doesn't exist or doesn't have a Message field. Log a warning and squelch.
LOG.warn("SNS notification doesn't contain an S3 notification: " + messageBody);
return;
}

S3EventNotification notification;
try {
notification = OBJECT_MAPPER.convertValue(wrapperNode.get(SNS_KEY_MESSAGE), S3EventNotification.class);
} catch (IllegalArgumentException ex) {
// Malformed S3 notification. Log a warning and squelch.
LOG.warn("S3 notification is malformed: " + messageBody);
return;
}
List<S3EventNotification.S3EventNotificationRecord> recordList = notification.getRecords();
if (recordList == null || recordList.isEmpty()) {
// Notification w/o record list is not actionable. Log a warning and squelch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

public class S3EventNotificationCallbackTest {

private static final String UPLOAD_COMPLETE_MESSAGE="{\"Records\":[{\"eventVersion\":\"2.0\",\"eventSource\":\"aws:s3\"," +
private static final String UPLOAD_COMPLETE_MESSAGE= "{" +
" \"Message\":" +
"{\"Records\":[{\"eventVersion\":\"2.0\",\"eventSource\":\"aws:s3\"," +
"\"awsRegion\":\"us-east-1\"," +
"\"eventTime\":\"2016-07-12T22:06:54.454Z\",\"eventName\":\"ObjectCreated:Put\"," +
"\"userIdentity\":{\"principalId\":\"AWS:AIDAJCSQZ35H7B4BFOVAW\"},\"requestParameters\":{\"sourceIPAddress\":\"54.87.180" +
Expand All @@ -36,7 +38,8 @@ public class S3EventNotificationCallbackTest {
"\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"Bridge Upload Complete Notification UAT\"," +
"\"bucket\":{\"name\":\"org-sagebridge-upload-uat\",\"ownerIdentity\":{\"principalId\":\"AZ9HQM5UC903F\"}," +
"\"arn\":\"arn:aws:s3:::org-sagebridge-upload-uat\"},\"object\":{\"key\":\"89b40dab-4982-4d5c-ae21-d74b072d02cd\"," +
"\"size\":1488,\"eTag\":\"e40df5cfa5874ab353947eb48ec0cfa4\",\"sequencer\":\"00578569FE6792370C\"}}}]}";
"\"size\":1488,\"eTag\":\"e40df5cfa5874ab353947eb48ec0cfa4\",\"sequencer\":\"00578569FE6792370C\"}}}]}" +
"}";
private static final String UPLOAD_ID = "89b40dab-4982-4d5c-ae21-d74b072d02cd";

private BridgeHelper mockBridgeHelper;
Expand All @@ -52,21 +55,75 @@ public void before() {
}

@Test
public void testCallback_StringMessage() throws Exception {
public void testCallback_StringMessage() {
callback.callback(UPLOAD_COMPLETE_MESSAGE);

verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID);
}

@Test
public void testCallback_NullList() throws Exception {
public void testCallback_MalformedMessage() {
callback.callback("malformed \" message");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_BlankWrapper() {
callback.callback("");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_NullWrapper() {
callback.callback("null");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_WrapperInvalidType() {
callback.callback("\"wrong type\"");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_NoMessage() {
callback.callback("{}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_EmptyList() throws Exception {
callback.callback("{\"Records\":[]}");
public void testCallback_NullMessage() {
callback.callback("{\"Message\":null}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_MessageWrongType() {
callback.callback("{\"Message\":\"wrong type\"}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_NoRecordList() {
callback.callback("{\"Message\":{}}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_NullRecordList() {
callback.callback("{\"Message\":{\"Records\":null}}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_RecordListWrongType() {
callback.callback("{\"Message\":{\"Records\":\"wrong type\"}}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

@Test
public void testCallback_EmptyList() {
callback.callback("{\"Message\":{\"Records\":[]}}");
verify(mockBridgeHelper, never()).completeUpload(anyString());
}

Expand All @@ -81,7 +138,7 @@ public Object[][] propagatedExceptionDataProvider() {
}

@Test(dataProvider = "propagatedExceptionDataProvider")
public void testCallback_PropagatesExceptions(int status) throws Exception {
public void testCallback_PropagatesExceptions(int status) {
doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID);

try {
Expand All @@ -104,14 +161,14 @@ public Object[][] suppressedExceptionDataProvider() {
}

@Test(dataProvider = "suppressedExceptionDataProvider")
public void testCallback_SuppressesExceptions(int status) throws Exception {
public void testCallback_SuppressesExceptions(int status) {
doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID);
callback.callback(UPLOAD_COMPLETE_MESSAGE);
verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID);
}

@Test
public void testCallback_CompleteUploadForShouldProcessRecords() throws Exception {
public void testCallback_CompleteUploadForShouldProcessRecords() {
String key1 = "key1";
String key2 = "key2";
String key3 = "key3";
Expand Down

0 comments on commit ba0daf7

Please sign in to comment.