From aa97b4d894b4bbfc421a54161e9ef1ea87a27aad Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Mon, 25 Nov 2024 17:56:50 -0800 Subject: [PATCH 1/2] Adding support for downloading offloaded literal in copilot Signed-off-by: pmahindrakar-oss --- flytecopilot/data/download.go | 18 ++++ flytecopilot/data/download_test.go | 147 +++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 73d6e3be53..8176989a3c 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -366,6 +366,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa Collection: c2, }}, nil case *core.Literal_Map: + err := os.MkdirAll(filePath, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) + } v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile) if err != nil { return nil, nil, err @@ -387,6 +391,10 @@ func (d Downloader) handleCollection(ctx context.Context, c *core.LiteralCollect litCollection := &core.LiteralCollection{} for i, lit := range c.GetLiterals() { filePath := path.Join(dir, strconv.Itoa(i)) + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", dir) + } v, lit, err := d.handleLiteral(ctx, lit, filePath, writePrimitiveToFile) if err != nil { return nil, nil, err @@ -410,6 +418,16 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM } f := make(FutureMap, len(inputs.GetLiterals())) for variable, literal := range inputs.GetLiterals() { + if literal.GetOffloadedMetadata() != nil { + offloadedMetadataURI := literal.GetOffloadedMetadata().GetUri() + // literal will be overwritten with the contents of the offloaded data which contains the actual large literal. + if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil { + errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err) + logger.Error(ctx, errString) + return nil, nil, fmt.Errorf(errString) + } + logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI) + } varPath := path.Join(dir, variable) lit := literal f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) { diff --git a/flytecopilot/data/download_test.go b/flytecopilot/data/download_test.go index b4bee54fc5..03003f9a16 100644 --- a/flytecopilot/data/download_test.go +++ b/flytecopilot/data/download_test.go @@ -152,3 +152,150 @@ func TestHandleBlobHTTP(t *testing.T) { t.Errorf("expected file %s to exist", toPath) } } + +func TestRecursiveDownload(t *testing.T) { + t.Run("OffloadedMetadataContainsCollectionOfStrings", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string1", + }, + }, + }, + }, + }, + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"]) + }) + + t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "key1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value1", + }, + }, + }, + }, + }, + }, + "key2": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"]) + assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"]) + }) +} From 06c488170d0081901c3af3a88323336b011e7ca9 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Tue, 26 Nov 2024 10:34:30 -0800 Subject: [PATCH 2/2] feedback + move folder creation up + test coverage for file creation Signed-off-by: pmahindrakar-oss --- flytecopilot/data/download.go | 10 +++++----- flytecopilot/data/download_test.go | 12 ++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 8176989a3c..24450697e7 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -358,6 +358,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa Scalar: s, }}, nil case *core.Literal_Collection: + err := os.MkdirAll(filePath, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) + } v, c2, err := d.handleCollection(ctx, lit.GetCollection(), filePath, writeToFile) if err != nil { return nil, nil, err @@ -391,10 +395,6 @@ func (d Downloader) handleCollection(ctx context.Context, c *core.LiteralCollect litCollection := &core.LiteralCollection{} for i, lit := range c.GetLiterals() { filePath := path.Join(dir, strconv.Itoa(i)) - err := os.MkdirAll(dir, os.ModePerm) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", dir) - } v, lit, err := d.handleLiteral(ctx, lit, filePath, writePrimitiveToFile) if err != nil { return nil, nil, err @@ -424,7 +424,7 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil { errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err) logger.Error(ctx, errString) - return nil, nil, fmt.Errorf(errString) + return nil, nil, fmt.Errorf("%s", errString) } logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI) } diff --git a/flytecopilot/data/download_test.go b/flytecopilot/data/download_test.go index 03003f9a16..dbc7cb33e7 100644 --- a/flytecopilot/data/download_test.go +++ b/flytecopilot/data/download_test.go @@ -224,6 +224,12 @@ func TestRecursiveDownload(t *testing.T) { assert.NotNil(t, varMap) assert.NotNil(t, lMap) assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"]) + // Check if files were created and data written + for _, file := range []string{"0", "1"} { + if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } + } }) t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) { @@ -297,5 +303,11 @@ func TestRecursiveDownload(t *testing.T) { assert.NotNil(t, lMap) assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"]) assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"]) + + for _, file := range []string{"key1", "key2"} { + if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } + } }) }