From c03c8f695c528c8abb384a5621475eea1becd682 Mon Sep 17 00:00:00 2001 From: Victor Date: Mon, 11 Nov 2024 08:18:47 +0000 Subject: [PATCH] Add multidb support --- pkg/database/database.go | 71 ++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/pkg/database/database.go b/pkg/database/database.go index f0aaf6c..1da73bd 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -31,27 +31,20 @@ type ExpireHint struct { func (database *DatabaseHandler) populateIndex() { } - -func (database *DatabaseHandler) GetVirtualExpireIndexes(port uint64) (map[string]bool, map[string]uint64, error) { //TODO несколько баз - db, err := getDatabase(port) - if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix - } +func (database *DatabaseHandler) GetVirtualExpireIndex(port uint64, db DB, virtualIndex *map[string]bool, expireIndex *map[string]uint64) error { ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") conn, err := connectToDatabase(port, db.name) if err != nil { - return nil, nil, err + return err } defer conn.Close() //error ylogger.Zero.Debug().Msg("connected to database") - c := make(map[string]uint64, 0) - /* Todo: check that yezzey version >= 1.8.1 */ if false { rows, err := conn.Query(`SELECT x_path, expire_lsn FROM yezzey.yezzey_expire_hint;`) if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + return fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows.Close() ylogger.Zero.Debug().Msg("executed select") @@ -59,50 +52,67 @@ func (database *DatabaseHandler) GetVirtualExpireIndexes(port uint64) (map[strin for rows.Next() { row := ExpireHint{} if err := rows.Scan(&row.x_path, &row.expireLsn); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) + return fmt.Errorf("unable to parse query output %v", err) } lsn, err := pgx.ParseLSN(row.expireLsn) if err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) + return fmt.Errorf("unable to parse query output %v", err) } ylogger.Zero.Debug().Str("x_path", row.x_path).Str("lsn", row.expireLsn).Msg("added file to expire hint") - c[row.x_path] = lsn + (*expireIndex)[row.x_path] = lsn } ylogger.Zero.Debug().Msg("fetched expire hint info") } rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) if err != nil { - return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + return fmt.Errorf("unable to get ao/aocs tables %v", err) //fix } defer rows2.Close() - c2 := make(map[string]bool, 0) for rows2.Next() { xpath := "" if err := rows2.Scan(&xpath); err != nil { - return nil, nil, fmt.Errorf("unable to parse query output %v", err) + return fmt.Errorf("unable to parse query output %v", err) } - c2[xpath] = true + (*virtualIndex)[xpath] = true ylogger.Zero.Debug().Str("x_path", xpath).Msg("added") } - ylogger.Zero.Debug().Msg("fetched virtual index info") + ylogger.Zero.Debug().Msg("fetched virtual index info") + + return err +} +func (database *DatabaseHandler) GetVirtualExpireIndexes(port uint64) (map[string]bool, map[string]uint64, error) { //TODO несколько баз + databases, err := getDatabase(port) + if err != nil || databases == nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + + expireIndex := make(map[string]uint64, 0) + virtualIndex := make(map[string]bool, 0) + for _, db := range databases { + err = database.GetVirtualExpireIndex(port, db, &virtualIndex, &expireIndex) + if err != nil { + return nil, nil, err + } - return c2, c, err + } + return virtualIndex, expireIndex, nil } -func getDatabase(port uint64) (DB, error) { +func getDatabase(port uint64) ([]DB, error) { + var databases = []DB{} conn, err := connectToDatabase(port, "postgres") if err != nil { - return DB{}, err + return nil, err } defer conn.Close() //error ylogger.Zero.Debug().Msg("connected to db") rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) if err != nil { - return DB{}, err + return nil, err } defer rows.Close() ylogger.Zero.Debug().Msg("recieved db list") @@ -111,7 +121,7 @@ func getDatabase(port uint64) (DB, error) { row := DB{} ylogger.Zero.Debug().Msg("cycle 1") if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { - return DB{}, err + return nil, err } ylogger.Zero.Debug().Msg("cycle 2") ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") @@ -122,14 +132,14 @@ func getDatabase(port uint64) (DB, error) { ylogger.Zero.Debug().Str("db", row.name).Msg("check database") connDb, err := connectToDatabase(port, row.name) if err != nil { - return DB{}, err + return nil, err } defer connDb.Close() //error ylogger.Zero.Debug().Msg("cycle 3") rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) if err != nil { - return DB{}, err + return nil, err } defer rowsdb.Close() ylogger.Zero.Debug().Msg("cycle 4") @@ -138,18 +148,23 @@ func getDatabase(port uint64) (DB, error) { err = rowsdb.Scan(&ans) if err != nil { ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") - return DB{}, err + return nil, err } ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") if ans { ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") - return row, nil + databases = append(databases, row) } ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") } - return DB{}, fmt.Errorf("no yezzey schema across databases") + if len(databases) == 0 { + return nil, fmt.Errorf("no yezzey schema across databases") + + } else { + return databases, nil + } } func connectToDatabase(port uint64, database string) (*pgx.Conn, error) {