Skip to content

Commit

Permalink
Improve replication read/write metrics (#11490)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Mar 1, 2024
1 parent d560b87 commit 3b0d76b
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -72,13 +70,8 @@ public class BufferedReplicationWorker implements ReplicationWorker {
private final ClosableQueue<AirbyteMessage> messagesFromSourceQueue;
private final ClosableQueue<AirbyteMessage> messagesForDestinationQueue;
private final ExecutorService executors;
private final ScheduledExecutorService scheduledExecutors;
private final DestinationTimeoutMonitor destinationTimeoutMonitor;

private final AtomicLong destMessagesRead;
private final AtomicLong destMessagesSent;
private final AtomicLong sourceMessagesRead;

private volatile boolean isReadFromDestRunning;
private volatile boolean writeToDestFailed;

Expand Down Expand Up @@ -139,14 +132,9 @@ public BufferedReplicationWorker(final String jobId,
// readFromSource + processMessage + writeToDestination + readFromDestination +
// source heartbeat + dest timeout monitor + workload heartbeat = 7 threads
this.executors = Executors.newFixedThreadPool(7);
this.scheduledExecutors = Executors.newSingleThreadScheduledExecutor();
this.isReadFromDestRunning = true;
this.writeToDestFailed = false;

this.destMessagesRead = new AtomicLong();
this.destMessagesSent = new AtomicLong();
this.sourceMessagesRead = new AtomicLong();

this.readFromSourceStopwatch = new Stopwatch();
this.processFromSourceStopwatch = new Stopwatch();
this.writeToDestStopwatch = new Stopwatch();
Expand All @@ -170,8 +158,6 @@ public ReplicationOutput run(final ReplicationInput replicationInput, final Path
// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (recordSchemaValidator; syncPersistence; srcHeartbeatTimeoutChaperone; source; destinationTimeoutMonitor; destinationWithCloseTimeout) {
scheduledExecutors.scheduleAtFixedRate(this::reportObservabilityMetrics, 0, observabilityMetricsPeriodInSeconds, TimeUnit.SECONDS);

CompletableFuture.allOf(
runAsync(() -> replicationWorkerHelper.startDestination(destination, replicationInput, jobRoot), mdc),
runAsync(() -> replicationWorkerHelper.startSource(source, replicationInput, jobRoot), mdc)).join();
Expand Down Expand Up @@ -201,13 +187,11 @@ public ReplicationOutput run(final ReplicationInput replicationInput, final Path
replicationWorkerHelper.markFailed();
} finally {
executors.shutdownNow();
scheduledExecutors.shutdownNow();

try {
// Best effort to mark as complete when the Worker is actually done.
executors.awaitTermination(executorShutdownGracePeriodInSeconds, TimeUnit.SECONDS);
scheduledExecutors.awaitTermination(executorShutdownGracePeriodInSeconds, TimeUnit.SECONDS);
if (!executors.isTerminated() || !scheduledExecutors.isTerminated()) {
if (!executors.isTerminated()) {
final MetricClient metricClient = MetricClientFactory.getMetricClient();
metricClient.count(OssMetricsRegistry.REPLICATION_WORKER_EXECUTOR_SHUTDOWN_ERROR, 1,
new MetricAttribute(MetricTags.IMPLEMENTATION, "buffered"));
Expand Down Expand Up @@ -237,15 +221,6 @@ public ReplicationOutput run(final ReplicationInput replicationInput, final Path

}

private void reportObservabilityMetrics() {
final MetricClient metricClient = MetricClientFactory.getMetricClient();
metricClient.gauge(OssMetricsRegistry.WORKER_DESTINATION_BUFFER_SIZE, messagesForDestinationQueue.size());
metricClient.gauge(OssMetricsRegistry.WORKER_SOURCE_BUFFER_SIZE, messagesFromSourceQueue.size());
metricClient.count(OssMetricsRegistry.WORKER_DESTINATION_MESSAGE_READ, destMessagesRead.getAndSet(0));
metricClient.count(OssMetricsRegistry.WORKER_DESTINATION_MESSAGE_SENT, destMessagesSent.getAndSet(0));
metricClient.count(OssMetricsRegistry.WORKER_SOURCE_MESSAGE_READ, sourceMessagesRead.getAndSet(0));
}

private CompletableFuture<?> runAsync(final Runnable runnable, final Map<String, String> mdc) {
return CompletableFuture.runAsync(() -> {
MDC.setContextMap(mdc);
Expand Down Expand Up @@ -317,10 +292,8 @@ public void cancel() {

LOGGER.info("Cancelling replication worker...");
executors.shutdownNow();
scheduledExecutors.shutdownNow();
try {
executors.awaitTermination(executorShutdownGracePeriodInSeconds, TimeUnit.SECONDS);
scheduledExecutors.awaitTermination(executorShutdownGracePeriodInSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
wasInterrupted = true;
ApmTraceUtils.addExceptionToTrace(e);
Expand Down Expand Up @@ -372,7 +345,6 @@ private void readFromSource() {
while (!replicationWorkerHelper.getShouldAbort() && !(sourceIsFinished = sourceIsFinished()) && !messagesFromSourceQueue.isClosed()) {
final Optional<AirbyteMessage> messageOptional = source.attemptRead();
if (messageOptional.isPresent()) {
sourceMessagesRead.incrementAndGet();
while (!replicationWorkerHelper.getShouldAbort() && !messagesFromSourceQueue.add(messageOptional.get())
&& !messagesFromSourceQueue.isClosed()) {
Thread.sleep(100);
Expand Down Expand Up @@ -461,7 +433,6 @@ private void writeToDestination() {
try (final var t = writeToDestStopwatch.start()) {
destination.accept(message);
}
destMessagesSent.incrementAndGet();
}
} finally {
destination.notifyEndOfInput();
Expand Down Expand Up @@ -492,7 +463,6 @@ private void readFromDestination() {
throw new DestinationException("Destination process read attempt failed", e);
}
if (messageOptional.isPresent()) {
destMessagesRead.incrementAndGet();
try (final var t = processFromDestStopwatch.start()) {
replicationWorkerHelper.processMessageFromDestination(messageOptional.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;

/**
* This interface provides a java interface over all interactions with a Source from the POV of the
Expand All @@ -21,9 +22,10 @@ public interface AirbyteSource extends AutoCloseable {
* @param sourceConfig - contains the arguments that must be passed to the read method of the
* Source.
* @param jobRoot - directory where the job can write data.
* @param connectionId - connectionId if applicable
* @throws Exception - throws if there is any failure in startup.
*/
void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception;
void start(WorkerSourceConfig sourceConfig, Path jobRoot, UUID connectionId) throws Exception;

/**
* Means no more data will be emitted by the Source. This may be because all data has already been
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import io.airbyte.commons.protocol.DefaultProtocolSerializer;
import io.airbyte.commons.protocol.ProtocolSerializer;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -56,6 +60,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
private final AirbyteStreamFactory streamFactory;
private final AirbyteMessageBufferedWriterFactory messageWriterFactory;
private final ProtocolSerializer protocolSerializer;
private final MetricClient metricClient;

private final AtomicBoolean inputHasEnded = new AtomicBoolean(false);

Expand All @@ -64,9 +69,12 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;
private final DestinationTimeoutMonitor destinationTimeoutMonitor;
private MetricAttribute connectionAttribute = null;

@VisibleForTesting
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher, final DestinationTimeoutMonitor destinationTimeoutMonitor) {
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final DestinationTimeoutMonitor destinationTimeoutMonitor,
final MetricClient metricClient) {
this(integrationLauncher,
VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(
LOGGER,
Expand All @@ -77,26 +85,33 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
new GsonPksExtractor()),
new DefaultAirbyteMessageBufferedWriterFactory(),
new DefaultProtocolSerializer(),
destinationTimeoutMonitor);
destinationTimeoutMonitor,
metricClient);
}

@SuppressWarnings({"PMD.ArrayIsStoredDirectly", "PMD.UseVarargs"})
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final AirbyteMessageBufferedWriterFactory messageWriterFactory,
final ProtocolSerializer protocolSerializer,
final DestinationTimeoutMonitor destinationTimeoutMonitor) {
final DestinationTimeoutMonitor destinationTimeoutMonitor,
final MetricClient metricClient) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.messageWriterFactory = messageWriterFactory;
this.protocolSerializer = protocolSerializer;
this.destinationTimeoutMonitor = destinationTimeoutMonitor;
this.metricClient = metricClient;
}

@Override
public void start(final WorkerDestinationConfig destinationConfig, final Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);

if (destinationConfig.getConnectionId() != null) {
connectionAttribute = new MetricAttribute(MetricTags.CONNECTION_ID, destinationConfig.getConnectionId().toString());
}

LOGGER.info("Running destination...");
destinationProcess = integrationLauncher.write(
jobRoot,
Expand All @@ -117,6 +132,7 @@ public void start(final WorkerDestinationConfig destinationConfig, final Path jo

@Override
public void accept(final AirbyteMessage message) throws IOException {
metricClient.count(OssMetricsRegistry.WORKER_DESTINATION_MESSAGE_SENT, 1, connectionAttribute);
destinationTimeoutMonitor.startAcceptTimer();
acceptWithNoTimeoutMonitor(message);
destinationTimeoutMonitor.resetAcceptTimer();
Expand Down Expand Up @@ -206,7 +222,11 @@ public int getExitValue() {
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(destinationProcess != null);

return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
final Optional<AirbyteMessage> m = Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
if (m.isPresent()) {
metricClient.count(OssMetricsRegistry.WORKER_DESTINATION_MESSAGE_READ, 1, connectionAttribute);
}
return m;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.protocol.ProtocolSerializer;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerUtils;
Expand All @@ -34,6 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,28 +64,36 @@ public class DefaultAirbyteSource implements AirbyteSource {
private final AirbyteStreamFactory streamFactory;
private final ProtocolSerializer protocolSerializer;
private final HeartbeatMonitor heartbeatMonitor;
private final MetricClient metricClient;

private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;
private final boolean featureFlagLogConnectorMsgs;
private MetricAttribute connectionAttribute = null;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final HeartbeatMonitor heartbeatMonitor,
final ProtocolSerializer protocolSerializer,
final FeatureFlags featureFlags) {
final FeatureFlags featureFlags,
final MetricClient metricClient) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.protocolSerializer = protocolSerializer;
this.heartbeatMonitor = heartbeatMonitor;
this.featureFlagLogConnectorMsgs = featureFlags.logConnectorMessages();
this.metricClient = metricClient;
}

@Override
public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot) throws Exception {
public void start(final WorkerSourceConfig sourceConfig, final Path jobRoot, final UUID connectionId) throws Exception {
Preconditions.checkState(sourceProcess == null);

if (connectionId != null) {
connectionAttribute = new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString());
}

sourceProcess = integrationLauncher.read(jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(sourceConfig.getSourceConnectionConfiguration()),
Expand Down Expand Up @@ -137,7 +150,11 @@ public int getExitValue() throws IllegalStateException {
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(sourceProcess != null);

return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
final Optional<AirbyteMessage> m = Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
if (m.isPresent()) {
metricClient.count(OssMetricsRegistry.WORKER_SOURCE_MESSAGE_READ, 1, connectionAttribute);
}
return m;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -54,7 +55,7 @@ public EmptyAirbyteSource() {

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot) throws Exception {
public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoot, final UUID connectionId) throws Exception {
if (workerSourceConfig != null && workerSourceConfig.getSourceConnectionConfiguration() != null) {
final ResetSourceConfiguration resetSourceConfiguration = parseResetSourceConfigurationAndLogError(workerSourceConfig);
streamsToReset.addAll(resetSourceConfiguration.getStreamsToReset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.PrintLongRecordPks;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.helper.GsonPksExtractor;
Expand Down Expand Up @@ -48,19 +49,22 @@ public class AirbyteIntegrationLauncherFactory {
private final FeatureFlags featureFlags;
private final FeatureFlagClient featureFlagClient;
private final GsonPksExtractor gsonPksExtractor;
private final MetricClient metricClient;

public AirbyteIntegrationLauncherFactory(final ProcessFactory processFactory,
final AirbyteMessageSerDeProvider serDeProvider,
final AirbyteProtocolVersionedMigratorFactory migratorFactory,
final FeatureFlags featureFlags,
final FeatureFlagClient featureFlagClient,
final GsonPksExtractor gsonPksExtractor) {
final GsonPksExtractor gsonPksExtractor,
final MetricClient metricClient) {
this.processFactory = processFactory;
this.serDeProvider = serDeProvider;
this.migratorFactory = migratorFactory;
this.featureFlags = featureFlags;
this.featureFlagClient = featureFlagClient;
this.gsonPksExtractor = gsonPksExtractor;
this.metricClient = metricClient;
}

/**
Expand Down Expand Up @@ -127,7 +131,8 @@ public AirbyteSource createAirbyteSource(final IntegrationLauncherConfig sourceL
printLongRecordPks)),
heartbeatMonitor,
getProtocolSerializer(sourceLauncherConfig),
featureFlags);
featureFlags,
metricClient);
}

/**
Expand All @@ -151,7 +156,9 @@ public AirbyteDestination createAirbyteDestination(final IntegrationLauncherConf
new VersionedAirbyteStreamFactory.InvalidLineFailureConfiguration(false, false, false)),
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(),
Optional.of(configuredAirbyteCatalog)),
getProtocolSerializer(destinationLauncherConfig), destinationTimeoutMonitor);
getProtocolSerializer(destinationLauncherConfig),
destinationTimeoutMonitor,
metricClient);
}

private VersionedProtocolSerializer getProtocolSerializer(final IntegrationLauncherConfig launcherConfig) {
Expand Down
Loading

0 comments on commit 3b0d76b

Please sign in to comment.