diff --git a/backend/genproto/p2p/v1alpha/syncing.pb.go b/backend/genproto/p2p/v1alpha/syncing.pb.go new file mode 100644 index 000000000..85f9f6560 --- /dev/null +++ b/backend/genproto/p2p/v1alpha/syncing.pb.go @@ -0,0 +1,474 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.24.4 +// source: p2p/v1alpha/syncing.proto + +package p2p + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SetReconciliationRange_Mode int32 + +const ( + SetReconciliationRange_SKIP SetReconciliationRange_Mode = 0 + SetReconciliationRange_FINGERPRINT SetReconciliationRange_Mode = 1 + SetReconciliationRange_LIST SetReconciliationRange_Mode = 2 +) + +// Enum value maps for SetReconciliationRange_Mode. +var ( + SetReconciliationRange_Mode_name = map[int32]string{ + 0: "SKIP", + 1: "FINGERPRINT", + 2: "LIST", + } + SetReconciliationRange_Mode_value = map[string]int32{ + "SKIP": 0, + "FINGERPRINT": 1, + "LIST": 2, + } +) + +func (x SetReconciliationRange_Mode) Enum() *SetReconciliationRange_Mode { + p := new(SetReconciliationRange_Mode) + *p = x + return p +} + +func (x SetReconciliationRange_Mode) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SetReconciliationRange_Mode) Descriptor() protoreflect.EnumDescriptor { + return file_p2p_v1alpha_syncing_proto_enumTypes[0].Descriptor() +} + +func (SetReconciliationRange_Mode) Type() protoreflect.EnumType { + return &file_p2p_v1alpha_syncing_proto_enumTypes[0] +} + +func (x SetReconciliationRange_Mode) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SetReconciliationRange_Mode.Descriptor instead. +func (SetReconciliationRange_Mode) EnumDescriptor() ([]byte, []int) { + return file_p2p_v1alpha_syncing_proto_rawDescGZIP(), []int{2, 0} +} + +type ReconcileBlobsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Optional. Filter to narrow down the blobs to reconcile. + // If not set, all public blobs are reconciled. + Filter *ReconcileBlobsRequest_Filter `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` + // Optional. The ranges for the sender's part of the set. + Ranges []*SetReconciliationRange `protobuf:"bytes,2,rep,name=ranges,proto3" json:"ranges,omitempty"` +} + +func (x *ReconcileBlobsRequest) Reset() { + *x = ReconcileBlobsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReconcileBlobsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReconcileBlobsRequest) ProtoMessage() {} + +func (x *ReconcileBlobsRequest) ProtoReflect() protoreflect.Message { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReconcileBlobsRequest.ProtoReflect.Descriptor instead. +func (*ReconcileBlobsRequest) Descriptor() ([]byte, []int) { + return file_p2p_v1alpha_syncing_proto_rawDescGZIP(), []int{0} +} + +func (x *ReconcileBlobsRequest) GetFilter() *ReconcileBlobsRequest_Filter { + if x != nil { + return x.Filter + } + return nil +} + +func (x *ReconcileBlobsRequest) GetRanges() []*SetReconciliationRange { + if x != nil { + return x.Ranges + } + return nil +} + +type ReconcileBlobsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ranges []*SetReconciliationRange `protobuf:"bytes,1,rep,name=ranges,proto3" json:"ranges,omitempty"` +} + +func (x *ReconcileBlobsResponse) Reset() { + *x = ReconcileBlobsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReconcileBlobsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReconcileBlobsResponse) ProtoMessage() {} + +func (x *ReconcileBlobsResponse) ProtoReflect() protoreflect.Message { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReconcileBlobsResponse.ProtoReflect.Descriptor instead. +func (*ReconcileBlobsResponse) Descriptor() ([]byte, []int) { + return file_p2p_v1alpha_syncing_proto_rawDescGZIP(), []int{1} +} + +func (x *ReconcileBlobsResponse) GetRanges() []*SetReconciliationRange { + if x != nil { + return x.Ranges + } + return nil +} + +type SetReconciliationRange struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Mode for the range. + Mode SetReconciliationRange_Mode `protobuf:"varint,1,opt,name=mode,proto3,enum=com.seed.p2p.v1alpha.SetReconciliationRange_Mode" json:"mode,omitempty"` + // Timestamp of the upper bound of the range. + BoundTimestamp int64 `protobuf:"varint,2,opt,name=bound_timestamp,json=boundTimestamp,proto3" json:"bound_timestamp,omitempty"` + // Value of the upper bound of the range. + BoundValue []byte `protobuf:"bytes,3,opt,name=bound_value,json=boundValue,proto3" json:"bound_value,omitempty"` + // Only for LIST mode. List of values in the range. + Values [][]byte `protobuf:"bytes,4,rep,name=values,proto3" json:"values,omitempty"` + // Only for the FINGERPRINT mode. Fingerprint of the range. + Fingerprint []byte `protobuf:"bytes,5,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` +} + +func (x *SetReconciliationRange) Reset() { + *x = SetReconciliationRange{} + if protoimpl.UnsafeEnabled { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SetReconciliationRange) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SetReconciliationRange) ProtoMessage() {} + +func (x *SetReconciliationRange) ProtoReflect() protoreflect.Message { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SetReconciliationRange.ProtoReflect.Descriptor instead. +func (*SetReconciliationRange) Descriptor() ([]byte, []int) { + return file_p2p_v1alpha_syncing_proto_rawDescGZIP(), []int{2} +} + +func (x *SetReconciliationRange) GetMode() SetReconciliationRange_Mode { + if x != nil { + return x.Mode + } + return SetReconciliationRange_SKIP +} + +func (x *SetReconciliationRange) GetBoundTimestamp() int64 { + if x != nil { + return x.BoundTimestamp + } + return 0 +} + +func (x *SetReconciliationRange) GetBoundValue() []byte { + if x != nil { + return x.BoundValue + } + return nil +} + +func (x *SetReconciliationRange) GetValues() [][]byte { + if x != nil { + return x.Values + } + return nil +} + +func (x *SetReconciliationRange) GetFingerprint() []byte { + if x != nil { + return x.Fingerprint + } + return nil +} + +// Filter describes which blobs to select for reconciliation. +type ReconcileBlobsRequest_Filter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Selects only blobs related to the given resource. + Resource string `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` +} + +func (x *ReconcileBlobsRequest_Filter) Reset() { + *x = ReconcileBlobsRequest_Filter{} + if protoimpl.UnsafeEnabled { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReconcileBlobsRequest_Filter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReconcileBlobsRequest_Filter) ProtoMessage() {} + +func (x *ReconcileBlobsRequest_Filter) ProtoReflect() protoreflect.Message { + mi := &file_p2p_v1alpha_syncing_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReconcileBlobsRequest_Filter.ProtoReflect.Descriptor instead. +func (*ReconcileBlobsRequest_Filter) Descriptor() ([]byte, []int) { + return file_p2p_v1alpha_syncing_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *ReconcileBlobsRequest_Filter) GetResource() string { + if x != nil { + return x.Resource + } + return "" +} + +var File_p2p_v1alpha_syncing_proto protoreflect.FileDescriptor + +var file_p2p_v1alpha_syncing_proto_rawDesc = []byte{ + 0x0a, 0x19, 0x70, 0x32, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2f, 0x73, 0x79, + 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x63, 0x6f, 0x6d, + 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, + 0x61, 0x22, 0xcf, 0x01, 0x0a, 0x15, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x42, + 0x6c, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4a, 0x0a, 0x06, 0x66, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x63, 0x6f, + 0x6d, 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, + 0x68, 0x61, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x42, 0x6c, 0x6f, 0x62, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, + 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x44, 0x0a, 0x06, 0x72, 0x61, 0x6e, 0x67, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, + 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, + 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x06, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, 0x24, 0x0a, + 0x06, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x22, 0x5e, 0x0a, 0x16, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, + 0x42, 0x6c, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, + 0x06, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, + 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, + 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, + 0x69, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x06, 0x72, 0x61, 0x6e, + 0x67, 0x65, 0x73, 0x22, 0x90, 0x02, 0x0a, 0x16, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x6e, + 0x63, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x45, + 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x63, + 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x69, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x52, + 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, + 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1f, + 0x0a, 0x0b, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0c, 0x52, + 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x66, 0x69, 0x6e, 0x67, 0x65, + 0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x66, 0x69, + 0x6e, 0x67, 0x65, 0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x22, 0x2b, 0x0a, 0x04, 0x4d, 0x6f, 0x64, + 0x65, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x4b, 0x49, 0x50, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x46, + 0x49, 0x4e, 0x47, 0x45, 0x52, 0x50, 0x52, 0x49, 0x4e, 0x54, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, + 0x4c, 0x49, 0x53, 0x54, 0x10, 0x02, 0x32, 0x76, 0x0a, 0x07, 0x53, 0x79, 0x6e, 0x63, 0x69, 0x6e, + 0x67, 0x12, 0x6b, 0x0a, 0x0e, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x73, 0x12, 0x2b, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, + 0x32, 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x6e, + 0x63, 0x69, 0x6c, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x2c, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x32, 0x70, 0x2e, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, + 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x27, + 0x5a, 0x25, 0x73, 0x65, 0x65, 0x64, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x67, + 0x65, 0x6e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x32, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x3b, 0x70, 0x32, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_p2p_v1alpha_syncing_proto_rawDescOnce sync.Once + file_p2p_v1alpha_syncing_proto_rawDescData = file_p2p_v1alpha_syncing_proto_rawDesc +) + +func file_p2p_v1alpha_syncing_proto_rawDescGZIP() []byte { + file_p2p_v1alpha_syncing_proto_rawDescOnce.Do(func() { + file_p2p_v1alpha_syncing_proto_rawDescData = protoimpl.X.CompressGZIP(file_p2p_v1alpha_syncing_proto_rawDescData) + }) + return file_p2p_v1alpha_syncing_proto_rawDescData +} + +var file_p2p_v1alpha_syncing_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_p2p_v1alpha_syncing_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_p2p_v1alpha_syncing_proto_goTypes = []interface{}{ + (SetReconciliationRange_Mode)(0), // 0: com.seed.p2p.v1alpha.SetReconciliationRange.Mode + (*ReconcileBlobsRequest)(nil), // 1: com.seed.p2p.v1alpha.ReconcileBlobsRequest + (*ReconcileBlobsResponse)(nil), // 2: com.seed.p2p.v1alpha.ReconcileBlobsResponse + (*SetReconciliationRange)(nil), // 3: com.seed.p2p.v1alpha.SetReconciliationRange + (*ReconcileBlobsRequest_Filter)(nil), // 4: com.seed.p2p.v1alpha.ReconcileBlobsRequest.Filter +} +var file_p2p_v1alpha_syncing_proto_depIdxs = []int32{ + 4, // 0: com.seed.p2p.v1alpha.ReconcileBlobsRequest.filter:type_name -> com.seed.p2p.v1alpha.ReconcileBlobsRequest.Filter + 3, // 1: com.seed.p2p.v1alpha.ReconcileBlobsRequest.ranges:type_name -> com.seed.p2p.v1alpha.SetReconciliationRange + 3, // 2: com.seed.p2p.v1alpha.ReconcileBlobsResponse.ranges:type_name -> com.seed.p2p.v1alpha.SetReconciliationRange + 0, // 3: com.seed.p2p.v1alpha.SetReconciliationRange.mode:type_name -> com.seed.p2p.v1alpha.SetReconciliationRange.Mode + 1, // 4: com.seed.p2p.v1alpha.Syncing.ReconcileBlobs:input_type -> com.seed.p2p.v1alpha.ReconcileBlobsRequest + 2, // 5: com.seed.p2p.v1alpha.Syncing.ReconcileBlobs:output_type -> com.seed.p2p.v1alpha.ReconcileBlobsResponse + 5, // [5:6] is the sub-list for method output_type + 4, // [4:5] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_p2p_v1alpha_syncing_proto_init() } +func file_p2p_v1alpha_syncing_proto_init() { + if File_p2p_v1alpha_syncing_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_p2p_v1alpha_syncing_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReconcileBlobsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_p2p_v1alpha_syncing_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReconcileBlobsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_p2p_v1alpha_syncing_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SetReconciliationRange); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_p2p_v1alpha_syncing_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReconcileBlobsRequest_Filter); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_p2p_v1alpha_syncing_proto_rawDesc, + NumEnums: 1, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_p2p_v1alpha_syncing_proto_goTypes, + DependencyIndexes: file_p2p_v1alpha_syncing_proto_depIdxs, + EnumInfos: file_p2p_v1alpha_syncing_proto_enumTypes, + MessageInfos: file_p2p_v1alpha_syncing_proto_msgTypes, + }.Build() + File_p2p_v1alpha_syncing_proto = out.File + file_p2p_v1alpha_syncing_proto_rawDesc = nil + file_p2p_v1alpha_syncing_proto_goTypes = nil + file_p2p_v1alpha_syncing_proto_depIdxs = nil +} diff --git a/backend/genproto/p2p/v1alpha/syncing_grpc.pb.go b/backend/genproto/p2p/v1alpha/syncing_grpc.pb.go new file mode 100644 index 000000000..ad3def7b4 --- /dev/null +++ b/backend/genproto/p2p/v1alpha/syncing_grpc.pb.go @@ -0,0 +1,103 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v4.24.4 +// source: p2p/v1alpha/syncing.proto + +package p2p + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SyncingClient is the client API for Syncing service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SyncingClient interface { + ReconcileBlobs(ctx context.Context, in *ReconcileBlobsRequest, opts ...grpc.CallOption) (*ReconcileBlobsResponse, error) +} + +type syncingClient struct { + cc grpc.ClientConnInterface +} + +func NewSyncingClient(cc grpc.ClientConnInterface) SyncingClient { + return &syncingClient{cc} +} + +func (c *syncingClient) ReconcileBlobs(ctx context.Context, in *ReconcileBlobsRequest, opts ...grpc.CallOption) (*ReconcileBlobsResponse, error) { + out := new(ReconcileBlobsResponse) + err := c.cc.Invoke(ctx, "/com.seed.p2p.v1alpha.Syncing/ReconcileBlobs", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SyncingServer is the server API for Syncing service. +// All implementations should embed UnimplementedSyncingServer +// for forward compatibility +type SyncingServer interface { + ReconcileBlobs(context.Context, *ReconcileBlobsRequest) (*ReconcileBlobsResponse, error) +} + +// UnimplementedSyncingServer should be embedded to have forward compatible implementations. +type UnimplementedSyncingServer struct { +} + +func (UnimplementedSyncingServer) ReconcileBlobs(context.Context, *ReconcileBlobsRequest) (*ReconcileBlobsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReconcileBlobs not implemented") +} + +// UnsafeSyncingServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SyncingServer will +// result in compilation errors. +type UnsafeSyncingServer interface { + mustEmbedUnimplementedSyncingServer() +} + +func RegisterSyncingServer(s grpc.ServiceRegistrar, srv SyncingServer) { + s.RegisterService(&Syncing_ServiceDesc, srv) +} + +func _Syncing_ReconcileBlobs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReconcileBlobsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SyncingServer).ReconcileBlobs(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/com.seed.p2p.v1alpha.Syncing/ReconcileBlobs", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SyncingServer).ReconcileBlobs(ctx, req.(*ReconcileBlobsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Syncing_ServiceDesc is the grpc.ServiceDesc for Syncing service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Syncing_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "com.seed.p2p.v1alpha.Syncing", + HandlerType: (*SyncingServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReconcileBlobs", + Handler: _Syncing_ReconcileBlobs_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "p2p/v1alpha/syncing.proto", +} diff --git a/backend/syncing/rbsr/fingerprint.go b/backend/syncing/rbsr/fingerprint.go new file mode 100644 index 000000000..acc5350dd --- /dev/null +++ b/backend/syncing/rbsr/fingerprint.go @@ -0,0 +1,56 @@ +package rbsr + +import ( + "crypto/sha256" + "encoding/binary" + "unsafe" +) + +type Fingerprint [fingerprintSize]byte + +type accumulator struct { + len int + sum [32]byte +} + +func (acc *accumulator) Add(other [32]byte) { + var currCarry, nextCarry uint64 + + // Treating [32]byte as [4]uint64 when adding. + p := (*[4]uint64)(unsafe.Pointer(&acc.sum[0])) + po := (*[4]uint64)(unsafe.Pointer(&other[0])) + + for i := 0; i < 4; i++ { + orig := p[i] + otherV := po[i] + + next := orig + + next += currCarry + if next < orig { + nextCarry = 1 + } + + next += otherV + if next < otherV { + nextCarry = 1 + } + + p[i] = next + + currCarry = nextCarry + nextCarry = 0 + } +} + +func (acc *accumulator) Fingerprint() Fingerprint { + buf := make([]byte, 0, len(acc.sum)+8) // sum + len will be hashed. + buf = append(buf, acc.sum[:]...) + buf = binary.LittleEndian.AppendUint64(buf, uint64(acc.len)) + + hash := sha256.Sum256(buf) + + var fingerprint Fingerprint + copy(fingerprint[:], hash[:fingerprintSize]) + return fingerprint +} diff --git a/backend/syncing/rbsr/rbsr.go b/backend/syncing/rbsr/rbsr.go new file mode 100644 index 000000000..2faceac5f --- /dev/null +++ b/backend/syncing/rbsr/rbsr.go @@ -0,0 +1,398 @@ +// Package rbsr provides Range-Based Set Reconciliation protocol. +// It's largely based on the corresponding [paper by Aljoscha Meyer][paper], +// and some practical implementation ideas from [Negentropy][negentropy]. +// +// [paper]: https://github.com/AljoschaMeyer/set-reconciliation +// [negentropy]: https://github.com/hoytech/negentropy +package rbsr + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "errors" + "math" + p2p "seed/backend/genproto/p2p/v1alpha" + "unsafe" +) + +func init() { + // Fingerprint function depends on the machine's byte order. + // No need/time to implement big-endian right now. + ensureLittleEndian() +} + +const ( + msgSizeMin = 4096 + fingerprintSize = 16 +) + +var maxItem = Item{Timestamp: math.MaxInt64} + +// Range is a subset of the data that needs to be reconciled. +type Range = p2p.SetReconciliationRange + +// Mode is the reconciliation mode for a given range. +type Mode = p2p.SetReconciliationRange_Mode + +type response struct { + Ranges []*p2p.SetReconciliationRange + Size int +} + +func (r *response) AddRange(rng *p2p.SetReconciliationRange) { + r.Ranges = append(r.Ranges, rng) + for _, c := range rng.Values { + r.Size += len(c) + } + + r.Size += len(rng.BoundValue) + r.Size += len(rng.Fingerprint) + r.Size += 10 // roughly the size of the protobuf wrapping. +} + +// Range modes for the reconciliation protocol. +const ( + SkipMode = p2p.SetReconciliationRange_SKIP + FingerprintMode = p2p.SetReconciliationRange_FINGERPRINT + ListMode = p2p.SetReconciliationRange_LIST +) + +type Item struct { + Timestamp int64 + Value []byte +} + +func NewItem(timestamp int64, id []byte) Item { + return Item{Timestamp: timestamp, Value: id} +} + +func (i Item) Cmp(other Item) int { + if i.Timestamp < other.Timestamp { + return -1 + } + + if i.Timestamp > other.Timestamp { + return +1 + } + + return bytes.Compare(i.Value, other.Value) +} + +// Session holds state for a single running reconciliation between two peers. +type Session struct { + store Store + msgSizeLimit uint64 + isInitiator bool +} + +// NewSession creates a new reconciliation session to track its state and progress. +func NewSession(store Store, msgSizeLimitBytes uint64) (*Session, error) { + if msgSizeLimitBytes != 0 && msgSizeLimitBytes < msgSizeMin { + return nil, errors.New("message size limit is too small") + } + return &Session{ + store: store, + msgSizeLimit: msgSizeLimitBytes, + }, nil +} + +func (n *Session) Initiate() ([]*Range, error) { + if n.isInitiator { + return nil, errors.New("already initiated") + } + n.isInitiator = true + + var out response + + if err := n.SplitRange(0, n.store.Size(), maxItem, &out); err != nil { + return nil, err + } + + return out.Ranges, nil +} + +func (n *Session) Reconcile(query []*Range) ([]*Range, error) { + if n.isInitiator { + return nil, errors.New("initiator not asking for have/need IDs") + } + var haveIds, needIds [][]byte + + output, err := n.reconcile(query, &haveIds, &needIds) + if err != nil { + return nil, err + } + + if len(output) == 1 && n.isInitiator { + return nil, nil + } + + return output, nil +} + +func (n *Session) ReconcileWithIDs(query []*Range, haveIds, needIds *[][]byte) ([]*Range, error) { + if !n.isInitiator { + return nil, errors.New("non-initiator asking for have/need IDs") + } + + output, err := n.reconcile(query, haveIds, needIds) + if err != nil { + return nil, err + } + if len(output) == 1 { + // Assuming an empty string is a special case indicating a condition similar to std::nullopt + return nil, nil + } + + return output, nil +} + +func (n *Session) reconcile(query []*Range, haveIds, needIds *[][]byte) ([]*Range, error) { + var fullOutput response + + var prevBound Item + prevIndex := 0 + skip := false + + for _, rng := range query { + doSkip := func() { + if skip { + skip = false + + rng := &Range{ + Mode: SkipMode, + BoundTimestamp: prevBound.Timestamp, + BoundValue: prevBound.Value, + } + + fullOutput.AddRange(rng) + } + } + + currBound := NewItem(rng.BoundTimestamp, rng.BoundValue) + mode := Mode(rng.Mode) + + lower := prevIndex + upper, err := n.store.FindLowerBound(prevIndex, currBound) + if err != nil { + return nil, err + } + + switch mode { + case SkipMode: + skip = true + + case FingerprintMode: + theirFingerprint := rng.Fingerprint // TODO(burdiyan): maybe validate their fingerprint before using? + ourFingerprint, err := n.Fingerprint(lower, upper) + if err != nil { + return nil, err + } + + if !bytes.Equal(theirFingerprint, ourFingerprint[:]) { + doSkip() + if err := n.SplitRange(lower, upper, currBound, &fullOutput); err != nil { + return nil, err + } + } else { + skip = true + } + + case ListMode: + theirElems := make(map[string][]byte) + for _, e := range rng.Values { + theirElems[string(e)] = e + } + + if err := n.store.ForEach(lower, upper, func(_ int, item Item) bool { + have := item.Value + if _, exists := theirElems[string(have)]; !exists { + if n.isInitiator { + *haveIds = append(*haveIds, have) + } + } else { + delete(theirElems, string(have)) + } + return true + }); err != nil { + return nil, err + } + + if n.isInitiator { + skip = true + + for _, v := range theirElems { + *needIds = append(*needIds, v) + } + } else { + doSkip() + + var ( + responseIDs [][]byte + endBound = currBound + ) + + if err := n.store.ForEach(lower, upper, func(i int, item Item) bool { + size := fullOutput.Size + for _, x := range responseIDs { + size += len(x) + } + + if n.checkMessageSize(size) { + endBound = item + upper = i + return false + } + + responseIDs = append(responseIDs, item.Value) + return true + }); err != nil { + return nil, err + } + + rng := &Range{ + Mode: ListMode, + BoundTimestamp: endBound.Timestamp, + BoundValue: endBound.Value, + Values: responseIDs, + } + fullOutput.AddRange(rng) + } + + default: + return nil, errors.New("unexpected mode") + } + + if n.checkMessageSize(fullOutput.Size) { + remainingFingerprint, err := n.Fingerprint(upper, n.store.Size()) + if err != nil { + panic(err) + } + + bound := maxItem + + rng := &Range{ + Mode: FingerprintMode, + BoundTimestamp: bound.Timestamp, + BoundValue: bound.Value, + Fingerprint: remainingFingerprint[:], + } + fullOutput.AddRange(rng) + + break + } + + prevIndex = upper + prevBound = currBound + } + + return fullOutput.Ranges, nil +} + +func (n *Session) SplitRange(lower, upper int, upperBound Item, output *response) error { + if output == nil { + panic("BUG: output must be initialized when splitting range") + } + + numElems := upper - lower + const Buckets = 16 + + if numElems < Buckets*2 { + rng := &Range{ + Mode: ListMode, + BoundTimestamp: upperBound.Timestamp, + BoundValue: upperBound.Value, + Values: make([][]byte, 0, numElems), + } + + if err := n.store.ForEach(lower, upper, func(i int, item Item) bool { + rng.Values = append(rng.Values, item.Value) + return true + }); err != nil { + return err + } + + output.AddRange(rng) + } else { + itemsPerBucket := numElems / Buckets + bucketsWithExtra := numElems % Buckets + curr := lower + + for i := 0; i < Buckets; i++ { + bucketSize := itemsPerBucket + if i < bucketsWithExtra { + bucketSize++ + } + ourFingerprint, err := n.Fingerprint(curr, curr+bucketSize) + if err != nil { + return err + } + + curr += bucketSize + + var nextBound Item + if curr == upper { + nextBound = upperBound + } else { + var currItem Item + if err := n.store.ForEach(curr, curr+1, func(_ int, item Item) bool { + currItem = item + return true + }); err != nil { + return err + } + + nextBound = currItem + } + + rng := &Range{ + Mode: FingerprintMode, + BoundTimestamp: nextBound.Timestamp, + BoundValue: nextBound.Value, + Fingerprint: ourFingerprint[:], + } + + output.AddRange(rng) + } + } + + return nil +} + +func (n *Session) checkMessageSize(size int) (ok bool) { + return n.msgSizeLimit != 0 && size > int(n.msgSizeLimit)-200 +} + +func (n *Session) Fingerprint(begin, end int) (Fingerprint, error) { + var out accumulator + + if err := n.store.ForEach(begin, end, func(_ int, item Item) bool { + h := sha256.Sum256(item.Value) // TODO(burdiyan): cache the value of the hash between rounds. + out.Add(h) + return true + }); err != nil { + return Fingerprint{}, err + } + + return out.Fingerprint(), nil +} + +func ensureLittleEndian() { + var nativeEndian binary.ByteOrder + + var buf [2]byte + *(*uint16)(unsafe.Pointer(&buf[0])) = uint16(0xABCD) + + switch buf { + case [2]byte{0xCD, 0xAB}: + nativeEndian = binary.LittleEndian + case [2]byte{0xAB, 0xCD}: + nativeEndian = binary.BigEndian + default: + panic("Could not determine native endianness.") + } + + if nativeEndian != binary.LittleEndian { + panic("This code only works on little-endian architectures.") + } +} diff --git a/backend/syncing/rbsr/rbsr_test.go b/backend/syncing/rbsr/rbsr_test.go new file mode 100644 index 000000000..2b9d6e5f3 --- /dev/null +++ b/backend/syncing/rbsr/rbsr_test.go @@ -0,0 +1,212 @@ +package rbsr + +import ( + "encoding/hex" + "fmt" + "os" + p2p "seed/backend/genproto/p2p/v1alpha" + "seed/backend/pkg/must" + "strconv" + "testing" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/stretchr/testify/require" +) + +func TestReplication(t *testing.T) { + dataset := make([]Item, 10000) + { + var ts int64 = 1 + const itemsPerTimestamp = 20 + for i := range dataset { + if i%itemsPerTimestamp == 0 { + ts++ + } + + dataset[i] = Item{ + Timestamp: ts, + Value: []byte("Hello " + strconv.Itoa(i)), + } + } + } + + testReplicate := func(t *testing.T, client, server *peer, wantRounds int) { + t.Helper() + + must.Do(client.store.Seal()) + must.Do(server.store.Seal()) + + msg, err := client.ne.Initiate() + require.NoError(t, err) + + var allWants [][]byte + + var rounds int + for msg != nil { + rounds++ + if rounds > 1000 { + panic("BUG: too many round spinning") + } + + msg, err = server.ne.Reconcile(msg) + require.NoError(t, err) + + var haves, wants [][]byte + msg, err = client.ne.ReconcileWithIDs(msg, &haves, &wants) + require.NoError(t, err) + allWants = append(allWants, wants...) + } + + require.Equal(t, wantRounds, rounds, "round-trip don't match") + + ds := make(map[string]struct{}, len(dataset)) + for _, item := range dataset { + ds[string(item.Value[:])] = struct{}{} + } + + clientData := make(map[string]struct{}, len(dataset)) + if err := client.store.ForEach(0, client.store.Size(), func(i int, item Item) bool { + clientData[string(item.Value)] = struct{}{} + return true + }); err != nil { + t.Fatal(err) + } + + for _, w := range allWants { + clientData[string(w)] = struct{}{} + } + + require.Equal(t, ds, clientData) + } + + t.Run("ClientHasAll", func(t *testing.T) { + client := newPeer() + server := newPeer() + for _, x := range dataset { + must.Do(client.store.Insert(x.Timestamp, x.Value)) + } + testReplicate(t, client, server, 1) + }) + + t.Run("ServerHasAll", func(t *testing.T) { + client := newPeer() + server := newPeer() + for _, x := range dataset { + must.Do(server.store.Insert(x.Timestamp, x.Value)) + } + testReplicate(t, client, server, 2) + }) + + t.Run("BothHaveAll", func(t *testing.T) { + client := newPeer() + server := newPeer() + for _, x := range dataset { + must.Do(client.store.Insert(x.Timestamp, x.Value)) + must.Do(server.store.Insert(x.Timestamp, x.Value)) + } + testReplicate(t, client, server, 1) + }) + + t.Run("BothHaveDisjoint", func(t *testing.T) { + client := newPeer() + server := newPeer() + + for _, x := range dataset[0 : len(dataset)/2] { + must.Do(client.store.Insert(x.Timestamp, x.Value[:])) + } + + for _, x := range dataset[len(dataset)/2:] { + must.Do(server.store.Insert(x.Timestamp, x.Value[:])) + } + + testReplicate(t, client, server, 3) + }) + + t.Run("BothHaveInterleaved", func(t *testing.T) { + client := newPeer() + server := newPeer() + for i, x := range dataset { + if i%2 == 0 { + must.Do(client.store.Insert(x.Timestamp, x.Value[:])) + } else { + must.Do(server.store.Insert(x.Timestamp, x.Value[:])) + } + } + testReplicate(t, client, server, 3) + }) +} + +func TestSmoke(t *testing.T) { + p1 := newPeer() + p2 := newPeer() + + var ts int64 = 1 + for i := 0; i < 10000; i++ { + if i%20 == 0 { + ts++ + } + + must.Do(p2.store.Insert(ts, []byte("Hello "+strconv.Itoa(i)))) + } + + must.Do(p2.store.Seal()) + + must.Do(p1.store.Seal()) + msg := must.Do2(p1.ne.Initiate()) + + var err error + var count int + for msg != nil { + count++ + + p2.printSummary2(msg) + + msg, err = p2.ne.Reconcile(msg) + if err != nil { + panic(err) + } + + fmt.Println("==== Server message =====") + p2.printSummary2(msg) + + var haves, wants [][]byte + msg, err = p1.ne.ReconcileWithIDs(msg, &haves, &wants) + if err != nil { + panic(err) + } + + fmt.Println("Round", count, "Haves:", len(haves), "Wants:", len(wants)) + } +} + +type peer struct { + store Store + ne *Session +} + +func newPeer() *peer { + s := NewSliceStore() + ne, err := NewSession(s, 50000) + if err != nil { + panic(err) + } + return &peer{store: s, ne: ne} +} + +func (p *peer) printSummary2(msg []*p2p.SetReconciliationRange) { + tw := table.NewWriter() + tw.SetOutputMirror(os.Stdout) + tw.SetStyle(table.StyleLight) + tw.AppendHeader(table.Row{"Mode", "Ts", "Prefix", "Fingerprint", "NumIDs"}) + + for _, rng := range msg { + tw.AppendRow(table.Row{ + strconv.Itoa(int(rng.Mode)), + strconv.FormatInt(rng.BoundTimestamp, 10), + hex.EncodeToString(rng.BoundValue), + hex.EncodeToString(rng.Fingerprint), + strconv.Itoa(len(rng.Values)), + }) + } + tw.Render() +} diff --git a/backend/syncing/rbsr/storage.go b/backend/syncing/rbsr/storage.go new file mode 100644 index 000000000..5aa79d4b8 --- /dev/null +++ b/backend/syncing/rbsr/storage.go @@ -0,0 +1,119 @@ +package rbsr + +import ( + "errors" + "sort" + + "golang.org/x/exp/slices" +) + +// Store is the interface to the dataset. +type Store interface { + Size() int + Insert(ts int64, data []byte) error + ForEach(start, end int, fn func(i int, item Item) bool) error + FindLowerBound(startHint int, value Item) (int, error) + Seal() error +} + +// sliceStore implements Storage backed by a slice. +type sliceStore struct { + items []Item + sealed bool +} + +// NewSliceStore creates a store instance that is backed by a slice. +func NewSliceStore() Store { + return &sliceStore{ + items: make([]Item, 0), + sealed: false, + } +} + +func (v *sliceStore) Insert(createdAt int64, id []byte) error { + if v.sealed { + return errors.New("already sealed") + } + + item := NewItem(createdAt, id) + + v.items = append(v.items, item) + return nil +} + +func (v *sliceStore) InsertItem(item Item) error { + return v.Insert(item.Timestamp, item.Value[:]) +} + +func (v *sliceStore) Seal() error { + if v.sealed { + return errors.New("already sealed") + } + v.sealed = true + + slices.SortFunc(v.items, func(a, b Item) int { + x := a.Cmp(b) + if x == 0 { + panic("BUG: duplicate items in store when sealing") + } + return x + }) + + return nil +} + +func (v *sliceStore) Unseal() { + v.sealed = false +} + +func (v *sliceStore) Size() int { + if err := v.checkSealed(); err != nil { + return 0 + } + return len(v.items) +} + +func (v *sliceStore) ForEach(start, end int, fn func(int, Item) bool) error { + if err := v.checkSealed(); err != nil { + return err + } + + if err := v.checkBounds(start, end); err != nil { + return err + } + + for i := start; i < end; i++ { + if !fn(i, v.items[i]) { + break + } + } + return nil +} + +func (v *sliceStore) FindLowerBound(startHint int, bound Item) (int, error) { + if err := v.checkSealed(); err != nil { + return 0, err + } + if err := v.checkBounds(startHint, len(v.items)); err != nil { + return 0, err + } + + i := sort.Search(len(v.items[startHint:]), func(i int) bool { + return v.items[startHint+i].Cmp(bound) >= 0 + }) + return startHint + i, nil +} + +func (v *sliceStore) checkSealed() error { + if !v.sealed { + return errors.New("not sealed") + } + return nil +} + +func (v *sliceStore) checkBounds(begin, end int) error { + if begin > end || end > len(v.items) { + return errors.New("bad range") + } + return nil +} diff --git a/go.mod b/go.mod index 787e2fd5c..e7f6c4e9a 100644 --- a/go.mod +++ b/go.mod @@ -65,6 +65,9 @@ require ( require ( github.com/alessio/shellescape v1.4.1 // indirect github.com/danieljoos/wincred v1.2.0 // indirect + github.com/jedib0t/go-pretty/v6 v6.5.9 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/rivo/uniseg v0.2.0 // indirect ) require ( @@ -220,7 +223,7 @@ require ( golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.21.0 // indirect - golang.org/x/term v0.16.0 // indirect + golang.org/x/term v0.17.0 // indirect golang.org/x/tools v0.16.1 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.14.0 // indirect diff --git a/go.sum b/go.sum index fe5cf95b0..342eb1665 100644 --- a/go.sum +++ b/go.sum @@ -534,6 +534,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU= +github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= @@ -678,6 +680,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= @@ -888,6 +892,8 @@ github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1ab github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rocicorp/fracdex v0.0.0-20231009204907-ebc26eac9486 h1:Rfp3ytlsm2xkAlm58EvZNNfXQVbXbinKVyISeyn/z9s= github.com/rocicorp/fracdex v0.0.0-20231009204907-ebc26eac9486/go.mod h1:Lbb7V1DeB2FAkgeKjxff99mmSPrtrawik8cG1RbKVwg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1326,6 +1332,8 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= diff --git a/proto/p2p/v1alpha/go.gensum b/proto/p2p/v1alpha/go.gensum index 69bae3e43..848874cc7 100644 --- a/proto/p2p/v1alpha/go.gensum +++ b/proto/p2p/v1alpha/go.gensum @@ -1,2 +1,2 @@ -srcs: d36f261696c8d14449e5b554ac4d9e42 -outs: 6682aa2cb474183e491c3f0715e95ae2 +srcs: 281b8cb656bbcc20b74b6287379b5ff0 +outs: 80ce2979c408886489b147f1d57115d8 diff --git a/proto/p2p/v1alpha/syncing.proto b/proto/p2p/v1alpha/syncing.proto new file mode 100644 index 000000000..75c93a063 --- /dev/null +++ b/proto/p2p/v1alpha/syncing.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +package com.seed.p2p.v1alpha; + +option go_package = "seed/backend/genproto/p2p/v1alpha;p2p"; + +service Syncing { + rpc ReconcileBlobs(ReconcileBlobsRequest) returns (ReconcileBlobsResponse); +} + +message ReconcileBlobsRequest { + // Filter describes which blobs to select for reconciliation. + message Filter { + // Selects only blobs related to the given resource. + string resource = 1; + } + + // Optional. Filter to narrow down the blobs to reconcile. + // If not set, all public blobs are reconciled. + Filter filter = 1; + + // Optional. The ranges for the sender's part of the set. + repeated SetReconciliationRange ranges = 2; +} + +message ReconcileBlobsResponse { + repeated SetReconciliationRange ranges = 1; +} + +message SetReconciliationRange { + enum Mode { + SKIP = 0; + + FINGERPRINT = 1; + + LIST = 2; + } + + // Mode for the range. + Mode mode = 1; + + // Timestamp of the upper bound of the range. + int64 bound_timestamp = 2; + + // Value of the upper bound of the range. + bytes bound_value = 3; + + // Only for LIST mode. List of values in the range. + repeated bytes values = 4; + + // Only for the FINGERPRINT mode. Fingerprint of the range. + bytes fingerprint = 5; +}