Skip to content

Commit

Permalink
IGNITE-24283 Fix data race in raft pipeline on command retry (#5124)
Browse files Browse the repository at this point in the history
(cherry picked from commit e9fd786)
  • Loading branch information
ascherbakoff authored and ptupitsyn committed Jan 30, 2025
1 parent 976fd4c commit a454507
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface MetaStorageWriteCommand extends WriteCommand {
* command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}).
*/
@WithSetter
@Override
@Nullable HybridTimestamp safeTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public interface WriteCommand extends Command {
/**
* Holds request's initiator timestamp.
* Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
*
* @return The timestamp.
*/
Expand All @@ -34,9 +34,11 @@ public interface WriteCommand extends Command {
}

/**
* This is called before a command is submitted to replication pipeline.
* Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
*
* @param safeTs Safe timestamp.
* @return The timestamp.
*/
default void patch(HybridTimestamp safeTs) {}
default @Nullable HybridTimestamp safeTime() {
// Default: no safe timestamp is provided, so null is returned.
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ default long term() {
return 0;
}

/**
* Returns the safe timestamp (replication begin time) of a command.
*
* <p>This default implementation always returns {@code null}.
*
* @return The safe timestamp, or {@code null} if the command doesn't carry a safe timestamp.
*/
default @Nullable HybridTimestamp safeTimestamp() {
return null;
}

/**
* Returns command.
*/
Expand All @@ -56,11 +64,4 @@ default long term() {
* @param res Execution result.
*/
void result(@Nullable Serializable res);

/**
* Patches the command.
*
* @param safeTs Safe timestamp.
*/
default void patch(HybridTimestamp safeTs) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,26 @@

package org.apache.ignite.internal.raft.service;

import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.WriteCommand;
import org.jetbrains.annotations.Nullable;

/**
* The marker interface for a write command closure.
* The marker interface for a safe time aware command closure.
*/
public interface WriteCommandClosure extends CommandClosure<WriteCommand> {
public interface SafeTimeAwareCommandClosure extends CommandClosure<WriteCommand> {
/**
* Gets the safe timestamp.
*
* @return The timestamp, may be {@code null}.
*/
@Override
@Nullable HybridTimestamp safeTimestamp();

/**
* Sets the safe time.
*
* <p>The default implementation does nothing.
*
* @param safeTs The timestamp.
*/
default void safeTimestamp(HybridTimestamp safeTs) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,9 @@ public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
Function<Peer, ActionRequest> requestFactory;

if (cmd instanceof WriteCommand) {
byte[] commandBytes = commandsMarshaller.marshall(cmd);

requestFactory = targetPeer -> factory.writeActionRequest()
.groupId(groupId)
.command(commandBytes)
.command(commandsMarshaller.marshall(cmd))
// Having prepared deserialized command makes its handling more efficient in the state machine.
// This saves us from extra-deserialization on a local machine, which would take precious time to do.
.deserializedCommand((WriteCommand) cmd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ void resetUnavailablePeers() {
* @return {@code this}.
*/
RetryContext nextAttempt(Peer newTargetPeer) {
// We can avoid recreating the request if the target peer has not changed.
if (!newTargetPeer.equals(targetPeer)) {
request = requestFactory.apply(newTargetPeer);
}
request = requestFactory.apply(newTargetPeer);

targetPeer = newTargetPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand Down Expand Up @@ -870,7 +871,9 @@ public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) currentDone;
ByteBuffer data = iter.getData();

// done != null means we are on the leader, otherwise a command has been read from the log.
WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command();
HybridTimestamp safeTs = done == null ? command.safeTime() : done.safeTimestamp();

long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
Expand All @@ -886,6 +889,11 @@ public long term() {
return commandTerm;
}

@Override
public @Nullable HybridTimestamp safeTimestamp() {
// Value captured before this closure was created: read from the command when 'done' is null, otherwise from 'done'.
return safeTs;
}

@Override
public WriteCommand command() {
return command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.WriteCommandClosure;
import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
Expand Down Expand Up @@ -303,18 +303,19 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b
}

// Patch the command.
if (event.done instanceof WriteCommandClosure) {
WriteCommandClosure clo = (WriteCommandClosure) event.done;
if (event.done instanceof SafeTimeAwareCommandClosure) {
SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure) event.done;
WriteCommand command = clo.command();
HybridTimestamp timestamp = command.initiatorTime();

if (timestamp != null) {
// Tick once per batch.
if (safeTs == null) {
safeTs = clock.update(timestamp);
} else if (timestamp.compareTo(safeTs) > 0) {
safeTs = clock.update(timestamp);
}

clo.patch(safeTs);
clo.safeTimestamp(safeTs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupListener.ShutdownException;
import org.apache.ignite.internal.raft.service.WriteCommandClosure;
import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
Expand All @@ -53,6 +53,7 @@
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.jetbrains.annotations.Nullable;

/**
* Process action request.
Expand Down Expand Up @@ -176,7 +177,9 @@ private Object groupIdSyncMonitor(String groupId) {
*/
private void applyWrite(Node node, WriteActionRequest request, WriteCommand command, RpcContext rpcCtx) {
ByteBuffer wrapper = ByteBuffer.wrap(request.command());
node.apply(new Task(wrapper, new RaftWriteCommandClosure() {
node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() {
private HybridTimestamp safeTs;

@Override
public void result(Serializable res) {
sendResponse(res, rpcCtx);
Expand All @@ -187,6 +190,11 @@ public WriteCommand command() {
return command;
}

@Override
public @Nullable HybridTimestamp safeTimestamp() {
// Stays null until safeTimestamp(HybridTimestamp) assigns the field.
return safeTs;
}

@Override
public void run(Status status) {
assert !status.isOk() : status;
Expand All @@ -195,9 +203,12 @@ public void run(Status status) {
}

@Override
public void patch(HybridTimestamp safeTs) {
public void safeTimestamp(HybridTimestamp safeTs) {
assert this.safeTs == null : "Safe time can be set only once";
// Apply binary patch.
node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs);
command.patch(safeTs);
// Avoid modifying WriteCommand object, because it's shared between raft pipeline threads.
this.safeTs = safeTs;
}
}));
}
Expand Down Expand Up @@ -335,6 +346,9 @@ private void sendRaftError(RpcContext ctx, Status status, Node node) {
ctx.sendResponse(response);
}

private interface RaftWriteCommandClosure extends Closure, WriteCommandClosure {
/**
* The command closure which is used to propagate replication state to local FSM.
*/
private interface LocalAwareWriteCommandClosure extends Closure, SafeTimeAwareCommandClosure {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,17 @@ public interface SafeTimePropagatingCommand extends WriteCommand {
HybridTimestamp initiatorTime();

/**
* The Safe timestamp. This field's value is used by a replication layer and not thread safe for external usage.
* The Safe timestamp. A value for this field is automatically generated by a replication layer.
*/
@Transient
@WithSetter
@Override
@Nullable HybridTimestamp safeTime();

/**
* Setter for the safeTime field. A value for this field is auto generated and should not be set manually.
* Setter for the safeTime field for internal usage. Should not be set manually.
*/
default void safeTime(HybridTimestamp safeTime) {
// No-op.
}

@Override
default void patch(HybridTimestamp safeTs) {
safeTime(safeTs);
}
}
Loading

0 comments on commit a454507

Please sign in to comment.