From 94c11ad5decef0afc57f4b62ac884594b7cfffd1 Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Fri, 15 Nov 2024 08:32:48 -0600 Subject: [PATCH] Implement core relay functionality (#869) Signed-off-by: Cody Littley --- .github/workflows/golangci-lint.yml | 2 +- api/grpc/common/common.pb.go | 75 +- api/grpc/common/v2/frame.pb.go | 328 ------ api/grpc/relay/relay.pb.go | 4 +- api/proto/common/common.proto | 9 - api/proto/common/v2/frame.proto | 32 - api/proto/relay/relay.proto | 4 +- common/aws/s3/client.go | 38 +- core/data_test.go | 11 - disperser/apiserver/server.go | 2 +- .../blobstore/dynamo_metadata_store_test.go | 2 +- disperser/encoder/server_v2_test.go | 2 +- encoding/data.go | 96 -- relay/blob_provider.go | 78 ++ relay/blob_provider_test.go | 76 ++ relay/{ => cache}/cached_accessor.go | 16 +- relay/{ => cache}/cached_accessor_test.go | 6 +- relay/chunk_provider.go | 178 ++++ relay/chunk_provider_test.go | 158 +++ relay/chunkstore/chunk_reader.go | 5 +- relay/chunkstore/chunk_store_test.go | 4 +- relay/config.go | 44 + relay/metadata_provider.go | 169 ++++ relay/metadata_provider_test.go | 418 ++++++++ relay/relay_test_utils.go | 227 +++++ relay/server.go | 212 ++++ relay/server_test.go | 939 ++++++++++++++++++ 27 files changed, 2550 insertions(+), 585 deletions(-) delete mode 100644 api/grpc/common/v2/frame.pb.go delete mode 100644 api/proto/common/v2/frame.proto create mode 100644 relay/blob_provider.go create mode 100644 relay/blob_provider_test.go rename relay/{ => cache}/cached_accessor.go (93%) rename relay/{ => cache}/cached_accessor_test.go (97%) create mode 100644 relay/chunk_provider.go create mode 100644 relay/chunk_provider_test.go create mode 100644 relay/config.go create mode 100644 relay/metadata_provider.go create mode 100644 relay/metadata_provider_test.go create mode 100644 relay/relay_test_utils.go create mode 100644 relay/server.go create mode 100644 relay/server_test.go diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 5a94403cb1..0cdc443481 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -24,4 +24,4 @@ jobs: uses: golangci/golangci-lint-action@v3 with: version: v1.60 - args: --timeout 3m --verbose + args: --timeout 3m --verbose --out-format=colored-line-number diff --git a/api/grpc/common/common.pb.go b/api/grpc/common/common.pb.go index 6d44b9438e..cdf0f16abe 100644 --- a/api/grpc/common/common.pb.go +++ b/api/grpc/common/common.pb.go @@ -213,54 +213,6 @@ func (x *PaymentHeader) GetCumulativePayment() []byte { return nil } -// A chunk of a blob. 
-type ChunkData struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` -} - -func (x *ChunkData) Reset() { - *x = ChunkData{} - if protoimpl.UnsafeEnabled { - mi := &file_common_common_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ChunkData) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ChunkData) ProtoMessage() {} - -func (x *ChunkData) ProtoReflect() protoreflect.Message { - mi := &file_common_common_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ChunkData.ProtoReflect.Descriptor instead. -func (*ChunkData) Descriptor() ([]byte, []int) { - return file_common_common_proto_rawDescGZIP(), []int{3} -} - -func (x *ChunkData) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - var File_common_common_proto protoreflect.FileDescriptor var file_common_common_proto_rawDesc = []byte{ @@ -286,12 +238,10 @@ var file_common_common_proto_rawDesc = []byte{ 0x78, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x75, 0x6d, 0x75, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x63, 0x75, 0x6d, 0x75, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, - 0x22, 0x1f, 0x0a, 0x09, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, - 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, - 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, + 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -306,12 +256,11 @@ func file_common_common_proto_rawDescGZIP() []byte { return file_common_common_proto_rawDescData } -var file_common_common_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_common_common_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_common_common_proto_goTypes = []interface{}{ (*G1Commitment)(nil), // 0: common.G1Commitment (*BlobCommitment)(nil), // 1: common.BlobCommitment (*PaymentHeader)(nil), // 2: common.PaymentHeader - (*ChunkData)(nil), // 3: common.ChunkData } var file_common_common_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -363,18 +312,6 @@ func file_common_common_proto_init() { return nil } } - file_common_common_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ChunkData); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } } type x struct{} out := protoimpl.TypeBuilder{ @@ -382,7 +319,7 @@ func file_common_common_proto_init() { 
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_common_common_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/api/grpc/common/v2/frame.pb.go b/api/grpc/common/v2/frame.pb.go deleted file mode 100644 index 351cd3d250..0000000000 --- a/api/grpc/common/v2/frame.pb.go +++ /dev/null @@ -1,328 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v4.23.4 -// source: common/v2/frame.proto - -package v2 - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// A single chunk of a blob. -type Frame struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Proof is the multilevel proof corresponding to the chunk - Proof *Proof `protobuf:"bytes,1,opt,name=proof,proto3" json:"proof,omitempty"` - // Coeffs contains the coefficient of the interpolating polynomial of the chunk - Coeffs []*Element `protobuf:"bytes,2,rep,name=coeffs,proto3" json:"coeffs,omitempty"` -} - -func (x *Frame) Reset() { - *x = Frame{} - if protoimpl.UnsafeEnabled { - mi := &file_common_v2_frame_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Frame) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Frame) ProtoMessage() {} - -func (x *Frame) ProtoReflect() protoreflect.Message { - mi := &file_common_v2_frame_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Frame.ProtoReflect.Descriptor instead. -func (*Frame) Descriptor() ([]byte, []int) { - return file_common_v2_frame_proto_rawDescGZIP(), []int{0} -} - -func (x *Frame) GetProof() *Proof { - if x != nil { - return x.Proof - } - return nil -} - -func (x *Frame) GetCoeffs() []*Element { - if x != nil { - return x.Coeffs - } - return nil -} - -// Corresponds to the encoding.Proof type -type Proof struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - X *Element `protobuf:"bytes,1,opt,name=x,proto3" json:"x,omitempty"` - Y *Element `protobuf:"bytes,2,opt,name=y,proto3" json:"y,omitempty"` -} - -func (x *Proof) Reset() { - *x = Proof{} - if protoimpl.UnsafeEnabled { - mi := &file_common_v2_frame_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Proof) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Proof) ProtoMessage() {} - -func (x *Proof) ProtoReflect() protoreflect.Message { - mi := &file_common_v2_frame_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Proof.ProtoReflect.Descriptor instead. 
-func (*Proof) Descriptor() ([]byte, []int) { - return file_common_v2_frame_proto_rawDescGZIP(), []int{1} -} - -func (x *Proof) GetX() *Element { - if x != nil { - return x.X - } - return nil -} - -func (x *Proof) GetY() *Element { - if x != nil { - return x.Y - } - return nil -} - -// Corresponds to the fp.Element or fr.Element type. An fr/fp Element is made up of 4 uint64s. -// Since protobufs don't support fixed-size arrays, we use a fixed number of uint64 fields. -type Element struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - C0 uint64 `protobuf:"varint,1,opt,name=c0,proto3" json:"c0,omitempty"` - C1 uint64 `protobuf:"varint,2,opt,name=c1,proto3" json:"c1,omitempty"` - C2 uint64 `protobuf:"varint,3,opt,name=c2,proto3" json:"c2,omitempty"` - C3 uint64 `protobuf:"varint,4,opt,name=c3,proto3" json:"c3,omitempty"` -} - -func (x *Element) Reset() { - *x = Element{} - if protoimpl.UnsafeEnabled { - mi := &file_common_v2_frame_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Element) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Element) ProtoMessage() {} - -func (x *Element) ProtoReflect() protoreflect.Message { - mi := &file_common_v2_frame_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Element.ProtoReflect.Descriptor instead. -func (*Element) Descriptor() ([]byte, []int) { - return file_common_v2_frame_proto_rawDescGZIP(), []int{2} -} - -func (x *Element) GetC0() uint64 { - if x != nil { - return x.C0 - } - return 0 -} - -func (x *Element) GetC1() uint64 { - if x != nil { - return x.C1 - } - return 0 -} - -func (x *Element) GetC2() uint64 { - if x != nil { - return x.C2 - } - return 0 -} - -func (x *Element) GetC3() uint64 { - if x != nil { - return x.C3 - } - return 0 -} - -var File_common_v2_frame_proto protoreflect.FileDescriptor - -var file_common_v2_frame_proto_rawDesc = []byte{ - 0x0a, 0x15, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x66, 0x72, 0x61, 0x6d, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, - 0x76, 0x32, 0x22, 0x5b, 0x0a, 0x05, 0x46, 0x72, 0x61, 0x6d, 0x65, 0x12, 0x26, 0x0a, 0x05, 0x70, - 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, - 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x05, 0x70, 0x72, - 0x6f, 0x6f, 0x66, 0x12, 0x2a, 0x0a, 0x06, 0x63, 0x6f, 0x65, 0x66, 0x66, 0x73, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, - 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x63, 0x6f, 0x65, 0x66, 0x66, 0x73, 0x22, - 0x4b, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x20, 0x0a, 0x01, 0x78, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, - 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x01, 0x78, 0x12, 0x20, 0x0a, 0x01, 0x79, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, - 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x01, 0x79, 0x22, 0x49, 0x0a, 0x07, - 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x30, 0x18, 0x01, 0x20, - 
0x01, 0x28, 0x04, 0x52, 0x02, 0x63, 0x30, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x31, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x02, 0x63, 0x31, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x32, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x02, 0x63, 0x32, 0x12, 0x0e, 0x0a, 0x02, 0x63, 0x33, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x02, 0x63, 0x33, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, - 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, - 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, -} - -var ( - file_common_v2_frame_proto_rawDescOnce sync.Once - file_common_v2_frame_proto_rawDescData = file_common_v2_frame_proto_rawDesc -) - -func file_common_v2_frame_proto_rawDescGZIP() []byte { - file_common_v2_frame_proto_rawDescOnce.Do(func() { - file_common_v2_frame_proto_rawDescData = protoimpl.X.CompressGZIP(file_common_v2_frame_proto_rawDescData) - }) - return file_common_v2_frame_proto_rawDescData -} - -var file_common_v2_frame_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_common_v2_frame_proto_goTypes = []interface{}{ - (*Frame)(nil), // 0: common.v2.Frame - (*Proof)(nil), // 1: common.v2.Proof - (*Element)(nil), // 2: common.v2.Element -} -var file_common_v2_frame_proto_depIdxs = []int32{ - 1, // 0: common.v2.Frame.proof:type_name -> common.v2.Proof - 2, // 1: common.v2.Frame.coeffs:type_name -> common.v2.Element - 2, // 2: common.v2.Proof.x:type_name -> common.v2.Element - 2, // 3: common.v2.Proof.y:type_name -> common.v2.Element - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_common_v2_frame_proto_init() } -func file_common_v2_frame_proto_init() { - if File_common_v2_frame_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_common_v2_frame_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Frame); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_common_v2_frame_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Proof); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_common_v2_frame_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Element); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_common_v2_frame_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_common_v2_frame_proto_goTypes, - DependencyIndexes: file_common_v2_frame_proto_depIdxs, - MessageInfos: file_common_v2_frame_proto_msgTypes, - }.Build() - File_common_v2_frame_proto = out.File - file_common_v2_frame_proto_rawDesc = nil - file_common_v2_frame_proto_goTypes = nil - file_common_v2_frame_proto_depIdxs = nil -} diff --git a/api/grpc/relay/relay.pb.go b/api/grpc/relay/relay.pb.go index 
3ecd7dd8e3..d01287a918 100644 --- a/api/grpc/relay/relay.pb.go +++ b/api/grpc/relay/relay.pb.go @@ -312,7 +312,8 @@ func (x *ChunkRequestByRange) GetEndIndex() uint32 { return 0 } -// A request for chunks within a specific blob. +// A request for chunks within a specific blob. Requests are fulfilled in all-or-nothing fashion. If any of the +// requested chunks are not found or are unable to be fetched, the entire request will fail. type ChunkRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -397,7 +398,6 @@ func (*ChunkRequest_ByIndex) isChunkRequest_Request() {} func (*ChunkRequest_ByRange) isChunkRequest_Request() {} // The reply to a GetChunks request. -// Requests are fulfilled in all-or-nothing fashion. If any of the requested chunks are not found, the entire request will fail. type GetChunksReply struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/api/proto/common/common.proto b/api/proto/common/common.proto index 0f8f51a1a6..a457652ff8 100644 --- a/api/proto/common/common.proto +++ b/api/proto/common/common.proto @@ -23,12 +23,3 @@ message PaymentHeader { uint32 bin_index = 2; bytes cumulative_payment = 3; } - -///////////////////////////////////////////////////////////////////////////////////// -// Experimental: the following definitions are experimental and subject to change. // -///////////////////////////////////////////////////////////////////////////////////// - -// A chunk of a blob. -message ChunkData { - bytes data = 1; -} \ No newline at end of file diff --git a/api/proto/common/v2/frame.proto b/api/proto/common/v2/frame.proto deleted file mode 100644 index 72fba06998..0000000000 --- a/api/proto/common/v2/frame.proto +++ /dev/null @@ -1,32 +0,0 @@ -syntax = "proto3"; -package common.v2; -option go_package = "github.com/Layr-Labs/eigenda/api/grpc/common/v2"; - -///////////////////////////////////////////////////////////////////////////////////// -// Experimental: the following definitions are experimental and subject to change. // -///////////////////////////////////////////////////////////////////////////////////// - -// TODO: documentation was generated by copilot AI, make sure it's accurate. - -// A single chunk of a blob. -message Frame { - // Proof is the multilevel proof corresponding to the chunk - Proof proof = 1; - // Coeffs contains the coefficient of the interpolating polynomial of the chunk - repeated Element coeffs = 2; -} - -// Corresponds to the encoding.Proof type -message Proof { - Element x = 1; - Element y = 2; -} - -// Corresponds to the fp.Element or fr.Element type. An fr/fp Element is made up of 4 uint64s. -// Since protobufs don't support fixed-size arrays, we use a fixed number of uint64 fields. -message Element { - uint64 c0 = 1; - uint64 c1 = 2; - uint64 c2 = 3; - uint64 c3 = 4; -} diff --git a/api/proto/relay/relay.proto b/api/proto/relay/relay.proto index f5d0d3b72d..ed52c0a0a5 100644 --- a/api/proto/relay/relay.proto +++ b/api/proto/relay/relay.proto @@ -59,7 +59,8 @@ message ChunkRequestByRange { uint32 end_index = 3; } -// A request for chunks within a specific blob. +// A request for chunks within a specific blob. Requests are fulfilled in all-or-nothing fashion. If any of the +// requested chunks are not found or are unable to be fetched, the entire request will fail. message ChunkRequest { oneof request { // Request chunks by their individual indices. @@ -70,7 +71,6 @@ message ChunkRequest { } // The reply to a GetChunks request. -// Requests are fulfilled in all-or-nothing fashion. 
If any of the requested chunks are not found, the entire request will fail. message GetChunksReply { // The chunks requested. The order of these chunks will be the same as the order of the requested chunks. // data is the raw data of the bundle (i.e. serialized byte array of the frames) diff --git a/common/aws/s3/client.go b/common/aws/s3/client.go index ed0ce0dbaa..3b773140c8 100644 --- a/common/aws/s3/client.go +++ b/common/aws/s3/client.go @@ -32,8 +32,11 @@ type Object struct { type client struct { cfg *commonaws.ClientConfig s3Client *s3.Client - pool *errgroup.Group - logger logging.Logger + + // concurrencyLimiter is a channel that limits the number of concurrent operations. + concurrencyLimiter chan struct{} + + logger logging.Logger } var _ Client = (*client)(nil) @@ -89,14 +92,14 @@ func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.L workers = 1 } - pool, _ := errgroup.WithContext(ctx) + pool := &errgroup.Group{} pool.SetLimit(workers) ref = &client{ - cfg: &cfg, - s3Client: s3Client, - pool: pool, - logger: logger.With("component", "S3Client"), + cfg: &cfg, + s3Client: s3Client, + concurrencyLimiter: make(chan struct{}, workers), + logger: logger.With("component", "S3Client"), } }) return ref, err @@ -225,20 +228,22 @@ func (s *client) FragmentedUploadObject( for _, fragment := range fragments { fragmentCapture := fragment - s.pool.Go(func() error { + s.concurrencyLimiter <- struct{}{} + go func() { + defer func() { + <-s.concurrencyLimiter + }() s.fragmentedWriteTask(ctx, resultChannel, fragmentCapture, bucket) - return nil - }) + }() } for range fragments { - err := <-resultChannel + err = <-resultChannel if err != nil { return err } } return ctx.Err() - } // fragmentedWriteTask writes a single file to S3. @@ -284,10 +289,13 @@ func (s *client) FragmentedDownloadObject( for i, fragmentKey := range fragmentKeys { boundFragmentKey := fragmentKey boundI := i - s.pool.Go(func() error { + s.concurrencyLimiter <- struct{}{} + go func() { + defer func() { + <-s.concurrencyLimiter + }() s.readTask(ctx, resultChannel, bucket, boundFragmentKey, boundI) - return nil - }) + }() } fragments := make([]*Fragment, len(fragmentKeys)) diff --git a/core/data_test.go b/core/data_test.go index eec2e1bd41..84cb5097e9 100644 --- a/core/data_test.go +++ b/core/data_test.go @@ -2,7 +2,6 @@ package core_test import ( "bytes" - tu "github.com/Layr-Labs/eigenda/common/testutils" "math/rand" "testing" @@ -218,13 +217,3 @@ func TestChunksData(t *testing.T) { assert.EqualError(t, err, "unsupported chunk encoding format: 3") } } - -func TestRoundTripProtobufSerialization(t *testing.T) { - tu.InitializeRandom() - bundle := createBundle(t, 64, 64, rand.Int()) - - protobufBundle := encoding.FramesToProtobuf(bundle) - bundle2 := encoding.FramesFromProtobuf(protobufBundle) - - checkBundleEquivalence(t, bundle, bundle2) -} diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 58e4b907dc..87e82b5a10 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -16,7 +16,7 @@ import ( commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/common" - healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" + "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/core/meterer" diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go 
b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index cbe192d30b..67d77c6601 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -10,7 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/dynamodb" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - core "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go index 15cb6faea7..26c7110f7e 100644 --- a/disperser/encoder/server_v2_test.go +++ b/disperser/encoder/server_v2_test.go @@ -201,7 +201,7 @@ func createTestComponents(t *testing.T) *testComponents { dynamoDBClient := &mock.MockDynamoDBClient{} blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger) chunkStoreWriter := chunkstore.NewChunkWriter(logger, s3Client, s3BucketName, 512*1024) - chunkStoreReader := chunkstore.NewChunkReader(logger, s3Client, s3BucketName, []uint32{}) + chunkStoreReader := chunkstore.NewChunkReader(logger, s3Client, s3BucketName) encoderServer := encoder.NewEncoderServerV2(encoder.ServerConfig{ GrpcPort: "8080", MaxConcurrentRequests: 10, diff --git a/encoding/data.go b/encoding/data.go index 37365ce519..b2033d199a 100644 --- a/encoding/data.go +++ b/encoding/data.go @@ -2,9 +2,7 @@ package encoding import ( pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" - framepb "github.com/Layr-Labs/eigenda/api/grpc/common/v2" "github.com/consensys/gnark-crypto/ecc/bn254" - "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/consensys/gnark-crypto/ecc/bn254/fr" ) @@ -97,97 +95,3 @@ type FragmentInfo struct { // FragmentSizeBytes is the maximum fragment size used to store the chunk coefficients. FragmentSizeBytes uint32 } - -// ToProtobuf converts the FragmentInfo to protobuf format -func (f *Frame) ToProtobuf() *framepb.Frame { - proof := &framepb.Proof{ - X: fpElementToProtobuf(&f.Proof.X), - Y: fpElementToProtobuf(&f.Proof.Y), - } - - coeffs := make([]*framepb.Element, len(f.Coeffs)) - for i, c := range f.Coeffs { - coeffs[i] = frElementToProtobuf(&c) - } - - return &framepb.Frame{ - Proof: proof, - Coeffs: coeffs, - } -} - -// fpElementToProtobuf converts an fp.Element to protobuf format -func fpElementToProtobuf(e *fp.Element) *framepb.Element { - return &framepb.Element{ - C0: e[0], - C1: e[1], - C2: e[2], - C3: e[3], - } -} - -// frElementToProtobuf converts an fr.Element to protobuf format -func frElementToProtobuf(e *fr.Element) *framepb.Element { - return &framepb.Element{ - C0: e[0], - C1: e[1], - C2: e[2], - C3: e[3], - } -} - -// fpElementFromProtobuf converts a protobuf element to an fp.Element -func fpElementFromProtobuf(e *framepb.Element) fp.Element { - return fp.Element{ - e.C0, - e.C1, - e.C2, - e.C3, - } -} - -// frElementFromProtobuf converts a protobuf element to an fr.Element -func frElementFromProtobuf(e *framepb.Element) fr.Element { - return fr.Element{ - e.C0, - e.C1, - e.C2, - e.C3, - } -} - -// FrameFromProtobuf converts a protobuf frame to a Frame. 
-func FrameFromProtobuf(f *framepb.Frame) *Frame { - proof := Proof{ - X: fpElementFromProtobuf(f.Proof.X), - Y: fpElementFromProtobuf(f.Proof.Y), - } - - coeffs := make([]Symbol, len(f.Coeffs)) - for i, c := range f.Coeffs { - coeffs[i] = frElementFromProtobuf(c) - } - - return &Frame{ - Proof: proof, - Coeffs: coeffs, - } -} - -// FramesToProtobuf converts a slice of Frames to protobuf format -func FramesToProtobuf(frames []*Frame) []*framepb.Frame { - protobufFrames := make([]*framepb.Frame, len(frames)) - for i, f := range frames { - protobufFrames[i] = f.ToProtobuf() - } - return protobufFrames -} - -// FramesFromProtobuf converts a slice of protobuf Frames to Frames. -func FramesFromProtobuf(frames []*framepb.Frame) []*Frame { - protobufFrames := make([]*Frame, len(frames)) - for i, f := range frames { - protobufFrames[i] = FrameFromProtobuf(f) - } - return protobufFrames -} diff --git a/relay/blob_provider.go b/relay/blob_provider.go new file mode 100644 index 0000000000..1d2da3a1ff --- /dev/null +++ b/relay/blob_provider.go @@ -0,0 +1,78 @@ +package relay + +import ( + "context" + "fmt" + "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/relay/cache" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +// blobProvider encapsulates logic for fetching blobs. Utilized by the relay Server. +// This struct adds caching and concurrency limitation on top of blobstore.BlobStore. +type blobProvider struct { + ctx context.Context + logger logging.Logger + + // blobStore is used to read blobs from S3. + blobStore *blobstore.BlobStore + + // blobCache is an LRU cache of blobs. + blobCache cache.CachedAccessor[v2.BlobKey, []byte] + + // concurrencyLimiter is a channel that limits the number of concurrent operations. + concurrencyLimiter chan struct{} +} + +// newBlobProvider creates a new blobProvider. +func newBlobProvider( + ctx context.Context, + logger logging.Logger, + blobStore *blobstore.BlobStore, + blobCacheSize int, + maxIOConcurrency int) (*blobProvider, error) { + + server := &blobProvider{ + ctx: ctx, + logger: logger, + blobStore: blobStore, + concurrencyLimiter: make(chan struct{}, maxIOConcurrency), + } + + c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, server.fetchBlob) + if err != nil { + return nil, fmt.Errorf("error creating blob cache: %w", err) + } + server.blobCache = c + + return server, nil +} + +// GetBlob retrieves a blob from the blob store. +func (s *blobProvider) GetBlob(blobKey v2.BlobKey) ([]byte, error) { + + s.concurrencyLimiter <- struct{}{} + data, err := s.blobCache.Get(blobKey) + <-s.concurrencyLimiter + + if err != nil { + // It should not be possible for external users to force an error here since we won't + // even call this method if the blob key is invalid (so it's ok to have a noisy log here). + s.logger.Errorf("Failed to fetch blob: %v", err) + return nil, err + } + + return data, nil +} + +// fetchBlob retrieves a single blob from the blob store. 
+func (s *blobProvider) fetchBlob(blobKey v2.BlobKey) ([]byte, error) { + data, err := s.blobStore.GetBlob(s.ctx, blobKey) + if err != nil { + s.logger.Errorf("Failed to fetch blob: %v", err) + return nil, err + } + + return data, nil +} diff --git a/relay/blob_provider_test.go b/relay/blob_provider_test.go new file mode 100644 index 0000000000..6e996977bb --- /dev/null +++ b/relay/blob_provider_test.go @@ -0,0 +1,76 @@ +package relay + +import ( + "context" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/stretchr/testify/require" + "testing" +) + +func TestReadWrite(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + blobStore := buildBlobStore(t, logger) + + expectedData := make(map[v2.BlobKey][]byte) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, data := randomBlob(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = data + + err = blobStore.StoreBlob(context.Background(), blobKey, data) + require.NoError(t, err) + } + + server, err := newBlobProvider(context.Background(), logger, blobStore, 10, 32) + require.NoError(t, err) + + // Read the blobs back. + for key, data := range expectedData { + blob, err := server.GetBlob(key) + + require.NoError(t, err) + require.Equal(t, data, blob) + } + + // Read the blobs back again to test caching. + for key, data := range expectedData { + blob, err := server.GetBlob(key) + + require.NoError(t, err) + require.Equal(t, data, blob) + } +} + +func TestNonExistentBlob(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + blobStore := buildBlobStore(t, logger) + + server, err := newBlobProvider(context.Background(), logger, blobStore, 10, 32) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + blob, err := server.GetBlob(v2.BlobKey(tu.RandomBytes(32))) + require.Error(t, err) + require.Nil(t, blob) + } +} diff --git a/relay/cached_accessor.go b/relay/cache/cached_accessor.go similarity index 93% rename from relay/cached_accessor.go rename to relay/cache/cached_accessor.go index c42b48af0d..31dfa6b968 100644 --- a/relay/cached_accessor.go +++ b/relay/cache/cached_accessor.go @@ -1,4 +1,4 @@ -package relay +package cache import ( lru "github.com/hashicorp/golang-lru/v2" @@ -9,7 +9,7 @@ import ( // are expensive, and prevents multiple concurrent cache misses for the same key. type CachedAccessor[K comparable, V any] interface { // Get returns the value for the given key. If the value is not in the cache, it will be fetched using the Accessor. - Get(key K) (*V, error) + Get(key K) (V, error) } // Accessor is function capable of fetching a value from a resource. Used by CachedAccessor when there is a cache miss. @@ -20,7 +20,7 @@ type accessResult[V any] struct { // wg.Wait() will block until the value is fetched. wg sync.WaitGroup // value is the value fetched by the Accessor, or nil if there was an error. - value *V + value V // err is the error returned by the Accessor, or nil if the fetch was successful. err error } @@ -42,19 +42,19 @@ type cachedAccessor[K comparable, V any] struct { lookupsInProgress map[K]*accessResult[V] // cache is the LRU cache used to store values fetched by the accessor. 
- cache *lru.Cache[K, *V] + cache *lru.Cache[K, V] // lock is used to protect the cache and lookupsInProgress map. cacheLock sync.Mutex // accessor is the function used to fetch values that are not in the cache. - accessor Accessor[K, *V] + accessor Accessor[K, V] } // NewCachedAccessor creates a new CachedAccessor. -func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K, *V]) (CachedAccessor[K, V], error) { +func NewCachedAccessor[K comparable, V any](cacheSize int, accessor Accessor[K, V]) (CachedAccessor[K, V], error) { - cache, err := lru.New[K, *V](cacheSize) + cache, err := lru.New[K, V](cacheSize) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func newAccessResult[V any]() *accessResult[V] { return result } -func (c *cachedAccessor[K, V]) Get(key K) (*V, error) { +func (c *cachedAccessor[K, V]) Get(key K) (V, error) { c.cacheLock.Lock() diff --git a/relay/cached_accessor_test.go b/relay/cache/cached_accessor_test.go similarity index 97% rename from relay/cached_accessor_test.go rename to relay/cache/cached_accessor_test.go index 791214705e..cf274ce33e 100644 --- a/relay/cached_accessor_test.go +++ b/relay/cache/cached_accessor_test.go @@ -1,4 +1,4 @@ -package relay +package cache import ( "errors" @@ -183,7 +183,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) { require.Equal(t, uint64(1), cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) } func TestParallelAccess(t *testing.T) { @@ -252,5 +252,5 @@ func TestParallelAccessWithError(t *testing.T) { require.Equal(t, count+1, cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) } diff --git a/relay/chunk_provider.go b/relay/chunk_provider.go new file mode 100644 index 0000000000..18f325f887 --- /dev/null +++ b/relay/chunk_provider.go @@ -0,0 +1,178 @@ +package relay + +import ( + "bytes" + "context" + "fmt" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/rs" + "github.com/Layr-Labs/eigenda/relay/cache" + "github.com/Layr-Labs/eigenda/relay/chunkstore" + "github.com/Layr-Labs/eigensdk-go/logging" + "sync" +) + +type chunkProvider struct { + ctx context.Context + logger logging.Logger + + // metadataCache is an LRU cache of blob metadata. Each relay is authorized to serve data assigned to one or more + // relay IDs. Blobs that do not belong to one of the relay IDs assigned to this server will not be in the cache. + frameCache cache.CachedAccessor[blobKeyWithMetadata, []*encoding.Frame] + + // chunkReader is used to read chunks from the chunk store. + chunkReader chunkstore.ChunkReader + + // concurrencyLimiter is a channel that limits the number of concurrent operations. + concurrencyLimiter chan struct{} +} + +// blobKeyWithMetadata attaches some additional metadata to a blobKey. +type blobKeyWithMetadata struct { + blobKey v2.BlobKey + metadata blobMetadata +} + +func (m *blobKeyWithMetadata) Compare(other *blobKeyWithMetadata) int { + return bytes.Compare(m.blobKey[:], other.blobKey[:]) +} + +// newChunkProvider creates a new chunkProvider. 
+func newChunkProvider( + ctx context.Context, + logger logging.Logger, + chunkReader chunkstore.ChunkReader, + cacheSize int, + maxIOConcurrency int) (*chunkProvider, error) { + + server := &chunkProvider{ + ctx: ctx, + logger: logger, + chunkReader: chunkReader, + concurrencyLimiter: make(chan struct{}, maxIOConcurrency), + } + + c, err := cache.NewCachedAccessor[blobKeyWithMetadata, []*encoding.Frame](cacheSize, server.fetchFrames) + if err != nil { + return nil, err + } + server.frameCache = c + + return server, nil +} + +// frameMap is a map of blob keys to frames. +type frameMap map[v2.BlobKey][]*encoding.Frame + +// GetFrames retrieves the frames for a blob. +func (s *chunkProvider) GetFrames(ctx context.Context, mMap metadataMap) (frameMap, error) { + + if len(mMap) == 0 { + return nil, fmt.Errorf("no metadata provided") + } + + keys := make([]*blobKeyWithMetadata, 0, len(mMap)) + for k, v := range mMap { + keys = append(keys, &blobKeyWithMetadata{blobKey: k, metadata: *v}) + } + + type framesResult struct { + key v2.BlobKey + data []*encoding.Frame + err error + } + + // Channel for results. + completionChannel := make(chan *framesResult, len(keys)) + + for _, key := range keys { + + boundKey := key + go func() { + frames, err := s.frameCache.Get(*boundKey) + if err != nil { + s.logger.Errorf("Failed to get frames for blob %v: %v", boundKey.blobKey, err) + completionChannel <- &framesResult{ + key: boundKey.blobKey, + err: err, + } + } else { + completionChannel <- &framesResult{ + key: boundKey.blobKey, + data: frames, + } + } + + }() + } + + fMap := make(frameMap, len(keys)) + for len(fMap) < len(keys) { + result := <-completionChannel + if result.err != nil { + return nil, fmt.Errorf("error fetching frames for blob %v: %w", result.key, result.err) + } + fMap[result.key] = result.data + } + + return fMap, nil +} + +// fetchFrames retrieves the frames for a single blob. +func (s *chunkProvider) fetchFrames(key blobKeyWithMetadata) ([]*encoding.Frame, error) { + + wg := sync.WaitGroup{} + wg.Add(1) + + var proofs []*encoding.Proof + var proofsErr error + + s.concurrencyLimiter <- struct{}{} + go func() { + defer func() { + wg.Done() + <-s.concurrencyLimiter + }() + proofs, proofsErr = s.chunkReader.GetChunkProofs(s.ctx, key.blobKey) + }() + + fragmentInfo := &encoding.FragmentInfo{ + TotalChunkSizeBytes: key.metadata.totalChunkSizeBytes, + FragmentSizeBytes: key.metadata.fragmentSizeBytes, + } + + coefficients, err := s.chunkReader.GetChunkCoefficients(s.ctx, key.blobKey, fragmentInfo) + if err != nil { + return nil, err + } + + wg.Wait() + if proofsErr != nil { + return nil, proofsErr + } + + frames, err := assembleFrames(coefficients, proofs) + if err != nil { + return nil, err + } + + return frames, nil +} + +// assembleFrames assembles a slice of frames from its composite proofs and coefficients. 
+func assembleFrames(frames []*rs.Frame, proofs []*encoding.Proof) ([]*encoding.Frame, error) { + if len(frames) != len(proofs) { + return nil, fmt.Errorf("number of frames and proofs must be equal (%d != %d)", len(frames), len(proofs)) + } + + assembledFrames := make([]*encoding.Frame, len(frames)) + for i := range frames { + assembledFrames[i] = &encoding.Frame{ + Proof: *proofs[i], + Coeffs: frames[i].Coeffs, + } + } + + return assembledFrames, nil +} diff --git a/relay/chunk_provider_test.go b/relay/chunk_provider_test.go new file mode 100644 index 0000000000..b768210d77 --- /dev/null +++ b/relay/chunk_provider_test.go @@ -0,0 +1,158 @@ +package relay + +import ( + "context" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/stretchr/testify/require" + "testing" +) + +func TestFetchingIndividualBlobs(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + chunkReader, chunkWriter := buildChunkStore(t, logger) + + expectedFrames := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + // Write some data. + blobCount := 10 + for i := 0; i < blobCount; i++ { + + header, _, frames := randomBlobChunks(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + rsFrames, proofs := disassembleFrames(frames) + + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, proofs) + require.NoError(t, err) + + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, rsFrames) + require.NoError(t, err) + + expectedFrames[blobKey] = frames + fragmentInfoMap[blobKey] = fragmentInfo + } + + server, err := newChunkProvider(context.Background(), logger, chunkReader, 10, 32) + require.NoError(t, err) + + // Read it back. + for key, frames := range expectedFrames { + + mMap := make(metadataMap) + fragmentInfo := fragmentInfoMap[key] + mMap[key] = &blobMetadata{ + totalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + fragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + } + + fMap, err := server.GetFrames(context.Background(), mMap) + require.NoError(t, err) + + require.Equal(t, 1, len(fMap)) + readFrames := (fMap)[key] + require.NotNil(t, readFrames) + + // TODO: when I inspect this data using a debugger, the proofs are all made up of 0s... something + // is wrong with the way the data is generated in the test. + require.Equal(t, frames, readFrames) + } + + // Read it back again to test caching. 
+ for key, frames := range expectedFrames { + + mMap := make(metadataMap) + fragmentInfo := fragmentInfoMap[key] + mMap[key] = &blobMetadata{ + totalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + fragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + } + + fMap, err := server.GetFrames(context.Background(), mMap) + require.NoError(t, err) + + require.Equal(t, 1, len(fMap)) + readFrames := (fMap)[key] + require.NotNil(t, readFrames) + + require.Equal(t, frames, readFrames) + } +} + +func TestFetchingBatchedBlobs(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + chunkReader, chunkWriter := buildChunkStore(t, logger) + + expectedFrames := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + // Write some data. + blobCount := 10 + for i := 0; i < blobCount; i++ { + + header, _, frames := randomBlobChunks(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + rsFrames, proofs := disassembleFrames(frames) + + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, proofs) + require.NoError(t, err) + + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, rsFrames) + require.NoError(t, err) + + expectedFrames[blobKey] = frames + fragmentInfoMap[blobKey] = fragmentInfo + } + + server, err := newChunkProvider(context.Background(), logger, chunkReader, 10, 32) + require.NoError(t, err) + + // Read it back. + batchSize := 3 + for i := 0; i < 10; i++ { + + mMap := make(metadataMap) + for key := range expectedFrames { + mMap[key] = &blobMetadata{ + totalChunkSizeBytes: fragmentInfoMap[key].TotalChunkSizeBytes, + fragmentSizeBytes: fragmentInfoMap[key].FragmentSizeBytes, + } + if len(mMap) == batchSize { + break + } + } + + fMap, err := server.GetFrames(context.Background(), mMap) + require.NoError(t, err) + + require.Equal(t, batchSize, len(fMap)) + for key := range mMap { + + readFrames := (fMap)[key] + require.NotNil(t, readFrames) + + expectedFramesForBlob := expectedFrames[key] + require.Equal(t, expectedFramesForBlob, readFrames) + } + } +} diff --git a/relay/chunkstore/chunk_reader.go b/relay/chunkstore/chunk_reader.go index 8945db6c08..d6206de32a 100644 --- a/relay/chunkstore/chunk_reader.go +++ b/relay/chunkstore/chunk_reader.go @@ -30,7 +30,6 @@ type chunkReader struct { logger logging.Logger client s3.Client bucket string - shards []uint32 } // NewChunkReader creates a new ChunkReader. 
@@ -40,14 +39,12 @@ type chunkReader struct { func NewChunkReader( logger logging.Logger, s3Client s3.Client, - bucketName string, - shards []uint32) ChunkReader { + bucketName string) ChunkReader { return &chunkReader{ logger: logger, client: s3Client, bucket: bucketName, - shards: shards, } } diff --git a/relay/chunkstore/chunk_store_test.go b/relay/chunkstore/chunk_store_test.go index 1f985971f3..f1f8300a64 100644 --- a/relay/chunkstore/chunk_store_test.go +++ b/relay/chunkstore/chunk_store_test.go @@ -152,7 +152,7 @@ func RandomProofsTest(t *testing.T, client s3.Client) { fragmentSize := rand.Intn(1024) + 100 // ignored since we aren't writing coefficients writer := NewChunkWriter(logger, client, bucket, fragmentSize) - reader := NewChunkReader(logger, client, bucket, make([]uint32, 0)) + reader := NewChunkReader(logger, client, bucket) expectedValues := make(map[corev2.BlobKey][]*encoding.Proof) @@ -227,7 +227,7 @@ func RandomCoefficientsTest(t *testing.T, client s3.Client) { require.NotNil(t, encoder) writer := NewChunkWriter(logger, client, bucket, fragmentSize) - reader := NewChunkReader(logger, client, bucket, make([]uint32, 0)) + reader := NewChunkReader(logger, client, bucket) expectedValues := make(map[corev2.BlobKey][]*rs.Frame) metadataMap := make(map[corev2.BlobKey]*encoding.FragmentInfo) diff --git a/relay/config.go b/relay/config.go new file mode 100644 index 0000000000..c44349190e --- /dev/null +++ b/relay/config.go @@ -0,0 +1,44 @@ +package relay + +import core "github.com/Layr-Labs/eigenda/core/v2" + +// Config is the configuration for the relay Server. +type Config struct { + + // RelayIDs contains the IDs of the relays that this server is willing to serve data for. If empty, the server will + // serve data for any shard it can. + RelayIDs []core.RelayKey + + // MetadataCacheSize is the maximum number of items in the metadata cache. Default is 1024 * 1024. + MetadataCacheSize int + + // MetadataMaxConcurrency puts a limit on the maximum number of concurrent metadata fetches actively running on + // goroutines. Default is 32. + MetadataMaxConcurrency int + + // BlobCacheSize is the maximum number of items in the blob cache. Default is 32. + BlobCacheSize int + + // BlobMaxConcurrency puts a limit on the maximum number of concurrent blob fetches actively running on goroutines. + // Default is 32. + BlobMaxConcurrency int + + // ChunkCacheSize is the maximum number of items in the chunk cache. Default is 32. + ChunkCacheSize int + + // ChunkMaxConcurrency is the size of the work pool for fetching chunks. Default is 32. Note that this does not + // impact concurrency utilized by the s3 client to upload/download fragmented files. + ChunkMaxConcurrency int +} + +// DefaultConfig returns the default configuration for the relay Server. +func DefaultConfig() *Config { + return &Config{ + MetadataCacheSize: 1024 * 1024, + MetadataMaxConcurrency: 32, + BlobCacheSize: 32, + BlobMaxConcurrency: 32, + ChunkCacheSize: 32, + ChunkMaxConcurrency: 32, + } +} diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go new file mode 100644 index 0000000000..6a12964e1d --- /dev/null +++ b/relay/metadata_provider.go @@ -0,0 +1,169 @@ +package relay + +import ( + "context" + "fmt" + "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/relay/cache" + "github.com/Layr-Labs/eigensdk-go/logging" + "sync/atomic" +) + +// Metadata about a blob. The relay only needs a small subset of a blob's metadata. 
+// This struct adds caching and threading on top of blobstore.BlobMetadataStore. +type blobMetadata struct { + // the size of the blob in bytes + blobSizeBytes uint32 + // the size of the file containing the encoded chunks + totalChunkSizeBytes uint32 + // the fragment size used for uploading the encoded chunks + fragmentSizeBytes uint32 +} + +// metadataProvider encapsulates logic for fetching metadata for blobs. Utilized by the relay Server. +type metadataProvider struct { + ctx context.Context + logger logging.Logger + + // metadataStore can be used to read blob metadata from dynamoDB. + metadataStore *blobstore.BlobMetadataStore + + // metadataCache is an LRU cache of blob metadata. Blobs that do not belong to one of the relay shards + // assigned to this server will not be in the cache. + metadataCache cache.CachedAccessor[v2.BlobKey, *blobMetadata] + + // relayIDSet is the set of relay IDs assigned to this relay. This relay will refuse to serve metadata for blobs + // that are not assigned to one of these IDs. + relayIDSet map[v2.RelayKey]struct{} + + // concurrencyLimiter is a channel that limits the number of concurrent operations. + concurrencyLimiter chan struct{} +} + +// newMetadataProvider creates a new metadataProvider. +func newMetadataProvider( + ctx context.Context, + logger logging.Logger, + metadataStore *blobstore.BlobMetadataStore, + metadataCacheSize int, + maxIOConcurrency int, + relayIDs []v2.RelayKey) (*metadataProvider, error) { + + relayIDSet := make(map[v2.RelayKey]struct{}, len(relayIDs)) + for _, id := range relayIDs { + relayIDSet[id] = struct{}{} + } + + server := &metadataProvider{ + ctx: ctx, + logger: logger, + metadataStore: metadataStore, + relayIDSet: relayIDSet, + concurrencyLimiter: make(chan struct{}, maxIOConcurrency), + } + + metadataCache, err := cache.NewCachedAccessor[v2.BlobKey, *blobMetadata](metadataCacheSize, server.fetchMetadata) + if err != nil { + return nil, fmt.Errorf("error creating metadata cache: %w", err) + } + + server.metadataCache = metadataCache + + return server, nil +} + +// metadataMap is a map of blob keys to metadata. +type metadataMap map[v2.BlobKey]*blobMetadata + +// GetMetadataForBlobs retrieves metadata about multiple blobs in parallel. +func (m *metadataProvider) GetMetadataForBlobs(keys []v2.BlobKey) (metadataMap, error) { + + // blobMetadataResult is the result of a metadata fetch operation. + type blobMetadataResult struct { + key v2.BlobKey + metadata *blobMetadata + err error + } + + // Completed operations will send a result to this channel. + completionChannel := make(chan *blobMetadataResult, len(keys)) + + // Set when the first error is encountered. Useful for preventing new operations from starting. + hadError := atomic.Bool{} + + for _, key := range keys { + if hadError.Load() { + // Don't bother starting new operations if we've already encountered an error. + break + } + + boundKey := key + m.concurrencyLimiter <- struct{}{} + go func() { + defer func() { + <-m.concurrencyLimiter + }() + + metadata, err := m.metadataCache.Get(boundKey) + if err != nil { + // Intentionally log at debug level. External users can force this condition to trigger + // by requesting metadata for a blob that does not exist, and so it's important to avoid + // allowing hooligans to spam the logs in production environments. 
+ m.logger.Debugf("error retrieving metadata for blob %s: %v", boundKey.Hex(), err) + hadError.Store(true) + completionChannel <- &blobMetadataResult{ + key: boundKey, + err: err, + } + } + + completionChannel <- &blobMetadataResult{ + key: boundKey, + metadata: metadata, + } + }() + } + + mMap := make(metadataMap) + for len(mMap) < len(keys) { + result := <-completionChannel + if result.err != nil { + return nil, fmt.Errorf("error fetching metadata for blob %s: %w", result.key.Hex(), result.err) + } + mMap[result.key] = result.metadata + } + + return mMap, nil +} + +// fetchMetadata retrieves metadata about a blob. Fetches from the cache if available, otherwise from the store. +func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) { + // Retrieve the metadata from the store. + cert, fragmentInfo, err := m.metadataStore.GetBlobCertificate(m.ctx, key) + if err != nil { + return nil, fmt.Errorf("error retrieving metadata for blob %s: %w", key.Hex(), err) + } + + if len(m.relayIDSet) > 0 { + validShard := false + for _, shard := range cert.RelayKeys { + if _, ok := m.relayIDSet[shard]; ok { + validShard = true + break + } + } + + if !validShard { + return nil, fmt.Errorf("blob %s is not assigned to this relay", key.Hex()) + } + } + + metadata := &blobMetadata{ + blobSizeBytes: 0, /* Future work: populate this once it is added to the metadata store */ + totalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + fragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + } + + return metadata, nil +} diff --git a/relay/metadata_provider_test.go b/relay/metadata_provider_test.go new file mode 100644 index 0000000000..e5586d901b --- /dev/null +++ b/relay/metadata_provider_test.go @@ -0,0 +1,418 @@ +package relay + +import ( + "context" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "math/rand" + "testing" +) + +func TestGetNonExistentBlob(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + metadataStore := buildMetadataStore(t) + + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + require.NoError(t, err) + + // Try to fetch a non-existent blobs + for i := 0; i < 10; i++ { + _, err := server.GetMetadataForBlobs([]v2.BlobKey{v2.BlobKey(tu.RandomBytes(32))}) + require.Error(t, err) + } +} + +func TestFetchingIndividualMetadata(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + metadataStore := buildMetadataStore(t) + + totalChunkSizeMap := make(map[v2.BlobKey]uint32) + fragmentSizeMap := make(map[v2.BlobKey]uint32) + + // Write some metadata + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _ := randomBlob(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + totalChunkSizeBytes := uint32(rand.Intn(1024 * 1024 * 1024)) + fragmentSizeBytes := uint32(rand.Intn(1024 * 1024)) + + totalChunkSizeMap[blobKey] = totalChunkSizeBytes + fragmentSizeMap[blobKey] = fragmentSizeBytes + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: totalChunkSizeBytes, + FragmentSizeBytes: 
fragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Sanity check, make sure the metadata is in the low level store + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + cert, fragmentInfo, err := metadataStore.GetBlobCertificate(context.Background(), blobKey) + require.NoError(t, err) + require.NotNil(t, cert) + require.NotNil(t, fragmentInfo) + require.Equal(t, totalChunkSizeBytes, fragmentInfo.TotalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) + } + + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + require.NoError(t, err) + + // Fetch the metadata from the server. + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + mMap, err := server.GetMetadataForBlobs([]v2.BlobKey{blobKey}) + require.NoError(t, err) + require.Equal(t, 1, len(mMap)) + metadata := mMap[blobKey] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeBytes, metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], metadata.fragmentSizeBytes) + } + + // Read it back again. This uses a different code pathway due to the cache. + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + mMap, err := server.GetMetadataForBlobs([]v2.BlobKey{blobKey}) + require.NoError(t, err) + require.Equal(t, 1, len(mMap)) + metadata := mMap[blobKey] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeBytes, metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], metadata.fragmentSizeBytes) + } +} + +func TestBatchedFetch(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + metadataStore := buildMetadataStore(t) + + totalChunkSizeMap := make(map[v2.BlobKey]uint32) + fragmentSizeMap := make(map[v2.BlobKey]uint32) + + // Write some metadata + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _ := randomBlob(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + totalChunkSizeBytes := uint32(rand.Intn(1024 * 1024 * 1024)) + fragmentSizeBytes := uint32(rand.Intn(1024 * 1024)) + + totalChunkSizeMap[blobKey] = totalChunkSizeBytes + fragmentSizeMap[blobKey] = fragmentSizeBytes + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: totalChunkSizeBytes, + FragmentSizeBytes: fragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Sanity check, make sure the metadata is in the low level store + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + cert, fragmentInfo, err := metadataStore.GetBlobCertificate(context.Background(), blobKey) + require.NoError(t, err) + require.NotNil(t, cert) + require.NotNil(t, fragmentInfo) + require.Equal(t, totalChunkSizeBytes, fragmentInfo.TotalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) + } + + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, nil) + require.NoError(t, err) + + // Each iteration, choose a random subset of the keys to fetch + for i := 0; i < 10; i++ { + keyCount := rand.Intn(blobCount) + 1 + keys := make([]v2.BlobKey, 0, keyCount) + for key := range totalChunkSizeMap { + keys = append(keys, key) + if len(keys) == keyCount { + break + } + } + + mMap, err := server.GetMetadataForBlobs(keys) + require.NoError(t, err) + + assert.Equal(t, keyCount, len(mMap)) + 
for _, key := range keys { + metadata := mMap[key] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeMap[key], metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[key], metadata.fragmentSizeBytes) + } + } +} + +func TestIndividualFetchWithSharding(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + metadataStore := buildMetadataStore(t) + + totalChunkSizeMap := make(map[v2.BlobKey]uint32) + fragmentSizeMap := make(map[v2.BlobKey]uint32) + shardMap := make(map[v2.BlobKey][]v2.RelayKey) + + shardCount := rand.Intn(10) + 10 + shardList := make([]v2.RelayKey, 0) + shardSet := make(map[v2.RelayKey]struct{}) + for i := 0; i < shardCount; i++ { + if i%2 == 0 { + shardList = append(shardList, v2.RelayKey(i)) + shardSet[v2.RelayKey(i)] = struct{}{} + } + } + + // Write some metadata + blobCount := 100 + for i := 0; i < blobCount; i++ { + header, _ := randomBlob(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + totalChunkSizeBytes := uint32(rand.Intn(1024 * 1024 * 1024)) + fragmentSizeBytes := uint32(rand.Intn(1024 * 1024)) + + totalChunkSizeMap[blobKey] = totalChunkSizeBytes + fragmentSizeMap[blobKey] = fragmentSizeBytes + + // Assign two shards to each blob. + shard1 := v2.RelayKey(rand.Intn(shardCount)) + shard2 := v2.RelayKey(rand.Intn(shardCount)) + shards := []v2.RelayKey{shard1, shard2} + shardMap[blobKey] = shards + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + RelayKeys: shards, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: totalChunkSizeBytes, + FragmentSizeBytes: fragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Sanity check, make sure the metadata is in the low level store + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + cert, fragmentInfo, err := metadataStore.GetBlobCertificate(context.Background(), blobKey) + require.NoError(t, err) + require.NotNil(t, cert) + require.NotNil(t, fragmentInfo) + require.Equal(t, totalChunkSizeBytes, fragmentInfo.TotalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) + } + + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList) + require.NoError(t, err) + + // Fetch the metadata from the server. + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + isBlobInCorrectShard := false + blobShards := shardMap[blobKey] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + mMap, err := server.GetMetadataForBlobs([]v2.BlobKey{blobKey}) + + if isBlobInCorrectShard { + // The blob is in the relay's shard, should be returned like normal + require.NoError(t, err) + require.Equal(t, 1, len(mMap)) + metadata := mMap[blobKey] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeBytes, metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], metadata.fragmentSizeBytes) + } else { + // the blob is not in the relay's shard, should return an error + require.Error(t, err) + } + } + + // Read it back again. This uses a different code pathway due to the cache. 
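
The "read it back again" loops in these tests exist because repeated lookups are served from an in-memory cache rather than DynamoDB. The following is a minimal, stdlib-only sketch of that memoizing-accessor idea; it is illustrative only (it is not the cache package this patch actually wires in) and omits eviction, size limits, and deduplication of concurrent misses.

```go
package main

import (
	"fmt"
	"sync"
)

// cachedAccessor memoizes the result of an expensive lookup so that repeated
// reads of the same key skip the backing store.
type cachedAccessor[K comparable, V any] struct {
	mu    sync.Mutex
	cache map[K]V
	fetch func(K) (V, error)
}

func newCachedAccessor[K comparable, V any](fetch func(K) (V, error)) *cachedAccessor[K, V] {
	return &cachedAccessor[K, V]{cache: make(map[K]V), fetch: fetch}
}

func (c *cachedAccessor[K, V]) Get(key K) (V, error) {
	c.mu.Lock()
	if v, ok := c.cache[key]; ok {
		c.mu.Unlock()
		return v, nil
	}
	c.mu.Unlock()

	// Cache miss: hit the backing store, then populate the cache.
	v, err := c.fetch(key)
	if err != nil {
		var zero V
		return zero, err
	}

	c.mu.Lock()
	c.cache[key] = v
	c.mu.Unlock()
	return v, nil
}

func main() {
	calls := 0
	accessor := newCachedAccessor(func(key string) (int, error) {
		calls++
		return len(key), nil
	})

	for i := 0; i < 2; i++ {
		v, _ := accessor.Get("blob-key")
		fmt.Println(v, "fetch calls:", calls) // the second read is served from the cache
	}
}
```

The second Get for the same key returns without calling fetch again, which is the behavior the repeated read loops in these tests are meant to cover.
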
+ for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + isBlobInCorrectShard := false + blobShards := shardMap[blobKey] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + mMap, err := server.GetMetadataForBlobs([]v2.BlobKey{blobKey}) + + if isBlobInCorrectShard { + // The blob is in the relay's shard, should be returned like normal + require.NoError(t, err) + require.Equal(t, 1, len(mMap)) + metadata := mMap[blobKey] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeBytes, metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], metadata.fragmentSizeBytes) + } else { + // the blob is not in the relay's shard, should return an error + require.Error(t, err) + } + } +} + +func TestBatchedFetchWithSharding(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + metadataStore := buildMetadataStore(t) + + totalChunkSizeMap := make(map[v2.BlobKey]uint32) + fragmentSizeMap := make(map[v2.BlobKey]uint32) + shardMap := make(map[v2.BlobKey][]v2.RelayKey) + + shardCount := rand.Intn(10) + 10 + shardList := make([]v2.RelayKey, 0) + shardSet := make(map[v2.RelayKey]struct{}) + for i := 0; i < shardCount; i++ { + if i%2 == 0 { + shardList = append(shardList, v2.RelayKey(i)) + shardSet[v2.RelayKey(i)] = struct{}{} + } + } + + // Write some metadata + blobCount := 100 + for i := 0; i < blobCount; i++ { + header, _ := randomBlob(t) + blobKey, err := header.BlobKey() + require.NoError(t, err) + + totalChunkSizeBytes := uint32(rand.Intn(1024 * 1024 * 1024)) + fragmentSizeBytes := uint32(rand.Intn(1024 * 1024)) + + totalChunkSizeMap[blobKey] = totalChunkSizeBytes + fragmentSizeMap[blobKey] = fragmentSizeBytes + + // Assign two shards to each blob. + shard1 := v2.RelayKey(rand.Intn(shardCount)) + shard2 := v2.RelayKey(rand.Intn(shardCount)) + shards := []v2.RelayKey{shard1, shard2} + shardMap[blobKey] = shards + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + RelayKeys: shards, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: totalChunkSizeBytes, + FragmentSizeBytes: fragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Sanity check, make sure the metadata is in the low level store + for blobKey, totalChunkSizeBytes := range totalChunkSizeMap { + cert, fragmentInfo, err := metadataStore.GetBlobCertificate(context.Background(), blobKey) + require.NoError(t, err) + require.NotNil(t, cert) + require.NotNil(t, fragmentInfo) + require.Equal(t, totalChunkSizeBytes, fragmentInfo.TotalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[blobKey], fragmentInfo.FragmentSizeBytes) + } + + server, err := newMetadataProvider(context.Background(), logger, metadataStore, 1024*1024, 32, shardList) + require.NoError(t, err) + + // Each iteration, choose two random keys to fetch. There will be a 25% chance that both blobs map to valid shards. + for i := 0; i < 100; i++ { + + keyCount := 2 + keys := make([]v2.BlobKey, 0, keyCount) + areKeysInCorrectShard := true + for key := range totalChunkSizeMap { + keys = append(keys, key) + + keyShards := shardMap[key] + keyIsInShard := false + for _, shard := range keyShards { + if _, ok := shardSet[shard]; ok { + keyIsInShard = true + break + } + } + if !keyIsInShard { + // If both keys are not in the shard, we expect an error. 
+ areKeysInCorrectShard = false + } + + if len(keys) == keyCount { + break + } + } + + mMap, err := server.GetMetadataForBlobs(keys) + if areKeysInCorrectShard { + require.NoError(t, err) + assert.Equal(t, keyCount, len(mMap)) + for _, key := range keys { + metadata := mMap[key] + require.NotNil(t, metadata) + require.Equal(t, totalChunkSizeMap[key], metadata.totalChunkSizeBytes) + require.Equal(t, fragmentSizeMap[key], metadata.fragmentSizeBytes) + } + } else { + require.Error(t, err) + } + } +} diff --git a/relay/relay_test_utils.go b/relay/relay_test_utils.go new file mode 100644 index 0000000000..3c6d297b9e --- /dev/null +++ b/relay/relay_test_utils.go @@ -0,0 +1,227 @@ +package relay + +import ( + "context" + "fmt" + pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" + pbcommonv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/common/aws/s3" + tu "github.com/Layr-Labs/eigenda/common/testutils" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/kzg" + p "github.com/Layr-Labs/eigenda/encoding/kzg/prover" + "github.com/Layr-Labs/eigenda/encoding/rs" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/Layr-Labs/eigenda/relay/chunkstore" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/google/uuid" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + "log" + "math/big" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" +) + +var ( + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + prover *p.Prover + bucketName = fmt.Sprintf("test-bucket-%v", UUID) +) + +const ( + localstackPort = "4570" + localstackHost = "http://0.0.0.0:4570" +) + +func setup(t *testing.T) { + deployLocalStack := !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + + _, b, _, _ := runtime.Caller(0) + rootPath := filepath.Join(filepath.Dir(b), "..") + changeDirectory(filepath.Join(rootPath, "inabox")) + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localstackPort) + require.NoError(t, err) + } + + // Only set up the prover once, it's expensive + if prover == nil { + config := &kzg.KzgConfig{ + G1Path: "./resources/kzg/g1.point.300000", + G2Path: "./resources/kzg/g2.point.300000", + CacheDir: "./resources/kzg/SRSTables", + SRSOrder: 8192, + SRSNumberToLoad: 8192, + NumWorker: uint64(runtime.GOMAXPROCS(0)), + } + var err error + prover, err = p.NewProver(config, true) + require.NoError(t, err) + } +} + +func changeDirectory(path string) { + err := os.Chdir(path) + if err != nil { + log.Panicf("Failed to change directories. Error: %s", err) + } + + newDir, err := os.Getwd() + if err != nil { + log.Panicf("Failed to get working directory. 
Error: %s", err) + } + log.Printf("Current Working Directory: %s\n", newDir) +} + +func teardown() { + deployLocalStack := !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} + +func buildMetadataStore(t *testing.T) *blobstore.BlobMetadataStore { + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + err = os.Setenv("AWS_ACCESS_KEY_ID", "localstack") + require.NoError(t, err) + err = os.Setenv("AWS_SECRET_ACCESS_KEY", "localstack") + require.NoError(t, err) + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: localstackHost, + } + + _, err = test_utils.CreateTable( + context.Background(), + cfg, + metadataTableName, + blobstore.GenerateTableSchema(metadataTableName, 10, 10)) + if err != nil { + if !strings.Contains(err.Error(), "ResourceInUseException: Table already exists") { + require.NoError(t, err) + } + } + + dynamoClient, err := dynamodb.NewClient(cfg, logger) + require.NoError(t, err) + + return blobstore.NewBlobMetadataStore( + dynamoClient, + logger, + metadataTableName) +} + +func buildBlobStore(t *testing.T, logger logging.Logger) *blobstore.BlobStore { + cfg := aws.DefaultClientConfig() + cfg.Region = "us-east-1" + cfg.AccessKey = "localstack" + cfg.SecretAccessKey = "localstack" + cfg.EndpointURL = localstackHost + + client, err := s3.NewClient(context.Background(), *cfg, logger) + require.NoError(t, err) + + err = client.CreateBucket(context.Background(), bucketName) + require.NoError(t, err) + + return blobstore.NewBlobStore(bucketName, client, logger) +} + +func buildChunkStore(t *testing.T, logger logging.Logger) (chunkstore.ChunkReader, chunkstore.ChunkWriter) { + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: localstackHost, + FragmentWriteTimeout: time.Duration(10) * time.Second, + FragmentReadTimeout: time.Duration(10) * time.Second, + } + + client, err := s3.NewClient(context.Background(), cfg, logger) + require.NoError(t, err) + + err = client.CreateBucket(context.Background(), bucketName) + require.NoError(t, err) + + // intentionally use very small fragment size + chunkWriter := chunkstore.NewChunkWriter(logger, client, bucketName, 32) + chunkReader := chunkstore.NewChunkReader(logger, client, bucketName) + + return chunkReader, chunkWriter +} + +func randomBlob(t *testing.T) (*v2.BlobHeader, []byte) { + + data := tu.RandomBytes(128) + + data = codec.ConvertByPaddingEmptyByte(data) + commitments, err := prover.GetCommitments(data) + require.NoError(t, err) + require.NoError(t, err) + commitmentProto, err := commitments.ToProtobuf() + require.NoError(t, err) + + blobHeaderProto := &pbcommonv2.BlobHeader{ + Version: 0, + QuorumNumbers: []uint32{0, 1}, + Commitment: commitmentProto, + PaymentHeader: &pbcommon.PaymentHeader{ + AccountId: tu.RandomString(10), + BinIndex: 5, + CumulativePayment: big.NewInt(100).Bytes(), + }, + } + blobHeader, err := v2.NewBlobHeader(blobHeaderProto) + require.NoError(t, err) + + return blobHeader, data +} + +func randomBlobChunks(t *testing.T) (*v2.BlobHeader, []byte, []*encoding.Frame) { + header, data := randomBlob(t) + + params := encoding.ParamsFromMins(16, 16) + _, frames, err := prover.EncodeAndProve(data, params) + require.NoError(t, err) + + return header, data, frames +} + +func disassembleFrames(frames []*encoding.Frame) ([]*rs.Frame, 
[]*encoding.Proof) { + rsFrames := make([]*rs.Frame, len(frames)) + proofs := make([]*encoding.Proof, len(frames)) + + for i, frame := range frames { + rsFrames[i] = &rs.Frame{ + Coeffs: frame.Coeffs, + } + proofs[i] = &frame.Proof + } + + return rsFrames, proofs +} diff --git a/relay/server.go b/relay/server.go new file mode 100644 index 0000000000..dc006e43a3 --- /dev/null +++ b/relay/server.go @@ -0,0 +1,212 @@ +package relay + +import ( + "context" + "fmt" + pb "github.com/Layr-Labs/eigenda/api/grpc/relay" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/relay/chunkstore" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +var _ pb.RelayServer = &Server{} + +// Server implements the Relay service defined in api/proto/relay/relay.proto +type Server struct { + pb.UnimplementedRelayServer + + // metadataProvider encapsulates logic for fetching metadata for blobs. + metadataProvider *metadataProvider + + // blobProvider encapsulates logic for fetching blobs. + blobProvider *blobProvider + + // chunkProvider encapsulates logic for fetching chunks. + chunkProvider *chunkProvider +} + +// NewServer creates a new relay Server. +func NewServer( + ctx context.Context, + logger logging.Logger, + config *Config, + metadataStore *blobstore.BlobMetadataStore, + blobStore *blobstore.BlobStore, + chunkReader chunkstore.ChunkReader) (*Server, error) { + + ms, err := newMetadataProvider( + ctx, + logger, + metadataStore, + config.MetadataCacheSize, + config.MetadataMaxConcurrency, + config.RelayIDs) + if err != nil { + return nil, fmt.Errorf("error creating metadata server: %w", err) + } + + bs, err := newBlobProvider( + ctx, + logger, + blobStore, + config.BlobCacheSize, + config.BlobMaxConcurrency) + if err != nil { + return nil, fmt.Errorf("error creating blob server: %w", err) + } + + cs, err := newChunkProvider( + ctx, + logger, + chunkReader, + config.ChunkCacheSize, + config.ChunkMaxConcurrency) + if err != nil { + return nil, fmt.Errorf("error creating chunk server: %w", err) + } + + return &Server{ + metadataProvider: ms, + blobProvider: bs, + chunkProvider: cs, + }, nil +} + +// GetBlob retrieves a blob stored by the relay. +func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.GetBlobReply, error) { + + // Future work : + // - global throttle + // - per-connection throttle + // - timeouts + + key, err := v2.BytesToBlobKey(request.BlobKey) + if err != nil { + return nil, fmt.Errorf("invalid blob key: %w", err) + } + + keys := []v2.BlobKey{key} + mMap, err := s.metadataProvider.GetMetadataForBlobs(keys) + if err != nil { + return nil, fmt.Errorf( + "error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err) + } + metadata := mMap[v2.BlobKey(request.BlobKey)] + if metadata == nil { + return nil, fmt.Errorf("blob not found") + } + + data, err := s.blobProvider.GetBlob(key) + if err != nil { + return nil, fmt.Errorf("error fetching blob %s: %w", key.Hex(), err) + } + + reply := &pb.GetBlobReply{ + Blob: data, + } + + return reply, nil +} + +// GetChunks retrieves chunks from blobs stored by the relay. 
+func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
+
+	// Future work:
+	//  - authentication
+	//  - global throttle
+	//  - per-connection throttle
+	//  - timeouts
+
+	if len(request.ChunkRequests) <= 0 {
+		return nil, fmt.Errorf("no chunk requests provided")
+	}
+
+	keys := make([]v2.BlobKey, 0, len(request.ChunkRequests))
+
+	for _, chunkRequest := range request.ChunkRequests {
+		var key v2.BlobKey
+		if chunkRequest.GetByIndex() != nil {
+			var err error
+			key, err = v2.BytesToBlobKey(chunkRequest.GetByIndex().GetBlobKey())
+			if err != nil {
+				return nil, fmt.Errorf("invalid blob key: %w", err)
+			}
+		} else {
+			var err error
+			key, err = v2.BytesToBlobKey(chunkRequest.GetByRange().GetBlobKey())
+			if err != nil {
+				return nil, fmt.Errorf("invalid blob key: %w", err)
+			}
+		}
+		keys = append(keys, key)
+	}
+
+	mMap, err := s.metadataProvider.GetMetadataForBlobs(keys)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err)
+	}
+
+	frames, err := s.chunkProvider.GetFrames(ctx, mMap)
+	if err != nil {
+		return nil, fmt.Errorf("error fetching frames: %w", err)
+	}
+
+	bytesToSend := make([][]byte, 0, len(keys))
+
+	// return data in the order that it was requested
+	for _, chunkRequest := range request.ChunkRequests {
+
+		framesToSend := make([]*encoding.Frame, 0)
+
+		if chunkRequest.GetByIndex() != nil {
+			key := v2.BlobKey(chunkRequest.GetByIndex().GetBlobKey())
+			blobFrames := frames[key]
+
+			for index := range chunkRequest.GetByIndex().ChunkIndices {
+
+				if index >= len(blobFrames) {
+					return nil, fmt.Errorf(
+						"chunk index %d out of range for key %s, chunk count %d",
+						index, key.Hex(), len(blobFrames))
+				}
+
+				framesToSend = append(framesToSend, blobFrames[index])
+			}
+
+		} else {
+			key := v2.BlobKey(chunkRequest.GetByRange().GetBlobKey())
+			startIndex := chunkRequest.GetByRange().StartIndex
+			endIndex := chunkRequest.GetByRange().EndIndex
+
+			blobFrames := frames[key]
+
+			if startIndex > endIndex {
+				return nil, fmt.Errorf(
+					"chunk range %d-%d is invalid for key %s, start index must be less than or equal to end index",
+					startIndex, endIndex, key.Hex())
+			}
+			if endIndex > uint32(len(blobFrames)) {
+				return nil, fmt.Errorf(
+					"chunk range %d-%d is invalid for key %s, chunk count %d",
+					startIndex, endIndex, key.Hex(), len(blobFrames))
+			}
+
+			framesToSend = append(framesToSend, blobFrames[startIndex:endIndex]...)
+ } + + bundle := core.Bundle(framesToSend) + bundleBytes, err := bundle.Serialize() + if err != nil { + return nil, fmt.Errorf("error serializing bundle: %w", err) + } + bytesToSend = append(bytesToSend, bundleBytes) + } + + return &pb.GetChunksReply{ + Data: bytesToSend, + }, nil +} diff --git a/relay/server_test.go b/relay/server_test.go new file mode 100644 index 0000000000..de83c15eff --- /dev/null +++ b/relay/server_test.go @@ -0,0 +1,939 @@ +package relay + +import ( + "context" + pb "github.com/Layr-Labs/eigenda/api/grpc/relay" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/stretchr/testify/require" + "math/rand" + "testing" +) + +func TestReadWriteBlobs(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + blobStore := buildBlobStore(t, logger) + + // This is the server used to read it back + config := DefaultConfig() + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + blobStore, + nil /* not used in this test*/) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]byte) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, data := randomBlob(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = data + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + }, + &encoding.FragmentInfo{}) + require.NoError(t, err) + + err = blobStore.StoreBlob(context.Background(), blobKey, data) + require.NoError(t, err) + } + + // Read the blobs back. + for key, data := range expectedData { + request := &pb.GetBlobRequest{ + BlobKey: key[:], + } + + response, err := server.GetBlob(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, data, response.Blob) + } + + // Read the blobs back again to test caching. 
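
As the GetChunks handler above shows, the reply carries one serialized core.Bundle per chunk request, in request order. Below is a caller-side sketch of unpacking a reply; it assumes the same generated relay package and core types imported elsewhere in this patch, and that Deserialize returns the bundle plus an error as the tests' usage suggests. The package and helper names are placeholders.

```go
package example

import (
	"fmt"

	pb "github.com/Layr-Labs/eigenda/api/grpc/relay"
	"github.com/Layr-Labs/eigenda/core"
)

// decodeChunksReply unpacks a GetChunksReply: one serialized core.Bundle is
// returned per ChunkRequest, in the same order the requests were made.
func decodeChunksReply(reply *pb.GetChunksReply) ([]core.Bundle, error) {
	bundles := make([]core.Bundle, 0, len(reply.Data))
	for i, serialized := range reply.Data {
		bundle, err := core.Bundle{}.Deserialize(serialized)
		if err != nil {
			return nil, fmt.Errorf("error deserializing bundle %d: %w", i, err)
		}
		bundles = append(bundles, bundle)
	}
	return bundles, nil
}
```
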
+ for key, data := range expectedData { + request := &pb.GetBlobRequest{ + BlobKey: key[:], + } + + response, err := server.GetBlob(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, data, response.Blob) + } +} + +func TestReadNonExistentBlob(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + blobStore := buildBlobStore(t, logger) + + // This is the server used to read it back + config := DefaultConfig() + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + blobStore, + nil /* not used in this test */) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + request := &pb.GetBlobRequest{ + BlobKey: tu.RandomBytes(32), + } + + response, err := server.GetBlob(context.Background(), request) + require.Error(t, err) + require.Nil(t, response) + } +} + +func TestReadWriteBlobsWithSharding(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + blobStore := buildBlobStore(t, logger) + + shardCount := rand.Intn(10) + 10 + shardList := make([]v2.RelayKey, 0) + shardSet := make(map[v2.RelayKey]struct{}) + for i := 0; i < shardCount; i++ { + if i%2 == 0 { + shardList = append(shardList, v2.RelayKey(i)) + shardSet[v2.RelayKey(i)] = struct{}{} + } + } + + // This is the server used to read it back + config := DefaultConfig() + config.RelayIDs = shardList + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + blobStore, + nil /* not used in this test*/) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]byte) + shardMap := make(map[v2.BlobKey][]v2.RelayKey) + + blobCount := 100 + for i := 0; i < blobCount; i++ { + header, data := randomBlob(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = data + + // Assign two shards to each blob. + shard1 := v2.RelayKey(rand.Intn(shardCount)) + shard2 := v2.RelayKey(rand.Intn(shardCount)) + shards := []v2.RelayKey{shard1, shard2} + shardMap[blobKey] = shards + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + RelayKeys: shards, + }, + &encoding.FragmentInfo{}) + require.NoError(t, err) + + err = blobStore.StoreBlob(context.Background(), blobKey, data) + require.NoError(t, err) + } + + // Read the blobs back. On average, we expect 25% of the blobs to be assigned to shards we don't have. + for key, data := range expectedData { + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + request := &pb.GetBlobRequest{ + BlobKey: key[:], + } + + response, err := server.GetBlob(context.Background(), request) + + if isBlobInCorrectShard { + require.NoError(t, err) + require.Equal(t, data, response.Blob) + } else { + require.Error(t, err) + require.Nil(t, response) + } + } + + // Read the blobs back again to test caching. 
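
Setting config.RelayIDs to a non-empty list, as this test does, makes the metadata provider reject any blob whose certificate does not name at least one of those relay IDs (the relayIDSet check in fetchMetadata earlier in this patch); leaving it empty skips that check entirely, which is why the unsharded tests above can read every blob back.
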
+ for key, data := range expectedData { + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + request := &pb.GetBlobRequest{ + BlobKey: key[:], + } + + response, err := server.GetBlob(context.Background(), request) + + if isBlobInCorrectShard { + require.NoError(t, err) + require.Equal(t, data, response.Blob) + } else { + require.Error(t, err) + require.Nil(t, response) + } + } +} + +func TestReadWriteChunks(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + chunkReader, chunkWriter := buildChunkStore(t, logger) + + // This is the server used to read it back + config := DefaultConfig() + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + nil, /* not used in this test*/ + chunkReader) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _, chunks := randomBlobChunks(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = chunks + + coeffs, chunkProofs := disassembleFrames(chunks) + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, chunkProofs) + require.NoError(t, err) + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, coeffs) + require.NoError(t, err) + fragmentInfoMap[blobKey] = fragmentInfo + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + FragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Request the entire blob by range + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: key[:], + StartIndex: 0, + EndIndex: uint32(len(data)), + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i, frame := range bundle { + require.Equal(t, data[i], frame) + } + } + + // Request the entire blob by index + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + indices := make([]uint32, len(data)) + for i := range data { + indices[i] = uint32(i) + } + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByIndex{ + ByIndex: &pb.ChunkRequestByIndex{ + BlobKey: key[:], + ChunkIndices: indices, + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i, frame := range bundle { + require.Equal(t, data[i], frame) + } + } + + // 
Request part of the blob back by range + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + startIndex := rand.Intn(len(data) - 1) + endIndex := startIndex + rand.Intn(len(data)-startIndex-1) + 1 + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: key[:], + StartIndex: uint32(startIndex), + EndIndex: uint32(endIndex), + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i := startIndex; i < endIndex; i++ { + require.Equal(t, data[i], bundle[i-startIndex]) + } + } + + // Request part of the blob back by index + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + indices := make([]uint32, 0) + for i := range data { + if i%2 == 0 { + indices = append(indices, uint32(i)) + } + } + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByIndex{ + ByIndex: &pb.ChunkRequestByIndex{ + BlobKey: key[:], + ChunkIndices: indices, + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i := 0; i < len(indices); i++ { + if i%2 == 0 { + require.Equal(t, data[indices[i]], bundle[i/2]) + } + } + } +} + +func TestBatchedReadWriteChunks(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + chunkReader, chunkWriter := buildChunkStore(t, logger) + + // This is the server used to read it back + config := DefaultConfig() + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + nil, /* not used in this test */ + chunkReader) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _, chunks := randomBlobChunks(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = chunks + + coeffs, chunkProofs := disassembleFrames(chunks) + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, chunkProofs) + require.NoError(t, err) + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, coeffs) + require.NoError(t, err) + fragmentInfoMap[blobKey] = fragmentInfo + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + FragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + }) + require.NoError(t, err) + } + + keyCount := 3 + + for i := 0; i < 10; i++ { + keys := make([]v2.BlobKey, 0, keyCount) + for key := range expectedData { + keys = append(keys, key) + if len(keys) == keyCount { + break + } + } + + requestedChunks := make([]*pb.ChunkRequest, 0) + for _, key := range keys { + + boundKey 
:= key + request := &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: boundKey[:], + StartIndex: 0, + EndIndex: uint32(len(expectedData[key])), + }, + }, + } + + requestedChunks = append(requestedChunks, request) + } + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, keyCount, len(response.Data)) + + for keyIndex, key := range keys { + data := expectedData[key] + + bundle, err := core.Bundle{}.Deserialize(response.Data[keyIndex]) + require.NoError(t, err) + + for frameIndex, frame := range bundle { + require.Equal(t, data[frameIndex], frame) + } + } + } +} + +func TestReadWriteChunksWithSharding(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + chunkReader, chunkWriter := buildChunkStore(t, logger) + + shardCount := rand.Intn(10) + 10 + shardList := make([]v2.RelayKey, 0) + shardSet := make(map[v2.RelayKey]struct{}) + for i := 0; i < shardCount; i++ { + if i%2 == 0 { + shardList = append(shardList, v2.RelayKey(i)) + shardSet[v2.RelayKey(i)] = struct{}{} + } + } + shardMap := make(map[v2.BlobKey][]v2.RelayKey) + + // This is the server used to read it back + config := DefaultConfig() + config.RelayIDs = shardList + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + nil, /* not used in this test*/ + chunkReader) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _, chunks := randomBlobChunks(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = chunks + + // Assign two shards to each blob. + shard1 := v2.RelayKey(rand.Intn(shardCount)) + shard2 := v2.RelayKey(rand.Intn(shardCount)) + shards := []v2.RelayKey{shard1, shard2} + shardMap[blobKey] = shards + + coeffs, chunkProofs := disassembleFrames(chunks) + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, chunkProofs) + require.NoError(t, err) + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, coeffs) + require.NoError(t, err) + fragmentInfoMap[blobKey] = fragmentInfo + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + RelayKeys: shards, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + FragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + }) + require.NoError(t, err) + } + + // Request the entire blob by range. 25% of the blobs will be assigned to shards we don't have. 
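
Both request shapes accepted by GetChunks can be combined in a single call. Below is a sketch of assembling one of each, using the generated message types exercised throughout these tests; the blob keys, index bounds, and helper name are placeholders rather than anything defined in this patch.

```go
package example

import (
	pb "github.com/Layr-Labs/eigenda/api/grpc/relay"
	v2 "github.com/Layr-Labs/eigenda/core/v2"
)

// buildChunksRequest mixes the two request shapes GetChunks accepts: a
// contiguous range of chunks from one blob and specific indices from another.
func buildChunksRequest(rangeKey v2.BlobKey, indexKey v2.BlobKey) *pb.GetChunksRequest {
	return &pb.GetChunksRequest{
		ChunkRequests: []*pb.ChunkRequest{
			{
				Request: &pb.ChunkRequest_ByRange{
					ByRange: &pb.ChunkRequestByRange{
						BlobKey:    rangeKey[:],
						StartIndex: 0,
						EndIndex:   4, // end index is exclusive, mirroring the slice in the handler
					},
				},
			},
			{
				Request: &pb.ChunkRequest_ByIndex{
					ByIndex: &pb.ChunkRequestByIndex{
						BlobKey:      indexKey[:],
						ChunkIndices: []uint32{0, 2, 4},
					},
				},
			},
		},
	}
}
```

The reply's Data field then contains two serialized bundles, in the same order as ChunkRequests.
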
+ for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: key[:], + StartIndex: 0, + EndIndex: uint32(len(data)), + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + response, err := server.GetChunks(context.Background(), request) + + if isBlobInCorrectShard { + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i, frame := range bundle { + require.Equal(t, data[i], frame) + } + } else { + require.Error(t, err) + require.Nil(t, response) + } + } + + // Request the entire blob by index + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + indices := make([]uint32, len(data)) + for i := range data { + indices[i] = uint32(i) + } + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByIndex{ + ByIndex: &pb.ChunkRequestByIndex{ + BlobKey: key[:], + ChunkIndices: indices, + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + if isBlobInCorrectShard { + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i, frame := range bundle { + require.Equal(t, data[i], frame) + } + } else { + response, err := server.GetChunks(context.Background(), request) + require.Error(t, err) + require.Nil(t, response) + } + } + + // Request part of the blob back by range + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + startIndex := rand.Intn(len(data) - 1) + endIndex := startIndex + rand.Intn(len(data)-startIndex-1) + 1 + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: key[:], + StartIndex: uint32(startIndex), + EndIndex: uint32(endIndex), + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + if isBlobInCorrectShard { + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i := startIndex; i < endIndex; i++ { + require.Equal(t, data[i], bundle[i-startIndex]) + } + } + } + + // Request part of the blob back by index + for key, data := range expectedData { + requestedChunks := make([]*pb.ChunkRequest, 0) + + indices := make([]uint32, 0) + for i := range data { + if i%2 == 0 { + indices = append(indices, uint32(i)) + } + } + + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: 
&pb.ChunkRequest_ByIndex{ + ByIndex: &pb.ChunkRequestByIndex{ + BlobKey: key[:], + ChunkIndices: indices, + }, + }, + }) + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + + if isBlobInCorrectShard { + response, err := server.GetChunks(context.Background(), request) + require.NoError(t, err) + + require.Equal(t, 1, len(response.Data)) + + bundle, err := core.Bundle{}.Deserialize(response.Data[0]) + require.NoError(t, err) + + for i := 0; i < len(indices); i++ { + if i%2 == 0 { + require.Equal(t, data[indices[i]], bundle[i/2]) + } + } + } else { + response, err := server.GetChunks(context.Background(), request) + require.Error(t, err) + require.Nil(t, response) + } + } +} + +func TestBatchedReadWriteChunksWithSharding(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + + setup(t) + defer teardown() + + // These are used to write data to S3/dynamoDB + metadataStore := buildMetadataStore(t) + chunkReader, chunkWriter := buildChunkStore(t, logger) + + shardCount := rand.Intn(10) + 10 + shardList := make([]v2.RelayKey, 0) + shardSet := make(map[v2.RelayKey]struct{}) + for i := 0; i < shardCount; i++ { + if i%2 == 0 { + shardList = append(shardList, v2.RelayKey(i)) + shardSet[v2.RelayKey(i)] = struct{}{} + } + } + shardMap := make(map[v2.BlobKey][]v2.RelayKey) + + // This is the server used to read it back + config := DefaultConfig() + config.RelayIDs = shardList + server, err := NewServer( + context.Background(), + logger, + config, + metadataStore, + nil, /* not used in this test */ + chunkReader) + require.NoError(t, err) + + expectedData := make(map[v2.BlobKey][]*encoding.Frame) + fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) + + blobCount := 10 + for i := 0; i < blobCount; i++ { + header, _, chunks := randomBlobChunks(t) + + blobKey, err := header.BlobKey() + require.NoError(t, err) + expectedData[blobKey] = chunks + + coeffs, chunkProofs := disassembleFrames(chunks) + err = chunkWriter.PutChunkProofs(context.Background(), blobKey, chunkProofs) + require.NoError(t, err) + fragmentInfo, err := chunkWriter.PutChunkCoefficients(context.Background(), blobKey, coeffs) + require.NoError(t, err) + fragmentInfoMap[blobKey] = fragmentInfo + + // Assign two shards to each blob. + shard1 := v2.RelayKey(rand.Intn(shardCount)) + shard2 := v2.RelayKey(rand.Intn(shardCount)) + shards := []v2.RelayKey{shard1, shard2} + shardMap[blobKey] = shards + + err = metadataStore.PutBlobCertificate( + context.Background(), + &v2.BlobCertificate{ + BlobHeader: header, + RelayKeys: shards, + }, + &encoding.FragmentInfo{ + TotalChunkSizeBytes: fragmentInfo.TotalChunkSizeBytes, + FragmentSizeBytes: fragmentInfo.FragmentSizeBytes, + }) + require.NoError(t, err) + } + + keyCount := 2 + + // Read the blobs back. On average, we expect 25% of the blobs to be assigned to shards we don't have. 
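
The 25% figure follows from the setup above: each blob gets two shard assignments drawn from shardCount relays, and roughly half of those relay IDs end up in shardSet, so the chance that neither assignment is local is about (1/2) x (1/2) = 25%. With keyCount = 2, an entire batch is serveable only about (3/4)^2, roughly 56% of the time, so over the 10 iterations below both the success and error branches are very likely to be exercised.
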
+ for i := 0; i < 10; i++ { + keys := make([]v2.BlobKey, 0, keyCount) + for key := range expectedData { + keys = append(keys, key) + if len(keys) == keyCount { + break + } + } + + requestedChunks := make([]*pb.ChunkRequest, 0) + for _, key := range keys { + + boundKey := key + request := &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: boundKey[:], + StartIndex: 0, + EndIndex: uint32(len(expectedData[key])), + }, + }, + } + + requestedChunks = append(requestedChunks, request) + } + request := &pb.GetChunksRequest{ + ChunkRequests: requestedChunks, + } + + allInCorrectShard := true + for _, key := range keys { + isBlobInCorrectShard := false + blobShards := shardMap[key] + for _, shard := range blobShards { + if _, ok := shardSet[shard]; ok { + isBlobInCorrectShard = true + break + } + } + if !isBlobInCorrectShard { + allInCorrectShard = false + break + } + } + + response, err := server.GetChunks(context.Background(), request) + + if allInCorrectShard { + require.NoError(t, err) + + require.Equal(t, keyCount, len(response.Data)) + + for keyIndex, key := range keys { + data := expectedData[key] + + bundle, err := core.Bundle{}.Deserialize(response.Data[keyIndex]) + require.NoError(t, err) + + for frameIndex, frame := range bundle { + require.Equal(t, data[frameIndex], frame) + } + } + } else { + require.Error(t, err) + require.Nil(t, response) + } + } +}