diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java index f715bf2880b..81e2faa37e2 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java @@ -38,6 +38,7 @@ public interface MetaStorageWriteCommand extends WriteCommand { * command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}. */ @WithSetter + @Override @Nullable HybridTimestamp safeTime(); /** diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index 3b05698a858..3d36712f4fa 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -25,7 +25,7 @@ */ public interface WriteCommand extends Command { /** - * Holds request's initiator timestamp. + * Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. * * @return The timestamp. */ @@ -34,9 +34,11 @@ public interface WriteCommand extends Command { } /** - * This is called before a command is submitted to replication pipeline. + * Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. * - * @param safeTs Safe timestamp. + * @return The timestamp. */ - default void patch(HybridTimestamp safeTs) {} + default @Nullable HybridTimestamp safeTime() { + return null; + } } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java index fa052c93036..7074b4b5328 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java @@ -45,6 +45,14 @@ default long term() { return 0; } + /** + * Safe timestamp (replication begin time) of a command. {@code Null} if a command doesn't carry safe timestamp. + * Returns the safe time. + */ + default @Nullable HybridTimestamp safeTimestamp() { + return null; + } + /** * Returns command. */ @@ -56,11 +64,4 @@ default long term() { * @param res Execution result. */ void result(@Nullable Serializable res); - - /** - * Patches the command. - * - * @param safeTs Safe timestamp. - */ - default void patch(HybridTimestamp safeTs) {} } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java similarity index 63% rename from modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java rename to modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java index 2a788d8642f..563e27dc86d 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java @@ -17,10 +17,26 @@ package org.apache.ignite.internal.raft.service; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.raft.WriteCommand; +import org.jetbrains.annotations.Nullable; /** - * The marker interface for a write command closure. + * The marker interface for a safe time aware command closure. */ -public interface WriteCommandClosure extends CommandClosure { +public interface SafeTimeAwareCommandClosure extends CommandClosure { + /** + * Gets the safe timestamp. + * + * @return The timestamp. + */ + @Override + @Nullable HybridTimestamp safeTimestamp(); + + /** + * Sets the safe time. + * + * @param safeTs The timestamp. + */ + default void safeTimestamp(HybridTimestamp safeTs) {} } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index fc5b6da96da..5d36991010f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -492,11 +492,9 @@ public CompletableFuture run(Command cmd, long timeoutMillis) { Function requestFactory; if (cmd instanceof WriteCommand) { - byte[] commandBytes = commandsMarshaller.marshall(cmd); - requestFactory = targetPeer -> factory.writeActionRequest() .groupId(groupId) - .command(commandBytes) + .command(commandsMarshaller.marshall(cmd)) // Having prepared deserialized command makes its handling more efficient in the state machine. // This saves us from extra-deserialization on a local machine, which would take precious time to do. .deserializedCommand((WriteCommand) cmd) diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java index 4fbfce4d2e7..dfa8fe2e10b 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java @@ -132,10 +132,7 @@ void resetUnavailablePeers() { * @return {@code this}. */ RetryContext nextAttempt(Peer newTargetPeer) { - // We can avoid recreating the request if the target peer has not changed. - if (!newTargetPeer.equals(targetPeer)) { - request = requestFactory.apply(newTargetPeer); - } + request = requestFactory.apply(newTargetPeer); targetPeer = newTargetPeer; diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index d5a91e5bfdb..726468ee724 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.failure.FailureContext; import org.apache.ignite.internal.failure.FailureManager; import org.apache.ignite.internal.failure.FailureType; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; @@ -907,7 +908,9 @@ public CommandClosure next() { @Nullable CommandClosure done = (CommandClosure) currentDone; ByteBuffer data = iter.getData(); + // done != null means we are on the leader, otherwise a command has been read from the log. WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command(); + HybridTimestamp safeTs = done == null ? command.safeTime() : done.safeTimestamp(); long commandIndex = iter.getIndex(); long commandTerm = iter.getTerm(); @@ -923,6 +926,11 @@ public long term() { return commandTerm; } + @Override + public @Nullable HybridTimestamp safeTimestamp() { + return safeTs; + } + @Override public WriteCommand command() { return command; diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index ba59260b719..8923cb4062e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -50,7 +50,7 @@ import org.apache.ignite.internal.raft.JraftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.service.WriteCommandClosure; +import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; @@ -303,18 +303,19 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b } // Patch the command. - if (event.done instanceof WriteCommandClosure) { - WriteCommandClosure clo = (WriteCommandClosure) event.done; + if (event.done instanceof SafeTimeAwareCommandClosure) { + SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure) event.done; WriteCommand command = clo.command(); HybridTimestamp timestamp = command.initiatorTime(); if (timestamp != null) { - // Tick once per batch. if (safeTs == null) { safeTs = clock.update(timestamp); + } else if (timestamp.compareTo(safeTs) > 0) { + safeTs = clock.update(timestamp); } - clo.patch(safeTs); + clo.safeTimestamp(safeTs); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index bc2fef1ce9e..d5f55790b76 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -35,7 +35,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupListener.ShutdownException; -import org.apache.ignite.internal.raft.service.WriteCommandClosure; +import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.RaftMessagesFactory; @@ -53,6 +53,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; import org.apache.ignite.raft.jraft.util.BytesUtil; +import org.jetbrains.annotations.Nullable; /** * Process action request. @@ -176,7 +177,9 @@ private Object groupIdSyncMonitor(String groupId) { */ private void applyWrite(Node node, WriteActionRequest request, WriteCommand command, RpcContext rpcCtx) { ByteBuffer wrapper = ByteBuffer.wrap(request.command()); - node.apply(new Task(wrapper, new RaftWriteCommandClosure() { + node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() { + private HybridTimestamp safeTs; + @Override public void result(Serializable res) { sendResponse(res, rpcCtx); @@ -187,6 +190,11 @@ public WriteCommand command() { return command; } + @Override + public @Nullable HybridTimestamp safeTimestamp() { + return safeTs; + } + @Override public void run(Status status) { assert !status.isOk() : status; @@ -195,9 +203,12 @@ public void run(Status status) { } @Override - public void patch(HybridTimestamp safeTs) { + public void safeTimestamp(HybridTimestamp safeTs) { + assert this.safeTs == null : "Safe time can be set only once"; + // Apply binary patch. node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs); - command.patch(safeTs); + // Avoid modifying WriteCommand object, because it's shared between raft pipeline threads. + this.safeTs = safeTs; } })); } @@ -335,6 +346,9 @@ private void sendRaftError(RpcContext ctx, Status status, Node node) { ctx.sendResponse(response); } - private interface RaftWriteCommandClosure extends Closure, WriteCommandClosure { + /** + * The command closure which is used to propagate replication state to local FSM. + */ + private interface LocalAwareWriteCommandClosure extends Closure, SafeTimeAwareCommandClosure { } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java index 1b7515156f8..658c7e85aac 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java @@ -31,21 +31,17 @@ public interface SafeTimePropagatingCommand extends WriteCommand { HybridTimestamp initiatorTime(); /** - * The Safe timestamp. This field's value is used by a replication layer and not thread safe for external usage. + * The Safe timestamp. A value for this field is automatically generated by a replication layer. */ @Transient @WithSetter + @Override @Nullable HybridTimestamp safeTime(); /** - * Setter for the safeTime field. A value for this field is auto generated and should not be set manually. + * Setter for the safeTime field for internal usage. Should not be set manually. */ default void safeTime(HybridTimestamp safeTime) { // No-op. } - - @Override - default void patch(HybridTimestamp safeTs) { - safeTime(safeTs); - } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 9333abdebaa..117a580a8e4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -25,12 +25,16 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import java.util.Iterator; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -44,6 +48,8 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; @@ -54,8 +60,10 @@ import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.TestLozaFactory; +import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageFactory; @@ -85,6 +93,8 @@ */ @ExtendWith(ConfigurationExtension.class) public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ReplicasSafeTimePropagationTest.class); + @InjectConfiguration("mock: { fsync: false }") private RaftConfiguration raftConfiguration; @@ -107,6 +117,11 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { private Map cluster; + private volatile boolean blockFsmThread = false; + + private final CountDownLatch l1 = new CountDownLatch(1); + private final CountDownLatch l2 = new CountDownLatch(1); + @AfterEach public void after() throws Exception { for (PartialNode partialNode : cluster.values()) { @@ -122,14 +137,19 @@ private static void sendSafeTimeSyncCommand( RaftGroupService raftClient, HybridTimestamp initiatorTime ) { - CompletableFuture safeTimeCommandFuture = raftClient.run( + assertThat(sendSafeTimeSyncCommandAsync(raftClient, initiatorTime), willCompleteSuccessfully()); + } + + private static CompletableFuture sendSafeTimeSyncCommandAsync( + RaftGroupService raftClient, + HybridTimestamp initiatorTime + ) { + return raftClient.run( REPLICA_MESSAGES_FACTORY .safeTimeSyncCommand() .initiatorTime(initiatorTime) .build() ); - - assertThat(safeTimeCommandFuture, willCompleteSuccessfully()); } /** @@ -254,6 +274,45 @@ public void testSafeTimeReorderingOnClusterShrink() throws Exception { assertTrue(leaderNode2.safeTs.current().compareTo(firstSafeTs) > 0); } + @Test + public void testSafeTimeReorderingOnRetry() throws Exception { + { + cluster = Stream.of("node1").collect(toMap(identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + // Assumes stable clock on test runner. + HybridClock initiatorClock = new TestHybridClock(() -> System.currentTimeMillis() - 100); + + sendSafeTimeSyncCommand(raftClient, initiatorClock.now()); + + blockFsmThread = true; + + var fut = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); + var fut2 = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); + + l1.await(); + + // Block enough to trigger retry. + Thread.sleep(raftConfiguration.responseTimeout().value() + 1000); + + LOG.info("Unblocking FSM thread"); + + blockFsmThread = false; + + l2.countDown(); + + assertThat(fut, willCompleteSuccessfully()); + assertThat(fut2, willCompleteSuccessfully()); + } + private void startCluster(Map cluster) throws Exception { for (PartialNode node : cluster.values()) { node.start(); @@ -324,7 +383,23 @@ void start() throws Exception { mock(IndexMetaStorage.class), clusterService.topologyService().localMember().id(), mock(MinimumRequiredTimeCollectorService.class) - ), + ) { + @Override + public void onWrite(Iterator> iterator) { + if (blockFsmThread) { + LOG.info("Blocked on FSM call: safeTs=" + safeTs.current()); + l1.countDown(); + + try { + assertTrue(l2.await(30_000, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + fail("Unexpected interruption", e); + } + } + + super.onWrite(iterator); + } + }, RaftGroupEventsListener.noopLsnr, RaftGroupOptions.defaults() .maxClockSkew(schemaSynchronizationConfiguration.maxClockSkew().value().intValue()) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 12ba362c399..14e909f7e90 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table.distributed.raft; import static java.util.Objects.requireNonNull; +import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs; import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING; @@ -170,6 +171,8 @@ public void onWrite(Iterator> iterator) { long commandIndex = clo.index(); long commandTerm = clo.term(); + @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp(); + assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command; // We choose the minimum applied index, since we choose it (the minimum one) on local recovery so as not to lose the data for // one of the storages. @@ -195,9 +198,9 @@ public void onWrite(Iterator> iterator) { try { if (command instanceof UpdateCommand) { - result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm); + result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm, safeTimestamp); } else if (command instanceof UpdateAllCommand) { - result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm); + result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm, safeTimestamp); } else if (command instanceof FinishTxCommand) { result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm); } else if (command instanceof WriteIntentSwitchCommand) { @@ -219,12 +222,8 @@ public void onWrite(Iterator> iterator) { if (Boolean.TRUE.equals(result.get2())) { // Adjust safe time before completing update to reduce waiting. - if (command instanceof SafeTimePropagatingCommand) { - SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) command; - - assert safeTimePropagatingCommand.safeTime() != null; - - updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimePropagatingCommand.safeTime()); + if (safeTimestamp != null) { + updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimestamp); } updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); @@ -249,16 +248,29 @@ public void onWrite(Iterator> iterator) { }); } + private final void updateTrackers(@Nullable HybridTimestamp safeTs, long commandIndex) { + if (safeTs != null) { + updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTs); + } + + updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); + } + /** * Handler for the {@link UpdateCommand}. * * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. - * + * @param safeTimestamp Safe timestamp. * @return The result. */ - private IgniteBiTuple handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleUpdateCommand( + UpdateCommand cmd, + long commandIndex, + long commandTerm, + HybridTimestamp safeTimestamp + ) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); // Update result is not needed. @@ -270,12 +282,8 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult( - false, - storageLeaseStartTime, - isPrimaryInGroupTopology(), - cmd.safeTime().longValue() - ), false); + return new IgniteBiTuple<>( + new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), NULL_HYBRID_TIMESTAMP), false); } } @@ -292,7 +300,7 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c cmd.rowToUpdate(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null, + cmd.full() ? safeTimestamp : null, cmd.lastCommitTimestamp(), indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId()) ); @@ -304,9 +312,9 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } - replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full()); + replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full()); - return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), cmd.safeTime().longValue()), true); + return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), safeTimestamp.longValue()), true); } /** @@ -315,8 +323,14 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. + * @param safeTimestamp Safe timestamp. */ - private IgniteBiTuple handleUpdateAllCommand(UpdateAllCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleUpdateAllCommand( + UpdateAllCommand cmd, + long commandIndex, + long commandTerm, + HybridTimestamp safeTimestamp + ) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); @@ -328,12 +342,8 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult( - false, - storageLeaseStartTime, - isPrimaryInGroupTopology(), - cmd.safeTime().longValue() - ), false); + return new IgniteBiTuple<>( + new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), NULL_HYBRID_TIMESTAMP), false); } } @@ -346,7 +356,7 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom cmd.tablePartitionId().asTablePartitionId(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null, + cmd.full() ? safeTimestamp : null, indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId()) ); } else { @@ -357,9 +367,9 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } - replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full()); + replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full()); - return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), cmd.safeTime().longValue()), true); + return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), safeTimestamp.longValue()), true); } /** @@ -461,7 +471,7 @@ private IgniteBiTuple handleWriteIntentSwitchCommand( * @param commandIndex RAFT index of the command. * @param commandTerm RAFT term of the command. */ - private IgniteBiTuple handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long commandIndex, long commandTerm) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index a40e47be53b..7e4def8307a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -2907,6 +2907,8 @@ private CompletableFuture applyUpdateAllCommand( return safeTime.waitFor(safeTs) .thenApply(ignored -> new CommandApplicationResult(safeTs, null)); } else { + HybridTimestamp safeTs = hybridTimestamp(updateCommandResult.safeTimestamp()); + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll( cmd.txId(), @@ -2914,11 +2916,11 @@ private CompletableFuture applyUpdateAllCommand( cmd.tablePartitionId().asTablePartitionId(), false, null, - cmd.safeTime(), + safeTs, indexIdsAtRwTxBeginTs(txId) ); - return completedFuture(new CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(), null)); + return completedFuture(new CommandApplicationResult(safeTs, null)); } }); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index d85cd435ae3..3621d014e83 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -310,7 +310,7 @@ public void before() { .leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue()) .build(); - commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(), 1, command)).iterator()); + commandListener.onWrite(List.of(writeCommandCommandClosure(raftIndex.incrementAndGet(), 1, command, null, null)).iterator()); } } @@ -389,26 +389,19 @@ void testSkipWriteCommandByAppliedIndex() { mvPartitionStorage.lastApplied(10L, 1L); UpdateCommand updateCommand = mock(UpdateCommand.class); - lenient().when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); - lenient().when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); - lenient().when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - FinishTxCommand finishTxCommand = mock(FinishTxCommand.class); - lenient().when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now()); PrimaryReplicaChangeCommand primaryReplicaChangeCommand = mock(PrimaryReplicaChangeCommand.class); // Checks for MvPartitionStorage. commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 1, updateCommand, updateCommandClosureResultCaptor), - writeCommandCommandClosure(10, 1, updateCommand, updateCommandClosureResultCaptor), - writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor), - writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor), - writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, commandClosureResultCaptor) + writeCommandCommandClosure(3, 1, updateCommand, updateCommandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(10, 1, updateCommand, updateCommandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, commandClosureResultCaptor, null) ).iterator()); // Two storage runConsistently runs are expected: one for configuration application and another for primaryReplicaChangeCommand @@ -427,8 +420,8 @@ void testSkipWriteCommandByAppliedIndex() { commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class); commandListener.onWrite(List.of( - writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor), - writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor) + writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor, hybridClock.now()) ).iterator()); verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong(), anyLong()); @@ -437,32 +430,36 @@ void testSkipWriteCommandByAppliedIndex() { assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null})); } - private static CommandClosure writeCommandCommandClosure( + private CommandClosure writeCommandCommandClosure( long index, long term, WriteCommand writeCommand ) { - return writeCommandCommandClosure(index, term, writeCommand, null); + return writeCommandCommandClosure(index, term, writeCommand, null, hybridClock.now()); } /** * Create a command closure. * * @param index Index of the RAFT command. + * @param term Term of RAFT command. * @param writeCommand Write command. * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)} + * @param safeTimestamp The safe timestamp. */ private static CommandClosure writeCommandCommandClosure( long index, long term, WriteCommand writeCommand, - @Nullable ArgumentCaptor resultClosureCaptor + @Nullable ArgumentCaptor resultClosureCaptor, + @Nullable HybridTimestamp safeTimestamp ) { CommandClosure commandClosure = mock(CommandClosure.class); when(commandClosure.index()).thenReturn(index); when(commandClosure.term()).thenReturn(term); when(commandClosure.command()).thenReturn(writeCommand); + when(commandClosure.safeTimestamp()).thenReturn(safeTimestamp); if (resultClosureCaptor != null) { doNothing().when(commandClosure).result(resultClosureCaptor.capture()); @@ -545,7 +542,6 @@ void updatesLastAppliedForUpdateCommands() { .txCoordinatorId(UUID.randomUUID()) .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .build(); commandListener.onWrite(List.of( @@ -555,10 +551,6 @@ void updatesLastAppliedForUpdateCommands() { verify(mvPartitionStorage).lastApplied(3, 2); } - private HybridTimestamp staleOrFreshSafeTime(boolean stale) { - return stale ? safeTimeTracker.current().subtractPhysicalTime(1) : hybridClock.now(); - } - @Test void updatesLastAppliedForUpdateAllCommands() { safeTimeTracker.update(hybridClock.now(), null); @@ -572,7 +564,6 @@ void updatesLastAppliedForUpdateAllCommands() { .txCoordinatorId(UUID.randomUUID()) .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .build(); commandListener.onWrite(List.of( @@ -589,7 +580,6 @@ void updatesLastAppliedForFinishTxCommands() { FinishTxCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .partitionIds(List.of()) .build(); @@ -605,11 +595,10 @@ void updatesLastAppliedForFinishTxCommands() { void locksOnCommandApplication() { SafeTimeSyncCommandBuilder safeTimeSyncCommand = new ReplicaMessagesFactory() .safeTimeSyncCommand() - .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()); + .initiatorTime(hybridClock.now()); commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor, hybridClock.now()) ).iterator()); InOrder inOrder = inOrder(partitionDataStorage); @@ -715,9 +704,8 @@ private BuildIndexCommand createBuildIndexCommand(int indexId, List rowUui private void applySafeTimeCommand(Class cls, HybridTimestamp timestamp) { SafeTimePropagatingCommand command = mock(cls); - when(command.safeTime()).thenReturn(timestamp); - CommandClosure closure = writeCommandCommandClosure(3, 1, command, commandClosureResultCaptor); + CommandClosure closure = writeCommandCommandClosure(3, 1, command, commandClosureResultCaptor, timestamp); commandListener.onWrite(asList(closure).iterator()); assertEquals(timestamp, safeTimeTracker.current()); } @@ -944,6 +932,7 @@ private void update(Function keyValueMapper) { txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -996,6 +985,7 @@ private void delete() { txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -1068,6 +1058,7 @@ private void insert() { UUID txId = TestTransactionIds.newTransactionId(); txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -1148,6 +1139,7 @@ private void invokeBatchedCommand(WriteCommand cmd) { return null; }).when(clo).result(any()); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.command()).thenReturn(cmd); })); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 48c7315d092..797c3faa55c 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; @@ -355,6 +356,8 @@ public NetworkMessage clone() { long commandIndex = raftIndex.incrementAndGet(); + HybridTimestamp safeTs = cmd instanceof SafeTimePropagatingCommand ? CLOCK.now() : null; + CompletableFuture res = new CompletableFuture<>(); // All read commands are handled directly throw partition replica listener. @@ -367,7 +370,13 @@ public long index() { /** {@inheritDoc} */ @Override - public WriteCommand command() { + public HybridTimestamp safeTimestamp() { + return safeTs; + } + + /** {@inheritDoc} */ + @Override + public @Nullable WriteCommand command() { return (WriteCommand) cmd; } @@ -380,15 +389,8 @@ public void result(@Nullable Serializable r) { res.complete(r); } } - - @Override - public void patch(HybridTimestamp safeTs) { - command().patch(safeTs); - } }; - clo.patch(CLOCK.now()); - try { partitionListener.onWrite(List.of(clo).iterator()); } catch (Throwable e) { diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java index 96e2c4cc27f..c3e636e6951 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java @@ -36,6 +36,7 @@ public class UpdateCommandResult implements Serializable { /** {@code true} if primary replica belongs to the raft group topology: peers and learners, (@code false) otherwise. */ private final boolean primaryInPeersAndLearners; + /** The safe timestamp. */ private final long safeTimestamp; /**