diff --git a/ramls/instance-ingress-event.json b/ramls/instance-ingress-event.json index 867cb07d2..d34e079c9 100644 --- a/ramls/instance-ingress-event.json +++ b/ramls/instance-ingress-event.json @@ -14,15 +14,11 @@ "enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"], "description": "Instance ingress event type" }, - "eventMetadata": { + "InstanceIngressEventMetadata": { "description": "Event metadata", "type": "object", "additionalProperties": false, "properties": { - "tenantId": { - "description": "Tenant id", - "type": "string" - }, "eventTTL": { "description": "Time-to-live (TTL) for event in minutes", "type": "integer" @@ -69,7 +65,6 @@ } }, "required": [ - "tenantId", "eventTTL", "publishedBy" ] @@ -78,15 +73,24 @@ "type": "object", "description": "An instance source record container", "$ref": "instance-ingress-payload.json" + }, + "tenant": { + "description": "Tenant id", + "type": "string" + }, + "ts": { + "description": "Message timestamp", + "type": "string", + "format": "date-time" } }, "excludedFromEqualsAndHashCode": [ "eventMetadata", - "eventPayload" + "tenant", + "ts" ], "required": [ "id", - "eventType", - "eventMetadata" + "eventType" ] } diff --git a/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java index 6142cb908..b4a8e1dcc 100644 --- a/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java +++ b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java @@ -31,7 +31,6 @@ import org.folio.kafka.AsyncRecordHandler; import org.folio.kafka.KafkaHeaderUtils; import org.folio.processing.exceptions.EventProcessingException; -import org.folio.rest.jaxrs.model.EventMetadata; import org.folio.rest.jaxrs.model.InstanceIngressEvent; public class InstanceIngressEventConsumer implements AsyncRecordHandler { @@ -69,8 +68,7 @@ public Future handle(KafkaConsumerRecord consumerRecord) private static String getTenantId(InstanceIngressEvent event, Map kafkaHeaders) { - return Optional.ofNullable(event.getEventMetadata()) - .map(EventMetadata::getTenantId) + return Optional.ofNullable(event.getTenant()) .orElseGet(() -> kafkaHeaders.get(OKAPI_TENANT_HEADER)); } diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java index 787a46924..cf2e107b2 100644 --- a/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java +++ b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java @@ -117,32 +117,29 @@ private Future prepareAndExecuteMapping(MappingMetadataDto m Record targetRecord, InstanceIngressEvent event, String instanceId) { - return postSnapshotInSrsAndHandleResponse(targetRecord.getId()) - .compose(snapshot -> { - try { - LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId()); - var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class); - AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters); - AdditionalFieldsUtil.move001To035(targetRecord); - AdditionalFieldsUtil.normalize035(targetRecord); - if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) { - AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB, - LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID)); - } - - LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId()); - var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent()); - RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT); - var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules())); - instance.setId(instanceId); - instance.setSource(event.getEventPayload().getSourceType().value()); - LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance); - return Future.succeededFuture(instance); - } catch (Exception e) { - LOGGER.warn("Error during preparing and executing mapping:", e); - return Future.failedFuture(e); + try { + LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId()); + var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class); + AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters); + AdditionalFieldsUtil.move001To035(targetRecord); + AdditionalFieldsUtil.normalize035(targetRecord); + if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) { + AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB, + LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID)); } - }); + + LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId()); + var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent()); + RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT); + var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules())); + instance.setId(instanceId); + instance.setSource(event.getEventPayload().getSourceType().value()); + LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance); + return Future.succeededFuture(instance); + } catch (Exception e) { + LOGGER.warn("Error during preparing and executing mapping:", e); + return Future.failedFuture(e); + } } private Record constructMarcBibRecord(InstanceIngressPayload eventPayload) { @@ -215,21 +212,29 @@ private Future executeFieldsManipulation(Instance instance, Record src private Future saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) { LOGGER.info("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId()); Promise promise = Promise.promise(); - getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId()) - .postSourceStorageRecords(srcRecord) - .onComplete(ar -> { - var result = ar.result(); - if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) { - LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}", - srcRecord.getId(), instance.getId(), context.getTenantId()); - promise.complete(instance); - } else { - String msg = format("Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s", - instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : ""); - LOGGER.warn(msg); - super.deleteInstance(instance.getId(), event.getId(), instanceCollection); - promise.fail(msg); - } + postSnapshotInSrsAndHandleResponse(srcRecord.getSnapshotId()) + .onFailure(promise::fail) + .compose(snapshot -> { + getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId()) + .postSourceStorageRecords(srcRecord) + .onComplete(ar -> { + var result = ar.result(); + if (ar.succeeded() && + result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) { + LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}", + srcRecord.getId(), instance.getId(), context.getTenantId()); + promise.complete(instance); + } else { + String msg = format( + "Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s", + instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : ""); + LOGGER.warn(msg); + super.deleteInstance(instance.getId(), event.getId(), + instanceCollection); + promise.fail(msg); + } + }); + return promise.future(); }); return promise.future(); } diff --git a/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java index 4dbe5cfe8..5a4052987 100644 --- a/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java +++ b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java @@ -155,7 +155,7 @@ public void shouldReturnFailedFuture_ifMappingMetadataWasNotFound() { } @Test - public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError() throws IOException { + public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOException { // given var event = new InstanceIngressEvent() .withId(UUID.randomUUID().toString()) @@ -169,27 +169,29 @@ public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError( .withMappingRules(mappingRules.encode()) .withMappingParams(Json.encode(new MappingParameters()))))) .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE); + doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any()); - var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST); + var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); - var expectedMessage = "Failed to create snapshot in SRS, snapshot id: "; + var expectedMessage = "Mapped Instance is invalid: [Field 'title' is a required field and can not be null, " + + "Field 'instanceTypeId' is a required field and can not be null], from InstanceIngressEvent with id '" + event.getId() + "'"; // when var future = handler.handle(event); // then var exception = Assert.assertThrows(ExecutionException.class, future::get); - assertThat(exception.getCause().getMessage()).startsWith(expectedMessage); + assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage); } @Test - public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOException { + public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException { // given var event = new InstanceIngressEvent() .withId(UUID.randomUUID().toString()) .withEventPayload(new InstanceIngressPayload() - .withSourceRecordObject("{}") + .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH)) .withSourceType(LINKED_DATA) ); doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString()); @@ -198,13 +200,16 @@ public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOExcept .withMappingRules(mappingRules.encode()) .withMappingParams(Json.encode(new MappingParameters()))))) .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE); - doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any()); var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); - var expectedMessage = "Mapped Instance is invalid: [Field 'title' is a required field and can not be null, " - + "Field 'instanceTypeId' is a required field and can not be null], from InstanceIngressEvent with id '" + event.getId() + "'"; + var expectedMessage = "Some failure"; + doAnswer(i -> { + Consumer failureHandler = i.getArgument(2); + failureHandler.accept(new Failure(expectedMessage, 400)); + return null; + }).when(instanceCollection).add(any(), any(), any()); // when var future = handler.handle(event); @@ -215,7 +220,7 @@ public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOExcept } @Test - public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException { + public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() throws IOException { // given var event = new InstanceIngressEvent() .withId(UUID.randomUUID().toString()) @@ -232,13 +237,13 @@ public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any()); var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); - - var expectedMessage = "Some failure"; doAnswer(i -> { - Consumer failureHandler = i.getArgument(2); - failureHandler.accept(new Failure(expectedMessage, 400)); + Consumer> sucessHandler = i.getArgument(1); + sucessHandler.accept(new Success<>(i.getArgument(0))); return null; }).when(instanceCollection).add(any(), any(), any()); + var expectedMessage = "Some failure"; + doReturn(failedFuture(expectedMessage)).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any()); // when var future = handler.handle(event); @@ -249,7 +254,7 @@ public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException } @Test - public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() throws IOException { + public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError() throws IOException { // given var event = new InstanceIngressEvent() .withId(UUID.randomUUID().toString()) @@ -264,22 +269,24 @@ public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() t .withMappingParams(Json.encode(new MappingParameters()))))) .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE); doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any()); - var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); - doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); doAnswer(i -> { Consumer> sucessHandler = i.getArgument(1); sucessHandler.accept(new Success<>(i.getArgument(0))); return null; }).when(instanceCollection).add(any(), any(), any()); - var expectedMessage = "Some failure"; - doReturn(failedFuture(expectedMessage)).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any()); + doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any()); + doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any()); + var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST); + doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); + + var expectedMessage = "Failed to create snapshot in SRS, snapshot id: "; // when var future = handler.handle(event); // then var exception = Assert.assertThrows(ExecutionException.class, future::get); - assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage); + assertThat(exception.getCause().getMessage()).startsWith(expectedMessage); } @Test