From 57741be4b37b41bf6e50a1acaddd50c464cf041f Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Thu, 23 Jan 2025 15:00:13 +0300 Subject: [PATCH 1/7] IGNITE-24283 Remove shared command from replication pipeline. --- .../ignite/internal/raft/WriteCommand.java | 7 -- .../internal/raft/service/CommandClosure.java | 15 ++-- ....java => SafeTimeAwareCommandClosure.java} | 19 ++++- .../internal/raft/RaftGroupServiceImpl.java | 4 +- .../ignite/internal/raft/RetryContext.java | 5 +- .../raft/server/impl/JraftServerImpl.java | 17 +++- .../ignite/raft/jraft/core/NodeImpl.java | 8 +- .../rpc/impl/ActionRequestProcessor.java | 25 ++++-- .../command/SafeTimePropagatingCommand.java | 9 +- .../ReplicasSafeTimePropagationTest.java | 83 ++++++++++++++++++- .../distributed/raft/PartitionListener.java | 68 ++++++++------- .../replicator/PartitionReplicaListener.java | 6 +- .../table/impl/DummyInternalTableImpl.java | 17 ++-- .../internal/tx/UpdateCommandResult.java | 1 + 14 files changed, 197 insertions(+), 87 deletions(-) rename modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/{WriteCommandClosure.java => SafeTimeAwareCommandClosure.java} (65%) diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index 3b05698a858..47af734b306 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -32,11 +32,4 @@ public interface WriteCommand extends Command { default @Nullable HybridTimestamp initiatorTime() { return null; } - - /** - * This is called before a command is submitted to replication pipeline. - * - * @param safeTs Safe timestamp. - */ - default void patch(HybridTimestamp safeTs) {} } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java index fa052c93036..7074b4b5328 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/CommandClosure.java @@ -45,6 +45,14 @@ default long term() { return 0; } + /** + * Safe timestamp (replication begin time) of a command. {@code Null} if a command doesn't carry safe timestamp. + * Returns the safe time. + */ + default @Nullable HybridTimestamp safeTimestamp() { + return null; + } + /** * Returns command. */ @@ -56,11 +64,4 @@ default long term() { * @param res Execution result. */ void result(@Nullable Serializable res); - - /** - * Patches the command. - * - * @param safeTs Safe timestamp. - */ - default void patch(HybridTimestamp safeTs) {} } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java similarity index 65% rename from modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java rename to modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java index 2a788d8642f..0a90a06732e 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/WriteCommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java @@ -17,10 +17,25 @@ package org.apache.ignite.internal.raft.service; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.raft.WriteCommand; /** - * The marker interface for a write command closure. + * The marker interface for a safe time aware command closure. */ -public interface WriteCommandClosure extends CommandClosure { +public interface SafeTimeAwareCommandClosure extends CommandClosure { + /** + * Get the safe timestamp. + * + * @return The timestamp. + */ + @Override + HybridTimestamp safeTimestamp(); + + /** + * Set the safe time. + * + * @param safeTs Safe timestamp. + */ + default void safeTime(HybridTimestamp safeTs) {} } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index fc5b6da96da..5d36991010f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -492,11 +492,9 @@ public CompletableFuture run(Command cmd, long timeoutMillis) { Function requestFactory; if (cmd instanceof WriteCommand) { - byte[] commandBytes = commandsMarshaller.marshall(cmd); - requestFactory = targetPeer -> factory.writeActionRequest() .groupId(groupId) - .command(commandBytes) + .command(commandsMarshaller.marshall(cmd)) // Having prepared deserialized command makes its handling more efficient in the state machine. // This saves us from extra-deserialization on a local machine, which would take precious time to do. .deserializedCommand((WriteCommand) cmd) diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java index 4fbfce4d2e7..dfa8fe2e10b 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java @@ -132,10 +132,7 @@ void resetUnavailablePeers() { * @return {@code this}. */ RetryContext nextAttempt(Peer newTargetPeer) { - // We can avoid recreating the request if the target peer has not changed. - if (!newTargetPeer.equals(targetPeer)) { - request = requestFactory.apply(newTargetPeer); - } + request = requestFactory.apply(newTargetPeer); targetPeer = newTargetPeer; diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 0938cec18ca..a64f9ad244e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.failure.FailureContext; import org.apache.ignite.internal.failure.FailureManager; import org.apache.ignite.internal.failure.FailureType; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; @@ -70,6 +71,7 @@ import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents; +import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.storage.impl.StorageDestructionIntent; @@ -782,10 +784,12 @@ public boolean hasNext() { @Override public CommandClosure next() { - @Nullable CommandClosure done = (CommandClosure) iter.done(); + @Nullable CommandClosure doneClo = (CommandClosure) iter.done(); ByteBuffer data = iter.getData(); - WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command(); + WriteCommand command = doneClo == null ? marshaller.unmarshall(data) : doneClo.command(); + + HybridTimestamp safeTs = doneClo == null ? null : doneClo.safeTimestamp(); long commandIndex = iter.getIndex(); long commandTerm = iter.getTerm(); @@ -803,6 +807,11 @@ public long term() { return commandTerm; } + @Override + public @Nullable HybridTimestamp safeTimestamp() { + return safeTs; + } + /** {@inheritDoc} */ @Override public WriteCommand command() { @@ -812,8 +821,8 @@ public WriteCommand command() { /** {@inheritDoc} */ @Override public void result(Serializable res) { - if (done != null) { - done.result(res); + if (doneClo != null) { + doneClo.result(res); } iter.next(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index ba59260b719..89872fef723 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -50,7 +50,7 @@ import org.apache.ignite.internal.raft.JraftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.service.WriteCommandClosure; +import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; @@ -303,8 +303,8 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b } // Patch the command. - if (event.done instanceof WriteCommandClosure) { - WriteCommandClosure clo = (WriteCommandClosure) event.done; + if (event.done instanceof SafeTimeAwareCommandClosure) { + SafeTimeAwareCommandClosure clo = (SafeTimeAwareCommandClosure) event.done; WriteCommand command = clo.command(); HybridTimestamp timestamp = command.initiatorTime(); @@ -314,7 +314,7 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b safeTs = clock.update(timestamp); } - clo.patch(safeTs); + clo.safeTime(safeTs); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index bc2fef1ce9e..bc3c84eaeff 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -35,7 +35,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupListener.ShutdownException; -import org.apache.ignite.internal.raft.service.WriteCommandClosure; +import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.RaftMessagesFactory; @@ -176,7 +176,9 @@ private Object groupIdSyncMonitor(String groupId) { */ private void applyWrite(Node node, WriteActionRequest request, WriteCommand command, RpcContext rpcCtx) { ByteBuffer wrapper = ByteBuffer.wrap(request.command()); - node.apply(new Task(wrapper, new RaftWriteCommandClosure() { + node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() { + private HybridTimestamp safeTs; + @Override public void result(Serializable res) { sendResponse(res, rpcCtx); @@ -187,6 +189,13 @@ public WriteCommand command() { return command; } + @Override + public HybridTimestamp safeTimestamp() { + assert safeTs != null; + + return safeTs; + } + @Override public void run(Status status) { assert !status.isOk() : status; @@ -195,9 +204,12 @@ public void run(Status status) { } @Override - public void patch(HybridTimestamp safeTs) { + public void safeTime(HybridTimestamp safeTs) { + assert this.safeTs == null : "Closure can be patched only once"; + // Apply binary patch. node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs); - command.patch(safeTs); + // Avoid modifying WriteCommand object, because it's shared between raft pipeline threads. + this.safeTs = safeTs; } })); } @@ -335,6 +347,9 @@ private void sendRaftError(RpcContext ctx, Status status, Node node) { ctx.sendResponse(response); } - private interface RaftWriteCommandClosure extends Closure, WriteCommandClosure { + /** + * The command closure which is used to propagate replication state to local FSM. + */ + private interface LocalAwareWriteCommandClosure extends Closure, SafeTimeAwareCommandClosure { } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java index 1b7515156f8..5ef012f5f65 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java @@ -31,21 +31,16 @@ public interface SafeTimePropagatingCommand extends WriteCommand { HybridTimestamp initiatorTime(); /** - * The Safe timestamp. This field's value is used by a replication layer and not thread safe for external usage. + * The Safe timestamp. A value for this field is automatically generated by a replication layer. */ @Transient @WithSetter @Nullable HybridTimestamp safeTime(); /** - * Setter for the safeTime field. A value for this field is auto generated and should not be set manually. + * Setter for the safeTime field for internal usage. Should not be set manually. */ default void safeTime(HybridTimestamp safeTime) { // No-op. } - - @Override - default void patch(HybridTimestamp safeTs) { - safeTime(safeTs); - } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 9333abdebaa..f74315ed657 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -25,12 +25,16 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import java.util.Iterator; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -44,6 +48,8 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; @@ -54,8 +60,10 @@ import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.TestLozaFactory; +import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageFactory; @@ -85,6 +93,8 @@ */ @ExtendWith(ConfigurationExtension.class) public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ReplicasSafeTimePropagationTest.class); + @InjectConfiguration("mock: { fsync: false }") private RaftConfiguration raftConfiguration; @@ -107,6 +117,11 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { private Map cluster; + private volatile boolean blockFSMThread = false; + + private CountDownLatch l1 = new CountDownLatch(1); + private CountDownLatch l2 = new CountDownLatch(1); + @AfterEach public void after() throws Exception { for (PartialNode partialNode : cluster.values()) { @@ -122,14 +137,19 @@ private static void sendSafeTimeSyncCommand( RaftGroupService raftClient, HybridTimestamp initiatorTime ) { - CompletableFuture safeTimeCommandFuture = raftClient.run( + sendSafeTimeSyncCommandAsync(raftClient, initiatorTime).join(); + } + + private static CompletableFuture sendSafeTimeSyncCommandAsync( + RaftGroupService raftClient, + HybridTimestamp initiatorTime + ) { + return raftClient.run( REPLICA_MESSAGES_FACTORY .safeTimeSyncCommand() .initiatorTime(initiatorTime) .build() ); - - assertThat(safeTimeCommandFuture, willCompleteSuccessfully()); } /** @@ -254,6 +274,45 @@ public void testSafeTimeReorderingOnClusterShrink() throws Exception { assertTrue(leaderNode2.safeTs.current().compareTo(firstSafeTs) > 0); } + @Test + public void testSafeTimeReorderingOnRetry() throws Exception { + { + cluster = Stream.of("node1").collect(toMap(identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + // Assumes stable clock on test runner. + HybridClock initiatorClock = new TestHybridClock(() -> System.currentTimeMillis() - 100); + + sendSafeTimeSyncCommand(raftClient, initiatorClock.now()); + + blockFSMThread = true; + + var fut = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); + var fut2 = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); + + l1.await(); + + // Block enough to trigger retry. + Thread.sleep(raftConfiguration.responseTimeout().value() + 1000); + + LOG.info("Unblocking FSM thread"); + + blockFSMThread = false; + + l2.countDown(); + + fut.join(); + fut2.join(); + } + private void startCluster(Map cluster) throws Exception { for (PartialNode node : cluster.values()) { node.start(); @@ -324,7 +383,23 @@ void start() throws Exception { mock(IndexMetaStorage.class), clusterService.topologyService().localMember().id(), mock(MinimumRequiredTimeCollectorService.class) - ), + ) { + @Override + public void onWrite(Iterator> iterator) { + if (blockFSMThread) { + LOG.info("Blocked on FSM call: safeTs=" + safeTs.current()); + l1.countDown(); + + try { + assertTrue(l2.await(30_000, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + fail("Unexpected interruption", e); + } + } + + super.onWrite(iterator); + } + }, RaftGroupEventsListener.noopLsnr, RaftGroupOptions.defaults() .maxClockSkew(schemaSynchronizationConfiguration.maxClockSkew().value().intValue()) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 12ba362c399..377f5c4ab8b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -170,6 +170,8 @@ public void onWrite(Iterator> iterator) { long commandIndex = clo.index(); long commandTerm = clo.term(); + @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp(); + assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand; // We choose the minimum applied index, since we choose it (the minimum one) on local recovery so as not to lose the data for // one of the storages. @@ -195,9 +197,9 @@ public void onWrite(Iterator> iterator) { try { if (command instanceof UpdateCommand) { - result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm); + result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm, safeTimestamp); } else if (command instanceof UpdateAllCommand) { - result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm); + result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm, safeTimestamp); } else if (command instanceof FinishTxCommand) { result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm); } else if (command instanceof WriteIntentSwitchCommand) { @@ -219,12 +221,8 @@ public void onWrite(Iterator> iterator) { if (Boolean.TRUE.equals(result.get2())) { // Adjust safe time before completing update to reduce waiting. - if (command instanceof SafeTimePropagatingCommand) { - SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) command; - - assert safeTimePropagatingCommand.safeTime() != null; - - updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimePropagatingCommand.safeTime()); + if (safeTimestamp != null) { + updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTimestamp); } updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); @@ -249,16 +247,30 @@ public void onWrite(Iterator> iterator) { }); } + private final void updateTrackers(@Nullable HybridTimestamp safeTs, long commandIndex) { + if (safeTs != null) { + updateTrackerIgnoringTrackerClosedException(safeTimeTracker, safeTs); + } + + updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); + } + /** * Handler for the {@link UpdateCommand}. * + * @param clo * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. - * + * @param safeTimestamp Safe timestamp. * @return The result. */ - private IgniteBiTuple handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleUpdateCommand( + UpdateCommand cmd, + long commandIndex, + long commandTerm, + HybridTimestamp safeTimestamp + ) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); // Update result is not needed. @@ -270,12 +282,7 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult( - false, - storageLeaseStartTime, - isPrimaryInGroupTopology(), - cmd.safeTime().longValue() - ), false); + return new IgniteBiTuple<>(new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), 0), false); } } @@ -292,7 +299,7 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c cmd.rowToUpdate(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null, + cmd.full() ? safeTimestamp : null, cmd.lastCommitTimestamp(), indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId()) ); @@ -304,9 +311,9 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } - replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full()); + replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full()); - return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), cmd.safeTime().longValue()), true); + return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), safeTimestamp.longValue()), true); } /** @@ -315,8 +322,14 @@ private IgniteBiTuple handleUpdateCommand(UpdateCommand c * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. + * @param safeTimestamp Safe timestamp. */ - private IgniteBiTuple handleUpdateAllCommand(UpdateAllCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleUpdateAllCommand( + UpdateAllCommand cmd, + long commandIndex, + long commandTerm, + HybridTimestamp safeTimestamp + ) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); @@ -328,12 +341,7 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult( - false, - storageLeaseStartTime, - isPrimaryInGroupTopology(), - cmd.safeTime().longValue() - ), false); + return new IgniteBiTuple<>(new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), 0), false); } } @@ -346,7 +354,7 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom cmd.tablePartitionId().asTablePartitionId(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null, + cmd.full() ? safeTimestamp : null, indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId()) ); } else { @@ -357,9 +365,9 @@ private IgniteBiTuple handleUpdateAllCommand(UpdateAllCom advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } - replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full()); + replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp: null, cmd.full()); - return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), cmd.safeTime().longValue()), true); + return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), safeTimestamp.longValue()), true); } /** @@ -461,7 +469,7 @@ private IgniteBiTuple handleWriteIntentSwitchCommand( * @param commandIndex RAFT index of the command. * @param commandTerm RAFT term of the command. */ - private IgniteBiTuple handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long commandIndex, long commandTerm) { + private IgniteBiTuple handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long commandIndex, long commandTerm) { // Skips the write command because the storage has already executed it. if (commandIndex <= storage.lastAppliedIndex()) { return new IgniteBiTuple<>(null, false); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index aaca0add491..d1fba47f045 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -2906,6 +2906,8 @@ private CompletableFuture applyUpdateAllCommand( return safeTime.waitFor(safeTs) .thenApply(ignored -> new CommandApplicationResult(safeTs, null)); } else { + HybridTimestamp safeTs = ((UpdateAllCommand) res.getCommand()).safeTime(); + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll( cmd.txId(), @@ -2913,11 +2915,11 @@ private CompletableFuture applyUpdateAllCommand( cmd.tablePartitionId().asTablePartitionId(), false, null, - cmd.safeTime(), + safeTs, indexIdsAtRwTxBeginTs(txId) ); - return completedFuture(new CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(), null)); + return completedFuture(new CommandApplicationResult(safeTs, null)); } }); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index cb498f9629d..355be4f56aa 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -352,6 +352,8 @@ public NetworkMessage clone() { long commandIndex = raftIndex.incrementAndGet(); + HybridTimestamp safeTs = CLOCK.now(); + CompletableFuture res = new CompletableFuture<>(); // All read commands are handled directly throw partition replica listener. @@ -364,7 +366,13 @@ public long index() { /** {@inheritDoc} */ @Override - public WriteCommand command() { + public HybridTimestamp safeTimestamp() { + return safeTs; + } + + /** {@inheritDoc} */ + @Override + public @Nullable WriteCommand command() { return (WriteCommand) cmd; } @@ -377,15 +385,8 @@ public void result(@Nullable Serializable r) { res.complete(r); } } - - @Override - public void patch(HybridTimestamp safeTs) { - command().patch(safeTs); - } }; - clo.patch(CLOCK.now()); - try { partitionListener.onWrite(List.of(clo).iterator()); } catch (Throwable e) { diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java index 96e2c4cc27f..c3e636e6951 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java @@ -36,6 +36,7 @@ public class UpdateCommandResult implements Serializable { /** {@code true} if primary replica belongs to the raft group topology: peers and learners, (@code false) otherwise. */ private final boolean primaryInPeersAndLearners; + /** The safe timestamp. */ private final long safeTimestamp; /** From 49c3002eb977a6389da691819eb9b82757d11ddc Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 27 Jan 2025 15:54:38 +0300 Subject: [PATCH 2/7] IGNITE-24283 Fix tests. --- .../service/SafeTimeAwareCommandClosure.java | 3 +- .../rpc/impl/ActionRequestProcessor.java | 8 ++- .../raft/PartitionCommandListenerTest.java | 52 ++++++++----------- 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java index 0a90a06732e..29c38951169 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.raft.WriteCommand; +import org.jetbrains.annotations.Nullable; /** * The marker interface for a safe time aware command closure. @@ -30,7 +31,7 @@ public interface SafeTimeAwareCommandClosure extends CommandClosure hybridClock.now()); - WriteIntentSwitchCommand writeIntentSwitchCommand = mock(WriteIntentSwitchCommand.class); - lenient().when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - SafeTimeSyncCommand safeTimeSyncCommand = mock(SafeTimeSyncCommand.class); - lenient().when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> hybridClock.now()); - FinishTxCommand finishTxCommand = mock(FinishTxCommand.class); - lenient().when(finishTxCommand.safeTime()).thenAnswer(v -> hybridClock.now()); PrimaryReplicaChangeCommand primaryReplicaChangeCommand = mock(PrimaryReplicaChangeCommand.class); // Checks for MvPartitionStorage. commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 1, updateCommand, updateCommandClosureResultCaptor), - writeCommandCommandClosure(10, 1, updateCommand, updateCommandClosureResultCaptor), - writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor), - writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor), - writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, commandClosureResultCaptor) + writeCommandCommandClosure(3, 1, updateCommand, updateCommandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(10, 1, updateCommand, updateCommandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(5, 1, safeTimeSyncCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand, commandClosureResultCaptor, null) ).iterator()); // Two storage runConsistently runs are expected: one for configuration application and another for primaryReplicaChangeCommand @@ -426,8 +419,8 @@ void testSkipWriteCommandByAppliedIndex() { commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class); commandListener.onWrite(List.of( - writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor), - writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor) + writeCommandCommandClosure(2, 1, finishTxCommand, commandClosureResultCaptor, hybridClock.now()), + writeCommandCommandClosure(10, 1, finishTxCommand, commandClosureResultCaptor, hybridClock.now()) ).iterator()); verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong(), anyLong()); @@ -436,32 +429,36 @@ void testSkipWriteCommandByAppliedIndex() { assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null})); } - private static CommandClosure writeCommandCommandClosure( + private CommandClosure writeCommandCommandClosure( long index, long term, WriteCommand writeCommand ) { - return writeCommandCommandClosure(index, term, writeCommand, null); + return writeCommandCommandClosure(index, term, writeCommand, null, hybridClock.now()); } /** * Create a command closure. * * @param index Index of the RAFT command. + * @param term Term of RAFT command. * @param writeCommand Write command. * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)} + * @param safeTimestamp The safe timestamp. */ private static CommandClosure writeCommandCommandClosure( long index, long term, WriteCommand writeCommand, - @Nullable ArgumentCaptor resultClosureCaptor + @Nullable ArgumentCaptor resultClosureCaptor, + @Nullable HybridTimestamp safeTimestamp ) { CommandClosure commandClosure = mock(CommandClosure.class); when(commandClosure.index()).thenReturn(index); when(commandClosure.term()).thenReturn(term); when(commandClosure.command()).thenReturn(writeCommand); + when(commandClosure.safeTimestamp()).thenReturn(safeTimestamp); if (resultClosureCaptor != null) { doNothing().when(commandClosure).result(resultClosureCaptor.capture()); @@ -544,7 +541,6 @@ void updatesLastAppliedForUpdateCommands() { .txCoordinatorId(UUID.randomUUID()) .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .build(); commandListener.onWrite(List.of( @@ -554,10 +550,6 @@ void updatesLastAppliedForUpdateCommands() { verify(mvPartitionStorage).lastApplied(3, 2); } - private HybridTimestamp staleOrFreshSafeTime(boolean stale) { - return stale ? safeTimeTracker.current().subtractPhysicalTime(1) : hybridClock.now(); - } - @Test void updatesLastAppliedForUpdateAllCommands() { safeTimeTracker.update(hybridClock.now(), null); @@ -571,7 +563,6 @@ void updatesLastAppliedForUpdateAllCommands() { .txCoordinatorId(UUID.randomUUID()) .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .build(); commandListener.onWrite(List.of( @@ -588,7 +579,6 @@ void updatesLastAppliedForFinishTxCommands() { FinishTxCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() .txId(TestTransactionIds.newTransactionId()) .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()) .partitionIds(List.of()) .build(); @@ -604,11 +594,10 @@ void updatesLastAppliedForFinishTxCommands() { void locksOnCommandApplication() { SafeTimeSyncCommandBuilder safeTimeSyncCommand = new ReplicaMessagesFactory() .safeTimeSyncCommand() - .initiatorTime(hybridClock.now()) - .safeTime(hybridClock.now()); + .initiatorTime(hybridClock.now()); commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor, hybridClock.now()) ).iterator()); InOrder inOrder = inOrder(partitionDataStorage); @@ -714,9 +703,8 @@ private BuildIndexCommand createBuildIndexCommand(int indexId, List rowUui private void applySafeTimeCommand(Class cls, HybridTimestamp timestamp) { SafeTimePropagatingCommand command = mock(cls); - when(command.safeTime()).thenReturn(timestamp); - CommandClosure closure = writeCommandCommandClosure(3, 1, command, commandClosureResultCaptor); + CommandClosure closure = writeCommandCommandClosure(3, 1, command, commandClosureResultCaptor, timestamp); commandListener.onWrite(asList(closure).iterator()); assertEquals(timestamp, safeTimeTracker.current()); } @@ -943,6 +931,7 @@ private void update(Function keyValueMapper) { txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -995,6 +984,7 @@ private void delete() { txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -1067,6 +1057,7 @@ private void insert() { UUID txId = TestTransactionIds.newTransactionId(); txIds.add(txId); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); when(clo.command()).thenReturn( @@ -1147,6 +1138,7 @@ private void invokeBatchedCommand(WriteCommand cmd) { return null; }).when(clo).result(any()); + when(clo.safeTimestamp()).thenReturn(hybridClock.now()); when(clo.command()).thenReturn(cmd); })); } From eeb569f66f5a944b467b638150ace2902ec20d68 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 27 Jan 2025 16:34:16 +0300 Subject: [PATCH 3/7] IGNITE-24283 Fix tests 2. --- .../metastorage/command/MetaStorageWriteCommand.java | 1 + .../org/apache/ignite/internal/raft/WriteCommand.java | 11 +++++++++++ .../internal/raft/server/impl/JraftServerImpl.java | 2 +- .../command/SafeTimePropagatingCommand.java | 1 + .../internal/table/impl/DummyInternalTableImpl.java | 3 ++- 5 files changed, 16 insertions(+), 2 deletions(-) diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java index f715bf2880b..81e2faa37e2 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java @@ -38,6 +38,7 @@ public interface MetaStorageWriteCommand extends WriteCommand { * command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}. */ @WithSetter + @Override @Nullable HybridTimestamp safeTime(); /** diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index 47af734b306..a5bacde3792 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.raft; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.annotations.Transient; +import org.apache.ignite.internal.network.annotations.WithSetter; import org.jetbrains.annotations.Nullable; /** @@ -32,4 +34,13 @@ public interface WriteCommand extends Command { default @Nullable HybridTimestamp initiatorTime() { return null; } + + /** + * Get safe timestamp. + * + * @return The timestamp. + */ + default @Nullable HybridTimestamp safeTime() { + return null; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index a64f9ad244e..29edd34d81d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -789,7 +789,7 @@ public CommandClosure next() { WriteCommand command = doneClo == null ? marshaller.unmarshall(data) : doneClo.command(); - HybridTimestamp safeTs = doneClo == null ? null : doneClo.safeTimestamp(); + HybridTimestamp safeTs = doneClo == null ? command.safeTime() : doneClo.safeTimestamp(); long commandIndex = iter.getIndex(); long commandTerm = iter.getTerm(); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java index 5ef012f5f65..658c7e85aac 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java @@ -35,6 +35,7 @@ public interface SafeTimePropagatingCommand extends WriteCommand { */ @Transient @WithSetter + @Override @Nullable HybridTimestamp safeTime(); /** diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 355be4f56aa..7fa89d72221 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -78,6 +78,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; @@ -352,7 +353,7 @@ public NetworkMessage clone() { long commandIndex = raftIndex.incrementAndGet(); - HybridTimestamp safeTs = CLOCK.now(); + HybridTimestamp safeTs = cmd instanceof SafeTimePropagatingCommand ? CLOCK.now() : null; CompletableFuture res = new CompletableFuture<>(); From 273a88ffef5c6b729cc097dba083d0c3c97764b2 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 27 Jan 2025 18:07:52 +0300 Subject: [PATCH 4/7] IGNITE-24283 Fix styles. --- .../org/apache/ignite/internal/raft/WriteCommand.java | 2 -- .../ignite/internal/raft/server/impl/JraftServerImpl.java | 1 - .../distributed/ReplicasSafeTimePropagationTest.java | 8 ++++---- .../table/distributed/raft/PartitionListener.java | 3 +-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index a5bacde3792..5664be71d9f 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.raft; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.network.annotations.Transient; -import org.apache.ignite.internal.network.annotations.WithSetter; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 29edd34d81d..da4f91110fb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -71,7 +71,6 @@ import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents; -import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.storage.impl.StorageDestructionIntent; diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index f74315ed657..28ba280bb83 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -117,7 +117,7 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { private Map cluster; - private volatile boolean blockFSMThread = false; + private volatile boolean blockFsmThread = false; private CountDownLatch l1 = new CountDownLatch(1); private CountDownLatch l2 = new CountDownLatch(1); @@ -293,7 +293,7 @@ public void testSafeTimeReorderingOnRetry() throws Exception { sendSafeTimeSyncCommand(raftClient, initiatorClock.now()); - blockFSMThread = true; + blockFsmThread = true; var fut = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); var fut2 = sendSafeTimeSyncCommandAsync(raftClient, initiatorClock.now()); @@ -305,7 +305,7 @@ public void testSafeTimeReorderingOnRetry() throws Exception { LOG.info("Unblocking FSM thread"); - blockFSMThread = false; + blockFsmThread = false; l2.countDown(); @@ -386,7 +386,7 @@ void start() throws Exception { ) { @Override public void onWrite(Iterator> iterator) { - if (blockFSMThread) { + if (blockFsmThread) { LOG.info("Blocked on FSM call: safeTs=" + safeTs.current()); l1.countDown(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 377f5c4ab8b..867210bc455 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -258,7 +258,6 @@ private final void updateTrackers(@Nullable HybridTimestamp safeTs, long command /** * Handler for the {@link UpdateCommand}. * - * @param clo * @param cmd Command. * @param commandIndex Index of the RAFT command. * @param commandTerm Term of the RAFT command. @@ -365,7 +364,7 @@ private IgniteBiTuple handleUpdateAllCommand( advanceLastAppliedIndexConsistently(commandIndex, commandTerm); } - replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp: null, cmd.full()); + replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full()); return new IgniteBiTuple<>(new UpdateCommandResult(true, isPrimaryInGroupTopology(), safeTimestamp.longValue()), true); } From 9b9410506b491bd2d4b488b7cac31e9b1bacc4b9 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Tue, 28 Jan 2025 10:01:55 +0300 Subject: [PATCH 5/7] IGNITE-24283 Add TODO. --- .../java/org/apache/ignite/internal/raft/WriteCommand.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index 5664be71d9f..1a9b7e52396 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -25,7 +25,7 @@ */ public interface WriteCommand extends Command { /** - * Holds request's initiator timestamp. + * Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. * * @return The timestamp. */ @@ -34,7 +34,7 @@ public interface WriteCommand extends Command { } /** - * Get safe timestamp. + * Get safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. * * @return The timestamp. */ From c49312d33bf6c824ac62d6cdf32cb5c1b9693c2a Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Tue, 28 Jan 2025 10:40:44 +0300 Subject: [PATCH 6/7] IGNITE-24283 Post merge. --- .../raft/server/impl/JraftServerImpl.java | 61 ++----------------- 1 file changed, 6 insertions(+), 55 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 1648c10e807..5d8d06bae76 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -773,61 +773,6 @@ public RaftGroupListener getListener() { @Override public void onApply(Iterator iter) { - listener.onWrite(new java.util.Iterator<>() { - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public CommandClosure next() { - @Nullable CommandClosure doneClo = (CommandClosure) iter.done(); - ByteBuffer data = iter.getData(); - - WriteCommand command = doneClo == null ? marshaller.unmarshall(data) : doneClo.command(); - - HybridTimestamp safeTs = doneClo == null ? command.safeTime() : doneClo.safeTimestamp(); - - long commandIndex = iter.getIndex(); - long commandTerm = iter.getTerm(); - - return new CommandClosure<>() { - /** {@inheritDoc} */ - @Override - public long index() { - return commandIndex; - } - - /** {@inheritDoc} */ - @Override - public long term() { - return commandTerm; - } - - @Override - public @Nullable HybridTimestamp safeTimestamp() { - return safeTs; - } - - /** {@inheritDoc} */ - @Override - public WriteCommand command() { - return command; - } - - /** {@inheritDoc} */ - @Override - public void result(Serializable res) { - if (doneClo != null) { - doneClo.result(res); - } - - iter.next(); - } - }; - } - }); - var writeCommandIterator = new WriteCommandIterator(iter, marshaller); try { @@ -967,6 +912,7 @@ public CommandClosure next() { ByteBuffer data = iter.getData(); WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command(); + HybridTimestamp safeTs = done == null ? command.safeTime() : done.safeTimestamp(); long commandIndex = iter.getIndex(); long commandTerm = iter.getTerm(); @@ -982,6 +928,11 @@ public long term() { return commandTerm; } + @Override + public @Nullable HybridTimestamp safeTimestamp() { + return safeTs; + } + @Override public WriteCommand command() { return command; From 2ad6ee03ecb5df6b64dd633aad572338b5361afb Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Thu, 30 Jan 2025 13:40:30 +0300 Subject: [PATCH 7/7] IGNITE-24283 Fix review comments. --- .../org/apache/ignite/internal/raft/WriteCommand.java | 2 +- .../raft/service/SafeTimeAwareCommandClosure.java | 8 ++++---- .../internal/raft/server/impl/JraftServerImpl.java | 1 + .../org/apache/ignite/raft/jraft/core/NodeImpl.java | 5 +++-- .../raft/jraft/rpc/impl/ActionRequestProcessor.java | 5 +++-- .../distributed/ReplicasSafeTimePropagationTest.java | 10 +++++----- .../table/distributed/raft/PartitionListener.java | 9 ++++++--- .../replicator/PartitionReplicaListener.java | 2 +- 8 files changed, 24 insertions(+), 18 deletions(-) diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java index 1a9b7e52396..3d36712f4fa 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/WriteCommand.java @@ -34,7 +34,7 @@ public interface WriteCommand extends Command { } /** - * Get safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. + * Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand. * * @return The timestamp. */ diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java index 29c38951169..563e27dc86d 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/SafeTimeAwareCommandClosure.java @@ -26,7 +26,7 @@ */ public interface SafeTimeAwareCommandClosure extends CommandClosure { /** - * Get the safe timestamp. + * Gets the safe timestamp. * * @return The timestamp. */ @@ -34,9 +34,9 @@ public interface SafeTimeAwareCommandClosure extends CommandClosure next() { @Nullable CommandClosure done = (CommandClosure) currentDone; ByteBuffer data = iter.getData(); + // done != null means we are on the leader, otherwise a command has been read from the log. WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command(); HybridTimestamp safeTs = done == null ? command.safeTime() : done.safeTimestamp(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 89872fef723..8923cb4062e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -309,12 +309,13 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b HybridTimestamp timestamp = command.initiatorTime(); if (timestamp != null) { - // Tick once per batch. if (safeTs == null) { safeTs = clock.update(timestamp); + } else if (timestamp.compareTo(safeTs) > 0) { + safeTs = clock.update(timestamp); } - clo.safeTime(safeTs); + clo.safeTimestamp(safeTs); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index 596dcf6c11e..d5f55790b76 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -52,7 +52,8 @@ import org.apache.ignite.raft.jraft.rpc.RpcProcessor; import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; -import org.apache.ignite.raft.jraft.util.BytesUtil;import org.jetbrains.annotations.Nullable; +import org.apache.ignite.raft.jraft.util.BytesUtil; +import org.jetbrains.annotations.Nullable; /** * Process action request. @@ -202,7 +203,7 @@ public void run(Status status) { } @Override - public void safeTime(HybridTimestamp safeTs) { + public void safeTimestamp(HybridTimestamp safeTs) { assert this.safeTs == null : "Safe time can be set only once"; // Apply binary patch. node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 28ba280bb83..117a580a8e4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -119,8 +119,8 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { private volatile boolean blockFsmThread = false; - private CountDownLatch l1 = new CountDownLatch(1); - private CountDownLatch l2 = new CountDownLatch(1); + private final CountDownLatch l1 = new CountDownLatch(1); + private final CountDownLatch l2 = new CountDownLatch(1); @AfterEach public void after() throws Exception { @@ -137,7 +137,7 @@ private static void sendSafeTimeSyncCommand( RaftGroupService raftClient, HybridTimestamp initiatorTime ) { - sendSafeTimeSyncCommandAsync(raftClient, initiatorTime).join(); + assertThat(sendSafeTimeSyncCommandAsync(raftClient, initiatorTime), willCompleteSuccessfully()); } private static CompletableFuture sendSafeTimeSyncCommandAsync( @@ -309,8 +309,8 @@ public void testSafeTimeReorderingOnRetry() throws Exception { l2.countDown(); - fut.join(); - fut2.join(); + assertThat(fut, willCompleteSuccessfully()); + assertThat(fut2, willCompleteSuccessfully()); } private void startCluster(Map cluster) throws Exception { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 867210bc455..14e909f7e90 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table.distributed.raft; import static java.util.Objects.requireNonNull; +import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs; import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING; @@ -171,7 +172,7 @@ public void onWrite(Iterator> iterator) { long commandIndex = clo.index(); long commandTerm = clo.term(); @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp(); - assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand; + assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command; // We choose the minimum applied index, since we choose it (the minimum one) on local recovery so as not to lose the data for // one of the storages. @@ -281,7 +282,8 @@ private IgniteBiTuple handleUpdateCommand( long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), 0), false); + return new IgniteBiTuple<>( + new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), NULL_HYBRID_TIMESTAMP), false); } } @@ -340,7 +342,8 @@ private IgniteBiTuple handleUpdateAllCommand( long storageLeaseStartTime = storage.leaseStartTime(); if (leaseStartTime != storageLeaseStartTime) { - return new IgniteBiTuple<>(new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), 0), false); + return new IgniteBiTuple<>( + new UpdateCommandResult(false, storageLeaseStartTime, isPrimaryInGroupTopology(), NULL_HYBRID_TIMESTAMP), false); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index d1fba47f045..1d097183b96 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -2906,7 +2906,7 @@ private CompletableFuture applyUpdateAllCommand( return safeTime.waitFor(safeTs) .thenApply(ignored -> new CommandApplicationResult(safeTs, null)); } else { - HybridTimestamp safeTs = ((UpdateAllCommand) res.getCommand()).safeTime(); + HybridTimestamp safeTs = hybridTimestamp(updateCommandResult.safeTimestamp()); // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll(