Skip to content

Commit

Permalink
Use keys that do not collide when data is stored in a single bucket. (#…
Browse files Browse the repository at this point in the history
…865)

Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Nov 5, 2024
1 parent 5fd9a08 commit 14c53d8
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 113 deletions.
5 changes: 0 additions & 5 deletions common/aws/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ type ClientConfig struct {
// EndpointURL of the S3 endpoint to use. If this is not set then the default AWS S3 endpoint will be used.
EndpointURL string

// FragmentPrefixChars is the number of characters of the key to use as the prefix for fragmented files.
// A value of "3" for the key "ABCDEFG" will result in the prefix "ABC". Default is 3.
FragmentPrefixChars int
// FragmentParallelismFactor helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a number of workers equal to the number of cores times this value.
// Default is 8. In general, the number of workers here can be a lot larger than the number of cores because the
Expand Down Expand Up @@ -120,7 +117,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
AccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, AccessKeyIdFlagName)),
SecretAccessKey: ctx.GlobalString(common.PrefixFlag(flagPrefix, SecretAccessKeyFlagName)),
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
FragmentPrefixChars: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentPrefixCharsFlagName)),
FragmentParallelismFactor: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName)),
FragmentParallelismConstant: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName)),
FragmentReadTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName)),
Expand All @@ -132,7 +128,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
func DefaultClientConfig() *ClientConfig {
return &ClientConfig{
Region: "us-east-2",
FragmentPrefixChars: 3,
FragmentParallelismFactor: 8,
FragmentParallelismConstant: 0,
FragmentReadTimeout: 30 * time.Second,
Expand Down
6 changes: 3 additions & 3 deletions common/aws/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type client struct {

var _ Client = (*client)(nil)

func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.Logger) (*client, error) {
func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.Logger) (Client, error) {
var err error
once.Do(func() {
customResolver := aws.EndpointResolverWithOptionsFunc(
Expand Down Expand Up @@ -196,7 +196,7 @@ func (s *client) FragmentedUploadObject(
data []byte,
fragmentSize int) error {

fragments, err := breakIntoFragments(key, data, s.cfg.FragmentPrefixChars, fragmentSize)
fragments, err := breakIntoFragments(key, data, fragmentSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *client) FragmentedDownloadObject(
return nil, errors.New("fragmentSize must be greater than 0")
}

fragmentKeys, err := getFragmentKeys(key, s.cfg.FragmentPrefixChars, getFragmentCount(fileSize, fragmentSize))
fragmentKeys, err := getFragmentKeys(key, getFragmentCount(fileSize, fragmentSize))
if err != nil {
return nil, err
}
Expand Down
27 changes: 10 additions & 17 deletions common/aws/s3/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@ func getFragmentCount(fileSize int, fragmentSize int) int {

// getFragmentKey returns the key for the fragment at the given index.
//
// Fragment keys take the form of "prefix/body-index[f]". The prefix is the first prefixLength characters
// of the file key. The body is the file key. The index is the index of the fragment. The character "f" is appended
// to the key of the last fragment in the series.
// Fragment keys take the form of "body-index[f]". The index is the index of the fragment. The character "f" is
// appended to the key of the last fragment in the series.
//
// Example: fileKey="abc123", prefixLength=2, fragmentCount=3
// The keys will be "ab/abc123-0", "ab/abc123-1", "ab/abc123-2f"
func getFragmentKey(fileKey string, prefixLength int, fragmentCount int, index int) (string, error) {
var prefix string
if prefixLength > len(fileKey) {
prefix = fileKey
} else {
prefix = fileKey[:prefixLength]
}
// Example: fileKey="abc123", fragmentCount=3
// The keys will be "abc123-0", "abc123-1", "abc123-2f"
func getFragmentKey(fileKey string, fragmentCount int, index int) (string, error) {

postfix := ""
if fragmentCount-1 == index {
Expand All @@ -42,7 +35,7 @@ func getFragmentKey(fileKey string, prefixLength int, fragmentCount int, index i
return "", fmt.Errorf("index %d is too high for fragment count %d", index, fragmentCount)
}

return fmt.Sprintf("%s/%s-%d%s", prefix, fileKey, index, postfix), nil
return fmt.Sprintf("%s-%d%s", fileKey, index, postfix), nil
}

// Fragment is a subset of a file.
Expand All @@ -53,7 +46,7 @@ type Fragment struct {
}

// breakIntoFragments breaks a file into fragments of the given size.
func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentSize int) ([]*Fragment, error) {
func breakIntoFragments(fileKey string, data []byte, fragmentSize int) ([]*Fragment, error) {
fragmentCount := getFragmentCount(len(data), fragmentSize)
fragments := make([]*Fragment, fragmentCount)
for i := 0; i < fragmentCount; i++ {
Expand All @@ -63,7 +56,7 @@ func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentS
end = len(data)
}

fragmentKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
fragmentKey, err := getFragmentKey(fileKey, fragmentCount, i)
if err != nil {
return nil, err
}
Expand All @@ -77,10 +70,10 @@ func breakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentS
}

// getFragmentKeys returns the keys for all fragments of a file.
func getFragmentKeys(fileKey string, prefixLength int, fragmentCount int) ([]string, error) {
func getFragmentKeys(fileKey string, fragmentCount int) ([]string, error) {
keys := make([]string, fragmentCount)
for i := 0; i < fragmentCount; i++ {
fragmentKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
fragmentKey, err := getFragmentKey(fileKey, fragmentCount, i)
if err != nil {
return nil, err
}
Expand Down
90 changes: 24 additions & 66 deletions common/aws/s3/fragment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,6 @@ func TestGetFragmentCount(t *testing.T) {
assert.Equal(t, expectedFragmentCount, fragmentCount)
}

// Fragment keys take the form of "prefix/body-index[f]". Verify the prefix part of the key.
func TestPrefix(t *testing.T) {
tu.InitializeRandom()

keyLength := rand.Intn(10) + 10
key := tu.RandomString(keyLength)

for i := 0; i < keyLength*2; i++ {
fragmentCount := rand.Intn(10) + 10
fragmentIndex := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(key, i, fragmentCount, fragmentIndex)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
prefix := parts[0]

if i >= keyLength {
assert.Equal(t, key, prefix)
} else {
assert.Equal(t, key[:i], prefix)
}
}
}

// Fragment keys take the form of "prefix/body-index[f]". Verify the body part of the key.
func TestKeyBody(t *testing.T) {
tu.InitializeRandom()
Expand All @@ -86,12 +61,10 @@ func TestKeyBody(t *testing.T) {
key := tu.RandomString(keyLength)
fragmentCount := rand.Intn(10) + 10
fragmentIndex := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(key, rand.Intn(10), fragmentCount, fragmentIndex)
fragmentKey, err := getFragmentKey(key, fragmentCount, fragmentIndex)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKey, "-")
assert.Equal(t, 2, len(parts))
body := parts[0]

Expand All @@ -106,12 +79,10 @@ func TestKeyIndex(t *testing.T) {
for i := 0; i < 10; i++ {
fragmentCount := rand.Intn(10) + 10
index := rand.Intn(fragmentCount)
fragmentKey, err := getFragmentKey(tu.RandomString(10), rand.Intn(10), fragmentCount, index)
fragmentKey, err := getFragmentKey(tu.RandomString(10), fragmentCount, index)
assert.NoError(t, err)

parts := strings.Split(fragmentKey, "/")
assert.Equal(t, 2, len(parts))
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKey, "-")
assert.Equal(t, 2, len(parts))
indexStr := parts[1]
assert.True(t, strings.HasPrefix(indexStr, fmt.Sprintf("%d", index)))
Expand All @@ -126,7 +97,7 @@ func TestKeyPostfix(t *testing.T) {
segmentCount := rand.Intn(10) + 10

for i := 0; i < segmentCount; i++ {
fragmentKey, err := getFragmentKey(tu.RandomString(10), rand.Intn(10), segmentCount, i)
fragmentKey, err := getFragmentKey(tu.RandomString(10), segmentCount, i)
assert.NoError(t, err)

if i == segmentCount-1 {
Expand All @@ -139,41 +110,35 @@ func TestKeyPostfix(t *testing.T) {

// TestExampleInGodoc tests the example provided in the documentation for getFragmentKey().
//
// Example: fileKey="abc123", prefixLength=2, fragmentCount=3
// The keys will be "ab/abc123-0", "ab/abc123-1", "ab/abc123-2f"
// Example: fileKey="abc123", fragmentCount=3
// The keys will be "abc123-0", "abc123-1", "abc123-2f"
func TestExampleInGodoc(t *testing.T) {
fileKey := "abc123"
prefixLength := 2
fragmentCount := 3
fragmentKeys, err := getFragmentKeys(fileKey, prefixLength, fragmentCount)
fragmentKeys, err := getFragmentKeys(fileKey, fragmentCount)
assert.NoError(t, err)
assert.Equal(t, 3, len(fragmentKeys))
assert.Equal(t, "ab/abc123-0", fragmentKeys[0])
assert.Equal(t, "ab/abc123-1", fragmentKeys[1])
assert.Equal(t, "ab/abc123-2f", fragmentKeys[2])
assert.Equal(t, "abc123-0", fragmentKeys[0])
assert.Equal(t, "abc123-1", fragmentKeys[1])
assert.Equal(t, "abc123-2f", fragmentKeys[2])
}

func TestGetFragmentKeys(t *testing.T) {
tu.InitializeRandom()

fileKey := tu.RandomString(10)
prefixLength := rand.Intn(3) + 1
fragmentCount := rand.Intn(10) + 10

fragmentKeys, err := getFragmentKeys(fileKey, prefixLength, fragmentCount)
fragmentKeys, err := getFragmentKeys(fileKey, fragmentCount)
assert.NoError(t, err)
assert.Equal(t, fragmentCount, len(fragmentKeys))

for i := 0; i < fragmentCount; i++ {
expectedKey, err := getFragmentKey(fileKey, prefixLength, fragmentCount, i)
expectedKey, err := getFragmentKey(fileKey, fragmentCount, i)
assert.NoError(t, err)
assert.Equal(t, expectedKey, fragmentKeys[i])

parts := strings.Split(fragmentKeys[i], "/")
assert.Equal(t, 2, len(parts))
parsedPrefix := parts[0]
assert.Equal(t, fileKey[:prefixLength], parsedPrefix)
parts = strings.Split(parts[1], "-")
parts := strings.Split(fragmentKeys[i], "-")
assert.Equal(t, 2, len(parts))
parsedKey := parts[0]
assert.Equal(t, fileKey, parsedKey)
Expand All @@ -192,17 +157,16 @@ func TestGetFragments(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, getFragmentCount(len(data), fragmentSize), len(fragments))

totalSize := 0

for i, fragment := range fragments {
fragmentKey, err := getFragmentKey(fileKey, prefixLength, len(fragments), i)
fragmentKey, err := getFragmentKey(fileKey, len(fragments), i)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragment.FragmentKey)

Expand All @@ -224,14 +188,13 @@ func TestGetFragmentsSmallFile(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(10)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))

fragmentKey, err := getFragmentKey(fileKey, prefixLength, 1, 0)
fragmentKey, err := getFragmentKey(fileKey, 1, 0)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragments[0].FragmentKey)
assert.Equal(t, data, fragments[0].Data)
Expand All @@ -244,13 +207,12 @@ func TestGetFragmentsExactlyOnePerfectlySizedFile(t *testing.T) {
fileKey := tu.RandomString(10)
fragmentSize := rand.Intn(100) + 100
data := tu.RandomBytes(fragmentSize)
prefixLength := rand.Intn(3) + 1

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))

fragmentKey, err := getFragmentKey(fileKey, prefixLength, 1, 0)
fragmentKey, err := getFragmentKey(fileKey, 1, 0)
assert.NoError(t, err)
assert.Equal(t, fragmentKey, fragments[0].FragmentKey)
assert.Equal(t, data, fragments[0].Data)
Expand All @@ -262,10 +224,9 @@ func TestRecombineFragments(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
recombinedData, err := recombineFragments(fragments)
assert.NoError(t, err)
Expand All @@ -287,10 +248,9 @@ func TestRecombineFragmentsSmallFile(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(10)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
assert.Equal(t, 1, len(fragments))
recombinedData, err := recombineFragments(fragments)
Expand All @@ -303,10 +263,9 @@ func TestMissingFragment(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)

fragmentIndexToSkip := rand.Intn(len(fragments))
Expand All @@ -321,10 +280,9 @@ func TestMissingFinalFragment(t *testing.T) {

fileKey := tu.RandomString(10)
data := tu.RandomBytes(1000)
prefixLength := rand.Intn(3) + 1
fragmentSize := rand.Intn(100) + 100

fragments, err := breakIntoFragments(fileKey, data, prefixLength, fragmentSize)
fragments, err := breakIntoFragments(fileKey, data, fragmentSize)
assert.NoError(t, err)
fragments = fragments[:len(fragments)-1]

Expand Down
55 changes: 55 additions & 0 deletions common/aws/s3/scoped_keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package s3

import (
"fmt"
v2 "github.com/Layr-Labs/eigenda/core/v2"
)

const (
// prefixLength is the number of characters to use from the base key to form the prefix.
// Assuming keys take the form of a random hash in hex, 3 will yield 16^3 = 4096 possible prefixes.
// This is currently hard coded because it is not expected to change, and it would require migration
// to change it that we have not yet implemented.
prefixLength = 3

// blobNamespace is the namespace for a blob key.
blobNamespace = "blob"

// chunkNamespace is the namespace for a chunk key.
chunkNamespace = "chunk"

// proofNamespace is the namespace for a proof key.
proofNamespace = "proof"
)

// ScopedKey returns a key that is scoped to a "namespace". Keys take the form of "prefix/namespace/baseKey".
// Although there is no runtime enforcement, neither the base key nor the namespace should contain any
// non-alphanumeric characters.
func ScopedKey(namespace string, baseKey string, prefixLength int) string {
var prefix string
if prefixLength > len(baseKey) {
prefix = baseKey
} else {
prefix = baseKey[:prefixLength]
}

return fmt.Sprintf("%s/%s/%s", prefix, namespace, baseKey)
}

// ScopedBlobKey returns a key scoped to the blob namespace. Used to name files containing blobs in S3.
// A key scoped for blobs will never collide with a key scoped for chunks or proofs.
func ScopedBlobKey(blobKey v2.BlobKey) string {
return ScopedKey(blobNamespace, blobKey.Hex(), prefixLength)
}

// ScopedChunkKey returns a key scoped to the chunk namespace. Used to name files containing chunks in S3.
// A key scoped for chunks will never collide with a key scoped for blobs or proofs.
func ScopedChunkKey(blobKey v2.BlobKey) string {
return ScopedKey(chunkNamespace, blobKey.Hex(), prefixLength)
}

// ScopedProofKey returns a key scoped to the proof namespace. Used to name files containing proofs in S3.
// A key scoped for proofs will never collide with a key scoped for blobs or chunks.
func ScopedProofKey(blobKey v2.BlobKey) string {
return ScopedKey(proofNamespace, blobKey.Hex(), prefixLength)
}
Loading

0 comments on commit 14c53d8

Please sign in to comment.