Skip to content

Commit

Permalink
Reuse DAO
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Jan 22, 2025
1 parent df6a3df commit 7633e65
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.comet.opik.domain;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.comet.opik.api.LogCriteria;
import com.comet.opik.api.LogItem;
import com.comet.opik.utils.TemplateUtils;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
Expand All @@ -22,9 +24,15 @@

import static com.comet.opik.api.LogItem.LogLevel;
import static com.comet.opik.api.LogItem.LogPage;
import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.*;
import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder;

@ImplementedBy(AutomationRuleEvaluatorLogsDAOImpl.class)
public interface AutomationRuleEvaluatorLogsDAO {
public interface AutomationRuleEvaluatorLogsDAO extends UserLogTableDAO {

static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory) {
return new AutomationRuleEvaluatorLogsDAOImpl(factory);
}

Mono<LogPage> findLogs(LogCriteria criteria);

Expand All @@ -35,6 +43,22 @@ public interface AutomationRuleEvaluatorLogsDAO {
@RequiredArgsConstructor(onConstructor_ = @Inject)
class AutomationRuleEvaluatorLogsDAOImpl implements AutomationRuleEvaluatorLogsDAO {

private static final String INSERT_STATEMENT = """
INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers)
VALUES <items:{item |
(
parseDateTime64BestEffort(:timestamp<item.index>, 9),
:level<item.index>,
:workspace_id<item.index>,
:rule_id<item.index>,
:message<item.index>,
mapFromArrays(:marker_keys<item.index>, :marker_values<item.index>)
)
<if(item.hasNext)>,<endif>
}>
;
""";

public static final String FIND_ALL = """
SELECT * FROM automation_rule_evaluator_logs
WHERE workspace_id = :workspaceId
Expand Down Expand Up @@ -99,4 +123,47 @@ private void bindParameters(LogCriteria criteria, Statement statement) {
Optional.ofNullable(criteria.size()).ifPresent(limit -> statement.bind("limit", limit));
}

@Override
public Mono<Void> saveAll(@NonNull List<ILoggingEvent> events) {
return Mono.from(connectionFactory.create())
.flatMapMany(connection -> {
var template = new ST(INSERT_STATEMENT);
List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(events.size());

template.add("items", queryItems);

Statement statement = connection.createStatement(template.render());

for (int i = 0; i < events.size(); i++) {
ILoggingEvent event = events.get(i);

String logLevel = event.getLevel().toString();
String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id"))
.orElseThrow(() -> failWithMessage("workspace_id is not set"));
String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id"))
.orElseThrow(() -> failWithMessage("trace_id is not set"));
String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id"))
.orElseThrow(() -> failWithMessage("rule_id is not set"));

statement
.bind("timestamp" + i, event.getInstant().toString())
.bind("level" + i, logLevel)
.bind("workspace_id" + i, workspaceId)
.bind("rule_id" + i, ruleId)
.bind("message" + i, event.getFormattedMessage())
.bind("marker_keys" + i, new String[]{"trace_id"})
.bind("marker_values" + i, new String[]{traceId});
}

return statement.execute();
})
.collectList()
.then();
}

private IllegalStateException failWithMessage(String message) {
log.error(message);
return new IllegalStateException(message);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.comet.opik.infrastructure.log.tables;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.comet.opik.domain.AutomationRuleEvaluatorLogsDAO;
import com.comet.opik.domain.UserLog;
import io.r2dbc.spi.ConnectionFactory;
import lombok.NonNull;
Expand Down Expand Up @@ -29,7 +30,7 @@ class UserLogTableFactoryImpl implements UserLogTableFactory {

UserLogTableFactoryImpl(@NonNull ConnectionFactory factory) {
daoMap = Map.of(
UserLog.AUTOMATION_RULE_EVALUATOR, new AutomationRuleEvaluatorLogDAO(factory));
UserLog.AUTOMATION_RULE_EVALUATOR, AutomationRuleEvaluatorLogsDAO.create(factory));
}

@Override
Expand Down

0 comments on commit 7633e65

Please sign in to comment.