Skip to content

Commit

Permalink
Support parquet file-wide configuration allows to tune I/O performanc…
Browse files Browse the repository at this point in the history
…e and file size
  • Loading branch information
syucream committed Apr 28, 2020
1 parent 7485083 commit 4bbae20
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 4 deletions.
12 changes: 11 additions & 1 deletion cmd/columnify/columnify.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func main() {
recordType := flag.String("recordType", "jsonl", "record data format type, [avro|csv|jsonl|ltsv|msgpack|tsv]")
output := flag.String("output", "", "path to output file; default: stdout")

// parquet specific options
parquetPageSize := flag.Int64("parquetPageSize", 8*1024, "parquet file page size, default: 8kB")
parquetRowGroupSize := flag.Int64("parquetRowGroupSize", 128*1024*1024, "parquet file row group size, default: 128MB")
parquetCompressionCodec := flag.String("parquetCompressionCodec", "SNAPPY", "parquet compression codec, default: SNAPPY")

flag.Parse()

files := flag.Args()
Expand All @@ -45,7 +50,12 @@ func main() {
log.Fatalf("Missed required parameter(s)")
}

c, err := columnifier.NewColumnifier(*schemaType, *schemaFile, *recordType, *output)
config, err := columnifier.NewConfig(*parquetPageSize, *parquetRowGroupSize, *parquetCompressionCodec)
if err != nil {
log.Fatalf("Failed to init: %v\n", err)
}

c, err := columnifier.NewColumnifier(*schemaType, *schemaFile, *recordType, *output, *config)
if err != nil {
log.Fatalf("Failed to init: %v\n", err)
}
Expand Down
4 changes: 2 additions & 2 deletions columnifier/columnifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ type Columnifier interface {
}

// NewColumnifier creates a new Columnifier.
func NewColumnifier(st string, sf string, rt string, o string) (Columnifier, error) {
return NewParquetColumnifier(st, sf, rt, o)
func NewColumnifier(st string, sf string, rt string, o string, config Config) (Columnifier, error) {
return NewParquetColumnifier(st, sf, rt, o, config)
}
30 changes: 30 additions & 0 deletions columnifier/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package columnifier

import (
"github.com/xitongsys/parquet-go/parquet"
)

type Config struct {
Parquet Parquet
}

type Parquet struct {
PageSize int64
RowGroupSize int64
CompressionCodec parquet.CompressionCodec
}

func NewConfig(parquetPageSize, parquetRowGroupSize int64, parquetCompressionCodec string) (*Config, error) {
cc, err := parquet.CompressionCodecFromString(parquetCompressionCodec)
if err != nil {
return nil, err
}

return &Config{
Parquet: Parquet{
PageSize: parquetPageSize,
RowGroupSize: parquetRowGroupSize,
CompressionCodec: cc,
},
}, nil
}
52 changes: 52 additions & 0 deletions columnifier/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package columnifier

import (
"reflect"
"testing"

"github.com/xitongsys/parquet-go/parquet"
)

func TestNewConfig(t *testing.T) {
cases := []struct {
parquetPageSize int64
parquetRowGroupSize int64
parquetCompressionCodec string
expected *Config
isErr bool
}{
{
parquetPageSize: 8 * 1024,
parquetRowGroupSize: 128 * 1024 * 1024,
parquetCompressionCodec: "SNAPPY",
expected: &Config{
Parquet: Parquet{
PageSize: 8 * 1024,
RowGroupSize: 128 * 1024 * 1024,
CompressionCodec: parquet.CompressionCodec_SNAPPY,
},
},
isErr: false,
},

{
parquetPageSize: 8 * 1024,
parquetRowGroupSize: 128 * 1024 * 1024,
parquetCompressionCodec: "INVALID",
expected: nil,
isErr: true,
},
}

for _, c := range cases {
actual, err := NewConfig(c.parquetPageSize, c.parquetRowGroupSize, c.parquetCompressionCodec)

if err != nil != c.isErr {
t.Errorf("expected %v, but actual %v", c.isErr, err)
}

if !reflect.DeepEqual(actual, c.expected) {
t.Errorf("expected %v, but actual %v", c.expected, actual)
}
}
}
6 changes: 5 additions & 1 deletion columnifier/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type parquetColumnifier struct {
}

// NewParquetColumnifier creates a new parquetColumnifier.
func NewParquetColumnifier(st string, sf string, rt string, output string) (*parquetColumnifier, error) {
func NewParquetColumnifier(st string, sf string, rt string, output string, config Config) (*parquetColumnifier, error) {
schemaContent, err := ioutil.ReadFile(sf)
if err != nil {
return nil, err
Expand Down Expand Up @@ -53,6 +53,10 @@ func NewParquetColumnifier(st string, sf string, rt string, output string) (*par
w.SchemaHandler = sh
w.Footer.Schema = append(w.Footer.Schema, sh.SchemaElements...)

w.PageSize = config.Parquet.PageSize
w.RowGroupSize = config.Parquet.RowGroupSize
w.CompressionType = config.Parquet.CompressionCodec

return &parquetColumnifier{
w: w,
schema: intermediateSchema,
Expand Down

0 comments on commit 4bbae20

Please sign in to comment.