Apache Spark support in pipelinedp4j for end-to-end Differential Privacy #278

Merged: 38 commits, Dec 5, 2024
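To orient reviewers, here is a condensed sketch of the query API this PR brings to Spark, distilled from the SparkExample.java added in this diff. Class and method names come from the diff itself, and the sketch assumes the MovieView class defined there; it is illustrative only, since the complete example additionally computes countDistinctPrivacyUnits and a bounded mean.

import com.google.privacy.differentialprivacy.pipelinedp4j.api.NoiseKind;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryBuilder;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryPerGroupResult;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.TotalBudget;
import java.io.Serializable;
import kotlin.jvm.functions.Function1;
import org.apache.spark.sql.Dataset;

final class SparkQuerySketch {
  // Extractors must be named classes (not lambdas) so they stay serializable on Spark.
  static final class UserIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getUserId();
    }
  }

  static final class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getMovieId();
    }
  }

  static Dataset<QueryPerGroupResult> countViewsWithDp(Dataset<MovieView> data) {
    var query =
        QueryBuilder.from(data, /* privacyIdExtractor= */ new UserIdExtractor())
            .groupBy(
                /* groupKeyExtractor= */ new MovieIdExtractor(),
                /* maxGroupsContributed= */ 3,
                /* maxContributionsPerGroup= */ 1,
                /* publicGroups= */ null) // null means groups are selected with DP
            .count(/* outputColumnName= */ "numberOfViews")
            .build();
    // Noise is calibrated against the total (epsilon, delta) budget for the whole query.
    return query.run(new TotalBudget(/* epsilon= */ 1.1, /* delta= */ 1e-10), NoiseKind.LAPLACE);
  }
}

The same builder chain is what the Beam example uses on a PCollection; extending it to a Spark Dataset is what the PipelineDpCollection change at the bottom of this diff provides.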
Commits (38)
4401ec8
Spark kotlin setup
sakkumar Nov 10, 2024
a1d164f
jackson databind dependency issue
sakkumar Nov 11, 2024
78dee66
fix jacksondata bind version issue
sakkumar Nov 11, 2024
45345d3
Spark Encoders
sakkumar Nov 12, 2024
fd08491
Add Spark Table implementation
sakkumar Nov 16, 2024
70c0af3
spark table encoder
sakkumar Nov 16, 2024
8e36964
adding unit test for spark table
sakkumar Nov 16, 2024
86a2ec8
Add more unit tests for SparkTable
sakkumar Nov 17, 2024
c19df1b
spark encoder pair runtime exception
sakkumar Nov 17, 2024
62900f0
Use scala 2.13
sakkumar Nov 18, 2024
90c1862
Add more unit test for spark collection
sakkumar Nov 18, 2024
2be844f
remove spaces
sakkumar Nov 18, 2024
82b0e75
Pair<T1, T2> Encoder for Spark
sakkumar Nov 19, 2024
5e7ef96
resolve PR comments
sakkumar Nov 23, 2024
827ca27
gitignore /.ijwb/.idea/ files
sakkumar Nov 23, 2024
b9e6282
gitignore /.ijwb/.idea files
sakkumar Nov 23, 2024
d25d324
gitignore /.ijwb/.idea files
sakkumar Nov 23, 2024
9ef6d38
gitignore ijwb files
sakkumar Nov 23, 2024
e5cf4d1
create class rule to create spark session for each test class run
sakkumar Nov 24, 2024
af9a64a
Add implementation of samplePerKey for SparkTable
sakkumar Nov 24, 2024
bb8ff4b
Add QueryBuilder for Spark
sakkumar Nov 25, 2024
d51ab2f
resolve comments
sakkumar Nov 25, 2024
e6cd0f1
Spark QueryBuilder implementation
sakkumar Nov 26, 2024
488cfff
Remove comments
sakkumar Nov 26, 2024
bcff159
add copyright comment
sakkumar Nov 26, 2024
cd91c0a
rename variables
sakkumar Nov 26, 2024
d537548
Add copyright for files
sakkumar Nov 26, 2024
5b0649a
Added comment and renamed variables in filterKeysStoredInSparkCollection
sakkumar Nov 26, 2024
809f524
Added formatting changes for new liner
sakkumar Nov 26, 2024
6b48b47
SparkExample for end-to-end Differential Privacy
sakkumar Nov 27, 2024
d62c6f5
Remove println
sakkumar Nov 28, 2024
f3cda40
resolve comments
sakkumar Nov 28, 2024
b87c7fc
resolve comments
sakkumar Nov 28, 2024
5f7eda2
spark kryo serializer requires class to be public
sakkumar Nov 28, 2024
60abc3d
resolve comments
sakkumar Nov 28, 2024
33426db
Merge with origin
sakkumar Nov 28, 2024
f95557d
Correct Java comment
sakkumar Nov 28, 2024
26ea30b
Format files
sakkumar Dec 2, 2024
9 changes: 9 additions & 0 deletions .gitignore
@@ -2,3 +2,12 @@
**/bazel-java
**/bazel-out
**/bazel-testlogs

**/bazel-differential-privacy
**/.ijwb/
**/pipelinedp4j/.ijwb/
**/pipelinedp4j/bazel-pipelinedp4j
**/pipelinedp4j/MODULE**
**/examples/.idea/
**/examples/pipelinedp4j/bazel-pipelinedp4j
**/examples/pipelinedp4j/MODULE**
12 changes: 12 additions & 0 deletions examples/pipelinedp4j/WORKSPACE.bazel
@@ -66,6 +66,18 @@ maven_install(
"org.apache.beam:beam-sdks-java-core:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-avro:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-protobuf:2.49.0",

"info.picocli:picocli:4.7.6",
# For Apache Spark
"org.apache.spark:spark-core_2.13:3.3.2",
"org.apache.spark:spark-sql_2.13:3.3.2",
"org.apache.spark:spark-mllib_2.13:3.3.2",
"org.apache.spark:spark-catalyst_2.13:3.3.2",
"com.fasterxml.jackson.core:jackson-databind:2.14.2",
"com.fasterxml.jackson.module:jackson-module-paranamer:2.14.2",
"com.fasterxml.jackson.module:jackson-module-scala_2.13:2.14.2",
"org.scala-lang:scala-library:2.13.12",

# For logging to console.
"org.slf4j:slf4j-jdk14:1.7.36",
],
BUILD.bazel
@@ -33,3 +33,27 @@ java_binary(
"@maven//:org_slf4j_slf4j_jdk14",
],
)

java_binary(
name = "SparkExample",
srcs = [
"SparkExample.java",
"MovieMetrics.java",
"MovieView.java",
],
main_class = "com.google.privacy.differentialprivacy.pipelinedp4j.examples.SparkExample",
deps = [
"@com_google_privacy_differentialprivacy_pipielinedp4j//main/com/google/privacy/differentialprivacy/pipelinedp4j/api",
"@maven//:com_google_guava_guava",
"@maven//:info_picocli_picocli",
"@maven//:org_jetbrains_kotlin_kotlin_stdlib",

"@maven//:org_apache_spark_spark_core_2_13",
"@maven//:org_apache_spark_spark_sql_2_13",
"@maven//:org_apache_spark_spark_mllib_2_13",
"@maven//:org_apache_spark_spark_catalyst_2_13",
"@maven//:com_fasterxml_jackson_core_jackson_databind",
"@maven//:com_fasterxml_jackson_module_jackson_module_paranamer",
"@maven//:org_scala_lang_scala_library",
],
)
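For reviewers who want to try the new binary: SparkExample is a plain picocli application, so an invocation along these lines should work once built (the full Bazel label depends on this BUILD file's package, which is elided above, so treat the label as an assumption):

bazel run :SparkExample -- --local-input-file-path=/path/to/netflix_data.csv

--use-public-groups and --local-output-file-path are optional; output defaults to /tmp/anonymized_output/ per the flag definitions in SparkExample.java below.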
BeamExample.java
@@ -150,23 +150,23 @@ static void runBeamExample(BeamExampleOptions options) {

// Data extractors. They always have to implement Function1 and Serializable interfaces. If it
// doesn't implement Serializable interface, it will fail on Beam. If it doesn't implement
-// Function1, it will at compile time due to types mismatch. Do not use lambdas for data
+// Function1, it will fail at compile time due to types mismatch. Do not use lambdas for data
// extractors as they won't be serializable.
-static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
+private static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getUserId();
}
}

-static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
+private static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getMovieId();
}
}

-static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
+private static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
@Override
public Double invoke(MovieView movieView) {
return movieView.getRating();
@@ -185,7 +185,7 @@ private static PCollection<MovieView> readData(Pipeline pipeline, String inputFi

/**
* Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. Set public
- * groups 1-10.
+ * groups 4500-4509.
*/
private static PCollection<String> publiclyKnownMovieIds(Pipeline pipeline) {
var publicGroupsAsJavaList =
MovieMetrics.java
@@ -21,7 +21,7 @@
*
* <p>It is the result of the DP metrics query.
*/
-final class MovieMetrics {
+public final class MovieMetrics {
private final String movieId;

private final long numberOfViewers;
MovieView.java
@@ -20,7 +20,7 @@
import java.util.List;

/** Represents a single movie view from the Netflix dataset. */
-final class MovieView {
+public final class MovieView {
private final String userId;
private final String movieId;
private final Double rating;
SparkExample.java
@@ -0,0 +1,194 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.privacy.differentialprivacy.pipelinedp4j.examples;

import static java.lang.Math.round;
import static java.util.stream.Collectors.toCollection;

import com.google.privacy.differentialprivacy.pipelinedp4j.api.NoiseKind;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryBuilder;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryPerGroupResult;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.TotalBudget;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.IntStream;
import kotlin.jvm.functions.Function1;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

/**
* An end-to-end example of how to compute DP metrics on a Netflix dataset using the library on Spark.
*
* <p>See README for details including how to run the example.
*/
@Command(
name = "SparkExample",
version = {"SparkExample 1.0"},
mixinStandardHelpOptions = true)
public class SparkExample implements Runnable {
@Option(
names = "--use-public-groups",
description =
    "If true, we will assume in the example that movie ids are publicly known and are"
        + " from 4500 to 4509. Default is false, i.e. we will choose movie ids in a"
        + " differentially private way.",
defaultValue = "false")
private boolean usePublicGroups = false;

@Option(
names = "--local-input-file-path",
description =
    "Input file. You can download the data from"
        + " https://www.kaggle.com/datasets/netflix-inc/netflix-prize-data. Use only part"
        + " of it to speed up the calculations.",
required = true)
private String localInputFilePath;

@Option(
names = "--local-output-file-path",
description = "Output file.",
defaultValue = "/tmp/anonymized_output/")
private String localOutputFilePath;

public static void main(String[] args) {
int exitCode = new CommandLine(new SparkExample()).execute(args);
System.exit(exitCode);
}

@Override
public void run() {
System.out.println("Starting calculations...");
SparkSession spark = initSpark();
// Read the input data; these are movie views that contain a movie id, a user id and a rating.
Dataset<MovieView> data = readData(spark);

// Define the query.
var query =
QueryBuilder.from(data, /* privacyIdExtractor= */ new UserIdExtractor())
.groupBy(
/* groupKeyExtractor= */ new MovieIdExtractor(),
/* maxGroupsContributed= */ 3,
/* maxContributionsPerGroup= */ 1,
usePublicGroups ? publiclyKnownMovieIds(spark) : null)
.countDistinctPrivacyUnits("numberOfViewers")
.count(/* outputColumnName= */ "numberOfViews")
.mean(
new RatingExtractor(),
/* minValue= */ 1.0,
/* maxValue= */ 5.0,
/* outputColumnName= */ "averageOfRatings",
/* budget= */ null)
.build();
// Run the query with DP parameters.
Dataset<QueryPerGroupResult> result =
query.run(new TotalBudget(/* epsilon= */ 1.1, /* delta= */ 1e-10), NoiseKind.LAPLACE);

// Convert the result to a more convenient representation, i.e. to MovieMetrics.
Encoder<MovieMetrics> movieMetricsEncoder = Encoders.kryo(MovieMetrics.class);
MapFunction<QueryPerGroupResult, MovieMetrics> mapToMovieMetricsFn =
perGroupResult -> {
String movieId = perGroupResult.getGroupKey();
long numberOfViewers =
round(perGroupResult.getAggregationResults().get("numberOfViewers"));
long numberOfViews = round(perGroupResult.getAggregationResults().get("numberOfViews"));
double averageOfRatings = perGroupResult.getAggregationResults().get("averageOfRatings");
return new MovieMetrics(movieId, numberOfViewers, numberOfViews, averageOfRatings);
};
// We now have our anonymized metrics of movie views.
Dataset<MovieMetrics> anonymizedMovieMetrics =
result.map(mapToMovieMetricsFn, movieMetricsEncoder);

// Save the result to a file.
writeOutput(anonymizedMovieMetrics);

// Stop the Spark session.
spark.stop();
System.out.println("Finished calculations.");
}

// Data extractors. They always have to implement Function1 and Serializable interfaces. If it
// doesn't implement Serializable interface, it will fail on Spark. If it doesn't implement
// Function1, it will fail at compile time due to types mismatch. Do not use lambdas for data
// extractors as they won't be serializable.
private static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getUserId();
}
}

private static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getMovieId();
}
}

private static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
@Override
public Double invoke(MovieView movieView) {
return movieView.getRating();
}
}

private static SparkSession initSpark() {
return SparkSession.builder()
.appName("Kotlin Spark Example")
.master("local[*]")
.config("spark.driver.bindAddress", "127.0.0.1")
.getOrCreate();
}

private Dataset<MovieView> readData(SparkSession spark) {
Dataset<Row> inputDataFrame = spark.read().option("header", "false").csv(localInputFilePath);
MapFunction<Row, MovieView> mapToMovieView =
row ->
new MovieView(
row.getString(1), row.getString(0), java.lang.Double.valueOf((String) row.get(2)));
return inputDataFrame.map(mapToMovieView, Encoders.kryo(MovieView.class));
}

/**
* Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. Set public
* groups 4500-4509.
*/
private static Dataset<String> publiclyKnownMovieIds(SparkSession spark) {
ArrayList<String> publicGroupsAsJavaList =
IntStream.rangeClosed(4500, 4509)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayList::new));
return spark.createDataset(publicGroupsAsJavaList, Encoders.STRING());
}

private void writeOutput(Dataset<MovieMetrics> result) {
Dataset<String> lines =
result.map((MapFunction<MovieMetrics, String>) MovieMetrics::toString, Encoders.STRING());
lines
.write()
.mode(SaveMode.Overwrite) // Overwrite existing file if any
.text(localOutputFilePath);
}
}
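A note on the public modifiers added to MovieMetrics and MovieView above (see commit 5f7eda2, "spark kryo serializer requires class to be public"): Encoders.kryo works through reflective instantiation, so a package-private class fails at runtime on Spark executors. Below is a minimal sketch of that constraint; RatedMovie and KryoVisibilitySketch are hypothetical names used for illustration, not part of the PR.

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

// Hypothetical minimal example: if RatedMovie were package-private, the
// Kryo-backed encoder would fail at runtime when (de)serializing it.
public final class KryoVisibilitySketch {
  // Must be public for Encoders.kryo to instantiate it reflectively.
  public static final class RatedMovie {
    final String movieId;
    final double rating;

    RatedMovie(String movieId, double rating) {
      this.movieId = movieId;
      this.rating = rating;
    }
  }

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("KryoVisibilitySketch").master("local[*]").getOrCreate();
    Dataset<RatedMovie> ds =
        spark.createDataset(
            List.of(new RatedMovie("4500", 5.0)), Encoders.kryo(RatedMovie.class));
    System.out.println(ds.count()); // forces serialization through Kryo
    spark.stop();
  }
}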
17 changes: 17 additions & 0 deletions pipelinedp4j/.bazelrc
@@ -0,0 +1,17 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


build --java_language_version=11
build --java_runtime_version=remotejdk_11
4 changes: 4 additions & 0 deletions pipelinedp4j/BUILD.bazel
@@ -60,6 +60,10 @@ pom_file(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:accumulators_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_kt_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_dp_engine_factory",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_encoders",

],
template_file = "pom.template",
)
10 changes: 10 additions & 0 deletions pipelinedp4j/WORKSPACE.bazel
@@ -73,6 +73,16 @@ maven_install(
"org.apache.beam:beam-sdks-java-core:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-avro:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-protobuf:2.49.0",

"org.apache.spark:spark-core_2.13:3.3.2",
"org.apache.spark:spark-sql_2.13:3.3.2",
"org.apache.spark:spark-mllib_2.13:3.3.2",
"org.apache.spark:spark-catalyst_2.13:3.3.2",
"com.fasterxml.jackson.core:jackson-databind:2.14.2",
"com.fasterxml.jackson.module:jackson-module-paranamer:2.14.2",
"com.fasterxml.jackson.module:jackson-module-scala_2.13:2.14.2",
"org.scala-lang:scala-library:2.13.12",

# Test only dependencies.
maven.artifact(
"com.google.truth",
BUILD.bazel
@@ -35,9 +35,12 @@ kt_jvm_library(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_encoders",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_kt_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_encoders",
"@maven//:com_google_errorprone_error_prone_annotations",
"@maven//:com_google_guava_guava",
"@maven//:org_apache_beam_beam_sdks_java_core",
"@maven//:org_apache_beam_beam_sdks_java_extensions_avro",

],
)
PipelineDpCollection.kt
@@ -22,7 +22,10 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.core.FrameworkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalEncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkEncoderFactory
import org.apache.beam.sdk.values.PCollection as BeamPCollection
import org.apache.spark.sql.Dataset

/**
* An internal interface to represent an arbitrary collection that is supported by PipelineDP4j.
@@ -52,3 +52,10 @@ internal data class LocalPipelineDpCollection<T>(val data: Sequence<T>) : Pipeli

override fun toFrameworkCollection() = LocalCollection<T>(data)
}

/** Spark Collection represented as a Spark Dataset. */
internal data class SparkPipelineDpCollection<T>(val data: Dataset<T>) : PipelineDpCollection<T> {
override val encoderFactory = SparkEncoderFactory()

override fun toFrameworkCollection() = SparkCollection<T>(data)
}