Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24283 Fix data race in raft pipeline on command retry #5124

Merged
merged 9 commits into from
Jan 30, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public interface MetaStorageWriteCommand extends WriteCommand {
* command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}.
*/
@WithSetter
@Override
@Nullable HybridTimestamp safeTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public interface WriteCommand extends Command {
/**
* Holds request's initiator timestamp.
* Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
*
* @return The timestamp.
*/
Expand All @@ -34,9 +34,11 @@ public interface WriteCommand extends Command {
}

/**
* This is called before a command is submitted to replication pipeline.
* Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
*
* @param safeTs Safe timestamp.
* @return The timestamp.
*/
default void patch(HybridTimestamp safeTs) {}
default @Nullable HybridTimestamp safeTime() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ default long term() {
return 0;
}

/**
* Safe timestamp (replication begin time) of a command. {@code Null} if a command doesn't carry safe timestamp.
* Returns the safe time.
*/
default @Nullable HybridTimestamp safeTimestamp() {
return null;
}

/**
* Returns command.
*/
Expand All @@ -56,11 +64,4 @@ default long term() {
* @param res Execution result.
*/
void result(@Nullable Serializable res);

/**
* Patches the command.
*
* @param safeTs Safe timestamp.
*/
default void patch(HybridTimestamp safeTs) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,26 @@

package org.apache.ignite.internal.raft.service;

import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.WriteCommand;
import org.jetbrains.annotations.Nullable;

/**
* The marker interface for a write command closure.
* The marker interface for a safe time aware command closure.
*/
public interface WriteCommandClosure extends CommandClosure<WriteCommand> {
public interface SafeTimeAwareCommandClosure extends CommandClosure<WriteCommand> {
/**
* Gets the safe timestamp.
*
* @return The timestamp.
*/
@Override
@Nullable HybridTimestamp safeTimestamp();

/**
* Sets the safe time.
*
* @param safeTs The timestamp.
*/
default void safeTimestamp(HybridTimestamp safeTs) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,9 @@ public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
Function<Peer, ActionRequest> requestFactory;

if (cmd instanceof WriteCommand) {
byte[] commandBytes = commandsMarshaller.marshall(cmd);

requestFactory = targetPeer -> factory.writeActionRequest()
.groupId(groupId)
.command(commandBytes)
.command(commandsMarshaller.marshall(cmd))
// Having prepared deserialized command makes its handling more efficient in the state machine.
// This saves us from extra-deserialization on a local machine, which would take precious time to do.
.deserializedCommand((WriteCommand) cmd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ void resetUnavailablePeers() {
* @return {@code this}.
*/
RetryContext nextAttempt(Peer newTargetPeer) {
// We can avoid recreating the request if the target peer has not changed.
if (!newTargetPeer.equals(targetPeer)) {
rpuch marked this conversation as resolved.
Show resolved Hide resolved
request = requestFactory.apply(newTargetPeer);
}
request = requestFactory.apply(newTargetPeer);

targetPeer = newTargetPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand Down Expand Up @@ -907,7 +908,9 @@ public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) currentDone;
ByteBuffer data = iter.getData();

// done != null means we are on the leader, otherwise a command has been read from the log.
WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command();
HybridTimestamp safeTs = done == null ? command.safeTime() : done.safeTimestamp();
rpuch marked this conversation as resolved.
Show resolved Hide resolved

long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
Expand All @@ -923,6 +926,11 @@ public long term() {
return commandTerm;
}

@Override
public @Nullable HybridTimestamp safeTimestamp() {
return safeTs;
}

@Override
public WriteCommand command() {
return command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.WriteCommandClosure;
import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
Expand Down Expand Up @@ -303,18 +303,19 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b
}

// Patch the command.
if (event.done instanceof WriteCommandClosure) {
WriteCommandClosure clo = (WriteCommandClosure) event.done;
if (event.done instanceof SafeTimeAwareCommandClosure) {
SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure) event.done;
WriteCommand command = clo.command();
HybridTimestamp timestamp = command.initiatorTime();

if (timestamp != null) {
// Tick once per batch.
if (safeTs == null) {
safeTs = clock.update(timestamp);
rpuch marked this conversation as resolved.
Show resolved Hide resolved
} else if (timestamp.compareTo(safeTs) > 0) {
safeTs = clock.update(timestamp);
}

clo.patch(safeTs);
clo.safeTimestamp(safeTs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupListener.ShutdownException;
import org.apache.ignite.internal.raft.service.WriteCommandClosure;
import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
Expand All @@ -53,6 +53,7 @@
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.jetbrains.annotations.Nullable;

/**
* Process action request.
Expand Down Expand Up @@ -176,7 +177,9 @@ private Object groupIdSyncMonitor(String groupId) {
*/
private void applyWrite(Node node, WriteActionRequest request, WriteCommand command, RpcContext rpcCtx) {
ByteBuffer wrapper = ByteBuffer.wrap(request.command());
node.apply(new Task(wrapper, new RaftWriteCommandClosure() {
node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() {
private HybridTimestamp safeTs;

@Override
public void result(Serializable res) {
sendResponse(res, rpcCtx);
Expand All @@ -187,6 +190,11 @@ public WriteCommand command() {
return command;
}

@Override
public @Nullable HybridTimestamp safeTimestamp() {
return safeTs;
}

@Override
public void run(Status status) {
assert !status.isOk() : status;
Expand All @@ -195,9 +203,12 @@ public void run(Status status) {
}

@Override
public void patch(HybridTimestamp safeTs) {
public void safeTimestamp(HybridTimestamp safeTs) {
assert this.safeTs == null : "Safe time can be set only once";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert makes sure that safeTime() is not set multiple times. But if it actually tries to be invoked more than once, don't we need safeTs to be volatile to see that it has already been set earlier? Wouldn't such calls happen from different threads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should only be invoked from raft event loop.
So it's safe without volatile.

// Apply binary patch.
node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs);
command.patch(safeTs);
// Avoid modifying WriteCommand object, because it's shared between raft pipeline threads.
this.safeTs = safeTs;
}
}));
}
Expand Down Expand Up @@ -335,6 +346,9 @@ private void sendRaftError(RpcContext ctx, Status status, Node node) {
ctx.sendResponse(response);
}

private interface RaftWriteCommandClosure extends Closure, WriteCommandClosure {
/**
* The command closure which is used to propagate replication state to local FSM.
*/
private interface LocalAwareWriteCommandClosure extends Closure, SafeTimeAwareCommandClosure {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,17 @@ public interface SafeTimePropagatingCommand extends WriteCommand {
HybridTimestamp initiatorTime();

/**
* The Safe timestamp. This field's value is used by a replication layer and not thread safe for external usage.
* The Safe timestamp. A value for this field is automatically generated by a replication layer.
*/
@Transient
@WithSetter
@Override
@Nullable HybridTimestamp safeTime();

/**
* Setter for the safeTime field. A value for this field is auto generated and should not be set manually.
* Setter for the safeTime field for internal usage. Should not be set manually.
*/
default void safeTime(HybridTimestamp safeTime) {
// No-op.
}

@Override
default void patch(HybridTimestamp safeTs) {
safeTime(safeTs);
}
}
Loading