Cannot find siva file in complex query #364

Open
smola opened this issue Apr 9, 2018 · 2 comments

smola commented Apr 9, 2018

Expected Behavior

The following query should not crash:

-- Repository count per language presence.
-- If any file in language X is present in HEAD of a repository,
-- it contributes one for the language count.
-- Forks are excluded.
--
-- category: slow,enry
SELECT
    language,
    COUNT(repository_id) AS repository_count
FROM (
    SELECT DISTINCT
        t.repository_id AS repository_id,
        COALESCE(
            classifyLanguages(b.is_binary, t.path, b.content),
            'Unknown') AS language
    FROM
        tree_entries t JOIN blobs b
        ON
            t.repository_id = b.repository_id AND
            t.reference_name = b.reference_name AND
            t.blob = b.blob_id
        JOIN (
            SELECT DISTINCT
                s.repository_id AS repository_id
            FROM (
                SELECT
                    hash,
                    MAX(STRUCT(index, repository_id)) AS s
                FROM commits
                WHERE
                    index != 1
                    AND reference_name = 'refs/heads/HEAD'
                    AND SIZE(parents) == 0
                GROUP BY hash
            ) AS q1
        ) AS r
        ON
            b.repository_id = r.repository_id
    WHERE
        t.reference_name = 'refs/heads/HEAD'
    ) AS q2
GROUP BY language
ORDER BY repository_count DESC

Current Behavior

The query crashes immediately:

Relation in the other, or the Join type is not supported.
[Stage 2:>                  (0 + 9) / 9][Stage 4:>                  (0 + 9) / 9]18/04/09 09:35:33 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 20)
ExitCodeException exitCode=1: chmod: cannot access '/tmp/spark-a937af10-3f27-436a-8542-4bbf7033adce/siva-files/06a5cabd0dc53f58e36d3103d01fdd2ecd6d232f.siva': No such file or directory

	at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
	at org.apache.hadoop.util.Shell.run(Shell.java:479)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2034)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2003)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1979)
	at tech.sourced.engine.provider.RepositoryObjectFactory.genSivaRepository(RepositoryProvider.scala:206)
	at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:106)
	at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:100)

Possible Solution

My best guess is that one task is cleaning up a temporary siva file that another task still needs. I haven't been able to debug this properly yet.
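
For illustration only, here is a minimal standalone sketch of the kind of race that would produce that error. This is not the engine's actual code: the siva file name comes from the stack trace, everything else (paths, timings, thread structure) is made up.

import java.nio.file.{Files, StandardCopyOption}

// Hypothetical reproduction of the suspected race: task A copies a siva file
// to a shared local path and deletes it after use, while task B still expects
// that same local copy to exist.
object SivaTempFileRace extends App {
  val tmpDir = Files.createTempDirectory("siva-files")
  val local  = tmpDir.resolve("06a5cabd0dc53f58e36d3103d01fdd2ecd6d232f.siva")
  val source = Files.createTempFile("remote", ".siva") // stand-in for the HDFS original

  val taskA = new Thread(new Runnable {
    def run(): Unit = {
      Files.copy(source, local, StandardCopyOption.REPLACE_EXISTING)
      Thread.sleep(10)            // "use" the repository
      Files.deleteIfExists(local) // cleanup, even though task B still needs the file
    }
  })

  val taskB = new Thread(new Runnable {
    def run(): Unit = {
      Thread.sleep(15)
      if (!Files.exists(local))
        println(s"$local: No such file or directory") // same symptom as the chmod error above
    }
  })

  taskA.start(); taskB.start()
  taskA.join(); taskB.join()
}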

Steps to Reproduce

  1. Execute the provided SQL query against a siva dataset in HDFS.

Context

I'm trying a complex query containing a join with a subquery. Both sides access the same repositories.

Your Environment (for bugs)

  • Version used: engine-jupyter 0.5.5 on Docker
  • Operating System and version: pipeline-staging cluster
  • Resources needed to reproduce the problem: if you need access to a system exhibiting it, ping me.

smola commented Apr 9, 2018

Here's the generated physical plan:

*Sort [repository_count#138L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(repository_count#138L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[language#137], functions=[count(1)])
      +- Exchange hashpartitioning(language#137, 200)
         +- *HashAggregate(keys=[language#137], functions=[partial_count(1)])
            +- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
               +- Exchange hashpartitioning(repository_id#49, language#137, 200)
                  +- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
                     +- *Project [repository_id#49, coalesce(UDF(is_binary#65, path#51, content#64), Unknown) AS language#137]
                        +- *SortMergeJoin [repository_id#62], [repository_id#135], Inner
                           :- *Sort [repository_id#62 ASC NULLS FIRST], false, 0
                           :  +- Exchange hashpartitioning(repository_id#62, 200)
                           :     +- *Project [repository_id#49, path#51, repository_id#62, content#64, is_binary#65]
                           :        +- *Filter (((blob#52 = blob_id#60) && (reference_name#50 = refs/heads/HEAD)) && (reference_name#63 = refs/heads/HEAD))
                           :           +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false), StructField(blob_id,StringType,false), StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(content,BinaryType,true), StructField(is_binary,BooleanType,false)),Some((((repository_id#49 = repository_id#62) && (reference_name#50 = reference_name#63)) && (blob#52 = blob_id#60))),None) [content#64,path#51,repository_id#62,reference_name#50,is_binary#65,blob_id#60,repository_id#49,reference_name#63,blob#52] PushedFilters: [EqualTo(reference_name,refs/heads/HEAD), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,path:string,repository_id:string,content:binary,is_binary:boolean>
                           +- *Sort [repository_id#135 ASC NULLS FIRST], false, 0
                              +- *HashAggregate(keys=[repository_id#135], functions=[])
                                 +- Exchange hashpartitioning(repository_id#135, 200)
                                    +- *HashAggregate(keys=[repository_id#135], functions=[])
                                       +- *Filter isnotnull(repository_id#135)
                                          +- SortAggregate(key=[hash#23], functions=[max(named_struct(index, index#22, repository_id, repository_id#20))])
                                             +- *Sort [hash#23 ASC NULLS FIRST], false, 0
                                                +- Exchange hashpartitioning(hash#23, 200)
                                                   +- SortAggregate(key=[hash#23], functions=[partial_max(named_struct(index, index#22, repository_id, repository_id#20))])
                                                      +- *Sort [hash#23 ASC NULLS FIRST], false, 0
                                                         +- *Project [repository_id#20, index#22, hash#23]
                                                            +- *Filter (((size(parents#25) = 0) && NOT (index#22 = 1)) && (reference_name#21 = refs/heads/HEAD))
                                                               +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [reference_name#21,repository_id#20,hash#23,index#22,parents#25] PushedFilters: [Not(EqualTo(index,1)), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,index:int,hash:string>


smola commented Apr 9, 2018

I managed to reduce the failing query to this:

SELECT
        t.path
FROM
    tree_entries t
    JOIN (
        SELECT DISTINCT
            repository_id
        FROM commits
    ) AS r
    ON
        t.repository_id = r.repository_id

Plan:

*Project [path#51]
+- *SortMergeJoin [repository_id#49], [repository_id#20], Inner
   :- *Sort [repository_id#49 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(repository_id#49, 200)
   :     +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false)),None,Some(tree_entries)) [repository_id#49,path#51] ReadSchema: struct<repository_id:string,path:string>
   +- *Sort [repository_id#20 ASC NULLS FIRST], false, 0
      +- *HashAggregate(keys=[repository_id#20], functions=[])
         +- Exchange hashpartitioning(repository_id#20, 200)
            +- *HashAggregate(keys=[repository_id#20], functions=[])
               +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [repository_id#20] ReadSchema: struct<repository_id:string>
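
As a possible workaround (untested, just a sketch; it assumes the commits and tree_entries tables are registered exactly as in the queries above), materializing the subquery before the join means the two GitRelation scans no longer run in the same stage of the SortMergeJoin:

// Untested workaround sketch: materialize the DISTINCT repository ids first,
// then join against the cached result instead of a second GitRelation scan.
val repoIds = spark.sql("SELECT DISTINCT repository_id FROM commits").cache()
repoIds.count() // force materialization
repoIds.createOrReplaceTempView("repo_ids")

spark.sql(
  """SELECT t.path
    |FROM tree_entries t
    |JOIN repo_ids r ON t.repository_id = r.repository_id""".stripMargin
).show()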
