diff --git a/interactive_engine/common/pom.xml b/interactive_engine/common/pom.xml
index bce589c7aca1..38c3464d449a 100644
--- a/interactive_engine/common/pom.xml
+++ b/interactive_engine/common/pom.xml
@@ -120,6 +120,7 @@
schema_common.proto
ddl_service.proto
write_service.proto
+ request_option.proto
diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java
new file mode 100644
index 000000000000..bc5f8831719a
--- /dev/null
+++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java
@@ -0,0 +1,50 @@
+package com.alibaba.graphscope.groot.common.constant;
+
+public class LogConstant {
+
+ public static String TRACE_ID = "traceId";
+
+ public static String UPSTREAM_ID = "upstreamId";
+
+ public static String QUERY_ID = "queryId";
+
+ /**
+ * 具体查询语句
+ */
+ public static String QUERY = "query";
+
+ public static String SUCCESS = "success";
+
+ public static String ERROR_MESSAGE = "errorMessage";
+
+ public static String STACK_TRACE = "stackTrace";
+
+ /**
+ * 查询计划
+ */
+ public static String IR_PLAN = "irPlan";
+
+ /**
+ * 打印日志的阶段
+ * query: java/rust
+ * write: writeKafka/consumeKafka/writeDb
+ */
+ public static String STAGE = "stage";
+
+ public static String RESULT = "result";
+
+ public static String COST = "cost";
+
+ public static String START_TIME = "startMillis";
+
+ public static String END_TIME = "endMillis";
+
+ /**
+ * 日志类型: query/write
+ */
+ public static String LOG_TYPE = "logType";
+
+ public static String BATCH_SIZE = "batchSize";
+
+ public static String PARTITION_ID = "partitionId";
+}
diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java
new file mode 100644
index 000000000000..e31447be3552
--- /dev/null
+++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java
@@ -0,0 +1,63 @@
+package com.alibaba.graphscope.groot.common.util;
+
+import com.alibaba.graphscope.groot.common.constant.LogConstant;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utils {
+
+ private static final ObjectMapper jsonMapper = new ObjectMapper();
+ private static final Logger defaultLogger = LoggerFactory.getLogger(Utils.class);
+
+ /**
+ * build metric json log for monitor
+ * plz dont delete any field
+ * could add new field
+ * @param isSuccess
+ * @param traceId
+ * @param batchSize
+ * @param partitionId
+ * @param cost
+ * @param endTime
+ * @param stage
+ * @param logType
+ * @return
+ */
+ public static String buildMetricJsonLog(
+ boolean isSuccess,
+ String traceId,
+ Integer batchSize,
+ Integer partitionId,
+ long cost,
+ Long endTime,
+ String stage,
+ String logType) {
+ String jsonLog = "";
+ ObjectNode metricJsonLog = jsonMapper.createObjectNode();
+ metricJsonLog.put(LogConstant.TRACE_ID, traceId);
+ metricJsonLog.put(LogConstant.SUCCESS, isSuccess);
+ if (batchSize != null) {
+ metricJsonLog.put(LogConstant.BATCH_SIZE, batchSize);
+ }
+ if (partitionId != null) {
+ metricJsonLog.put(LogConstant.PARTITION_ID, partitionId);
+ }
+ if (endTime != null) {
+ metricJsonLog.put(LogConstant.END_TIME, endTime);
+ }
+ metricJsonLog.put(LogConstant.COST, cost);
+ if (stage != null) {
+ metricJsonLog.put(LogConstant.STAGE, stage);
+ }
+ metricJsonLog.put(LogConstant.LOG_TYPE, logType);
+ try {
+ jsonLog = jsonMapper.writeValueAsString(metricJsonLog);
+ } catch (Exception e) {
+ defaultLogger.error("JsonProcessingException!", e);
+ }
+ return jsonLog;
+ }
+}
diff --git a/interactive_engine/compiler/pom.xml b/interactive_engine/compiler/pom.xml
index fe5ed32f9566..124837e7de22 100644
--- a/interactive_engine/compiler/pom.xml
+++ b/interactive_engine/compiler/pom.xml
@@ -146,6 +146,10 @@
io.opentelemetry
opentelemetry-api
+
+ io.opentelemetry
+ opentelemetry-sdk
+
io.opentelemetry.semconv
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java
index c2fd396e9614..6d286af496a0 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java
@@ -16,6 +16,12 @@
package com.alibaba.graphscope.gremlin.plugin;
+import com.alibaba.graphscope.groot.common.constant.LogConstant;
+import com.alibaba.graphscope.groot.common.util.Utils;
+import com.google.gson.JsonObject;
+
+import io.opentelemetry.api.trace.Span;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,9 +34,25 @@ public class QueryLogger {
private final String query;
private final BigInteger queryId;
+ /**
+ * 上游带下来的traceId
+ */
+ private final String upstreamId;
+
+ private String irPlan;
+
public QueryLogger(String query, BigInteger queryId) {
this.query = query;
this.queryId = queryId;
+ this.irPlan = null;
+ this.upstreamId = null;
+ }
+
+ public QueryLogger(String query, BigInteger queryId, String upstreamId) {
+ this.query = query;
+ this.queryId = queryId;
+ this.upstreamId = upstreamId;
+ this.irPlan = null;
}
public void debug(String format, Object... args) {
@@ -49,13 +71,61 @@ public void error(String format, Object... args) {
defaultLogger.error(this + " : " + format, args);
}
+ public void error(Throwable throwable) {
+ JsonObject errorJson = new JsonObject();
+ String traceId = Span.current().getSpanContext().getTraceId();
+ if (this.upstreamId != null) {
+ errorJson.addProperty(LogConstant.UPSTREAM_ID, this.upstreamId);
+ }
+ errorJson.addProperty(LogConstant.TRACE_ID, traceId);
+ errorJson.addProperty(LogConstant.SUCCESS, false);
+ errorJson.addProperty(LogConstant.STAGE, "java");
+ errorJson.addProperty(LogConstant.LOG_TYPE, "query");
+ errorJson.addProperty(LogConstant.ERROR_MESSAGE, throwable.getMessage());
+ defaultLogger.error(errorJson.toString(), throwable);
+ }
+
+ public void print(String message, boolean success, Throwable t) {
+ if (success) {
+ defaultLogger.info(message);
+ } else {
+ defaultLogger.error(message, t);
+ }
+ }
+
public void metricsInfo(String format, Object... args) {
metricLogger.info(queryId + " | " + query + " | " + format, args);
}
+ public void metricsInfo(boolean isSucceed, long cost) {
+ String traceId = Span.current().getSpanContext().getTraceId();
+ String metricJson =
+ Utils.buildMetricJsonLog(
+ isSucceed,
+ traceId,
+ null,
+ null,
+ cost,
+ System.currentTimeMillis(),
+ "java",
+ "query");
+ metricLogger.info(metricJson);
+ }
+
@Override
public String toString() {
- return "[" + "query='" + query + '\'' + ", queryId=" + queryId + ']';
+ StringBuilder str = new StringBuilder();
+ str.append("[");
+ if (this.upstreamId != null) {
+ str.append("upstreamId=").append(this.upstreamId).append(", ");
+ }
+ str.append("query='")
+ .append(this.query)
+ .append("'")
+ .append(", queryId=")
+ .append(this.queryId)
+ .append("]");
+ return str.toString();
}
public String getQuery() {
@@ -65,4 +135,16 @@ public String getQuery() {
public BigInteger getQueryId() {
return queryId;
}
+
+ public void setIrPlan(String irPlan) {
+ this.irPlan = irPlan;
+ }
+
+ public String getUpstreamId() {
+ return upstreamId;
+ }
+
+ public String getIrPlan() {
+ return irPlan;
+ }
}
diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java
index 5d417d3858b8..3ed116befb19 100644
--- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java
+++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java
@@ -16,50 +16,113 @@
package com.alibaba.graphscope.gremlin.plugin;
-import static io.opentelemetry.api.common.AttributeKey.*;
+import com.alibaba.graphscope.groot.common.constant.LogConstant;
+import com.alibaba.graphscope.groot.common.util.JSON;
+import com.google.gson.JsonObject;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.trace.Span;
-import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.util.List;
+
public class QueryStatusCallback {
private final MetricsCollector metricsCollector;
private final QueryLogger queryLogger;
private LongHistogram queryHistogram;
+ // if query cost large than threshold, will print detail log
+ private long printThreshold;
public QueryStatusCallback(
- MetricsCollector metricsCollector, LongHistogram histogram, QueryLogger queryLogger) {
+ MetricsCollector metricsCollector,
+ LongHistogram histogram,
+ QueryLogger queryLogger,
+ long printThreshold) {
this.metricsCollector = metricsCollector;
this.queryLogger = queryLogger;
this.queryHistogram = histogram;
+ this.printThreshold = printThreshold;
}
public void onStart() {}
- public void onEnd(boolean isSucceed, @Nullable String msg) {
+ public void onErrorEnd(@Nullable String msg) {
+ this.metricsCollector.stop();
+ onErrorEnd(null, msg);
+ }
+
+ public void onErrorEnd(@Nullable Throwable t) {
this.metricsCollector.stop();
- if (isSucceed) {
- queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis());
+ onErrorEnd(t, null);
+ }
+
+ private void onErrorEnd(Throwable t, String msg) {
+ String errorMsg = msg;
+ if (t != null) {
+ errorMsg = t.getMessage();
}
+ JsonObject logJson = buildSimpleLog(false, metricsCollector.getElapsedMillis());
+ fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis(), null);
+ queryLogger.print(logJson.toString(), false, t);
Attributes attrs =
Attributes.builder()
.put("id", queryLogger.getQueryId().toString())
.put("query", queryLogger.getQuery())
- .put("success", isSucceed)
+ .put("success", false)
.put("message", msg != null ? msg : "")
.build();
this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs);
+ queryLogger.metricsInfo(false, metricsCollector.getElapsedMillis());
+ }
+
+ public void onSuccessEnd(List