Write directly to arrow.Record in logictest #629
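
A minimal, self-contained sketch of the approach this PR takes: instead of writing parquet rows into a dynparquet buffer and converting them with pqarrow, the logictest runner builds the arrow.Record directly with array.RecordBuilder and inserts that. The column names and values below are made up for illustration; only the Arrow APIs shown (array.NewRecordBuilder, the typed field builders, NewRecord) are assumed.

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	// Describe the columns as an Arrow schema instead of a parquet buffer.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "example_label", Type: arrow.BinaryTypes.String, Nullable: true}, // hypothetical column
		{Name: "value", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer b.Release()

	// Append values column by column through the typed field builders.
	b.Field(0).(*array.StringBuilder).Append("a")
	b.Field(1).(*array.Int64Builder).Append(10)
	b.Field(0).(*array.StringBuilder).AppendNull() // a "null" cell
	b.Field(1).(*array.Int64Builder).Append(20)

	// The finished record is what gets handed to Table.InsertRecord.
	rec := b.NewRecord()
	defer rec.Release()
	fmt.Println(rec)
}
```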

Closed · wants to merge 2 commits
9 changes: 0 additions & 9 deletions logictest/logic_test.go
@@ -140,12 +140,3 @@ func TestLogic(t *testing.T) {
}
})
}

func SchemaMust(def *schemapb.Schema) *dynparquet.Schema {
schema, err := dynparquet.SchemaFromDefinition(def)
if err != nil {
panic(err.Error())
}

return schema
}
297 changes: 149 additions & 148 deletions logictest/runner.go
@@ -14,13 +14,10 @@ import (
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/cockroachdb/datadriven"
"github.com/google/uuid"
"github.com/parquet-go/parquet-go"

"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
"github.com/polarsignals/frostdb/pqarrow"
"github.com/polarsignals/frostdb/query"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/sqlparse"
)

@@ -67,8 +64,11 @@ type Table interface {
}

type Runner struct {
db DB
schemas map[string]*schemapb.Schema
db DB
schemas map[string]*schemapb.Schema

// maps table name -> schema
tableSchema map[string]*schemapb.Schema
activeTable Table
activeTableName string
activeTableDynamicColumns []string
@@ -77,9 +77,10 @@

func NewRunner(db DB, schemas map[string]*schemapb.Schema) *Runner {
return &Runner{
db: db,
schemas: schemas,
sqlParser: sqlparse.NewParser(),
db: db,
schemas: schemas,
tableSchema: make(map[string]*schemapb.Schema),
sqlParser: sqlparse.NewParser(),
}
}

@@ -127,6 +128,7 @@ func (r *Runner) handleCreateTable(_ context.Context, c *datadriven.TestData) (s
}
r.activeTable = table
r.activeTableName = name
r.tableSchema[name] = schema
for _, c := range table.Schema().Columns() {
if c.Dynamic {
r.activeTableDynamicColumns = append(r.activeTableDynamicColumns, c.Name)
@@ -135,141 +137,44 @@
return c.Expected, nil
}

type colDef struct {
dynparquet.ColumnDefinition
// This is the "label1" in "labels.label1". Only set if
// ColumnDefinition.Dynamic is true.
dynColName string
}

func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (string, error) {
var colDefs []colDef

schema := r.activeTable.Schema()
schema := r.tableSchema[r.activeTableName]
var build *array.RecordBuilder
var builds []buildFunc
var err error
for _, arg := range c.CmdArgs {
if arg.Key == "cols" {
for _, cname := range arg.Vals {
def := colDef{}
dotIndex := strings.IndexRune(cname, '.')
if dotIndex != -1 {
// This is a dynamic column.
def.dynColName = cname[dotIndex+1:]
// Only search for the name before the ".".
cname = cname[:dotIndex]
}
var found bool
def.ColumnDefinition, found = schema.ColumnByName(cname)
if !found {
return "", fmt.Errorf(
"insert: column %s specified in insert not found in schema %v",
cname,
schema,
)
}
colDefs = append(colDefs, def)
build, builds, err = recordBuilder(schema, arg.Vals)
if err != nil {
return "", err
}
}
}

if len(colDefs) == 0 {
if len(builds) == 0 {
return "", fmt.Errorf("insert: no input schema provided")
}

dynCols := make(map[string][]string)
specifiedCols := make(map[string]struct{})
for _, def := range colDefs {
if def.Dynamic {
dynCols[def.Name] = append(dynCols[def.Name], def.dynColName)
}
specifiedCols[def.Name] = struct{}{}
}

defer build.Release()
inputLines := strings.Split(c.Input, "\n")
rows := make([]parquet.Row, len(inputLines))
for i := range rows {
colIdx := 0
values := strings.Fields(inputLines[i])
if len(values) != len(colDefs) {
for i, line := range inputLines {
values := strings.Fields(line)
if len(values) != len(builds) {
return "", fmt.Errorf(
"insert: row %d (%d values) does not match expected schema (%d cols)",
i+1,
len(values),
len(colDefs),
len(builds),
)
}

valueForCol := make(map[string]string)
for j, def := range colDefs {
key := def.Name
if def.Dynamic {
key += "." + def.dynColName
}
valueForCol[key] = values[j]
}

for _, col := range schema.Columns() {
if _, ok := specifiedCols[col.Name]; !ok {
// Column not specified. Insert a NULL value.
rows[i] = append(rows[i], parquet.ValueOf(nil).Level(0, 0, colIdx))
colIdx++
continue
}

if !col.Dynamic {
// Column is not dynamic.
v, err := stringToValue(col.StorageLayout.Type(), valueForCol[col.Name])
if err != nil {
return "", fmt.Errorf("insert: %w", err)
}
if col.StorageLayout.Optional() {
if parquet.ValueOf(v).IsNull() {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx))
} else {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 1, colIdx))
}
} else {
rows[i] = append(rows[i], parquet.ValueOf(v).Level(0, 0, colIdx))
}
colIdx++
continue
}

for _, dynCol := range dynCols[col.Name] {
v, err := stringToValue(col.StorageLayout.Type(), valueForCol[col.Name+"."+dynCol])
if err != nil {
return "", fmt.Errorf("insert: %w", err)
}
parquetV := parquet.ValueOf(v)
if parquetV.IsNull() {
parquetV = parquetV.Level(0, 0, colIdx)
} else {
parquetV = parquetV.Level(0, 1, colIdx)
}
rows[i] = append(rows[i], parquetV)
colIdx++
for col := range values {
err := builds[col](values[col])
if err != nil {
return "", err
}
}
}

buf, err := schema.NewBuffer(dynCols)
if err != nil {
return "", fmt.Errorf("insert: %w", err)
}

if _, err := buf.WriteRows(rows); err != nil {
return "", fmt.Errorf("insert: %w", err)
}
buf.Sort()

// TODO: https://github.com/polarsignals/frostdb/issues/548 Should just build the arrow record directly.
converter := pqarrow.NewParquetConverter(memory.NewGoAllocator(), logicalplan.IterOptions{})
defer converter.Close()

if err := converter.Convert(ctx, buf); err != nil {
return "", err
}

rec := converter.NewRecord()
rec := build.NewRecord()
defer rec.Release()

if _, err := r.activeTable.InsertRecord(ctx, rec); err != nil {
@@ -279,38 +184,134 @@ func (r *Runner) handleInsert(ctx context.Context, c *datadriven.TestData) (stri
return c.Expected, nil
}

func stringToValue(t parquet.Type, stringValue string) (any, error) {
if stringValue == nullString {
return nil, nil
func recordBuilder(schema *schemapb.Schema, columns []string) (*array.RecordBuilder, []buildFunc, error) {
names := make(map[string]*schemapb.Column)
for _, col := range schema.Columns {
names[col.Name] = col
}
fields := make([]arrow.Field, len(columns))
for i := range columns {
column := columns[i]
if strings.Contains(column, ".") {
base := strings.Split(column, ".")[0]
def, ok := names[base]
if !ok {
return nil, nil, fmt.Errorf("column %q is missing from active schema", column)
}
fields[i] = arrow.Field{
Name: column,
Nullable: true,
Type: layoutToArrowField(def.StorageLayout),
}
continue
}
def, ok := names[column]
if !ok {
return nil, nil, fmt.Errorf("column %q is missing from active schema", column)
}
fields[i] = arrow.Field{
Name: def.Name,
Nullable: def.StorageLayout.Nullable,
Type: layoutToArrowField(def.StorageLayout),
}
}
r := array.NewRecordBuilder(memory.NewGoAllocator(),
arrow.NewSchema(fields, nil),
)
builds := make([]buildFunc, len(fields))
for i := range fields {
builds[i] = newBuild(r.Field(i), fields[i])
}
return r, builds, nil
}

type buildFunc func(s string) error

switch t.Kind() {
case parquet.ByteArray:
return stringValue, nil
case parquet.Int64:
intValue, err := strconv.Atoi(stringValue)
if err != nil {
return nil, fmt.Errorf("unexpected error converting %s to int: %w", stringValue, err)
func newBuild(b array.Builder, f arrow.Field) buildFunc {
switch e := b.(type) {
case *array.Int64Builder:
return func(s string) error {
if f.Nullable {
if s == "null" {
e.AppendNull()
return nil
}
}
v, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
e.Append(v)
return nil
}
return intValue, nil
case parquet.Double:
floatValue, err := strconv.ParseFloat(stringValue, 64)
if err != nil {
return nil, fmt.Errorf("unexpected error converting %s to float: %w", stringValue, err)
case *array.Float64Builder:
return func(s string) error {
if f.Nullable {
if s == "null" {
e.AppendNull()
return nil
}
}
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return err
}
e.Append(v)
return nil
}
return floatValue, nil
case parquet.Boolean:
switch stringValue {
case "true":
return true, nil
case "false":
return false, nil
default:
return nil, fmt.Errorf("invalid boolean value: %s", stringValue)
case *array.BooleanBuilder:
return func(s string) error {
if f.Nullable {
if s == "null" {
e.AppendNull()
return nil
}
}
v, err := strconv.ParseBool(s)
if err != nil {
return err
}
e.Append(v)
return nil
}
case *array.StringBuilder:
return func(s string) error {
e.AppendString(s)
return nil
}
// There is no need to handle more dictionary value types. For now the string
// dictionary is enough; more can be added when needed.
case *array.BinaryDictionaryBuilder:
return func(s string) error {
return e.AppendString(s)
}
default:
return nil, fmt.Errorf("unhandled type %T", t.Kind())
panic(fmt.Sprintf("unexpected array builder type %T", e))
}
}

func layoutToArrowField(layout *schemapb.StorageLayout) arrow.DataType {
var typ arrow.DataType
switch layout.Type {
case schemapb.StorageLayout_TYPE_INT64:
typ = arrow.PrimitiveTypes.Int64
case schemapb.StorageLayout_TYPE_DOUBLE:
typ = arrow.PrimitiveTypes.Float64
case schemapb.StorageLayout_TYPE_BOOL:
typ = arrow.FixedWidthTypes.Boolean
case schemapb.StorageLayout_TYPE_STRING:
typ = arrow.BinaryTypes.Binary
}
if layout.Encoding == schemapb.StorageLayout_ENCODING_RLE_DICTIONARY {
typ = &arrow.DictionaryType{
IndexType: arrow.PrimitiveTypes.Int32,
ValueType: typ,
}
}
if layout.Repeated {
typ = arrow.ListOf(typ)
}
return typ
}

func (r *Runner) handleExec(ctx context.Context, c *datadriven.TestData) (string, error) {
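
For reference, a hedged sketch of how the dictionary-encoded path in the diff fits together: layoutToArrowField maps TYPE_STRING with ENCODING_RLE_DICTIONARY to an arrow.DictionaryType, and the builder array.NewRecordBuilder produces for such a field is a *array.BinaryDictionaryBuilder, which is why newBuild handles that concrete type via AppendString. The column name below is hypothetical; only the Arrow calls shown are assumed.

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	// The Arrow type layoutToArrowField produces for a string column with
	// RLE-dictionary encoding: the base Binary type wrapped in a DictionaryType.
	dictType := &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Int32,
		ValueType: arrow.BinaryTypes.Binary,
	}

	schema := arrow.NewSchema([]arrow.Field{
		{Name: "labels.label1", Type: dictType, Nullable: true}, // hypothetical dynamic column
	}, nil)

	b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer b.Release()

	// For a Binary-valued dictionary field the record builder hands back a
	// *array.BinaryDictionaryBuilder, matching the case handled in newBuild.
	dict := b.Field(0).(*array.BinaryDictionaryBuilder)
	if err := dict.AppendString("frontend"); err != nil {
		panic(err)
	}
	if err := dict.AppendString("frontend"); err != nil { // repeated value reuses the dictionary entry
		panic(err)
	}

	rec := b.NewRecord()
	defer rec.Release()
	fmt.Println(rec)
}
```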