Skip to content

Commit

Permalink
feat: ✨ Batch Job For Write Back & Write Allocate Strategy (This is L…
Browse files Browse the repository at this point in the history
…ast Piece 🤗) (#194)

* feat: add key_value record in batch module

* feat: impl last_message_id reader

* feat: impl last_message_id processor

* feat: impl last_message_id writer

* fix: convert prefix_pattern from member various to static

* feat: last_message_id_job_config setting

* chore: batch module test log setting

* refactor: remove state from the reader

* test: last_message_id job batch test

* chore: add testcontainer dependency within batch module

* chore: batch integration test setting

* chore: add sql script in batch test package due to init job instance table

* fix: sperate cursor bean from job_config to batch_redis_config

* chore: insert init sql into batch mysql testcontainer config

* fix: add @spring_batch_test in @batch_integration_test

* rename: add logging in reader, processor, writer

* chore: convert reader component to bean in job config

* fix: add try-catch about number_format_exception in processor

* feat: chat_message_statue entity overrid to_string

* test: last_message_id integration test

* test: add integartion test case for large data

* chore: last_message_id job scheuling
  • Loading branch information
psychology50 authored Nov 7, 2024
1 parent c9b6bae commit 14ca265
Show file tree
Hide file tree
Showing 17 changed files with 850 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pennyway-batch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ dependencies {

implementation 'org.springframework.boot:spring-boot-starter-batch:3.3.0'
testImplementation('org.springframework.batch:spring-batch-test:5.1.2')

/* testcontainer */
testImplementation "org.junit.jupiter:junit-jupiter:5.8.1"
testImplementation "org.testcontainers:testcontainers:1.19.7"
testImplementation "org.testcontainers:junit-jupiter:1.19.7"
testImplementation "org.testcontainers:mysql:1.19.7"
testImplementation "com.redis.testcontainers:testcontainers-redis-junit:1.6.4"
testImplementation "org.springframework.cloud:spring-cloud-contract-wiremock:4.1.2"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kr.co.pennyway.batch.common.dto;

public record KeyValue(
String key,
String value
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package kr.co.pennyway.batch.job;

import kr.co.pennyway.batch.common.dto.KeyValue;
import kr.co.pennyway.batch.processor.LastMessageIdProcessor;
import kr.co.pennyway.batch.reader.LastMessageIdReader;
import kr.co.pennyway.batch.writer.LastMessageIdWriter;
import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@RequiredArgsConstructor
public class LastMessageIdJobConfig {
private static final int CHUNK_SIZE = 1000;
private static final String PREFIX_PATTERN = "chat:last_read:*";
private final JobRepository jobRepository;
private final LastMessageIdProcessor processor;
private final LastMessageIdWriter writer;
private final RedisTemplate<String, String> redisTemplate;

@Bean
public Job lastMessageIdJob(PlatformTransactionManager transactionManager) {
return new JobBuilder("lastMessageIdJob", jobRepository)
.start(lastMessageIdStep(transactionManager))
.on("FAILED")
.stopAndRestart(lastMessageIdStep(transactionManager))
.on("*")
.end()
.end()
.build();
}

@Bean
@JobScope
public Step lastMessageIdStep(PlatformTransactionManager transactionManager) {
return new StepBuilder("lastMessageIdStep", jobRepository)
.<KeyValue, ChatMessageStatus>chunk(CHUNK_SIZE, transactionManager)
.reader(lastMessageIdReader())
.processor(processor)
.writer(writer)
.build();
}

@Bean
@StepScope
public LastMessageIdReader lastMessageIdReader() {
ScanOptions options = ScanOptions.scanOptions().match(PREFIX_PATTERN).count(CHUNK_SIZE).build();
Cursor<String> cursor = redisTemplate.scan(options);
return new LastMessageIdReader(redisTemplate, cursor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package kr.co.pennyway.batch.processor;

import kr.co.pennyway.batch.common.dto.KeyValue;
import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LastMessageIdProcessor implements ItemProcessor<KeyValue, ChatMessageStatus> {

@Override
public ChatMessageStatus process(KeyValue item) throws Exception {
log.debug("Processing item - key: {}, value: {}", item.key(), item.value());

String[] parts = item.key().split(":");

if (parts.length != 4) {
log.error("Invalid key format: {}", item.key());
return null;
}

try {
Long roomId = Long.parseLong(parts[2]);
Long userId = Long.parseLong(parts[3]);
Long messageId = Long.parseLong(item.value());
log.debug("Parsed roomId: {}, userId: {}, messageId: {}", roomId, userId, messageId);

return new ChatMessageStatus(userId, roomId, messageId);
} catch (NoSuchFieldError | NumberFormatException e) {
log.error("Failed to parse key: {}", item.key(), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kr.co.pennyway.batch.reader;

import kr.co.pennyway.batch.common.dto.KeyValue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
@RequiredArgsConstructor
public class LastMessageIdReader implements ItemReader<KeyValue> {
private final RedisTemplate<String, String> redisTemplate;
private final Cursor<String> cursor;

@Override
public KeyValue read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!cursor.hasNext()) {
log.debug("No more keys to read cursor: {}", cursor);
return null;
}

String key = cursor.next();
String value = redisTemplate.opsForValue().get(key);
log.debug("Read key: {}, value: {}", key, value);

if (value == null) {
log.warn("Value not found for key: {}", key);
return null;
}

return new KeyValue(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RequiredArgsConstructor
public class SpendingNotifyScheduler {
private final JobLauncher jobLauncher;
private final Job dailyNotificationJob;
private final Job monthlyNotificationJob;
private final Job lastMessageIdJob;

@Scheduled(cron = "0 0 20 * * ?")
public void runDailyNotificationJob() {
Expand Down Expand Up @@ -48,4 +51,18 @@ public void runMonthlyNotificationJob() {
log.error("Failed to run monthlyNotificationJob", e);
}
}

@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
public void runLastMessageIdJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();

try {
jobLauncher.run(lastMessageIdJob, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Failed to run lastMessageIdJob", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package kr.co.pennyway.batch.writer;

import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import kr.co.pennyway.domain.domains.chatstatus.repository.ChatMessageStatusRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class LastMessageIdWriter implements ItemWriter<ChatMessageStatus> {
private final ChatMessageStatusRepository repository;

@Override
public void write(Chunk<? extends ChatMessageStatus> chunk) throws Exception {
log.debug("Writing chunk size: {}", chunk.getItems().size());

Map<Long, Map<Long, Long>> updates = chunk.getItems().stream()
.collect(
Collectors.groupingBy(
ChatMessageStatus::getUserId,
Collectors.toMap(
ChatMessageStatus::getChatRoomId,
ChatMessageStatus::getLastReadMessageId,
Long::max
)
)
);
log.debug("Grouped updates: {}", updates);

updates.forEach((userId, roomUpdates) ->
roomUpdates.forEach((roomId, messageId) -> {
log.debug("Saving - userId: {}, roomId: {}, messageId: {}", userId, roomId, messageId);
repository.saveLastReadMessageIdInBulk(userId, roomId, messageId);
})
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package kr.co.pennyway.batch.config;

import com.redis.testcontainers.RedisContainer;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@ActiveProfiles("test")
public abstract class BatchDBTestConfig {
private static final String REDIS_CONTAINER_IMAGE = "redis:7.4";
private static final String MYSQL_CONTAINER_IMAGE = "mysql:8.0.26";

private static final RedisContainer REDIS_CONTAINER;
private static final MySQLContainer<?> MYSQL_CONTAINER;

static {
REDIS_CONTAINER =
new RedisContainer(DockerImageName.parse(REDIS_CONTAINER_IMAGE))
.withExposedPorts(6379)
.withCommand("redis-server", "--requirepass testpass")
.withReuse(true);
MYSQL_CONTAINER =
new MySQLContainer<>(DockerImageName.parse(MYSQL_CONTAINER_IMAGE))
.withDatabaseName("pennyway")
.withUsername("root")
.withPassword("testpass")
.withCommand("--sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION")
.withInitScript("sql/schema-mysql.sql")
.withReuse(true);

REDIS_CONTAINER.start();
MYSQL_CONTAINER.start();
}

@DynamicPropertySource
public static void setRedisProperties(DynamicPropertyRegistry registry) {
registry.add("spring.data.redis.host", REDIS_CONTAINER::getHost);
registry.add("spring.data.redis.port", () -> String.valueOf(REDIS_CONTAINER.getMappedPort(6379)));
registry.add("spring.data.redis.password", () -> "testpass");
registry.add("spring.datasource.url", () -> String.format("jdbc:mysql://%s:%s/pennyway?serverTimezone=Asia/Seoul&characterEncoding=utf8&postfileSQL=true&logger=Slf4JLogger&rewriteBatchedStatements=true", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getMappedPort(3306)));
registry.add("spring.datasource.username", () -> "root");
registry.add("spring.datasource.password", () -> "testpass");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kr.co.pennyway.batch.config;

import org.springframework.lang.NonNull;
import org.springframework.test.context.ActiveProfilesResolver;

public class BatchIntegrationProfileResolver implements ActiveProfilesResolver {
@Override
@NonNull
public String[] resolve(@NonNull Class<?> testClass) {
return new String[]{"common", "infra", "domain"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kr.co.pennyway.batch.config;

import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@SpringBatchTest
@SpringBootTest(classes = BatchIntegrationTestConfig.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(profiles = {"test"}, resolver = BatchIntegrationProfileResolver.class)
@Documented
public @interface BatchIntegrationTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package kr.co.pennyway.batch.config;

import kr.co.pennyway.PennywayBatchApplication;
import kr.co.pennyway.common.PennywayCommonApplication;
import kr.co.pennyway.domain.DomainPackageLocation;
import kr.co.pennyway.infra.PennywayInfraApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(
basePackageClasses = {
PennywayBatchApplication.class,
PennywayInfraApplication.class,
DomainPackageLocation.class,
PennywayCommonApplication.class
}
)
public class BatchIntegrationTestConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kr.co.pennyway.batch.config;

import com.querydsl.jpa.impl.JPAQueryFactory;
import com.querydsl.sql.MySQLTemplates;
import com.querydsl.sql.SQLTemplates;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;

@TestConfiguration
public class TestJpaConfig {
@PersistenceContext
private EntityManager em;

@Bean
@ConditionalOnMissingBean
public JPAQueryFactory testJpaQueryFactory() {
return new JPAQueryFactory(em);
}

@Bean
@ConditionalOnMissingBean
public SQLTemplates testSqlTemplates() {
return new MySQLTemplates();
}

@Bean
@ConditionalOnMissingBean
public RedisTemplate<String, ?> testRedisTemplate() {
return null;
}
}
Loading

0 comments on commit 14ca265

Please sign in to comment.