Skip to content

Commit

Permalink
MODLD-452: consume SRS domain event for create and update ops (#314)
Browse files — browse the repository at this point in the history
* MODLD-452: consume SRS domain event for create and update ops

* MODLD-452: split resourceService and add more logs to ResourceMarcService

* MODLD-452: service packages structure optimization
  • Loading branch information
PBobylev authored Aug 14, 2024
1 parent 3a9760d commit 22d573d
Show file tree
Hide file tree
Showing 140 changed files with 1,914 additions and 1,248 deletions.
121 changes: 88 additions & 33 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,44 +1,34 @@
version: '3.8'
services:
db:
container_name: postgres
image: postgres:12-alpine
restart: always
environment:
- POSTGRES_DB=okapi_modules
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
ports:
- '5432:5432'
volumes:
- db:/var/lib/postgresql/data
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
image: bitnami/zookeeper
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ALLOW_ANONYMOUS_LOGIN: "yes"
ports:
- 2181:2181


kafka:
image: confluentinc/cp-kafka:7.3.2
image: bitnami/kafka
container_name: kafka
hostname: kafka
ports:
- "9092:9092"
- "9997:9997"
depends_on:
- zookeeper
ports:
- 29092:29092
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_CFG_LISTENERS: INTERNAL://:9092,LOCAL://:29092
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://host.docker.internal:9092,LOCAL://localhost:29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: LOCAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_LOG_RETENTION_BYTES: -1
KAFKA_CFG_LOG_RETENTION_HOURS: -1

kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
Expand All
@@ -53,6 +43,71 @@ services:
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_JMXPORT: 9997
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
db:
driver: local

postgres:
image: postgres:13
container_name: postgres
mem_limit: 2g
environment:
POSTGRES_PASSWORD: folio_admin
POSTGRES_USER: folio_admin
POSTGRES_DB: okapi_modules
command: -c max_connections=200 -c shared_buffers=512MB -c log_duration=on -c log_min_duration_statement=0ms -c shared_preload_libraries=pg_stat_statements -c jit=off
ports:
- 5432:5432

expose-docker-on-2375:
image: alpine/socat
container_name: expose-docker-on-2375
volumes:
- /var/run/docker.sock:/var/run/docker.sock
command: "tcp-listen:2375,fork,reuseaddr unix-connect:/var/run/docker.sock"
restart: always

#minio:
# image: 'minio/minio'
# command: server /data --console-address ":9001"
# ports:
# - 9000:9000
# - 9001:9001

#createbuckets: # This container will terminate after running its commands to create a bucket in minio
# image: minio/mc
# depends_on:
# - minio
# entrypoint: >
# /bin/sh -c "
# /usr/bin/mc config host add myminio http://host.docker.internal:9000 minioadmin minioadmin;
# /usr/bin/mc rm -r --force myminio/example-bucket;
# /usr/bin/mc mb myminio/example-bucket;
# exit 0;
# "

#okapi:
# image: 'folioci/okapi:latest'
# command: 'dev'
# ports:
# - 9130:9130
# environment: # be careful to leave a space character after every java option
# JAVA_OPTIONS: |-
# -Dhttp.port=9130
# -Dokapiurl=http://host.docker.internal:9130
# -Dstorage=postgres
# -Dpostgres_username=folio_admin
# -Dpostgres_password=folio_admin
# -Dpostgres_database=okapi_modules
# -Dpostgres_host=host.docker.internal
# -Dhost=host.docker.internal
# -Dport_end=9170
# -DdockerUrl=tcp://expose-docker-on-2375:2375
# depends_on:
# - postgres

#elasticsearch:
# image: 'ghcr.io/zcube/bitnami-compat/elasticsearch:7.17.9'
# ports:
# - 9300:9300
# - 9200:9200
# environment:
# ELASTICSEARCH_PLUGINS:
# "analysis-icu,analysis-kuromoji,analysis-smartcn,analysis-nori,analysis-phonetic"
4 changes: 2 additions & 2 deletions src/main/java/org/folio/linked/data/client/SearchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

import org.folio.search.domain.dto.CreateIndexRequest;
import org.folio.search.domain.dto.FolioCreateIndexResponse;
import org.folio.search.domain.dto.CreateIndexResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.context.annotation.Profile;
import org.springframework.http.ResponseEntity;
Expand All @@ -16,6 +16,6 @@
public interface SearchClient {

@PostMapping(value = "/index/indices", consumes = APPLICATION_JSON_VALUE, produces = APPLICATION_JSON_VALUE)
ResponseEntity<FolioCreateIndexResponse> createIndex(@RequestBody CreateIndexRequest createIndexRequest);
ResponseEntity<CreateIndexResponse> createIndex(@RequestBody CreateIndexRequest createIndexRequest);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.folio.linked.data.configuration.json.deserialization.ResourceRequestFieldDeserializer;
import org.folio.linked.data.configuration.json.deserialization.event.DataImportEventDeserializer;
import org.folio.linked.data.configuration.json.deserialization.instance.InstanceRequestAllOfMapDeserializer;
import org.folio.linked.data.configuration.json.deserialization.title.TitleFieldRequestDeserializer;
import org.folio.linked.data.configuration.json.serialization.MarcRecordSerializationConfig;
import org.folio.linked.data.domain.dto.InstanceRequestAllOfMap;
import org.folio.linked.data.domain.dto.MarcRecord;
import org.folio.linked.data.domain.dto.ResourceRequestField;
import org.folio.linked.data.domain.dto.TitleFieldRequest;
import org.folio.search.domain.dto.DataImportEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
Expand All @@ -40,7 +38,6 @@ private Module monographModule(ObjectMapper mapper) {
module.addDeserializer(ResourceRequestField.class, new ResourceRequestFieldDeserializer());
module.addDeserializer(TitleFieldRequest.class, new TitleFieldRequestDeserializer());
module.addDeserializer(InstanceRequestAllOfMap.class, new InstanceRequestAllOfMapDeserializer());
module.addDeserializer(DataImportEvent.class, new DataImportEventDeserializer(mapper));
return module;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.folio.search.domain.dto.DataImportEvent;
import org.folio.search.domain.dto.SourceRecordDomainEvent;
import org.folio.spring.tools.kafka.FolioKafkaProperties;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -28,7 +28,7 @@
public class KafkaListenerConfiguration {

private final KafkaProperties kafkaProperties;
private final ObjectMapper objectMapper;
private final ObjectMapper mapper;

@Bean
@ConfigurationProperties("folio.kafka")
Expand All @@ -37,16 +37,18 @@ public FolioKafkaProperties folioKafkaProperties() {
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, DataImportEvent> dataImportListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, DataImportEvent>();
public ConcurrentKafkaListenerContainerFactory<String, SourceRecordDomainEvent> srsEventListenerContainerFactory(
ConsumerFactory<String, SourceRecordDomainEvent> sourceRecordDomainEventConsumerFactory
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, SourceRecordDomainEvent>();
factory.setBatchListener(true);
factory.setConsumerFactory(dataImportEventConsumerFactory());
factory.setConsumerFactory(sourceRecordDomainEventConsumerFactory);
return factory;
}

@Bean
public ConsumerFactory<String, DataImportEvent> dataImportEventConsumerFactory() {
var deserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(DataImportEvent.class, objectMapper));
public ConsumerFactory<String, SourceRecordDomainEvent> sourceRecordDomainEventConsumerFactory() {
var deserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(SourceRecordDomainEvent.class, mapper));
Map<String, Object> config = new HashMap<>(kafkaProperties.buildConsumerProperties(null));
config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringSerializer;
import org.folio.search.domain.dto.InstanceIngressEvent;
import org.folio.search.domain.dto.LinkedDataAuthority;
import org.folio.search.domain.dto.LinkedDataWork;
import org.folio.search.domain.dto.ResourceIndexEvent;
import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.jetbrains.annotations.NotNull;
Expand All @@ -34,7 +36,7 @@ public FolioMessageProducer<ResourceIndexEvent> bibliographicMessageProducer(
) {
var producer = new FolioMessageProducer<>(resourceIndexEventMessageTemplate,
linkedDataTopicProperties::getWorkSearchIndex);
producer.setKeyMapper(ResourceIndexEvent::getId);
producer.setKeyMapper(rie -> ((LinkedDataWork) rie.getNew()).getId());
return producer;
}

Expand All @@ -44,7 +46,7 @@ public FolioMessageProducer<ResourceIndexEvent> authorityMessageProducer(
) {
var producer = new FolioMessageProducer<>(resourceIndexEventMessageTemplate,
linkedDataTopicProperties::getAuthoritySearchIndex);
producer.setKeyMapper(ResourceIndexEvent::getId);
producer.setKeyMapper(rie -> ((LinkedDataAuthority) rie.getNew()).getId());
return producer;
}

Expand All @@ -54,7 +56,7 @@ public FolioMessageProducer<InstanceIngressEvent> instanceIngressEventProducer(
) {
var producer = new FolioMessageProducer<>(instanceIngressMessageTemplate,
linkedDataTopicProperties::getInstanceIngress);
producer.setKeyMapper(InstanceIngressEvent::getId);
producer.setKeyMapper(iie -> iie.getEventPayload().getSourceRecordIdentifier());
return producer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import lombok.RequiredArgsConstructor;
import org.folio.linked.data.rest.resource.ReindexApi;
import org.folio.linked.data.service.ReindexService;
import org.folio.linked.data.service.index.ReindexService;
import org.springframework.context.annotation.Profile;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import org.folio.linked.data.domain.dto.ResourceResponseDto;
import org.folio.linked.data.domain.dto.ResourceShortInfoPage;
import org.folio.linked.data.rest.resource.ResourceApi;
import org.folio.linked.data.service.ResourceService;
import org.folio.linked.data.service.resource.ResourceMarcService;
import org.folio.linked.data.service.resource.ResourceService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;

Expand All @@ -17,6 +18,7 @@
public class ResourceController implements ResourceApi {

private final ResourceService resourceService;
private final ResourceMarcService resourceMarcService;

@Override
public ResponseEntity<ResourceResponseDto> createResource(String okapiTenant, @Valid ResourceRequestDto resourceDto) {
Expand Down Expand Up @@ -47,7 +49,7 @@ public ResponseEntity<Void> deleteResource(Long id, String okapiTenant) {

@Override
public ResponseEntity<ResourceMarcViewDto> getResourceMarcViewById(Long id, String okapiTenant) {
return ResponseEntity.ok(resourceService.getResourceMarcViewById(id));
return ResponseEntity.ok(resourceMarcService.getResourceMarcView(id));
}

@Override
Expand Down
Loading

0 comments on commit 22d573d

Please sign in to comment.