Skip to content

Commit

Permalink
[Fix][dingo-executor] Fix pessimistic transaction update unique key i…
Browse files Browse the repository at this point in the history
…ssue
  • Loading branch information
githubgxll committed Feb 12, 2025
1 parent 75d37b5 commit 75c49e4
Showing 1 changed file with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,64 @@ private void resolveKeyChange(Vertex vertex, PessimisticLockUpdateParam param, C
if (ByteArrayUtils.equal(key, oldKey)) {
return;
}
StoreInstance localStore = Services.LOCAL_STORE.getInstance(tableId, partId);
byte[] partIdByte = partId.encode();
byte[] lockKeyBytes = encode(
CommonId.CommonType.TXN_CACHE_LOCK,
oldKey,
Op.LOCK.getCode(),
len,
txnIdByte,
tableIdByte,
partIdByte
);
KeyValue keyValue = localStore.get(lockKeyBytes);
if (keyValue != null) {
LogUtils.debug(log, "{}, repeat resolveKeyChange key :{} keyValue is not null",
txnId, Arrays.toString(key));
// data
byte[] dataKey = getKeyByOp(CommonId.CommonType.TXN_CACHE_DATA, Op.PUT, lockKeyBytes);
byte[] deleteKey = Arrays.copyOf(dataKey, dataKey.length);
deleteKey[deleteKey.length - 2] = (byte) Op.DELETE.getCode();
byte[] insertKey = Arrays.copyOf(dataKey, dataKey.length);
insertKey[insertKey.length - 2] = (byte) Op.PUTIFABSENT.getCode();
if (localStore.get(dataKey) != null) {
byte[] value = localStore.get(dataKey).getValue();
localStore.put(new KeyValue(deleteKey, value));
localStore.delete(dataKey);
// extraKeyValue
KeyValue extraKeyValue = new KeyValue(
ByteUtils.encode(
CommonId.CommonType.TXN_CACHE_EXTRA_DATA,
oldKey,
Op.PUT.getCode(),
len,
jobIdByte,
tableIdByte,
partIdByte),
value
);
localStore.put(extraKeyValue);
} else if (localStore.get(insertKey) != null) {
byte[] value = localStore.get(insertKey).getValue();
localStore.put(new KeyValue(deleteKey, value));
localStore.delete(insertKey);
// extraKeyValue
KeyValue extraKeyValue = new KeyValue(
ByteUtils.encode(
CommonId.CommonType.TXN_CACHE_EXTRA_DATA,
oldKey,
Op.PUTIFABSENT.getCode(),
len,
jobIdByte,
tableIdByte,
partIdByte),
value
);
localStore.put(extraKeyValue);
}
return;
}
byte[] originalKey;
if (isVector) {
originalKey = codec.encodeKeyPrefix(newTuple, 1);
Expand All @@ -487,9 +545,7 @@ private void resolveKeyChange(Vertex vertex, PessimisticLockUpdateParam param, C
} else {
originalKey = oldKey;
}
StoreInstance localStore = Services.LOCAL_STORE.getInstance(tableId, partId);
StoreInstance kvStore = Services.KV_STORE.getInstance(tableId, partId);
byte[] partIdByte = partId.encode();
// for check deadLock
byte[] deadLockKeyBytes = encode(
CommonId.CommonType.TXN_CACHE_BLOCK_LOCK,
Expand Down

0 comments on commit 75c49e4

Please sign in to comment.