MODSOURMAN-1268 Log entries are empty for completed jobs (#955)
* increase session timeouts and heartbeats for consumer

* openjdk

* increase session timeout

* change dockerfile

* add partition assignment strategy and group instance id

* add envId into group name

* remove redundant env var
JavokhirAbdullayev authored Jan 18, 2025
1 parent 7f8257a commit fde74e1
Showing 2 changed files with 9 additions and 11 deletions.
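The commit messages above describe Kafka consumer tuning: a longer session timeout and heartbeat interval, a partition assignment strategy, a group instance id, and an environment id in the consumer group name. As a rough, standalone illustration of those knobs using the plain Kafka `ConsumerConfig` keys (the class, method, and parameter names below are made up, and the choice of assignor and the static-membership handling are assumptions, not this module's actual wiring):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

// Illustrative sketch only: not the module's code, which wires these values
// through its own KafkaConfig and KafkaTopicNameHelper helpers (see diff below).
public class JournalConsumerTuningSketch {

  static Map<String, String> tunedConsumerProps(String envId, String moduleName, String instanceId) {
    Map<String, String> props = new HashMap<>();
    // Longer session timeout and less frequent heartbeats so slow batch processing
    // does not get the consumer kicked out of the group mid-import.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "20000");
    // A cooperative/sticky assignment strategy keeps partition ownership stable
    // across rebalances instead of reshuffling everything.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
    // A fixed group.instance.id makes the member "static", so a quick restart
    // does not trigger a rebalance at all.
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);
    // Prefixing the group id with the environment id keeps consumers from
    // different environments out of each other's consumer group.
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
        envId + "_" + moduleName + "_DataImportJournalBatchConsumerVerticle");
    return props;
  }
}
```

In the diff below, the session timeout, heartbeat interval, and environment-prefixed group id are set on the consumer properties returned by `KafkaConfig`, with the group name built through `KafkaTopicNameHelper.formatGroupName`.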
3 changes: 0 additions & 3 deletions README.md
@@ -129,9 +129,6 @@ which are created during tenant initialization, the values of which can be custo
and `DI_RAW_RECORDS_CHUNK_PARSED_PARTITIONS` env variables respectively.
Default value - `1`.

-## Environment variables
-`MAX_NUM_EVENTS`:`100` - set max num events to consume

#### Note:
From v 3.1.3 there is a new property which defines limit for retrieving data to fill mapping parameters for the data-import mechanism: **"srm.mapping.parameters.settings.limit:1000"**

17 changes: 9 additions & 8 deletions DataImportJournalBatchConsumerVerticle.java
@@ -41,7 +41,6 @@
import org.folio.verticle.consumers.util.EventTypeHandlerSelector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@@ -56,6 +55,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

+import static org.folio.kafka.services.KafkaEnvironmentProperties.environment;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_COMPLETED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_AUTHORITY_NOT_MATCHED;
@@ -94,7 +94,6 @@
* Verticle to write events into journal log. It combines two streams
* - kafka consumer for specific events defined in {@link DataImportJournalBatchConsumerVerticle#getEvents()}
 * - vert.x event bus for events generated by other parts of SRM
- *
* Marked with SCOPE_PROTOTYPE to support deploying more than 1 instance.
* @see org.folio.rest.impl.InitAPIImpl
*/
@@ -106,8 +105,7 @@ public class DataImportJournalBatchConsumerVerticle extends AbstractVerticle {

public static final String DATA_IMPORT_JOURNAL_BATCH_KAFKA_HANDLER_UUID = "ca0c6c56-e74e-4921-b4c9-7b2de53c43ec";

-@Value("${MAX_NUM_EVENTS:100}")
-private int maxNumEvents;
+private static final int MAX_NUM_EVENTS = 100;

@Autowired
@Qualifier("newKafkaConfig")
@@ -144,9 +142,9 @@ public void start(Promise<Void> startPromise) {

// Listen to both Kafka events and EventBus messages, merging their streams
disposables.add(Flowable.merge(listenKafkaEvents(), listenEventBusMessages())
-.window(2, TimeUnit.SECONDS, scheduler, maxNumEvents, true)
+.window(2, TimeUnit.SECONDS, scheduler, MAX_NUM_EVENTS, true)
// Save the journal records for each window
-.flatMapCompletable(flowable -> saveJournalRecords(flowable.replay(maxNumEvents))
+.flatMapCompletable(flowable -> saveJournalRecords(flowable.replay(MAX_NUM_EVENTS))
.onErrorResumeNext(error -> {
LOGGER.error("Error saving journal records, continuing with next batch", error);
return Completable.complete();
@@ -223,8 +221,11 @@ private Completable initializeKafkaConsumer() {
Map<String, String> consumerProps = kafkaConfigWithDeserializer.getConsumerProps();
// this is set so that this consumer can start where the non-batch consumer left off, when no previous offset is found.
consumerProps.put(KafkaConfig.KAFKA_CONSUMER_AUTO_OFFSET_RESET_CONFIG, "latest");

+consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
+consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "20000");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaTopicNameHelper.formatGroupName("DATA_IMPORT_JOURNAL_BATCH",
-constructModuleName() + "_" + getClass().getSimpleName()));
+environment() + "_" + constructModuleName() + "_" + getClass().getSimpleName()));
if(SharedDataUtil.getIsTesting(vertx.getDelegate())) {
// this will allow the consumer to retrieve messages faster during tests
consumerProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
@@ -303,7 +304,7 @@ private Flowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> lis
// Flatten the iterable list of messages
.flatMapIterable(list -> list)
// Window the messages in 2-second intervals, with a maximum of MAX_NUM_EVENTS per window
-.window(2, TimeUnit.SECONDS, scheduler, maxNumEvents, true)
+.window(2, TimeUnit.SECONDS, scheduler, MAX_NUM_EVENTS, true)
.flatMap(window -> window
// Group messages by tenant ID
.groupBy(BatchableJournalRecord::getTenantId)
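For context on the `MAX_NUM_EVENTS` constant touched above: the verticle merges Kafka events with Vert.x event-bus messages and batches them into 2-second windows capped at `MAX_NUM_EVENTS` before persisting. A minimal sketch of that merge, window, and save pattern, assuming RxJava 3 and using placeholder element and persistence types (the real code works on `BatchableJournalRecord`, groups by tenant, and persists through the module's journal service rather than this stand-in `saveBatch()`):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Sketch of the merge -> window -> save pattern with placeholder types.
public class JournalBatchingSketch {

  private static final int MAX_NUM_EVENTS = 100;

  static Completable run(Flowable<String> kafkaEvents, Flowable<String> eventBusMessages) {
    return Flowable.merge(kafkaEvents, eventBusMessages)
        // Close a window every 2 seconds, or earlier once MAX_NUM_EVENTS items arrive;
        // the restart flag resets the timer whenever a window fills up.
        .window(2, TimeUnit.SECONDS, Schedulers.computation(), MAX_NUM_EVENTS, true)
        // Collect each window into a batch and persist it; a failed batch is skipped
        // so one bad window does not stop the whole stream.
        .flatMapCompletable(window -> window.toList()
            .flatMapCompletable(JournalBatchingSketch::saveBatch)
            .onErrorResumeNext(error -> Completable.complete()));
  }

  private static Completable saveBatch(List<String> batch) {
    // Placeholder for writing a batch of journal records to storage.
    return Completable.fromRunnable(() -> System.out.println("saved " + batch.size() + " records"));
  }
}
```

The restart flag in `window()` restarts the 2-second timer whenever a window fills to `MAX_NUM_EVENTS`, so a burst of events splits into consecutive full batches rather than piling into one oversized window.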
