Skip to content

Commit

Permalink
each stream has capacity, add new getters to stream request manager
Browse files Browse the repository at this point in the history
  • Loading branch information
GheisMohammadi committed Oct 24, 2024
1 parent b810653 commit bcf05fc
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 124 deletions.
7 changes: 7 additions & 0 deletions p2p/stream/common/requestmanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@ type Deliverer interface {
DeliverResponse(stID sttypes.StreamID, resp sttypes.Response)
}

type Streams interface {
Streams() []sttypes.Stream
NumStreams() int
AvailableCapacity() int
}

// RequestManager manages over the requests
type RequestManager interface {
p2ptypes.LifeCycle
Requester
Deliverer
Streams
}
25 changes: 16 additions & 9 deletions p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ func (sm *testStreamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream,
return st, exist
}

func (sm *testStreamManager) NumStreams() int {
sm.lock.Lock()
defer sm.lock.Unlock()

return len(sm.streams)
}

type testStream struct {
id sttypes.StreamID
rm *requestManager
Expand Down Expand Up @@ -138,14 +145,14 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream {
return sts
}

func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] {
m := sttypes.NewSafeMap[sttypes.StreamID, *stream]()
func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *WorkerStream] {
m := sttypes.NewSafeMap[sttypes.StreamID, *WorkerStream]()

for _, index := range indexes {
st := &testStream{
id: makeStreamID(index),
}
m.Set(st.ID(), &stream{Stream: st})
m.Set(st.ID(), &WorkerStream{Stream: st})
}
return m
}
Expand All @@ -166,11 +173,11 @@ func makeTestRequest(index uint64) *testRequest {
}
}

func (req *testRequest) ReqID() uint64 {
func (req *testRequest) ID() uint64 {
return req.reqID
}

func (req *testRequest) SetReqID(rid uint64) {
func (req *testRequest) SetID(rid uint64) {
req.reqID = rid
}

Expand All @@ -180,10 +187,10 @@ func (req *testRequest) String() string {

func (req *testRequest) Encode() ([]byte, error) {
return rlp.EncodeToBytes(struct {
ReqID uint64
ID uint64
Index uint64
}{
ReqID: req.reqID,
ID: req.reqID,
Index: req.index,
})
}
Expand All @@ -204,15 +211,15 @@ func (req *testRequest) checkResponse(rawResp sttypes.Response) error {

func decodeTestRequest(b []byte) (*testRequest, error) {
type SerRequest struct {
ReqID uint64
ID uint64
Index uint64
}
var sr SerRequest
if err := rlp.DecodeBytes(b, &sr); err != nil {
return nil, err
}
return &testRequest{
reqID: sr.ReqID,
reqID: sr.ID,
index: sr.Index,
}, nil
}
Expand Down
8 changes: 4 additions & 4 deletions p2p/stream/common/requestmanager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ import sttypes "github.com/harmony-one/harmony/p2p/stream/types"
// 1. WithHighPriority
// 2. WithBlacklist
// 3. WithWhitelist
type RequestOption func(*request)
type RequestOption func(*WorkerRequest)

// WithHighPriority is the request option to do request with higher priority.
// High priority requests are done first.
func WithHighPriority() RequestOption {
return func(req *request) {
return func(req *WorkerRequest) {
req.priority = reqPriorityHigh
}
}

// WithBlacklist is the request option not to assign the request to the blacklisted
// stream ID.
func WithBlacklist(blacklist []sttypes.StreamID) RequestOption {
return func(req *request) {
return func(req *WorkerRequest) {
for _, stid := range blacklist {
req.addBlacklistedStream(stid)
}
Expand All @@ -31,7 +31,7 @@ func WithBlacklist(blacklist []sttypes.StreamID) RequestOption {
// given stream IDs.
// If a request is not with this option, all streams will be allowed.
func WithWhitelist(whitelist []sttypes.StreamID) RequestOption {
return func(req *request) {
return func(req *WorkerRequest) {
for _, stid := range whitelist {
req.addWhiteListStream(stid)
}
Expand Down
Loading

0 comments on commit bcf05fc

Please sign in to comment.