Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Use an upload session for large attachments (#1148)
Browse files Browse the repository at this point in the history
## Description

Use an upload session and chunked writes for large attachments.

This commit moves the logic used for OneDrive file upload to the common `uploadsession` pkg and
leverages that package for the large-attachment upload.

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #1115 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
  • Loading branch information
vkamra authored Oct 13, 2022
1 parent 0c921e8 commit f1f6c06
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 72 deletions.
117 changes: 117 additions & 0 deletions src/internal/connector/exchange/attachment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package exchange

import (
"bytes"
"context"
"io"

"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item/attachments/createuploadsession"
"github.com/pkg/errors"

"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/connector/uploadsession"
"github.com/alcionai/corso/src/pkg/logger"
)

const (
// largeAttachmentSize is the size threshold, in bytes, for the upload path:
// file attachments strictly smaller than this are POSTed to the attachments
// endpoint, while file attachments of this size or larger (>= 3MB) go through
// an upload session.
// https://learn.microsoft.com/en-us/graph/outlook-large-attachments
largeAttachmentSize = int32(3 * 1024 * 1024)
// attachmentChunkSize is the size, in bytes, of the copy buffer used when
// writing attachment content to an upload session.
// NOTE(review): presumably chosen to match the per-request byte-range limit
// described in the document linked above - confirm.
attachmentChunkSize = 4 * 1024 * 1024
// `@odata.type` values used to tell the attachment kinds apart.
fileAttachmentOdataValue = "#microsoft.graph.fileAttachment"
itemAttachmentOdataValue = "#microsoft.graph.itemAttachment"
referenceAttachmentOdataValue = "#microsoft.graph.referenceAttachment"
)

// attachmentType maps the attachment's `@odata.type` value to the matching
// models.AttachmentType.
//
// A missing (nil) or unrecognized OData type yields models.ITEM_ATTACHMENTTYPE,
// which makes callers pick the default attachment upload mechanism.
func attachmentType(attachment models.Attachmentable) models.AttachmentType {
	odataType := attachment.GetOdataType()
	if odataType == nil {
		// No type information available: avoid dereferencing a nil pointer and
		// fall back to the same default used for unknown types.
		return models.ITEM_ATTACHMENTTYPE
	}

	switch *odataType {
	case fileAttachmentOdataValue:
		return models.FILE_ATTACHMENTTYPE
	case itemAttachmentOdataValue:
		return models.ITEM_ATTACHMENTTYPE
	case referenceAttachmentOdataValue:
		return models.REFERENCE_ATTACHMENTTYPE
	default:
		// Should not hit this but default to ITEM_ATTACHMENTTYPE
		// which will pick the default attachment upload mechanism
		return models.ITEM_ATTACHMENTTYPE
	}
}

// uploadAttachment uploads the specified message attachment to M365.
//
// Item and reference attachments, and file attachments smaller than
// largeAttachmentSize, are POSTed directly to the message's attachments
// endpoint. File attachments of largeAttachmentSize or more are handed to
// uploadLargeAttachment.
//
// An attachment that reports no size is treated as size 0 and therefore takes
// the direct POST path instead of panicking on a nil dereference.
//
// Returns the error from the attachments POST or from uploadLargeAttachment.
func uploadAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable,
) error {
	// Read the size once, guarding against a nil pointer.
	var size int32
	if sz := attachment.GetSize(); sz != nil {
		size = *sz
	}

	logger.Ctx(ctx).Debugf("uploading attachment with size %d", size)

	// For Item/Reference attachments *or* file attachments < 3MB, use the attachments endpoint
	if attachmentType(attachment) != models.FILE_ATTACHMENTTYPE || size < largeAttachmentSize {
		_, err := service.Client().
			UsersById(userID).
			MailFoldersById(folderID).
			MessagesById(messageID).
			Attachments().
			Post(ctx, attachment, nil)

		return err
	}

	return uploadLargeAttachment(ctx, service, userID, folderID, messageID, attachment)
}

// uploadLargeAttachment uploads the specified attachment by creating an upload
// session and writing the attachment content to it in chunks of at most
// attachmentChunkSize bytes.
//
// The content is obtained through attachmentBytes and its length is passed to
// attachmentWriter as the total upload size. Returns the error from creating
// the upload session, or a wrapped error if writing the content fails.
func uploadLargeAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable,
) error {
	ab := attachmentBytes(attachment)

	aw, err := attachmentWriter(ctx, service, userID, folderID, messageID, attachment, int64(len(ab)))
	if err != nil {
		return err
	}

	// Upload the stream data
	copyBuffer := make([]byte, attachmentChunkSize)

	// io.CopyBuffer ignores the supplied buffer when the source implements
	// io.WriterTo or the destination implements io.ReaderFrom. *bytes.Reader
	// implements io.WriterTo, so passing it directly would hand the whole
	// attachment to the writer in a single Write call and defeat the chunking.
	// Wrapping both ends in structs that expose only Read/Write forces the
	// buffered copy loop, so each Write carries at most attachmentChunkSize bytes.
	src := struct{ io.Reader }{bytes.NewReader(ab)}
	dst := struct{ io.Writer }{aw}

	_, err = io.CopyBuffer(dst, src, copyBuffer)
	if err != nil {
		return errors.Wrapf(err, "failed to upload attachment: item %s", messageID)
	}

	return nil
}

// attachmentWriter initializes and returns an io.Writer used to upload data for
// the specified attachment. It creates an upload session for the message and
// passes the session's upload URL to uploadsession.NewWriter.
//
// size is the total number of bytes that will be written; it is sent in the
// upload session request and handed to the writer.
//
// Returns an error if the upload session cannot be created or if the response
// does not carry an upload URL.
func attachmentWriter(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable, size int64,
) (io.Writer, error) {
	session := createuploadsession.NewCreateUploadSessionPostRequestBody()

	// Describe the attachment that the session will receive.
	attItem := models.NewAttachmentItem()
	attType := models.FILE_ATTACHMENTTYPE
	attItem.SetAttachmentType(&attType)
	attItem.SetName(attachment.GetName())
	attItem.SetSize(&size)
	session.SetAttachmentItem(attItem)

	r, err := service.Client().UsersById(userID).MailFoldersById(folderID).
		MessagesById(messageID).Attachments().CreateUploadSession().Post(ctx, session, nil)
	if err != nil {
		return nil, errors.Wrapf(
			err,
			"failed to create attachment upload session for item %s. details: %s",
			messageID,
			support.ConnectorStackErrorTrace(err),
		)
	}

	// Guard against a response without an upload URL rather than panicking on
	// a nil dereference.
	if r == nil || r.GetUploadUrl() == nil {
		return nil, errors.Errorf("upload session for item %s did not include an upload url", messageID)
	}

	url := *r.GetUploadUrl()

	// NOTE(review): the upload URL may embed authorization for the session;
	// confirm that writing it to debug logs is acceptable.
	logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", messageID, url)

	return uploadsession.NewWriter(messageID, url, size), nil
}
13 changes: 13 additions & 0 deletions src/internal/connector/exchange/exchange_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,19 @@ func (suite *ExchangeServiceSuite) TestRestoreExchangeObject() {
return *folder.GetId()
},
},
{
name: "Test Mail: One Large Attachment",
bytes: mockconnector.GetMockMessageWithLargeAttachment("Restore Large Attachment"),
category: path.EmailCategory,
cleanupFunc: DeleteMailFolder,
destination: func() string {
folderName := "TestRestoreMailwithLargeAttachment: " + common.FormatSimpleDateTime(now)
folder, err := CreateMailFolder(ctx, suite.es, userID, folderName)
require.NoError(t, err)

return *folder.GetId()
},
},
{
name: "Test Mail: Two Attachments",
bytes: mockconnector.GetMockMessageWithTwoAttachments("Restore 2 Attachments"),
Expand Down
32 changes: 20 additions & 12 deletions src/internal/connector/exchange/service_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"reflect"
"runtime/trace"

"github.com/microsoftgraph/msgraph-sdk-go/models"
Expand All @@ -20,7 +21,8 @@ import (
)

// GetRestoreContainer utility function to create
// an unique folder for the restore process
// an unique folder for the restore process
//
// @param category: input from fullPath()[2]
// that defines the application the folder is created in.
func GetRestoreContainer(
Expand Down Expand Up @@ -223,8 +225,19 @@ func RestoreMailMessage(
"policy", cp)
fallthrough
case control.Copy:
return MessageInfo(clone), SendMailToBackStore(ctx, service, user, destination, clone)
err := SendMailToBackStore(ctx, service, user, destination, clone)
if err != nil {
return nil, err
}
}

return MessageInfo(clone), nil
}

// attachmentBytes returns the content of a models.Attachmentable by reading the
// field named "contentBytes" from the underlying value via reflection.
// It panics if the underlying value has no such field or the field is not a
// byte slice.
// TODO: Revisit how we retrieve/persist attachment content during backup so this is not needed
func attachmentBytes(attachment models.Attachmentable) []byte {
	// Dereference the pointer (if any) to reach the concrete struct value.
	underlying := reflect.Indirect(reflect.ValueOf(attachment))
	content := underlying.FieldByName("contentBytes")

	return content.Bytes()
}

// SendMailToBackStore function for transporting in-memory messageable item to M365 backstore
Expand Down Expand Up @@ -261,24 +274,19 @@ func SendMailToBackStore(
if len(attached) > 0 {
id := *sentMessage.GetId()
for _, attachment := range attached {
_, err = service.Client().
UsersById(user).
MailFoldersById(destination).
MessagesById(id).
Attachments().
Post(ctx, attachment, nil)
err := uploadAttachment(ctx, service, user, destination, id, attachment)
if err != nil {
errs = support.WrapAndAppend(id,
errs = support.WrapAndAppend(fmt.Sprintf("uploading attachment for message %s", id),
err,
errs,
)

break
}
}

return errs
}

return nil
return errs
}

// RestoreExchangeDataCollections restores M365 objects in data.Collection to MSFT
Expand Down
29 changes: 29 additions & 0 deletions src/internal/connector/mockconnector/mock_data_message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mockconnector

import (
"encoding/base64"
"fmt"
"math/rand"
"strconv"
Expand Down Expand Up @@ -166,6 +167,34 @@ func GetMockMessageWithDirectAttachment(subject string) []byte {
return []byte(message)
}

// GetMockMessageWithLargeAttachment returns a message with a large attachment. This is derived from the message
// used in GetMockMessageWithDirectAttachment
// Serialized with: kiota-serialization-json-go v0.7.1
func GetMockMessageWithLargeAttachment(subject string) []byte {
//nolint:lll
messageFmt := "{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA=\"," +
"\"@odata.type\":\"#microsoft.graph.message\",\"@odata.etag\":\"W/\\\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\\\"\",\"@odata.context\":\"https://graph.microsoft.com/v1.0/$metadata#users('a4a472f8-ccb0-43ec-bf52-3697a91b926c')/messages/$entity\",\"categories\":[]," +
"\"changeKey\":\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\",\"createdDateTime\":\"2022-09-29T17:39:06Z\",\"lastModifiedDateTime\":\"2022-09-29T17:39:08Z\"," +
"\"attachments\":[{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAABEgAQANMmZLFhjWJJj4X9mj8piqg=\",\"@odata.type\":\"#microsoft.graph.fileAttachment\",\"@odata.mediaContentType\":\"application/octet-stream\"," +
"\"contentType\":\"application/octet-stream\",\"isInline\":false,\"lastModifiedDateTime\":\"2022-09-29T17:39:06Z\",\"name\":\"database.db\",\"size\":%d," +
"\"contentBytes\":\"%s\"}]," +
"\"bccRecipients\":[],\"body\":{\"content\":\"<html><head>\\r\\n<meta http-equiv=\\\"Content-Type\\\" content=\\\"text/html; charset=utf-8\\\"><style type=\\\"text/css\\\" style=\\\"display:none\\\">\\r\\n<!--\\r\\np\\r\\n\\t{margin-top:0;\\r\\n\\tmargin-bottom:0}\\r\\n-->\\r\\n</style></head><body dir=\\\"ltr\\\"><div class=\\\"elementToProof\\\" style=\\\"font-family:Calibri,Arial,Helvetica,sans-serif; font-size:12pt; color:rgb(0,0,0)\\\"><span class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Lidia,</span> <div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">I hope this message finds you well. I am researching a database construct for next quarter's review. 
SkyNet will<span class=\\\"ContentPasted0\\\">&nbsp;</span><span data-ogsb=\\\"rgb(255, 255, 0)\\\" class=\\\"ContentPasted0\\\" style=\\\"margin:0px; background-color:rgb(255,255,0)!important\\\">not</span><span class=\\\"ContentPasted0\\\">&nbsp;</span>be able to match our database process speeds if we utilize the formulae that are included.&nbsp;</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Please give me your thoughts on the implementation.</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Best,</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><span class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; 
background-color:rgb(255,255,255)!important\\\">Dustin</span><br></div></body></html>\",\"contentType\":\"html\",\"@odata.type\":\"#microsoft.graph.itemBody\"}," +
"\"bodyPreview\":\"Lidia,\\r\\n\\r\\nI hope this message finds you well. I am researching a database construct for next quarter's review. SkyNet will not be able to match our database process speeds if we utilize the formulae that are included.\\r\\n\\r\\nPlease give me your thoughts on th\",\"ccRecipients\":[]," +
"\"conversationId\":\"AAQkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAQANPFOcy_BapBghezTzIIldI=\",\"conversationIndex\":\"AQHY1Cpb08U5zL4FqkGCF7NPMgiV0g==\",\"flag\":{\"flagStatus\":\"notFlagged\",\"@odata.type\":\"#microsoft.graph.followupFlag\"}," +
"\"from\":{\"emailAddress\":{\"address\":\"[email protected]\",\"name\":\"Dustin Abbot\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"hasAttachments\":true,\"importance\":\"normal\",\"inferenceClassification\":\"focused\"," +
"\"internetMessageId\":\"<SJ0PR17MB56220C509D0006B8CC8FD952C3579@SJ0PR17MB5622.namprd17.prod.outlook.com>\",\"isDeliveryReceiptRequested\":false,\"isDraft\":false,\"isRead\":false,\"isReadReceiptRequested\":false," +
"\"parentFolderId\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAuAAAAAADCNgjhM9QmQYWNcI7hCpPrAQDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAAA=\",\"receivedDateTime\":\"2022-09-29T17:39:07Z\",\"replyTo\":[],\"sender\":{\"emailAddress\":{\"address\":\"[email protected]\",\"name\":\"Dustin Abbot\"," +
"\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"sentDateTime\":\"2022-09-29T17:39:02Z\"," +
"\"subject\":\"" + subject + "\",\"toRecipients\":[{\"emailAddress\":{\"address\":\"[email protected]\",\"name\":\"Lidia Holloway\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"}]," +
"\"webLink\":\"https://outlook.office365.com/owa/?ItemID=AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA%3D&exvsurl=1&viewmodel=ReadMessageItem\"}"

attachmentSize := 3 * 1024 * 1024 // 3 MB
attachmentBytes := make([]byte, attachmentSize)

// Attachment content bytes are base64 encoded
return []byte(fmt.Sprintf(messageFmt, attachmentSize, base64.StdEncoding.EncodeToString([]byte(attachmentBytes))))
}

// GetMessageWithOneDriveAttachment returns a message with an OneDrive attachment represented in bytes
// Serialized with: kiota-serialization-json-go v0.7.1
func GetMessageWithOneDriveAttachment(subject string) []byte {
Expand Down
62 changes: 2 additions & 60 deletions src/internal/connector/onedrive/item.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package onedrive

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

msup "github.com/microsoftgraph/msgraph-sdk-go/drives/item/items/item/createuploadsession"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"gopkg.in/resty.v1"

"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/connector/uploadsession"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
Expand Down Expand Up @@ -103,61 +101,5 @@ func driveItemWriter(

logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url)

return &itemWriter{id: itemID, contentLength: itemSize, url: url}, nil
}

// itemWriter implements an io.Writer for the OneDrive URL
// it is initialized with
type itemWriter struct {
// Item ID
id string
// Upload URL for this item
url string
// Tracks how much data will be written
contentLength int64
// Last item offset that was written to
lastWrittenOffset int64
}

const (
contentRangeHeaderKey = "Content-Range"
// Format for Content-Range is "bytes <start>-<end>/<total>"
contentRangeHeaderValueFmt = "bytes %d-%d/%d"
contentLengthHeaderKey = "Content-Length"
)

// Write will upload the provided data to OneDrive. It sets the `Content-Length` and `Content-Range` headers based on
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
func (iw *itemWriter) Write(p []byte) (n int, err error) {
rangeLength := len(p)
logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)

endOffset := iw.lastWrittenOffset + int64(rangeLength)

client := resty.New()

// PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of
// data in the current request
resp, err := client.R().
SetHeaders(map[string]string{
contentRangeHeaderKey: fmt.Sprintf(contentRangeHeaderValueFmt,
iw.lastWrittenOffset,
endOffset-1,
iw.contentLength),
contentLengthHeaderKey: fmt.Sprintf("%d", iw.contentLength),
}).
SetBody(bytes.NewReader(p)).Put(iw.url)
if err != nil {
return 0, errors.Wrapf(err,
"failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d ",
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
}

// Update last offset
iw.lastWrittenOffset = endOffset

logger.Ctx(context.Background()).Debugf("Response: %s", resp.String())

return rangeLength, nil
return uploadsession.NewWriter(itemID, url, itemSize), nil
}
Loading

0 comments on commit f1f6c06

Please sign in to comment.