Skip to content

Commit

Permalink
fixed some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
debebantur committed Aug 6, 2024
1 parent fe241f0 commit 658d9af
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 43 deletions.
109 changes: 69 additions & 40 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/jackc/pgx"
"github.com/jackc/pgx/pgtype"
"github.com/pkg/errors"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
Expand Down Expand Up @@ -328,66 +329,89 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl

ycl.SetExternalFilePath(msg.Name)

// получить время первого бэкапа
firstBackupLSN, err := getLSN(msg.Segnum)
//get firsr backup lsn
firstBackupLSN, err := getFirstLSN(msg.Segnum)
if err != nil {
fmt.Printf("error in gettime %v", err) // delete this
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups?
}
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")

//залистить файлы
//list files in storage
ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path")
objectMetas, err := s.ListPath(msg.Name)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")

_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to complete request")
return nil
}
ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas")
// проверить yezzey vi и ei - может прочитать зарание
//conect pgx to port and select from yezzey schema
vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method
ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count")

vi, ei, err := getVirtualExpireIndexes(msg.Port)
if err != nil {
fmt.Printf("error in copy %v", err) // delete this
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes")
_ = ycl.ReplyError(fmt.Errorf("could not get virtual and expire indexes: %s", err), "failed to compelete request")
return nil
}
ylogger.Zero.Info().Msg("recieved virtual index and expire index")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count")

//проверить и скопировать
var failed []*storage.S3ObjectMeta
retryCount := 0
amount := 0
deletedFilesCount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
p1 := strings.Split(objectMetas[i].Path, "/")
p1 := strings.Split(objectMetas[i].Path, "/") //TODO fix this
p2 := p1[len(p1)-1]
p3 := strings.Split(p2, "_")
ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
lsn, ok := ei[ans]
if !vi[ans] && (lsn < firstBackupLSN || !ok) {
amount++
if !msg.Confirm {
ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted")
if len(p3) >= 4 {
p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
}
lsn, ok := ei[p2]
ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn")
if !vi[p2] && (lsn < firstBackupLSN || !ok) {
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Bool("file in expire indexi", ok).
Bool("lsn is less than in first backup", lsn < firstBackupLSN).
Msg("file will be deleted")
deletedFilesCount++
if !msg.Confirm { //do not delete files if no confirmation flag provided
continue
}
err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash")
filePathParts := strings.Split(objectMetas[i].Path, "/")
err = s.MoveObject(objectMetas[i].Path, fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1]))
if err != nil {
amount--
fmt.Printf("error in copy %v", err) // delete this
deletedFilesCount--
ylogger.Zero.Warn().AnErr("err", err).Str("file", objectMetas[i].Path).Msg("failed to move file")
failed = append(failed, objectMetas[i])
}
}
}
objectMetas = failed
fmt.Printf("failed files count: %d\n", len(objectMetas))
failed = make([]*storage.S3ObjectMeta, 0)
ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count")
}

ylogger.Zero.Info().Int("amount", deletedFilesCount).Msg("deleted files count")

if len(objectMetas) > 0 {
ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("some files were not moved")
ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to move some files")

_ = ycl.ReplyError(err, "failed to move some files")
return nil
}

if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return nil
}
ylogger.Zero.Info().Msg("Deleted garbage successfully")
if !msg.Confirm {
ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted")
}

default:
ylogger.Zero.Error().Any("type", tp).Msg("what type is it")
ylogger.Zero.Error().Any("type", tp).Msg("unknown message type")
_ = ycl.ReplyError(nil, "wrong request type")

return nil
Expand All @@ -397,7 +421,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
}

// get lsn of the oldest backup
func getLSN(seg int) (uint64, error) {
func getFirstLSN(seg int) (uint64, error) {
cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml")
ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args")
var out bytes.Buffer
Expand Down Expand Up @@ -463,7 +487,7 @@ func connectToDatabase(port int, database string) (*pgx.Conn, error) {
return conn, nil
}

func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз
func getVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз
db, err := getDatabase(port)
if err != nil {
return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix
Expand Down Expand Up @@ -495,7 +519,8 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T
return nil, nil, fmt.Errorf("unable to parse query output %v", err)
}

c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn
ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei")
c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn
}
ylogger.Zero.Debug().Msg("read 1")

Expand All @@ -515,9 +540,11 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T
p1 := strings.Split(xpath, "/")
p2 := p1[len(p1)-1]
p3 := strings.Split(p2, "_")
ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
c2[ans] = true
ylogger.Zero.Debug().Str("file", ans).Msg("added")
if len(p3) >= 4 {
p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3])
}
c2[p2] = true
ylogger.Zero.Debug().Str("file", p2).Msg("added")
}
ylogger.Zero.Debug().Msg("read 3")

Expand All @@ -531,7 +558,7 @@ func getDatabase(port int) (DB, error) {
}
defer conn.Close() //error
ylogger.Zero.Debug().Msg("connected to db")
rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`)
rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`)
if err != nil {
return DB{}, err
}
Expand All @@ -541,10 +568,11 @@ func getDatabase(port int) (DB, error) {
for rows.Next() {
row := DB{}
ylogger.Zero.Debug().Msg("cycle 1")
if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil {
if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil {
return DB{}, err
}
ylogger.Zero.Debug().Msg("cycle 2")
ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database")
if row.name == "postgres" {
continue
}
Expand Down Expand Up @@ -572,26 +600,27 @@ func getDatabase(port int) (DB, error) {
}
ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema")
if ans {
ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database")
ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database")
ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database")
return row, nil
}

ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database")
ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database")
}
return DB{}, fmt.Errorf("no yezzey schema across databases")
}

type Ei struct {
reloid string
relfileoid string
reloid pgtype.OID
relfileoid pgtype.OID
expireLsn string
fqnmd5 string
}

type DB struct {
name string
tablespace int
oid int
tablespace pgtype.OID
oid pgtype.OID
}

type BackupLSN struct {
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,24 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return err
}
ylogger.Zero.Debug().Msg("aquired session")

fromPath := path.Join(s.cnf.StoragePrefix, from)
fromPath := from
toPath := path.Join(s.cnf.StoragePrefix, to)
ylogger.Zero.Debug().Str("to", toPath).Msg("to path")

input := s3.CopyObjectInput{
Bucket: &s.cnf.StorageBucket,
CopySource: aws.String(fromPath),
CopySource: aws.String(s.cnf.StorageBucket + "/" + fromPath),
Key: aws.String(toPath),
}

_, err = sess.CopyObject(&input)
out, err := sess.CopyObject(&input)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to copy object")
return err
}
ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object")

input2 := s3.DeleteObjectInput{
Bucket: &s.cnf.StorageBucket,
Expand All @@ -205,5 +208,6 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error {
if err != nil {
ylogger.Zero.Err(err).Msg("failed to delete old object")
}
ylogger.Zero.Debug().Msg("deleted object")
return err
}

0 comments on commit 658d9af

Please sign in to comment.