Skip to content

Commit

Permalink
Synchronously invoke RepositoryListener
Browse files Browse the repository at this point in the history
Motivation:

`RepositoryListener` used `Repository.watch()` to continously invoke the
callback. Since the callback is not invoked immediately upon commit
completion, the cache cannot guarantee a happen-before relationship with
the write operation.
line#1099 (review)

Modifications:

- Fix `CommitWatchers` to reuse `Watch` added for `RepositoryListener`
  - The watch is removed when `WatchListener.onUpdate()` returns true.
- `recurseWatch()` no longer use a scheduler to submit a new watch task
  when it gets notified.
  - `find().join()` operation is invoked in the same thread as the
    commit to ensure that the callback is invoked before the commit's
    returned future.

Result:

`RepositoryListener` is now immediately invoked when the watching files
are modified.
  • Loading branch information
ikhoon committed Feb 24, 2025
1 parent 47723bc commit a7d66ba
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder;
import com.linecorp.centraldogma.common.ProjectExistsException;
import com.linecorp.centraldogma.common.ProjectNotFoundException;

Expand All @@ -36,16 +37,27 @@ class ProjectManagementTest {
private static final Logger logger = LoggerFactory.getLogger(ProjectManagementTest.class);

@RegisterExtension
static final CentralDogmaExtensionWithScaffolding dogma = new CentralDogmaExtensionWithScaffolding();
static final CentralDogmaExtensionWithScaffolding dogma = new CentralDogmaExtensionWithScaffolding() {
@Override
protected void configureClient(ArmeriaCentralDogmaBuilder builder) {
builder.clientConfigurator(cb -> {
cb.responseTimeoutMillis(0);
});
}
};

@ParameterizedTest
@EnumSource(ClientType.class)
void unremoveProject(ClientType clientType) {
final CentralDogma client = clientType.client(dogma);

try {
final Set<String> oldProjects = client.listProjects().join();
logger.info("Old project: {}", oldProjects);
logger.info("Removed a project: {}", dogma.removedProject());
client.unremoveProject(dogma.removedProject()).join();
final Set<String> projects = client.listProjects().join();
logger.info("Project: {}", projects);
assertThat(projects).contains(dogma.removedProject());
} finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import com.linecorp.centraldogma.common.CentralDogmaException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.server.internal.storage.repository.git.Watch.WatchListener;

final class CommitWatchers {

Expand All @@ -45,8 +46,9 @@ final class CommitWatchers {
@VisibleForTesting
final Map<PathPatternFilter, Set<Watch>> watchesMap = new WatcherMap(8192);

void add(Revision lastKnownRev, String pathPattern, CompletableFuture<Revision> future) {
add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future));
void add(Revision lastKnownRev, String pathPattern,
@Nullable CompletableFuture<Revision> future, @Nullable WatchListener listener) {
add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future, listener));
}

private void add0(final PathPatternFilter pathPattern, Watch watch) {
Expand All @@ -57,8 +59,12 @@ private void add0(final PathPatternFilter pathPattern, Watch watch) {
watches.add(watch);
}

watch.future.whenComplete((revision, cause) -> {
if (watch.removed) {
final CompletableFuture<Revision> future = watch.future();
if (future == null) {
return;
}
future.whenComplete((revision, cause) -> {
if (watch.wasRemoved()) {
return;
}

Expand Down Expand Up @@ -113,7 +119,7 @@ void notify(Revision revision, String path) {
// Notify the matching promises found above.
final int numEligiblePromises = eligibleWatches.size();
for (int i = 0; i < numEligiblePromises; i++) {
eligibleWatches.get(i).future.complete(revision);
eligibleWatches.get(i).notify(revision);
}
}

Expand All @@ -136,13 +142,15 @@ void close(Supplier<CentralDogmaException> causeSupplier) {
final CentralDogmaException cause = causeSupplier.get();
final int numEligiblePromises = eligibleWatches.size();
for (int i = 0; i < numEligiblePromises; i++) {
eligibleWatches.get(i).future.completeExceptionally(cause);
eligibleWatches.get(i).notifyFailure(cause);
}
}

private static List<Watch> move(@Nullable List<Watch> watches, Iterator<Watch> i, Watch w) {
i.remove();
w.removed = true;
if (w.shouldRemove()) {
i.remove();
w.remove();
}

if (watches == null) {
watches = new ArrayList<>();
Expand Down Expand Up @@ -175,16 +183,4 @@ protected boolean removeEldestEntry(Entry<PathPatternFilter, Set<Watch>> eldest)
return size() > maxEntries && eldest.getValue().isEmpty();
}
}

private static final class Watch {

final Revision lastKnownRevision;
final CompletableFuture<Revision> future;
volatile boolean removed;

Watch(Revision lastKnownRevision, CompletableFuture<Revision> future) {
this.lastKnownRevision = lastKnownRevision;
this.future = future;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -81,7 +80,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.centraldogma.common.Author;
Expand All @@ -107,6 +105,7 @@
import com.linecorp.centraldogma.server.internal.IsolatedSystemReader;
import com.linecorp.centraldogma.server.internal.JGitUtil;
import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache;
import com.linecorp.centraldogma.server.internal.storage.repository.git.Watch.WatchListener;
import com.linecorp.centraldogma.server.storage.StorageException;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.repository.DiffResultType;
Expand All @@ -115,8 +114,6 @@
import com.linecorp.centraldogma.server.storage.repository.Repository;
import com.linecorp.centraldogma.server.storage.repository.RepositoryListener;

import io.netty.channel.EventLoop;

/**
* A {@link Repository} based on Git.
*/
Expand Down Expand Up @@ -1057,7 +1054,6 @@ public CompletableFuture<Revision> watch(Revision lastKnownRevision, String path
boolean errorOnEntryNotFound) {
requireNonNull(lastKnownRevision, "lastKnownRevision");
requireNonNull(pathPattern, "pathPattern");

final ServiceRequestContext ctx = context();
final Revision normLastKnownRevision = normalizeNow(lastKnownRevision);
final CompletableFuture<Revision> future = new CompletableFuture<>();
Expand All @@ -1072,7 +1068,7 @@ public CompletableFuture<Revision> watch(Revision lastKnownRevision, String path
if (latestRevision != null) {
future.complete(latestRevision);
} else {
commitWatchers.add(normLastKnownRevision, pathPattern, future);
commitWatchers.add(normLastKnownRevision, pathPattern, future, null);
}
} finally {
readUnlock();
Expand All @@ -1085,57 +1081,58 @@ public CompletableFuture<Revision> watch(Revision lastKnownRevision, String path
return future;
}

private void recursiveWatch(Revision lastKnownRevision, String pathPattern, WatchListener listener) {
requireNonNull(lastKnownRevision, "lastKnownRevision");
requireNonNull(pathPattern, "pathPattern");
final Revision normLastKnownRevision = normalizeNow(lastKnownRevision);
CompletableFuture.runAsync(() -> {
readLock();
try {
// If lastKnownRevision is outdated already and the recent changes match,
// there's no need to watch.
final Revision latestRevision = blockingFindLatestRevision(normLastKnownRevision, pathPattern,
false);
if (latestRevision != null) {
listener.onUpdate(latestRevision, null);
} else {
commitWatchers.add(normLastKnownRevision, pathPattern, null, listener);
}
} finally {
readUnlock();
}
}, repositoryWorker);
}

@Override
public void addListener(RepositoryListener listener) {
listeners.add(listener);
final EventLoop executor = CommonPools.workerGroup().next();
recursiveWatch(listener, Revision.INIT, executor);
}

private void recursiveWatch(RepositoryListener listener, Revision lastKnownRevision, EventLoop executor) {
if (shouldStopListening(listener)) {
return;
}

final String pathPattern = listener.pathPattern();
watch(lastKnownRevision, pathPattern).handle((newRevision, cause) -> {
recursiveWatch(Revision.INIT, pathPattern, (newRevision, cause) -> {
if (shouldStopListening(listener)) {
return true;
}

if (cause != null) {
cause = Exceptions.peel(cause);
if (cause instanceof ShuttingDownException) {
return null;
return true;
}

logger.warn("Failed to watch {} file in {}/{}. Try watching after 5 seconds.",
pathPattern, parent.name(), name, cause);
executor.schedule(() -> recursiveWatch(listener, lastKnownRevision, executor),
5, TimeUnit.SECONDS);
return null;
logger.warn("Failed to watch {} file in {}/{}.", pathPattern, parent.name(), name, cause);
return false;
}

find(newRevision, pathPattern).handle((entries, cause0) -> {
if (cause0 != null) {
cause0 = Exceptions.peel(cause0);
if (cause0 instanceof ShuttingDownException) {
return null;
}

logger.warn("Unexpected exception while retrieving {} in {}/{}. " +
"Try watching after 5 seconds.", pathPattern, parent.name(), name, cause0);
executor.schedule(() -> recursiveWatch(listener, newRevision, executor),
5, TimeUnit.SECONDS);
return null;
}

try {
listener.onUpdate(entries);
} catch (Exception ex) {
logger.warn("Unexpected exception while invoking {}.onUpdate(). listener: {}",
RepositoryListener.class.getSimpleName(), listener);
}
recursiveWatch(listener, newRevision, executor);
return null;
});
return null;
try {
assert newRevision != null;
final Map<String, Entry<?>> entries = find(newRevision, pathPattern).join();
listener.onUpdate(entries);
return false;
} catch (Exception ex) {
logger.warn("Unexpected exception while invoking {}.onUpdate(). listener: {}",
RepositoryListener.class.getSimpleName(), listener, ex);
return false;
}
});
}

Expand Down Expand Up @@ -1298,7 +1295,7 @@ public void cloneTo(File newRepoDir, BiConsumer<Integer, Integer> progressListen
// NB: We allow an empty commit here because an old version of Central Dogma had a bug
// which allowed the creation of an empty commit.
new CommitExecutor(newRepo, c.when(), c.author(), c.summary(),
c.detail(), c.markup(), true)
c.detail(), c.markup(), true)
.execute(baseRevision, normBaseRevision -> blockingPreviewDiff(
normBaseRevision, new DefaultChangesApplier(changes)).values());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2025 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.internal.storage.repository.git;

import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import com.linecorp.centraldogma.common.Revision;

final class Watch {

final Revision lastKnownRevision;
@Nullable
private final CompletableFuture<Revision> future;
@Nullable
private final WatchListener listener;
private boolean shouldRemove;
private volatile boolean removed;

Watch(Revision lastKnownRevision,
@Nullable CompletableFuture<Revision> future,
@Nullable WatchListener listener) {
this.lastKnownRevision = lastKnownRevision;
assert (future != null && listener == null) || (future == null && listener != null);
this.future = future;
this.listener = listener;
shouldRemove = future != null;
}

void notify(Revision revision) {
if (future != null) {
future.complete(revision);
} else {
assert listener != null;
// The watch will be removed in the next notification.
shouldRemove = listener.onUpdate(revision, null);
}
}

void notifyFailure(Throwable cause) {
if (future != null) {
future.completeExceptionally(cause);
} else {
assert listener != null;
shouldRemove = listener.onUpdate(null, cause);
}
}

@Nullable
CompletableFuture<Revision> future() {
return future;
}

boolean shouldRemove() {
return shouldRemove;
}

void remove() {
removed = true;
}

boolean wasRemoved() {
return removed;
}

@FunctionalInterface
interface WatchListener {
/**
* Invoked when the {@link Watch} is notified of an update.
* Returns {@code true} if the {@link Watch} should be removed from the watch list.
*/
boolean onUpdate(@Nullable Revision revision, @Nullable Throwable cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static java.util.concurrent.ForkJoinPool.commonPool;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

import java.io.File;
Expand Down Expand Up @@ -95,14 +94,12 @@ void shouldUpdateLatestEntries() {
assertThat(listener.latestEntries).hasSize(1);
}

private void assertListenerEntries(String path, String expected) {
await().untilAsserted(() -> {
assertThat(listener.latestEntries.get(path).contentAsText().trim())
.isEqualTo(expected);
});
private static void assertListenerEntries(String path, String expected) {
assertThat(listener.latestEntries.get(path).contentAsText().trim())
.isEqualTo(expected);
}

private void commit(Change<?>... changes) {
private static void commit(Change<?>... changes) {
repo.commit(Revision.HEAD, Instant.now().toEpochMilli(), Author.SYSTEM, "summary", changes).join();
}

Expand Down

0 comments on commit a7d66ba

Please sign in to comment.