Skip to content

Commit

Permalink
Add livetail support
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo committed Sep 26, 2023
1 parent dbb3632 commit dc5392d
Show file tree
Hide file tree
Showing 4 changed files with 422 additions and 2 deletions.
210 changes: 210 additions & 0 deletions cmd/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package cmd

import (
"context"
"encoding/base64"
"encoding/json"
"pb/pkg/config"
"strconv"
"time"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/flight"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

var TailCmd = &cobra.Command{
Use: "tail stream-name",
Example: " pb tail backend_logs",
Short: "tail a log stream",
Args: cobra.ExactArgs(1),
PreRunE: PreRunDefaultProfile,
RunE: func(cmd *cobra.Command, args []string) error {
name := args[0]
profile := DefaultProfile
return tail(profile, name)
},
}

func tail(profile config.Profile, stream string) error {
payload, _ := json.Marshal(struct {
Stream string `json:"stream"`
}{
Stream: stream,
})
client, err := flight.NewClientWithMiddleware("localhost:7000", nil, nil, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

authHeader := basicAuth(profile.Username, profile.Password)
resp, err := client.DoGet(metadata.NewOutgoingContext(context.Background(), metadata.New(map[string]string{"Authorization": "Basic " + authHeader})), &flight.Ticket{
Ticket: payload,
})
if err != nil {
return err
}

records, err := flight.NewRecordReader(resp)
if err != nil {
return err
}

for true {

Check failure on line 57 in cmd/tail.go

View workflow job for this annotation

GitHub Actions / build

empty-block: this block is empty, you can remove it (revive)
// if records.Next() {
// fmt.Println("here")
// record, err := records.Read()
// if err != nil {
// return err
// }
// recs := toPretty(*record.Schema(), record)
// fmt.Println(recs)
// }
}

defer records.Release()

return nil
}

func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func toPretty(schema arrow.Schema, record array.Record) [][]string {

Check failure on line 79 in cmd/tail.go

View workflow job for this annotation

GitHub Actions / build

func `toPretty` is unused (unused)
nullValue := "-"

recs := make([][]string, record.NumRows())
for i := range recs {
recs[i] = make([]string, record.NumCols())
}

for j, col := range record.Columns() {
ty := schema.Field(j).Type
switch ty.(type) {
case *arrow.BooleanType:
arr := col.(*array.Boolean)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatBool(arr.Value(i))
} else {
recs[i][j] = nullValue
}
}
case *arrow.Int8Type:
arr := col.(*array.Int8)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatInt(int64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Int16Type:
arr := col.(*array.Int16)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatInt(int64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Int32Type:
arr := col.(*array.Int32)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatInt(int64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Int64Type:
arr := col.(*array.Int64)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatInt(int64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Uint8Type:
arr := col.(*array.Uint8)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatUint(uint64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Uint16Type:
arr := col.(*array.Uint16)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatUint(uint64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Uint32Type:
arr := col.(*array.Uint32)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatUint(uint64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Uint64Type:
arr := col.(*array.Uint64)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatUint(uint64(arr.Value(i)), 10)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Float32Type:
arr := col.(*array.Float32)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatFloat(float64(arr.Value(i)), 'g', -1, 32)
} else {
recs[i][j] = nullValue
}
}
case *arrow.Float64Type:
arr := col.(*array.Float64)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = strconv.FormatFloat(float64(arr.Value(i)), 'g', -1, 64)
} else {
recs[i][j] = nullValue
}
}
case *arrow.StringType:
arr := col.(*array.String)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = arr.Value(i)
} else {
recs[i][j] = nullValue
}
}
case *arrow.TimestampType:
arr := col.(*array.Time64)
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs[i][j] = time.UnixMilli(int64(arr.Value(i))).Format(time.RFC3339)
} else {
recs[i][j] = nullValue
}
}
}
}
return recs
}
15 changes: 13 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,36 @@ require (
github.com/charmbracelet/bubbles v0.16.1
github.com/charmbracelet/bubbletea v0.24.2
github.com/dustin/go-humanize v1.0.1
golang.org/x/term v0.10.0
golang.org/x/exp v0.0.0-20230807204917-050eac23e9de
golang.org/x/term v0.10.0
)

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79 // indirect
google.golang.org/grpc v1.39.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

require (
github.com/muesli/termenv v0.15.2
github.com/atotto/clipboard v0.1.4 // indirect
github.com/charmbracelet/lipgloss v0.7.1
github.com/evertras/bubble-table v0.15.2
github.com/muesli/termenv v0.15.2
github.com/pelletier/go-toml/v2 v2.0.9
github.com/sahilm/fuzzy v0.1.0 // indirect
)

require (
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
Expand Down
Loading

0 comments on commit dc5392d

Please sign in to comment.