Skip to content

Commit

Permalink
[Cluster] enable cacheLeader by default (apache#3468)
Browse files Browse the repository at this point in the history
  • Loading branch information
OneSizeFitsQuorum authored Jul 4, 2021
1 parent dbfb564 commit adfcffb
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private static void parseSpecialParams(CommandLine commandLine) {
public static void importCsvFromFile(
String ip, String port, String username, String password, String filename, String timeZone) {
try {
session = new Session(ip, Integer.parseInt(port), username, password);
session = new Session(ip, Integer.parseInt(port), username, password, false);
session.open(false);
timeZoneID = timeZone;
setTimeZone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ private static void insertRecords() throws IoTDBConnectionException, StatementEx
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
typesList.clear();
timestamps.clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion session/src/main/java/org/apache/iotdb/session/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class Config {
public static final String DEFAULT_PASSWORD = "root";
public static final int DEFAULT_FETCH_SIZE = 5000;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
public static final boolean DEFAULT_CACHE_LEADER_MODE = false;
public static final boolean DEFAULT_CACHE_LEADER_MODE = true;

public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
Expand Down
18 changes: 12 additions & 6 deletions session/src/main/java/org/apache/iotdb/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,9 @@ private void insertRecord(String deviceId, TSInsertStringRecordReq request)

private SessionConnection getSessionConnection(String deviceId) {
EndPoint endPoint;
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
if (enableCacheLeader
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
Expand Down Expand Up @@ -898,7 +900,7 @@ private void insertStringRecordsWithLeaderCache(
EndPoint endPoint;
SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(deviceIds.get(i));
if (endPoint != null) {
connection = endPointToSessionConnection.get(endPoint);
} else {
Expand Down Expand Up @@ -1120,7 +1122,7 @@ private void insertRecordsWithLeaderCache(
EndPoint endPoint;
SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
endPoint = deviceIdToEndpoint.get(deviceIds.get(i));
endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(deviceIds.get(i));
if (endPoint != null) {
connection = endPointToSessionConnection.get(endPoint);
} else {
Expand Down Expand Up @@ -1205,7 +1207,9 @@ public void insertTablet(Tablet tablet)
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
EndPoint endPoint;
try {
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
if (enableCacheLeader
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
Expand All @@ -1226,7 +1230,9 @@ public void insertTablet(Tablet tablet, boolean sorted)
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
EndPoint endPoint;
try {
if (enableCacheLeader && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
if (enableCacheLeader
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
endPointToSessionConnection.get(endPoint).insertTablet(request);
} else {
defaultSessionConnection.insertTablet(request);
Expand Down Expand Up @@ -1313,7 +1319,7 @@ private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean s
SessionConnection connection;
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
endPoint = deviceIdToEndpoint.get(entry.getKey());
endPoint = deviceIdToEndpoint.isEmpty() ? null : deviceIdToEndpoint.get(entry.getKey());
if (endPoint != null) {
connection = endPointToSessionConnection.get(endPoint);
} else {
Expand Down

0 comments on commit adfcffb

Please sign in to comment.