diff --git a/src/internal/connector/exchange/attachment.go b/src/internal/connector/exchange/attachment.go
new file mode 100644
index 0000000000..10a28bc644
--- /dev/null
+++ b/src/internal/connector/exchange/attachment.go
@@ -0,0 +1,117 @@
+package exchange
+
+import (
+	"bytes"
+	"context"
+	"io"
+
+	"github.com/microsoftgraph/msgraph-sdk-go/models"
+	"github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item/attachments/createuploadsession"
+	"github.com/pkg/errors"
+
+	"github.com/alcionai/corso/src/internal/connector/graph"
+	"github.com/alcionai/corso/src/internal/connector/support"
+	"github.com/alcionai/corso/src/internal/connector/uploadsession"
+	"github.com/alcionai/corso/src/pkg/logger"
+)
+
+const (
+	// Use large attachment logic for attachments > 3MB
+	// https://learn.microsoft.com/en-us/graph/outlook-large-attachments
+	largeAttachmentSize           = int32(3 * 1024 * 1024)
+	attachmentChunkSize           = 4 * 1024 * 1024
+	fileAttachmentOdataValue      = "#microsoft.graph.fileAttachment"
+	itemAttachmentOdataValue      = "#microsoft.graph.itemAttachment"
+	referenceAttachmentOdataValue = "#microsoft.graph.referenceAttachment"
+)
+
+func attachmentType(attachment models.Attachmentable) models.AttachmentType {
+	switch *attachment.GetOdataType() {
+	case fileAttachmentOdataValue:
+		return models.FILE_ATTACHMENTTYPE
+	case itemAttachmentOdataValue:
+		return models.ITEM_ATTACHMENTTYPE
+	case referenceAttachmentOdataValue:
+		return models.REFERENCE_ATTACHMENTTYPE
+	default:
+		// Should not hit this but default to ITEM_ATTACHMENTTYPE
+		// which will pick the default attachment upload mechanism
+		return models.ITEM_ATTACHMENTTYPE
+	}
+}
+
+// uploadAttachment will upload the specified message attachment to M365
+func uploadAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
+	attachment models.Attachmentable,
+) error {
+	logger.Ctx(ctx).Debugf("uploading attachment with size %d", *attachment.GetSize())
+
+	// For Item/Reference attachments *or* file attachments < 3MB, use the attachments endpoint
+	if attachmentType(attachment) != models.FILE_ATTACHMENTTYPE || *attachment.GetSize() < largeAttachmentSize {
+		_, err := service.Client().
+			UsersById(userID).
+			MailFoldersById(folderID).
+			MessagesById(messageID).
+			Attachments().
+			Post(ctx, attachment, nil)
+
+		return err
+	}
+
+	return uploadLargeAttachment(ctx, service, userID, folderID, messageID, attachment)
+}
+
+// uploadLargeAttachment will upload the specified attachment by creating an upload session and
+// doing a chunked upload
+func uploadLargeAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
+	attachment models.Attachmentable,
+) error {
+	ab := attachmentBytes(attachment)
+
+	aw, err := attachmentWriter(ctx, service, userID, folderID, messageID, attachment, int64(len(ab)))
+	if err != nil {
+		return err
+	}
+
+	// Upload the stream data
+	copyBuffer := make([]byte, attachmentChunkSize)
+
+	_, err = io.CopyBuffer(aw, bytes.NewReader(ab), copyBuffer)
+	if err != nil {
+		return errors.Wrapf(err, "failed to upload attachment: item %s", messageID)
+	}
+
+	return nil
+}
+
+// attachmentWriter is used to initialize and return an io.Writer to upload data for the specified attachment.
+// It does so by creating an upload session and using that URL to initialize an upload session writer
+func attachmentWriter(ctx context.Context, service graph.Service, userID, folderID, messageID string,
+	attachment models.Attachmentable, size int64,
+) (io.Writer, error) {
+	session := createuploadsession.NewCreateUploadSessionPostRequestBody()
+
+	attItem := models.NewAttachmentItem()
+	attType := models.FILE_ATTACHMENTTYPE
+	attItem.SetAttachmentType(&attType)
+	attItem.SetName(attachment.GetName())
+	attItem.SetSize(&size)
+	session.SetAttachmentItem(attItem)
+
+	r, err := service.Client().UsersById(userID).MailFoldersById(folderID).
+		MessagesById(messageID).Attachments().CreateUploadSession().Post(ctx, session, nil)
+	if err != nil {
+		return nil, errors.Wrapf(
+			err,
+			"failed to create attachment upload session for item %s. details: %s",
+			messageID,
+			support.ConnectorStackErrorTrace(err),
+		)
+	}
+
+	url := *r.GetUploadUrl()
+
+	logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", messageID, url)
+
+	return uploadsession.NewWriter(messageID, url, size), nil
+}
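+
+// Example (illustrative sketch, not part of the original change): a restore
+// caller loops over a message's attachments and lets uploadAttachment pick
+// either the direct POST or the chunked upload session per attachment, as
+// SendMailToBackStore does below. Names such as svc and msg are assumed:
+//
+//	for _, a := range msg.GetAttachments() {
+//		if err := uploadAttachment(ctx, svc, userID, folderID, messageID, a); err != nil {
+//			return err
+//		}
+//	}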
URL: %s", messageID, url) + + return uploadsession.NewWriter(messageID, url, size), nil +} diff --git a/src/internal/connector/exchange/exchange_service_test.go b/src/internal/connector/exchange/exchange_service_test.go index 2d13c8b875..ce201a116b 100644 --- a/src/internal/connector/exchange/exchange_service_test.go +++ b/src/internal/connector/exchange/exchange_service_test.go @@ -554,6 +554,19 @@ func (suite *ExchangeServiceSuite) TestRestoreExchangeObject() { return *folder.GetId() }, }, + { + name: "Test Mail: One Large Attachment", + bytes: mockconnector.GetMockMessageWithLargeAttachment("Restore Large Attachment"), + category: path.EmailCategory, + cleanupFunc: DeleteMailFolder, + destination: func() string { + folderName := "TestRestoreMailwithLargeAttachment: " + common.FormatSimpleDateTime(now) + folder, err := CreateMailFolder(ctx, suite.es, userID, folderName) + require.NoError(t, err) + + return *folder.GetId() + }, + }, { name: "Test Mail: Two Attachments", bytes: mockconnector.GetMockMessageWithTwoAttachments("Restore 2 Attachments"), diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index 756fbd4502..7e6d278689 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "reflect" "runtime/trace" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -20,7 +21,8 @@ import ( ) // GetRestoreContainer utility function to create -// an unique folder for the restore process +// an unique folder for the restore process +// // @param category: input from fullPath()[2] // that defines the application the folder is created in. func GetRestoreContainer( @@ -223,8 +225,19 @@ func RestoreMailMessage( "policy", cp) fallthrough case control.Copy: - return MessageInfo(clone), SendMailToBackStore(ctx, service, user, destination, clone) + err := SendMailToBackStore(ctx, service, user, destination, clone) + if err != nil { + return nil, err + } } + + return MessageInfo(clone), nil +} + +// attachmentBytes is a helper to retrieve the attachment content from a models.Attachmentable +// TODO: Revisit how we retrieve/persist attachment content during backup so this is not needed +func attachmentBytes(attachment models.Attachmentable) []byte { + return reflect.Indirect(reflect.ValueOf(attachment)).FieldByName("contentBytes").Bytes() } // SendMailToBackStore function for transporting in-memory messageable item to M365 backstore @@ -261,24 +274,19 @@ func SendMailToBackStore( if len(attached) > 0 { id := *sentMessage.GetId() for _, attachment := range attached { - _, err = service.Client(). - UsersById(user). - MailFoldersById(destination). - MessagesById(id). - Attachments(). 
diff --git a/src/internal/connector/mockconnector/mock_data_message.go b/src/internal/connector/mockconnector/mock_data_message.go
index 5aafe73256..a44184f952 100644
--- a/src/internal/connector/mockconnector/mock_data_message.go
+++ b/src/internal/connector/mockconnector/mock_data_message.go
@@ -1,6 +1,7 @@
 package mockconnector
 
 import (
+	"encoding/base64"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -166,6 +167,34 @@ func GetMockMessageWithDirectAttachment(subject string) []byte {
 	return []byte(message)
 }
 
+// GetMockMessageWithLargeAttachment returns a message with a large attachment. This is derived from the message
+// used in GetMockMessageWithDirectAttachment
+// Serialized with: kiota-serialization-json-go v0.7.1
+func GetMockMessageWithLargeAttachment(subject string) []byte {
+	//nolint:lll
+	messageFmt := "{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA=\"," +
+		"\"@odata.type\":\"#microsoft.graph.message\",\"@odata.etag\":\"W/\\\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\\\"\",\"@odata.context\":\"https://graph.microsoft.com/v1.0/$metadata#users('a4a472f8-ccb0-43ec-bf52-3697a91b926c')/messages/$entity\",\"categories\":[]," +
+		"\"changeKey\":\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\",\"createdDateTime\":\"2022-09-29T17:39:06Z\",\"lastModifiedDateTime\":\"2022-09-29T17:39:08Z\"," +
+		"\"attachments\":[{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAABEgAQANMmZLFhjWJJj4X9mj8piqg=\",\"@odata.type\":\"#microsoft.graph.fileAttachment\",\"@odata.mediaContentType\":\"application/octet-stream\"," +
+		"\"contentType\":\"application/octet-stream\",\"isInline\":false,\"lastModifiedDateTime\":\"2022-09-29T17:39:06Z\",\"name\":\"database.db\",\"size\":%d," +
+		"\"contentBytes\":\"%s\"}]," +
+		"\"bccRecipients\":[],\"body\":{\"content\":\"\\r\\nLidia,\\r\\n\\r\\nI hope this message finds you well. I am researching a database construct for next quarter's review. SkyNet will not be able to match our database process speeds if we utilize the formulae that are included.\\r\\n\\r\\nPlease give me your thoughts on the implementation.\\r\\n\\r\\nBest,\\r\\n\\r\\nDustin\\r\\n\",\"contentType\":\"html\",\"@odata.type\":\"#microsoft.graph.itemBody\"}," +
\",\"contentType\":\"html\",\"@odata.type\":\"#microsoft.graph.itemBody\"}," + + "\"bodyPreview\":\"Lidia,\\r\\n\\r\\nI hope this message finds you well. I am researching a database construct for next quarter's review. SkyNet will not be able to match our database process speeds if we utilize the formulae that are included.\\r\\n\\r\\nPlease give me your thoughts on th\",\"ccRecipients\":[]," + + "\"conversationId\":\"AAQkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAQANPFOcy_BapBghezTzIIldI=\",\"conversationIndex\":\"AQHY1Cpb08U5zL4FqkGCF7NPMgiV0g==\",\"flag\":{\"flagStatus\":\"notFlagged\",\"@odata.type\":\"#microsoft.graph.followupFlag\"}," + + "\"from\":{\"emailAddress\":{\"address\":\"dustina@8qzvrj.onmicrosoft.com\",\"name\":\"Dustin Abbot\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"hasAttachments\":true,\"importance\":\"normal\",\"inferenceClassification\":\"focused\"," + + "\"internetMessageId\":\"\",\"isDeliveryReceiptRequested\":false,\"isDraft\":false,\"isRead\":false,\"isReadReceiptRequested\":false," + + "\"parentFolderId\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAuAAAAAADCNgjhM9QmQYWNcI7hCpPrAQDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAAA=\",\"receivedDateTime\":\"2022-09-29T17:39:07Z\",\"replyTo\":[],\"sender\":{\"emailAddress\":{\"address\":\"dustina@8qzvrj.onmicrosoft.com\",\"name\":\"Dustin Abbot\"," + + "\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"sentDateTime\":\"2022-09-29T17:39:02Z\"," + + "\"subject\":\"" + subject + "\",\"toRecipients\":[{\"emailAddress\":{\"address\":\"LidiaH@8qzvrj.onmicrosoft.com\",\"name\":\"Lidia Holloway\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"}]," + + "\"webLink\":\"https://outlook.office365.com/owa/?ItemID=AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA%3D&exvsurl=1&viewmodel=ReadMessageItem\"}" + + attachmentSize := 3 * 1024 * 1024 // 3 MB + attachmentBytes := make([]byte, attachmentSize) + + // Attachment content bytes are base64 encoded + return []byte(fmt.Sprintf(messageFmt, attachmentSize, base64.StdEncoding.EncodeToString([]byte(attachmentBytes)))) +} + // GetMessageWithOneDriveAttachment returns a message with an OneDrive attachment represented in bytes // Serialized with: kiota-serialization-json-go v0.7.1 func GetMessageWithOneDriveAttachment(subject string) []byte { diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index af72137c52..c9573b0635 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -1,9 +1,7 @@ package onedrive import ( - "bytes" "context" - "fmt" "io" "net/http" "time" @@ -11,11 +9,11 @@ import ( msup "github.com/microsoftgraph/msgraph-sdk-go/drives/item/items/item/createuploadsession" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/pkg/errors" - "gopkg.in/resty.v1" "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/connector/uploadsession" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/logger" ) @@ -103,61 +101,5 @@ func driveItemWriter( logger.Ctx(ctx).Debugf("Created an upload session for item %s. 
URL: %s", itemID, url) - return &itemWriter{id: itemID, contentLength: itemSize, url: url}, nil -} - -// itemWriter implements an io.Writer for the OneDrive URL -// it is initialized with -type itemWriter struct { - // Item ID - id string - // Upload URL for this item - url string - // Tracks how much data will be written - contentLength int64 - // Last item offset that was written to - lastWrittenOffset int64 -} - -const ( - contentRangeHeaderKey = "Content-Range" - // Format for Content-Range is "bytes -/" - contentRangeHeaderValueFmt = "bytes %d-%d/%d" - contentLengthHeaderKey = "Content-Length" -) - -// Write will upload the provided data to OneDrive. It sets the `Content-Length` and `Content-Range` headers based on -// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession -func (iw *itemWriter) Write(p []byte) (n int, err error) { - rangeLength := len(p) - logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d", - iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength) - - endOffset := iw.lastWrittenOffset + int64(rangeLength) - - client := resty.New() - - // PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of - // data in the current request - resp, err := client.R(). - SetHeaders(map[string]string{ - contentRangeHeaderKey: fmt.Sprintf(contentRangeHeaderValueFmt, - iw.lastWrittenOffset, - endOffset-1, - iw.contentLength), - contentLengthHeaderKey: fmt.Sprintf("%d", iw.contentLength), - }). - SetBody(bytes.NewReader(p)).Put(iw.url) - if err != nil { - return 0, errors.Wrapf(err, - "failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d ", - iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength) - } - - // Update last offset - iw.lastWrittenOffset = endOffset - - logger.Ctx(context.Background()).Debugf("Response: %s", resp.String()) - - return rangeLength, nil + return uploadsession.NewWriter(itemID, url, itemSize), nil } diff --git a/src/internal/connector/uploadsession/uploadsession.go b/src/internal/connector/uploadsession/uploadsession.go new file mode 100644 index 0000000000..818159e0b5 --- /dev/null +++ b/src/internal/connector/uploadsession/uploadsession.go @@ -0,0 +1,71 @@ +package uploadsession + +import ( + "bytes" + "context" + "fmt" + + "github.com/pkg/errors" + "gopkg.in/resty.v1" + + "github.com/alcionai/corso/src/pkg/logger" +) + +const ( + contentRangeHeaderKey = "Content-Range" + // Format for Content-Range is "bytes -/" + contentRangeHeaderValueFmt = "bytes %d-%d/%d" + contentLengthHeaderKey = "Content-Length" +) + +// Writer implements an io.Writer for a M365 +// UploadSession URL +type writer struct { + // Identifier + id string + // Upload URL for this item + url string + // Tracks how much data will be written + contentLength int64 + // Last item offset that was written to + lastWrittenOffset int64 + client *resty.Client +} + +func NewWriter(id, url string, size int64) *writer { + return &writer{id: id, url: url, contentLength: size, client: resty.New()} +} + +// Write will upload the provided data to M365. It sets the `Content-Length` and `Content-Range` headers based on +// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession +func (iw *writer) Write(p []byte) (n int, err error) { + rangeLength := len(p) + logger.Ctx(context.Background()).Debugf("WRITE for %s. 
+
+// Write will upload the provided data to M365. It sets the `Content-Length` and `Content-Range` headers based on
+// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
+func (iw *writer) Write(p []byte) (n int, err error) {
+	rangeLength := len(p)
+	logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
+		iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
+
+	endOffset := iw.lastWrittenOffset + int64(rangeLength)
+
+	// PUT the request - set headers `Content-Range` to describe total size and `Content-Length` to describe size of
+	// data in the current request
+	resp, err := iw.client.R().
+		SetHeaders(map[string]string{
+			contentRangeHeaderKey: fmt.Sprintf(contentRangeHeaderValueFmt,
+				iw.lastWrittenOffset,
+				endOffset-1,
+				iw.contentLength),
+			contentLengthHeaderKey: fmt.Sprintf("%d", rangeLength),
+		}).
+		SetBody(bytes.NewReader(p)).Put(iw.url)
+	if err != nil {
+		return 0, errors.Wrapf(err,
+			"failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d ",
+			iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
+	}
+
+	// Update last offset
+	iw.lastWrittenOffset = endOffset
+
+	logger.Ctx(context.Background()).Debugf("Response: %s", resp.String())
+
+	return rangeLength, nil
+}
diff --git a/src/internal/connector/uploadsession/uploadsession_test.go b/src/internal/connector/uploadsession/uploadsession_test.go
new file mode 100644
index 0000000000..50a2782007
--- /dev/null
+++ b/src/internal/connector/uploadsession/uploadsession_test.go
@@ -0,0 +1,91 @@
+package uploadsession
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"regexp"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+)
+
+type UploadSessionSuite struct {
+	suite.Suite
+}
+
+func TestUploadSessionSuite(t *testing.T) {
+	suite.Run(t, new(UploadSessionSuite))
+}
+
+func (suite *UploadSessionSuite) TestWriter() {
+	t := suite.T()
+
+	// Initialize a 100KB mock data reader
+	td, writeSize := mockDataReader(int64(100 * 1024))
+
+	// Expected Content-Range value format
+	contentRangeRegex := regexp.MustCompile(`^bytes (?P<rangestart>\d+)-(?P<rangeend>\d+)/(?P<length>\d+)$`)
+
+	nextOffset := -1
+	// Initialize a test http server that validates expected headers
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		assert.Equal(t, http.MethodPut, r.Method)
+
+		// Validate the "Content-Range" header
+		assert.True(t, contentRangeRegex.MatchString(r.Header[contentRangeHeaderKey][0]),
+			"%s does not match expected value", r.Header[contentRangeHeaderKey][0])
+
+		// Extract the Content-Range components
+		matches := contentRangeRegex.FindStringSubmatch(r.Header[contentRangeHeaderKey][0])
+		rangeStart, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("rangestart")])
+		assert.NoError(t, err)
+		rangeEnd, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("rangeend")])
+		assert.NoError(t, err)
+		length, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("length")])
+		assert.NoError(t, err)
+
+		// Validate total size and range start/end
+		assert.Equal(t, int(writeSize), length)
+		assert.Equal(t, nextOffset+1, rangeStart)
+		assert.Greater(t, rangeEnd, nextOffset)
+
+		// Validate the "Content-Length" header
+		assert.Equal(t, fmt.Sprintf("%d", (rangeEnd+1)-rangeStart), r.Header[contentLengthHeaderKey][0])
+
+		nextOffset = rangeEnd
+	}))
+	defer ts.Close()
+
+	writer := NewWriter("item", ts.URL, writeSize)
+
+	// Using a 32 KB buffer for the copy allows us to validate the
+	// multi-part upload. `io.CopyBuffer` will only write 32 KB at
+	// a time
+	copyBuffer := make([]byte, 32*1024)
+
+	size, err := io.CopyBuffer(writer, td, copyBuffer)
+	require.NoError(suite.T(), err)
+	require.Equal(suite.T(), writeSize, size)
+}
+
+func mockDataReader(size int64) (io.Reader, int64) {
+	data := bytes.Repeat([]byte("D"), int(size))
+	return &mockReader{r: bytes.NewReader(data)}, size
+}
+
+// mockReader allows us to wrap a `bytes.Reader` while *hiding* its
+// WriterTo implementation. This forces io.CopyBuffer to do a
+// buffered read (useful to validate that chunked writes are working)
+type mockReader struct {
+	r io.Reader
+}
+
+func (mr *mockReader) Read(b []byte) (n int, err error) {
+	return mr.r.Read(b)
+}
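+
+// Note (illustrative, based on io.CopyBuffer's documented behavior): without
+// the wrapper, the copy would skip the 32 KB buffer entirely, because
+// bytes.Reader implements io.WriterTo and WriteTo pushes all remaining bytes
+// to the destination in a single Write call:
+//
+//	r := bytes.NewReader(data)           // implements io.WriterTo
+//	_, _ = io.CopyBuffer(writer, r, buf) // buf ignored; one big Write
+//
+// Wrapping the reader hides WriterTo and forces the chunked path under test.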