Skip to content

Commit

Permalink
Merge branch 'privacysandbox:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
pawelrudak-branch authored Oct 24, 2024
2 parents 6d8fe21 + dae5c45 commit 2c28a00
Show file tree
Hide file tree
Showing 227 changed files with 2,711 additions and 874 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [2.8.0](https://github.com/privacysandbox/aggregation-service/compare/v2.7.0...v2.8.0) (2024-09-11)

- Increased read threads in Aggregation Service to match the number of CPUs.
- Enabled worker instance OTel logs - processed job's id and worker health status.
- [GCP only] Updated the GCP image build target to "worker_mp_gcp_prod". Note: This change would
impact only if you build your own image.

## [2.7.0](https://github.com/privacysandbox/aggregation-service/compare/v2.6.0...v2.7.0) (2024-08-01)

- Added support for aggregating reports belonging to multiple reporting origins under the same
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.7.0
2.8.0
7 changes: 6 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ PROTOBUF_CORE_VERSION = "3.25.2"

PROTOBUF_SHA_256 = "3c83e4301b968d0b4f29a0c29c0b3cde1da81d790ffd344b111c523ba1954392"

COORDINATOR_VERSION = "v1.9.0-rc03" # version updated on 2024-07-17
COORDINATOR_VERSION = "v1.10.0-rc07" # version updated on 2024-09-12

JACKSON_VERSION = "2.16.1"

Expand Down Expand Up @@ -80,6 +80,7 @@ git_repository(
patches = [
"//build_defs/shared_libraries:coordinator.patch",
"//build_defs/shared_libraries:rules_pkg_build_fix.patch",
"//build_defs/shared_libraries:v1.10_serverless_connector.patch",
],
tag = COORDINATOR_VERSION,
workspace_file = "@shared_libraries_workspace//file",
Expand All @@ -97,6 +98,7 @@ OTEL_ARTIFACTS = [
"io.opentelemetry:opentelemetry-sdk-common:" + OTEL_VERSION,
"io.opentelemetry:opentelemetry-sdk-metrics:" + OTEL_VERSION,
"io.opentelemetry:opentelemetry-sdk-testing:" + OTEL_VERSION,
"io.opentelemetry:opentelemetry-sdk-logs:" + OTEL_VERSION,
"io.opentelemetry:opentelemetry-sdk-trace:" + OTEL_VERSION,
"io.opentelemetry.contrib:opentelemetry-aws-xray:" + OTEL_VERSION,
]
Expand All @@ -113,6 +115,7 @@ maven_install(
"com.amazonaws:aws-java-sdk-kms:" + AWS_JAVA_SDK_VERSION,
"com.amazonaws:aws-java-sdk-core:" + AWS_JAVA_SDK_VERSION,
"com.amazonaws:aws-java-sdk-xray:" + AWS_JAVA_SDK_VERSION,
"com.amazonaws:aws-java-sdk-logs:" + AWS_JAVA_SDK_VERSION,
"com.amazonaws:aws-java-sdk-cloudwatch:" + AWS_JAVA_SDK_VERSION,
"com.beust:jcommander:1.82",
"com.google.cloud.functions.invoker:java-function-invoker:1.1.0",
Expand All @@ -138,6 +141,8 @@ maven_install(
"com.google.cloud:google-cloud-storage:2.32.1",
"com.google.cloud:google-cloud-spanner:6.56.0",
"com.google.cloud:google-cloud-compute:1.44.0",
"com.google.cloud:google-cloud-logging:1.92.0",
"com.google.api.grpc:proto-google-cloud-logging-v2:0.109.0",
"com.google.api.grpc:proto-google-cloud-compute-v1:1.44.0",
"com.google.cloud.functions:functions-framework-api:1.1.0",
"commons-logging:commons-logging:1.3.0",
Expand Down
2 changes: 1 addition & 1 deletion build-scripts/DEBIAN_CONTAINER_DIGEST
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sha256:16112ae93b810eb1ec6d1db6e01835d2444c8ca99aa678e03dd104ea3ec68408
sha256:903d3225acecaa272bbdd7273c6c312c2af8b73644058838d23a8c9e6e5c82cf
2 changes: 1 addition & 1 deletion build-scripts/gcp/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
steps:
- name: '$_BUILD_IMAGE_REPO_PATH/bazel-build-container:$_VERSION'
script: |
bazel run worker/gcp:worker_mp_gcp_g3p_prod -- -dst "$_IMAGE_REPO_PATH/$_IMAGE_NAME:$_IMAGE_TAG"
bazel run worker/gcp:worker_mp_gcp_prod -- -dst "$_IMAGE_REPO_PATH/$_IMAGE_NAME:$_IMAGE_TAG"
bazel run //terraform/gcp:frontend_service_http_cloud_function_release \
--//terraform/gcp:bucket_flag=$_JARS_PUBLISH_BUCKET --//terraform/gcp:bucket_path_flag=$_JARS_PUBLISH_BUCKET_PATH \
-- --version=$_VERSION
Expand Down
6 changes: 3 additions & 3 deletions build_defs/container_dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
# - java_base: Distroless image for running Java.
################################################################################

# Updated as of: 2024-07-26
# Updated as of: 2024-09-08

CONTAINER_DEPS = {
"amazonlinux_2": {
"digest": "sha256:b2ed30084a71c34c0f41a5add7dd623a2e623f2c3b50117c720bbc02d7653fa1",
"digest": "sha256:238da73d5f7e26f01b30f1e30b4a7156d3b344d9368c278ef5bd14d2294f27f6",
"registry": "index.docker.io",
"repository": "amazonlinux",
},
Expand All @@ -38,7 +38,7 @@ CONTAINER_DEPS = {
"repository": "aws-observability/aws-otel-collector",
},
"java_base": {
"digest": "sha256:c7846b62436ccf2961972fea5b776527610a1a51b48d8e7b434287146904cf2d",
"digest": "sha256:587ce66b08faea2e2e1568d6bb6c5fd6b085909621f4c14762206d687ff7d202",
"registry": "gcr.io",
"repository": "distroless/java17-debian11",
},
Expand Down
12 changes: 12 additions & 0 deletions build_defs/shared_libraries/v1.10_serverless_connector.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
diff --git a/operator/terraform/gcp/modules/vpc/serverless-connector.tf b/operator/terraform/gcp/modules/vpc/serverless-connector.tf
index be2932bae..0c17a0da5 100644
--- a/operator/terraform/gcp/modules/vpc/serverless-connector.tf
+++ b/operator/terraform/gcp/modules/vpc/serverless-connector.tf
@@ -1,6 +1,7 @@
module "serverless-connector" {
count = var.create_connectors ? 1 : 0
source = "terraform-google-modules/network/google//modules/vpc-serverless-connector-beta"
+ version = "<= 9.1.0"
project_id = var.project_id
vpc_connectors = [
for index, region in tolist(var.regions) : {
11 changes: 11 additions & 0 deletions docs/collecting.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ for examples.
- Reporting URL:
`http://adtech.localhost/.well-known/attribution-reporting/debug/report-aggregate-attribution`

The Private Aggregation API is used to support reports triggered within a Protected Audience or
Shared Storage context. The predefined endpoints for those two use cases are defined
[here](https://github.com/patcg-individual-drafts/private-aggregation-api/blob/main/README.md#reports).

1. Protected Audience
- Reporting URL:
`http://adtech.localhost/.well-known/private-aggregation/report-protected-audience`
1. Shared Storage
- Reporting URL:
`http://adtech.localhost/.well-known/private-aggregation/report-shared-storage`

_The `.well-known/...` paths are predefined paths which can not be customized. To collect reports,
you need to run an endpoint that can respond to POST requests on the above paths._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,14 @@ public final class AggregationWorkerArgs {
description = "Overrides the region of the compute instance.")
private String adtechRegionOverride = "";

// TODO(b/241266079): remove trusted_party_region_override after giving migration time.
@Parameter(
names = {"--trusted_party_region_override", "--coordinator_a_region_override"},
description = "Overrides the region of coordinator A services.")
// TODO(b/241266079): set default to us-east-1 once services move there.
private String coordinatorARegionOverride = "us-west-2";

@Parameter(
names = "--coordinator_b_region_override",
description = "Overrides the region of coordinator B services.")
// TODO(b/241266079): set default to us-east-1 once services move there.
private String coordinatorBRegionOverride = "us-west-2";

@Parameter(
Expand Down Expand Up @@ -334,25 +331,21 @@ public final class AggregationWorkerArgs {
@Parameter(
names = "--coordinator_a_privacy_budgeting_endpoint",
description = "Coordinator A's HTTP endpoint for privacy budgeting.")
// TODO(b/218508112): Better default value
private String coordinatorAPrivacyBudgetingEndpoint = "https://foo.com/v1";

@Parameter(
names = "--coordinator_a_privacy_budget_service_auth_endpoint",
description = "Coordinator A's Auth endpoint for privacy budgeting service.")
// TODO(b/218508112): Better default value
private String coordinatorAPrivacyBudgetServiceAuthEndpoint = "https://foo.com/auth";

@Parameter(
names = "--coordinator_b_privacy_budgeting_endpoint",
description = "Coordinator B's HTTP endpoint for privacy budgeting.")
// TODO(b/218508112): Better default value
private String coordinatorBPrivacyBudgetingEndpoint = "https://bar.com/v1";

@Parameter(
names = "--coordinator_b_privacy_budget_service_auth_endpoint",
description = "Coordinator B's Auth endpoint for privacy budgeting service.")
// TODO(b/218508112): Better default value
private String coordinatorBPrivacyBudgetServiceAuthEndpoint = "https://bar.com/auth";

@Parameter(names = "--noising_distribution", description = "Distribution to use for noising.")
Expand Down Expand Up @@ -392,6 +385,11 @@ public final class AggregationWorkerArgs {
+ " http://localhost:4317")
private String grpcCollectorEndpoint = "http://localhost:4317";

@Parameter(
names = "--otel_logs_enabled",
description = "Flag to enable the otel to export the logs.")
private boolean otelLogsEnabled = false;

@Parameter(
names = "--return_stack_trace",
description =
Expand Down Expand Up @@ -721,6 +719,10 @@ String getGrpcCollectorEndpoint() {
return grpcCollectorEndpoint;
}

boolean isOTelLogsEnabled() {
return otelLogsEnabled;
}

public boolean isEnableReturningStackTraceInResponse() {
return enableReturningStackTraceInResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.aggregate.adtech.worker.Annotations.EnablePrivacyBudgetKeyFiltering;
import com.google.aggregate.adtech.worker.Annotations.EnableStackTraceInResponse;
import com.google.aggregate.adtech.worker.Annotations.EnableThresholding;
import com.google.aggregate.adtech.worker.Annotations.InstanceId;
import com.google.aggregate.adtech.worker.Annotations.MaxDepthOfStackTrace;
import com.google.aggregate.adtech.worker.Annotations.NonBlockingThreadPool;
import com.google.aggregate.adtech.worker.Annotations.OutputShardFileSizeBytes;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.OptionalBinder;
import com.google.privacysandbox.otel.Annotations.EnableOTelLogs;
import com.google.privacysandbox.otel.Annotations.GrpcOtelCollectorEndpoint;
import com.google.scp.operator.cpio.blobstorageclient.aws.S3BlobStorageClientModule.S3EndpointOverrideBinding;
import com.google.scp.operator.cpio.blobstorageclient.aws.S3BlobStorageClientModule.S3UsePartialRequests;
Expand Down Expand Up @@ -121,6 +123,7 @@
import javax.inject.Singleton;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.internal.util.EC2MetadataUtils;

public final class AggregationWorkerModule extends AbstractModule {

Expand Down Expand Up @@ -324,8 +327,8 @@ protected void configure() {

// Parameter to set exception cache. This is a test only flag.
bind(Long.class)
.annotatedWith(ExceptionCacheEntryTtlSec.class)
.toInstance(args.getExceptionCacheEntryTtlSec());
.annotatedWith(ExceptionCacheEntryTtlSec.class)
.toInstance(args.getExceptionCacheEntryTtlSec());

// Dependencies for privacy budgeting.
bind(PrivacyBudgetingServiceBridge.class).to(args.getPrivacyBudgeting().getBridge());
Expand Down Expand Up @@ -381,17 +384,19 @@ protected void configure() {

// Otel exporter.
switch (args.getOTelExporterSelector()) {
// Specifying CollectorEndpoint is required for GRPC exporter because aggregation service
// would send metric to the CollectorEndpoint and thus collector/exporter could collect.
// Specifying CollectorEndpoint is required for GRPC exporter because aggregation service
// would send metric to the CollectorEndpoint and thus collector/exporter could collect.
case GRPC:
bind(String.class)
.annotatedWith(GrpcOtelCollectorEndpoint.class)
.toInstance(args.getGrpcCollectorEndpoint());
break;
default:
// No need to bind anything for JSON.
case JSON:
break;
}
install(args.getOTelExporterSelector().getOTelConfigurationModule());
bind(boolean.class).annotatedWith(EnableOTelLogs.class).toInstance(args.isOTelLogsEnabled());

// Response related flags.
bind(boolean.class)
Expand Down Expand Up @@ -457,7 +462,6 @@ Supplier<ImmutableMap<String, String>> providesLocalFileJobParameters() {
@Singleton
@NonBlockingThreadPool
ListeningExecutorService provideNonBlockingThreadPool() {
// TODO(b/281572881): Investigate on optimal value for nonBlockingThreadPool size.
return MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(args.getNonBlockingThreadPoolSize()));
}
Expand All @@ -466,7 +470,6 @@ ListeningExecutorService provideNonBlockingThreadPool() {
@Singleton
@BlockingThreadPool
ListeningExecutorService provideBlockingThreadPool() {
// TODO(b/281572881): Investigate on optimal value for blockingThreadPool size.
return MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(args.getBlockingThreadPoolSize()));
}
Expand All @@ -477,4 +480,13 @@ ListeningExecutorService provideBlockingThreadPool() {
ListeningExecutorService provideCustomForkJoinThreadPool() {
return MoreExecutors.listeningDecorator(new ForkJoinPool(args.getNonBlockingThreadPoolSize()));
}

@Provides
@InstanceId
String provideInstanceID() {
if (EC2MetadataUtils.getInstanceId() == null) {
return "";
}
return EC2MetadataUtils.getInstanceId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ final class AggregationWorkerRunner {

public static void main(String[] args) {
logger.info("Worker Args: \n" + String.join("\n", args));
logger.info("Worker Max Heap Size (MiB): " + Runtime.getRuntime().maxMemory() / (1024 * 1024));

AggregationWorkerArgs cliArgs = new AggregationWorkerArgs();
JCommander.newBuilder().allowParameterOverwriting(true).addObject(cliArgs).build().parse(args);
Expand All @@ -48,23 +47,27 @@ public void failure(Service service) {
logger.error("Failure in the Aggregation Worker. Exiting the enclave process.");
System.exit(1);
}

@Override
public void healthy(){
public void healthy() {
logger.info("The aggregation worker is healthy.");
}
}, MoreExecutors.directExecutor());
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
// Give the service some time to stop to ensure that we are responsive to shutdown
// requests.
try {
workerServiceManager.stopAsync().awaitStopped(Duration.ofMinutes(1));
} catch (TimeoutException timeout) {
// Stopping timed out
logger.error("Unable to stop the worker service: " + timeout);
}
}
});
},
MoreExecutors.directExecutor());
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
public void run() {
// Give the service some time to stop to ensure that we are responsive to shutdown
// requests.
try {
workerServiceManager.stopAsync().awaitStopped(Duration.ofMinutes(1));
} catch (TimeoutException timeout) {
// Stopping timed out
logger.error("Unable to stop the worker service: " + timeout);
}
}
});
workerServiceManager.startAsync();
}
}
6 changes: 6 additions & 0 deletions java/com/google/aggregate/adtech/worker/Annotations.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,11 @@ public final class Annotations {
@Retention(RUNTIME)
public @interface CustomForkJoinThreadPool {}

/** Annotation for cloud instance id. */
@BindingAnnotation
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
public @interface InstanceId {}

private Annotations() {}
}
2 changes: 2 additions & 0 deletions java/com/google/aggregate/adtech/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ java_library(
"//java/external:guava",
"//java/external:guice",
"//java/external:javax_inject",
"//java/external:opentelemetry_api",
"//java/external:operator_protos",
"//java/external:scp_shared_proto",
"//java/external:shared_model",
Expand Down Expand Up @@ -132,6 +133,7 @@ java_library(
"//java/external:guava",
"//java/external:guice",
"//java/external:javax_inject",
"//java/external:opentelemetry_api",
"//java/external:operator_protos",
"//java/external:scp_shared_proto",
"//java/external:shared_model",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.nio.file.Paths;
import java.util.stream.Stream;

/** TODO(b/226499868): Add test for the LocalResultLogger for Standalone worker library */
final class LocalResultLogger implements ResultLogger {

private final LocalResultFileWriter localResultFileWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.aggregate.adtech.worker.Annotations.EnablePrivacyBudgetKeyFiltering;
import com.google.aggregate.adtech.worker.Annotations.EnableStackTraceInResponse;
import com.google.aggregate.adtech.worker.Annotations.EnableThresholding;
import com.google.aggregate.adtech.worker.Annotations.InstanceId;
import com.google.aggregate.adtech.worker.Annotations.MaxDepthOfStackTrace;
import com.google.aggregate.adtech.worker.Annotations.NonBlockingThreadPool;
import com.google.aggregate.adtech.worker.Annotations.OutputShardFileSizeBytes;
Expand Down Expand Up @@ -67,6 +68,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.privacysandbox.otel.Annotations.EnableOTelLogs;
import com.google.privacysandbox.otel.OtlpJsonLoggingOTelConfigurationModule;
import com.google.scp.operator.cpio.jobclient.local.LocalFileJobHandlerModule;
import com.google.scp.operator.cpio.jobclient.local.LocalFileJobHandlerModule.LocalFileJobHandlerPath;
Expand Down Expand Up @@ -114,6 +116,7 @@ protected void configure() {
}
install(new WorkerModule());
install(new OtlpJsonLoggingOTelConfigurationModule());
bind(boolean.class).annotatedWith(EnableOTelLogs.class).toInstance(false);
bind(PrivacyBudgetingServiceBridge.class).to(PrivacyBudgetingSelector.UNLIMITED.getBridge());
install(new PrivacyBudgetKeyGeneratorModule());
bind(StopwatchExporter.class).to(NoOpStopwatchExporter.class);
Expand Down Expand Up @@ -236,4 +239,10 @@ ListeningExecutorService provideCustomForkJoinThreadPool() {
return MoreExecutors.listeningDecorator(
new ForkJoinPool(localWorkerArgs.getNonBlockingThreadPoolSize()));
}

@Provides
@InstanceId
String provideInstanceID() {
return "local";
}
}
Loading

0 comments on commit 2c28a00

Please sign in to comment.