Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Add group mailbox export (#5153)
Browse files Browse the repository at this point in the history
<!-- PR description-->

* Add EML exports for group mailbox.
* Tested E2E manually along with unit tests added in this PR.
* Will follow it up with a sanity test PR.
---

#### Does this PR need a docs update or release note?

- [x] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ] ⛔ No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
  • Loading branch information
pandeyabs authored Jan 31, 2024
1 parent 1537db5 commit 7e2b9da
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update repo init configuration to reduce the total number of GET requests sent
to the object store when using corso. This affects repos that have many
backups created in them per day the most.
- Group mailbox emails can now be exported as `.eml` files.

### Fixed
- Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup.
Expand Down
5 changes: 0 additions & 5 deletions src/cli/export/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/alcionai/corso/src/cli/flags"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/selectors"
)

// called by export.go to map subcommands to provider-specific handling.
Expand Down Expand Up @@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts)
utils.FilterGroupsRestoreInfoSelectors(sel, opts)

// TODO(pandeyabs): Exclude conversations from export since they are not
// supported yet. https://github.com/alcionai/corso/issues/4822
sel.Exclude(sel.Conversation(selectors.Any()))

acceptedGroupsFormatTypes := []string{
string(control.DefaultFormat),
string(control.JSONFormat),
Expand Down
167 changes: 165 additions & 2 deletions src/internal/m365/collection/groups/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@ import (
"context"
"encoding/json"
"io"
"strings"
"time"

"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"

"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/converters/eml"
"github.com/alcionai/corso/src/internal/data"
groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/metrics"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
)

func NewExportCollection(
Expand All @@ -26,7 +31,19 @@ func NewExportCollection(
backupVersion int,
cec control.ExportConfig,
stats *metrics.ExportStats,
cat path.CategoryType,
) export.Collectioner {
var streamItems export.ItemStreamer

switch cat {
case path.ChannelMessagesCategory:
streamItems = streamChannelMessages
case path.ConversationPostsCategory:
streamItems = streamConversationPosts
default:
return nil
}

return export.BaseCollection{
BaseDir: baseDir,
BackingCollection: backingCollections,
Expand All @@ -37,8 +54,12 @@ func NewExportCollection(
}
}

// streamItems streams the items in the backingCollection into the export stream chan
func streamItems(
//-------------------------------------------------------------
// Channel Messages
//-------------------------------------------------------------

// streamChannelMessages streams the items in the backingCollection into the export stream chan
func streamChannelMessages(
ctx context.Context,
drc []data.RestoreCollection,
backupVersion int,
Expand Down Expand Up @@ -198,3 +219,145 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag
Subject: ptr.Val(item.GetSubject()),
}
}

//-------------------------------------------------------------
// Conversation Posts
//-------------------------------------------------------------

// streamConversationPosts adds the post items into the export stream channel.
//
// Each backed-up post is stored as JSON alongside a .meta metadata file; the
// JSON is converted to .eml format before being streamed out. Items that fail
// to fetch or convert are emitted as error items so that the rest of the
// export can continue.
func streamConversationPosts(
	ctx context.Context,
	drc []data.RestoreCollection,
	backupVersion int,
	cec control.ExportConfig,
	ch chan<- export.Item,
	stats *metrics.ExportStats,
) {
	defer close(ch)

	errs := fault.New(false)

	for _, rc := range drc {
		for item := range rc.Items(ctx, errs) {
			ictx := clues.Add(
				ctx,
				"path_short_ref", rc.FullPath().ShortRef(),
				"stream_item_id", item.ID())

			// Trim .data suffix from itemID. Also, we don't expect .meta files
			// here since details are not persisted for metadata files.
			trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix)
			exportName := trimmedID + ".eml"

			postMetadata, err := fetchAndReadMetadata(ictx, trimmedID, rc)
			if err != nil {
				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			reader := item.ToReader()
			content, err := io.ReadAll(reader)

			reader.Close()

			if err != nil {
				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			// Convert JSON to eml.
			email, err := eml.FromJSONPostToEML(ictx, content, postMetadata)
			if err != nil {
				err = clues.Wrap(err, "converting JSON to eml")

				logger.CtxErr(ictx, err).Info("processing collection item")

				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			emlReader := io.NopCloser(bytes.NewReader([]byte(email)))

			stats.UpdateResourceCount(path.ConversationPostsCategory)
			body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats)

			ch <- export.Item{
				ID:   item.ID(),
				Name: exportName,
				Body: body,
			}
		}
	}

	// Drain the fault bus exactly once, after all collections have been
	// processed. Draining inside the collection loop would re-emit every
	// previously recorded failure on each subsequent iteration, since errs
	// accumulates across collections.
	items, recovered := errs.ItemsAndRecovered()

	// Return all the items that we failed to source from the persistence layer.
	for _, item := range items {
		// Capture a per-iteration copy before taking its address; otherwise,
		// on toolchains before Go 1.22, every emitted export.Item would alias
		// the single shared loop variable (and thus the last fault item).
		item := item

		ch <- export.Item{
			ID:    item.ID,
			Error: &item,
		}
	}

	for _, err := range recovered {
		ch <- export.Item{
			Error: err,
		}
	}
}

// fetchAndReadMetadata retrieves the .meta file associated with the given
// item from the collection and deserializes it into post metadata.
func fetchAndReadMetadata(
	ctx context.Context,
	itemID string,
	fin data.FetchItemByNamer,
) (groupMeta.ConversationPostMetadata, error) {
	var (
		empty    groupMeta.ConversationPostMetadata
		metaName = itemID + metadata.MetaFileSuffix
	)

	ctx = clues.Add(ctx, "meta_file_name", metaName)

	item, err := fin.FetchItemByName(ctx, metaName)
	if err != nil {
		return empty, clues.WrapWC(ctx, err, "fetching metadata")
	}

	rc := item.ToReader()
	defer rc.Close()

	parsed, err := readMetadata(rc)
	if err != nil {
		return empty, clues.WrapWC(ctx, err, "deserializing metadata")
	}

	return parsed, nil
}

// readMetadata reads and parses the metadata info for an item.
//
// It does not close the reader; closing remains the caller's responsibility,
// which is why only io.Reader (rather than io.ReadCloser) is required.
func readMetadata(metaRC io.Reader) (groupMeta.ConversationPostMetadata, error) {
	var meta groupMeta.ConversationPostMetadata

	metaraw, err := io.ReadAll(metaRC)
	if err != nil {
		return groupMeta.ConversationPostMetadata{}, err
	}

	if err := json.Unmarshal(metaraw, &meta); err != nil {
		return groupMeta.ConversationPostMetadata{}, err
	}

	return meta, nil
}
Loading

0 comments on commit 7e2b9da

Please sign in to comment.