-
Notifications
You must be signed in to change notification settings - Fork 242
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Mateusz Urbanek <[email protected]>
- Loading branch information
Showing
6 changed files
with
246 additions
and
125 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
//go:build cgo | ||
// +build cgo | ||
|
||
package sqlite | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"os" | ||
"time" | ||
|
||
"github.com/k3s-io/kine/pkg/drivers/generic" | ||
"github.com/k3s-io/kine/pkg/logstructured" | ||
"github.com/k3s-io/kine/pkg/logstructured/sqllog" | ||
"github.com/k3s-io/kine/pkg/server" | ||
"github.com/k3s-io/kine/pkg/util" | ||
"github.com/mattn/go-sqlite3" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/sirupsen/logrus" | ||
|
||
// sqlite db driver | ||
_ "github.com/mattn/go-sqlite3" | ||
) | ||
|
||
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) { | ||
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer) | ||
return backend, err | ||
} | ||
|
||
func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) { | ||
if dataSourceName == "" { | ||
if err := os.MkdirAll("./db", 0700); err != nil { | ||
return nil, nil, err | ||
} | ||
dataSourceName = "./db/state.db?_journal=WAL&cache=shared&_busy_timeout=30000" | ||
} | ||
|
||
dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
dialect.LastInsertID = true | ||
dialect.GetSizeSQL = `SELECT SUM(pgsize) FROM dbstat` | ||
dialect.CompactSQL = ` | ||
DELETE FROM kine AS kv | ||
WHERE | ||
kv.id IN ( | ||
SELECT kp.prev_revision AS id | ||
FROM kine AS kp | ||
WHERE | ||
kp.name != 'compact_rev_key' AND | ||
kp.prev_revision != 0 AND | ||
kp.id <= ? | ||
UNION | ||
SELECT kd.id AS id | ||
FROM kine AS kd | ||
WHERE | ||
kd.deleted != 0 AND | ||
kd.id <= ? | ||
)` | ||
dialect.PostCompactSQL = `PRAGMA wal_checkpoint(FULL)` | ||
dialect.TranslateErr = func(err error) error { | ||
if err, ok := err.(sqlite3.Error); ok && err.ExtendedCode == sqlite3.ErrConstraintUnique { | ||
return server.ErrKeyExists | ||
} | ||
return err | ||
} | ||
dialect.ErrCode = func(err error) string { | ||
if err == nil { | ||
return "" | ||
} | ||
if err, ok := err.(sqlite3.Error); ok { | ||
return fmt.Sprint(err.ExtendedCode) | ||
} | ||
return err.Error() | ||
} | ||
|
||
// this is the first SQL that will be executed on a new DB conn so | ||
// loop on failure here because in the case of dqlite it could still be initializing | ||
for i := 0; i < 300; i++ { | ||
err = setup(dialect.DB) | ||
if err == nil { | ||
break | ||
} | ||
logrus.Errorf("failed to setup db: %v", err) | ||
select { | ||
case <-ctx.Done(): | ||
return nil, nil, ctx.Err() | ||
case <-time.After(time.Second): | ||
} | ||
time.Sleep(time.Second) | ||
} | ||
if err != nil { | ||
return nil, nil, errors.Wrap(err, "setup db") | ||
} | ||
|
||
dialect.Migrate(context.Background()) | ||
return logstructured.New(sqllog.New(dialect)), dialect, nil | ||
} | ||
|
||
func setup(db *sql.DB) error { | ||
logrus.Infof("Configuring database table schema and indexes, this may take a moment...") | ||
|
||
for _, stmt := range schema { | ||
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) | ||
_, err := db.Exec(stmt) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
logrus.Infof("Database tables and indexes are up to date") | ||
return nil | ||
} |
Oops, something went wrong.