Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(core): make path filter configurable #329

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package com.github.mjakubowski84.parquet4s

import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.scalatest.BeforeAndAfter
import org.scalatest.EitherValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, EitherValues}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Comment on lines +5 to +10
Copy link
Contributor Author

@i10416 i10416 Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just ran organize imports by scalafmt


class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils with BeforeAndAfter with EitherValues {

Expand Down Expand Up @@ -139,6 +141,26 @@ class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils wi
dir.schema should be(Col("x") :: Nil)
}

it should "ignore paths that do not match the pathFilter predicate" in {
val path_X1 = tempPath.append("_x=1").append("file.parquet")
val pathX1 = tempPath.append("x=1").append("file.parquet")
fileSystem.createNewFile(path_X1.toHadoop)
fileSystem.createNewFile(pathX1.toHadoop)

val dir = findPartitionedPaths(tempPath, configuration, !_.getName().startsWith("_")).value
dir.paths should be(List(PartitionedPath(pathX1, configuration, (Col("x") -> "1") :: Nil)))
dir.schema should be(List(Col("x")))
}

it should "accept hadoop default hidden paths when pathFilter always returns true" in {
val pathX1 = tempPath.append("_x=1/.y=1").append("file.parquet")
fileSystem.createNewFile(pathX1.toHadoop)

val dir = findPartitionedPaths(tempPath, configuration, _ => true).value
dir.paths should be(List(PartitionedPath(pathX1, configuration, List(Col("_x") -> "1", Col(".y") -> "1"))))
dir.schema should be(List(Col("_x"), Col(".y")))
}

it should "return a partitioned directory with single file of no partitions values" in {
val pathX1 = tempPath.append("x=1").append("file.parquet")
fileSystem.createNewFile(pathX1.toHadoop)
Expand Down Expand Up @@ -186,4 +208,15 @@ class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils wi
findPartitionedPaths(tempPath, configuration) should be a Symbol("Left")
}

it should "fail to create partitions from inconsistent directory [case3]" in {
val path1 = tempPath.append("x=1/file.parquet")
val path2 = tempPath.append("_x=1/file.parquet")
fileSystem.createNewFile(path1.toHadoop)
fileSystem.createNewFile(path2.toHadoop)
// This case ignores `_x=1`, so there's no inconsistent directory.
findPartitionedPaths(tempPath, configuration) should be a Symbol("Right")
// This case regards `_x=1` an as ordinary directory, so it conflicts with `x=1`.
findPartitionedPaths(tempPath, configuration, _ => true) should be a Symbol("Left")
}

}
26 changes: 19 additions & 7 deletions core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.github.mjakubowski84.parquet4s

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, FileSystem, RemoteIterator}
import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.fs.RemoteIterator
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.util.HiddenFileFilter
import org.slf4j.Logger

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.matching.Regex

private[parquet4s] object IOOps {
Expand Down Expand Up @@ -67,12 +72,18 @@ trait IOOps {
}
}

/** @param path
* a location in a file tree from which `findPartitionedPaths` collects descendant paths recursively
* @param pathFilter
* `findPartitionedPaths` traverses paths that match this predicate
*/
protected def findPartitionedPaths(
path: Path,
configuration: Configuration
configuration: Configuration,
pathFilter: PathFilter = HiddenFileFilter.INSTANCE
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

): Either[Exception, PartitionedDirectory] = {
val fs = path.toHadoop.getFileSystem(configuration)
findPartitionedPaths(fs, configuration, path, List.empty).fold(
findPartitionedPaths(fs, configuration, path, pathFilter, List.empty).fold(
PartitionedDirectory.failed,
PartitionedDirectory.apply
)
Expand All @@ -82,10 +93,11 @@ trait IOOps {
fs: FileSystem,
configuration: Configuration,
path: Path,
pathFilter: PathFilter,
partitions: List[Partition]
): Either[List[Path], List[PartitionedPath]] = {
val (dirs, files) = fs
.listStatus(path.toHadoop, HiddenFileFilter.INSTANCE)
.listStatus(path.toHadoop, pathFilter)
.toList
.partition(_.isDirectory)
if (dirs.nonEmpty && files.nonEmpty)
Expand All @@ -96,11 +108,11 @@ trait IOOps {
Right(List.empty) // empty leaf dir
else if (partitionedDirs.isEmpty)
// leaf files
Right(files.map(fileStatus => PartitionedPath(fileStatus, configuration, partitions)))
Right(files.map(PartitionedPath(_, configuration, partitions)))
else
partitionedDirs
.map { case (subPath, partition) =>
findPartitionedPaths(fs, configuration, subPath, partitions :+ partition)
findPartitionedPaths(fs, configuration, subPath, pathFilter, partitions :+ partition)
}
.foldLeft[Either[List[Path], List[PartitionedPath]]](Right(List.empty)) {
case (Left(invalidPaths), Left(moreInvalidPaths)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.etl.CompoundParquetIterable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.PathFilter
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.InputFile
import org.apache.parquet.schema.{MessageType, Type}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.Type
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.io.Closeable
import java.util.TimeZone
Expand Down Expand Up @@ -42,6 +46,12 @@ object ParquetReader extends IOOps {
*/
def filter(filter: Filter): Builder[T]

/** @param pathFilter
* optional path filter; ParquetReader traverses paths that match this predicate to resolve partitions. It uses
* org.apache.parquet.hadoop.util.HiddenFileFilter by default.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioning org.apache.parquet.hadoop.util.HiddenFileFilter here feels leaking implementation details.

Current HiddenFileFilter definition is as simple as !_.getName().startsWith(Set('.','_')). Should we define it in com.github.mjakubowski84.parquet4s package instead of using org.apache.parquet.hadoop.util.HiddenFileFilter?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet4s relies so much on parquet-hadoop that I am not so concerned about leaking this detail.
However, PathFilter seems to be quite an esoteric (but static & reusable) option, so I think that it can go into ParquetReader.Options case class alongside Hadoop Configuration. That way, it will also not cause confusion with an existing filter. What do you think?

*/
def pathFilter(filter: PathFilter): Builder[T]

/** Attempt to read data as partitioned. Partition names must follow Hive format. Partition values will be set in
* read records to corresponding fields.
*/
Expand Down Expand Up @@ -77,14 +87,17 @@ object ParquetReader extends IOOps {
options: ParquetReader.Options = ParquetReader.Options(),
filter: Filter = Filter.noopFilter,
projectedSchemaResolverOpt: Option[ParquetSchemaResolver[T]] = None,
columnProjections: Seq[ColumnProjection] = Seq.empty
columnProjections: Seq[ColumnProjection] = Seq.empty,
pathFilter: PathFilter = hadoop.util.HiddenFileFilter.INSTANCE
) extends Builder[T] {
override def options(options: ParquetReader.Options): Builder[T] =
this.copy(options = options)

override def filter(filter: Filter): Builder[T] =
this.copy(filter = filter)

override def pathFilter(filter: PathFilter): Builder[T] = this.copy(pathFilter = pathFilter)

override def partitioned: Builder[T] = this

override def read(path: Path)(implicit decoder: ParquetRecordDecoder[T]): ParquetIterable[T] =
Expand All @@ -96,7 +109,7 @@ object ParquetReader extends IOOps {

inputFile match {
case hadoopInputFile: HadoopInputFile =>
partitionedIterable(Path(hadoopInputFile.getPath), valueCodecConfiguration, hadoopConf)
partitionedIterable(Path(hadoopInputFile.getPath), valueCodecConfiguration, hadoopConf, pathFilter)
case _ =>
singleIterable(
inputFile = inputFile,
Expand All @@ -112,9 +125,10 @@ object ParquetReader extends IOOps {
private def partitionedIterable(
path: Path,
valueCodecConfiguration: ValueCodecConfiguration,
hadoopConf: Configuration
hadoopConf: Configuration,
pathFilter: PathFilter
)(implicit decoder: ParquetRecordDecoder[T]): ParquetIterable[T] =
findPartitionedPaths(path, hadoopConf) match {
findPartitionedPaths(path, hadoopConf, pathFilter) match {
case Left(exception) =>
throw exception
case Right(partitionedDirectory) =>
Expand Down