Skip to content

Commit

Permalink
Fixed issue with payment order ingestion error, where it was not pagi…
Browse files Browse the repository at this point in the history
…ng if the user had lots of objects in the same request.
  • Loading branch information
shafiquech committed Jun 27, 2024
1 parent 875f54d commit dc2830d
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 83 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog
All notable changes to this project will be documented in this file.

## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.3)
### Added
- Added Payment order Page size to fix an issue with payment order ingestion.
## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.1)
### Added
- Added CUSTOMERS data group enum type to legal entity OpenAPI contract
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,27 @@
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.PROCESSED;
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.READY;
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.REJECTED;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Collections.emptyList;
import static reactor.core.publisher.Flux.defer;
import static reactor.core.publisher.Flux.empty;
import static reactor.util.retry.Retry.fixedDelay;

import com.backbase.dbs.arrangement.api.service.v2.ArrangementsApi;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItem;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItems;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementsFilter;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import com.backbase.dbs.paymentorder.api.service.v2.PaymentOrdersApi;
import com.backbase.dbs.paymentorder.api.service.v2.model.GetPaymentOrderResponse;
import com.backbase.dbs.paymentorder.api.service.v2.model.PaymentOrderPostFilterRequest;
Expand All @@ -28,19 +44,20 @@
import com.backbase.stream.worker.configuration.StreamWorkerConfiguration;
import com.backbase.stream.worker.model.UnitOfWork;
import com.backbase.stream.worker.repository.UnitOfWorkRepository;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class PaymentOrderUnitOfWorkExecutor extends UnitOfWorkExecutor<PaymentOrderTask> {

private static final PaymentOrderPostFilterRequest FILTER = new PaymentOrderPostFilterRequest().statuses(List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING));

private static final int PAGE_SIZE = 1000;

private final PaymentOrdersApi paymentOrdersApi;
private final ArrangementsApi arrangementsApi;
private final PaymentOrderTypeMapper paymentOrderTypeMapper;
Expand Down Expand Up @@ -68,18 +85,18 @@ public Flux<UnitOfWork<PaymentOrderTask>> prepareUnitOfWork(List<PaymentOrderIng

public Flux<UnitOfWork<PaymentOrderTask>> prepareUnitOfWork(Flux<PaymentOrderPostRequest> items) {
return items.collectList()
.map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests))
.flatMap(this::addArrangementIdMap)
.flatMap(this::getPersistedScheduledTransfers)
.flatMapMany(this::getPaymentOrderIngestRequest)
.bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime())
.flatMap(this::prepareUnitOfWork);
.map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests))
.flatMap(this::addArrangementIdMap)
.flatMap(this::getPersistedScheduledTransfers)
.flatMapMany(this::getPaymentOrderIngestRequest)
.bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime())
.flatMap(this::prepareUnitOfWork);
}

private PaymentOrderIngestContext createPaymentOrderIngestContext(List<PaymentOrderPostRequest> paymentOrderPostRequests) {
PaymentOrderIngestContext paymentOrderIngestContext = new PaymentOrderIngestContext();
paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests);
paymentOrderIngestContext.internalUserId(paymentOrderPostRequests.get(0).getInternalUserId());
paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests == null ? emptyList() : paymentOrderPostRequests);
paymentOrderIngestContext.internalUserId(getInternalUserId(paymentOrderPostRequests));
return paymentOrderIngestContext;
}

Expand All @@ -95,27 +112,27 @@ private PaymentOrderIngestContext createPaymentOrderIngestContext(List<PaymentOr
List<GetPaymentOrderResponse> listOfPayments = new ArrayList<>();

return getPayments(paymentOrderIngestContext2.internalUserId())
.map(response -> {
listOfPayments.addAll(response.getPaymentOrders());
return listOfPayments;
})
.map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses))
.doOnSuccess(result ->
log.debug("Successfully fetched dbs scheduled payment orders"));
.map(response -> {
listOfPayments.addAll(response.getPaymentOrders());
return listOfPayments;
})
.map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses))
.doOnSuccess(result ->
log.debug("Successfully fetched dbs scheduled payment orders"));
}

private @NotNull @Valid Mono<PaymentOrderIngestContext> addArrangementIdMap(PaymentOrderIngestContext paymentOrderIngestContext) {

return Flux.fromIterable(paymentOrderIngestContext.corePaymentOrder())
.flatMap(this::getArrangement)
.distinct()
.collectList()
.map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody));
.flatMap(this::getArrangement)
.distinct()
.collectList()
.map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody));
}

private Mono<AccountArrangementItems> getArrangement(PaymentOrderPostRequest paymentOrderPostRequest) {
AccountArrangementsFilter accountArrangementsFilter = new AccountArrangementsFilter()
.externalArrangementIds(Collections.singleton(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId()));
.addExternalArrangementIdsItem(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId());
return arrangementsApi.postFilter(accountArrangementsFilter);
}

Expand All @@ -127,23 +144,29 @@ private Mono<AccountArrangementItems> getArrangement(PaymentOrderPostRequest pay
*/
private Mono<PaymentOrderPostFilterResponse> getPayments(String internalUserId) {

var paymentOrderPostFilterRequest = new PaymentOrderPostFilterRequest();
paymentOrderPostFilterRequest.setStatuses(
List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING));

return paymentOrdersApi.postFilterPaymentOrders(
null, null, null, null, null, null, null, null, null, null, null,
internalUserId, null, null, null, Integer.MAX_VALUE,
null, null, paymentOrderPostFilterRequest);
if (isEmptyUserId(internalUserId)) {
return Mono.just(new PaymentOrderPostFilterResponse().paymentOrders(emptyList()).totalElements(new BigDecimal(0)));
}
return pullFromDBS(internalUserId).map(result -> {
final var response = new PaymentOrderPostFilterResponse();
response.setPaymentOrders(result);
response.setTotalElements(new BigDecimal(result.size()));
return response;
});
}

private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrderIngestContext paymentOrderIngestContext) {

List<PaymentOrderIngestRequest> paymentOrderIngestRequests = new ArrayList<>();
final var userId = paymentOrderIngestContext.internalUserId();
if (isEmptyUserId(userId)) {
return Flux.fromIterable(paymentOrderIngestRequests);
}
final List<PaymentOrderPostRequest> orders = paymentOrderIngestContext.corePaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.corePaymentOrder();

// list of all the bank ref ids in core
List<String> coreBankRefIds = new ArrayList<>();
for (PaymentOrderPostRequest coreBankRefId : paymentOrderIngestContext.corePaymentOrder() ) {
for (PaymentOrderPostRequest coreBankRefId : orders) {
coreBankRefIds.add(coreBankRefId.getBankReferenceId());
}

Expand All @@ -153,11 +176,13 @@ private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrde
existingBankRefIds.add(existingBankRefId.getBankReferenceId());
}

final List<GetPaymentOrderResponse> existing = paymentOrderIngestContext.existingPaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.existingPaymentOrder();

// build new payment list (Bank ref is in core, but not in DBS)
paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> {
orders.forEach(corePaymentOrder -> {
if(!existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) {
AccountArrangementItem accountArrangementItem = getInternalArrangementId(paymentOrderIngestContext.arrangementIds(),
corePaymentOrder.getOriginatorAccount().getExternalArrangementId());
corePaymentOrder.getOriginatorAccount().getExternalArrangementId());
if (accountArrangementItem != null) {
corePaymentOrder.getOriginatorAccount().setArrangementId(accountArrangementItem.getId());
}
Expand All @@ -166,31 +191,79 @@ private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrde
});

// build update payment list (Bank ref is in core and DBS)
paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> {
orders.forEach(corePaymentOrder -> {
if(existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) {
UpdatePaymentOrderIngestRequest updatePaymentOrderIngestRequest = new UpdatePaymentOrderIngestRequest(paymentOrderTypeMapper.mapPaymentOrderPostRequest(corePaymentOrder));
paymentOrderIngestRequests.add(updatePaymentOrderIngestRequest);
}
});

// build delete payment list (Bank ref is in DBS, but not in core)
buildDeletePaymentList(existing, coreBankRefIds, paymentOrderIngestRequests);

return Flux.fromIterable(paymentOrderIngestRequests);
}

private void buildDeletePaymentList(List<GetPaymentOrderResponse> existing, List<String> coreBankRefIds, List<PaymentOrderIngestRequest> paymentOrderIngestRequests) {
if (((PaymentOrderWorkerConfigurationProperties) streamWorkerConfiguration).isDeletePaymentOrder()) {
paymentOrderIngestContext.existingPaymentOrder().forEach(existingPaymentOrder -> {
existing.forEach(existingPaymentOrder -> {
if(!coreBankRefIds.contains(existingPaymentOrder.getBankReferenceId())) {
paymentOrderIngestRequests.add(new DeletePaymentOrderIngestRequest(existingPaymentOrder.getId(), existingPaymentOrder.getBankReferenceId()));
}
});
}

return Flux.fromIterable(paymentOrderIngestRequests);
}

private AccountArrangementItem getInternalArrangementId(List<AccountArrangementItems> accountArrangementItemsList, String externalArrangementId) {

return accountArrangementItemsList.stream()
.flatMap(a -> a.getArrangementElements().stream())
.filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId))
.findFirst()
.orElse(null);
.flatMap(a -> a.getArrangementElements().stream())
.filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId))
.findFirst()
.orElse(null);
}

private record DBSPaymentOrderPageResult(int next, int total, List<GetPaymentOrderResponse> requests) {

}

private Mono<List<GetPaymentOrderResponse>> pullFromDBS(final @NotNull String userid) {
return defer(() -> retrieveNextPage(0, userid)
.expand(page -> {
// If there are no more pages, return an empty flux.
if (page.next >= page.total || page.requests.isEmpty()) {
return empty();
} else {
return retrieveNextPage(page.next, userid);
}
}))
.collectList()
.map(pages -> pages.stream().flatMap(page -> page.requests.stream()).toList());
}

private Mono<DBSPaymentOrderPageResult> retrieveNextPage(int currentCount, final @NotNull String userId) {
return paymentOrdersApi.postFilterPaymentOrders(null, null, null, null, null, null, null, null,
null, null, null, userId, null, null, currentCount / PAGE_SIZE, PAGE_SIZE, null,
null, FILTER)
.retryWhen(fixedDelay(3, Duration.of(2000, MILLIS)).filter(
t -> t instanceof WebClientRequestException
|| t instanceof WebClientResponseException.ServiceUnavailable))
.map(resp -> {
final List<GetPaymentOrderResponse> results = resp.getPaymentOrders() == null ? emptyList() : resp.getPaymentOrders();
final var total = resp.getTotalElements() == null ? new BigDecimal(0).intValue() : resp.getTotalElements().intValue();
return new DBSPaymentOrderPageResult(currentCount + results.size(), total, results);
});
}

private boolean isEmptyUserId(String userId) {
return userId == null || userId.isBlank();
}

private String getInternalUserId(List<PaymentOrderPostRequest> paymentOrderPostRequests) {
if (paymentOrderPostRequests == null || paymentOrderPostRequests.isEmpty()) {
return null;
} else {
return paymentOrderPostRequests.get(0).getInternalUserId();
}
}
}
Loading

0 comments on commit dc2830d

Please sign in to comment.