Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow generating legal hold bundles in filestore directly #70

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 148 additions & 1 deletion server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"path/filepath"
"time"

"github.com/gorilla/mux"
Expand All @@ -16,10 +17,33 @@ import (
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-legal-hold/server/model"
"github.com/mattermost/mattermost-plugin-legal-hold/server/store/filebackend"
)

const requestBodyMaxSizeBytes = 1024 * 1024 // 1MB

// sendMessageToUser sends a message to a user in the provided channel and returns the created post.
func (p *Plugin) sendMessageToUser(channelID, message, replyToRootID string) (*mattermostModel.Post, *mattermostModel.AppError) {
post, appErr := p.API.CreatePost(&mattermostModel.Post{
UserId: p.botUserID,
ChannelId: channelID,
RootId: replyToRootID,
Message: message,
})
if appErr != nil {
p.Client.Log.Error(appErr.Error())
return nil, appErr
}

return post, nil
}

// sendErrorMessageToUser sends an error message to the user ignoring the error output since this method
// should already be used in the failure path of the code.
func (p *Plugin) sendErrorMessageToUser(message, replyToRootID string) {
_, _ = p.sendMessageToUser(p.botUserID, message, replyToRootID)
}

// ServeHTTP demonstrates a plugin that handles HTTP requests by greeting the world.
func (p *Plugin) ServeHTTP(_ *plugin.Context, w http.ResponseWriter, r *http.Request) {
// All HTTP endpoints of this plugin require a logged-in user.
Expand All @@ -44,6 +68,7 @@ func (p *Plugin) ServeHTTP(_ *plugin.Context, w http.ResponseWriter, r *http.Req
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/release", p.releaseLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/update", p.updateLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/download", p.downloadLegalHold).Methods(http.MethodGet)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/bundle", p.bundleLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/test_amazon_s3_connection", p.testAmazonS3Connection).Methods(http.MethodPost)

// Other routes
Expand Down Expand Up @@ -245,8 +270,8 @@ func (p *Plugin) downloadLegalHold(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", "legalholddata.zip"))
w.WriteHeader(http.StatusOK)

// Write the files to the download on-the-fly.
zipWriter := zip.NewWriter(w)
// Write the files to the download on-the-fly.
for _, entry := range files {
header := &zip.FileHeader{
Name: entry,
Expand Down Expand Up @@ -295,6 +320,128 @@ func (p *Plugin) downloadLegalHold(w http.ResponseWriter, r *http.Request) {
}
}

func (p *Plugin) bundleLegalHold(w http.ResponseWriter, r *http.Request) {
// Get the LegalHold.
legalholdID, err := RequireLegalHoldID(r)
if err != nil {
http.Error(w, "failed to parse LegalHold ID", http.StatusBadRequest)
p.Client.Log.Error(err.Error())
return
}

legalHold, err := p.KVStore.GetLegalHoldByID(legalholdID)
if err != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(err.Error())
return
}

// Get the list of files to include in the download.
files, err := p.FileBackend.ListDirectoryRecursively(legalHold.BasePath())
if err != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(err.Error())
return
}

mattermostUserID := r.Header.Get("Mattermost-User-Id")

channel, appErr := p.API.GetDirectChannel(p.botUserID, mattermostUserID)
if appErr != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(appErr.Error())
return
}

// Create an initial post to notify the user that the bundle is being generated.
initialPost, appErr := p.sendMessageToUser(channel.Id, "Generating legal hold bundle in the background as per the request. You will be notified once the download is ready.", "")
if appErr != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(appErr.Error())
return
}

p.Client.Log.Info("Generating legal hold bundle on S3")
go func() {
errGoro := p.KVStore.LockLegalHold(legalholdID, "bundle")
if errGoro != nil {
p.Client.Log.Error("failed to lock legal hold before download task", errGoro.Error())
p.sendErrorMessageToUser("There was an error generating the legal hold on the file store. Check the server logs for more details or contact an administrator.", initialPost.Id)
w.WriteHeader(http.StatusInternalServerError)
return
}

defer func() {
if errGoro := p.KVStore.UnlockLegalHold(legalholdID, "bundle"); errGoro != nil {
p.Client.Log.Error("failed to unlock legal hold after download task", errGoro.Error())
}
}()

// Use a custom writter to use the Write/Append functions while we generate the object on
// the fly and avoid storing it on the local disk.
// Also use a buffer to avoid writing to the S3 object in small chunks, since the minimal
// size for a source in the underneath `ComposeObject` call is 5MB, so using 5MB as buffer size.
filename := filepath.Join(model.FilestoreBundlePath, fmt.Sprintf("%s_%d.zip", legalholdID, time.Now().Unix()))
zipWriter := zip.NewWriter(
bufio.NewWriterSize(
filebackend.NewFileBackendWritter(p.FileBackend, filename),
1024*1024*5, // 5MB
))

bytesWritten := int64(0)
for _, entry := range files {
header := &zip.FileHeader{
Name: entry,
Method: zip.Deflate, // deflate also works, but at a cost
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment makes it seem like we're not using Deflate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this verbatim from the original code, it's also confusing to me tbh.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a comment that outlasted a code change that it shouldn't have. I'd remove it.

Modified: time.Now(),
}

entryWriter, errGoro := zipWriter.CreateHeader(header)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
p.sendErrorMessageToUser("There was an error generating the legal hold on the file store. Check the server logs for more details or contact an administrator.", initialPost.Id)
return
}

backendReader, errGoro := p.FileBackend.Reader(entry)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
p.sendErrorMessageToUser("There was an error generating the legal hold on the file store. Check the server logs for more details or contact an administrator.", initialPost.Id)
return
}

fileReader := bufio.NewReader(backendReader)

loopBytesWritten, errGoro := io.Copy(entryWriter, fileReader)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
p.sendErrorMessageToUser("There was an error generating the legal hold on the file store. Check the server logs for more details or contact an administrator.", initialPost.Id)
return
}
bytesWritten += loopBytesWritten
}

if errGoro := zipWriter.Close(); errGoro != nil {
p.Client.Log.Error(errGoro.Error())
p.sendErrorMessageToUser("There was an error generating the legal hold on the file store. Check the server logs for more details or contact an administrator.", initialPost.Id)
return
}

_, _ = p.sendMessageToUser(channel.Id, fmt.Sprintf("Legal hold bundle is ready for download. You can find it under `%s` in your storage provider.", filename), initialPost.Id)
}()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(struct {
Message string `json:"message"`
}{
Message: "Legal hold bundle is being generated. You will be notified once it is ready.",
})
if err != nil {
p.API.LogError("failed to write http response", err.Error())
}
}

func (p *Plugin) runJobFromAPI(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte("Processing all Legal Holds. Please check the MM server logs for more details."))
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions server/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"path/filepath"
"strconv"
"time"

"github.com/mattermost/mattermost-plugin-legal-hold/server/model"
)

// JobCleanupOldBundlesFromFilestore is a job that cleans old legal hold bundles from the filestore by
// checking the timestamp in the filename and ensuring that bundles older than 24h are deleted.
func (p *Plugin) jobCleanupOldBundlesFromFilestore() {
p.API.LogDebug("Starting legal hold cleanup job")

files, jobErr := p.FileBackend.ListDirectory(model.FilestoreBundlePath)
if jobErr != nil {
p.Client.Log.Error("failed to list directory", "err", jobErr)
return
}

for _, file := range files {
parts := model.FilestoreBundleRegex.FindStringSubmatch(filepath.Base(file))
if len(parts) != 3 {
p.Client.Log.Error("Skipping file", "file", file, "reason", "does not match regex", "parts", parts)
continue
}

// golang parse unix time
parsedTimestamp, errStrConv := strconv.ParseInt(parts[2], 10, 64)
if errStrConv != nil {
p.Client.Log.Error("Skipping file", "file", file, "reason", "failed to parse timestamp", "err", errStrConv)
continue
}
fileCreationTime := time.Unix(parsedTimestamp, 0)
if time.Since(fileCreationTime) > time.Hour*24 {
p.Client.Log.Debug("Deleting file", "file", file)
if err := p.FileBackend.RemoveFile(file); err != nil {
p.Client.Log.Error("Failed to delete file", "file", file, "err", err)
}
}

p.Client.Log.Debug("Checking file", "file", file, "parts", parts)
}

p.API.LogDebug("Finished legal hold cleanup job")
}
14 changes: 14 additions & 0 deletions server/model/legal_hold.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package model

import (
"fmt"
"path/filepath"
"regexp"

mattermostModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-legal-hold/server/utils"
)

const (
baseFilestorePath = "legal_hold"
)

var FilestoreBundlePath = filepath.Join(baseFilestorePath, "download")

// FilestoreBundleRegex is a regular expression that matches the filename of a
// legal hold bundle: <bundle_id>_<unix timestamp>.zip
var FilestoreBundleRegex = regexp.MustCompile(`^([a-z0-9]+)_([0-9]+)\.zip$`)

// LegalHold represents one legal hold.
type LegalHold struct {
ID string `json:"id"`
Expand All @@ -23,6 +35,7 @@ type LegalHold struct {
LastExecutionEndedAt int64 `json:"last_execution_ended_at"`
ExecutionLength int64 `json:"execution_length"`
Secret string `json:"secret"`
Locks []string `json:"locks"`
}

// DeepCopy creates a deep copy of the LegalHold.
Expand All @@ -43,6 +56,7 @@ func (lh *LegalHold) DeepCopy() LegalHold {
LastExecutionEndedAt: lh.LastExecutionEndedAt,
ExecutionLength: lh.ExecutionLength,
Secret: lh.Secret,
Locks: lh.Locks,
}

if len(lh.UserIDs) > 0 {
Expand Down
31 changes: 27 additions & 4 deletions server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/gorilla/mux"
pluginapi "github.com/mattermost/mattermost-plugin-api"
"github.com/mattermost/mattermost-server/v6/model"
"github.com/mattermost/mattermost-plugin-api/cluster"
mattermostModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/mattermost/mattermost-server/v6/plugin"
"github.com/mattermost/mattermost-server/v6/shared/filestore"
"github.com/pkg/errors"
Expand Down Expand Up @@ -53,6 +55,12 @@ type Plugin struct {

// router holds the HTTP router for the plugin's rest API
router *mux.Router

// Bot user ID
botUserID string

// cleanupJob is the job that cleans up old legal hold jobs bunldes from the filestore
cleanupJob *cluster.Job
}

func (p *Plugin) OnActivate() error {
Expand Down Expand Up @@ -84,6 +92,15 @@ func (p *Plugin) OnActivate() error {

p.KVStore = kvstore.NewKVStore(p.Client)

// Create bot user
p.botUserID, err = p.API.EnsureBotUser(&mattermostModel.Bot{
Username: "legal_hold_bot",
DisplayName: "Legal Hold Bot",
})
if err != nil {
return err
}

// Create job manager
p.jobManager = jobs.NewJobManager(&p.Client.Log)

Expand Down Expand Up @@ -195,16 +212,22 @@ func (p *Plugin) Reconfigure() error {
if err != nil {
return fmt.Errorf("cannot create legal hold job: %w", err)
}
if err := p.jobManager.AddJob(p.legalHoldJob); err != nil {
if err = p.jobManager.AddJob(p.legalHoldJob); err != nil {
return fmt.Errorf("cannot add legal hold job: %w", err)
}
_ = p.jobManager.OnConfigurationChange(p.getConfiguration())

// Setup cluster cleanup job
p.cleanupJob, err = cluster.Schedule(p.API, "legal_hold_bundle_cleanup", cluster.MakeWaitForRoundedInterval(time.Minute), p.jobCleanupOldBundlesFromFilestore)
if err != nil {
return fmt.Errorf("error setting up cluster cleanup job: %w", err)
}

return nil
}

func FixedFileSettingsToFileBackendSettings(fileSettings model.FileSettings, enableComplianceFeature bool, skipVerify bool) filestore.FileBackendSettings {
if *fileSettings.DriverName == model.ImageDriverLocal {
func FixedFileSettingsToFileBackendSettings(fileSettings mattermostModel.FileSettings, enableComplianceFeature bool, skipVerify bool) filestore.FileBackendSettings {
if *fileSettings.DriverName == mattermostModel.ImageDriverLocal {
return filestore.FileBackendSettings{
DriverName: *fileSettings.DriverName,
Directory: *fileSettings.Directory,
Expand Down
34 changes: 34 additions & 0 deletions server/store/filebackend/writter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package filebackend

import (
"bytes"
"io"

"github.com/mattermost/mattermost-server/v6/shared/filestore"
)

// fileBackendWritter is a simple io.Writer that writes to a file using a filestore.FileBackend
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is spelled Writter rather than Writer?

type fileBackendWritter struct {
filePath string
fileBackend filestore.FileBackend
// created is used to know if the file has been created or not, to use either WriteFile or AppendFile
created bool
}

func (s *fileBackendWritter) Write(p []byte) (n int, err error) {
var written int64
if !s.created {
s.created = true
written, err = s.fileBackend.WriteFile(bytes.NewReader(p), s.filePath)
} else {
written, err = s.fileBackend.AppendFile(bytes.NewReader(p), s.filePath)
}
return int(written), err
}

func NewFileBackendWritter(fileBackend filestore.FileBackend, filePath string) io.Writer {
return &fileBackendWritter{
filePath: filePath,
fileBackend: fileBackend,
}
}
3 changes: 3 additions & 0 deletions server/store/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ type KVStore interface {
GetLegalHoldByID(id string) (*model.LegalHold, error)
UpdateLegalHold(lh, oldValue model.LegalHold) (*model.LegalHold, error)
DeleteLegalHold(id string) error
LockLegalHold(id, lockType string) error
UnlockLegalHold(id, lockType string) error
IsLockedLegalHold(id, lockType string) (bool, error)
}
Loading
Loading