From dc2830dcc55f93a5b6ca24a1baa31862322142c7 Mon Sep 17 00:00:00 2001 From: Shafiq Ur Rahman Date: Thu, 27 Jun 2024 14:49:13 -0400 Subject: [PATCH] Fixed issue with payment order ingestion error, where it was not paging if the user had lots of objects in the same request. --- CHANGELOG.md | 3 + .../PaymentOrderUnitOfWorkExecutor.java | 163 +++++++++++++----- .../PaymentOrderUnitOfWorkExecutorTest.java | 116 +++++++++---- 3 files changed, 199 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b27dfe3a..0c0796905 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ # Changelog All notable changes to this project will be documented in this file. +## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.3) +### Added +- Added Payment order Page size to fix an issue with payment order ingestion. ## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.1) ### Added - Added CUSTOMERS data group enum type to legal entity OpenAPI contract diff --git a/stream-payment-order/payment-order-core/src/main/java/com/backbase/stream/paymentorder/PaymentOrderUnitOfWorkExecutor.java b/stream-payment-order/payment-order-core/src/main/java/com/backbase/stream/paymentorder/PaymentOrderUnitOfWorkExecutor.java index 7384d0885..ab878a08a 100644 --- a/stream-payment-order/payment-order-core/src/main/java/com/backbase/stream/paymentorder/PaymentOrderUnitOfWorkExecutor.java +++ b/stream-payment-order/payment-order-core/src/main/java/com/backbase/stream/paymentorder/PaymentOrderUnitOfWorkExecutor.java @@ -6,11 +6,27 @@ import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.PROCESSED; import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.READY; import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.REJECTED; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.Collections.emptyList; +import static reactor.core.publisher.Flux.defer; +import static reactor.core.publisher.Flux.empty; +import static reactor.util.retry.Retry.fixedDelay; import com.backbase.dbs.arrangement.api.service.v2.ArrangementsApi; import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItem; import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItems; import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementsFilter; + +import java.math.BigDecimal; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + import com.backbase.dbs.paymentorder.api.service.v2.PaymentOrdersApi; import com.backbase.dbs.paymentorder.api.service.v2.model.GetPaymentOrderResponse; import com.backbase.dbs.paymentorder.api.service.v2.model.PaymentOrderPostFilterRequest; @@ -28,19 +44,20 @@ import com.backbase.stream.worker.configuration.StreamWorkerConfiguration; import com.backbase.stream.worker.model.UnitOfWork; import com.backbase.stream.worker.repository.UnitOfWorkRepository; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotNull; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Stream; + import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class PaymentOrderUnitOfWorkExecutor extends UnitOfWorkExecutor { + private static final PaymentOrderPostFilterRequest FILTER = new PaymentOrderPostFilterRequest().statuses(List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING)); + + private static final int PAGE_SIZE = 1000; + private final PaymentOrdersApi paymentOrdersApi; private final ArrangementsApi arrangementsApi; private final PaymentOrderTypeMapper paymentOrderTypeMapper; @@ -68,18 +85,18 @@ public Flux> prepareUnitOfWork(List> prepareUnitOfWork(Flux items) { return items.collectList() - .map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests)) - .flatMap(this::addArrangementIdMap) - .flatMap(this::getPersistedScheduledTransfers) - .flatMapMany(this::getPaymentOrderIngestRequest) - .bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime()) - .flatMap(this::prepareUnitOfWork); + .map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests)) + .flatMap(this::addArrangementIdMap) + .flatMap(this::getPersistedScheduledTransfers) + .flatMapMany(this::getPaymentOrderIngestRequest) + .bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime()) + .flatMap(this::prepareUnitOfWork); } private PaymentOrderIngestContext createPaymentOrderIngestContext(List paymentOrderPostRequests) { PaymentOrderIngestContext paymentOrderIngestContext = new PaymentOrderIngestContext(); - paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests); - paymentOrderIngestContext.internalUserId(paymentOrderPostRequests.get(0).getInternalUserId()); + paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests == null ? emptyList() : paymentOrderPostRequests); + paymentOrderIngestContext.internalUserId(getInternalUserId(paymentOrderPostRequests)); return paymentOrderIngestContext; } @@ -95,27 +112,27 @@ private PaymentOrderIngestContext createPaymentOrderIngestContext(List listOfPayments = new ArrayList<>(); return getPayments(paymentOrderIngestContext2.internalUserId()) - .map(response -> { - listOfPayments.addAll(response.getPaymentOrders()); - return listOfPayments; - }) - .map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses)) - .doOnSuccess(result -> - log.debug("Successfully fetched dbs scheduled payment orders")); + .map(response -> { + listOfPayments.addAll(response.getPaymentOrders()); + return listOfPayments; + }) + .map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses)) + .doOnSuccess(result -> + log.debug("Successfully fetched dbs scheduled payment orders")); } private @NotNull @Valid Mono addArrangementIdMap(PaymentOrderIngestContext paymentOrderIngestContext) { return Flux.fromIterable(paymentOrderIngestContext.corePaymentOrder()) - .flatMap(this::getArrangement) - .distinct() - .collectList() - .map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody)); + .flatMap(this::getArrangement) + .distinct() + .collectList() + .map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody)); } private Mono getArrangement(PaymentOrderPostRequest paymentOrderPostRequest) { AccountArrangementsFilter accountArrangementsFilter = new AccountArrangementsFilter() - .externalArrangementIds(Collections.singleton(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId())); + .addExternalArrangementIdsItem(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId()); return arrangementsApi.postFilter(accountArrangementsFilter); } @@ -127,23 +144,29 @@ private Mono getArrangement(PaymentOrderPostRequest pay */ private Mono getPayments(String internalUserId) { - var paymentOrderPostFilterRequest = new PaymentOrderPostFilterRequest(); - paymentOrderPostFilterRequest.setStatuses( - List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING)); - - return paymentOrdersApi.postFilterPaymentOrders( - null, null, null, null, null, null, null, null, null, null, null, - internalUserId, null, null, null, Integer.MAX_VALUE, - null, null, paymentOrderPostFilterRequest); + if (isEmptyUserId(internalUserId)) { + return Mono.just(new PaymentOrderPostFilterResponse().paymentOrders(emptyList()).totalElements(new BigDecimal(0))); + } + return pullFromDBS(internalUserId).map(result -> { + final var response = new PaymentOrderPostFilterResponse(); + response.setPaymentOrders(result); + response.setTotalElements(new BigDecimal(result.size())); + return response; + }); } private Flux getPaymentOrderIngestRequest(PaymentOrderIngestContext paymentOrderIngestContext) { List paymentOrderIngestRequests = new ArrayList<>(); + final var userId = paymentOrderIngestContext.internalUserId(); + if (isEmptyUserId(userId)) { + return Flux.fromIterable(paymentOrderIngestRequests); + } + final List orders = paymentOrderIngestContext.corePaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.corePaymentOrder(); // list of all the bank ref ids in core List coreBankRefIds = new ArrayList<>(); - for (PaymentOrderPostRequest coreBankRefId : paymentOrderIngestContext.corePaymentOrder() ) { + for (PaymentOrderPostRequest coreBankRefId : orders) { coreBankRefIds.add(coreBankRefId.getBankReferenceId()); } @@ -153,11 +176,13 @@ private Flux getPaymentOrderIngestRequest(PaymentOrde existingBankRefIds.add(existingBankRefId.getBankReferenceId()); } + final List existing = paymentOrderIngestContext.existingPaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.existingPaymentOrder(); + // build new payment list (Bank ref is in core, but not in DBS) - paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> { + orders.forEach(corePaymentOrder -> { if(!existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) { AccountArrangementItem accountArrangementItem = getInternalArrangementId(paymentOrderIngestContext.arrangementIds(), - corePaymentOrder.getOriginatorAccount().getExternalArrangementId()); + corePaymentOrder.getOriginatorAccount().getExternalArrangementId()); if (accountArrangementItem != null) { corePaymentOrder.getOriginatorAccount().setArrangementId(accountArrangementItem.getId()); } @@ -166,7 +191,7 @@ private Flux getPaymentOrderIngestRequest(PaymentOrde }); // build update payment list (Bank ref is in core and DBS) - paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> { + orders.forEach(corePaymentOrder -> { if(existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) { UpdatePaymentOrderIngestRequest updatePaymentOrderIngestRequest = new UpdatePaymentOrderIngestRequest(paymentOrderTypeMapper.mapPaymentOrderPostRequest(corePaymentOrder)); paymentOrderIngestRequests.add(updatePaymentOrderIngestRequest); @@ -174,23 +199,71 @@ private Flux getPaymentOrderIngestRequest(PaymentOrde }); // build delete payment list (Bank ref is in DBS, but not in core) + buildDeletePaymentList(existing, coreBankRefIds, paymentOrderIngestRequests); + + return Flux.fromIterable(paymentOrderIngestRequests); + } + + private void buildDeletePaymentList(List existing, List coreBankRefIds, List paymentOrderIngestRequests) { if (((PaymentOrderWorkerConfigurationProperties) streamWorkerConfiguration).isDeletePaymentOrder()) { - paymentOrderIngestContext.existingPaymentOrder().forEach(existingPaymentOrder -> { + existing.forEach(existingPaymentOrder -> { if(!coreBankRefIds.contains(existingPaymentOrder.getBankReferenceId())) { paymentOrderIngestRequests.add(new DeletePaymentOrderIngestRequest(existingPaymentOrder.getId(), existingPaymentOrder.getBankReferenceId())); } }); } - - return Flux.fromIterable(paymentOrderIngestRequests); } private AccountArrangementItem getInternalArrangementId(List accountArrangementItemsList, String externalArrangementId) { return accountArrangementItemsList.stream() - .flatMap(a -> a.getArrangementElements().stream()) - .filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId)) - .findFirst() - .orElse(null); + .flatMap(a -> a.getArrangementElements().stream()) + .filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId)) + .findFirst() + .orElse(null); + } + + private record DBSPaymentOrderPageResult(int next, int total, List requests) { + + } + + private Mono> pullFromDBS(final @NotNull String userid) { + return defer(() -> retrieveNextPage(0, userid) + .expand(page -> { + // If there are no more pages, return an empty flux. + if (page.next >= page.total || page.requests.isEmpty()) { + return empty(); + } else { + return retrieveNextPage(page.next, userid); + } + })) + .collectList() + .map(pages -> pages.stream().flatMap(page -> page.requests.stream()).toList()); + } + + private Mono retrieveNextPage(int currentCount, final @NotNull String userId) { + return paymentOrdersApi.postFilterPaymentOrders(null, null, null, null, null, null, null, null, + null, null, null, userId, null, null, currentCount / PAGE_SIZE, PAGE_SIZE, null, + null, FILTER) + .retryWhen(fixedDelay(3, Duration.of(2000, MILLIS)).filter( + t -> t instanceof WebClientRequestException + || t instanceof WebClientResponseException.ServiceUnavailable)) + .map(resp -> { + final List results = resp.getPaymentOrders() == null ? emptyList() : resp.getPaymentOrders(); + final var total = resp.getTotalElements() == null ? new BigDecimal(0).intValue() : resp.getTotalElements().intValue(); + return new DBSPaymentOrderPageResult(currentCount + results.size(), total, results); + }); + } + + private boolean isEmptyUserId(String userId) { + return userId == null || userId.isBlank(); + } + + private String getInternalUserId(List paymentOrderPostRequests) { + if (paymentOrderPostRequests == null || paymentOrderPostRequests.isEmpty()) { + return null; + } else { + return paymentOrderPostRequests.get(0).getInternalUserId(); + } } } \ No newline at end of file diff --git a/stream-payment-order/payment-order-core/src/test/java/com/backbase/stream/task/PaymentOrderUnitOfWorkExecutorTest.java b/stream-payment-order/payment-order-core/src/test/java/com/backbase/stream/task/PaymentOrderUnitOfWorkExecutorTest.java index e946bcf8c..7abc64a3d 100644 --- a/stream-payment-order/payment-order-core/src/test/java/com/backbase/stream/task/PaymentOrderUnitOfWorkExecutorTest.java +++ b/stream-payment-order/payment-order-core/src/test/java/com/backbase/stream/task/PaymentOrderUnitOfWorkExecutorTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.lenient; import com.backbase.dbs.arrangement.api.service.v2.ArrangementsApi; import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItem; @@ -23,6 +24,8 @@ import com.backbase.stream.worker.model.UnitOfWork; import com.backbase.stream.worker.repository.UnitOfWorkRepository; import java.math.BigDecimal; + +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,42 +62,42 @@ public class PaymentOrderUnitOfWorkExecutorTest extends PaymentOrderBaseTest { @BeforeEach void setup() { paymentOrderUnitOfWorkExecutor = new PaymentOrderUnitOfWorkExecutor( - repository, streamTaskExecutor, streamWorkerConfiguration, - paymentOrdersApi, arrangementsApi, paymentOrderTypeMapper); + repository, streamTaskExecutor, streamWorkerConfiguration, + paymentOrdersApi, arrangementsApi, paymentOrderTypeMapper); } @Test void test_prepareUnitOfWork_paymentOrderIngestRequestList() { List paymentOrderIngestRequestList = List.of( - new NewPaymentOrderIngestRequest(paymentOrderPostRequest.get(0)), - new NewPaymentOrderIngestRequest(paymentOrderPostRequest.get(1)) + new NewPaymentOrderIngestRequest(paymentOrderPostRequest.get(0)), + new NewPaymentOrderIngestRequest(paymentOrderPostRequest.get(1)) ); PaymentOrderPostResponse paymentOrderPostResponse = new PaymentOrderPostResponse() - .id("po_post_resp_id") - .putAdditionsItem("key", "val"); + .id("po_post_resp_id") + .putAdditionsItem("key", "val"); - Mockito.lenient().when(paymentOrdersApi.postPaymentOrder(Mockito.any())) - .thenReturn(Mono.just(paymentOrderPostResponse)); + lenient().when(paymentOrdersApi.postPaymentOrder(Mockito.any())) + .thenReturn(Mono.just(paymentOrderPostResponse)); AccountArrangementItem accountArrangementItem = new AccountArrangementItem() - .id("arrangementId_1") - .externalArrangementId("externalArrangementId_1"); + .id("arrangementId_1") + .externalArrangementId("externalArrangementId_1"); AccountArrangementItems accountArrangementItems = new AccountArrangementItems() - .addArrangementElementsItem(accountArrangementItem); + .addArrangementElementsItem(accountArrangementItem); - Mockito.lenient().when(arrangementsApi.postFilter(Mockito.any())) - .thenReturn(Mono.just(accountArrangementItems)); + lenient().when(arrangementsApi.postFilter(Mockito.any())) + .thenReturn(Mono.just(accountArrangementItems)); StepVerifier.create(paymentOrderUnitOfWorkExecutor.prepareUnitOfWork(paymentOrderIngestRequestList)) - .assertNext(unitOfWork -> { - Assertions.assertTrue(unitOfWork.getUnitOfOWorkId().startsWith("payment-orders-mixed-")); - Assertions.assertEquals(UnitOfWork.State.NEW, unitOfWork.getState()); - Assertions.assertEquals(1, unitOfWork.getStreamTasks().size()); - Assertions.assertEquals(paymentOrderIngestRequestList.size(), unitOfWork.getStreamTasks().get(0).getData().size()); - }) - .verifyComplete(); + .assertNext(unitOfWork -> { + Assertions.assertTrue(unitOfWork.getUnitOfOWorkId().startsWith("payment-orders-mixed-")); + Assertions.assertEquals(UnitOfWork.State.NEW, unitOfWork.getState()); + Assertions.assertEquals(1, unitOfWork.getStreamTasks().size()); + Assertions.assertEquals(paymentOrderIngestRequestList.size(), unitOfWork.getStreamTasks().get(0).getData().size()); + }) + .verifyComplete(); } @Test @@ -102,37 +105,74 @@ void test_prepareUnitOfWork_paymentOrderPostRequestFlux() { Flux paymentOrderPostRequestFlux = Flux.fromIterable(paymentOrderPostRequest); PaymentOrderPostResponse paymentOrderPostResponse = new PaymentOrderPostResponse() - .id("po_post_resp_id") - .putAdditionsItem("key", "val"); + .id("po_post_resp_id") + .putAdditionsItem("key", "val"); - Mockito.lenient().when(paymentOrdersApi.postPaymentOrder(any())) - .thenReturn(Mono.just(paymentOrderPostResponse)); + lenient().when(paymentOrdersApi.postPaymentOrder(any())) + .thenReturn(Mono.just(paymentOrderPostResponse)); GetPaymentOrderResponse getPaymentOrderResponse = new GetPaymentOrderResponse() - .id("arrangementId_1"); + .id("arrangementId_1") + .bankReferenceId("bankReferenceId_1"); PaymentOrderPostFilterResponse paymentOrderPostFilterResponse = new PaymentOrderPostFilterResponse() - .addPaymentOrdersItem(getPaymentOrderResponse) - .totalElements(new BigDecimal(1)); + .addPaymentOrdersItem(getPaymentOrderResponse) + .totalElements(new BigDecimal(1)); doReturn(Mono.just(paymentOrderPostFilterResponse)).when(paymentOrdersApi).postFilterPaymentOrders(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()); AccountArrangementItem accountArrangementItem = new AccountArrangementItem() - .id("arrangementId_1") - .externalArrangementId("externalArrangementId_1"); + .id("arrangementId_1") + .externalArrangementId("externalArrangementId_1"); AccountArrangementItems accountArrangementItems = new AccountArrangementItems() - .addArrangementElementsItem(accountArrangementItem); + .addArrangementElementsItem(accountArrangementItem); Mockito.when(arrangementsApi.postFilter(Mockito.any())) - .thenReturn(Mono.just(accountArrangementItems)); + .thenReturn(Mono.just(accountArrangementItems)); StepVerifier.create(paymentOrderUnitOfWorkExecutor.prepareUnitOfWork(paymentOrderPostRequestFlux)) - .assertNext(unitOfWork -> { - Assertions.assertTrue(unitOfWork.getUnitOfOWorkId().startsWith("payment-orders-mixed-")); - Assertions.assertEquals(UnitOfWork.State.NEW, unitOfWork.getState()); - Assertions.assertEquals(1, unitOfWork.getStreamTasks().size()); - Assertions.assertEquals(paymentOrderPostRequest.size(), unitOfWork.getStreamTasks().get(0).getData().size()); - }) - .verifyComplete(); + .assertNext(unitOfWork -> { + Assertions.assertTrue(unitOfWork.getUnitOfOWorkId().startsWith("payment-orders-mixed-")); + Assertions.assertEquals(UnitOfWork.State.NEW, unitOfWork.getState()); + Assertions.assertEquals(1, unitOfWork.getStreamTasks().size()); + Assertions.assertEquals(paymentOrderPostRequest.size(), unitOfWork.getStreamTasks().get(0).getData().size()); + }) + .verifyComplete(); + } + + @Test + void test_prepareunitofwork_blankuserid() { + + paymentOrderUnitOfWorkExecutor = new PaymentOrderUnitOfWorkExecutor( + repository, streamTaskExecutor, streamWorkerConfiguration, + paymentOrdersApi, arrangementsApi, null); + + paymentOrderPostRequest.get(0).setInternalUserId(StringUtils.EMPTY); + paymentOrderPostRequest.get(1).setInternalUserId(null); + + Flux paymentOrderPostRequestFlux = Flux.fromIterable(paymentOrderPostRequest); + + AccountArrangementItem accountArrangementItem = new AccountArrangementItem() + .id("arrangementId_1") + .externalArrangementId("externalArrangementId_1"); + AccountArrangementItems accountArrangementItems = new AccountArrangementItems() + .addArrangementElementsItem(accountArrangementItem); + + lenient().when(arrangementsApi.postFilter(Mockito.any())) + .thenReturn(Mono.just(accountArrangementItems)); + + GetPaymentOrderResponse getPaymentOrderResponseWithEmptyUserId = new GetPaymentOrderResponse() + .id("arrangementId_1"); + + PaymentOrderPostFilterResponse paymentOrderPostFilterResponse = new PaymentOrderPostFilterResponse() + .addPaymentOrdersItem(getPaymentOrderResponseWithEmptyUserId) + .totalElements(new BigDecimal(1)); + + lenient().doReturn(Mono.just(paymentOrderPostFilterResponse)).when(paymentOrdersApi).postFilterPaymentOrders(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()); + + StepVerifier + .create(paymentOrderUnitOfWorkExecutor.prepareUnitOfWork(paymentOrderPostRequestFlux)) + .expectNextCount(0) + .verifyComplete(); } } \ No newline at end of file