Skip to content

Commit

Permalink
[MINOR] Avoid wrong way to get the latest completed instant (apache#1…
Browse files Browse the repository at this point in the history
…2590)

1. avoid wrong way to get the latest completed instant

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un authored Jan 8, 2025
1 parent 5d888eb commit 4efc241
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,7 @@ public void updateLastCommitTimeSynced(String tableName) {
HoodieTimeline activeTimeline = getActiveTimeline();
Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::requestedTime);
Option<String> lastCommitCompletionSynced = activeTimeline
.getInstantsOrderedByCompletionTime()
.skip(activeTimeline.countInstants() - 1)
.findFirst()
.map(i -> Option.of(i.getCompletionTime()))
.orElse(Option.empty());
.getLatestCompletionTime();
if (lastCommitSynced.isPresent()) {
try {
HashMap<String, String> propertyMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public interface HoodieTimeline extends Serializable {
Option<String> getLatestCompletionTime();

/**
* Get the stream of instants in order by state transition timestamp of actions.
* Get the stream of instants in order by completion timestamp of actions.
*/
Stream<HoodieInstant> getInstantsOrderedByCompletionTime();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
filteredTimeline match {
case activeInstants if !activeInstants.empty() =>
val timestamp = if (hollowCommitHandling == USE_TRANSITION_TIME) {
activeInstants.getInstantsOrderedByCompletionTime
.skip(activeInstants.countInstants() - 1)
.findFirst()
.get()
.getCompletionTime
activeInstants.getLatestCompletionTime.get()
} else {
activeInstants.lastInstant().get().requestedTime()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,7 @@ protected Option<String> getLastCommitTime() {
}

protected Option<String> getLastCommitCompletionTime() {
int countInstants = getActiveTimeline().countInstants();
return getActiveTimeline()
.getInstantsOrderedByCompletionTime()
.skip(countInstants - 1)
.findFirst()
.map(HoodieInstant::getCompletionTime)
.map(Option::of).orElseGet(Option::empty);
return getActiveTimeline().getLatestCompletionTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,7 @@ public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time and commit completion from the TBLproperties
HoodieTimeline activeTimeline = getActiveTimeline();
Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::requestedTime);
Option<String> lastCommitCompletionSynced = activeTimeline
.getInstantsOrderedByCompletionTime()
.skip(activeTimeline.countInstants() - 1)
.findFirst()
.map(i -> Option.of(i.getCompletionTime()))
.orElse(Option.empty());
Option<String> lastCommitCompletionSynced = activeTimeline.getLatestCompletionTime();
if (lastCommitSynced.isPresent()) {
try {
Table table = client.getTable(databaseName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1979,9 +1979,7 @@ private void reSyncHiveTable() {
}

private String getLastCommitCompletionTimeSynced() {
return hiveClient.getActiveTimeline()
.getInstantsOrderedByCompletionTime()
.skip(hiveClient.getActiveTimeline().countInstants() - 1).findFirst().get().getCompletionTime();
return hiveClient.getActiveTimeline().getLatestCompletionTime().get();
}

private void reInitHiveSyncClient() {
Expand Down

0 comments on commit 4efc241

Please sign in to comment.