From c58a6ebbae50e95c59ad5cdcc7c2f2023c5d38f3 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Tue, 3 Sep 2024 17:33:47 +0800 Subject: [PATCH] [FLINK-35389][State/ForSt] Async list state (#25126) --- .../runtime/state/v2/InternalKeyedState.java | 4 + .../flink/state/forst/ForStDBGetRequest.java | 28 +-- .../state/forst/ForStDBListGetRequest.java | 58 ++++++ .../flink/state/forst/ForStDBPutRequest.java | 28 ++- .../state/forst/ForStDBSingleGetRequest.java | 47 +++++ .../forst/ForStGeneralMultiGetOperation.java | 6 +- .../flink/state/forst/ForStInnerTable.java | 4 +- .../state/forst/ForStKeyedStateBackend.java | 40 +++-- .../flink/state/forst/ForStListIterator.java | 48 +++++ .../flink/state/forst/ForStListState.java | 168 ++++++++++++++++++ .../flink/state/forst/ForStStateExecutor.java | 2 +- .../forst/ForStStateRequestClassifier.java | 20 ++- .../flink/state/forst/ForStValueState.java | 4 +- .../state/forst/ForStWriteBatchOperation.java | 5 + .../state/forst/ListDelimitedSerializer.java | 94 ++++++++++ .../state/forst/ForStDBOperationTestBase.java | 27 ++- .../ForStGeneralMultiGetOperationTest.java | 58 +++++- .../forst/ForStWriteBatchOperationTest.java | 83 +++++++++ 18 files changed, 666 insertions(+), 58 deletions(-) create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBSingleGetRequest.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ListDelimitedSerializer.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java index af1b49df66956..e2bdac9815d61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java @@ -91,4 +91,8 @@ public StateDescriptor getStateDescriptor() { public TypeSerializer getValueSerializer() { return stateDescriptor.getSerializer(); } + + public StateRequestHandler getStateRequestHandler() { + return stateRequestHandler; + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java index 24c699cc01851..0c6c70e61b600 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java @@ -28,16 +28,18 @@ * The Get access request for ForStDB. * * @param The type of key in get access request. + * @param The type of namespace in put access request. * @param The type of value returned by get request. + * @param The type of returned value in state future. */ -public class ForStDBGetRequest { +public abstract class ForStDBGetRequest { - private final ContextKey key; - private final ForStInnerTable table; - private final InternalStateFuture future; + final ContextKey key; + final ForStInnerTable table; + final InternalStateFuture future; - private ForStDBGetRequest( - ContextKey key, ForStInnerTable table, InternalStateFuture future) { + ForStDBGetRequest( + ContextKey key, ForStInnerTable table, InternalStateFuture future) { this.key = key; this.table = table; this.future = future; @@ -51,21 +53,9 @@ public ColumnFamilyHandle getColumnFamilyHandle() { return table.getColumnFamilyHandle(); } - public void completeStateFuture(byte[] bytesValue) throws IOException { - if (bytesValue == null) { - future.complete(null); - return; - } - V value = table.deserializeValue(bytesValue); - future.complete(value); - } + public abstract void completeStateFuture(byte[] bytesValue) throws IOException; public void completeStateFutureExceptionally(String message, Throwable ex) { future.completeExceptionally(message, ex); } - - static ForStDBGetRequest of( - ContextKey key, ForStInnerTable table, InternalStateFuture future) { - return new ForStDBGetRequest<>(key, table, future); - } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java new file mode 100644 index 0000000000000..99f6654d928de --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.io.IOException; +import java.util.List; + +/** + * The Get access request for ForStDB. + * + * @param The type of key in get access request. + * @param The type of value returned by get request. + */ +public class ForStDBListGetRequest + extends ForStDBGetRequest, StateIterator> { + + ForStDBListGetRequest( + ContextKey key, + ForStInnerTable> table, + InternalStateFuture> future) { + super(key, table, future); + } + + @Override + public void completeStateFuture(byte[] bytesValue) throws IOException { + if (bytesValue == null) { + future.complete(null); + return; + } + List value = table.deserializeValue(bytesValue); + future.complete( + new ForStListIterator<>( + (ForStListState) table, + StateRequestType.LIST_GET, + ((ForStListState) table).getStateRequestHandler(), + value)); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java index bfe1b15d8ccd0..0359566c03173 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java @@ -30,22 +30,26 @@ * The Put access request for ForStDB. * * @param The type of key in put access request. + * @param The type of namespace in put access request. * @param The type of value in put access request. */ public class ForStDBPutRequest { - private final ContextKey key; - @Nullable private final V value; - private final ForStInnerTable table; - private final InternalStateFuture future; + final ContextKey key; + @Nullable final V value; + final boolean isMerge; + final ForStInnerTable table; + final InternalStateFuture future; - private ForStDBPutRequest( + ForStDBPutRequest( ContextKey key, V value, + boolean isMerge, ForStInnerTable table, InternalStateFuture future) { this.key = key; this.value = value; + this.isMerge = isMerge; this.table = table; this.future = future; } @@ -54,6 +58,10 @@ public boolean valueIsNull() { return value == null; } + public boolean isMerge() { + return isMerge; + } + public ColumnFamilyHandle getColumnFamilyHandle() { return table.getColumnFamilyHandle(); } @@ -84,6 +92,14 @@ static ForStDBPutRequest of( @Nullable V value, ForStInnerTable table, InternalStateFuture future) { - return new ForStDBPutRequest<>(key, value, table, future); + return new ForStDBPutRequest<>(key, value, false, table, future); + } + + static ForStDBPutRequest ofMerge( + ContextKey key, + @Nullable V value, + ForStInnerTable table, + InternalStateFuture future) { + return new ForStDBPutRequest<>(key, value, true, table, future); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBSingleGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBSingleGetRequest.java new file mode 100644 index 0000000000000..4386e1b12e615 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBSingleGetRequest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.state.InternalStateFuture; + +import java.io.IOException; + +/** + * The Get access request for ForStDB. + * + * @param The type of key in get access request. + * @param The type of value returned by get request. + */ +public class ForStDBSingleGetRequest extends ForStDBGetRequest { + + ForStDBSingleGetRequest( + ContextKey key, ForStInnerTable table, InternalStateFuture future) { + super(key, table, future); + } + + @Override + public void completeStateFuture(byte[] bytesValue) throws IOException { + if (bytesValue == null) { + future.complete(null); + return; + } + V value = table.deserializeValue(bytesValue); + future.complete(value); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java index ed7abbf7df8c4..1a8f825e28de2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java @@ -38,12 +38,12 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation { private final RocksDB db; - private final List> batchRequest; + private final List> batchRequest; private final Executor executor; ForStGeneralMultiGetOperation( - RocksDB db, List> batchRequest, Executor executor) { + RocksDB db, List> batchRequest, Executor executor) { this.db = db; this.batchRequest = batchRequest; this.executor = executor; @@ -58,7 +58,7 @@ public CompletableFuture process() { AtomicReference error = new AtomicReference<>(); AtomicInteger counter = new AtomicInteger(batchRequest.size()); for (int i = 0; i < batchRequest.size(); i++) { - ForStDBGetRequest request = batchRequest.get(i); + ForStDBGetRequest request = batchRequest.get(i); executor.execute( () -> { try { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java index e8b763ee81693..141c63cf1e2e8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java @@ -73,7 +73,7 @@ public interface ForStInnerTable { * @param stateRequest The given stateRequest. * @return The corresponding ForSt GetRequest. */ - ForStDBGetRequest buildDBGetRequest(StateRequest stateRequest); + ForStDBGetRequest buildDBGetRequest(StateRequest stateRequest); /** * Build a {@link ForStDBPutRequest} that belong to {@code ForStInnerTable} with the given @@ -82,5 +82,5 @@ public interface ForStInnerTable { * @param stateRequest The given stateRequest. * @return The corresponding ForSt PutRequest. */ - ForStDBPutRequest buildDBPutRequest(StateRequest stateRequest); + ForStDBPutRequest buildDBPutRequest(StateRequest stateRequest); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index b9ff2e4b83da2..fd021920a36b0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.FlinkRuntimeException; @@ -149,20 +150,33 @@ public S createState( ColumnFamilyHandle columnFamilyHandle = ForStOperationUtils.createColumnFamilyHandle( stateDesc.getStateId(), db, columnFamilyOptionsFactory); - if (stateDesc.getType() == StateDescriptor.Type.VALUE) { - return (S) - new ForStValueState<>( - stateRequestHandler, - columnFamilyHandle, - (ValueStateDescriptor) stateDesc, - serializedKeyBuilder, - defaultNamespace, - namespaceSerializer::duplicate, - valueSerializerView, - valueDeserializerView); + switch (stateDesc.getType()) { + case VALUE: + return (S) + new ForStValueState<>( + stateRequestHandler, + columnFamilyHandle, + (ValueStateDescriptor) stateDesc, + serializedKeyBuilder, + defaultNamespace, + namespaceSerializer::duplicate, + valueSerializerView, + valueDeserializerView); + case LIST: + return (S) + new ForStListState<>( + stateRequestHandler, + columnFamilyHandle, + (ListStateDescriptor) stateDesc, + serializedKeyBuilder, + defaultNamespace, + namespaceSerializer::duplicate, + valueSerializerView, + valueDeserializerView); + default: + throw new UnsupportedOperationException( + String.format("Unsupported state type: %s", stateDesc.getType())); } - throw new UnsupportedOperationException( - String.format("Unsupported state type: %s", stateDesc.getType())); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java new file mode 100644 index 0000000000000..76377acf47111 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.runtime.asyncprocessing.AbstractStateIterator; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.util.Collection; + +/** The forst implementation for list iterator. */ +public class ForStListIterator extends AbstractStateIterator { + + public ForStListIterator( + State originalState, + StateRequestType requestType, + StateRequestHandler stateHandler, + Collection partialResult) { + super(originalState, requestType, stateHandler, partialResult); + } + + @Override + protected boolean hasNext() { + return false; + } + + @Override + protected Object nextPayloadForContinuousLoading() { + return null; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java new file mode 100644 index 0000000000000..152bc0d92ea61 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.StateRequest; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.InternalListState; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; +import org.apache.flink.util.Preconditions; + +import org.rocksdb.ColumnFamilyHandle; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +/** + * The {@link InternalListState} implement for ForStDB. + * + *

{@link ForStStateBackend} must ensure that we set the {@link org.rocksdb.StringAppendOperator} + * on the column family that we use for our state since we use the {@code merge()} call. + * + * @param The type of the key. + * @param The type of the namespace. + * @param The type of the value. + */ +public class ForStListState extends InternalListState + implements ListState, ForStInnerTable> { + + /** The column family which this internal value state belongs to. */ + private final ColumnFamilyHandle columnFamilyHandle; + + /** The serialized key builder which should be thread-safe. */ + private final ThreadLocal> serializedKeyBuilder; + + /** The default namespace if not set. * */ + private final N defaultNamespace; + + private final ThreadLocal> namespaceSerializer; + + /** The data outputStream used for value serializer, which should be thread-safe. */ + private final ThreadLocal valueSerializerView; + + /** The data inputStream used for value deserializer, which should be thread-safe. */ + private final ThreadLocal valueDeserializerView; + + public ForStListState( + StateRequestHandler stateRequestHandler, + ColumnFamilyHandle columnFamily, + ListStateDescriptor listStateDescriptor, + Supplier> serializedKeyBuilderInitializer, + N defaultNamespace, + Supplier> namespaceSerializerInitializer, + Supplier valueSerializerViewInitializer, + Supplier valueDeserializerViewInitializer) { + super(stateRequestHandler, listStateDescriptor); + this.columnFamilyHandle = columnFamily; + this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer); + this.defaultNamespace = defaultNamespace; + this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer); + this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); + this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); + } + + @Override + public ColumnFamilyHandle getColumnFamilyHandle() { + return columnFamilyHandle; + } + + @Override + public byte[] serializeKey(ContextKey contextKey) throws IOException { + return contextKey.getOrCreateSerializedKey( + ctxKey -> { + SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); + builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); + N namespace = contextKey.getNamespace(this); + return builder.buildCompositeKeyNamespace( + namespace == null ? defaultNamespace : namespace, + namespaceSerializer.get()); + }); + } + + @Override + public byte[] serializeValue(List valueList) throws IOException { + DataOutputSerializer outputView = valueSerializerView.get(); + outputView.clear(); + return ListDelimitedSerializer.serializeList(valueList, getValueSerializer(), outputView); + } + + @Override + public List deserializeValue(byte[] valueBytes) throws IOException { + DataInputDeserializer inputView = valueDeserializerView.get(); + inputView.setBuffer(valueBytes); + return ListDelimitedSerializer.deserializeList(valueBytes, getValueSerializer(), inputView); + } + + @SuppressWarnings("unchecked") + @Override + public ForStDBGetRequest, StateIterator> buildDBGetRequest( + StateRequest stateRequest) { + Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.LIST_GET); + ContextKey contextKey = + new ContextKey<>((RecordContext) stateRequest.getRecordContext()); + return new ForStDBListGetRequest<>( + contextKey, this, (InternalStateFuture>) stateRequest.getFuture()); + } + + @SuppressWarnings("unchecked") + @Override + public ForStDBPutRequest> buildDBPutRequest(StateRequest stateRequest) { + ContextKey contextKey = + new ContextKey<>((RecordContext) stateRequest.getRecordContext()); + List value; + boolean merge = false; + switch (stateRequest.getRequestType()) { + case CLEAR: + value = null; + // "Delete(key)" is equivalent to "Put(key, null)" + break; + case LIST_UPDATE: + value = (List) stateRequest.getPayload(); + break; + case LIST_ADD: + value = Collections.singletonList((V) stateRequest.getPayload()); + merge = true; + break; + case LIST_ADD_ALL: + value = (List) stateRequest.getPayload(); + merge = true; + break; + default: + throw new IllegalArgumentException(); + } + if (merge) { + return ForStDBPutRequest.ofMerge( + contextKey, value, this, (InternalStateFuture) stateRequest.getFuture()); + } else { + return ForStDBPutRequest.of( + contextKey, value, this, (InternalStateFuture) stateRequest.getFuture()); + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index 7973ab05f3b8d..6f5caa80facf5 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -92,7 +92,7 @@ public CompletableFuture executeBatchRequests( futures.add(writeOperations.process()); } - List> getRequests = + List> getRequests = stateRequestClassifier.pollDbGetRequests(); if (!getRequests.isEmpty()) { ForStGeneralMultiGetOperation getOperations = diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index 1cfc3ca4c8f54..6d72051c00e30 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -31,7 +31,7 @@ */ public class ForStStateRequestClassifier implements StateRequestContainer { - private final List> dbGetRequests; + private final List> dbGetRequests; private final List> dbPutRequests; @@ -55,17 +55,21 @@ private void convertStateRequestsToForStDBRequests(StateRequest stateRe StateRequestType stateRequestType = stateRequest.getRequestType(); switch (stateRequestType) { case VALUE_GET: + case LIST_GET: { - ForStValueState forStValueState = - (ForStValueState) stateRequest.getState(); - dbGetRequests.add(forStValueState.buildDBGetRequest(stateRequest)); + ForStInnerTable innerTable = + (ForStInnerTable) stateRequest.getState(); + dbGetRequests.add(innerTable.buildDBGetRequest(stateRequest)); return; } case VALUE_UPDATE: + case LIST_UPDATE: + case LIST_ADD: + case LIST_ADD_ALL: { - ForStValueState forStValueState = - (ForStValueState) stateRequest.getState(); - dbPutRequests.add(forStValueState.buildDBPutRequest(stateRequest)); + ForStInnerTable innerTable = + (ForStInnerTable) stateRequest.getState(); + dbPutRequests.add(innerTable.buildDBPutRequest(stateRequest)); return; } case CLEAR: @@ -88,7 +92,7 @@ private void convertStateRequestsToForStDBRequests(StateRequest stateRe } } - public List> pollDbGetRequests() { + public List> pollDbGetRequests() { return dbGetRequests; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index f0327a3cdf0c6..cd6968bb12fac 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -117,11 +117,11 @@ public V deserializeValue(byte[] valueBytes) throws IOException { @SuppressWarnings("unchecked") @Override - public ForStDBGetRequest buildDBGetRequest(StateRequest stateRequest) { + public ForStDBGetRequest buildDBGetRequest(StateRequest stateRequest) { Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.VALUE_GET); ContextKey contextKey = new ContextKey<>((RecordContext) stateRequest.getRecordContext()); - return ForStDBGetRequest.of( + return new ForStDBSingleGetRequest<>( contextKey, this, (InternalStateFuture) stateRequest.getFuture()); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java index 188d5c49d20c8..4fa290bff90db 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java @@ -63,6 +63,11 @@ public CompletableFuture process() { writeBatch.delete( request.getColumnFamilyHandle(), request.buildSerializedKey()); + } else if (request.isMerge()) { + writeBatch.merge( + request.getColumnFamilyHandle(), + request.buildSerializedKey(), + request.buildSerializedValue()); } else { writeBatch.put( request.getColumnFamilyHandle(), diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ListDelimitedSerializer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ListDelimitedSerializer.java new file mode 100644 index 0000000000000..ab114258870be --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ListDelimitedSerializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Encapsulates a logic of serialization and deserialization of a list with a delimiter. Used in the + * savepoint format. + */ +public final class ListDelimitedSerializer { + + private static final byte DELIMITER = ','; + + private ListDelimitedSerializer() {} + + public static List deserializeList( + byte[] valueBytes, + TypeSerializer elementSerializer, + DataInputDeserializer dataInputView) + throws IOException { + if (valueBytes == null) { + return null; + } + + dataInputView.setBuffer(valueBytes); + + List result = new ArrayList<>(); + T next; + while ((next = deserializeNextElement(dataInputView, elementSerializer)) != null) { + result.add(next); + } + return result; + } + + public static byte[] serializeList( + List valueList, + TypeSerializer elementSerializer, + DataOutputSerializer dataOutputView) + throws IOException { + + dataOutputView.clear(); + boolean first = true; + + for (T value : valueList) { + Preconditions.checkNotNull(value, "You cannot add null to a value list."); + + if (first) { + first = false; + } else { + dataOutputView.write(DELIMITER); + } + elementSerializer.serialize(value, dataOutputView); + } + + return dataOutputView.getCopyOfBuffer(); + } + + /** Deserializes a single element from a serialized list. */ + public static T deserializeNextElement( + DataInputDeserializer in, TypeSerializer elementSerializer) throws IOException { + if (in.available() > 0) { + T element = elementSerializer.deserialize(in); + if (in.available() > 0) { + in.readByte(); + } + return element; + } + return null; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index 053873c189246..a4edd01eb2cf5 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.InternalPartitionedState; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; @@ -76,7 +77,10 @@ protected ColumnFamilyHandle createColumnFamilyHandle(String columnFamilyName) throws Exception { byte[] nameBytes = columnFamilyName.getBytes(ConfigConstants.DEFAULT_CHARSET); ColumnFamilyDescriptor columnFamilyDescriptor = - new ColumnFamilyDescriptor(nameBytes, new ColumnFamilyOptions()); + new ColumnFamilyDescriptor( + nameBytes, + ForStOperationUtils.createColumnFamilyOptions( + (e) -> new ColumnFamilyOptions(), columnFamilyName)); return db.createColumnFamily(columnFamilyDescriptor); } @@ -133,6 +137,27 @@ protected ForStValueState buildForStValueState(S valueDeserializerView); } + protected ForStListState buildForStListState(String stateName) + throws Exception { + ColumnFamilyHandle cf = createColumnFamilyHandle(stateName); + ListStateDescriptor valueStateDescriptor = + new ListStateDescriptor<>(stateName, BasicTypeInfo.STRING_TYPE_INFO); + Supplier> serializedKeyBuilder = + () -> new SerializedCompositeKeyBuilder<>(IntSerializer.INSTANCE, 2, 32); + Supplier valueSerializerView = () -> new DataOutputSerializer(32); + Supplier valueDeserializerView = + () -> new DataInputDeserializer(new byte[128]); + return new ForStListState<>( + buildMockStateRequestHandler(), + cf, + valueStateDescriptor, + serializedKeyBuilder, + VoidNamespace.INSTANCE, + () -> VoidNamespaceSerializer.INSTANCE, + valueSerializerView, + valueDeserializerView); + } + static class TestStateFuture implements InternalStateFuture { public CompletableFuture future = new CompletableFuture<>(); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java index 1a16c8d3cdf5e..b5b1e47094cac 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java @@ -18,12 +18,14 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.VoidNamespace; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -39,7 +41,7 @@ public void testValueStateMultiGet() throws Exception { buildForStValueState("test-multiGet-1"); ForStValueState valueState2 = buildForStValueState("test-multiGet-2"); - List> batchGetRequest = new ArrayList<>(); + List> batchGetRequest = new ArrayList<>(); List>> resultCheckList = new ArrayList<>(); int keyNum = 1000; @@ -47,8 +49,8 @@ public void testValueStateMultiGet() throws Exception { TestStateFuture future = new TestStateFuture<>(); ForStValueState table = ((i % 2 == 0) ? valueState1 : valueState2); - ForStDBGetRequest request = - ForStDBGetRequest.of(buildContextKey(i), table, future); + ForStDBSingleGetRequest request = + new ForStDBSingleGetRequest<>(buildContextKey(i), table, future); batchGetRequest.add(request); String value = (i % 10 != 0 ? String.valueOf(i) : null); @@ -72,4 +74,54 @@ public void testValueStateMultiGet() throws Exception { executor.shutdownNow(); } + + @Test + public void testListStateMultiGet() throws Exception { + ForStListState listState1 = + buildForStListState("test-multiGet-1"); + ForStListState listState2 = + buildForStListState("test-multiGet-2"); + List> batchGetRequest = new ArrayList<>(); + List, TestStateFuture>>> resultCheckList = + new ArrayList<>(); + + int keyNum = 1000; + for (int i = 0; i < keyNum; i++) { + TestStateFuture> future = new TestStateFuture<>(); + ForStListState table = + ((i % 2 == 0) ? listState1 : listState2); + ForStDBListGetRequest request = + new ForStDBListGetRequest<>(buildContextKey(i), table, future); + batchGetRequest.add(request); + + List value = new ArrayList<>(); + value.add(String.valueOf(i)); + value.add(String.valueOf(i + 1)); + + resultCheckList.add(Tuple2.of(value, future)); + if (value == null) { + continue; + } + byte[] keyBytes = request.buildSerializedKey(); + byte[] valueBytes = table.serializeValue(value); + db.put(request.getColumnFamilyHandle(), keyBytes, valueBytes); + } + + ExecutorService executor = Executors.newFixedThreadPool(4); + ForStGeneralMultiGetOperation generalMultiGetOperation = + new ForStGeneralMultiGetOperation(db, batchGetRequest, executor); + generalMultiGetOperation.process().get(); + + for (Tuple2, TestStateFuture>> tuple : resultCheckList) { + HashSet expected = new HashSet<>(tuple.f0); + tuple.f1 + .getCompletedResult() + .onNext( + (e) -> { + assertThat(expected.remove(e)).isTrue(); + }); + assertThat(expected.isEmpty()).isTrue(); + } + executor.shutdownNow(); + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java index b609c9ebaa3b6..f814eb6080055 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.VoidNamespace; import org.junit.jupiter.api.Test; @@ -28,6 +29,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -115,4 +117,85 @@ public void testWriteBatchWithNullValue() throws Exception { } } } + + @Test + public void testListStateWriteBatch() throws Exception { + ForStListState listState = + buildForStListState("test-write-batch-1"); + List> batchPutRequest = new ArrayList<>(); + ArrayList, List>> resultCheckList = + new ArrayList<>(); + + // update + int keyNum = 100; + for (int i = 0; i < keyNum; i++) { + List value = new ArrayList<>(); + value.add(String.valueOf(i)); + value.add(String.valueOf(i + 1)); + + ForStDBPutRequest> request = + ForStDBPutRequest.of( + buildContextKey(i), value, listState, new TestStateFuture<>()); + + resultCheckList.add(Tuple2.of(request, value)); + batchPutRequest.add(request); + } + + ExecutorService executor = Executors.newFixedThreadPool(2); + ForStWriteBatchOperation writeBatchOperation = + new ForStWriteBatchOperation(db, batchPutRequest, new WriteOptions(), executor); + writeBatchOperation.process().get(); + + // check data correctness + for (Tuple2, List> tuple : resultCheckList) { + byte[] keyBytes = tuple.f0.buildSerializedKey(); + byte[] valueBytes = db.get(tuple.f0.getColumnFamilyHandle(), keyBytes); + assertThat(listState.deserializeValue(valueBytes)).isEqualTo(tuple.f1); + } + + // add or addall with update + batchPutRequest.clear(); + resultCheckList.clear(); + for (int i = 0; i < keyNum / 2; i++) { + List value = new ArrayList<>(); + value.add(String.valueOf(i)); + value.add(String.valueOf(i + 1)); + + List deltaValue = new ArrayList<>(); + deltaValue.add(String.valueOf(i * 2)); + deltaValue.add(String.valueOf(i * 3)); + + value.addAll(deltaValue); + + ForStDBPutRequest> request = + ForStDBPutRequest.ofMerge( + buildContextKey(i), deltaValue, listState, new TestStateFuture<>()); + + resultCheckList.add(Tuple2.of(request, value)); + batchPutRequest.add(request); + } + for (int i = keyNum / 2; i < keyNum; i++) { + List value = new ArrayList<>(); + value.add(String.valueOf(i)); + value.add(String.valueOf(i + 1)); + + ForStDBPutRequest> request = + ForStDBPutRequest.of( + buildContextKey(i), value, listState, new TestStateFuture<>()); + + resultCheckList.add(Tuple2.of(request, value)); + batchPutRequest.add(request); + } + + writeBatchOperation = + new ForStWriteBatchOperation(db, batchPutRequest, new WriteOptions(), executor); + writeBatchOperation.process().get(); + + // check data correctness + for (Tuple2, List> tuple : resultCheckList) { + byte[] keyBytes = tuple.f0.buildSerializedKey(); + byte[] valueBytes = db.get(tuple.f0.getColumnFamilyHandle(), keyBytes); + assertThat(listState.deserializeValue(valueBytes)).isEqualTo(tuple.f1); + } + } }