[HUDI-8780][RFC-83] Incremental Table Service #12514

Open — wants to merge 13 commits into base: master (changes from 12 commits)
Binary file added rfc/rfc-83/cleanIncrementalpartitions.png
233 changes: 233 additions & 0 deletions rfc/rfc-83/rfc-83.md
@@ -0,0 +1,233 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# RFC-83: Incremental Table Service

## Proposers

- @zhangyue19921010

## Approvers
- @danny0405
- @yuzhaojing

## Status

JIRA: https://issues.apache.org/jira/browse/HUDI-8780

## Abstract

In Hudi, when scheduling Compaction and Clustering, the default behavior is to scan all partitions of the current table.
When there are many historical partitions, such as 640,000 in our production environment, this scanning and planning operation becomes very inefficient.
For Flink, it often leads to checkpoint timeouts, resulting in data delays.
As for cleaning, we already have the ability to clean only incremental partitions.

This RFC draws on the design of Incremental Clean to generalize the capability of processing incremental partitions to all table services, such as Clustering and Compaction.

## Background

The clean plan metadata (`HoodieCleanerPlan.avsc`) records `earliestInstantToRetain`:

```text
{
  "namespace": "org.apache.hudi.avro.model",
  "type": "record",
  "name": "HoodieCleanerPlan",
  "fields": [
    {
      "name": "earliestInstantToRetain",
      "type": ["null", {
        "type": "record",
        "name": "HoodieActionInstant",
        "fields": [
          {
            "name": "timestamp",
            "type": "string"
          },
          {
            "name": "action",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          }
        ]
      }],
      "default": null
    },
    xxxx
  ]
}
```

The clean commit metadata (`HoodieCleanMetadata.avsc`) records `earliestCommitToRetain`:

```text
{"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieCleanMetadata",
"fields": [
xxxx,
{"name": "earliestCommitToRetain", "type": "string"},
xxxx
]
}
```
How incremental partitions are obtained during cleaning:

![cleanIncrementalpartitions.png](cleanIncrementalpartitions.png)

**Note**
- `earliestCommitToRetain` is recorded in `HoodieCleanMetadata`.
- `newInstantToRetain` is computed based on clean configs such as `hoodie.clean.commits.retained`, and will be recorded in the clean metadata as the new `earliestCommitToRetain`.
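
To make this concrete, the sketch below illustrates how the incremental partitions could be derived: take the previous clean's `earliestCommitToRetain` and the newly computed `newInstantToRetain`, and collect the partitions touched by the commits in between. This is not the actual `CleanPlanner` code; the `partitionsOf` lookup is hypothetical and would in practice be backed by each commit's metadata.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Illustrative sketch only: instants are modeled as sortable timestamp strings,
// and partitionsOf is a hypothetical lookup returning the partitions written by a commit.
public class IncrementalCleanPartitions {

  public static Set<String> incrementalPartitions(
      List<String> completedCommits,             // completed commit times, sorted ascending
      String lastEarliestCommitToRetain,         // from the previous HoodieCleanMetadata
      String newInstantToRetain,                 // computed from hoodie.clean.commits.retained
      Function<String, List<String>> partitionsOf) {

    Set<String> partitions = new TreeSet<>();
    for (String commit : completedCommits) {
      // Commits in [lastEarliestCommitToRetain, newInstantToRetain) are the ones added
      // since the last clean; only their partitions need to be considered this round.
      if (commit.compareTo(lastEarliestCommitToRetain) >= 0
          && commit.compareTo(newInstantToRetain) < 0) {
        partitions.addAll(partitionsOf.apply(commit));
      }
    }
    return partitions;
  }
}
```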

## Design And Implementation

### Changes in TableService Metadata Schema

Add a new field `earliestInstantToRetain` (default null) to the Clustering/Compaction plan, mirroring the `earliestInstantToRetain` field in the clean plan:

```text
{
  "name": "earliestInstantToRetain",
  "type": ["null", {
    "type": "record",
    "name": "HoodieActionInstant",
    "fields": [
      {
        "name": "timestamp",
        "type": "string"
      },
      {
        "name": "action",
        "type": "string"
      },
      {
        "name": "state",
        "type": "string"
      }
    ]
  }],
  "default": null
},
```
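
For illustration, the existing Avro-generated `HoodieActionInstant` record can be reused to carry this value; note that the `setEarliestInstantToRetain` call sketched in the trailing comment is hypothetical and only exists once the schema change above lands.

```java
import org.apache.hudi.avro.model.HoodieActionInstant;

public class PlanMetadataExample {
  public static void main(String[] args) {
    // Build the instant marker that the Clustering/Compaction plan would retain,
    // reusing the Avro record already defined for the clean plan.
    HoodieActionInstant earliestInstantToRetain = HoodieActionInstant.newBuilder()
        .setTimestamp("20241220123045000") // example instant time
        .setAction("commit")
        .setState("COMPLETED")
        .build();

    // Hypothetical, once the proposed schema change is applied:
    // HoodieCompactionPlan.newBuilder()
    //     .setEarliestInstantToRetain(earliestInstantToRetain)
    //     .build();
    System.out.println(earliestInstantToRetain);
  }
}
```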

We also need a unified interface/abstract class to control the plan behavior of the table services, including clustering and compaction.

### Abstraction

Use `PartitionBaseTableServicePlanStrategy` to control how partitions are fetched, how they are filtered, how the table service plan is generated, and so on.
> **Contributor (review comment):** Maybe we name it IncrementalPartitionAwareStrategy to emphasize it is "incremental".

> **Contributor Author:** changed

Since we want to control the logic of partition acquisition, partition filtering, and plan generation through different strategies, the first step is to introduce an abstraction that converges this logic into a base strategy.

```java
package org.apache.hudi.table;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

import java.io.IOException;
import java.util.List;

public abstract class PartitionBaseTableServicePlanStrategy<R, S> {

  /**
   * Generate the table service plan based on the given instant.
   */
  public abstract R generateTableServicePlan(Option<String> instant) throws IOException;

  /**
   * Generate the table service plan based on the given operations.
   */
  public abstract R generateTableServicePlan(List<S> operations) throws IOException;

  /**
   * Get the partition paths to be processed by the current table service.
   */
  public abstract List<String> getPartitionPaths(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient, HoodieEngineContext engineContext);

  /**
   * Filter the given list of partition paths.
   */
  public abstract List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths);

  /**
   * Get the incremental partitions from earliestCommitToRetain to instantToRetain.
   */
  public List<String> getIncrementalPartitionPaths(Option<HoodieInstant> instantToRetain) {
    throw new UnsupportedOperationException("Not supported yet");
  }

  /**
   * Returns the earliest commit to retain from the instant metadata.
   */
  public Option<HoodieInstant> getEarliestCommitToRetain() {
    throw new UnsupportedOperationException("Not supported yet");
  }
}
```

> **Contributor (review comment):** The IncrementalPartitionAwareStrategy should be a user interface IMO; the only API we expose to the user is the incremental partitions since the last table service. So the following logic should be removed:
>
> 1. generate plan (should be the responsibility of the planner)
> 2. getEarliestCommitToRetain (should be the responsibility of the planner, within the plan executor)
>
> And because the implementations of compaction and clustering are quite different, maybe we just add two new interfaces: IncrementalPartitionAwareCompactionStrategy and IncrementalPartitionAwareClusteringStrategy.

> **Contributor Author:**
>
> 1. generate plan and getEarliestCommitToRetain are removed.
> 2. As for the base abstraction, although the implementations of compaction and clustering are quite different, partition-aware compaction and clustering share the same partition processing logic (first obtain the partitions, then filter them), so maybe we can use one interface for both to control the partition-related operations. What do you think? :)

> **Contributor Author (@zhangyue19921010, Dec 20, 2024):** In addition, Danny, what's your opinion on the logic of incremental partition acquisition?
>
> Option 1: Record a metadata field in the commit to indicate where the last processing was done. The partition acquisition behavior under Option 1 is more flexible.
>
> Option 2: Directly obtain the last completed table service commit time as the new starting point. Option 2 is simpler and does not require modifying and processing commit metadata fields.

The default behavior of the `generateTableServicePlan`, `getPartitionPaths` and `filterPartitionPaths` APIs remains the same as it is today.

Let the base abstractions `CompactionStrategy` and `ClusteringPlanStrategy` extend this `PartitionBaseTableServicePlanStrategy`:
1. `public abstract class CompactionStrategy extends PartitionBaseTableServicePlanStrategy<HoodieCompactionPlan, HoodieCompactionOperation> implements Serializable`
2. `public abstract class ClusteringPlanStrategy<T,I,K,O> extends PartitionBaseTableServicePlanStrategy<Option<HoodieClusteringPlan>, HoodieClusteringGroup> implements Serializable`

**For Incremental Table Services, including clustering and compaction, we will support a new IncrementalCompactionStrategy and a new IncrementalClusteringPlanStrategy.**
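
For illustration only, a skeleton of such an incremental strategy might look like the sketch below. The class name and the way it chains `getIncrementalPartitionPaths` and `filterPartitionPaths` are assumptions about the intended design, not the final implementation; a concrete incremental strategy would also override the incremental-partition and earliest-commit hooks that the base class leaves unsupported.

```java
package org.apache.hudi.table;

import java.util.List;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative skeleton: stays abstract so plan generation is left to the concrete
// compaction or clustering strategy; only the incremental partition handling is sketched.
public abstract class IncrementalTableServicePlanStrategy<R, S>
    extends PartitionBaseTableServicePlanStrategy<R, S> {

  @Override
  public List<String> getPartitionPaths(HoodieWriteConfig writeConfig,
                                        HoodieTableMetaClient metaClient,
                                        HoodieEngineContext engineContext) {
    // Boundary for this round: the earliest commit to retain, resolved from the
    // last table service plan metadata (provided by the concrete strategy).
    Option<HoodieInstant> earliestCommitToRetain = getEarliestCommitToRetain();

    // 1. Fetch only the partitions touched since the last table service.
    List<String> incrementalPartitions = getIncrementalPartitionPaths(earliestCommitToRetain);

    // 2. Existing flexible partition-selection mechanisms still apply,
    //    but only to the incrementally fetched partitions.
    return filterPartitionPaths(writeConfig, incrementalPartitions);
  }
}
```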


### Work Flow for Incremental Clustering/Compaction Strategy

The table service planner with an Incremental Clustering/Compaction Strategy works as follows:
1. Retrieve the instant recorded in the last table service `xxxx.requested` as **INSTANT 1**.
2. Calculate the current instant (request time) to be processed as **INSTANT 2**.
3. Obtain all partitions involved from **INSTANT 1** to **INSTANT 2** as incremental partitions and perform the table service plan operation.
4. Record **INSTANT 2** in the table service plan.

A sketch of this flow, including the missed-partition handling mentioned in the discussion, follows the review comments below.

> **Member (review comment):** If we turn on the incremental table service mode, are the various flexible partition selection mechanisms now unavailable? Consider the following scenario:
>
> - on ts_0, write to two partitions: p_1 and p_2
> - on ts_1, schedule a compaction with a partition-selection strategy that only compacts p_2
> - on ts_2, write to p_2 again
> - on ts_3, compaction will only process partitions written between ts_1 and ts_3, so it will still only merge p_2. When can a compaction occur that compacts p_1?

> **Contributor Author:** For the common strategies, these flexible partition selection mechanisms still work. For the IncrementalxxxxStrategy variants, the flexible partition selection mechanisms will be applied to the incrementally fetched partitions.

> **Contributor Author:** Also, in the IncrementalxxxxStrategy we could record the missed partitions in the plan and process them together with the newly fetched incremental partitions next time.
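
Here is the sketch referenced above. It assembles the planner input for one round, including the idea of carrying partitions missed by a selective strategy forward into the next plan; all names are hypothetical and the logic is an assumption about the intended design, not the final implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Illustrative sketch: union the incremental partitions fetched between INSTANT 1 and
// INSTANT 2 with partitions missed by the previous plan, apply the (possibly selective)
// partition-selection strategy, and record what was left out so the next plan retries it.
public class IncrementalPlanInput {

  public final List<String> partitionsToPlan;  // partitions this table service plan will cover
  public final List<String> missedPartitions;  // carried forward in the plan for next time

  public IncrementalPlanInput(List<String> partitionsToPlan, List<String> missedPartitions) {
    this.partitionsToPlan = partitionsToPlan;
    this.missedPartitions = missedPartitions;
  }

  public static IncrementalPlanInput build(List<String> incrementalPartitions,
                                           List<String> missedFromLastPlan,
                                           Predicate<String> selectionStrategy) {
    Set<String> candidates = new LinkedHashSet<>(missedFromLastPlan);
    candidates.addAll(incrementalPartitions);

    List<String> selected = new ArrayList<>();
    List<String> missed = new ArrayList<>();
    for (String partition : candidates) {
      if (selectionStrategy.test(partition)) {
        selected.add(partition);
      } else {
        missed.add(partition); // not compacted/clustered this round; retried next time
      }
    }
    return new IncrementalPlanInput(selected, missed);
  }
}
```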


### About archive

We record `EarliestCommitToRetain` in the table service request metadata file and use it as the basis for retrieving incremental partitions.
Therefore, when Incremental Table Service is enabled, we should always ensure that at least one Clustering/Compaction request metadata file remains in the active timeline.
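
A minimal sketch of this archival guard, assuming instant times are compared as sortable strings (the real check would be wired into the archival logic against the active timeline):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: never archive instants at or beyond the latest table service
// request, so the Clustering/Compaction .requested metadata carrying
// earliestInstantToRetain always stays in the active timeline.
public class ArchivalGuard {

  public static List<String> instantsEligibleForArchival(
      List<String> candidateInstants,         // instants the archiver would normally archive, ascending
      String latestTableServiceRequestTime) { // request time of the latest clustering/compaction plan

    List<String> eligible = new ArrayList<>();
    for (String instant : candidateInstants) {
      // Stop before the latest table service request so its plan metadata is retained.
      if (instant.compareTo(latestTableServiceRequestTime) < 0) {
        eligible.add(instant);
      }
    }
    return eligible;
  }
}
```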

## Rollout/Adoption Plan

Low impact for current users.

## Test Plan