From 7e693d9d3dd39be4d7a6d69e8c1d7076ce81b460 Mon Sep 17 00:00:00 2001 From: gernest Date: Thu, 14 Dec 2023 15:57:10 +0300 Subject: [PATCH 1/2] Write directly to arrow.Record in logictest --- logictest/logic_test.go | 9 -- logictest/runner.go | 298 ++++++++++++++++++++-------------------- 2 files changed, 150 insertions(+), 157 deletions(-) diff --git a/logictest/logic_test.go b/logictest/logic_test.go index 1433a8e69..5b45bb677 100644 --- a/logictest/logic_test.go +++ b/logictest/logic_test.go @@ -140,12 +140,3 @@ func TestLogic(t *testing.T) { } }) } - -func SchemaMust(def *schemapb.Schema) *dynparquet.Schema { - schema, err := dynparquet.SchemaFromDefinition(def) - if err != nil { - panic(err.Error()) - } - - return schema -} diff --git a/logictest/runner.go b/logictest/runner.go index 294acf38c..eb15a1fe8 100644 --- a/logictest/runner.go +++ b/logictest/runner.go @@ -14,13 +14,10 @@ import ( "github.com/apache/arrow/go/v14/arrow/memory" "github.com/cockroachdb/datadriven" "github.com/google/uuid" - "github.com/parquet-go/parquet-go" "github.com/polarsignals/frostdb/dynparquet" schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1" - "github.com/polarsignals/frostdb/pqarrow" "github.com/polarsignals/frostdb/query" - "github.com/polarsignals/frostdb/query/logicalplan" "github.com/polarsignals/frostdb/sqlparse" ) @@ -67,8 +64,11 @@ type Table interface { } type Runner struct { - db DB - schemas map[string]*schemapb.Schema + db DB + schemas map[string]*schemapb.Schema + + // maps table name -> schema + tableSchema map[string]*schemapb.Schema activeTable Table activeTableName string activeTableDynamicColumns []string @@ -77,9 +77,10 @@ type Runner struct { func NewRunner(db DB, schemas map[string]*schemapb.Schema) *Runner { return &Runner{ - db: db, - schemas: schemas, - sqlParser: sqlparse.NewParser(), + db: db, + schemas: schemas, + tableSchema: make(map[string]*schemapb.Schema), + sqlParser: sqlparse.NewParser(), } } @@ -127,6 +128,7 @@ func (r *Runner) handleCreateTable(_ context.Context, c *datadriven.TestData) (s } r.activeTable = table r.activeTableName = name + r.tableSchema[name] = schema for _, c := range table.Schema().Columns() { if c.Dynamic { r.activeTableDynamicColumns = append(r.activeTableDynamicColumns, c.Name) @@ -135,141 +137,44 @@ func (r *Runner) handleCreateTable(_ context.Context, c *datadriven.TestData) (s return c.Expected, nil } -type colDef struct { - dynparquet.ColumnDefinition - // This is the "label1" in "labels.label1". Only set if - // ColumnDefinition.Dynamic is true. - dynColName string -} - func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (string, error) { - var colDefs []colDef - - schema := r.activeTable.Schema() + schema := r.tableSchema[r.activeTableName] + var build *array.RecordBuilder + var builds []buildFunc + var err error for _, arg := range c.CmdArgs { if arg.Key == "cols" { - for _, cname := range arg.Vals { - def := colDef{} - dotIndex := strings.IndexRune(cname, '.') - if dotIndex != -1 { - // This is a dynamic column. - def.dynColName = cname[dotIndex+1:] - // Only search for the name before the ".". - cname = cname[:dotIndex] - } - var found bool - def.ColumnDefinition, found = schema.ColumnByName(cname) - if !found { - return "", fmt.Errorf( - "insert: column %s specified in insert not found in schema %v", - cname, - schema, - ) - } - colDefs = append(colDefs, def) + build, builds, err = recordBuilder(schema, arg.Vals) + if err != nil { + return "", err } } } - if len(colDefs) == 0 { + if len(builds) == 0 { return "", fmt.Errorf("insert: no input schema provided") } - - dynCols := make(map[string][]string) - specifiedCols := make(map[string]struct{}) - for _, def := range colDefs { - if def.Dynamic { - dynCols[def.Name] = append(dynCols[def.Name], def.dynColName) - } - specifiedCols[def.Name] = struct{}{} - } - + defer build.Release() inputLines := strings.Split(c.Input, "\n") - rows := make([]parquet.Row, len(inputLines)) - for i := range rows { - colIdx := 0 - values := strings.Fields(inputLines[i]) - if len(values) != len(colDefs) { + for i, line := range inputLines { + values := strings.Fields(line) + if len(values) != len(builds) { return "", fmt.Errorf( "insert: row %d (%d values) does not match expected schema (%d cols)", i+1, len(values), - len(colDefs), + len(builds), ) } - - valueForCol := make(map[string]string) - for j, def := range colDefs { - key := def.Name - if def.Dynamic { - key += "." + def.dynColName - } - valueForCol[key] = values[j] - } - - for _, col := range schema.Columns() { - if _, ok := specifiedCols[col.Name]; !ok { - // Column not specified. Insert a NULL value. - rows[i] = append(rows[i], parquet.ValueOf(nil).Level(0, 0, colIdx)) - colIdx++ - continue - } - - if !col.Dynamic { - // Column is not dynamic. - v, err := stringToValue(col.StorageLayout.Type(), valueForCol[col.Name]) - if err != nil { - return "", fmt.Errorf("insert: %w", err) - } - if col.StorageLayout.Optional() { - if parquet.ValueOf(v).IsNull() { - rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx)) - } else { - rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 1, colIdx)) - } - } else { - rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx)) - } - colIdx++ - continue - } - - for _, dynCol := range dynCols[col.Name] { - v, err := stringToValue(col.StorageLayout.Type(), valueForCol[col.Name+"."+dynCol]) - if err != nil { - return "", fmt.Errorf("insert: %w", err) - } - parquetV := parquet.ValueOf(v) - if parquetV.IsNull() { - parquetV = parquetV.Level(0, 0, colIdx) - } else { - parquetV = parquetV.Level(0, 1, colIdx) - } - rows[i] = append(rows[i], parquetV) - colIdx++ + for col := range values { + err := builds[col](values[col]) + if err != nil { + return "", err } } } - buf, err := schema.NewBuffer(dynCols) - if err != nil { - return "", fmt.Errorf("insert: %w", err) - } - - if _, err := buf.WriteRows(rows); err != nil { - return "", fmt.Errorf("insert: %w", err) - } - buf.Sort() - - // TODO: https://github.com/polarsignals/frostdb/issues/548 Should just build the arrow record directly. - converter := pqarrow.NewParquetConverter(memory.NewGoAllocator(), logicalplan.IterOptions{}) - defer converter.Close() - - if err := converter.Convert(ctx, buf); err != nil { - return "", err - } - - rec := converter.NewRecord() + rec := build.NewRecord() defer rec.Release() if _, err := r.activeTable.InsertRecord(ctx, rec); err != nil { @@ -279,38 +184,135 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri return c.Expected, nil } -func stringToValue(t parquet.Type, stringValue string) (any, error) { - if stringValue == nullString { - return nil, nil +func recordBuilder(schema *schemapb.Schema, columns []string) (*array.RecordBuilder, []buildFunc, error) { + names := make(map[string]*schemapb.Column) + for _, col := range schema.Columns { + names[col.Name] = col + } + fields := make([]arrow.Field, len(columns)) + for i := range columns { + column := columns[i] + if strings.Contains(column, ".") { + base := strings.Split(column, ".")[0] + def, ok := names[base] + if !ok { + return nil, nil, fmt.Errorf("column %q is missing from active schema", column) + } + fields[i] = arrow.Field{ + Name: column, + Nullable: true, + Type: layoutToArrowField(def.StorageLayout), + } + continue + } + def, ok := names[column] + if !ok { + return nil, nil, fmt.Errorf("column %q is missing from active schema", column) + } + fields[i] = arrow.Field{ + Name: def.Name, + Nullable: def.StorageLayout.Nullable, + Type: layoutToArrowField(def.StorageLayout), + } } + r := array.NewRecordBuilder(memory.NewGoAllocator(), + arrow.NewSchema(fields, nil), + ) + builds := make([]buildFunc, len(fields)) + for i := range fields { + builds[i] = newBuild(r.Field(i), fields[i]) + } + return r, builds, nil +} + +type buildFunc func(s string) error - switch t.Kind() { - case parquet.ByteArray: - return stringValue, nil - case parquet.Int64: - intValue, err := strconv.Atoi(stringValue) - if err != nil { - return nil, fmt.Errorf("unexpected error converting %s to int: %w", stringValue, err) +func newBuild(b array.Builder, f arrow.Field) buildFunc { + switch e := b.(type) { + case *array.Int64Builder: + return func(s string) error { + if f.Nullable { + if s == "null" { + e.AppendNull() + return nil + } + } + v, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + e.Append(v) + return nil } - return intValue, nil - case parquet.Double: - floatValue, err := strconv.ParseFloat(stringValue, 64) - if err != nil { - return nil, fmt.Errorf("unexpected error converting %s to float: %w", stringValue, err) + case *array.Float64Builder: + return func(s string) error { + if f.Nullable { + if s == "null" { + e.AppendNull() + return nil + } + } + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return err + } + e.Append(v) + return nil } - return floatValue, nil - case parquet.Boolean: - switch stringValue { - case "true": - return true, nil - case "false": - return false, nil - default: - return nil, fmt.Errorf("invalid boolean value: %s", stringValue) + case *array.BooleanBuilder: + return func(s string) error { + if f.Nullable { + if s == "null" { + e.AppendNull() + return nil + } + } + v, err := strconv.ParseBool(s) + if err != nil { + return err + } + e.Append(v) + return nil + } + case *array.StringBuilder: + return func(s string) error { + e.AppendString(s) + return nil + } + // There is no need to handle more dictionaries . For now only string + // dictionary is enough. More can be added when needed. + case *array.BinaryDictionaryBuilder: + return func(s string) error { + e.AppendString(s) + return nil } default: - return nil, fmt.Errorf("unhandled type %T", t.Kind()) + panic(fmt.Sprintf("unexpected array builder type %T", e)) + } +} + +func layoutToArrowField(layout *schemapb.StorageLayout) arrow.DataType { + var typ arrow.DataType + switch layout.Type { + case schemapb.StorageLayout_TYPE_INT64: + typ = arrow.PrimitiveTypes.Int64 + case schemapb.StorageLayout_TYPE_DOUBLE: + typ = arrow.PrimitiveTypes.Float64 + case schemapb.StorageLayout_TYPE_BOOL: + typ = arrow.FixedWidthTypes.Boolean + case schemapb.StorageLayout_TYPE_STRING: + typ = arrow.BinaryTypes.Binary + } + if layout.Encoding == schemapb.StorageLayout_ENCODING_RLE_DICTIONARY { + typ = &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: typ, + } + } + if layout.Repeated { + typ = arrow.ListOf(typ) } + return typ } func (r *Runner) handleExec(ctx context.Context, c *datadriven.TestData) (string, error) { From f24dbb664bf5c7837afb471cd0c4b960141a7ba1 Mon Sep 17 00:00:00 2001 From: gernest Date: Thu, 14 Dec 2023 16:10:20 +0300 Subject: [PATCH 2/2] fix lint: handle unchecked error --- logictest/runner.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/logictest/runner.go b/logictest/runner.go index eb15a1fe8..41f92b91f 100644 --- a/logictest/runner.go +++ b/logictest/runner.go @@ -283,8 +283,7 @@ func newBuild(b array.Builder, f arrow.Field) buildFunc { // dictionary is enough. More can be added when needed. case *array.BinaryDictionaryBuilder: return func(s string) error { - e.AppendString(s) - return nil + return e.AppendString(s) } default: panic(fmt.Sprintf("unexpected array builder type %T", e))