diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index de3643bca6cc..efa1a0306ef4 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -373,7 +373,7 @@ public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerExc } public void receiveSnapshotFragment( - String snapshotId, String originalFilePath, ByteBuffer fileChunk) + String snapshotId, String originalFilePath, ByteBuffer fileChunk, long fileOffset) throws ConsensusGroupModifyPeerException { try { String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath); @@ -384,7 +384,7 @@ public void receiveSnapshotFragment( } try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true); FileChannel channel = fos.getChannel()) { - channel.write(fileChunk.slice()); + channel.write(fileChunk.slice(), fileOffset); } } catch (IOException e) { throw new ConsensusGroupModifyPeerException( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index 07fa67ee5394..a76b0e97b066 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -270,7 +270,7 @@ public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq re } TSStatus responseStatus; try { - impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk); + impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk, req.offset); responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (ConsensusGroupModifyPeerException e) { responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java index a249bad90ff3..67fcb220c180 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java @@ -50,6 +50,7 @@ public TSendSnapshotFragmentReq toTSendSnapshotFragmentReq() { TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq(); req.setSnapshotId(snapshotId); req.setFilePath(filePath); + req.setOffset(startOffset); req.setChunkLength(fragmentSize); req.setFileChunk(fileChunk); return req; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java index ca79b8f3c955..3331829d243f 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java @@ -46,12 +46,12 @@ public SnapshotFragmentReader(String snapshotId, Path path) throws IOException { public boolean hasNext() throws IOException { buf.clear(); - int actualReadSize = fileChannel.read(buf); + int readSize = fileChannel.read(buf); buf.flip(); - if (actualReadSize > 0) { + if (readSize > 0) { cachedSnapshotFragment = - new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, actualReadSize, buf); - totalReadSize += actualReadSize; + new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, readSize, buf); + totalReadSize += readSize; return true; } return false; diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift index 36dbd2dc35ee..d0b4808977e2 100644 --- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift +++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift @@ -80,8 +80,9 @@ struct TSendSnapshotFragmentReq { 1: required common.TConsensusGroupId consensusGroupId 2: required string snapshotId 3: required string filePath - 4: required i64 chunkLength - 5: required binary fileChunk + 4: required i64 offset + 5: required i64 chunkLength + 6: required binary fileChunk } struct TWaitSyncLogCompleteReq {