Skip to content

Commit

Permalink
Add multidb support (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
visill authored Nov 11, 2024
1 parent 3d1c950 commit ddcbe32
Showing 1 changed file with 43 additions and 28 deletions.
71 changes: 43 additions & 28 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,78 +31,88 @@ type ExpireHint struct {
func (database *DatabaseHandler) populateIndex() {

}

func (database *DatabaseHandler) GetVirtualExpireIndexes(port uint64) (map[string]bool, map[string]uint64, error) { //TODO несколько баз
db, err := getDatabase(port)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
func (database *DatabaseHandler) GetVirtualExpireIndex(port uint64, db DB, virtualIndex *map[string]bool, expireIndex *map[string]uint64) error {
ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database")
conn, err := connectToDatabase(port, db.name)
if err != nil {
return nil, nil, err
return err
}
defer conn.Close() //error
ylogger.Zero.Debug().Msg("connected to database")

c := make(map[string]uint64, 0)

/* Todo: check that yezzey version >= 1.8.1 */
if false {
rows, err := conn.Query(`SELECT x_path, expire_lsn FROM yezzey.yezzey_expire_hint;`)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
return fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
defer rows.Close()
ylogger.Zero.Debug().Msg("executed select")

for rows.Next() {
row := ExpireHint{}
if err := rows.Scan(&row.x_path, &row.expireLsn); err != nil {
return nil, nil, fmt.Errorf("unable to parse query output %v", err)
return fmt.Errorf("unable to parse query output %v", err)
}

lsn, err := pgx.ParseLSN(row.expireLsn)
if err != nil {
return nil, nil, fmt.Errorf("unable to parse query output %v", err)
return fmt.Errorf("unable to parse query output %v", err)
}

ylogger.Zero.Debug().Str("x_path", row.x_path).Str("lsn", row.expireLsn).Msg("added file to expire hint")
c[row.x_path] = lsn
(*expireIndex)[row.x_path] = lsn
}
ylogger.Zero.Debug().Msg("fetched expire hint info")
}

rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
return fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}
defer rows2.Close()

c2 := make(map[string]bool, 0)
for rows2.Next() {
xpath := ""
if err := rows2.Scan(&xpath); err != nil {
return nil, nil, fmt.Errorf("unable to parse query output %v", err)
return fmt.Errorf("unable to parse query output %v", err)
}
c2[xpath] = true
(*virtualIndex)[xpath] = true
ylogger.Zero.Debug().Str("x_path", xpath).Msg("added")
}
ylogger.Zero.Debug().Msg("fetched virtual index info")
ylogger.Zero.Debug().Msg("fetched virtual index info")

return err
}
func (database *DatabaseHandler) GetVirtualExpireIndexes(port uint64) (map[string]bool, map[string]uint64, error) { //TODO несколько баз
databases, err := getDatabase(port)
if err != nil || databases == nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
}

expireIndex := make(map[string]uint64, 0)
virtualIndex := make(map[string]bool, 0)
for _, db := range databases {
err = database.GetVirtualExpireIndex(port, db, &virtualIndex, &expireIndex)
if err != nil {
return nil, nil, err
}

return c2, c, err
}
return virtualIndex, expireIndex, nil
}

func getDatabase(port uint64) (DB, error) {
func getDatabase(port uint64) ([]DB, error) {
var databases = []DB{}
conn, err := connectToDatabase(port, "postgres")
if err != nil {
return DB{}, err
return nil, err
}
defer conn.Close() //error
ylogger.Zero.Debug().Msg("connected to db")
rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`)
if err != nil {
return DB{}, err
return nil, err
}
defer rows.Close()
ylogger.Zero.Debug().Msg("recieved db list")
Expand All @@ -111,7 +121,7 @@ func getDatabase(port uint64) (DB, error) {
row := DB{}
ylogger.Zero.Debug().Msg("cycle 1")
if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil {
return DB{}, err
return nil, err
}
ylogger.Zero.Debug().Msg("cycle 2")
ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database")
Expand All @@ -122,14 +132,14 @@ func getDatabase(port uint64) (DB, error) {
ylogger.Zero.Debug().Str("db", row.name).Msg("check database")
connDb, err := connectToDatabase(port, row.name)
if err != nil {
return DB{}, err
return nil, err
}
defer connDb.Close() //error
ylogger.Zero.Debug().Msg("cycle 3")

rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`)
if err != nil {
return DB{}, err
return nil, err
}
defer rowsdb.Close()
ylogger.Zero.Debug().Msg("cycle 4")
Expand All @@ -138,18 +148,23 @@ func getDatabase(port uint64) (DB, error) {
err = rowsdb.Scan(&ans)
if err != nil {
ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check")
return DB{}, err
return nil, err
}
ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema")
if ans {
ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database")
ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database")
return row, nil
databases = append(databases, row)
}

ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database")
}
return DB{}, fmt.Errorf("no yezzey schema across databases")
if len(databases) == 0 {
return nil, fmt.Errorf("no yezzey schema across databases")

} else {
return databases, nil
}
}

func connectToDatabase(port uint64, database string) (*pgx.Conn, error) {
Expand Down

0 comments on commit ddcbe32

Please sign in to comment.