diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 970a113..1a69b84 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" @@ -328,66 +329,89 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ycl.SetExternalFilePath(msg.Name) - // получить время первого бэкапа - firstBackupLSN, err := getLSN(msg.Segnum) + //get firsr backup lsn + firstBackupLSN, err := getFirstLSN(msg.Segnum) if err != nil { - fmt.Printf("error in gettime %v", err) // delete this + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? } ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") - //залистить файлы + //list files in storage ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") objectMetas, err := s.ListPath(msg.Name) if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") - + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to complete request") return nil } - ylogger.Zero.Info().Int("metas count", len(objectMetas)).Msg("got object metas") - // проверить yezzey vi и ei - может прочитать зарание - //conect pgx to port and select from yezzey schema - vi, ei, err := getVirtualIndex(msg.Port) //TODO rename method + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + vi, ei, err := getVirtualExpireIndexes(msg.Port) if err != nil { - fmt.Printf("error in copy %v", err) // delete this + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") + _ = ycl.ReplyError(fmt.Errorf("could not get virtual and expire indexes: %s", err), "failed to compelete request") + return nil } ylogger.Zero.Info().Msg("recieved virtual index and expire index") ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") - //проверить и скопировать var failed []*storage.S3ObjectMeta retryCount := 0 - amount := 0 + deletedFilesCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(objectMetas); i++ { - p1 := strings.Split(objectMetas[i].Path, "/") + p1 := strings.Split(objectMetas[i].Path, "/") //TODO fix this p2 := p1[len(p1)-1] p3 := strings.Split(p2, "_") - ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - lsn, ok := ei[ans] - if !vi[ans] && (lsn < firstBackupLSN || !ok) { - amount++ - if !msg.Confirm { - ylogger.Zero.Info().Str("file: %s", objectMetas[i].Path).Msg("File will be deleted") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + lsn, ok := ei[p2] + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") + if !vi[p2] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Bool("file in expire indexi", ok). + Bool("lsn is less than in first backup", lsn < firstBackupLSN). + Msg("file will be deleted") + deletedFilesCount++ + if !msg.Confirm { //do not delete files if no confirmation flag provided continue } - err = s.MoveObject(objectMetas[i].Path, objectMetas[i].Path+"_trash") + filePathParts := strings.Split(objectMetas[i].Path, "/") + err = s.MoveObject(objectMetas[i].Path, fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) if err != nil { - amount-- - fmt.Printf("error in copy %v", err) // delete this + deletedFilesCount-- + ylogger.Zero.Warn().AnErr("err", err).Str("file", objectMetas[i].Path).Msg("failed to move file") failed = append(failed, objectMetas[i]) } } } objectMetas = failed - fmt.Printf("failed files count: %d\n", len(objectMetas)) failed = make([]*storage.S3ObjectMeta, 0) - ylogger.Zero.Info().Int("amount", amount).Msg("deleted files count") + } + + ylogger.Zero.Info().Int("amount", deletedFilesCount).Msg("deleted files count") + + if len(objectMetas) > 0 { + ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("some files were not moved") + ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to move some files") + + _ = ycl.ReplyError(err, "failed to move some files") + return nil + } + + if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return nil + } + ylogger.Zero.Info().Msg("Deleted garbage successfully") + if !msg.Confirm { + ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") } default: - ylogger.Zero.Error().Any("type", tp).Msg("what type is it") + ylogger.Zero.Error().Any("type", tp).Msg("unknown message type") _ = ycl.ReplyError(nil, "wrong request type") return nil @@ -397,7 +421,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl } // get lsn of the oldest backup -func getLSN(seg int) (uint64, error) { +func getFirstLSN(seg int) (uint64, error) { cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") var out bytes.Buffer @@ -463,7 +487,7 @@ func connectToDatabase(port int, database string) (*pgx.Conn, error) { return conn, nil } -func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз +func getVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз db, err := getDatabase(port) if err != nil { return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix @@ -495,7 +519,8 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T return nil, nil, fmt.Errorf("unable to parse query output %v", err) } - c[fmt.Sprintf("%d_%d_%s_%s_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") + c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn } ylogger.Zero.Debug().Msg("read 1") @@ -515,9 +540,11 @@ func getVirtualIndex(port int) (map[string]bool, map[string]uint64, error) { //T p1 := strings.Split(xpath, "/") p2 := p1[len(p1)-1] p3 := strings.Split(p2, "_") - ans := fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) - c2[ans] = true - ylogger.Zero.Debug().Str("file", ans).Msg("added") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + c2[p2] = true + ylogger.Zero.Debug().Str("file", p2).Msg("added") } ylogger.Zero.Debug().Msg("read 3") @@ -531,7 +558,7 @@ func getDatabase(port int) (DB, error) { } defer conn.Close() //error ylogger.Zero.Debug().Msg("connected to db") - rows, err := conn.Query(`SELECT datname, dattablespace, oid FROM pg_database WHERE datallowconn;`) + rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) if err != nil { return DB{}, err } @@ -541,10 +568,11 @@ func getDatabase(port int) (DB, error) { for rows.Next() { row := DB{} ylogger.Zero.Debug().Msg("cycle 1") - if err := rows.Scan(&row.name, &row.tablespace, &row.oid); err != nil { + if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { return DB{}, err } ylogger.Zero.Debug().Msg("cycle 2") + ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") if row.name == "postgres" { continue } @@ -572,26 +600,27 @@ func getDatabase(port int) (DB, error) { } ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") if ans { - ylogger.Zero.Debug().Str("db: %s", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") return row, nil } - ylogger.Zero.Debug().Str("db: %s", row.name).Msg("no yezzey schema in database") + ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") } return DB{}, fmt.Errorf("no yezzey schema across databases") } type Ei struct { - reloid string - relfileoid string + reloid pgtype.OID + relfileoid pgtype.OID expireLsn string fqnmd5 string } type DB struct { name string - tablespace int - oid int + tablespace pgtype.OID + oid pgtype.OID } type BackupLSN struct { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index e0ffd39..a4fed5a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -180,21 +180,24 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") return err } + ylogger.Zero.Debug().Msg("aquired session") - fromPath := path.Join(s.cnf.StoragePrefix, from) + fromPath := from toPath := path.Join(s.cnf.StoragePrefix, to) + ylogger.Zero.Debug().Str("to", toPath).Msg("to path") input := s3.CopyObjectInput{ Bucket: &s.cnf.StorageBucket, - CopySource: aws.String(fromPath), + CopySource: aws.String(s.cnf.StorageBucket + "/" + fromPath), Key: aws.String(toPath), } - _, err = sess.CopyObject(&input) + out, err := sess.CopyObject(&input) if err != nil { ylogger.Zero.Err(err).Msg("failed to copy object") return err } + ylogger.Zero.Debug().Str("", out.GoString()).Msg("copied object") input2 := s3.DeleteObjectInput{ Bucket: &s.cnf.StorageBucket, @@ -205,5 +208,6 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { if err != nil { ylogger.Zero.Err(err).Msg("failed to delete old object") } + ylogger.Zero.Debug().Msg("deleted object") return err }