Skip to content

Commit

Permalink
[MODORDERS-1209-2]. Add integration details validation to Send Claims
Browse files Browse the repository at this point in the history
  • Loading branch information
BKadirkhodjaev committed Dec 19, 2024
1 parent 8bf956b commit ba3fd62
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 62 deletions.
3 changes: 2 additions & 1 deletion src/main/java/org/folio/models/claiming/ClaimingError.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public enum ClaimingError {
CANNOT_RETRIEVE_CONFIG_ENTRIES("Cannot retrieve config entries"),
CANNOT_GROUP_PIECES_BY_VENDOR_MESSAGE("Cannot group pieces by vendor"),
CANNOT_CREATE_JOBS_AND_UPDATE_PIECES("Cannot create jobs and update pieces"),
CANNOT_FIND_A_PIECE_BY_ID("Cannot find a piece by '%s' id");
CANNOT_FIND_A_PIECE_BY_ID("Cannot find a piece by '%s' id"),
UNABLE_TO_GENERATE_CLAIMS_FOR_ORG_NO_INTEGRATION_DETAILS("Unable to generate claims for %s because no claim integrations exist");

private final String value;
}
125 changes: 73 additions & 52 deletions src/main/java/org/folio/service/pieces/PiecesClaimingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.extern.log4j.Log4j2;
import one.util.streamex.StreamEx;
import org.apache.commons.lang3.tuple.Pair;
import org.folio.rest.acq.model.Organization;
import org.folio.rest.core.RestClient;
import org.folio.rest.core.models.RequestContext;
import org.folio.rest.jaxrs.model.ClaimingCollection;
Expand Down Expand Up @@ -37,6 +38,7 @@
import static org.folio.models.claiming.ClaimingError.CANNOT_GROUP_PIECES_BY_VENDOR_MESSAGE;
import static org.folio.models.claiming.ClaimingError.CANNOT_RETRIEVE_CONFIG_ENTRIES;
import static org.folio.models.claiming.ClaimingError.CANNOT_SEND_CLAIMS_PIECE_IDS_ARE_EMPTY;
import static org.folio.models.claiming.ClaimingError.UNABLE_TO_GENERATE_CLAIMS_FOR_ORG_NO_INTEGRATION_DETAILS;
import static org.folio.models.claiming.IntegrationDetailField.CLAIM_PIECE_IDS;
import static org.folio.models.claiming.IntegrationDetailField.EXPORT_TYPE_SPECIFIC_PARAMETERS;
import static org.folio.models.claiming.IntegrationDetailField.VENDOR_EDI_ORDERS_EXPORT_CONFIG;
Expand Down Expand Up @@ -67,8 +69,9 @@ public class PiecesClaimingService {
/**
* Sends claims by receiving pieces to be claimed, groups them by vendor,
* updates piece statuses and finally creates jobs per vendor and associated integration details
*
* @param claimingCollection An array of pieces ids
* @param requestContext Headers to make HTTP or Kafka requests
* @param requestContext Headers to make HTTP or Kafka requests
* @return Future of an array of claimingResults
*/
public Future<ClaimingResults> sendClaims(ClaimingCollection claimingCollection, RequestContext requestContext) {
Expand All @@ -84,86 +87,109 @@ public Future<ClaimingResults> sendClaims(ClaimingCollection claimingCollection,
}
var pieceIds = claimingCollection.getClaimingPieceIds().stream().toList();
log.info("sendClaims:: Received pieces to be claimed, pieceIds: {}", pieceIds);
return groupPieceIdsByVendorId(pieceIds, requestContext)
.compose(pieceIdsByVendorIds -> {
if (CollectionUtils.isEmpty(pieceIdsByVendorIds)) {
return groupPieceIdsByVendor(pieceIds, requestContext)
.compose(pieceIdsByVendors -> {
if (CollectionUtils.isEmpty(pieceIdsByVendors)) {
return Future.succeededFuture(createEmptyClaimingResults(CANNOT_FIND_PIECES_WITH_LATE_STATUS_TO_PROCESS.getValue()));
}
log.info("sendClaims:: Using pieces by vendor id map, map: {}", pieceIdsByVendorIds);
return createJobsByVendor(claimingCollection, config, pieceIdsByVendorIds, requestContext);
pieceIdsByVendors.forEach((key, value) ->
log.info("createVendorPiecePair:: Using pieces by vendor map, vendorId: {}, piecesByVendor: {}", key.getId(), value));
var vendorWithoutIntegrationDetails = checkVendorIntegrationDetails(config, pieceIdsByVendors);
if (Objects.nonNull(vendorWithoutIntegrationDetails)) {
return Future.succeededFuture(new ClaimingResults()
.withClaimingPieceResults(List.of(new ClaimingPieceResult().withStatus(FAILURE)
.withError(new Error().withMessage(String.format(UNABLE_TO_GENERATE_CLAIMS_FOR_ORG_NO_INTEGRATION_DETAILS.getValue(), vendorWithoutIntegrationDetails.getCode()))))));
}
return createJobsByVendor(claimingCollection, config, pieceIdsByVendors, requestContext);
});
})
.onFailure(t -> log.error("sendClaims:: Failed send claims: {}", JsonObject.mapFrom(claimingCollection).encodePrettily(), t));
}

private Future<Map<String, List<String>>> groupPieceIdsByVendorId(List<String> pieceIds, RequestContext requestContext) {
log.info("groupPieceIdsByVendorId:: Grouping pieces by vendor, pieceIds count: {}", pieceIds.size());
private Future<Map<Organization, List<String>>> groupPieceIdsByVendor(List<String> pieceIds, RequestContext requestContext) {
log.info("groupPieceIdsByVendor:: Grouping pieces by vendor, pieceIds count: {}", pieceIds.size());
return pieceStorageService.getPiecesByIds(pieceIds, requestContext)
.compose(pieces -> {
if (CollectionUtils.isEmpty(pieces)) {
log.info("groupPieceIdsByVendorId:: No pieces are found by piece ids, pieceIds: {}", pieceIds);
log.info("groupPieceIdsByVendor:: No pieces are found by piece ids, pieceIds: {}", pieceIds);
return Future.succeededFuture();
}
var uniquePiecePoLinePairs = pieces.stream()
.filter(Objects::nonNull).filter(piece -> Objects.nonNull(piece.getId()) && Objects.nonNull(piece.getPoLineId()))
.map(piece -> Pair.of(piece.getPoLineId(), piece.getId())).distinct()
.toList();
log.info("groupPieceIdsByVendorId:: Prepared unique piece-poLine pairs, pairs: {}", uniquePiecePoLinePairs);
log.info("groupPieceIdsByVendor:: Prepared unique piece-poLine pairs, pairs: {}", uniquePiecePoLinePairs);
return collectResultsOnSuccess(createPieceIdByVendorFutures(pieces, uniquePiecePoLinePairs, requestContext))
.map(PiecesClaimingService::transformAndGroupPieceIdsByVendorId);
.map(PiecesClaimingService::transformAndGroupPieceIdsByVendor);
});
}

private List<Future<Pair<String, String>>> createPieceIdByVendorFutures(List<Piece> pieces, List<Pair<String, String>> uniquePiecePoLinePairs,
RequestContext requestContext) {
var pieceIdByVendorIdFutures = new ArrayList<Future<Pair<String, String>>>();
private List<Future<Pair<Organization, String>>> createPieceIdByVendorFutures(List<Piece> pieces, List<Pair<String, String>> uniquePiecePoLinePairs,
RequestContext requestContext) {
var pieceIdByVendorFutures = new ArrayList<Future<Pair<Organization, String>>>();
uniquePiecePoLinePairs.forEach(piecePoLinePairs -> {
var foundPiece = pieces.stream()
.filter(Objects::nonNull).filter(piece -> Objects.nonNull(piece.getId())).filter(piece -> piece.getId().equals(piecePoLinePairs.getRight()))
.findFirst().orElseThrow(() -> new NoSuchElementException(String.format(CANNOT_FIND_A_PIECE_BY_ID.getValue(), piecePoLinePairs.getRight())));
var pieceIdByVendorIdFuture = createVendorPiecePair(piecePoLinePairs, foundPiece, requestContext);
if (Objects.nonNull(pieceIdByVendorIdFuture)) {
pieceIdByVendorIdFutures.add(pieceIdByVendorIdFuture);
var pieceIdByVendorFuture = createVendorPiecePair(piecePoLinePairs, foundPiece, requestContext);
if (Objects.nonNull(pieceIdByVendorFuture)) {
pieceIdByVendorFutures.add(pieceIdByVendorFuture);
}
});
return pieceIdByVendorIdFutures;
return pieceIdByVendorFutures;
}

private Future<Pair<String, String>> createVendorPiecePair(Pair<String, String> piecePoLinePairs,
Piece piece, RequestContext requestContext) {
private Future<Pair<Organization, String>> createVendorPiecePair(Pair<String, String> piecePoLinePairs,
Piece piece, RequestContext requestContext) {
if (!piece.getReceivingStatus().equals(Piece.ReceivingStatus.LATE)) {
log.info("createVendorPiecePair:: Ignoring processing of a piece not in LATE state, piece id: {}", piece.getId());
return Future.succeededFuture();
}
return purchaseOrderLineService.getOrderLineById(piecePoLinePairs.getLeft(), requestContext)
.compose(poLine -> purchaseOrderStorageService.getPurchaseOrderById(poLine.getPurchaseOrderId(), requestContext)
.compose(purchaseOrder -> organizationService.getVendorById(purchaseOrder.getVendor(), requestContext)))
.compose(purchaseOrder -> organizationService.getVendorById(purchaseOrder.getVendor(), requestContext)))
.map(vendor -> {
if (Objects.nonNull(vendor) && Boolean.TRUE.equals(vendor.getIsVendor())) {
return Pair.of(vendor.getId(), piecePoLinePairs.getRight());
return Pair.of(vendor, piecePoLinePairs.getRight());
}
return null;
});
}

private static Map<String, List<String>> transformAndGroupPieceIdsByVendorId(List<Pair<String, String>> piecesByVendorList) {
private Organization checkVendorIntegrationDetails(JsonObject config, Map<Organization, List<String>> pieceIdsByVendors) {
return pieceIdsByVendors.keySet().stream()
.filter(vendor -> {
var vendorIntegrationDetails = config.stream()
.filter(configEntry -> isExportTypeClaimsAndCorrectVendorId(vendor.getId(), configEntry))
.toList();
log.info("checkVendorIntegrationDetails:: Found vendor integration details, vendorId: {}, integrationDetails: {}", vendor.getId(), vendorIntegrationDetails);
return vendorIntegrationDetails.isEmpty();
})
.findFirst().orElse(null);
}

private boolean isExportTypeClaimsAndCorrectVendorId(String vendorId, Map.Entry<String, Object> configEntry) {
return configEntry.getKey().startsWith(String.format("%s_%s", EXPORT_TYPE_CLAIMS, vendorId)) && Objects.nonNull(configEntry.getValue());
}

private static Map<Organization, List<String>> transformAndGroupPieceIdsByVendor(List<Pair<Organization, String>> piecesByVendorList) {
return StreamEx.of(piecesByVendorList).distinct().filter(Objects::nonNull)
.groupingBy(Pair::getKey, mapping(Pair::getValue, toList()));
}

private Future<ClaimingResults> createJobsByVendor(ClaimingCollection claimingCollection, JsonObject config,
Map<String, List<String>> pieceIdsByVendorId, RequestContext requestContext) {
log.info("createJobsByVendor:: Creating jobs by vendor, vendors by pieces count: {}", pieceIdsByVendorId.size());
if (CollectionUtils.isEmpty(pieceIdsByVendorId)) {
log.info("createJobsByVendor:: No jobs are created, pieceIdsByVendorId is empty");
Map<Organization, List<String>> pieceIdsByVendor, RequestContext requestContext) {
log.info("createJobsByVendor:: Creating jobs by vendor, vendors by pieces count: {}", pieceIdsByVendor.size());
if (CollectionUtils.isEmpty(pieceIdsByVendor)) {
log.info("createJobsByVendor:: No jobs are created, pieceIdsByVendor is empty");
return Future.succeededFuture(new ClaimingResults()
.withClaimingPieceResults(createErrorClaimingResults(pieceIdsByVendorId, CANNOT_GROUP_PIECES_BY_VENDOR_MESSAGE.getValue())));
.withClaimingPieceResults(createErrorClaimingResults(pieceIdsByVendor, CANNOT_GROUP_PIECES_BY_VENDOR_MESSAGE.getValue())));
}
return collectResultsOnSuccess(createUpdatePiecesAndJobFutures(claimingCollection, config, pieceIdsByVendorId, requestContext))
return collectResultsOnSuccess(createUpdatePiecesAndJobFutures(claimingCollection, config, pieceIdsByVendor, requestContext))
.map(updatedPieceLists -> {
if (CollectionUtils.isEmpty(updatedPieceLists)) {
log.info("createJobsByVendor:: No pieces were processed for claiming");
return new ClaimingResults().withClaimingPieceResults(createErrorClaimingResults(pieceIdsByVendorId, CANNOT_CREATE_JOBS_AND_UPDATE_PIECES.getValue()));
return new ClaimingResults().withClaimingPieceResults(createErrorClaimingResults(pieceIdsByVendor, CANNOT_CREATE_JOBS_AND_UPDATE_PIECES.getValue()));
}
var successClaimingPieceResults = createSuccessClaimingResults(updatedPieceLists);
log.info("createJobsByVendor:: Successfully processed pieces for claiming, count: {}", successClaimingPieceResults.size());
Expand All @@ -174,15 +200,14 @@ private Future<ClaimingResults> createJobsByVendor(ClaimingCollection claimingCo
}

private List<Future<List<String>>> createUpdatePiecesAndJobFutures(ClaimingCollection claimingCollection, JsonObject config,
Map<String, List<String>> pieceIdsByVendorId, RequestContext requestContext) {
Map<Organization, List<String>> pieceIdsByVendor, RequestContext requestContext) {
var updatePiecesAndJobFutures = new ArrayList<Future<List<String>>>();
pieceIdsByVendorId.forEach((vendorId, pieceIds) -> config.stream()
.filter(pieceIdsByVendorIdEntry -> isExportTypeClaimsAndCorrectVendorId(vendorId, pieceIdsByVendorIdEntry)
&& Objects.nonNull(pieceIdsByVendorIdEntry.getValue()))
.forEach(pieceIdsByVendorIdEntry -> {
log.info("createJobsByVendor:: Preparing job integration detail for vendor, vendor id: {}, pieces: {}, job key: {}",
vendorId, pieceIds.size(), pieceIdsByVendorIdEntry.getKey());
updatePiecesAndJobFutures.add(updatePiecesAndCreateJob(claimingCollection, pieceIds, pieceIdsByVendorIdEntry, requestContext));
pieceIdsByVendor.forEach((vendor, pieceIds) -> config.stream()
.filter(pieceIdsByVendorIdEntry -> isExportTypeClaimsAndCorrectVendorId(vendor.getId(), pieceIdsByVendorIdEntry))
.forEach(configEntry -> {
log.info("createUpdatePiecesAndJobFutures:: Preparing job integration detail for vendor, vendor id: {}, pieces: {}, job key: {}",
vendor.getId(), pieceIds.size(), configEntry.getKey());
updatePiecesAndJobFutures.add(updatePiecesAndCreateJob(claimingCollection, pieceIds, configEntry, requestContext));
}));
return updatePiecesAndJobFutures;
}
Expand All @@ -193,27 +218,23 @@ private static ClaimingResults createEmptyClaimingResults(String message) {

private List<ClaimingPieceResult> createSuccessClaimingResults(List<List<String>> updatedPieceLists) {
return updatedPieceLists.stream().flatMap(Collection::stream).distinct()
.map(pieceId -> new ClaimingPieceResult().withPieceId(pieceId).withStatus(SUCCESS))
.toList();
}

private List<ClaimingPieceResult> createErrorClaimingResults(Map<String, List<String>> pieceIdsByVendorId, String message) {
return pieceIdsByVendorId.values().stream()
.flatMap(Collection::stream)
.map(pieceId -> new ClaimingPieceResult().withPieceId(pieceId).withStatus(FAILURE).withError(new Error().withMessage(message)))
.toList();
.map(pieceId -> new ClaimingPieceResult().withPieceId(pieceId).withStatus(SUCCESS))
.toList();
}

private static boolean isExportTypeClaimsAndCorrectVendorId(String vendorId, Map.Entry<String, Object> pieceIdsByVendorIdEntry) {
return pieceIdsByVendorIdEntry.getKey().startsWith(String.format("%s_%s", EXPORT_TYPE_CLAIMS, vendorId));
private List<ClaimingPieceResult> createErrorClaimingResults(Map<Organization, List<String>> pieceIdsByVendor, String message) {
return pieceIdsByVendor.values().stream()
.flatMap(Collection::stream)
.map(pieceId -> new ClaimingPieceResult().withPieceId(pieceId).withStatus(FAILURE).withError(new Error().withMessage(message)))
.toList();
}

private Future<List<String>> updatePiecesAndCreateJob(ClaimingCollection claimingCollection, List<String> pieceIds,
Map.Entry<String, Object> pieceIdsByVendorIdEntry, RequestContext requestContext) {
log.info("updatePiecesAndCreateJob:: Updating pieces and creating a job, job key: {}, count: {}", pieceIdsByVendorIdEntry.getKey(), pieceIds.size());
Map.Entry<String, Object> configEntry, RequestContext requestContext) {
log.info("updatePiecesAndCreateJob:: Updating pieces and creating a job, job key: {}, count: {}", configEntry.getKey(), pieceIds.size());
return pieceUpdateFlowManager.updatePiecesStatuses(pieceIds, PieceBatchStatusCollection.ReceivingStatus.CLAIM_SENT,
claimingCollection.getClaimingInterval(), claimingCollection.getInternalNote(), claimingCollection.getExternalNote(), requestContext)
.compose(v -> createJob(pieceIdsByVendorIdEntry.getKey(), pieceIdsByVendorIdEntry.getValue(), pieceIds, requestContext).map(pieceIds));
.compose(v -> createJob(configEntry.getKey(), configEntry.getValue(), pieceIds, requestContext).map(pieceIds));
}

private Future<Void> createJob(String configKey, Object configValue, List<String> pieceIds, RequestContext requestContext) {
Expand Down
5 changes: 2 additions & 3 deletions src/test/java/org/folio/rest/impl/PiecesClaimingApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ void testPostPiecesClaim(String name, int vendorIdx, int poLineIdx, int pieceIdx
assertThat(jobExecutions, hasSize(dto.jobExecutions));
assertThat(response.getClaimingPieceResults().size(), equalTo(dto.claimingResults));
pieceUpdates.forEach(pieceUpdate -> logger.info("Updated piece: {}", pieceUpdate.encodePrettily()));

var claimedPieceIds = jobCreations.stream()
.peek(job -> logger.info("Created job: {}", JsonObject.mapFrom(job).encodePrettily()))
.map(job -> job.getJsonObject(EXPORT_TYPE_SPECIFIC_PARAMETERS.getValue())
Expand All @@ -217,13 +216,13 @@ void testPostPiecesClaim(String name, int vendorIdx, int poLineIdx, int pieceIdx

response.getClaimingPieceResults()
.forEach(result -> {
assertThat(result.getPieceId(), not(nullValue()));
assertThat(result.getStatus(), is(expectedStatus));
if (expectedStatus == SUCCESS) {
assertThat(result.getPieceId(), not(nullValue()));
assertThat(result.getError(), is(nullValue()));
} else {
assertThat(result.getError(), is(notNullValue()));
}
assertThat(result.getStatus(), is(expectedStatus));
});
}
}
Loading

0 comments on commit ba3fd62

Please sign in to comment.