Skip to content

Commit

Permalink
resolve issues and enable local net test (grpc)
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 27, 2023
1 parent 3cec6d8 commit d48a061
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 55 deletions.
94 changes: 47 additions & 47 deletions api/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ import (
"github.com/amirylm/p2pmq/proto"
)

func TestGrpc_Network(t *testing.T) {
// t.Skip()

func TestGrpc_LocalNetwork(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

n := 4
rounds := 5
n := 10
rounds := n

require.NoError(t, logging.SetLogLevelRegex("p2pmq", "debug"))

Expand Down Expand Up @@ -107,12 +105,12 @@ func TestGrpc_Network(t *testing.T) {
}
require.NoError(t, err)
valHitMap[msg.GetTopic()].Add(1)
if len(msg.Data) > 48 {
if len(msg.GetData()) > 48 {
require.NoError(t, valClient.Send(&proto.ValidatedMessage{
Result: proto.ValidationResult_REJECT,
Msg: msg,
}))
} else if len(msg.Data) > 32 {
} else if len(msg.GetData()) > 32 {
require.NoError(t, valClient.Send(&proto.ValidatedMessage{
Result: proto.ValidationResult_IGNORE,
Msg: msg,
Expand Down Expand Up @@ -142,7 +140,7 @@ func TestGrpc_Network(t *testing.T) {
}
require.NoError(t, err)
msgHitMap[msg.GetTopic()].Add(1)
require.LessOrEqualf(t, len(msg.Data), 32, "should see only valid messages: %s", msg.Data)
require.LessOrEqualf(t, len(msg.GetData()), 32, "should see only valid messages: %s", msg.Data)
}
})
}
Expand All @@ -164,7 +162,7 @@ func TestGrpc_Network(t *testing.T) {
wg.Wait()

<-time.After(time.Second * 5) // TODO: avoid timeout
t.Log("Publishing")
t.Log("Publishing valid messages")
for r := 0; r < rounds; r++ {
for i := range grpcServers {
control := proto.NewControlServiceClient(conns[i])
Expand All @@ -185,44 +183,45 @@ func TestGrpc_Network(t *testing.T) {
}
wg.Wait()

// // invalid messages
// for i := range grpcServers {
// control := proto.NewControlServiceClient(conns[i])
// data := []byte(fmt.Sprintf("%d-test-data-%d", rand.Int31n(1e3), i+1))
// for len(data)+1 < 48 {
// data = append(data, []byte(fmt.Sprintf("%d", 1e5+rand.Int31n(1e9)))...)
// }
// req := &proto.PublishRequest{
// Topic: fmt.Sprintf("test-%d", i+1),
// Data: data,
// }
// wg.Add(1)
// go func() {
// defer wg.Done()
// _, _ = control.Publish(ctx, req)
// }()
// }

// // ignored messages
// for i := range grpcServers {
// control := proto.NewControlServiceClient(conns[i])
// data := []byte(fmt.Sprintf("%d-test-data-%d", rand.Int31n(1e3), i+1))
// for len(data)+1 < 32 {
// data = append(data, []byte(fmt.Sprintf("%d", rand.Int31n(1e3)))...)
// }
// req := &proto.PublishRequest{
// Topic: fmt.Sprintf("test-%d", i+1),
// Data: data,
// }
// wg.Add(1)
// go func() {
// defer wg.Done()
// _, _ = control.Publish(ctx, req)
// }()
// }
// wg.Wait()

<-time.After(time.Second * 2) // TODO: avoid timeout
t.Log("Publishing invalid messages")
for i := range grpcServers {
control := proto.NewControlServiceClient(conns[i])
data := []byte(fmt.Sprintf("%d-test-data-%d", rand.Int31n(1e3), i+1))
for len(data)+1 <= 48 {
data = append(data, []byte(fmt.Sprintf("%d", 1e5+rand.Int31n(1e9)))...)
}
req := &proto.PublishRequest{
Topic: fmt.Sprintf("test-%d", i+1),
Data: data,
}
wg.Add(1)
go func() {
defer wg.Done()
_, _ = control.Publish(ctx, req)
}()
}
wg.Wait()

t.Log("Publishing ignored messages")
for i := range grpcServers {
control := proto.NewControlServiceClient(conns[i])
data := []byte(fmt.Sprintf("%d-test-data-%d", rand.Int31n(1e3), i+1))
for len(data)+1 <= 32 {
data = append(data, []byte(fmt.Sprintf("%d", rand.Int31n(1e4)))...)
}
req := &proto.PublishRequest{
Topic: fmt.Sprintf("test-%d", i+1),
Data: data,
}
wg.Add(1)
go func() {
defer wg.Done()
_, _ = control.Publish(ctx, req)
}()
}
wg.Wait()

<-time.After(time.Second * 5) // TODO: avoid timeout

for _, s := range grpcServers {
s.Stop()
Expand All @@ -232,6 +231,7 @@ func TestGrpc_Network(t *testing.T) {
for topic, counter := range msgHitMap {
count := int(counter.Load()) / n // per node
require.GreaterOrEqual(t, count, rounds, "should get at least %d messages on topic %s", rounds, topic)
require.LessOrEqual(t, count, rounds+1, "should get at most %d messages on topic %s", rounds, topic)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/ctrl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestController_Sanity(t *testing.T) {
wg.Add(1)
go func(c *Controller, r, i int) {
defer wg.Done()
require.NoError(t, c.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("round-%d-test-data-%d", r+1, i+1))))
require.NoError(t, c.Publish(ctx, fmt.Sprintf("test-%d", i+1), []byte(fmt.Sprintf("r-%d-test-data-%d", r+1, i+1))))
}(d, r, i)
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *Controller) Publish(ctx context.Context, topicName string, data []byte)
return err
}
// d.lggr.Debugw("publishing on topic", "topic", topicName, "data", string(data))
return topic.Publish(ctx, data, pubsub.WithLocalPublication(false))
return topic.Publish(ctx, data)
}

func (c *Controller) Leave(topicName string) error {
Expand Down Expand Up @@ -145,6 +145,9 @@ func (c *Controller) listenSubscription(ctx context.Context, sub *pubsub.Subscri
if msg == nil {
continue
}
// if msg.ReceivedFrom == c.host.ID() {
// continue
// }
if err := c.msgRouter.Handle(ctx, msg.ReceivedFrom, msg); err != nil {
if ctx.Err() != nil {
return
Expand Down
7 changes: 1 addition & 6 deletions core/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ func SetupTestControllers(ctx context.Context, t *testing.T, n int, routingFn fu
},
},
Pubsub: &commons.PubsubConfig{
Topics: []commons.TopicConfig{
{
Name: "test-1",
MsgValidator: &commons.MsgValidationConfig{},
},
},
MsgValidator: &commons.MsgValidationConfig{},
},
}
msgRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[error]) {
Expand Down

0 comments on commit d48a061

Please sign in to comment.