diff --git a/mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java b/mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java index 8c8eb22..c1491e9 100644 --- a/mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java +++ b/mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java @@ -31,12 +31,23 @@ public class InitAPIs implements InitAPI { @Value("${acq.orders.kafka.consumer.instancesNumber:1}") private int acqOrderConsumerInstancesNumber; + @Value("${acq.orders.kafka.consumer.pool.size:5}") + private int acqOrderConsumerPoolSize; + @Value("${acq.order-lines.kafka.consumer.instancesNumber:1}") private int acqOrderLineConsumerInstancesNumber; + @Value("${acq.order-lines.kafka.consumer.pool.size:5}") + private int acqOrderLineConsumerPoolSize; + @Value("${acq.pieces.kafka.consumer.instancesNumber:1}") private int acqPieceConsumerInstancesNumber; + @Value("${acq.orders.kafka.consumer.pool.size:5}") + private int acqPieceConsumerPoolSize; + @Value("${acq.invoices.kafka.consumer.instancesNumber:1}") private int acqInvoiceConsumerInstancesNumber; + @Value("${acq.orders.kafka.consumer.pool.size:5}") + private int acqInvoiceConsumerPoolSize; @Override public void init(Vertx vertx, Context context, Handler> handler) { @@ -70,21 +81,10 @@ private Future deployConsumersVerticles(Vertx vertx) { Promise pieceEventsConsumer = Promise.promise(); Promise invoiceEventsConsumer = Promise.promise(); - vertx.deployVerticle(getVerticleName(verticleFactory, OrderEventConsumersVerticle.class), - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER) - .setInstances(acqOrderConsumerInstancesNumber), orderEventsConsumer); - - vertx.deployVerticle(getVerticleName(verticleFactory, OrderLineEventConsumersVerticle.class), - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER) - .setInstances(acqOrderLineConsumerInstancesNumber), orderLineEventsConsumer); - - vertx.deployVerticle(getVerticleName(verticleFactory, PieceEventConsumersVerticle.class), - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER) - .setInstances(acqPieceConsumerInstancesNumber), pieceEventsConsumer); - - vertx.deployVerticle(getVerticleName(verticleFactory, InvoiceEventConsumersVerticle.class), - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER) - .setInstances(acqInvoiceConsumerInstancesNumber), invoiceEventsConsumer); + deployVerticle(vertx, verticleFactory, OrderEventConsumersVerticle.class, acqOrderConsumerInstancesNumber, acqOrderConsumerPoolSize, orderEventsConsumer); + deployVerticle(vertx, verticleFactory, OrderLineEventConsumersVerticle.class, acqOrderLineConsumerInstancesNumber, acqOrderLineConsumerPoolSize, orderLineEventsConsumer); + deployVerticle(vertx, verticleFactory, PieceEventConsumersVerticle.class, acqPieceConsumerInstancesNumber, acqPieceConsumerPoolSize, pieceEventsConsumer); + deployVerticle(vertx, verticleFactory, InvoiceEventConsumersVerticle.class, acqInvoiceConsumerInstancesNumber, acqInvoiceConsumerPoolSize, invoiceEventsConsumer); LOGGER.info("deployConsumersVerticles:: All consumer verticles were successfully deployed"); return GenericCompositeFuture.all(Arrays.asList( @@ -94,6 +94,13 @@ private Future deployConsumersVerticles(Vertx vertx) { invoiceEventsConsumer.future())); } + private void deployVerticle(Vertx vertx, VerticleFactory verticleFactory, Class consumerClass, + int acqOrderConsumerInstancesNumber, int acqOrderConsumerPoolSize, Promise orderEventsConsumer) { + DeploymentOptions deploymentOptions = new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER) + .setInstances(acqOrderConsumerInstancesNumber).setWorkerPoolSize(acqOrderConsumerPoolSize); + vertx.deployVerticle(getVerticleName(verticleFactory, consumerClass), deploymentOptions, orderEventsConsumer); + } + private String getVerticleName(VerticleFactory verticleFactory, Class clazz) { LOGGER.debug("getVerticleName:: Retrieving Verticle name"); return verticleFactory.prefix() + ":" + clazz.getName();