From adfcffb06fd29afd8a2481ddd35ae7b724771205 Mon Sep 17 00:00:00 2001 From: Potato Date: Sun, 4 Jul 2021 20:51:24 +0800 Subject: [PATCH] [Cluster] enable cacheLeader by default (#3468) --- .../java/org/apache/iotdb/tool/ImportCsv.java | 2 +- .../java/org/apache/iotdb/SessionExample.java | 1 + .../java/org/apache/iotdb/session/Config.java | 2 +- .../java/org/apache/iotdb/session/Session.java | 18 ++++++++++++------ 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java index 6959360d8497..fb7a878be311 100644 --- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java +++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java @@ -290,7 +290,7 @@ private static void parseSpecialParams(CommandLine commandLine) { public static void importCsvFromFile( String ip, String port, String username, String password, String filename, String timeZone) { try { - session = new Session(ip, Integer.parseInt(port), username, password); + session = new Session(ip, Integer.parseInt(port), username, password, false); session.open(false); timeZoneID = timeZone; setTimeZone(); diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index c27c17d0f144..a705c24ee322 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -322,6 +322,7 @@ private static void insertRecords() throws IoTDBConnectionException, StatementEx deviceIds.clear(); measurementsList.clear(); valuesList.clear(); + typesList.clear(); timestamps.clear(); } } diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java index 4114594b0092..02e7e708658e 100644 --- a/session/src/main/java/org/apache/iotdb/session/Config.java +++ b/session/src/main/java/org/apache/iotdb/session/Config.java @@ -24,7 +24,7 @@ public class Config { public static final String DEFAULT_PASSWORD = "root"; public static final int DEFAULT_FETCH_SIZE = 5000; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0; - public static final boolean DEFAULT_CACHE_LEADER_MODE = false; + public static final boolean DEFAULT_CACHE_LEADER_MODE = true; public static final int RETRY_NUM = 3; public static final long RETRY_INTERVAL_MS = 1000; diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 447c2b5b946e..1b2f8125eb61 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -679,7 +679,9 @@ private void insertRecord(String deviceId, TSInsertStringRecordReq request) private SessionConnection getSessionConnection(String deviceId) { EndPoint endPoint; - if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) { + if (enableCacheLeader + && !deviceIdToEndpoint.isEmpty() + && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) { return endPointToSessionConnection.get(endPoint); } else { return defaultSessionConnection; @@ -898,7 +900,7 @@ private void insertStringRecordsWithLeaderCache( EndPoint endPoint; SessionConnection connection; for (int i = 0; i < deviceIds.size(); i++) { - endPoint = deviceIdToEndpoint.get(deviceIds.get(i)); + endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(deviceIds.get(i)); if (endPoint != null) { connection = endPointToSessionConnection.get(endPoint); } else { @@ -1120,7 +1122,7 @@ private void insertRecordsWithLeaderCache( EndPoint endPoint; SessionConnection connection; for (int i = 0; i < deviceIds.size(); i++) { - endPoint = deviceIdToEndpoint.get(deviceIds.get(i)); + endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(deviceIds.get(i)); if (endPoint != null) { connection = endPointToSessionConnection.get(endPoint); } else { @@ -1205,7 +1207,9 @@ public void insertTablet(Tablet tablet) TSInsertTabletReq request = genTSInsertTabletReq(tablet, false); EndPoint endPoint; try { - if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) { + if (enableCacheLeader + && !deviceIdToEndpoint.isEmpty() + && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) { endPointToSessionConnection.get(endPoint).insertTablet(request); } else { defaultSessionConnection.insertTablet(request); @@ -1226,7 +1230,9 @@ public void insertTablet(Tablet tablet, boolean sorted) TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted); EndPoint endPoint; try { - if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) { + if (enableCacheLeader + && !deviceIdToEndpoint.isEmpty() + && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) { endPointToSessionConnection.get(endPoint).insertTablet(request); } else { defaultSessionConnection.insertTablet(request); @@ -1313,7 +1319,7 @@ private void insertTabletsWithLeaderCache(Map tablets, boolean s SessionConnection connection; Map tabletGroup = new HashMap<>(); for (Entry entry : tablets.entrySet()) { - endPoint = deviceIdToEndpoint.get(entry.getKey()); + endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(entry.getKey()); if (endPoint != null) { connection = endPointToSessionConnection.get(endPoint); } else {