Skip to content

Commit

Permalink
Merge pull request #430 from Backbase/hotfix/payment-order-fix
Browse files Browse the repository at this point in the history
Fixed issue with payment order ingestion
  • Loading branch information
shafiquech authored Jun 27, 2024
2 parents 875f54d + dc2830d commit 04e10d6
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 83 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog
All notable changes to this project will be documented in this file.

## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.3)
### Added
- Added Payment order Page size to fix an issue with payment order ingestion.
## [4.1.2](https://github.com/Backbase/stream-services/compare/4.1.2...4.1.1)
### Added
- Added CUSTOMERS data group enum type to legal entity OpenAPI contract
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,27 @@
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.PROCESSED;
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.READY;
import static com.backbase.dbs.paymentorder.api.service.v2.model.Status.REJECTED;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Collections.emptyList;
import static reactor.core.publisher.Flux.defer;
import static reactor.core.publisher.Flux.empty;
import static reactor.util.retry.Retry.fixedDelay;

import com.backbase.dbs.arrangement.api.service.v2.ArrangementsApi;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItem;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementItems;
import com.backbase.dbs.arrangement.api.service.v2.model.AccountArrangementsFilter;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import com.backbase.dbs.paymentorder.api.service.v2.PaymentOrdersApi;
import com.backbase.dbs.paymentorder.api.service.v2.model.GetPaymentOrderResponse;
import com.backbase.dbs.paymentorder.api.service.v2.model.PaymentOrderPostFilterRequest;
Expand All @@ -28,19 +44,20 @@
import com.backbase.stream.worker.configuration.StreamWorkerConfiguration;
import com.backbase.stream.worker.model.UnitOfWork;
import com.backbase.stream.worker.repository.UnitOfWorkRepository;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class PaymentOrderUnitOfWorkExecutor extends UnitOfWorkExecutor<PaymentOrderTask> {

private static final PaymentOrderPostFilterRequest FILTER = new PaymentOrderPostFilterRequest().statuses(List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING));

private static final int PAGE_SIZE = 1000;

private final PaymentOrdersApi paymentOrdersApi;
private final ArrangementsApi arrangementsApi;
private final PaymentOrderTypeMapper paymentOrderTypeMapper;
Expand Down Expand Up @@ -68,18 +85,18 @@ public Flux<UnitOfWork<PaymentOrderTask>> prepareUnitOfWork(List<PaymentOrderIng

public Flux<UnitOfWork<PaymentOrderTask>> prepareUnitOfWork(Flux<PaymentOrderPostRequest> items) {
return items.collectList()
.map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests))
.flatMap(this::addArrangementIdMap)
.flatMap(this::getPersistedScheduledTransfers)
.flatMapMany(this::getPaymentOrderIngestRequest)
.bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime())
.flatMap(this::prepareUnitOfWork);
.map(paymentOrderPostRequests -> this.createPaymentOrderIngestContext(paymentOrderPostRequests))
.flatMap(this::addArrangementIdMap)
.flatMap(this::getPersistedScheduledTransfers)
.flatMapMany(this::getPaymentOrderIngestRequest)
.bufferTimeout(streamWorkerConfiguration.getBufferSize(), streamWorkerConfiguration.getBufferMaxTime())
.flatMap(this::prepareUnitOfWork);
}

private PaymentOrderIngestContext createPaymentOrderIngestContext(List<PaymentOrderPostRequest> paymentOrderPostRequests) {
PaymentOrderIngestContext paymentOrderIngestContext = new PaymentOrderIngestContext();
paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests);
paymentOrderIngestContext.internalUserId(paymentOrderPostRequests.get(0).getInternalUserId());
paymentOrderIngestContext.corePaymentOrder(paymentOrderPostRequests == null ? emptyList() : paymentOrderPostRequests);
paymentOrderIngestContext.internalUserId(getInternalUserId(paymentOrderPostRequests));
return paymentOrderIngestContext;
}

Expand All @@ -95,27 +112,27 @@ private PaymentOrderIngestContext createPaymentOrderIngestContext(List<PaymentOr
List<GetPaymentOrderResponse> listOfPayments = new ArrayList<>();

return getPayments(paymentOrderIngestContext2.internalUserId())
.map(response -> {
listOfPayments.addAll(response.getPaymentOrders());
return listOfPayments;
})
.map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses))
.doOnSuccess(result ->
log.debug("Successfully fetched dbs scheduled payment orders"));
.map(response -> {
listOfPayments.addAll(response.getPaymentOrders());
return listOfPayments;
})
.map(getPaymentOrderResponses -> paymentOrderIngestContext2.existingPaymentOrder(getPaymentOrderResponses))
.doOnSuccess(result ->
log.debug("Successfully fetched dbs scheduled payment orders"));
}

private @NotNull @Valid Mono<PaymentOrderIngestContext> addArrangementIdMap(PaymentOrderIngestContext paymentOrderIngestContext) {

return Flux.fromIterable(paymentOrderIngestContext.corePaymentOrder())
.flatMap(this::getArrangement)
.distinct()
.collectList()
.map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody));
.flatMap(this::getArrangement)
.distinct()
.collectList()
.map(accountInternalIdGetResponseBody -> paymentOrderIngestContext.arrangementIds(accountInternalIdGetResponseBody));
}

private Mono<AccountArrangementItems> getArrangement(PaymentOrderPostRequest paymentOrderPostRequest) {
AccountArrangementsFilter accountArrangementsFilter = new AccountArrangementsFilter()
.externalArrangementIds(Collections.singleton(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId()));
.addExternalArrangementIdsItem(paymentOrderPostRequest.getOriginatorAccount().getExternalArrangementId());
return arrangementsApi.postFilter(accountArrangementsFilter);
}

Expand All @@ -127,23 +144,29 @@ private Mono<AccountArrangementItems> getArrangement(PaymentOrderPostRequest pay
*/
private Mono<PaymentOrderPostFilterResponse> getPayments(String internalUserId) {

var paymentOrderPostFilterRequest = new PaymentOrderPostFilterRequest();
paymentOrderPostFilterRequest.setStatuses(
List.of(READY, ACCEPTED, PROCESSED, CANCELLED, REJECTED, CANCELLATION_PENDING));

return paymentOrdersApi.postFilterPaymentOrders(
null, null, null, null, null, null, null, null, null, null, null,
internalUserId, null, null, null, Integer.MAX_VALUE,
null, null, paymentOrderPostFilterRequest);
if (isEmptyUserId(internalUserId)) {
return Mono.just(new PaymentOrderPostFilterResponse().paymentOrders(emptyList()).totalElements(new BigDecimal(0)));
}
return pullFromDBS(internalUserId).map(result -> {
final var response = new PaymentOrderPostFilterResponse();
response.setPaymentOrders(result);
response.setTotalElements(new BigDecimal(result.size()));
return response;
});
}

private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrderIngestContext paymentOrderIngestContext) {

List<PaymentOrderIngestRequest> paymentOrderIngestRequests = new ArrayList<>();
final var userId = paymentOrderIngestContext.internalUserId();
if (isEmptyUserId(userId)) {
return Flux.fromIterable(paymentOrderIngestRequests);
}
final List<PaymentOrderPostRequest> orders = paymentOrderIngestContext.corePaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.corePaymentOrder();

// list of all the bank ref ids in core
List<String> coreBankRefIds = new ArrayList<>();
for (PaymentOrderPostRequest coreBankRefId : paymentOrderIngestContext.corePaymentOrder() ) {
for (PaymentOrderPostRequest coreBankRefId : orders) {
coreBankRefIds.add(coreBankRefId.getBankReferenceId());
}

Expand All @@ -153,11 +176,13 @@ private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrde
existingBankRefIds.add(existingBankRefId.getBankReferenceId());
}

final List<GetPaymentOrderResponse> existing = paymentOrderIngestContext.existingPaymentOrder() == null ? new ArrayList<>() : paymentOrderIngestContext.existingPaymentOrder();

// build new payment list (Bank ref is in core, but not in DBS)
paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> {
orders.forEach(corePaymentOrder -> {
if(!existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) {
AccountArrangementItem accountArrangementItem = getInternalArrangementId(paymentOrderIngestContext.arrangementIds(),
corePaymentOrder.getOriginatorAccount().getExternalArrangementId());
corePaymentOrder.getOriginatorAccount().getExternalArrangementId());
if (accountArrangementItem != null) {
corePaymentOrder.getOriginatorAccount().setArrangementId(accountArrangementItem.getId());
}
Expand All @@ -166,31 +191,79 @@ private Flux<PaymentOrderIngestRequest> getPaymentOrderIngestRequest(PaymentOrde
});

// build update payment list (Bank ref is in core and DBS)
paymentOrderIngestContext.corePaymentOrder().forEach(corePaymentOrder -> {
orders.forEach(corePaymentOrder -> {
if(existingBankRefIds.contains(corePaymentOrder.getBankReferenceId())) {
UpdatePaymentOrderIngestRequest updatePaymentOrderIngestRequest = new UpdatePaymentOrderIngestRequest(paymentOrderTypeMapper.mapPaymentOrderPostRequest(corePaymentOrder));
paymentOrderIngestRequests.add(updatePaymentOrderIngestRequest);
}
});

// build delete payment list (Bank ref is in DBS, but not in core)
buildDeletePaymentList(existing, coreBankRefIds, paymentOrderIngestRequests);

return Flux.fromIterable(paymentOrderIngestRequests);
}

private void buildDeletePaymentList(List<GetPaymentOrderResponse> existing, List<String> coreBankRefIds, List<PaymentOrderIngestRequest> paymentOrderIngestRequests) {
if (((PaymentOrderWorkerConfigurationProperties) streamWorkerConfiguration).isDeletePaymentOrder()) {
paymentOrderIngestContext.existingPaymentOrder().forEach(existingPaymentOrder -> {
existing.forEach(existingPaymentOrder -> {
if(!coreBankRefIds.contains(existingPaymentOrder.getBankReferenceId())) {
paymentOrderIngestRequests.add(new DeletePaymentOrderIngestRequest(existingPaymentOrder.getId(), existingPaymentOrder.getBankReferenceId()));
}
});
}

return Flux.fromIterable(paymentOrderIngestRequests);
}

private AccountArrangementItem getInternalArrangementId(List<AccountArrangementItems> accountArrangementItemsList, String externalArrangementId) {

return accountArrangementItemsList.stream()
.flatMap(a -> a.getArrangementElements().stream())
.filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId))
.findFirst()
.orElse(null);
.flatMap(a -> a.getArrangementElements().stream())
.filter(b -> b.getExternalArrangementId().equalsIgnoreCase(externalArrangementId))
.findFirst()
.orElse(null);
}

private record DBSPaymentOrderPageResult(int next, int total, List<GetPaymentOrderResponse> requests) {

}

private Mono<List<GetPaymentOrderResponse>> pullFromDBS(final @NotNull String userid) {
return defer(() -> retrieveNextPage(0, userid)
.expand(page -> {
// If there are no more pages, return an empty flux.
if (page.next >= page.total || page.requests.isEmpty()) {
return empty();
} else {
return retrieveNextPage(page.next, userid);
}
}))
.collectList()
.map(pages -> pages.stream().flatMap(page -> page.requests.stream()).toList());
}

private Mono<DBSPaymentOrderPageResult> retrieveNextPage(int currentCount, final @NotNull String userId) {
return paymentOrdersApi.postFilterPaymentOrders(null, null, null, null, null, null, null, null,
null, null, null, userId, null, null, currentCount / PAGE_SIZE, PAGE_SIZE, null,
null, FILTER)
.retryWhen(fixedDelay(3, Duration.of(2000, MILLIS)).filter(
t -> t instanceof WebClientRequestException
|| t instanceof WebClientResponseException.ServiceUnavailable))
.map(resp -> {
final List<GetPaymentOrderResponse> results = resp.getPaymentOrders() == null ? emptyList() : resp.getPaymentOrders();
final var total = resp.getTotalElements() == null ? new BigDecimal(0).intValue() : resp.getTotalElements().intValue();
return new DBSPaymentOrderPageResult(currentCount + results.size(), total, results);
});
}

private boolean isEmptyUserId(String userId) {
return userId == null || userId.isBlank();
}

private String getInternalUserId(List<PaymentOrderPostRequest> paymentOrderPostRequests) {
if (paymentOrderPostRequests == null || paymentOrderPostRequests.isEmpty()) {
return null;
} else {
return paymentOrderPostRequests.get(0).getInternalUserId();
}
}
}
Loading

0 comments on commit 04e10d6

Please sign in to comment.