-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[MODAUD-174] - Consume piece change events and implement endpoints
- Loading branch information
Showing
24 changed files
with
848 additions
and
89 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
mod-audit-server/src/main/java/org/folio/dao/acquisition/PieceEventsDao.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.folio.dao.acquisition; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.sqlclient.Row; | ||
import io.vertx.sqlclient.RowSet; | ||
import org.folio.rest.jaxrs.model.PieceAuditEvent; | ||
import org.folio.rest.jaxrs.model.PieceAuditEventCollection; | ||
|
||
public interface PieceEventsDao { | ||
|
||
/** | ||
* Saves pieceAuditEvent entity to DB | ||
* | ||
* @param pieceAuditEvent pieceAuditEvent entity to save | ||
* @param tenantId tenant id | ||
* @return future with created row | ||
*/ | ||
Future<RowSet<Row>> save(PieceAuditEvent pieceAuditEvent, String tenantId); | ||
|
||
/** | ||
* Searches for piece audit events by id | ||
* | ||
* @param pieceId piece id | ||
* @param sortBy sort by | ||
* @param sortOrder sort order | ||
* @param limit limit | ||
* @param offset offset | ||
* @param tenantId tenant id | ||
* @return future with PieceAuditEventCollection | ||
*/ | ||
Future<PieceAuditEventCollection> getAuditEventsByPieceId(String pieceId, String sortBy, String sortOrder, int limit, int offset, String tenantId); | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
128 changes: 128 additions & 0 deletions
128
mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/PieceEventsDaoImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package org.folio.dao.acquisition.impl; | ||
|
||
import static java.lang.String.format; | ||
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard; | ||
import static org.folio.util.AuditEventDBConstants.ACTION_DATE_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.ACTION_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.EVENT_DATE_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.ID_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.MODIFIED_CONTENT_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.ORDER_BY_PATTERN; | ||
import static org.folio.util.AuditEventDBConstants.PIECE_ID_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.TOTAL_RECORDS_FIELD; | ||
import static org.folio.util.AuditEventDBConstants.USER_ID_FIELD; | ||
|
||
import java.time.LocalDateTime; | ||
import java.time.ZoneOffset; | ||
import java.util.Date; | ||
import java.util.UUID; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.core.Promise; | ||
import io.vertx.core.json.JsonObject; | ||
import io.vertx.sqlclient.Row; | ||
import io.vertx.sqlclient.RowSet; | ||
import io.vertx.sqlclient.Tuple; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.folio.dao.acquisition.PieceEventsDao; | ||
import org.folio.rest.jaxrs.model.PieceAuditEvent; | ||
import org.folio.rest.jaxrs.model.PieceAuditEventCollection; | ||
import org.folio.util.PostgresClientFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Repository; | ||
|
||
@Repository | ||
public class PieceEventsDaoImpl implements PieceEventsDao { | ||
private static final Logger LOGGER = LogManager.getLogger(); | ||
private static final String TABLE_NAME = "acquisition_piece_log"; | ||
private static final String GET_BY_PIECE_ID_SQL = "SELECT id, action, piece_id, user_id, event_date, action_date, modified_content_snapshot," + | ||
" (SELECT count(*) AS total_records FROM %s WHERE piece_id = $1) FROM %s WHERE piece_id = $1 %s LIMIT $2 OFFSET $3"; | ||
private static final String INSERT_SQL = "INSERT INTO %s (id, action, piece_id, user_id, event_date, action_date, modified_content_snapshot)" + | ||
" VALUES ($1, $2, $3, $4, $5, $6, $7)"; | ||
|
||
@Autowired | ||
private final PostgresClientFactory pgClientFactory; | ||
|
||
public PieceEventsDaoImpl(PostgresClientFactory pgClientFactory) { | ||
this.pgClientFactory = pgClientFactory; | ||
} | ||
|
||
@Override | ||
public Future<RowSet<Row>> save(PieceAuditEvent pieceAuditEvent, String tenantId) { | ||
LOGGER.debug("save:: Trying to save Piece AuditEvent with tenant id : {}", tenantId); | ||
Promise<RowSet<Row>> promise = Promise.promise(); | ||
|
||
String logTable = formatDBTableName(tenantId, TABLE_NAME); | ||
String query = format(INSERT_SQL, logTable); | ||
|
||
makeSaveCall(promise, query, pieceAuditEvent, tenantId); | ||
LOGGER.info("save:: Saved Piece AuditEvent with tenant id : {}", tenantId); | ||
return promise.future(); | ||
} | ||
|
||
@Override | ||
public Future<PieceAuditEventCollection> getAuditEventsByPieceId(String pieceId, String sortBy, String sortOrder, int limit, int offset, String tenantId) { | ||
LOGGER.debug("getAuditEventsByOrderId:: Trying to retrieve AuditEvent with piece id : {}", pieceId); | ||
Promise<RowSet<Row>> promise = Promise.promise(); | ||
try { | ||
String logTable = formatDBTableName(tenantId, TABLE_NAME); | ||
String query = format(GET_BY_PIECE_ID_SQL, logTable, logTable, format(ORDER_BY_PATTERN, sortBy, sortOrder)); | ||
Tuple queryParams = Tuple.of(UUID.fromString(pieceId), limit, offset); | ||
|
||
pgClientFactory.createInstance(tenantId).selectRead(query, queryParams, promise); | ||
} catch (Exception e) { | ||
LOGGER.warn("Error getting piece audit events by piece id: {}", pieceId, e); | ||
promise.fail(e); | ||
} | ||
LOGGER.info("getAuditEventsByOrderId:: Retrieved AuditEvent with piece id : {}", pieceId); | ||
return promise.future().map(rowSet -> rowSet.rowCount() == 0 ? new PieceAuditEventCollection().withTotalItems(0) : | ||
mapRowToListOfPieceEvent(rowSet)); | ||
} | ||
|
||
private PieceAuditEventCollection mapRowToListOfPieceEvent(RowSet<Row> rowSet) { | ||
LOGGER.debug("mapRowToListOfOrderEvent:: Mapping row to List of Piece Events"); | ||
PieceAuditEventCollection pieceAuditEventCollection = new PieceAuditEventCollection(); | ||
rowSet.iterator().forEachRemaining(row -> { | ||
pieceAuditEventCollection.getPieceAuditEvents().add(mapRowToPieceEvent(row)); | ||
pieceAuditEventCollection.setTotalItems(row.getInteger(TOTAL_RECORDS_FIELD)); | ||
}); | ||
LOGGER.info("mapRowToListOfOrderEvent:: Mapped row to List of Piece Events"); | ||
return pieceAuditEventCollection; | ||
} | ||
|
||
private PieceAuditEvent mapRowToPieceEvent(Row row) { | ||
LOGGER.debug("mapRowToPieceEvent:: Mapping row to Order Event"); | ||
return new PieceAuditEvent() | ||
.withId(row.getValue(ID_FIELD).toString()) | ||
.withAction(row.get(PieceAuditEvent.Action.class, ACTION_FIELD)) | ||
.withPieceId(row.getValue(PIECE_ID_FIELD).toString()) | ||
.withUserId(row.getValue(USER_ID_FIELD).toString()) | ||
.withEventDate(Date.from(row.getLocalDateTime(EVENT_DATE_FIELD).toInstant(ZoneOffset.UTC))) | ||
.withActionDate(Date.from(row.getLocalDateTime(ACTION_DATE_FIELD).toInstant(ZoneOffset.UTC))) | ||
.withPieceSnapshot(JsonObject.mapFrom(row.getValue(MODIFIED_CONTENT_FIELD))); | ||
} | ||
|
||
private void makeSaveCall(Promise<RowSet<Row>> promise, String query, PieceAuditEvent pieceAuditEvent, String tenantId) { | ||
LOGGER.debug("makeSaveCall:: Making save call with query : {} and tenant id : {}", query, tenantId); | ||
try { | ||
pgClientFactory.createInstance(tenantId).execute(query, Tuple.of(pieceAuditEvent.getId(), | ||
pieceAuditEvent.getActionDate(), | ||
pieceAuditEvent.getPieceId(), | ||
pieceAuditEvent.getUserId(), | ||
LocalDateTime.ofInstant(pieceAuditEvent.getEventDate().toInstant(), ZoneOffset.UTC), | ||
LocalDateTime.ofInstant(pieceAuditEvent.getActionDate().toInstant(), ZoneOffset.UTC), | ||
JsonObject.mapFrom(pieceAuditEvent.getPieceSnapshot())), | ||
promise); | ||
} catch (Exception e) { | ||
LOGGER.error("Failed to save record with id: {} for order id: {} in to table {}", | ||
pieceAuditEvent.getId(), pieceAuditEvent.getPieceId(), TABLE_NAME, e); | ||
promise.fail(e); | ||
} | ||
} | ||
|
||
private String formatDBTableName(String tenantId, String table) { | ||
LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId); | ||
return format("%s.%s", convertToPsqlStandard(tenantId), table); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
mod-audit-server/src/main/java/org/folio/services/acquisition/PieceAuditEventsService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.folio.services.acquisition; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.sqlclient.Row; | ||
import io.vertx.sqlclient.RowSet; | ||
import org.folio.rest.jaxrs.model.PieceAuditEvent; | ||
import org.folio.rest.jaxrs.model.PieceAuditEventCollection; | ||
|
||
public interface PieceAuditEventsService { | ||
|
||
/** | ||
* Saves Piece Audit Event | ||
* | ||
* @param pieceAuditEvent pieceAuditEvent | ||
* @param tenantId id of tenant | ||
* @return | ||
*/ | ||
Future<RowSet<Row>> savePieceAuditEvent(PieceAuditEvent pieceAuditEvent, String tenantId); | ||
|
||
/** | ||
* Searches for piece audit events by piece id | ||
* | ||
* @param pieceId piece id | ||
* @param sortBy sort by | ||
* @param sortOrder sort order | ||
* @param limit limit | ||
* @param offset offset | ||
* @return future with PieceAuditEventCollection | ||
*/ | ||
Future<PieceAuditEventCollection> getAuditEventsByPieceId(String pieceId, String sortBy, String sortOrder, | ||
int limit, int offset, String tenantId); | ||
|
||
} |
54 changes: 54 additions & 0 deletions
54
...server/src/main/java/org/folio/services/acquisition/impl/PieceAuditEventsServiceImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package org.folio.services.acquisition.impl; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.pgclient.PgException; | ||
import io.vertx.sqlclient.Row; | ||
import io.vertx.sqlclient.RowSet; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.folio.dao.acquisition.PieceEventsDao; | ||
import org.folio.kafka.exception.DuplicateEventException; | ||
import org.folio.rest.jaxrs.model.PieceAuditEvent; | ||
import org.folio.rest.jaxrs.model.PieceAuditEventCollection; | ||
import org.folio.services.acquisition.PieceAuditEventsService; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Service; | ||
|
||
@Service | ||
public class PieceAuditEventsServiceImpl implements PieceAuditEventsService { | ||
private static final Logger LOGGER = LogManager.getLogger(); | ||
private static final String UNIQUE_CONSTRAINT_VIOLATION_CODE = "23505"; | ||
private PieceEventsDao pieceEventsDao; | ||
|
||
@Autowired | ||
public PieceAuditEventsServiceImpl(PieceEventsDao pieceEventsDao) { | ||
this.pieceEventsDao = pieceEventsDao; | ||
} | ||
|
||
@Override | ||
public Future<RowSet<Row>> savePieceAuditEvent(PieceAuditEvent pieceAuditEvent, String tenantId) { | ||
LOGGER.debug("savePieceAuditEvent:: Trying to save piece audit event with id={} for tenantId={}", pieceAuditEvent.getPieceId(), tenantId); | ||
return pieceEventsDao.save(pieceAuditEvent, tenantId) | ||
.recover(throwable -> handleFailures(throwable, pieceAuditEvent.getId())); | ||
} | ||
|
||
@Override | ||
public Future<PieceAuditEventCollection> getAuditEventsByPieceId(String pieceId, String sortBy, String sortOrder, int limit, int offset, String tenantId) { | ||
LOGGER.debug("getAuditEventsByOrderLineId:: Trying to retrieve audit events for piece Id : {} and tenant Id : {}", pieceId, tenantId); | ||
return pieceEventsDao.getAuditEventsByPieceId(pieceId, sortBy, sortOrder, limit, offset, tenantId); | ||
} | ||
|
||
private <T> Future<T> handleFailures(Throwable throwable, String id) { | ||
LOGGER.debug("handleFailures:: Handling Failures with id={}", id); | ||
return (throwable instanceof PgException && ((PgException) throwable).getCode().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ? | ||
Future.failedFuture(new DuplicateEventException(String.format("Event with id=%s is already processed.", id))) : | ||
Future.failedFuture(throwable); | ||
} | ||
|
||
private <T> Future<T> handleFailuress(Throwable throwable, String id) { | ||
LOGGER.debug("handleFailures:: Handling Failures with Id : {}", id); | ||
return (throwable instanceof PgException && ((PgException) throwable).getCode().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ? | ||
Future.failedFuture(new DuplicateEventException(String.format("Event with Id=%s is already processed.", id))) : | ||
Future.failedFuture(throwable); | ||
} | ||
} |
Oops, something went wrong.