Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MODINVOICE-478] Prevent conflicts when saving multiple invoice lines with data import #413

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import static org.folio.invoices.utils.HelperUtils.collectResultsOnSuccess;
import static org.folio.invoices.utils.ResourcePathResolver.ORDER_LINES;
import static org.folio.invoices.utils.ResourcePathResolver.resourcesPath;
import static org.folio.rest.RestConstants.SEMAPHORE_MAX_ACTIVE_THREADS;
import static org.folio.rest.jaxrs.model.EntityType.EDIFACT_INVOICE;
import static org.folio.rest.jaxrs.model.InvoiceLine.InvoiceLineStatus.OPEN;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;
Expand All @@ -23,11 +22,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
Expand All @@ -52,7 +51,6 @@
import org.folio.rest.core.models.RequestContext;
import org.folio.rest.core.models.RequestEntry;
import org.folio.rest.impl.InvoiceHelper;
import org.folio.rest.impl.InvoiceLineHelper;
import org.folio.rest.jaxrs.model.Invoice;
import org.folio.rest.jaxrs.model.InvoiceLine;
import org.folio.rest.jaxrs.model.InvoiceLineCollection;
Expand All @@ -62,7 +60,6 @@
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertxconcurrent.Semaphore;

public class CreateInvoiceEventHandler implements EventHandler {

Expand Down Expand Up @@ -106,6 +103,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d

Map<String, String> okapiHeaders = DataImportUtils.getOkapiHeaders(dataImportEventPayload);
Future<Map<Integer, PoLine>> poLinesFuture = getAssociatedPoLines(dataImportEventPayload, okapiHeaders);
InvoiceHelper invoiceHelper = new InvoiceHelper(okapiHeaders, Vertx.currentContext());

poLinesFuture
.map(invLineNoToPoLine -> {
Expand All @@ -115,47 +113,26 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
prepareMappingResult(dataImportEventPayload);
return null;
})
.compose(v -> saveInvoice(dataImportEventPayload, okapiHeaders))
.map(savedInvoice -> prepareInvoiceLinesToSave(savedInvoice.getId(), dataImportEventPayload, poLinesFuture.result()))
.compose(preparedInvoiceLines -> saveInvoiceLines(preparedInvoiceLines, okapiHeaders))
.map(v -> prepareInvoiceAndLinesToSave(dataImportEventPayload, poLinesFuture.result()))
.compose(invoiceAndLines -> createInvoiceAndLines(dataImportEventPayload, invoiceAndLines, invoiceHelper))
.onComplete(result -> {
makeLightweightReturnPayload(dataImportEventPayload);

if (result.succeeded()) {
List<InvoiceLine> invoiceLines = result.result().stream().map(Pair::getLeft).collect(Collectors.toList());
InvoiceLineCollection invoiceLineCollection = new InvoiceLineCollection().withInvoiceLines(invoiceLines).withTotalRecords(invoiceLines.size());
dataImportEventPayload.getContext().put(INVOICE_LINES_KEY, Json.encode(invoiceLineCollection));
Map<Integer, String> invoiceLinesErrors = prepareInvoiceLinesErrors(result.result());
if (!invoiceLinesErrors.isEmpty()) {
dataImportEventPayload.getContext().put(INVOICE_LINES_ERRORS_KEY, Json.encode(invoiceLinesErrors));
future.completeExceptionally(new EventProcessingException("Error during invoice lines creation"));
return;
}
future.complete(dataImportEventPayload);
} else {
preparePayloadWithMappedInvoiceLines(dataImportEventPayload);
logger.error("Error during invoice creation", result.cause());
logger.error("Error when creating the invoice and lines", result.cause());
future.completeExceptionally(result.cause());
}
});
} catch (Exception e) {
logger.error("Error during creation invoice and invoice lines", e);
logger.error("Error when creating the invoice and lines", e);
future.completeExceptionally(e);
}
return future;
}

private Map<Integer, String> prepareInvoiceLinesErrors(List<Pair<InvoiceLine, String>> invoiceLinesSavingResult) {
Map<Integer, String> invoiceLinesErrors = new HashMap<>();
for (int i = 0; i < invoiceLinesSavingResult.size(); i++) {
Pair<InvoiceLine, String> invLineResult = invoiceLinesSavingResult.get(i);
if (invLineResult.getRight() != null) {
invoiceLinesErrors.put(i + 1, invLineResult.getRight());
}
}
return invoiceLinesErrors;
}

private Future<Map<Integer, PoLine>> getAssociatedPoLines(DataImportEventPayload eventPayload, Map<String, String> okapiHeaders) {
String recordAsString = eventPayload.getContext().get(EDIFACT_INVOICE.value());
Record sourceRecord = Json.decodeValue(recordAsString, Record.class);
Expand Down Expand Up @@ -264,23 +241,9 @@ private Future<Map<Integer, PoLine>> getAssociatedPoLinesByRefNumbers(Map<Intege
if (MapUtils.isEmpty(refNumberList)) {
return Future.succeededFuture(new HashMap<>());
}
return requestContext.getContext()
.<List<Future<Pair<Integer, PoLine>>>>executeBlocking(promise -> {
List<Future<Pair<Integer, PoLine>>> futures = new ArrayList<>();
Semaphore semaphore = new Semaphore(SEMAPHORE_MAX_ACTIVE_THREADS, Vertx.currentContext().owner());
for (Map.Entry<Integer, List<String>> entry : refNumberList.entrySet()) {
semaphore.acquire(() -> {
var future = getLinePair(entry, requestContext)
.onComplete(asyncResult -> semaphore.release());
futures.add(future);
// complete executeBlocking promise when all operations started
if (futures.size() == refNumberList.size()) {
promise.complete(futures);
}
});
}
})
.compose(HelperUtils::collectResultsOnSuccess)
return HelperUtils.executeWithSemaphores(refNumberList.entrySet(),
entry -> getLinePair(entry, requestContext),
requestContext)
.map(poLinePairs -> poLinePairs.stream()
.filter(Objects::nonNull)
.collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
Expand Down Expand Up @@ -320,32 +283,51 @@ private String prepareQueryGetPoLinesByRefNumber(List<String> referenceNumbers)
return format(PO_LINES_BY_REF_NUMBER_CQL, valueString);
}

private Future<Invoice> saveInvoice(DataImportEventPayload dataImportEventPayload, Map<String, String> okapiHeaders) {
Invoice invoiceToSave = Json.decodeValue(dataImportEventPayload.getContext().get(INVOICE.value()), Invoice.class);
invoiceToSave.setSource(Invoice.Source.EDI);
InvoiceHelper invoiceHelper = new InvoiceHelper(okapiHeaders, Vertx.currentContext());
RequestContext requestContext = new RequestContext(Vertx.currentContext(), okapiHeaders);

return invoiceHelper.createInvoice(invoiceToSave, requestContext)
.onComplete(result -> {
if (result.succeeded()) {
dataImportEventPayload.getContext().put(INVOICE.value(), Json.encode(result.result()));
return;
}
logger.error("Error during creation invoice in the storage", result.cause());
});
}

private List<InvoiceLine> prepareInvoiceLinesToSave(String invoiceId, DataImportEventPayload dataImportEventPayload, Map<Integer, PoLine> associatedPoLines) {
private InvoiceAndLines prepareInvoiceAndLinesToSave(DataImportEventPayload dataImportEventPayload,
Map<Integer, PoLine> associatedPoLines) {
Invoice invoice = Json.decodeValue(dataImportEventPayload.getContext().get(INVOICE.value()), Invoice.class);
if (invoice.getId() == null) {
invoice.setId(UUID.randomUUID().toString());
}
invoice.setSource(Invoice.Source.EDI);
List<InvoiceLine> invoiceLines = new JsonArray(dataImportEventPayload.getContext().get(INVOICE_LINES_KEY))
.stream()
.map(JsonObject.class::cast)
.map(json -> json.mapTo(InvoiceLine.class))
.map(invoiceLine -> invoiceLine.withInvoiceId(invoiceId).withInvoiceLineStatus(OPEN))
.map(invoiceLine -> invoiceLine.withInvoiceLineStatus(OPEN)
.withInvoiceId(invoice.getId()))
.collect(Collectors.toList());

linkInvoiceLinesToPoLines(invoiceLines, associatedPoLines);
return invoiceLines;

InvoiceAndLines invoiceAndLines = new InvoiceAndLines();
invoiceAndLines.invoice = invoice;
invoiceAndLines.invoiceLines = invoiceLines;
return invoiceAndLines;
}

private Future<Void> createInvoiceAndLines(DataImportEventPayload dataImportEventPayload,
InvoiceAndLines invoiceAndLines, InvoiceHelper invoiceHelper) {
return invoiceHelper.createInvoiceAndLines(invoiceAndLines.invoice, invoiceAndLines.invoiceLines)
.onFailure(t -> logger.error("Error during creation of invoice and lines", t))
.map(result -> {
Map<Integer, String> invoiceLinesErrors = result.getInvoiceLinesErrors();
if (!invoiceLinesErrors.isEmpty()) {
String errorsAsString = Json.encode(invoiceLinesErrors);
dataImportEventPayload.getContext().put(INVOICE_LINES_ERRORS_KEY, errorsAsString);
throw new EventProcessingException("Error during invoice lines creation: " + errorsAsString);
}
if (result.getNewInvoice() != null) {
dataImportEventPayload.getContext().put(INVOICE.value(), Json.encode(result.getNewInvoice()));
}
if (result.getNewInvoiceLines() != null) {
InvoiceLineCollection invoiceLineCollection = new InvoiceLineCollection()
.withInvoiceLines(result.getNewInvoiceLines())
.withTotalRecords(result.getNewInvoiceLines().size());
dataImportEventPayload.getContext().put(INVOICE_LINES_KEY, Json.encode(invoiceLineCollection));
}
return null;
});
}

private void linkInvoiceLinesToPoLines(List<InvoiceLine> invoiceLines, Map<Integer, PoLine> associatedPoLines) {
Expand All @@ -358,40 +340,6 @@ private void linkInvoiceLinesToPoLines(List<InvoiceLine> invoiceLines, Map<Integ
}
}

private Future<List<Pair<InvoiceLine, String>>> saveInvoiceLines(List<InvoiceLine> invoiceLines, Map<String, String> okapiHeaders) {
List<Future<Pair<InvoiceLine, String>>> futures = new ArrayList<>();

if (CollectionUtils.isEmpty(invoiceLines)) {
return Future.succeededFuture(new ArrayList<>());
}

InvoiceLineHelper helper = new InvoiceLineHelper(okapiHeaders, Vertx.currentContext());
return Vertx.currentContext()
.<List<Future<Pair<InvoiceLine, String>>>>executeBlocking(promise -> {
Semaphore semaphore = new Semaphore(SEMAPHORE_MAX_ACTIVE_THREADS, Vertx.currentContext().owner());
for (InvoiceLine invoiceLine : invoiceLines) {
semaphore.acquire(() -> {
var future = helper.createInvoiceLine(invoiceLine)
.compose(createdInvoiceLine -> {
Pair<InvoiceLine, String> invoiceLineToMsg = Pair.of(createdInvoiceLine, null);
return Future.succeededFuture(invoiceLineToMsg);
}, err -> {
logger.error("Failed to create invoice line {}, {}", invoiceLine, err);
Pair<InvoiceLine, String> invoiceLineToMsg = Pair.of(invoiceLine, err.getMessage());
return Future.succeededFuture(invoiceLineToMsg);
})
.onComplete(asyncResult -> semaphore.release());
futures.add(future);
// complete executeBlocking promise when all operations processed
if (futures.size() == invoiceLines.size()) {
promise.complete(futures);
}
});
}
})
.compose(HelperUtils::collectResultsOnSuccess);
}

private List<InvoiceLine> mapInvoiceLinesArrayToList(JsonArray invoiceLinesArray) {
return invoiceLinesArray.stream()
.map(JsonObject.class::cast)
Expand Down Expand Up @@ -457,4 +405,9 @@ public boolean isEligible(DataImportEventPayload dataImportEventPayload) {
}
return false;
}

private static class InvoiceAndLines {
Invoice invoice;
List<InvoiceLine> invoiceLines;
}
}
3 changes: 2 additions & 1 deletion src/main/java/org/folio/invoices/utils/ErrorCodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public enum ErrorCodes {
MULTIPLE_FISCAL_YEARS("multipleFiscalYears", "Multiple fiscal years are used with the funds %s and %s."),
COULD_NOT_FIND_VALID_FISCAL_YEAR("couldNotFindValidFiscalYear", "Could not find any valid fiscal year with a budget for all funds in the invoice"),
MORE_THAN_ONE_FISCAL_YEAR_SERIES("moreThanOneFiscalYearSeries", "Fund distributions cannot reference more than one fiscal year series. Please edit fund distributions so they all come from the same fiscal year series."),
CANNOT_RESET_INVOICE_FISCAL_YEAR("cannotResetInvoiceFiscalYear", "Invoice fiscal year cannot be set to null if it was previously defined");
CANNOT_RESET_INVOICE_FISCAL_YEAR("cannotResetInvoiceFiscalYear", "Invoice fiscal year cannot be set to null if it was previously defined"),
ERROR_CREATING_INVOICE_LINE("errorCreatingInvoiceLine", "Error creating invoice line");

private final String code;
private final String description;
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/folio/invoices/utils/HelperUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.folio.invoices.utils.ResourcePathResolver.INVOICE_LINES;
import static org.folio.invoices.utils.ResourcePathResolver.VOUCHERS_STORAGE;
import static org.folio.invoices.utils.ResourcePathResolver.VOUCHER_LINES;
import static org.folio.rest.RestConstants.SEMAPHORE_MAX_ACTIVE_THREADS;
import static org.folio.rest.impl.AbstractHelper.ID;
import static org.folio.rest.jaxrs.model.FundDistribution.DistributionType.PERCENTAGE;
import static org.folio.services.exchange.ExchangeRateProviderResolver.RATE_KEY;
Expand All @@ -28,12 +29,15 @@
import javax.money.convert.ConversionQuery;
import javax.money.convert.ConversionQueryBuilder;

import io.vertx.core.Vertx;
import io.vertxconcurrent.Semaphore;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.invoices.rest.exceptions.HttpException;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.rest.acq.model.finance.ExchangeRate;
import org.folio.rest.core.models.RequestContext;
import org.folio.rest.impl.ProtectionHelper;
import org.folio.rest.jaxrs.model.Adjustment;
import org.folio.rest.jaxrs.model.FundDistribution;
Expand Down Expand Up @@ -168,6 +172,30 @@ public static <T> Future<List<T>> collectResultsOnSuccess(List<Future<T>> future
.map(CompositeFuture::list);
}

public static <I, O> Future<List<O>> executeWithSemaphores(Collection<I> collection,
FunctionReturningFuture<I, O> f, RequestContext requestContext) {
SerhiiNosko marked this conversation as resolved.
Show resolved Hide resolved
if (collection.isEmpty())
return Future.succeededFuture(List.of());
return requestContext.getContext().<List<Future<O>>>executeBlocking(promise -> {
Semaphore semaphore = new Semaphore(SEMAPHORE_MAX_ACTIVE_THREADS, Vertx.currentContext().owner());
List<Future<O>> futures = new ArrayList<>();
for (I item : collection) {
semaphore.acquire(() -> {
Future<O> future = f.apply(item)
.onComplete(asyncResult -> semaphore.release());
futures.add(future);
if (futures.size() == collection.size()) {
promise.complete(futures);
}
});
}
}).compose(HelperUtils::collectResultsOnSuccess);
}

public interface FunctionReturningFuture<I, O> {
Future<O> apply(I item);
}

public static double calculateVoucherAmount(Voucher voucher, List<VoucherLine> voucherLines) {

CurrencyUnit currency = Monetary.getCurrency(voucher.getSystemCurrency());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private ResourcePathResolver() {
public static final String ACQUISITIONS_MEMBERSHIPS = "acquisitionsMemberships";
public static final String INVOICES = "invoices";
public static final String INVOICE_LINES = "invoiceLines";
public static final String COMPOSITE_ORDER = "compositeOrder";
public static final String COMPOSITE_ORDERS = "compositeOrders";
public static final String ORDER_LINES = "orderLines";
public static final String ORDER_INVOICE_RELATIONSHIP = "orderInvoiceRelationship";
public static final String VOUCHER_LINES = "voucherLines";
Expand Down Expand Up @@ -56,7 +56,7 @@ private ResourcePathResolver() {
apis.put(ACQUISITIONS_UNITS, "/acquisitions-units-storage/units");
apis.put(ACQUISITIONS_MEMBERSHIPS, "/acquisitions-units-storage/memberships");
apis.put(INVOICES, "/invoice-storage/invoices");
apis.put(COMPOSITE_ORDER, "/orders/composite-orders");
apis.put(COMPOSITE_ORDERS, "/orders/composite-orders");
apis.put(INVOICE_LINES, "/invoice-storage/invoice-lines");
apis.put(INVOICE_LINE_NUMBER, "/invoice-storage/invoice-line-number");
apis.put(ORDER_LINES, "/orders/order-lines");
Expand Down
Loading