Skip to content

Commit

Permalink
B-21669 - working close call on server; cleanup for handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-mchugh committed Jan 7, 2025
1 parent bd2254b commit 6e122f4
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions pkg/handlers/internalapi/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-openapi/runtime/middleware"
"github.com/gobuffalo/validate/v3"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/transcom/mymove/pkg/appcontext"
Expand Down Expand Up @@ -276,8 +277,14 @@ const (
AVStatusTypeINFECTED AVStatusType = "INFECTED"
)

func constructEventStreamMessage(id int, data string) []byte {
return []byte(fmt.Sprintf("id: %s\nevent: message\ndata: %s\n\n", strconv.Itoa(id), data))
func writeEventStreamMessage(rw http.ResponseWriter, producer runtime.Producer, id int, event string, data string) {
resProcess := []byte(fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n", strconv.Itoa(id), event, data))
if produceErr := producer.Produce(rw, resProcess); produceErr != nil {
panic(produceErr)
}
if f, ok := rw.(http.Flusher); ok {
f.Flush()
}
}

func (o *CustomNewUploadStatusOK) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) {
Expand Down Expand Up @@ -305,17 +312,12 @@ func (o *CustomNewUploadStatusOK) WriteResponse(rw http.ResponseWriter, producer
} else {
uploadStatus = AVStatusType(tags["av-status"])
}
resProcess := constructEventStreamMessage(0, string(uploadStatus))
if produceErr := producer.Produce(rw, resProcess); produceErr != nil {
panic(produceErr)
}

if f, ok := rw.(http.Flusher); ok {
f.Flush()
}
writeEventStreamMessage(rw, producer, 0, "message", string(uploadStatus))

if uploadStatus == AVStatusTypeCLEAN || uploadStatus == AVStatusTypeINFECTED {
return
writeEventStreamMessage(rw, producer, 1, "close", "Connection closed")
return // skip notification loop since object already tagged from anti-virus
}

// Start waiting for tag updates
Expand Down Expand Up @@ -352,7 +354,7 @@ func (o *CustomNewUploadStatusOK) WriteResponse(rw http.ResponseWriter, producer
_ = o.receiver.CloseoutQueue(o.appCtx, queueUrl)
}()

id_counter := 0
id_counter := 1
// Run for 120 seconds, 20 second long polling for receiver, 6 times
for range 6 {
o.appCtx.Logger().Info("Receiving...")
Expand All @@ -376,25 +378,27 @@ func (o *CustomNewUploadStatusOK) WriteResponse(rw http.ResponseWriter, producer
uploadStatus = AVStatusType(tags["av-status"])
}

resProcess := constructEventStreamMessage(id_counter, string(uploadStatus))
if produceErr := producer.Produce(rw, resProcess); produceErr != nil {
panic(produceErr) // let the recovery middleware deal with this
writeEventStreamMessage(rw, producer, id_counter, "message", string(uploadStatus))

if uploadStatus == AVStatusTypeCLEAN || uploadStatus == AVStatusTypeINFECTED {
return errors.New("connection_closed")
}
return nil

return err
})

if errTransaction != nil {
o.appCtx.Logger().Error(err.Error())
if errTransaction != nil && errTransaction.Error() == "connection_closed" {
id_counter++
writeEventStreamMessage(rw, producer, id_counter, "close", "Connection closed")
break
}
}

if f, ok := rw.(http.Flusher); ok {
f.Flush()
if errTransaction != nil {
panic(errTransaction) // let the recovery middleware deal with this
}
}
id_counter++
}

// TODO: add a close here after ends
}

// Handle returns status of an upload
Expand Down

0 comments on commit 6e122f4

Please sign in to comment.