Skip to content

Commit

Permalink
Go Client: Change TimeTable method to allow a variety of time/period …
Browse files Browse the repository at this point in the history
…types
  • Loading branch information
kosak committed Dec 29, 2023
1 parent f4f1d2b commit 0cd7d27
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 9 deletions.
32 changes: 29 additions & 3 deletions go/pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,35 @@ func TestTimeTable(t *testing.T) {
}
defer c.Close()

tbl, err := c.TimeTable(ctx, 10000000, time.Now())
// test a variety of period types
testTimeTableHelper(t, ctx, c, int(10000000), time.Now())
testTimeTableHelper(t, ctx, c, int32(10000000), time.Now())
testTimeTableHelper(t, ctx, c, int64(10000000), time.Now())
testTimeTableHelper(t, ctx, c, time.Duration(10000000), time.Now())
testTimeTableHelper(t, ctx, c, "PT0.01S", time.Now())

// test a variety of startTime types
testTimeTableHelper(t, ctx, c, "PT1H", int(1703874484000000000))
testTimeTableHelper(t, ctx, c, "PT1H", int32(123456)) // a date far in the past but should work
testTimeTableHelper(t, ctx, c, "PT1H", int64(1703874484000000000))
testTimeTableHelper(t, ctx, c, "PT1H", time.Now())
testTimeTableHelper(t, ctx, c, "PT1M", "2023-03-01T12:34:56-05:00")

_, err = c.TimeTable(ctx, "unparseable", "2023-03-01T12:34:56-05:00")
if err == nil {
t.Errorf("Expected failure, got success")
}
_, err = c.TimeTable(ctx, "PT1M", "unparseable")
if err == nil {
t.Errorf("Expected failure, got success")
}
}

func testTimeTableHelper(t *testing.T, ctx context.Context, c *client.Client, period any, startTime any) {
tbl, err := c.TimeTable(ctx, period, startTime)
if err != nil {
t.Errorf("EmptyTable err %s", err.Error())
t.Errorf("EmptyTable err %v", err)
return
}

if tbl.IsStatic() {
Expand All @@ -110,7 +136,7 @@ func TestTimeTable(t *testing.T) {

err = tbl.Release(ctx)
if err != nil {
t.Errorf("Release err %s", err.Error())
t.Errorf("Release err %v", err)
}
}

Expand Down
35 changes: 29 additions & 6 deletions go/pkg/client/table_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"reflect"
"time"

"github.com/apache/arrow/go/v8/arrow/flight"
Expand Down Expand Up @@ -195,19 +196,41 @@ func (ts *tableStub) TimeTableQuery(period time.Duration, startTime time.Time) Q
}

// TimeTable creates a ticking time table in the global scope.
// The period is time between adding new rows to the table.
// The startTime is the time of the first row in the table.
func (ts *tableStub) TimeTable(ctx context.Context, period time.Duration, startTime time.Time) (*TableHandle, error) {
// The period is time between adding new rows to the table. It needs to be either a signed integer type, time.Duration,
// or string.
// The startTime is the time of the first row in the table. It needs to be either a signed integer type, time.Time or a
// string.
func (ts *tableStub) TimeTable(ctx context.Context, period any, startTime any) (*TableHandle, error) {
ctx, err := ts.client.tokenMgr.withToken(ctx)
if err != nil {
return nil, err
}

result := ts.client.ticketFact.newTicket()

req := tablepb2.TimeTableRequest{ResultId: &result,
Period: &tablepb2.TimeTableRequest_PeriodNanos{PeriodNanos: period.Nanoseconds()},
StartTime: &tablepb2.TimeTableRequest_StartTimeNanos{StartTimeNanos: startTime.UnixNano()}}
req := tablepb2.TimeTableRequest{ResultId: &result}
periodVal := reflect.ValueOf(period)
if periodVal.CanInt() {
req.Period = &tablepb2.TimeTableRequest_PeriodNanos{PeriodNanos: periodVal.Int()}
} else if val, ok := period.(time.Duration); ok {
req.Period = &tablepb2.TimeTableRequest_PeriodNanos{PeriodNanos: val.Nanoseconds()}
} else if val, ok := period.(string); ok {
req.Period = &tablepb2.TimeTableRequest_PeriodString{PeriodString: val}
} else {
return nil, fmt.Errorf("expected period of signed integer type, time.Duration or string, found %T", period)
}

timeVal := reflect.ValueOf(startTime)
if timeVal.CanInt() {
req.StartTime = &tablepb2.TimeTableRequest_StartTimeNanos{StartTimeNanos: timeVal.Int()}
} else if val, ok := startTime.(time.Time); ok {
req.StartTime = &tablepb2.TimeTableRequest_StartTimeNanos{StartTimeNanos: val.UnixNano()}
} else if val, ok := startTime.(string); ok {
req.StartTime = &tablepb2.TimeTableRequest_StartTimeString{StartTimeString: val}
} else {
return nil, fmt.Errorf("expected startTime of signed integer type, time.Time or string, found %T", startTime)
}

resp, err := ts.stub.TimeTable(ctx, &req)
if err != nil {
return nil, err
Expand Down

0 comments on commit 0cd7d27

Please sign in to comment.