Skip to content

Commit

Permalink
fix(clickhouse): fix query events implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tothandras committed Jan 19, 2024
1 parent 8c891f0 commit 0249ee6
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
4 changes: 2 additions & 2 deletions internal/streaming/clickhouse_connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func (d queryEventsTable) toSQL() (string, []interface{}, error) {

where = append(where, query.Equal("namespace", d.Namespace))
if d.From != nil {
where = append(where, query.GreaterEqualThan("windowstart", d.From.Unix()))
where = append(where, query.GreaterEqualThan("time", d.From.Unix()))
}
if d.To != nil {
where = append(where, query.LessEqualThan("windowend", d.To.Unix()))
where = append(where, query.LessEqualThan("time", d.To.Unix()))
}
query.Where(where...)

Expand Down
54 changes: 54 additions & 0 deletions internal/streaming/clickhouse_connector/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,57 @@ func TestListMeterViewSubjects(t *testing.T) {
})
}
}

func TestQueryEvents(t *testing.T) {
fromTime, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")
toTime, _ := time.Parse(time.RFC3339, "2023-01-02T00:00:00Z")

tests := []struct {
query queryEventsTable
wantSQL string
wantArgs []interface{}
}{
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
From: &fromTime,
To: &toTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", fromTime.Unix(), toTime.Unix()},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
From: &fromTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", fromTime.Unix()},
},
{
query: queryEventsTable{
Database: "openmeter",
Namespace: "my_namespace",
To: &toTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", toTime.Unix()},
},
}

for _, tt := range tests {
gotSql, gotArgs, err := tt.query.toSQL()
if err != nil {
t.Error(err)
return
}

assert.Equal(t, tt.wantSQL, gotSql)
assert.Equal(t, tt.wantArgs, gotArgs)
}
}

0 comments on commit 0249ee6

Please sign in to comment.