diff --git a/CHANGELOG.md b/CHANGELOG.md
index 67446bda55..c4cc1a08af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Update repo init configuration to reduce the total number of GET requests
   sent to the object store when using corso. This affects repos that have many
   backups created in them per day the most.
+- Group mailbox emails can now be exported as `.eml` files.
 
 ### Fixed
 - Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup.
diff --git a/src/cli/export/groups.go b/src/cli/export/groups.go
index 8d5a9d51c8..13c820a20c 100644
--- a/src/cli/export/groups.go
+++ b/src/cli/export/groups.go
@@ -7,7 +7,6 @@ import (
 	"github.com/alcionai/corso/src/cli/flags"
 	"github.com/alcionai/corso/src/cli/utils"
 	"github.com/alcionai/corso/src/pkg/control"
-	"github.com/alcionai/corso/src/pkg/selectors"
 )
 
 // called by export.go to map subcommands to provider-specific handling.
@@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
 	sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts)
 	utils.FilterGroupsRestoreInfoSelectors(sel, opts)
 
-	// TODO(pandeyabs): Exclude conversations from export since they are not
-	// supported yet. https://github.com/alcionai/corso/issues/4822
-	sel.Exclude(sel.Conversation(selectors.Any()))
-
 	acceptedGroupsFormatTypes := []string{
 		string(control.DefaultFormat),
 		string(control.JSONFormat),
diff --git a/src/internal/m365/collection/groups/export.go b/src/internal/m365/collection/groups/export.go
index c7577296b5..767a2090a5 100644
--- a/src/internal/m365/collection/groups/export.go
+++ b/src/internal/m365/collection/groups/export.go
@@ -5,19 +5,24 @@ import (
 	"context"
 	"encoding/json"
 	"io"
+	"strings"
 	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/converters/eml"
 	"github.com/alcionai/corso/src/internal/data"
+	groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/export"
 	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/metrics"
 	"github.com/alcionai/corso/src/pkg/path"
 	"github.com/alcionai/corso/src/pkg/services/m365/api"
+	"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
 )
 
 func NewExportCollection(
@@ -26,7 +31,19 @@
 	backupVersion int,
 	cec control.ExportConfig,
 	stats *metrics.ExportStats,
+	cat path.CategoryType,
 ) export.Collectioner {
+	var streamItems export.ItemStreamer
+
+	switch cat {
+	case path.ChannelMessagesCategory:
+		streamItems = streamChannelMessages
+	case path.ConversationPostsCategory:
+		streamItems = streamConversationPosts
+	default:
+		return nil
+	}
+
 	return export.BaseCollection{
 		BaseDir:           baseDir,
 		BackingCollection: backingCollections,
@@ -37,8 +54,12 @@
 	}
 }
 
-// streamItems streams the items in the backingCollection into the export stream chan
-func streamItems(
+//-------------------------------------------------------------
+// Channel Messages
+//------------------------------------------------------------- + +// streamChannelMessages streams the items in the backingCollection into the export stream chan +func streamChannelMessages( ctx context.Context, drc []data.RestoreCollection, backupVersion int, @@ -198,3 +219,145 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag Subject: ptr.Val(item.GetSubject()), } } + +//------------------------------------------------------------- +// Conversation Posts +//------------------------------------------------------------- + +// streamConversationPosts adds the post items into the export stream channel. +func streamConversationPosts( + ctx context.Context, + drc []data.RestoreCollection, + backupVersion int, + cec control.ExportConfig, + ch chan<- export.Item, + stats *metrics.ExportStats, +) { + defer close(ch) + + errs := fault.New(false) + + for _, rc := range drc { + for item := range rc.Items(ctx, errs) { + ictx := clues.Add( + ctx, + "path_short_ref", rc.FullPath().ShortRef(), + "stream_item_id", item.ID()) + + // Trim .data suffix from itemID. Also, we don't expect .meta files + // here since details are not persisted for metadata files. + trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix) + exportName := trimmedID + ".eml" + + postMetadata, err := fetchAndReadMetadata(ictx, trimmedID, rc) + if err != nil { + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + reader := item.ToReader() + content, err := io.ReadAll(reader) + + reader.Close() + + if err != nil { + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + // Convert JSON to eml. 
+ email, err := eml.FromJSONPostToEML(ictx, content, postMetadata) + if err != nil { + err = clues.Wrap(err, "converting JSON to eml") + + logger.CtxErr(ictx, err).Info("processing collection item") + + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + emlReader := io.NopCloser(bytes.NewReader([]byte(email))) + + stats.UpdateResourceCount(path.ConversationPostsCategory) + body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats) + + ch <- export.Item{ + ID: item.ID(), + Name: exportName, + Body: body, + } + } + + items, recovered := errs.ItemsAndRecovered() + + // Return all the items that we failed to source from the persistence layer + for _, item := range items { + ch <- export.Item{ + ID: item.ID, + Error: &item, + } + } + + for _, err := range recovered { + ch <- export.Item{ + Error: err, + } + } + } +} + +func fetchAndReadMetadata( + ctx context.Context, + itemID string, + fin data.FetchItemByNamer, +) (groupMeta.ConversationPostMetadata, error) { + metaName := itemID + metadata.MetaFileSuffix + + ctx = clues.Add(ctx, "meta_file_name", metaName) + + meta, err := fin.FetchItemByName(ctx, metaName) + if err != nil { + return groupMeta.ConversationPostMetadata{}, + clues.WrapWC(ctx, err, "fetching metadata") + } + + metaReader := meta.ToReader() + defer metaReader.Close() + + metaFormatted, err := readMetadata(metaReader) + if err != nil { + return groupMeta.ConversationPostMetadata{}, + clues.WrapWC(ctx, err, "deserializing metadata") + } + + return metaFormatted, nil +} + +// getMetadata reads and parses the metadata info for an item +func readMetadata(metaRC io.ReadCloser) (groupMeta.ConversationPostMetadata, error) { + var meta groupMeta.ConversationPostMetadata + + metaraw, err := io.ReadAll(metaRC) + if err != nil { + return groupMeta.ConversationPostMetadata{}, err + } + + err = json.Unmarshal(metaraw, &meta) + if err != nil { + return groupMeta.ConversationPostMetadata{}, err + } + + return meta, nil +} 
diff --git a/src/internal/m365/collection/groups/export_test.go b/src/internal/m365/collection/groups/export_test.go
index bc6247ba73..c75e4c1fa8 100644
--- a/src/internal/m365/collection/groups/export_test.go
+++ b/src/internal/m365/collection/groups/export_test.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/alcionai/clues"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/alcionai/corso/src/internal/data"
@@ -16,6 +17,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/export"
 	"github.com/alcionai/corso/src/pkg/metrics"
+	"github.com/alcionai/corso/src/pkg/path"
 )
 
 type ExportUnitSuite struct {
@@ -26,7 +28,7 @@ func TestExportUnitSuite(t *testing.T) {
 	suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
 }
 
-func (suite *ExportUnitSuite) TestStreamItems() {
+func (suite *ExportUnitSuite) TestStreamChannelMessages() {
 	makeBody := func() io.ReadCloser {
 		return io.NopCloser(bytes.NewReader([]byte("{}")))
 	}
@@ -86,7 +88,7 @@
 
 			ch := make(chan export.Item)
 
-			go streamItems(
+			go streamChannelMessages(
 				ctx,
 				[]data.RestoreCollection{test.backingColl},
 				version.NoBackup,
@@ -113,3 +115,162 @@
 		})
 	}
 }
+
+func (suite *ExportUnitSuite) TestStreamConversationPosts() {
+	testPath, err := path.Build(
+		"t",
+		"g",
+		path.GroupsService,
+		path.ConversationPostsCategory,
+		true,
+		"convID",
+		"threadID")
+	require.NoError(suite.T(), err, clues.ToCore(err))
+
+	makeBody := func() io.ReadCloser {
+		rc := io.NopCloser(bytes.NewReader([]byte("{}")))
+
+		return metrics.ReaderWithStats(
+			rc,
+			path.ConversationPostsCategory,
+			&metrics.ExportStats{})
+	}
+
+	makeMeta := func() io.ReadCloser {
+		return io.NopCloser(
+			bytes.NewReader([]byte(`{"topic":"t", "recipients":["em@il"]}`)))
+	}
+
+	table := []struct {
+		name        string
+		backingColl dataMock.Collection
+		expectItem  export.Item
+		expectErr   assert.ErrorAssertionFunc
+	}{
+		{
+			name: "no errors",
+			backingColl: dataMock.Collection{
+				ItemData: []data.Item{
+					&dataMock.Item{
+						ItemID: "zim.data",
+						Reader: makeBody(),
+					},
+				},
+				Path: testPath,
+				AuxItems: map[string]data.Item{
+					"zim.meta": &dataMock.Item{
+						ItemID: "zim.meta",
+						Reader: makeMeta(),
+					},
+				},
+			},
+			expectItem: export.Item{
+				ID:   "zim.data",
+				Name: "zim.eml",
+				Body: makeBody(),
+			},
+			expectErr: assert.NoError,
+		},
+		{
+			name: "only recoverable errors",
+			backingColl: dataMock.Collection{
+				ItemsRecoverableErrs: []error{
+					clues.New("The knowledge... it fills me! It is neat!"),
+				},
+				Path: testPath,
+			},
+			expectErr: assert.Error,
+		},
+		{
+			name: "items and recoverable errors",
+			backingColl: dataMock.Collection{
+				ItemData: []data.Item{
+					&dataMock.Item{
+						ItemID: "gir.data",
+						Reader: makeBody(),
+					},
+				},
+				ItemsRecoverableErrs: []error{
+					clues.New("I miss my cupcake."),
+				},
+				Path: testPath,
+				AuxItems: map[string]data.Item{
+					"gir.meta": &dataMock.Item{
+						ItemID: "gir.meta",
+						Reader: makeMeta(),
+					},
+				},
+			},
+			expectItem: export.Item{
+				ID:   "gir.data",
+				Name: "gir.eml",
+				Body: makeBody(),
+			},
+			expectErr: assert.Error,
+		},
+		{
+			name: "missing metadata",
+			backingColl: dataMock.Collection{
+				ItemData: []data.Item{
+					&dataMock.Item{
+						ItemID: "mir.data",
+						Reader: makeBody(),
+					},
+				},
+				Path: testPath,
+			},
+			expectItem: export.Item{
+				ID:    "mir.data",
+				Error: assert.AnError,
+			},
+			expectErr: assert.Error,
+		},
+	}
+
+	for _, test := range table {
+		suite.Run(test.name, func() {
+			t := suite.T()
+
+			ctx, flush := tester.NewContext(t)
+			defer flush()
+
+			ch := make(chan export.Item)
+
+			go streamConversationPosts(
+				ctx,
+				[]data.RestoreCollection{test.backingColl},
+				version.NoBackup,
+				control.DefaultExportConfig(),
+				ch,
+				&metrics.ExportStats{})
+
+			var (
+				itm export.Item
+				err error
+			)
+
+			for i := range ch {
+				if i.Error == nil {
+					itm = i
+				} else {
+					err = i.Error
+				}
+			}
+
+			test.expectErr(t, err, clues.ToCore(err))
+
+			if err != nil {
+				return
+			}
+
+			assert.Equal(t, test.expectItem.ID, itm.ID, "item ID")
+			assert.Equal(t, test.expectItem.Name, itm.Name, "item name")
+			assert.NotNil(t, itm.Body, "body")
+
+			_, err = io.ReadAll(itm.Body)
+			require.NoError(t, err, clues.ToCore(err))
+
+			itm.Body.Close()
+		})
+	}
+}
diff --git a/src/internal/m365/service/groups/export.go b/src/internal/m365/service/groups/export.go
index 7f667b8fcc..15457baab1 100644
--- a/src/internal/m365/service/groups/export.go
+++ b/src/internal/m365/service/groups/export.go
@@ -82,7 +82,7 @@
 		)
 
 		switch cat {
-		case path.ChannelMessagesCategory:
+		case path.ChannelMessagesCategory, path.ConversationPostsCategory:
 			folders = append(folders, fp.Folders()...)
 
 			coll = groups.NewExportCollection(
@@ -90,7 +90,8 @@
 				[]data.RestoreCollection{restoreColl},
 				backupVersion,
 				exportCfg,
-				stats)
+				stats,
+				cat)
 
 		case path.LibrariesCategory:
 			drivePath, err := path.ToDrivePath(restoreColl.FullPath())
diff --git a/src/internal/m365/service/groups/export_test.go b/src/internal/m365/service/groups/export_test.go
index cb885f0d97..7851189305 100644
--- a/src/internal/m365/service/groups/export_test.go
+++ b/src/internal/m365/service/groups/export_test.go
@@ -238,3 +238,86 @@
 	expectedStats.UpdateResourceCount(path.FilesCategory)
 	assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
 }
+
+func (suite *ExportUnitSuite) TestExportRestoreCollections_ConversationPosts() {
+	t := suite.T()
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	var (
+		itemID        = "itemID"
+		containerName = "convID"
+		content       = groupMock.PostWithAttachments
+		body          = io.NopCloser(bytes.NewBufferString(content))
+		exportCfg     = control.ExportConfig{}
+		expectedPath  = path.ConversationPostsCategory.HumanString() + "/" + containerName
+		expectedItems = []export.Item{
+			{
+				ID:   itemID + ".data",
+				Name: itemID + ".eml",
+				// Body: body, not checked
+			},
+		}
+	)
+
+	p, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, containerName)
+	assert.NoError(t, err, "build path")
+
+	dcs := []data.RestoreCollection{
+		data.FetchRestoreCollection{
+			Collection: dataMock.Collection{
+				Path: p,
+				ItemData: []data.Item{
+					&dataMock.Item{
+						ItemID: itemID + ".data",
+						Reader: body,
+					},
+				},
+			},
+			FetchItemByNamer: finD{
+				id: itemID + ".meta",
+				key: "topic", name: itemID + ".meta",
+			},
+		},
+	}
+
+	stats := metrics.NewExportStats()
+
+	ecs, err := NewGroupsHandler(api.Client{}, nil).
+		ProduceExportCollections(
+			ctx,
+			int(version.Backup),
+			exportCfg,
+			dcs,
+			stats,
+			fault.New(true))
+	assert.NoError(t, err, "export collections error")
+	assert.Len(t, ecs, 1, "num of collections")
+
+	assert.Equal(t, expectedPath, ecs[0].BasePath(), "base dir")
+
+	fitems := []export.Item{}
+
+	size := 0
+
+	for item := range ecs[0].Items(ctx) {
+		b, err := io.ReadAll(item.Body)
+		assert.NoError(t, err, clues.ToCore(err))
+
+		// count up size for tests
+		size += len(b)
+
+		// have to nil out body, otherwise assert fails due to
+		// pointer memory location differences
+		item.Body = nil
+		fitems = append(fitems, item)
+	}
+
+	assert.Equal(t, expectedItems, fitems, "items")
+
+	expectedStats := metrics.NewExportStats()
+	expectedStats.UpdateBytes(path.ConversationPostsCategory, int64(size))
+	expectedStats.UpdateResourceCount(path.ConversationPostsCategory)
+	assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
+}
diff --git a/src/internal/operations/pathtransformer/restore_path_transformer.go b/src/internal/operations/pathtransformer/restore_path_transformer.go
index 5cc7944793..86e3a61375 100644
--- a/src/internal/operations/pathtransformer/restore_path_transformer.go
+++ b/src/internal/operations/pathtransformer/restore_path_transformer.go
@@ -143,6 +143,7 @@ func makeRestorePathsForEntry(
 	switch true {
 	case ent.Exchange != nil ||
 		(ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
+		(ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost) ||
 		(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
 		// TODO(ashmrtn): Eventually make Events have it's own function to handle
 		// setting the restore destination properly.
diff --git a/src/pkg/export/export.go b/src/pkg/export/export.go
index 9c26cb14b8..998f7875f9 100644
--- a/src/pkg/export/export.go
+++ b/src/pkg/export/export.go
@@ -24,7 +24,7 @@ type Collectioner interface {
 	Items(context.Context) <-chan Item
 }
 
-type itemStreamer func(
+type ItemStreamer func(
 	ctx context.Context,
 	backingColls []data.RestoreCollection,
 	backupVersion int,
@@ -46,7 +46,7 @@ type BaseCollection struct {
 
 	Cfg control.ExportConfig
 
-	Stream itemStreamer
+	Stream ItemStreamer
 
 	Stats *metrics.ExportStats
 }