Skip to content

Commit

Permalink
feat(pdp): add endpoint to poll for completed upload (#304)
Browse files Browse the repository at this point in the history
* feat(pdp): add handler to check for piece cid status

* feat(pdp): add handler to check for upload exists
  • Loading branch information
hannahhoward authored Oct 30, 2024
1 parent a3a1c3a commit 8fe488c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func Routes(r *chi.Mux, p *PDPService) {
// POST /pdp/piece
r.Post(path.Join(PDPRoutePath, "/piece"), p.handlePiecePost)

// GET /pdp/piece/
r.Get(path.Join(PDPRoutePath, "/piece/"), p.handleFindPiece)

// PUT /pdp/piece/upload/{uploadUUID}
r.Put(path.Join(PDPRoutePath, "/piece/upload/{uploadUUID}"), p.handlePieceUpload)
}
Expand Down
70 changes: 69 additions & 1 deletion pdp/handlers_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"os"
"path"
"strconv"

"github.com/go-chi/chi/v5"
"github.com/google/uuid"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (p *PDPService) handlePiecePost(w http.ResponseWriter, r *http.Request) {
return false, fmt.Errorf("failed to insert into parked_piece_refs: %w", err)
}

// Create a new 'pdp_piece_uploads' entry pointing to the 'pdp_piecerefs' entry
// Create a new 'pdp_piece_uploads' entry pointing to the 'parked_piece_refs' entry
uploadUUID = uuid.New()
_, err = tx.Exec(`
INSERT INTO pdp_piece_uploads (id, service, piece_cid, notify_url, piece_ref, check_hash_codec, check_hash, check_size)
Expand Down Expand Up @@ -442,3 +443,70 @@ func (p *PDPService) handlePieceUpload(w http.ResponseWriter, r *http.Request) {
// Respond with 204 No Content
w.WriteHeader(http.StatusNoContent)
}

// handle find piece allows one to look up a pdp piece by its original post data as
// query parameters
func (p *PDPService) handleFindPiece(w http.ResponseWriter, r *http.Request) {
// Verify that the request is authorized using ECDSA JWT
_, err := p.verifyJWTToken(r)
if err != nil {
http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}

// Parse query parameters

sizeString := r.URL.Query().Get("size")
size, err := strconv.ParseInt(sizeString, 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("errors parsing size: %s", err.Error()), 400)
return
}
req := PieceHash{
Name: r.URL.Query().Get("name"),
Hash: r.URL.Query().Get("hash"),
Size: size,
}

ctx := r.Context()

pieceCid, havePieceCid, err := req.commp(ctx, p.db)
if err != nil {
http.Error(w, "Failed to process request: "+err.Error(), http.StatusInternalServerError)
return
}

// upload either not complete or does not exist
if !havePieceCid {
http.NotFound(w, r)
return
}

// Verify that a 'parked_pieces' entry exists for the given 'piece_cid'
var count int
err = p.db.QueryRow(ctx, `
SELECT count(*) FROM parked_pieces WHERE piece_cid = $1 AND long_term = TRUE AND complete = TRUE
`, pieceCid.String()).Scan(&count)
if err != nil {
http.Error(w, "Database error", http.StatusInternalServerError)
return
}
if count == 0 {
http.NotFound(w, r)
return
}

response := struct {
PieceCID string `json:"piece_cid"`
}{
PieceCID: pieceCid.String(),
}

// encode response
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, "Failed to write response: "+err.Error(), http.StatusInternalServerError)
return
}
}

0 comments on commit 8fe488c

Please sign in to comment.