Run Gemini file-level duplicate detection on PGA #42

Open
bzz opened this issue Jan 23, 2018 · 33 comments

bzz commented Jan 23, 2018

Document in the README the resources needed to successfully process 1k, 2k, 10k, 100k, and the whole PGA of .siva files.

So a good start would be:

  • document the known configuration of the cluster we use internally
  • run Gemini hash and document how long it takes to finish
  • document what CPU-load, memory, and I/O-throughput workload it creates on that cluster (e.g. from the Sysdig dashboard; to get access, file an issue); a measurement sketch follows this list
  • see which resource is the bottleneck
  • try to optimize in order to utilize that resource better (e.g. in case of throughput, run more executor JVMs on the same machine)
  • see if we hit, and can help with, any Engine issues
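One way to get those workload numbers from inside the job itself (in addition to the Sysdig dashboard and the Spark event log) is a SparkListener that aggregates task metrics. This is only a sketch, assuming it is pasted into the same spark-shell session that runs the hash; it is not part of Gemini:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Aggregate executor CPU time, HDFS input and shuffle read across all tasks of the job.
val cpuNs = new AtomicLong(0)
val inputBytes = new AtomicLong(0)
val shuffleBytes = new AtomicLong(0)

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      cpuNs.addAndGet(m.executorCpuTime)             // nanoseconds of executor CPU
      inputBytes.addAndGet(m.inputMetrics.bytesRead) // bytes read from HDFS
      shuffleBytes.addAndGet(m.shuffleReadMetrics.remoteBytesRead)
    }
  }
})

// ...run the hash job, then print the totals:
println(f"CPU: ${cpuNs.get / 1e9}%.0fs, input: ${inputBytes.get >> 20} MB, shuffle read: ${shuffleBytes.get >> 20} MB")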

smacker commented Jan 23, 2018

try to optimize in order to utilize that resource better (e.g. in case of throughput, run more executor JVMs on the same machine)

How do we do that? We don't control the Spark cluster.


bzz commented Jan 23, 2018

How do we do that? We don't control the Spark cluster.

Let's measure, identify, and document the bottleneck first, set preliminary expectations on resources for 100k, and then discuss the possible options we might have, e.g. this could be a powerful argument for changing https://github.com/src-d/charts/tree/master/spark to Apache Spark on k8s.

We will be able to improve the performance expectation model later on, based on more data.


smacker commented Jan 25, 2018


bzz commented Jan 25, 2018

Thanks for keeping it updated!

BTW, super-nice issue description and example of how to reproduce 👍

bzz removed the review label Jan 31, 2018

bzz commented Feb 8, 2018

The Engine issue is resolved in https://github.com/src-d/engine/releases/tag/v0.5.1


smacker commented Feb 8, 2018

Yep. But the Engine API has changed a bit. We need to update Gemini.


smacker commented Feb 9, 2018

Ran Gemini on the new 1k dataset with the new Engine. And it works!!!!

The bad news is the timing: 24 min.
I don't really know how to profile it, but I saw that a single job takes most of the time; most probably there is one huge repo.
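One quick check of the "one huge repo" hypothesis is to sort the dataset's .siva files by size. A minimal sketch via the Hadoop FileSystem API, assuming a spark-shell session and that the 1k dataset sits in a single directory (the path below is a placeholder):

import org.apache.hadoop.fs.Path

// List the .siva files of the dataset and print the ten largest ones;
// one clear outlier here would explain a single long-running job.
val dir = new Path("/path/to/siva-1k")   // placeholder path to the 1k dataset
val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val sivaSizes = fs.listStatus(dir)
  .filter(_.getPath.getName.endsWith(".siva"))
  .map(s => (s.getPath.getName, s.getLen))
  .sortBy(-_._2)

sivaSizes.take(10).foreach { case (name, bytes) => println(f"$bytes%12d  $name") }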

bzz changed the title from "Document performance expectations" to "Performance expectations on duplicate detection" Feb 21, 2018

smacker commented Feb 26, 2018

The 10k run has failed with https://github.com/src-d/engine/issues/332


smacker commented Mar 2, 2018

Currently blocked by https://github.com/src-d/engine/issues/336


bzz commented Mar 12, 2018

To move this forward, as the DR team is super busy now, can we please submit a PR to Engine that just logs RevWalkException without failing, the same way MissingObjectException is handled, and run Gemini with the custom-built version of Engine from that PR to avoid waiting for a release?


bzz commented Mar 27, 2018

@carlosms could you please check if https://github.com/src-d/engine/pull/347 solves the issue and allows us to move forward with #42?

If that PR is tested on real data and solves the issue, it may be worth posting this information on the PR as well.

carlosms removed their assignment Apr 5, 2018

bzz commented Apr 11, 2018

Engine 0.5.7 was released 🎉 with many bug fixes, and discussions like https://github.com/src-d/minutes/pull/210/files#diff-a0ec2b18d53b6bebfc2a342ed864a52fR34 should raise the priority of finishing the Gemini file duplication runs up to PGA size.

bzz changed the title from "Performance expectations on duplicate detection" to "Run Gemini file-level duplicate detection on PGA" Apr 11, 2018

bzz commented Apr 11, 2018

The title and description have been updated to reflect the current goal.


smacker commented Apr 17, 2018

10k repos were processed successfully with Engine 0.5.7. The full PGA fails with OOM with the default params; we need to tune them.

bzz self-assigned this May 3, 2018

bzz commented May 3, 2018

The plan is:

  • make sure PGA is available on the staging cluster
  • run Gemini with the latest Engine 0.6.1 on 1k, 10k, 100k, 200k
  • document how to reproduce the results


bzz commented May 7, 2018

PGA is being downloaded to the pipeline HDFS cluster at hdfs://hdfs-namenode/pga/siva/latest (check with hdfs dfs -ls).

WIP in the pga-alex pod with pga get -v -j 32 -o hdfs://hdfs-namenode:8020/pga 2>&1 | tee -a /go/pga-1.log

At this rate it will take ~25h to get there.


bzz commented May 9, 2018

The PGA download has finished 🎉 but it's a bit :suspect:, as it is only 2.4TB, not the 2.7TB the rumor has it to be. Will verify PGA integrity first with src-d/datasets#53.


bzz commented May 17, 2018

Pre-conditions for running the new Gemini on the pipeline staging Apache Spark cluster:

  • PGA downloaded to HDFS (WIP)
  • Feature Extractors deployed, same as bblfsh, collocated with the Spark workers, src-d/backlog#1266

bzz added the blocked label May 18, 2018

bzz commented May 18, 2018

blocked by src-d/backlog#1266


bzz commented Jun 8, 2018

  • FEs (Feature Extractors) are running
  • pga get second round, ETA 8h52m56

bzz removed the blocked label Jun 8, 2018

bzz commented Jun 12, 2018

Full PGA was downloaded to HDFS 🎉 src-d/datasets#53 (comment)

$ zgrep -o "[0-9a-z]*\.siva" ~/.pga/latest.csv.gz | sort | uniq | wc -l
239807

$ hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest | grep -c "\.siva$"
239807


bzz commented Jun 12, 2018

Plan

  1. WIP: run the latest Gemini Hash with the latest Engine 0.6.3 on a single shard (~10GB, ~1000 repos, ~1/250 of the whole) using the current staging pipeline cluster configuration
  2. run on the whole PGA
    Improvement directions:
    • will need more performance optimizations
      • share a single feClient/connection per partition, instead of creating one for every row (see the sketch after this list)
      • uast de-serialize -> serialize -> feature extractor
      • RDD -> DataFrames
    • support an on-disk Parquet "cache" at every stage
    • process each shard individually?
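For reference, the "one client per partition" optimization boils down to creating the feature-extractor client inside mapPartitions rather than per row. In the sketch below, FeClient, its endpoint, and extract() are hypothetical stand-ins, not Gemini's actual SparkFEClient API; it assumes a spark-shell session (sc available):

import org.apache.spark.TaskContext

// Hypothetical stand-in for the real feature-extractor client.
class FeClient(endpoint: String) {
  def extract(content: String): Seq[String] = content.split("\\W+").toSeq // placeholder logic
  def close(): Unit = ()
}

val filesRdd = sc.parallelize(Seq(("doc1", "def f(): pass"), ("doc2", "class A {}")))

val featuresRdd = filesRdd.mapPartitions { rows =>
  val client = new FeClient("fe-host:9001")                      // one client per partition, not per row
  TaskContext.get.addTaskCompletionListener(_ => client.close()) // release it when the task finishes
  rows.map { case (doc, content) => (doc, client.extract(content)) }
}

featuresRdd.collect().foreach(println)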

bzz added the blocked label Jun 18, 2018

bzz commented Jun 18, 2018

Blocked: the Feature Extractors deployed under https://github.com/src-d/issues-infrastructure/issues/184 are part of a new, separate Apache Spark cluster in a different k8s namespace, -n feature-extractor, which does not seem to have access to HDFS 😕


bzz commented Jun 25, 2018

Hash has finished successfully; I'm now submitting to Gemini the PRs that enabled it.

The report is:

  • cc.makeBuckets(): 40 min
  • Report.findConnectedComponents(): ~6h (illustrated by the sketch below)
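For context, the connected-components step groups files that share at least one hash bucket into the same duplicate community. The following is not Gemini's Report.findConnectedComponents(), just a plain union-find illustration of that idea on toy bucket data:

import scala.collection.mutable

// Toy union-find over hash buckets: files that share a bucket end up in the same component.
def connectedComponents(buckets: Seq[Seq[String]]): Map[String, Int] = {
  val parent = mutable.Map.empty[String, String]
  def find(x: String): String = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x else { val root = find(p); parent(x) = root; root }
  }
  buckets.foreach { bucket =>
    bucket.foreach(find(_))                                                         // register every file
    bucket.zip(bucket.drop(1)).foreach { case (a, b) => parent(find(a)) = find(b) } // union neighbours
  }
  val files = parent.keys.toList
  val ids = files.map(find).distinct.zipWithIndex.toMap
  files.map(f => f -> ids(find(f))).toMap
}

// "a" and "b" share bucket 1, "b" and "c" share bucket 2 => one component {a, b, c}; "d" stays alone.
println(connectedComponents(Seq(Seq("a", "b"), Seq("b", "c"), Seq("d"))))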


bzz commented Jun 27, 2018

1h for hashing ~1/250 of PGA on 3 machines of the pipeline staging cluster.

Configuration

  --conf "spark.executor.memory=16g" \
  --conf "spark.local.dir=/spark-temp-data" \
  --conf "spark.executor.extraJavaOptions='-Djava.io.tmpdir=/spark-temp-data -Dlog4j.configuration=log4j.properties'" \
  --conf "spark.driver.memory=8g" \
  --conf "spark.tech.sourced.engine.skip.read.errors=true" \
  --conf "spark.files.maxPartitionBytes=12582912" \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=hdfs://hdfs-namenode.default.svc.cluster.local/apps/gemini/spark-logs" \
  --files src/main/resources/log4j.properties \

Command

time MASTER="spark://fe-spark-spark-master:7077" ./hash -v \
  -k dockergemini4 \
  -h scylladb.default.svc.cluster.local \
  hdfs://hdfs-namenode.default.svc.cluster.local/pga/siva/latest/ff | tee hash-pga-ff-4.logs

Output

Feature Extraction exceptions  
Processed: 4304060, skipped: 120
 - TimeoutException -> 109
 - StatusRuntimeException -> 11

real    61m2.058s

FE exceptions

ERROR SparkFEClient: feature extractor error: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
ERROR SparkFEClient: feature extractor error: io.grpc.StatusRuntimeException: INTERNAL: Exception deserializing request!   
FATAL vendor/golang.org/x/text/unicode/norm/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (4323138 vs. 4194304)

UAST extraction exceptions

WARN Bblfsh: FATAL src/main/java/org/yardstickframework/BenchmarkServerStartUp.java: EOF
WARN Bblfsh: FATAL xs6/extensions/crypt/crypt_ghash.js: message is not defined; unsupported: non-object root node
WARN Bblfsh: FATAL vendor/golang.org/x/text/encoding/charmap/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5617479 vs. 4194304)

DB

$  kubectl exec -it scylladb-0 -- /bin/bash
$ cqlsh

use dockergemini4;
select count(1) from meta;
 127379

select count(1) from hashtables;
 426560


marnovo commented Jun 28, 2018

Thanks a lot for the detailed results, @bzz!

Question: how are we sampling the repos for each of these tests?


bzz commented Jun 28, 2018

Question: how are we sampling the repos for each of these tests?

Good question. We have always used just a single shard of the PGA dataset: all the repos whose siva file names start with the prefix /ff/.

Overall, on Apache Spark performance depends on data distribution A LOT, so attaching the .siva file size distribution histogram (bucket width 100048576 bytes, i.e. ~100MB):

hdfs dfs -du hdfs://hdfs-namenode/pga/siva/latest/ff/ | grep "\.siva$" | awk -v "size=100048576" -f hist.awk
0 100048576 912
100048576 200097152 17
200097152 300145728 2
300145728 400194304 1
400194304 500242880 1
500242880 600291456
600291456 700340032
700340032 800388608
800388608 900437184
900437184 1000485760 1
1000485760 1100534336 1


bzz commented Jul 4, 2018

Comparing feature-frequency computation with the DataFrame vs the RDD API, on two datasets:

Local: 1MB, 30k features
Cluster: 170MB, 5.5M features

DataFrame

local: 8sec, cluster: 4sec

val freqDf = features.withColumnRenamed("_1", "feature").withColumnRenamed("_2", "doc")
  .select("feature", "doc")
  .distinct
  .groupBy("feature")
  .agg(count("*").alias("cnt"))
  .map(row => (row.getAs[String]("feature"), row.getAs[Long]("cnt")))
  .collect().toMap

RDD

local: 4sec, cluster: 5s

val freq = features.rdd
  .map { case (feature, doc, _) => (feature, doc) }
  .distinct
  .map { case (token, _) => (token, 1) }
  .reduceByKey(_ + _)
  .collectAsMap()

The DataFrame API does not seem to change performance much, but it still has the nice benefit of a uniform API.


bzz commented Jul 17, 2018

There are 141 .siva files bigger than 1GB, with the remaining 260k+ being smaller.
Those outliers can be moved to get a shorter tail of task execution times on average:

hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' | wc -l
141

hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ \
  | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' \
  | cut -d '/' -f 7- \
  | xargs -I{} sh -c 'hdfs dfs -mkdir -p $(dirname hdfs://hdfs-namenode/pga/siva/biggest/{}); hdfs dfs -mv hdfs://hdfs-namenode/pga/siva/latest/{} hdfs://hdfs-namenode/pga/siva/biggest/{}'


bzz commented Jul 17, 2018

After moving the biggest files, jobs fail with

org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 32343809.

After setting spark.kryoserializer.buffer.max=1g, jobs fail with

tech.sourced.siva.SivaException: Exception at file 022c7272f0c1333a536cb319beadc4171cc8ff6a.siva: At Index footer, index size: Java implementation of siva doesn't support values greater than 9223372036854775807

which at this point might indicate .siva files broken during pga get


smacker commented Jul 17, 2018

@bzz here is your issue: https://github.com/src-d/engine/issues/414
A different file, but the same error.


bzz commented Jul 26, 2018

Simple processing of the full PGA from /pga2 with Engine finished in 59.1h, using 16 cores/8GB RAM on 9 machines of the staging pipeline cluster 🎉

Removing the outliers, ~140 .siva files (of ~270k) that are >1GB each, would speed it up 2-3x.

export SPARK_HOME="/opt/spark-2.2.0-bin-hadoop2.7"
$SPARK_HOME/bin/spark-shell --master "spark://fe-spark-spark-master:7077" \
  --name "Spark shell \w Gemini - PGA2, count file sizes" \
  --conf "spark.executor.memory=8g" \
  --conf "spark.cores.max=144" \
  --conf "spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=metastore_db2;create=true" \
  --conf "spark.local.dir=/spark-temp-data" \
  --conf "spark.executor.extraJavaOptions=-Djava.io.tmpdir=/spark-temp-data -Dlog4j.configuration=log4j.properties -XX:+HeapDumpOnOutOfMemoryError -XX:MaxDirectMemorySize=20G" \
  --conf "spark.driver.memory=8g" \
  --conf "spark.tech.sourced.engine.skip.read.errors=true" \
  --conf "spark.files.maxPartitionBytes=12582912" \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.kryoserializer.buffer.max=1g" \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=hdfs://hdfs-namenode.default.svc.cluster.local/apps/spark" \
  --files src/main/resources/log4j.properties \
  --jars target/gemini-deps.jar,target/gemini-uber.jar

// Engine and repos set up as in the reproduction snippet further below in this thread.
val engine = Engine(spark, "hdfs://hdfs-namenode.default.svc.cluster.local/pga2/siva/latest/", "siva")
val repos = engine.getRepositories

val files = repos.getHEAD
  .getCommits
  .getTreeEntries
  .getBlobs
  .filter('is_binary === false)

files
 .sort("repository_id")
 .coalesce(1000)
 .write.parquet("hdfs://hdfs-namenode.default.svc.cluster.local/pga2/parquet/files")

Caching all files on disk in Parquet fails though, with

Job aborted due to stage failure: Total size of serialized results of XX tasks (1025 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This happens because the DF API keeps in memory the longest string of a String column, which here is a full file content of more than 1GB.
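A quick way to sanity-check that, assuming files is the non-binary blobs DataFrame from the snippet above (with its content column), is to look at the largest content value; if single values really approach 1GB, dropping content before anything that collects to the driver, or launching with a larger --conf "spark.driver.maxResultSize", are the usual workarounds:

import org.apache.spark.sql.functions.{col, length, max}

// Largest blob content in bytes (length() works on binary and string columns).
files.agg(max(length(col("content"))).alias("largest_blob_bytes")).show()

// Keep only metadata columns before anything that pulls results back to the driver.
val filesMeta = files.drop("content")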


bzz commented Aug 16, 2018

The on-disk Parquet cache was failing because the number of tasks (~40k) was too high for our cluster configuration; that was fixed by reducing the number, and the job can now proceed over the full PGA (~50h) 🎉 but it is failing at the end 😖 with

Job aborted due to stage failure: Task 49 in stage 4.1 failed 4 times, most recent failure:
Lost task 49.3 in stage 4.1 (TID 27912, 10.2.56.116, executor 1):
java.io.FileNotFoundException: /spark-temp-data/spark-fb7fbf1e-033e-4122-9464-16acdc52fe34/executor-801bb555-6e93-4c4e-b3f8-46bc33ca9639/blockmgr-309ecee9-419e-45ab-a595-03dc3157b641/26/temp_shuffle_2eddf3bb-b5c0-488c-b397-47e4e4921a32 
(No such file or directory)

A simple example to reproduce:

val path = "hdfs://hdfs-namenode.default.svc.cluster.local/pga2/siva/latest/"
val engine = Engine(spark, path, "siva")
val repos = engine.getRepositories

val files = repos.getHEAD
  .getCommits
  .getTreeEntries
  .getBlobs
  .filter('is_binary === false)

files
 .coalesce(1000)
 .sort("repository_id")
 .write.parquet("hdfs://hdfs-namenode.default.svc.cluster.local/pga2/parquet/files")
