Update ExtractDomain to extract apex domains. #520

Merged
merged 2 commits on Oct 21, 2021

Conversation

@ruebot ruebot commented Oct 4, 2021

GitHub issue(s): #519

What does this Pull Request do?

  • Resolves "ExtractDomains returns non-Apex Domains" (#519)
  • Add scala-uri as a dependency
  • Replace the getHost method of extracting domains with apexDomain from scala-uri (see the sketch after this list)
  • Update tests as needed
  • Removed unused source parameter from ExtractDomain
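
For illustration, here is a minimal sketch of the getHost-to-apexDomain switch referenced above, assuming scala-uri's Url.parseTry and apexDomain; the helper name apexOf is hypothetical, not the actual ExtractDomain code:

import io.lemonlabs.uri.Url

// Hypothetical helper, not the ExtractDomain implementation itself.
// new java.net.URL(url).getHost keeps the full host (e.g. "www.news.bbc.co.uk"),
// while scala-uri's apexDomain collapses it to the registrable/apex domain
// (e.g. "bbc.co.uk").
def apexOf(url: String): String =
  Url.parseTry(url).toOption
    .flatMap(_.apexDomain)
    .getOrElse("")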

How should this be tested?

Additional Notes:

  1. I removed the source parameter since I honestly had no idea what it was for, or what the use case was. Couldn't really see anything in the commit history. @ianmilligan1 @lintool do y'all recall what it was for?

  2. Once this is merged, I'll cut a new release since we'll need it for ARCH.

ruebot commented Oct 4, 2021

Well, this blows up in Spark 3.0.3 and 3.1.2 with:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
  at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1094)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1076)
  at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1498)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1486)
  at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:183)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
  ... 47 elided
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.lemonlabs.uri.Url$
  at io.archivesunleashed.matchbox.ExtractDomain$.apply(ExtractDomain.scala:33)
  at io.archivesunleashed.ArchiveRecordImpl.<init>(ArchiveRecord.scala:185)
  at io.archivesunleashed.package$RecordLoader$.$anonfun$loadArchives$2(package.scala:116)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)

io.lemonlabs.uri.Url is in the fatjar 🤷‍♂️

@ianmilligan1 commented:

I removed the source parameter since I honestly had no idea what it was for, or what the use case was. Couldn't really see anything in the commit history. @ianmilligan1 @lintool do y'all recall what it was for?

No clue. Went back in the blame and it looks like it's been there forever (maybe from when Alice was working on this five or six years ago). If I had to guess, it was some janky error handling. I think yanking it makes sense.

@ianmilligan1 commented:

Well, this blows up in Spark 3.0.3 and 3.1.2 with:

😞

(that sucks! and odd behaviour... )

ruebot commented Oct 4, 2021

I built the branch on tuna, and ended up hitting the error described in lemonlabsuk/scala-uri#341 with Spark 3.1.2.

Caused by: java.lang.NoSuchMethodError: cats.kernel.Eq$.catsKernelInstancesForString()Lcats/kernel/Order;
  at io.lemonlabs.uri.Url$.<init>(Uri.scala:597)
  at io.lemonlabs.uri.Url$.<clinit>(Uri.scala)
  at io.archivesunleashed.matchbox.ExtractDomain$.apply(ExtractDomain.scala:33)
  at io.archivesunleashed.ArchiveRecordImpl.<init>(ArchiveRecord.scala:185)
  at io.archivesunleashed.package$RecordLoader$.$anonfun$loadArchives$2(package.scala:116)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)

lintool commented Oct 4, 2021

  1. I removed the source parameter since I honestly had no idea what it was for, or what the use case was. Couldn't really see anything in the commit history. @ianmilligan1 @lintool do y'all recall what it was for?

My memory is faint on this, but per the docs: "source: an optional default url for urls with no valid domain host".

I think we've run into cases before where the full URL we're trying to extract from was mangled. Usually, we're doing extraction on a URL found on some page, so we set "source" to that page. The rationale is that if the URL is mangled, we can back up to the page the URL was found on.

That was the original rationale. As to whether it's a good one, that's a different matter...
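
Based on that description, the removed fallback presumably behaved roughly like the sketch below; the names and details are an illustrative reconstruction from the quoted docs, not the original code:

import java.net.URL
import scala.util.Try

// Illustrative reconstruction: try the candidate url first; if its host
// cannot be parsed, fall back to the page ("source") the url was found on.
def extractDomain(url: String, source: String = ""): String = {
  def host(u: String): Option[String] =
    Try(new URL(u)).toOption.flatMap(parsed => Option(parsed.getHost)).filter(_.nonEmpty)
  host(url).orElse(host(source)).getOrElse("")
}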

codecov bot commented Oct 21, 2021

Codecov Report

Merging #520 (9a343e9) into main (8104a65) will increase coverage by 0.20%.
The diff coverage is 95.83%.

@@             Coverage Diff              @@
##               main     #520      +/-   ##
============================================
+ Coverage     88.83%   89.04%   +0.20%     
  Complexity       57       57              
============================================
  Files            43       43              
  Lines          1012     1022      +10     
  Branches         85       83       -2     
============================================
+ Hits            899      910      +11     
  Misses           74       74              
+ Partials         39       38       -1     

ruebot commented Oct 21, 2021

Tested locally on Spark 3.1.1:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import io.archivesunleashed._
import io.archivesunleashed.udfs._
import io.archivesunleashed.app._

val webpages = RecordLoader.loadArchives("/home/nruest/Projects/au/sample-data/geocities/GEOCITIES-20091027143841-00136-ia400104.us.archive.org.warc.gz", sc).webpages()

val domain_count = webpages.groupBy(removePrefixWWW(extractDomain($"Url")).alias("url")).count().sort($"count".desc)

domain_count.show()

// Exiting paste mode, now interpreting.

21/10/21 12:24:07 WARN PDFParser: J2KImageReader not loaded. JPEG2000 files will not be processed.
See https://pdfbox.apache.org/2.0/dependencies.html#jai-image-io
for optional dependencies.

21/10/21 12:24:07 WARN SQLite3Parser: org.xerial's sqlite-jdbc is not loaded.
Please provide the jar on your classpath to parse sqlite files.
See tika-parsers/pom.xml for the correct version.
+------------------+-----+                                                      
|               url|count|
+------------------+-----+
|     geocities.com|13265|
|        amazon.com|   52|
|aplustemplates.com|    1|
|   takashi0808.com|    1|
|      freelogs.com|    1|
|     st-armada.com|    1|
|    htmlplanet.com|    1|
+------------------+-----+

import io.archivesunleashed._
import io.archivesunleashed.udfs._
import io.archivesunleashed.app._
webpages: org.apache.spark.sql.DataFrame = [crawl_date: string, url: string ... 4 more fields]
domain_count: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [url: string, count: bigint]

I'm doing the same test on tuna now on the full geocities dataset, and everything seems to be running smoothly.

I'm not 100% happy with my nested try and catch to swallow Url.parse exceptions, but we can loop back to that some other time.
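
For reference, a flatter alternative (just a sketch using scala-uri's parseOption, not the code in this PR) might look like:

import io.lemonlabs.uri.Url

// parseOption swallows parse failures itself, so no nested try/catch is needed.
def apexDomainOf(url: String): String =
  Url.parseOption(url).flatMap(_.apexDomain).getOrElse("")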

@ianmilligan1 @lintool any objections to me merging once the tuna job finishes, and cutting a dot release?

@ianmilligan1 ianmilligan1 left a comment

Apologies for the delay (was just on a call).

Looks good! Builds locally and sample scripts running on my end. I am happy for this to be merged when you're good to go @ruebot.

ruebot commented Oct 21, 2021

Ran without issue on tuna!

+--------------------+--------+                                                 
|                 url|   count|
+--------------------+--------+
|       geocities.com|57922226|
|           yahoo.com| 1110551|
|          amazon.com|   79819|
|         myspace.com|   67706|
|        bravenet.com|   62904|
|         youtube.com|   54087|
|           1mgis.com|   46776|
|          google.com|   46757|
|  viewonbuddhism.org|   46254|
|       physforum.com|   44282|
|         forumer.com|   44223|
|internetarchaeolo...|   42428|
|       bravehost.com|   42388|
| econtableyss.com.ar|   39910|
|             tvoe.tv|   39323|
|       google.com.br|   38472|
|       monteriski.ca|   37527|
|             lds.org|   37277|
|           bagus.com|   36675|
|     figueirense.net|   36473|
+--------------------+--------+
only showing top 20 rows
