Skip to content

Commit

Permalink
latest received time
Browse files Browse the repository at this point in the history
  • Loading branch information
be-hase committed Feb 12, 2025
1 parent f1a77c4 commit 987dc7b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,39 +81,41 @@ void queryOfText() throws Exception {

// Metrics will not be emitted until the values are ready.
assertThat(latestRevisionGauges(tags(watcher, "/hoge.txt")).size()).isEqualTo(0);
assertThat(latestCommitTimeGauges(tags(watcher, "/hoge.txt")).size()).isEqualTo(0);
assertThat(latestReceivedTimeGauges(tags(watcher, "/hoge.txt")).size()).isEqualTo(0);

// Metrics will be emitted once the values are ready.
watcher.awaitInitialValue();
await().untilAsserted(() -> {
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/hoge.txt"));
assertThat(latestRevisionGauges.size()).isEqualTo(1);
assertThat(latestRevisionGauges.get(0).value()).isEqualTo(hoge1stResult.revision().major());
assertThat(latestCommitTimeGauges.size()).isEqualTo(1);
assertThat(latestCommitTimeGauges.get(0).value()).isEqualTo(hoge1stResult.when());
assertThat(latestReceivedTimeGauges.size()).isEqualTo(1);
assertThat(latestReceivedTimeGauges.get(0).value())
.isGreaterThanOrEqualTo(hoge1stResult.when() / 1000.0);
});

// When a commit is added, the metrics will also be updated.
final PushResult hoge2ndResult = dogmaRepo.commit("Add hoge.txt", Change.ofTextUpsert("/hoge.txt", "2"))
.push().join();
await().untilAsserted(() -> {
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/hoge.txt"));
assertThat(latestRevisionGauges.size()).isEqualTo(1);
assertThat(latestRevisionGauges.get(0).value()).isEqualTo(hoge2ndResult.revision().major());
assertThat(latestCommitTimeGauges.size()).isEqualTo(1);
assertThat(latestCommitTimeGauges.get(0).value()).isEqualTo(hoge2ndResult.when());
assertThat(latestReceivedTimeGauges.size()).isEqualTo(1);
assertThat(latestReceivedTimeGauges.get(0).value())
.isGreaterThanOrEqualTo(hoge2ndResult.when() / 1000.0);
assertThat(hoge2ndResult.revision().major()).isGreaterThan(hoge1stResult.revision().major());
assertThat(hoge2ndResult.when()).isGreaterThanOrEqualTo(hoge1stResult.when());
});

// When a commit is added, the metrics will also be updated.
watcher.close();
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/hoge.txt"));
assertThat(latestRevisionGauges.size()).isEqualTo(0);
assertThat(latestCommitTimeGauges.size()).isEqualTo(0);
assertThat(latestReceivedTimeGauges.size()).isEqualTo(0);
}

@Test
Expand All @@ -126,39 +128,41 @@ void pathPatternAll() throws Exception {

// Metrics will not be emitted until the values are ready.
assertThat(latestRevisionGauges(tags(watcher, "/**")).size()).isEqualTo(0);
assertThat(latestCommitTimeGauges(tags(watcher, "/**")).size()).isEqualTo(0);
assertThat(latestReceivedTimeGauges(tags(watcher, "/**")).size()).isEqualTo(0);

// Metrics will be emitted once the values are ready.
watcher.awaitInitialValue();
await().untilAsserted(() -> {
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/**"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/**"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/**"));
assertThat(latestRevisionGauges.size()).isEqualTo(1);
assertThat(latestRevisionGauges.get(0).value()).isEqualTo(fooResult.revision().major());
assertThat(latestCommitTimeGauges.size()).isEqualTo(1);
assertThat(latestCommitTimeGauges.get(0).value()).isEqualTo(fooResult.when());
assertThat(latestReceivedTimeGauges.size()).isEqualTo(1);
assertThat(latestReceivedTimeGauges.get(0).value())
.isGreaterThanOrEqualTo(fooResult.when() / 1000.0);
});

// When a commit is added, the metrics will also be updated.
final PushResult barResult = dogmaRepo.commit("Add bar.txt", Change.ofTextUpsert("/bar.txt", "1"))
.push().join();
await().untilAsserted(() -> {
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/**"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/**"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/**"));
assertThat(latestRevisionGauges.size()).isEqualTo(1);
assertThat(latestRevisionGauges.get(0).value()).isEqualTo(barResult.revision().major());
assertThat(latestCommitTimeGauges.size()).isEqualTo(1);
assertThat(latestCommitTimeGauges.get(0).value()).isEqualTo(barResult.when());
assertThat(latestReceivedTimeGauges.size()).isEqualTo(1);
assertThat(latestReceivedTimeGauges.get(0).value())
.isGreaterThanOrEqualTo(barResult.when() / 1000.0);
assertThat(barResult.revision().major()).isGreaterThan(fooResult.revision().major());
assertThat(barResult.when()).isGreaterThanOrEqualTo(fooResult.when());
});

// When a commit is added, the metrics will also be updated.
watcher.close();
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/**"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/**"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/**"));
assertThat(latestRevisionGauges.size()).isEqualTo(0);
assertThat(latestCommitTimeGauges.size()).isEqualTo(0);
assertThat(latestReceivedTimeGauges.size()).isEqualTo(0);
}

@Test
Expand All @@ -172,17 +176,18 @@ void noMetrics() throws Exception {
watcher.awaitInitialValue();
Thread.sleep(1000); // wait updateLatestCommitAsync
final List<Gauge> latestRevisionGauges = latestRevisionGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestCommitTimeGauges = latestCommitTimeGauges(tags(watcher, "/hoge.txt"));
final List<Gauge> latestReceivedTimeGauges = latestReceivedTimeGauges(tags(watcher, "/hoge.txt"));
assertThat(latestRevisionGauges.size()).isEqualTo(0);
assertThat(latestCommitTimeGauges.size()).isEqualTo(0);
assertThat(latestReceivedTimeGauges.size()).isEqualTo(0);
}

private static List<Gauge> latestRevisionGauges(Tags tags) {
return new ArrayList<>(meterRegistry.find("centraldogma.watcher.latest.revision").tags(tags).gauges());
return new ArrayList<>(meterRegistry.find("centraldogma.client.watcher.latest.revision").tags(tags)
.gauges());
}

private static List<Gauge> latestCommitTimeGauges(Tags tags) {
return new ArrayList<>(meterRegistry.find("centraldogma.watcher.latest.commit.time").tags(tags)
private static List<Gauge> latestReceivedTimeGauges(Tags tags) {
return new ArrayList<>(meterRegistry.find("centraldogma.client.watcher.latest.received.time").tags(tags)
.gauges());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.math.LongMath.saturatedAdd;
import static java.util.Objects.requireNonNull;

import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

Expand All @@ -43,7 +45,6 @@
import com.google.common.base.MoreObjects;

import com.linecorp.centraldogma.common.CentralDogmaException;
import com.linecorp.centraldogma.common.Commit;
import com.linecorp.centraldogma.common.EntryNotFoundException;
import com.linecorp.centraldogma.common.PathPattern;
import com.linecorp.centraldogma.common.Query;
Expand All @@ -58,16 +59,16 @@
abstract class AbstractWatcher<T> implements Watcher<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
private static final String LATEST_REVISION_METER_NAME = "centraldogma.watcher.latest.revision";
private static final String LATEST_COMMIT_TIME_METER_NAME = "centraldogma.watcher.latest.commit.time";
private static final String LATEST_REVISION_METER_NAME = "centraldogma.client.watcher.latest.revision";
private static final String LATEST_RECEIVED_TIME_METER_NAME =
"centraldogma.client.watcher.latest.received.time";

private enum State {
INIT,
STARTED,
STOPPED
}

private final CentralDogma centralDogma;
private final ScheduledExecutorService watchScheduler;
private final String projectName;
private final String repositoryName;
Expand All @@ -94,14 +95,15 @@ private enum State {
@Nullable
private volatile CompletableFuture<?> currentWatchFuture;

@SuppressWarnings("DataFlowIssue") // AtomicReference's value is nullable.
private final AtomicReference<Commit> latestCommit = new AtomicReference<>(null);
// unix epoch seconds
// In Prometheus, it is common to handle data with second precision, so we intentionally use second precision.
// ref: https://prometheus.io/docs/prometheus/latest/querying/functions/#time
private final AtomicLong latestReceivedTime = new AtomicLong();

AbstractWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler, String projectName,
String repositoryName, String pathPattern, boolean errorOnEntryNotFound,
long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis, double multiplier,
double jitterRate, @Nullable MeterRegistry meterRegistry) {
this.centralDogma = centralDogma;
AbstractWatcher(ScheduledExecutorService watchScheduler, String projectName, String repositoryName,
String pathPattern, boolean errorOnEntryNotFound, long delayOnSuccessMillis,
long initialDelayMillis, long maxDelayMillis, double multiplier, double jitterRate,
@Nullable MeterRegistry meterRegistry) {
this.watchScheduler = watchScheduler;
this.projectName = projectName;
this.repositoryName = repositoryName;
Expand Down Expand Up @@ -178,7 +180,8 @@ public void close() {

if (meterRegistry != null) {
meterRegistry.remove(new Id(LATEST_REVISION_METER_NAME, tags, null, null, Type.GAUGE));
meterRegistry.remove(new Id(LATEST_COMMIT_TIME_METER_NAME, tags, null, null, Type.GAUGE));
meterRegistry.remove(
new Id(LATEST_RECEIVED_TIME_METER_NAME, tags, null, null, Type.GAUGE));
}
}

Expand Down Expand Up @@ -285,7 +288,7 @@ private void doWatch(int numAttemptsSoFar) {
logger.debug("watcher noticed updated file {}/{}{}: rev={}",
projectName, repositoryName, pathPattern, newLatest.revision());
notifyListeners(newLatest);
updateLatestCommitAsync(newLatest.revision());
updateLatestReceivedTime();
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
Expand Down Expand Up @@ -356,30 +359,15 @@ private void notifyListeners(Latest<T> latest) {
}
}

private void updateLatestCommitAsync(Revision revision) {
private void updateLatestReceivedTime() {
if (meterRegistry == null) {
return;
}

centralDogma.forRepo(projectName, repositoryName)
.history()
.get(revision, revision)
.whenComplete((commits, e) -> {
if (e == null) {
commits.stream().findFirst().ifPresent(commit -> {
// noinspection ConstantValue
if (latestCommit.getAndSet(commit) == null) {
// emit metrics once the values are ready
meterRegistry.gauge(LATEST_COMMIT_TIME_METER_NAME, tags, latestCommit,
it -> it.get().when());
}
});
} else {
logger.warn(
"Failed to retrieve the commit to record metrics. watcher={}/{}{}, rev={}",
projectName, repositoryName, pathPattern, revision, e);
}
});
if (latestReceivedTime.getAndSet(Instant.now().getEpochSecond()) == 0) {
// emit metrics once the values are ready
meterRegistry.gauge(LATEST_RECEIVED_TIME_METER_NAME, tags, latestReceivedTime, AtomicLong::get);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class FileWatcher<T> extends AbstractWatcher<T> {
@Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis, double multiplier,
double jitterRate, @Nullable MeterRegistry meterRegistry) {
super(centralDogma, watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
super(watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
this.centralDogma = centralDogma;
this.projectName = projectName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ final class FilesWatcher<T> extends AbstractWatcher<T> {
@Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis,
double multiplier, double jitterRate, @Nullable MeterRegistry meterRegistry) {
super(centralDogma, watchScheduler, projectName, repositoryName, pathPattern.patternString(),
errorOnEntryNotFound, delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier,
jitterRate, meterRegistry);
super(watchScheduler, projectName, repositoryName, pathPattern.patternString(), errorOnEntryNotFound,
delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
this.centralDogma = centralDogma;
this.projectName = projectName;
this.repositoryName = repositoryName;
Expand Down

0 comments on commit 987dc7b

Please sign in to comment.