Skip to content

Commit

Permalink
Adding support for downloading offloaded literal in copilot (#6048)
Browse files Browse the repository at this point in the history
* Adding support for downloading offloaded literal in copilot

Signed-off-by: pmahindrakar-oss <[email protected]>

* feedback + move folder creation up + test coverage for file creation

Signed-off-by: pmahindrakar-oss <[email protected]>

---------

Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Nov 26, 2024
1 parent c150150 commit ab04192
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 0 deletions.
18 changes: 18 additions & 0 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa
Scalar: s,
}}, nil
case *core.Literal_Collection:
err := os.MkdirAll(filePath, os.ModePerm)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath)
}
v, c2, err := d.handleCollection(ctx, lit.GetCollection(), filePath, writeToFile)
if err != nil {
return nil, nil, err
Expand All @@ -366,6 +370,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa
Collection: c2,
}}, nil
case *core.Literal_Map:
err := os.MkdirAll(filePath, os.ModePerm)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath)
}
v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -410,6 +418,16 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM
}
f := make(FutureMap, len(inputs.GetLiterals()))
for variable, literal := range inputs.GetLiterals() {
if literal.GetOffloadedMetadata() != nil {
offloadedMetadataURI := literal.GetOffloadedMetadata().GetUri()
// literal will be overwritten with the contents of the offloaded data which contains the actual large literal.
if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil {
errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err)
logger.Error(ctx, errString)
return nil, nil, fmt.Errorf("%s", errString)
}
logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI)
}
varPath := path.Join(dir, variable)
lit := literal
f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) {
Expand Down
159 changes: 159 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,162 @@ func TestHandleBlobHTTP(t *testing.T) {
t.Errorf("expected file %s to exist", toPath)
}
}

func TestRecursiveDownload(t *testing.T) {
t.Run("OffloadedMetadataContainsCollectionOfStrings", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

offloadedLiteral := &core.Literal{
Value: &core.Literal_OffloadedMetadata{
OffloadedMetadata: &core.LiteralOffloadedMetadata{
Uri: "s3://container/offloaded",
},
},
}

inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"input1": offloadedLiteral,
},
}

// Mock reading the offloaded metadata
err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{
Value: &core.Literal_Collection{
Collection: &core.LiteralCollection{
Literals: []*core.Literal{
{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "string1",
},
},
},
},
},
},
{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "string2",
},
},
},
},
},
},
},
},
},
})
assert.NoError(t, err)

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true)
assert.NoError(t, err)
assert.NotNil(t, varMap)
assert.NotNil(t, lMap)
assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"])
// Check if files were created and data written
for _, file := range []string{"0", "1"} {
if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", file)
}
}
})

t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

offloadedLiteral := &core.Literal{
Value: &core.Literal_OffloadedMetadata{
OffloadedMetadata: &core.LiteralOffloadedMetadata{
Uri: "s3://container/offloaded",
},
},
}

inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"input1": offloadedLiteral,
},
}

// Mock reading the offloaded metadata
err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{
Value: &core.Literal_Map{
Map: &core.LiteralMap{
Literals: map[string]*core.Literal{
"key1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "value1",
},
},
},
},
},
},
"key2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "value2",
},
},
},
},
},
},
},
},
},
})
assert.NoError(t, err)

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true)
assert.NoError(t, err)
assert.NotNil(t, varMap)
assert.NotNil(t, lMap)
assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"])
assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"])

for _, file := range []string{"key1", "key2"} {
if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", file)
}
}
})
}

0 comments on commit ab04192

Please sign in to comment.