From 0249ee6539089056ac42ca9dd791294f3c3d4261 Mon Sep 17 00:00:00 2001 From: Andras Toth <4157749+tothandras@users.noreply.github.com> Date: Sat, 20 Jan 2024 00:02:24 +0100 Subject: [PATCH] fix(clickhouse): fix query events implementation --- .../streaming/clickhouse_connector/query.go | 4 +- .../clickhouse_connector/query_test.go | 54 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index 91695c7a8..59eab2328 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -64,10 +64,10 @@ func (d queryEventsTable) toSQL() (string, []interface{}, error) { where = append(where, query.Equal("namespace", d.Namespace)) if d.From != nil { - where = append(where, query.GreaterEqualThan("windowstart", d.From.Unix())) + where = append(where, query.GreaterEqualThan("time", d.From.Unix())) } if d.To != nil { - where = append(where, query.LessEqualThan("windowend", d.To.Unix())) + where = append(where, query.LessEqualThan("time", d.To.Unix())) } query.Where(where...) diff --git a/internal/streaming/clickhouse_connector/query_test.go b/internal/streaming/clickhouse_connector/query_test.go index 9c398ef2f..f27efd5a4 100644 --- a/internal/streaming/clickhouse_connector/query_test.go +++ b/internal/streaming/clickhouse_connector/query_test.go @@ -402,3 +402,57 @@ func TestListMeterViewSubjects(t *testing.T) { }) } } + +func TestQueryEvents(t *testing.T) { + fromTime, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + toTime, _ := time.Parse(time.RFC3339, "2023-01-02T00:00:00Z") + + tests := []struct { + query queryEventsTable + wantSQL string + wantArgs []interface{} + }{ + { + query: queryEventsTable{ + Database: "openmeter", + Namespace: "my_namespace", + From: &fromTime, + To: &toTime, + Limit: 10, + }, + wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND time <= ? ORDER BY time DESC LIMIT 10", + wantArgs: []interface{}{"my_namespace", fromTime.Unix(), toTime.Unix()}, + }, + { + query: queryEventsTable{ + Database: "openmeter", + Namespace: "my_namespace", + From: &fromTime, + Limit: 10, + }, + wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? ORDER BY time DESC LIMIT 10", + wantArgs: []interface{}{"my_namespace", fromTime.Unix()}, + }, + { + query: queryEventsTable{ + Database: "openmeter", + Namespace: "my_namespace", + To: &toTime, + Limit: 10, + }, + wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time <= ? ORDER BY time DESC LIMIT 10", + wantArgs: []interface{}{"my_namespace", toTime.Unix()}, + }, + } + + for _, tt := range tests { + gotSql, gotArgs, err := tt.query.toSQL() + if err != nil { + t.Error(err) + return + } + + assert.Equal(t, tt.wantSQL, gotSql) + assert.Equal(t, tt.wantArgs, gotArgs) + } +}