From 7633e656cba60e80c0fab3c30e83e67d5de5babc Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Wed, 22 Jan 2025 09:39:49 +0100 Subject: [PATCH] Reuse DAO --- .../AutomationRuleEvaluatorLogsDAO.java | 69 ++++++++++++++- .../tables/AutomationRuleEvaluatorLogDAO.java | 84 ------------------- .../log/tables/UserLogTableFactory.java | 3 +- 3 files changed, 70 insertions(+), 86 deletions(-) delete mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java index 2dcca620ae..bcdce6657b 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java @@ -1,7 +1,9 @@ package com.comet.opik.domain; +import ch.qos.logback.classic.spi.ILoggingEvent; import com.comet.opik.api.LogCriteria; import com.comet.opik.api.LogItem; +import com.comet.opik.utils.TemplateUtils; import com.google.inject.ImplementedBy; import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.Row; @@ -22,9 +24,15 @@ import static com.comet.opik.api.LogItem.LogLevel; import static com.comet.opik.api.LogItem.LogPage; +import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.*; +import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; @ImplementedBy(AutomationRuleEvaluatorLogsDAOImpl.class) -public interface AutomationRuleEvaluatorLogsDAO { +public interface AutomationRuleEvaluatorLogsDAO extends UserLogTableDAO { + + static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory) { + return new AutomationRuleEvaluatorLogsDAOImpl(factory); + } Mono findLogs(LogCriteria criteria); @@ -35,6 +43,22 @@ public interface AutomationRuleEvaluatorLogsDAO { @RequiredArgsConstructor(onConstructor_ = @Inject) class AutomationRuleEvaluatorLogsDAOImpl implements AutomationRuleEvaluatorLogsDAO { + private static final String INSERT_STATEMENT = """ + INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) + VALUES , 9), + :level, + :workspace_id, + :rule_id, + :message, + mapFromArrays(:marker_keys, :marker_values) + ) + , + }> + ; + """; + public static final String FIND_ALL = """ SELECT * FROM automation_rule_evaluator_logs WHERE workspace_id = :workspaceId @@ -99,4 +123,47 @@ private void bindParameters(LogCriteria criteria, Statement statement) { Optional.ofNullable(criteria.size()).ifPresent(limit -> statement.bind("limit", limit)); } + @Override + public Mono saveAll(@NonNull List events) { + return Mono.from(connectionFactory.create()) + .flatMapMany(connection -> { + var template = new ST(INSERT_STATEMENT); + List queryItems = getQueryItemPlaceHolder(events.size()); + + template.add("items", queryItems); + + Statement statement = connection.createStatement(template.render()); + + for (int i = 0; i < events.size(); i++) { + ILoggingEvent event = events.get(i); + + String logLevel = event.getLevel().toString(); + String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) + .orElseThrow(() -> failWithMessage("workspace_id is not set")); + String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) + .orElseThrow(() -> failWithMessage("trace_id is not set")); + String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) + .orElseThrow(() -> failWithMessage("rule_id is not set")); + + statement + .bind("timestamp" + i, event.getInstant().toString()) + .bind("level" + i, logLevel) + .bind("workspace_id" + i, workspaceId) + .bind("rule_id" + i, ruleId) + .bind("message" + i, event.getFormattedMessage()) + .bind("marker_keys" + i, new String[]{"trace_id"}) + .bind("marker_values" + i, new String[]{traceId}); + } + + return statement.execute(); + }) + .collectList() + .then(); + } + + private IllegalStateException failWithMessage(String message) { + log.error(message); + return new IllegalStateException(message); + } + } \ No newline at end of file diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java deleted file mode 100644 index a0b976c816..0000000000 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.comet.opik.infrastructure.log.tables; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import com.comet.opik.utils.TemplateUtils; -import io.r2dbc.spi.ConnectionFactory; -import io.r2dbc.spi.Statement; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.stringtemplate.v4.ST; -import reactor.core.publisher.Mono; - -import java.util.List; -import java.util.Optional; - -import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO; -import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; - -@RequiredArgsConstructor -@Slf4j -class AutomationRuleEvaluatorLogDAO implements UserLogTableDAO { - - private final ConnectionFactory factory; - - private static final String INSERT_STATEMENT = """ - INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) - VALUES , 9), - :level, - :workspace_id, - :rule_id, - :message, - mapFromArrays(:marker_keys, :marker_values) - ) - , - }> - ; - """; - - @Override - public Mono saveAll(@NonNull List events) { - return Mono.from(factory.create()) - .flatMapMany(connection -> { - var template = new ST(INSERT_STATEMENT); - List queryItems = getQueryItemPlaceHolder(events.size()); - - template.add("items", queryItems); - - Statement statement = connection.createStatement(template.render()); - - for (int i = 0; i < events.size(); i++) { - ILoggingEvent event = events.get(i); - - String logLevel = event.getLevel().toString(); - String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) - .orElseThrow(() -> failWithMessage("workspace_id is not set")); - String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) - .orElseThrow(() -> failWithMessage("trace_id is not set")); - String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) - .orElseThrow(() -> failWithMessage("rule_id is not set")); - - statement - .bind("timestamp" + i, event.getInstant().toString()) - .bind("level" + i, logLevel) - .bind("workspace_id" + i, workspaceId) - .bind("rule_id" + i, ruleId) - .bind("message" + i, event.getFormattedMessage()) - .bind("marker_keys" + i, new String[]{"trace_id"}) - .bind("marker_values" + i, new String[]{traceId}); - } - - return statement.execute(); - }) - .collectList() - .then(); - } - - private IllegalStateException failWithMessage(String message) { - log.error(message); - return new IllegalStateException(message); - } - -} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java index def849d8c0..5900326a46 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java @@ -1,6 +1,7 @@ package com.comet.opik.infrastructure.log.tables; import ch.qos.logback.classic.spi.ILoggingEvent; +import com.comet.opik.domain.AutomationRuleEvaluatorLogsDAO; import com.comet.opik.domain.UserLog; import io.r2dbc.spi.ConnectionFactory; import lombok.NonNull; @@ -29,7 +30,7 @@ class UserLogTableFactoryImpl implements UserLogTableFactory { UserLogTableFactoryImpl(@NonNull ConnectionFactory factory) { daoMap = Map.of( - UserLog.AUTOMATION_RULE_EVALUATOR, new AutomationRuleEvaluatorLogDAO(factory)); + UserLog.AUTOMATION_RULE_EVALUATOR, AutomationRuleEvaluatorLogsDAO.create(factory)); } @Override