Skip to content

Commit

Permalink
Merge pull request #28 from abbbi/issue_25
Browse files Browse the repository at this point in the history
#25 Backup fails: pipelined request failed: stream error received
  • Loading branch information
tizbac authored Aug 14, 2024
2 parents 73c5ba2 + 5e3df19 commit ba4f331
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
25 changes: 20 additions & 5 deletions cmd/pmoxs3backuproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
w.Header().Add("Content-Length", fmt.Sprintf("%d", s.Size))
w.WriteHeader(http.StatusOK)

io.Copy(w, obj)
return
}
}

s3backuplog.WarnPrint("File %s not found in snapshot %s (%s)", r.URL.Query().Get("archive-name"), mostRecent.S3Prefix(), mostRecent.Files)

w.WriteHeader(http.StatusNotFound)
return
}

if strings.HasPrefix(r.RequestURI, "/fixed_index?") && s.H2Ticket != nil && r.Method == "POST" {
fidxname := r.URL.Query().Get("archive-name")
reusecsum := r.URL.Query().Get("reuse-csum")
Expand Down Expand Up @@ -485,6 +484,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
wid, _ := strconv.ParseInt(r.URL.Query().Get("wid"), 10, 32)
s3name := fmt.Sprintf("chunks/%s/%s/%s", digest[0:2], digest[2:4], digest[4:])

var known bool = false

objectStat, e := s.H2Ticket.Client.StatObject(
context.Background(),
*s.SelectedDataStore,
Expand All @@ -493,7 +494,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "NoSuchKey" {
switch errResponse.Code {
case "NoSuchKey":
_, err := s.H2Ticket.Client.PutObject(
context.Background(),
*s.SelectedDataStore,
Expand All @@ -507,14 +509,29 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
}
default:
s3backuplog.WarnPrint("Unhandled response checking for existant object: %s", errResponse.Code)
}
} else {
s3backuplog.DebugPrint("%s already in S3", objectStat.Key)
io.ReadAll(r.Body) // we must read data from stream, otherwise backup client gets out of sync
known = true
}
if s.Writers[int32(wid)].Chunksize == 0 {
//Here chunk size is derived
s.Writers[int32(wid)].Chunksize = uint64(size)
}
info := ChunkUploadInfo{}
info.Digest = digest
info.Offset = 0 // todo
info.Size = int64(size)
info.Known = known

r := Response{Data: info}
responsedata, _ := json.Marshal(r)
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "application/json")
w.Write(responsedata)
}

if strings.HasPrefix(r.RequestURI, "/blob?") && s.H2Ticket != nil {
Expand Down Expand Up @@ -558,9 +575,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

w.Header().Add("Content-Length", fmt.Sprintf("%d", st.Size))
w.WriteHeader(http.StatusOK)

io.Copy(w, obj)

}

if strings.HasPrefix(r.RequestURI, "/chunk?") && s.H2Ticket != nil {
Expand Down
7 changes: 7 additions & 0 deletions cmd/pmoxs3backuproxy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type FixedIndexCloseRequest struct {
Size int64 `json:"size"`
}

type ChunkUploadInfo struct {
Digest string `json:"digest"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Known bool `json:"chunk_is_known"`
}

type Response struct {
Data interface{} `json:"data"`
// other fields
Expand Down

0 comments on commit ba4f331

Please sign in to comment.