Skip to content

Commit

Permalink
[region migration] Fix tsfile error caused by target DataNode's fast …
Browse files Browse the repository at this point in the history
…rebooting #14031
  • Loading branch information
liyuheng55555 authored Nov 9, 2024
1 parent 8a92aaf commit 4be1020
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerExc
}

public void receiveSnapshotFragment(
String snapshotId, String originalFilePath, ByteBuffer fileChunk)
String snapshotId, String originalFilePath, ByteBuffer fileChunk, long fileOffset)
throws ConsensusGroupModifyPeerException {
try {
String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
Expand All @@ -384,7 +384,7 @@ public void receiveSnapshotFragment(
}
try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true);
FileChannel channel = fos.getChannel()) {
channel.write(fileChunk.slice());
channel.write(fileChunk.slice(), fileOffset);
}
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq re
}
TSStatus responseStatus;
try {
impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk);
impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk, req.offset);
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public TSendSnapshotFragmentReq toTSendSnapshotFragmentReq() {
TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq();
req.setSnapshotId(snapshotId);
req.setFilePath(filePath);
req.setOffset(startOffset);
req.setChunkLength(fragmentSize);
req.setFileChunk(fileChunk);
return req;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public SnapshotFragmentReader(String snapshotId, Path path) throws IOException {

public boolean hasNext() throws IOException {
buf.clear();
int actualReadSize = fileChannel.read(buf);
int readSize = fileChannel.read(buf);
buf.flip();
if (actualReadSize > 0) {
if (readSize > 0) {
cachedSnapshotFragment =
new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, actualReadSize, buf);
totalReadSize += actualReadSize;
new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, readSize, buf);
totalReadSize += readSize;
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ struct TSendSnapshotFragmentReq {
1: required common.TConsensusGroupId consensusGroupId
2: required string snapshotId
3: required string filePath
4: required i64 chunkLength
5: required binary fileChunk
4: required i64 offset
5: required i64 chunkLength
6: required binary fileChunk
}

struct TWaitSyncLogCompleteReq {
Expand Down

0 comments on commit 4be1020

Please sign in to comment.