Skip to content

Commit

Permalink
[FLINK-36693][state/forst] Implement checkpoint/restore for ForStSync…
Browse files Browse the repository at this point in the history
…KeyedStateBackend (#25643)
  • Loading branch information
fredia authored Nov 19, 2024
1 parent f415348 commit 7341b6e
Show file tree
Hide file tree
Showing 17 changed files with 780 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
}
}

@TestTemplate
void testValueStateWorkWithTtl() throws Exception {
TestAsyncFrameworkExceptionHandler testExceptionHandler =
new TestAsyncFrameworkExceptionHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
Expand Down Expand Up @@ -157,7 +156,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
* retrieve the column family that is used for a state and also for sanity checks when
* restoring.
*/
private final LinkedHashMap<String, ForStKvStateInfo> kvStateInformation;
private final LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;

/** Lock guarding the {@code managedStateExecutors} and {@code disposed}. */
private final Object lock = new Object();
Expand All @@ -181,7 +180,7 @@ public ForStKeyedStateBackend(
Supplier<DataOutputSerializer> valueSerializerView,
Supplier<DataInputDeserializer> valueDeserializerView,
RocksDB db,
LinkedHashMap<String, ForStKvStateInfo> kvStateInformation,
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
ColumnFamilyHandle defaultColumnFamilyHandle,
Expand Down Expand Up @@ -328,11 +327,12 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
StateDescriptor<SV> stateDesc, TypeSerializer<N> namespaceSerializer)
throws Exception {

ForStKvStateInfo oldStateInfo = kvStateInformation.get(stateDesc.getStateId());
ForStOperationUtils.ForStKvStateInfo oldStateInfo =
kvStateInformation.get(stateDesc.getStateId());

TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();

ForStKvStateInfo newStateInfo;
ForStOperationUtils.ForStKvStateInfo newStateInfo;
RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
if (oldStateInfo != null) {
@SuppressWarnings("unchecked")
Expand All @@ -346,7 +346,9 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
stateSerializer,
namespaceSerializer);

newStateInfo = new ForStKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo);
newStateInfo =
new ForStOperationUtils.ForStKvStateInfo(
oldStateInfo.columnFamilyHandle, newMetaInfo);
kvStateInformation.put(stateDesc.getStateId(), newStateInfo);
} else {
newMetaInfo =
Expand All @@ -358,12 +360,13 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());

newStateInfo =
ForStOperationUtils.createAsyncStateInfo(
ForStOperationUtils.createStateInfo(
newMetaInfo,
db,
columnFamilyOptionsFactory,
ttlCompactFiltersManager,
optionsContainer.getWriteBufferManagerCapacity());
optionsContainer.getWriteBufferManagerCapacity(),
null);
ForStOperationUtils.registerKvStateInformation(
this.kvStateInformation,
this.nativeMetricMonitor,
Expand Down Expand Up @@ -558,21 +561,4 @@ KeyGroupedInternalPriorityQueue<T> create(
stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
}
}

/** ForSt specific information about the k/v states. */
public static class ForStKvStateInfo implements AutoCloseable {
public final ColumnFamilyHandle columnFamilyHandle;
public final RegisteredStateMetaInfoBase metaInfo;

public ForStKvStateInfo(
ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo) {
this.columnFamilyHandle = columnFamilyHandle;
this.metaInfo = metaInfo;
}

@Override
public void close() throws Exception {
this.columnFamilyHandle.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
import org.apache.flink.util.CollectionUtil;
Expand Down Expand Up @@ -171,7 +172,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {

CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();

LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo> kvStateInformation =
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation =
new LinkedHashMap<>();
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
new LinkedHashMap<>();
Expand Down Expand Up @@ -293,7 +294,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
}

private ForStRestoreOperation getForStRestoreOperation(
LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo> kvStateInformation,
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
// Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will
// concatenates the dfs directory with the local directory as working dir when using flink
Expand Down Expand Up @@ -363,9 +364,7 @@ private ForStRestoreOperation getForStRestoreOperation(
@Nonnull RocksDB db,
@Nonnull ResourceGuard forstResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull
LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo>
kvStateInformation,
@Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull UUID backendUID,
Expand All @@ -375,31 +374,36 @@ private ForStRestoreOperation getForStRestoreOperation(
long lastCompletedCheckpointId)
throws IOException {

ForStSnapshotStrategyBase<K, ?> snapshotStrategy;

ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem();
ForStStateDataTransfer stateTransfer =
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs);

if (enableIncrementalCheckpointing) {
snapshotStrategy =
new ForStIncrementalSnapshotStrategy<>(
db,
forstResourceGuard,
optionsContainer,
keySerializer,
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
backendUID,
uploadedStateHandles,
stateTransfer,
lastCompletedCheckpointId);
return new ForStIncrementalSnapshotStrategy<>(
db,
forstResourceGuard,
optionsContainer,
keySerializer,
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
backendUID,
uploadedStateHandles,
stateTransfer,
lastCompletedCheckpointId);

} else {
throw new UnsupportedOperationException("Not implemented yet for ForStStateBackend");
return new ForStNativeFullSnapshotStrategy<>(
db,
forstResourceGuard,
optionsContainer,
keySerializer,
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
backendUID,
stateTransfer);
}
return snapshotStrategy;
}

private HeapPriorityQueueSetFactory createHeapQueueFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
import org.apache.flink.state.forst.sync.ForStIteratorWrapper;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
Expand Down Expand Up @@ -209,7 +207,7 @@ public static void addColumnFamilyOptionsToCloseLater(
* @param importFilesMetaData if not empty, we import the files specified in the metadata to the
* column family.
*/
public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
public static ForStKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
Expand All @@ -227,8 +225,7 @@ public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(

try {
ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
return new ForStSyncKeyedStateBackend.ForStDbKvStateInfo(
columnFamilyHandle, metaInfoBase);
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
Expand All @@ -241,7 +238,7 @@ public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
* @param cancelStreamRegistryForRestore {@link ICloseableRegistry#close closing} it interrupts
* KV state creation
*/
public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
public static ForStKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
Expand Down Expand Up @@ -279,7 +276,7 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(

if (ttlCompactFiltersManager != null) {
if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2(
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(
metaInfoBase, options);
} else {
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2(
Expand Down Expand Up @@ -340,19 +337,6 @@ static boolean sanityCheckArenaBlockSize(
}
}

public static void registerKvStateInformation(
Map<String, ForStSyncKeyedStateBackend.ForStDbKvStateInfo> kvStateInformation,
ForStNativeMetricMonitor nativeMetricMonitor,
String columnFamilyName,
ForStSyncKeyedStateBackend.ForStDbKvStateInfo registeredColumn) {

kvStateInformation.put(columnFamilyName, registeredColumn);
if (nativeMetricMonitor != null) {
nativeMetricMonitor.registerColumnFamily(
columnFamilyName, registeredColumn.columnFamilyHandle);
}
}

public static void registerKvStateInformation(
Map<String, ForStKvStateInfo> kvStateInformation,
ForStNativeMetricMonitor nativeMetricMonitor,
Expand All @@ -370,7 +354,6 @@ public static ForStKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {

ColumnFamilyDescriptor columnFamilyDescriptor =
createColumnFamilyDescriptor(metaInfoBase.getName(), columnFamilyOptionsFactory);

Expand All @@ -385,28 +368,6 @@ public static ForStKvStateInfo createStateInfo(
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
}

public static ForStKvStateInfo createAsyncStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable ForStDBTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity) {

ColumnFamilyDescriptor columnFamilyDescriptor =
createColumnFamilyDescriptor(
metaInfoBase,
columnFamilyOptionsFactory,
ttlCompactFiltersManager,
writeBufferManagerCapacity);
try {
ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
}
}

private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exception cause)
throws IOException {
// max directory path length on Windows is 247.
Expand All @@ -422,4 +383,21 @@ private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exc
cause);
}
}

/** ForSt specific information about the k/v states. */
public static class ForStKvStateInfo implements AutoCloseable {
public final ColumnFamilyHandle columnFamilyHandle;
public final RegisteredStateMetaInfoBase metaInfo;

public ForStKvStateInfo(
ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo) {
this.columnFamilyHandle = columnFamilyHandle;
this.metaInfo = metaInfo;
}

@Override
public void close() throws Exception {
this.columnFamilyHandle.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ private ForStOptionsFactory configureOptionsFactory(
return optionsFactory;
}

/** Both ForStSyncKeyedStateBackend and ForStKeyedStateBackend support no claim mode. */
public boolean supportsNoClaimRestoreMode() {
return true;
}

// ------------------------------------------------------------------------
// Parameters
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
Expand All @@ -49,7 +48,7 @@ class ForStHandle implements AutoCloseable {

private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
private final DBOptions dbOptions;
private final Map<String, ForStKvStateInfo> kvStateInformation;
private final Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;
private final String dbPath;
private List<ColumnFamilyHandle> columnFamilyHandles;
private List<ColumnFamilyDescriptor> columnFamilyDescriptors;
Expand All @@ -61,7 +60,7 @@ class ForStHandle implements AutoCloseable {
@Nullable private ForStNativeMetricMonitor nativeMetricMonitor;

protected ForStHandle(
Map<String, ForStKvStateInfo> kvStateInformation,
Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
Path instanceRocksDBPath,
DBOptions dbOptions,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
Expand Down Expand Up @@ -114,10 +113,10 @@ private void loadDb() throws IOException {
: null;
}

ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
ForStOperationUtils.ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) {

ForStKvStateInfo registeredStateMetaInfoEntry =
ForStOperationUtils.ForStKvStateInfo registeredStateMetaInfoEntry =
kvStateInformation.get(stateMetaInfoSnapshot.getName());

if (null == registeredStateMetaInfoEntry) {
Expand All @@ -131,7 +130,7 @@ ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
stateMetaInfo, db, columnFamilyOptionsFactory);
} else {
registeredStateMetaInfoEntry =
new ForStKvStateInfo(columnFamilyHandle, stateMetaInfo);
new ForStOperationUtils.ForStKvStateInfo(columnFamilyHandle, stateMetaInfo);
}

ForStOperationUtils.registerKvStateInformation(
Expand Down
Loading

0 comments on commit 7341b6e

Please sign in to comment.