Skip to content

Commit

Permalink
feat: add metrics to schema propagation notifications (#14376)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Oct 17, 2024
1 parent 670ede8 commit a917da8
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import io.airbyte.config.NotificationSettings;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.notification.CustomerioNotificationClient;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.notification.messages.ConnectionInfo;
Expand All @@ -26,6 +30,7 @@
public class NotificationHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(NotificationHelper.class);
public static final String NOTIFICATION_TRIGGER_SCHEMA = "schema_propagated";

private final WebUrlHelper webUrlHelper;

Expand Down Expand Up @@ -72,26 +77,38 @@ public void notifySchemaPropagated(final NotificationSettings notificationSettin
item = notificationSettings.getSendOnConnectionUpdate();
}
for (final Notification.NotificationType type : item.getNotificationType()) {
try {
switch (type) {
case SLACK -> {
final SlackNotificationClient slackNotificationClient = new SlackNotificationClient(item.getSlackConfiguration());
slackNotificationClient.notifySchemaPropagated(notification, email);
switch (type) {
case SLACK -> {
final SlackNotificationClient slackNotificationClient = new SlackNotificationClient(item.getSlackConfiguration());
if (slackNotificationClient.notifySchemaPropagated(notification, email)) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.NOTIFICATION_SUCCESS, 1,
new MetricAttribute(MetricTags.NOTIFICATION_CLIENT, slackNotificationClient.getNotificationClientType()),
new MetricAttribute(MetricTags.NOTIFICATION_TRIGGER, NOTIFICATION_TRIGGER_SCHEMA));
} else {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.NOTIFICATION_FAILED, 1,
new MetricAttribute(MetricTags.NOTIFICATION_CLIENT, slackNotificationClient.getNotificationClientType()),
new MetricAttribute(MetricTags.NOTIFICATION_TRIGGER, NOTIFICATION_TRIGGER_SCHEMA));
}
case CUSTOMERIO -> {
final CustomerioNotificationClient emailNotificationClient = new CustomerioNotificationClient();
emailNotificationClient.notifySchemaPropagated(notification, email);
}
default -> {
LOGGER.warn("Notification type {} not supported", type);
}
case CUSTOMERIO -> {
final CustomerioNotificationClient emailNotificationClient = new CustomerioNotificationClient();
if (emailNotificationClient.notifySchemaPropagated(notification, email)) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.NOTIFICATION_SUCCESS, 1,
new MetricAttribute(MetricTags.NOTIFICATION_CLIENT, emailNotificationClient.getNotificationClientType()),
new MetricAttribute(MetricTags.NOTIFICATION_TRIGGER, NOTIFICATION_TRIGGER_SCHEMA));
} else {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.NOTIFICATION_FAILED, 1,
new MetricAttribute(MetricTags.NOTIFICATION_CLIENT, emailNotificationClient.getNotificationClientType()),
new MetricAttribute(MetricTags.NOTIFICATION_TRIGGER, NOTIFICATION_TRIGGER_SCHEMA));
}
}
} catch (final InterruptedException e) {
LOGGER.error("Failed to send notification for connectionId: '{}'", connection.getConnectionId(), e);
default -> {
LOGGER.warn("Notification type {} not supported", type);
}
}
}
} catch (final Exception e) {
LOGGER.error("Failed to send notification {}", workspace, e);
LOGGER.error("Failed to send notification {}: {}", workspace, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"excessive_catalog_size",
"Distribution of input catalog field counts that exceed the configured limit."),

NOTIFICATION_SUCCESS(MetricEmittingApps.SERVER,
"notification_success",
"A notification was successfully sent"),

NOTIFICATION_FAILED(MetricEmittingApps.SERVER,
"notification_failure",
"A notification failed to send"),

REPLICATION_CONTEXT_NOT_INITIALIZED_ERROR(MetricEmittingApps.ORCHESTRATOR,
"replication_context_not_initialized_error",
"The replication context was not initialized when it was expected to be."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.commons.envvar.EnvVar;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorType;
import io.airbyte.notification.messages.SchemaUpdateNotification;
Expand Down Expand Up @@ -137,83 +136,104 @@ public Response intercept(@NotNull final Chain chain) throws IOException {

@Override
public boolean notifyJobFailure(final SyncSummary summary,
final String receiverEmail)
throws IOException {
final String receiverEmail) {
final ObjectNode node = buildSyncCompletedJson(summary, receiverEmail, SYNC_FAILURE_MESSAGE_ID);
final String payload = Jsons.serialize(node);
return notifyByEmail(payload);
try {
return notifyByEmail(payload);
} catch (IOException e) {
return false;
}
}

@Override
public boolean notifyJobSuccess(final SyncSummary summary,
final String receiverEmail)
throws IOException {
final String receiverEmail) {
final ObjectNode node = buildSyncCompletedJson(summary, receiverEmail, SYNC_SUCCEED_MESSAGE_ID);
final String payload = Jsons.serialize(node);
return notifyByEmail(payload);
try {
return notifyByEmail(payload);
} catch (IOException e) {
return false;
}
}

// Once the configs are editable through the UI, the reciever email should be stored in
// airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml
// instead of being passed in
@Override
public boolean notifyConnectionDisabled(final SyncSummary summary,
final String receiverEmail)
throws IOException {
final String receiverEmail) {
final ObjectNode node = buildSyncCompletedJson(summary, receiverEmail, AUTO_DISABLE_TRANSACTION_MESSAGE_ID);
final String payload = Jsons.serialize(node);
return notifyByEmail(payload);
try {
return notifyByEmail(payload);
} catch (IOException e) {
return false;
}
}

@Override
public boolean notifyConnectionDisableWarning(final SyncSummary summary,
final String receiverEmail)
throws IOException {
final String receiverEmail) {
final ObjectNode node = buildSyncCompletedJson(summary, receiverEmail, AUTO_DISABLE_WARNING_TRANSACTION_MESSAGE_ID);
final String payload = Jsons.serialize(node);
return notifyByEmail(payload);
try {
return notifyByEmail(payload);
} catch (IOException e) {
return false;
}
}

@Override
public boolean notifyBreakingChangeWarning(final List<String> receiverEmails,
final String connectorName,
final ActorType actorType,
final ActorDefinitionBreakingChange breakingChange)
throws IOException {
return notifyByEmailBroadcast(BREAKING_CHANGE_WARNING_BROADCAST_ID, receiverEmails, Map.of(
"connector_name", connectorName,
"connector_type", actorType.value(),
"connector_version_new", breakingChange.getVersion().serialize(),
"connector_version_upgrade_deadline", formatDate(breakingChange.getUpgradeDeadline()),
"connector_version_change_description", convertMarkdownToHtml(breakingChange.getMessage()),
"connector_version_migration_url", breakingChange.getMigrationDocumentationUrl()));
final ActorDefinitionBreakingChange breakingChange) {
try {
return notifyByEmailBroadcast(BREAKING_CHANGE_WARNING_BROADCAST_ID, receiverEmails, Map.of(
"connector_name", connectorName,
"connector_type", actorType.value(),
"connector_version_new", breakingChange.getVersion().serialize(),
"connector_version_upgrade_deadline", formatDate(breakingChange.getUpgradeDeadline()),
"connector_version_change_description", convertMarkdownToHtml(breakingChange.getMessage()),
"connector_version_migration_url", breakingChange.getMigrationDocumentationUrl()));
} catch (IOException e) {
return false;
}
}

@Override
public boolean notifyBreakingChangeSyncsDisabled(final List<String> receiverEmails,
final String connectorName,
final ActorType actorType,
final ActorDefinitionBreakingChange breakingChange)
throws IOException {
return notifyByEmailBroadcast(BREAKING_CHANGE_SYNCS_DISABLED_BROADCAST_ID, receiverEmails, Map.of(
"connector_name", connectorName,
"connector_type", actorType.value(),
"connector_version_new", breakingChange.getVersion().serialize(),
"connector_version_change_description", convertMarkdownToHtml(breakingChange.getMessage()),
"connector_version_migration_url", breakingChange.getMigrationDocumentationUrl()));
final ActorDefinitionBreakingChange breakingChange) {
try {
return notifyByEmailBroadcast(BREAKING_CHANGE_SYNCS_DISABLED_BROADCAST_ID, receiverEmails, Map.of(
"connector_name", connectorName,
"connector_type", actorType.value(),
"connector_version_new", breakingChange.getVersion().serialize(),
"connector_version_change_description", convertMarkdownToHtml(breakingChange.getMessage()),
"connector_version_migration_url", breakingChange.getMigrationDocumentationUrl()));
} catch (IOException e) {
return false;
}
}

@Override
public boolean notifySchemaPropagated(final SchemaUpdateNotification notification,
final String recipient)
throws IOException {
final String recipient) {
final String transactionalMessageId = notification.isBreakingChange() ? SCHEMA_BREAKING_CHANGE_TRANSACTION_ID : SCHEMA_CHANGE_TRANSACTION_ID;

final ObjectNode node =
buildSchemaPropagationJson(notification, recipient, transactionalMessageId);

final String payload = Jsons.serialize(node);
return notifyByEmail(payload);
try {
return notifyByEmail(payload);
} catch (IOException e) {
return false;
}
}

static ObjectNode buildSyncCompletedJson(final SyncSummary syncSummary,
Expand Down Expand Up @@ -372,12 +392,6 @@ boolean sendNotifyRequest(final String urlEndpoint, final String payload) throws
}
}

@Override
public String renderTemplate(final String templateFile, final String... data) throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, data);
}

private String convertMarkdownToHtml(final String message) {
final Parser markdownParser = Parser.builder().build();
final Node document = markdownParser.parse(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,44 @@

package io.airbyte.notification;

import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorType;
import io.airbyte.notification.messages.SchemaUpdateNotification;
import io.airbyte.notification.messages.SyncSummary;
import java.io.IOException;
import java.util.List;

/**
* Client for trigger notifications (regardless of notification type e.g. slack or email).
*/
@SuppressWarnings("PMD.ConfusingArgumentToVarargsMethod")
public abstract class NotificationClient {

public NotificationClient() {}

public abstract boolean notifyJobFailure(final SyncSummary summary,
final String receiverEmail)
throws IOException, InterruptedException;
final String receiverEmail);

public abstract boolean notifyJobSuccess(final SyncSummary summary,
final String receiverEmail)
throws IOException, InterruptedException;
final String receiverEmail);

public abstract boolean notifyConnectionDisabled(final SyncSummary summary,
final String receiverEmail)
throws IOException, InterruptedException;
final String receiverEmail);

public abstract boolean notifyConnectionDisableWarning(final SyncSummary summary,
final String receiverEmail)
throws IOException, InterruptedException;
final String receiverEmail);

public abstract boolean notifyBreakingChangeWarning(List<String> receiverEmails,
String connectorName,
ActorType actorType,
ActorDefinitionBreakingChange breakingChange)
throws IOException, InterruptedException;
ActorDefinitionBreakingChange breakingChange);

public abstract boolean notifyBreakingChangeSyncsDisabled(List<String> receiverEmails,
String connectorName,
final ActorType actorType,
final ActorDefinitionBreakingChange breakingChange)
throws IOException, InterruptedException;
final ActorDefinitionBreakingChange breakingChange);

public abstract boolean notifySchemaPropagated(final SchemaUpdateNotification notification,
final String recipient)
throws IOException, InterruptedException;
final String recipient);

public abstract String getNotificationClientType();

String renderTemplate(final String templateFile, final String... data) throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, data);
}

}
Loading

0 comments on commit a917da8

Please sign in to comment.