Skip to content

Commit

Permalink
Usage report for clickhouse (#1039)
Browse files Browse the repository at this point in the history
  • Loading branch information
mieciu authored Dec 6, 2024
1 parent 7e4022a commit f9b4387
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
96 changes: 96 additions & 0 deletions quesma/telemetry/phone_home.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package telemetry
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"database/sql"
"fmt"
Expand All @@ -19,6 +20,7 @@ import (
"quesma/elasticsearch"
"quesma/health"
telemetry_headers "quesma/telemetry/headers"
"sort"

"quesma/logger"
"quesma/quesma/config"
Expand Down Expand Up @@ -46,6 +48,7 @@ const (

// for local debugging purposes
phoneHomeLocalEnabled = false // used initially for testing
tablesInUsageReport = 10
)

type ClickHouseStats struct {
Expand All @@ -55,6 +58,9 @@ type ClickHouseStats struct {
OpenConnection int `json:"open_connection"`
MaxOpenConnection int `json:"max_open_connection"`
ServerVersion string `json:"server_version"`
DbInfoHash string `json:"db_info_hash"`
BillableSize int64 `json:"billable_size"`
TopTablesSizeInfo string `json:"top_tables_size_info"`
}

type ElasticStats struct {
Expand Down Expand Up @@ -296,6 +302,88 @@ where active
return nil
}

func (a *agent) collectClickHouseTableSizes(ctx context.Context) (int64, map[string]int64, error) {
totalSize, tablesWithSizes, err := a.getTableSizes(a.ctx)
if err != nil {
logger.WarnWithCtx(ctx).Msgf("Error getting table sizes from clickhouse: %v", err)
return 0, nil, err
}
return totalSize, tablesWithSizes, nil
}

func (a *agent) getDbInfoHash() string {
dbUrl, dbName, dbUser := "", "default", "<no-user>"
if a.config.ClickHouse.User != "" {
dbUser = a.config.ClickHouse.User
}
if a.config.ClickHouse.Database != "" {
dbName = a.config.ClickHouse.Database
}
if a.config.ClickHouse.Url != nil {
dbUrl = a.config.ClickHouse.Url.String()
}
// we hash it to avoid leaking sensitive info
dbInfoHash := sha256.Sum256([]byte(fmt.Sprintf("%s@%s/%s", dbUser, dbUrl, dbName)))
return fmt.Sprintf("%x", dbInfoHash[:8])
}

func (a *agent) getTableSizes(ctx context.Context) (int64, map[string]int64, error) {
tableSizes := make(map[string]int64)
dbName := "default"
allTablesSize := int64(0)
if a.config.ClickHouse.Database != "" {
dbName = a.config.ClickHouse.Database
}
query := `SELECT table, sum(bytes_on_disk) AS total_size
FROM system.parts
WHERE active = 1 AND database = ?
GROUP BY table
ORDER BY total_size DESC;`

rows, err := a.clickHouseDb.QueryContext(ctx, query, dbName)
if err != nil {
return 0, nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()
for rows.Next() {
var tableName string
var totalSize int64
if err := rows.Scan(&tableName, &totalSize); err != nil {
return 0, nil, fmt.Errorf("failed to scan row: %w", err)
}
tableSize := totalSize / 1000000 // convert bytes to megabytes
if tableSize >= 1 { // we're not interested in tables smaller than 1MB
tableSizes[tableName] = tableSize
}
allTablesSize += tableSize
}
tableSizes = getTopNValues(tableSizes, tablesInUsageReport)

if err := rows.Err(); err != nil {
return 0, nil, fmt.Errorf("error iterating over rows: %w", err)
}
return allTablesSize, tableSizes, nil
}

func getTopNValues(in map[string]int64, n int) map[string]int64 {
type kv struct {
Key string
Value int64
}
var sortedSlice []kv
for k, v := range in {
sortedSlice = append(sortedSlice, kv{k, v})
}
sort.Slice(sortedSlice, func(i, j int) bool {
return sortedSlice[i].Value > sortedSlice[j].Value
})
result := make(map[string]int64) // get the top `n` values
for i := 0; i < n && i < len(sortedSlice); i++ {
result[sortedSlice[i].Key] = sortedSlice[i].Value
}
return result
}

func (a *agent) collectClickHouseVersion(ctx context.Context, stats *ClickHouseStats) error {

// https://clickhouse.com/docs/en/sql-reference/functions/other-functions#version
Expand Down Expand Up @@ -539,6 +627,14 @@ func (a *agent) collect(ctx context.Context, reportType string) (stats PhoneHome
} else {
stats.ClickHouse = ClickHouseStats{Status: "paused"}
}
if !strings.HasPrefix(a.config.ClickHouse.ConnectorType, "hydrolix") { // we only check table sizes for ClickHouse
if totalSize, topTableSizes, err := a.collectClickHouseTableSizes(ctx); err == nil {
stats.ClickHouse.DbInfoHash = a.getDbInfoHash()
stats.ClickHouse.BillableSize = totalSize
stats.ClickHouse.TopTablesSizeInfo = fmt.Sprintf("%v", topTableSizes)
logger.Info().Msgf("[USAGE REPORT] dababase=[%s] billable_size_in_Mbs=[%d] top_table_sizes=%v", a.getDbInfoHash(), totalSize, topTableSizes)
}
}

stats.IngestCounters = a.ingestCounters.AggregateAndReset()

Expand Down
73 changes: 73 additions & 0 deletions quesma/telemetry/phone_home_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,76 @@ func TestAgent_CollectElastic_Version(t *testing.T) {
assert.Equal(t, "8.11.1", response.Version.Number)

}

func TestGetTopNValues(t *testing.T) {
tests := []struct {
name string
input map[string]int64
n int
expected map[string]int64
}{
{
name: "LessThanN",
input: map[string]int64{
"table2": 300,
"table1": 500,
},
n: 5,
expected: map[string]int64{
"table1": 500,
"table2": 300,
},
},
{
name: "EqualToN",
input: map[string]int64{
"table1": 200,
"table3": 500,
"table2": 300,
},
n: 3,
expected: map[string]int64{
"table3": 500,
"table2": 300,
"table1": 200,
},
},
{
name: "MoreThanN",
input: map[string]int64{
"table2": 300,
"table4": 100,
"table1": 500,
"table3": 200,
},
n: 3,
expected: map[string]int64{
"table1": 500,
"table2": 300,
"table3": 200,
},
},
{
name: "EmptyMap",
input: map[string]int64{},
n: 3,
expected: map[string]int64{},
},
{
name: "NegativeN",
input: map[string]int64{
"table1": 500,
"table2": 300,
},
n: -1,
expected: map[string]int64{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := getTopNValues(tt.input, tt.n)
assert.Equal(t, tt.expected, result)
})
}
}

0 comments on commit f9b4387

Please sign in to comment.