diff --git a/cmd/columnify/columnify.go b/cmd/columnify/columnify.go index fe78195..7ba91c9 100644 --- a/cmd/columnify/columnify.go +++ b/cmd/columnify/columnify.go @@ -36,6 +36,11 @@ func main() { recordType := flag.String("recordType", "jsonl", "record data format type, [avro|csv|jsonl|ltsv|msgpack|tsv]") output := flag.String("output", "", "path to output file; default: stdout") + // parquet specific options + parquetPageSize := flag.Int64("parquetPageSize", 8*1024, "parquet file page size, default: 8kB") + parquetRowGroupSize := flag.Int64("parquetRowGroupSize", 128*1024*1024, "parquet file row group size, default: 128MB") + parquetCompressionCodec := flag.String("parquetCompressionCodec", "SNAPPY", "parquet compression codec, default: SNAPPY") + flag.Parse() files := flag.Args() @@ -45,7 +50,12 @@ func main() { log.Fatalf("Missed required parameter(s)") } - c, err := columnifier.NewColumnifier(*schemaType, *schemaFile, *recordType, *output) + config, err := columnifier.NewConfig(*parquetPageSize, *parquetRowGroupSize, *parquetCompressionCodec) + if err != nil { + log.Fatalf("Failed to init: %v\n", err) + } + + c, err := columnifier.NewColumnifier(*schemaType, *schemaFile, *recordType, *output, *config) if err != nil { log.Fatalf("Failed to init: %v\n", err) } diff --git a/columnifier/columnifier.go b/columnifier/columnifier.go index 032d6a5..8d730d3 100644 --- a/columnifier/columnifier.go +++ b/columnifier/columnifier.go @@ -10,6 +10,6 @@ type Columnifier interface { } // NewColumnifier creates a new Columnifier. -func NewColumnifier(st string, sf string, rt string, o string) (Columnifier, error) { - return NewParquetColumnifier(st, sf, rt, o) +func NewColumnifier(st string, sf string, rt string, o string, config Config) (Columnifier, error) { + return NewParquetColumnifier(st, sf, rt, o, config) } diff --git a/columnifier/config.go b/columnifier/config.go new file mode 100644 index 0000000..226de6d --- /dev/null +++ b/columnifier/config.go @@ -0,0 +1,30 @@ +package columnifier + +import ( + "github.com/xitongsys/parquet-go/parquet" +) + +type Config struct { + Parquet Parquet +} + +type Parquet struct { + PageSize int64 + RowGroupSize int64 + CompressionCodec parquet.CompressionCodec +} + +func NewConfig(parquetPageSize, parquetRowGroupSize int64, parquetCompressionCodec string) (*Config, error) { + cc, err := parquet.CompressionCodecFromString(parquetCompressionCodec) + if err != nil { + return nil, err + } + + return &Config{ + Parquet: Parquet{ + PageSize: parquetPageSize, + RowGroupSize: parquetRowGroupSize, + CompressionCodec: cc, + }, + }, nil +} diff --git a/columnifier/config_test.go b/columnifier/config_test.go new file mode 100644 index 0000000..a601c71 --- /dev/null +++ b/columnifier/config_test.go @@ -0,0 +1,52 @@ +package columnifier + +import ( + "reflect" + "testing" + + "github.com/xitongsys/parquet-go/parquet" +) + +func TestNewConfig(t *testing.T) { + cases := []struct { + parquetPageSize int64 + parquetRowGroupSize int64 + parquetCompressionCodec string + expected *Config + isErr bool + }{ + { + parquetPageSize: 8 * 1024, + parquetRowGroupSize: 128 * 1024 * 1024, + parquetCompressionCodec: "SNAPPY", + expected: &Config{ + Parquet: Parquet{ + PageSize: 8 * 1024, + RowGroupSize: 128 * 1024 * 1024, + CompressionCodec: parquet.CompressionCodec_SNAPPY, + }, + }, + isErr: false, + }, + + { + parquetPageSize: 8 * 1024, + parquetRowGroupSize: 128 * 1024 * 1024, + parquetCompressionCodec: "INVALID", + expected: nil, + isErr: true, + }, + } + + for _, c := range cases { + actual, err := NewConfig(c.parquetPageSize, c.parquetRowGroupSize, c.parquetCompressionCodec) + + if err != nil != c.isErr { + t.Errorf("expected %v, but actual %v", c.isErr, err) + } + + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("expected %v, but actual %v", c.expected, actual) + } + } +} diff --git a/columnifier/parquet.go b/columnifier/parquet.go index e1ce3fe..97f56a3 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -20,7 +20,7 @@ type parquetColumnifier struct { } // NewParquetColumnifier creates a new parquetColumnifier. -func NewParquetColumnifier(st string, sf string, rt string, output string) (*parquetColumnifier, error) { +func NewParquetColumnifier(st string, sf string, rt string, output string, config Config) (*parquetColumnifier, error) { schemaContent, err := ioutil.ReadFile(sf) if err != nil { return nil, err @@ -53,6 +53,10 @@ func NewParquetColumnifier(st string, sf string, rt string, output string) (*par w.SchemaHandler = sh w.Footer.Schema = append(w.Footer.Schema, sh.SchemaElements...) + w.PageSize = config.Parquet.PageSize + w.RowGroupSize = config.Parquet.RowGroupSize + w.CompressionType = config.Parquet.CompressionCodec + return &parquetColumnifier{ w: w, schema: intermediateSchema,