Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Cassandra db schema on session initialization #5922

Merged
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
5041f31
Embeded template file in binary and added query construction and exec…
akstron Oct 28, 2024
30db170
Removed unnecessary SchemaConfig struct
akstron Oct 28, 2024
ce0c375
Added new schema configs in default config generator
akstron Oct 28, 2024
810ab1c
Revert Keyspace removal
akstron Oct 28, 2024
0d6383f
Bug fix while creating queries
akstron Oct 29, 2024
207945f
Improving test
akstron Oct 29, 2024
985f65b
Created new struct for derived params
akstron Oct 29, 2024
1c30503
Remove fields from yaml file
akstron Oct 29, 2024
e4ab709
Added integration test
akstron Nov 19, 2024
c329bba
Rebase fixes
akstron Nov 19, 2024
e3c6045
Minor changes in integration script
akstron Nov 19, 2024
492e15e
removed test
akstron Nov 19, 2024
44c39dc
Updated fields with time.Duration type and added validators and tests
akstron Nov 20, 2024
dfc0c43
minor changes in script
akstron Nov 20, 2024
cb8ae19
Addressed comments
akstron Nov 20, 2024
c3d0fbd
Addressed comments
akstron Nov 21, 2024
728a139
Update pkg/cassandra/config/schema.go
akstron Nov 21, 2024
1b6683d
Update pkg/cassandra/config/config.go
akstron Nov 21, 2024
ce11cc1
Addressed comments
akstron Nov 21, 2024
de1c563
Removed unused CasVersion
akstron Nov 21, 2024
edabe22
Addressed validation comments
akstron Nov 22, 2024
d0e1976
Created helper function for session created and updated tests
akstron Nov 26, 2024
d8479b5
Added schema unit tests
akstron Nov 26, 2024
02b6159
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
73d276a
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
57349a8
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
84b52e1
Fixed build
akstron Nov 26, 2024
9c2f05b
formatting fixes
akstron Nov 26, 2024
2c8de88
test fix
akstron Nov 27, 2024
eeb1951
Added test in workflow
akstron Nov 28, 2024
601365d
fmt fixes
akstron Nov 28, 2024
4aeaad7
create schema bug fix
akstron Nov 28, 2024
0167cb6
exclude v1 run with skip-apply-schema as true
akstron Nov 28, 2024
07b99da
Added schemaCreator and comments in workflow
akstron Nov 28, 2024
04ec76f
ci changes
akstron Nov 28, 2024
7246494
made template params private
akstron Nov 28, 2024
d8613f1
workflow fix
akstron Nov 28, 2024
a7853ec
Changed env variable name
akstron Nov 29, 2024
10bd9aa
lint fixes
akstron Nov 29, 2024
a682db2
lint fix
akstron Nov 29, 2024
35c26b5
test fix
akstron Nov 29, 2024
ddf4fc0
Workflow and test minor changes
akstron Nov 29, 2024
bbf3ac8
test fix
akstron Nov 30, 2024
a8e3aae
workflow changes
akstron Nov 30, 2024
a46bf37
Merge branch 'main' into create-database-scheme-cassandra
akstron Nov 30, 2024
c2b89e0
Apply suggestions from code review
yurishkuro Nov 30, 2024
de2d2ef
Update docs
yurishkuro Nov 30, 2024
100208c
refactor
yurishkuro Nov 30, 2024
37d34dd
clean-up imports
yurishkuro Nov 30, 2024
8d65894
clean-up
yurishkuro Nov 30, 2024
23a9b62
simplify
yurishkuro Nov 30, 2024
d925074
fix
yurishkuro Nov 30, 2024
154deb9
Fix workflow
yurishkuro Nov 30, 2024
a238455
Merge branch 'main' into create-database-scheme-cassandra
yurishkuro Nov 30, 2024
cc9db9c
rename
yurishkuro Nov 30, 2024
9d8ba06
Fix script
yurishkuro Nov 30, 2024
b0ac38a
fix naming for code coverage
yurishkuro Nov 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema ahould be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional,cassandraTTLValidation"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional,cassandraTTLValidation"`
// Replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow of format tells the compaction window of the db. Should atleast be 1 minute
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional,isDurationGreaterThanOrEqualMinute"`
}

type Query struct {
Expand Down Expand Up @@ -86,7 +98,13 @@
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_v1_dc1",
akstron marked this conversation as resolved.
Show resolved Hide resolved
Datacenter: "test",
akstron marked this conversation as resolved.
Show resolved Hide resolved
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Expand All @@ -106,6 +124,27 @@
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
Expand Down Expand Up @@ -134,8 +173,31 @@
NewSession() (cassandra.Session, error)
}

func (c *Configuration) newSessionPrerequisites() error {
if !c.Schema.CreateSchema {
return nil
}
cluster, err := c.NewCluster()

Check failure on line 180 in pkg/cassandra/config/config.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, all this code effectively repeats NewSession, the only difference is the keyspace. Can you move it into a helper createSession(keyspace string)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation of newSessionPrerequisites kind of uses a hack to override cluster.Keyspace which was set in c.NewCluster(), so that the connection can be established without keyspace.
So, you are suggesting enclosing this override logic in createSession(keyspace string)?

Or something like I create a copy of the Configuration with different keyspace to do the work, so that we don't set cluster.keyspace = "" after creating it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make the new function to depend on the config argument, and pass whichever config is needed in each instance, including cloning it to override keyspace


cluster.Keyspace = ""

session, err := cluster.CreateSession()
if err != nil {
return err
}

Check warning on line 187 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L180-L187

Added lines #L180 - L187 were not covered by tests

wSession := gocqlw.WrapCQLSession(session)
defer wSession.Close()

return generateSchemaIfNotPresent(wSession, &c.Schema)

Check warning on line 192 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L189-L192

Added lines #L189 - L192 were not covered by tests
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
if err := c.newSessionPrerequisites(); err != nil {
return nil, err
}

Check warning on line 199 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L198-L199

Added lines #L198 - L199 were not covered by tests

cluster, err := c.NewCluster()
if err != nil {
return nil, err
Expand Down Expand Up @@ -211,6 +273,23 @@
}

func (c *Configuration) Validate() error {
govalidator.CustomTypeTagMap.Set("cassandraTTLValidation", func(i interface{}, _ interface{}) bool {

Check failure on line 276 in pkg/cassandra/config/config.go

View workflow job for this annotation

GitHub Actions / lint

use-any: since Go 1.18 'interface{}' can be replaced by 'any' (revive)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need custom functions, can the conditions not be expressed directly via tags?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we must have custom logic then just write it directly, with proper error messages - right now you are just returning a Boolean, so at best validator can say "property x did not validate".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need custom functions, can the conditions not be expressed directly via tags?

Couldn't find such validations here: https://github.com/asaskevich/govalidator

if we must have custom logic then just write it directly, with proper error messages - right now you are just returning a Boolean, so at best validator can say "property x did not validate".

Fixed it.

duration, ok := i.(time.Duration)
if !ok {
return false
}

Check warning on line 280 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L279-L280

Added lines #L279 - L280 were not covered by tests
// Either set to no-expiry using 0 or if set should be greater than or equal to second
return duration == 0 || duration >= time.Second
})

govalidator.CustomTypeTagMap.Set("isDurationGreaterThanOrEqualMinute", func(i interface{}, _ interface{}) bool {

Check failure on line 285 in pkg/cassandra/config/config.go

View workflow job for this annotation

GitHub Actions / lint

use-any: since Go 1.18 'interface{}' can be replaced by 'any' (revive)
duration, ok := i.(time.Duration)
if !ok {
return false
}

Check warning on line 289 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L288-L289

Added lines #L288 - L289 were not covered by tests
return duration >= time.Minute
})

_, err := govalidator.ValidateStruct(c)
return err
}
16 changes: 16 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -94,3 +95,18 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

func TestConfigSchemaValidation(t *testing.T) {
cfg := DefaultConfiguration()
err := cfg.Validate()
require.NoError(t, err)

cfg.Schema.TraceTTL = time.Millisecond
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.TraceTTL = time.Second
cfg.Schema.CompactionWindow = time.Minute - 1
err = cfg.Validate()
require.Error(t, err)
}
143 changes: 143 additions & 0 deletions pkg/cassandra/config/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2024 The Jaeger Authors.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package config

import (
"bytes"
"embed"
"errors"
"fmt"
"text/template"
"time"

"github.com/jaegertracing/jaeger/pkg/cassandra"
)

//go:embed v004-go-tmpl.cql.tmpl
var schemaFile embed.FS

type TemplateParams struct {
akstron marked this conversation as resolved.
Show resolved Hide resolved
Schema
// Replication is the replication strategy used. Ex: "{'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }"
Replication string
// CompactionWindowInMinutes is constructed from CompactionWindow for using in template
CompactionWindowInMinutes int64
// TraceTTLInSeconds is constructed from TraceTTL for using in template
TraceTTLInSeconds int64
// DependenciesTTLInSeconds is constructed from DependenciesTTL for using in template
DependenciesTTLInSeconds int64
}

func constructTemplateParams(cfg Schema) (TemplateParams, error) {
params := TemplateParams{
Schema: cfg,
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

params.Replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", params.ReplicationFactor)
params.CompactionWindowInMinutes = int64(params.CompactionWindow / time.Minute)
params.TraceTTLInSeconds = int64(params.TraceTTL / time.Second)
params.DependenciesTTLInSeconds = int64(params.DependenciesTTL / time.Second)

return params, nil

Check warning on line 42 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L32-L42

Added lines #L32 - L42 were not covered by tests
}

func getQueryFileAsBytes(fileName string, params TemplateParams) ([]byte, error) {
tmpl, err := template.ParseFS(schemaFile, fileName)
if err != nil {
return nil, err
}

Check warning on line 49 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L45-L49

Added lines #L45 - L49 were not covered by tests

var result bytes.Buffer
err = tmpl.Execute(&result, params)
if err != nil {
return nil, err
}

Check warning on line 55 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L51-L55

Added lines #L51 - L55 were not covered by tests

return result.Bytes(), nil

Check warning on line 57 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L57

Added line #L57 was not covered by tests
}

func getQueriesFromBytes(queryFile []byte) ([]string, error) {
lines := bytes.Split(queryFile, []byte("\n"))

var extractedLines [][]byte

for _, line := range lines {
// Remove any comments, if at the end of the line
commentIndex := bytes.Index(line, []byte(`--`))
if commentIndex != -1 {
// remove everything after comment
line = line[0:commentIndex]
}

Check warning on line 71 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L60-L71

Added lines #L60 - L71 were not covered by tests

if len(line) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would make sense to trim spaces before checking for len=0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks

continue

Check warning on line 74 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}

extractedLines = append(extractedLines, bytes.TrimSpace(line))

Check warning on line 77 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L77

Added line #L77 was not covered by tests
}

var queries []string

// Construct individual queries strings
var queryString string
for _, line := range extractedLines {
queryString += string(line) + "\n"
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, queryString)
queryString = ""
}

Check warning on line 89 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L80-L89

Added lines #L80 - L89 were not covered by tests
}

if len(queryString) > 0 {
return nil, errors.New(`invalid template`)
}

Check warning on line 94 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L92-L94

Added lines #L92 - L94 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do it in generateSchemaIfNotPresent against casQueries, not here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite get this. The logic here is that if queryString > 0, means that there is a query string which does not ends with ;, that's what I am validating here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok then please make the error message self-explanatory


return queries, nil

Check warning on line 96 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L96

Added line #L96 was not covered by tests
}

func getCassandraQueriesFromQueryStrings(session cassandra.Session, queries []string) []cassandra.Query {
var casQueries []cassandra.Query

for _, query := range queries {
casQueries = append(casQueries, session.Query(query))
}

Check warning on line 104 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L99-L104

Added lines #L99 - L104 were not covered by tests

return casQueries

Check warning on line 106 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L106

Added line #L106 was not covered by tests
}

func contructSchemaQueries(session cassandra.Session, cfg *Schema) ([]cassandra.Query, error) {
params, err := constructTemplateParams(*cfg)
if err != nil {
return nil, err
}

Check warning on line 113 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L109-L113

Added lines #L109 - L113 were not covered by tests

queryFile, err := getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, params)
if err != nil {
return nil, err
}

Check warning on line 118 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L115-L118

Added lines #L115 - L118 were not covered by tests

queryStrings, err := getQueriesFromBytes(queryFile)
if err != nil {
return nil, err
}

Check warning on line 123 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L120-L123

Added lines #L120 - L123 were not covered by tests

casQueries := getCassandraQueriesFromQueryStrings(session, queryStrings)

return casQueries, nil

Check warning on line 127 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L125-L127

Added lines #L125 - L127 were not covered by tests
}

func generateSchemaIfNotPresent(session cassandra.Session, cfg *Schema) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many functions in this file that are polluting the overall package namespace. I would prefer to introduce a helper struct

type schemaCreator struct {
  session cassandra.Session
  cfg *Schema
}

and define those functions on that struct (and minimize parameter passing)

casQueries, err := contructSchemaQueries(session, cfg)
if err != nil {
return err
}

Check warning on line 134 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L130-L134

Added lines #L130 - L134 were not covered by tests

for _, query := range casQueries {
if err := query.Exec(); err != nil {
return err
}

Check warning on line 139 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L136-L139

Added lines #L136 - L139 were not covered by tests
}

return nil

Check warning on line 142 in pkg/cassandra/config/schema.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L142

Added line #L142 was not covered by tests
}
Loading
Loading