Skip to content

Commit

Permalink
Add query output formats (json, csv, line); Use pipe-fittings querydi…
Browse files Browse the repository at this point in the history
…splay. Closes #30 (#57)
  • Loading branch information
pskrbasu authored Dec 2, 2024
1 parent 04a7154 commit 85cab6b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 18 deletions.
16 changes: 12 additions & 4 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/thediveo/enumflag/v2"
"github.com/turbot/go-kit/helpers"
Expand All @@ -11,7 +13,6 @@ import (
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/interactive"
"github.com/turbot/tailpipe/internal/query"
"strings"
)

// variable used to assign the output mode flag
Expand All @@ -30,7 +31,9 @@ func queryCmd() *cobra.Command {
cmdconfig.OnCmd(cmd).
AddVarFlag(enumflag.New(&queryOutputMode, pconstants.ArgOutput, constants.QueryOutputModeIds, enumflag.EnumCaseInsensitive),
pconstants.ArgOutput,
fmt.Sprintf("Output format; one of: %s", strings.Join(constants.FlagValues(constants.QueryOutputModeIds), ", ")))
fmt.Sprintf("Output format; one of: %s", strings.Join(constants.FlagValues(constants.QueryOutputModeIds), ", "))).
AddBoolFlag(pconstants.ArgHeader, true, "Include column headers csv and table output").
AddStringFlag(pconstants.ArgSeparator, ",", "Separator string for csv output")

return cmd
}
Expand All @@ -39,6 +42,8 @@ func runQueryCmd(cmd *cobra.Command, args []string) {
ctx := cmd.Context()

var err error
var failures int

defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
Expand All @@ -51,9 +56,12 @@ func runQueryCmd(cmd *cobra.Command, args []string) {
if len(args) == 0 {
err = interactive.RunInteractiveQuery(ctx)
} else {
err = query.ExecuteQuery(ctx, args[0])
failures, err = query.ExecuteQuery(ctx, args[0])
}
if failures > 0 {
exitCode = pconstants.ExitCodeQueryExecutionFailed
error_helpers.FailOnError(err)
}
error_helpers.FailOnError(err)

}

Expand Down
116 changes: 102 additions & 14 deletions internal/query/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,122 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/spf13/viper"
"github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/querydisplay"
"github.com/turbot/pipe-fittings/queryresult"
"github.com/turbot/tailpipe/internal/filepaths"
)

func ExecuteQuery(ctx context.Context, query string) error {
func ExecuteQuery(ctx context.Context, query string) (int, error) {
// Open a DuckDB connection
db, err := sql.Open("duckdb", filepaths.TailpipeDbFilePath())
if err != nil {
return err
return 0, err
}

defer db.Close()

// Run the query
rows, err := db.QueryContext(ctx, query)
if err != nil {
return err
return 0, err
}

switch viper.GetString(constants.ArgOutput) {
case constants.OutputFormatJSON:
panic("not implemented")
case constants.OutputFormatCSV:
return DisplayResultCsv(ctx, rows)
case constants.OutputFormatTable:
return DisplayResultTable(ctx, rows)
default:
return fmt.Errorf("unknown output format: %s", viper.GetString(constants.ArgOutput))
// Execute the query
result, err := Execute(ctx, rows, query)
if err != nil {
return 0, err
}

// show output
_, rowErrors := querydisplay.ShowOutput(ctx, result)
if rowErrors > 0 {
// TODO find a way to return the error
return rowErrors, fmt.Errorf("Error: query execution failed")
}
return 0, nil
}

type TimingMetadata struct {
Duration time.Duration
}

func (t TimingMetadata) GetTiming() any {
return t
}

func Execute(ctx context.Context, rows *sql.Rows, query string) (res *queryresult.Result[TimingMetadata], err error) {

colDefs, err := fetchColumnDefs(rows)
if err != nil {
return nil, err
}

result := queryresult.NewResult[TimingMetadata](colDefs, TimingMetadata{})

// stream rows from the query result
go streamResults(ctx, rows, result, colDefs)

return result, nil
}

func streamResults(ctx context.Context, rows *sql.Rows, result *queryresult.Result[TimingMetadata], colDefs []*queryresult.ColumnDef) {
defer func() {
rows.Close()
if err := rows.Err(); err != nil {
result.StreamError(err)
}
// close the channels in the result object
result.Close()

}()
for rows.Next() {
// Create a slice to hold the values for each row
columnsData := make([]interface{}, len(colDefs))
columnPointers := make([]interface{}, len(colDefs))

// Fill columnPointers with pointers to each item in columnsData
for i := range columnsData {
columnPointers[i] = &columnsData[i]
}

// Scan the current row into columnPointers
if err := rows.Scan(columnPointers...); err != nil {
// return result, err
}

result.StreamRow(columnsData)
}
}

// FetchColumnDefs extracts column definitions from sql.Rows and returns a slice of ColumnDef.
func fetchColumnDefs(rows *sql.Rows) ([]*queryresult.ColumnDef, error) {
// Get column names
columnNames, err := rows.Columns()
if err != nil {
return nil, err
}

// Get column types
columnTypes, err := rows.ColumnTypes()
if err != nil {
return nil, err
}

// Initialize a slice to hold column definitions
var columnDefs []*queryresult.ColumnDef

for i, colType := range columnTypes {
columnDef := &queryresult.ColumnDef{
Name: columnNames[i],
DataType: colType.DatabaseTypeName(),
OriginalName: columnNames[i], // Set this if you have a way to obtain the original name (optional) - this would be needed when multiple same columns are requested
}

// Append to the list of column definitions
columnDefs = append(columnDefs, columnDef)
}

return columnDefs, nil
}

0 comments on commit 85cab6b

Please sign in to comment.