Skip to content

Commit

Permalink
Updates (#8)
Browse files Browse the repository at this point in the history
* removed deprecated import

* make protoc

* rename build target to 'pmq'

* pubsub trace

* gossip simulation test

* update gitignore

* rm unused cmd

* lint

* race

* small fixes to bls example

* Squashed docs branch

* update docs

* update dockerfile
  • Loading branch information
amirylm authored Jul 5, 2024
1 parent 13b84f5 commit 72aefdc
Show file tree
Hide file tree
Showing 27 changed files with 1,471 additions and 153 deletions.
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ go.work.sum
bin

cover.out
cover.html
cover.html

.output

.DS_Store
*.log
18 changes: 10 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ RUN apt-get update && \
&& rm -rf /var/lib/apt/lists/*

ARG APP_VERSION=nightly
ARG APP_NAME=p2pmq
ARG BUILD_TARGET=p2pmq
ARG APP_NAME=pmq
ARG BUILD_TARGET=pmq

WORKDIR /p2pmq
WORKDIR /pmq

COPY go.mod go.sum ./
RUN go mod download
Expand All @@ -23,12 +23,14 @@ RUN GOOS=linux CGO_ENABLED=0 go build -tags netgo -a -v -o ./bin/${BUILD_TARGET}

FROM alpine:latest as runner

ARG BUILD_TARGET=p2pmq
ARG BUILD_TARGET=pmq

RUN apk --no-cache --upgrade add ca-certificates bash

WORKDIR /p2pmq
WORKDIR /pmq

COPY --from=builder /p2pmq/.env* ./
COPY --from=builder /p2pmq/resources/config/*.p2pmq.yaml ./
COPY --from=builder /p2pmq/bin/${BUILD_TARGET} ./app
COPY --from=builder /pmq/.env* ./
COPY --from=builder /pmq/resources/config/*.pmq.yaml ./
COPY --from=builder /pmq/bin/${BUILD_TARGET} ./app

CMD ["./app"]
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
APP_NAME?=p2pmq
APP_NAME?=pmq
BUILD_TARGET?=${APP_NAME}
BUILD_IMG?=${APP_NAME}
APP_VERSION?=$(git describe --tags $(git rev-list --tags --max-count=1) 2> /dev/null || echo "nightly")
CFG_PATH?=./resources/config/default.p2pmq.yaml
TEST_PKG?=./core/...
TEST_TIMEOUT?=2m
GOSSIP_OUT_DIR=../.output

protoc:
./scripts/proto-gen.sh

lint:
@docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.54 golangci-lint run -v --timeout=5m ./...
Expand Down Expand Up @@ -46,3 +50,9 @@ docker-run-default:

docker-run-boot:
@docker run -d --restart unless-stopped --name "${APP_NAME}" -p "${TCP_PORT}":"${TCP_PORT}" -p "${GRPC_PORT}":"${GRPC_PORT}" -e "GRPC_PORT=${GRPC_PORT}" -it "${BUILD_IMG}" /p2pmq/app -config=./bootstrapper.p2pmq.yaml

gossip-sim:
@mkdir -p "${GOSSIP_OUT_DIR}" \
&& export GOSSIP_SIMULATION=full \
&& export GOSSIP_OUT_DIR="${GOSSIP_OUT_DIR}" \
&& go test -v -timeout 10m ./core -run TestGossipSimulation
28 changes: 18 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
# Decentralized Message Engine
# PMQ

Decentralized messaging engine that facilitates the secure exchange of verifiable messages across networks, enabling the formation of a global, collaborative network.

<br />

**NOTE: This is an experimental work in progress. DO NOT USE**
**WARNING: This is an experimental work in progress, DO NOT USE in production**

<br />

[![API Reference](
https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/68747470733a2f2f676f646f632e6f72672f6769746875622e636f6d2f676f6c616e672f6764646f3f7374617475732e737667
)](https://pkg.go.dev/github.com/amirylm/p2pmq?tab=doc)
![Go version](https://img.shields.io/badge/go-1.21-blue.svg)
![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/lint.yml/badge.svg?branch=main)
![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/test.yml/badge.svg?branch=main)

## Documentation

## Overview
You can find documentation in [./resources/docs](./resources/docs).

**DME** is a distributed, permissionless messaging engine for cross oracle communication.
## Usage

A network of agents is capable of the following:
- Broadcast messages over topics with optimal latency
- Pluggable and decoupled message validation using gRPC
- Scoring for protection from bad actors
- Syncing peers with the latest messages to recover from
restarts, network partition, etc.
Usage examples are available in the [examples](./examples) folder.
16 changes: 1 addition & 15 deletions cmd/p2pmq/main.go → cmd/pmq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func main() {
app := &cli.App{
Name: "p2pmq",
Name: "pmq",
Flags: []cli.Flag{
cli.IntFlag{
Name: "grpc-port",
Expand Down Expand Up @@ -92,20 +92,6 @@ func main() {
ctrl.Start(ctx)
defer ctrl.Close()

// <-time.After(time.Second * 10)

// if cfg.Pubsub != nil {
// if err := ctrl.Subscribe(ctx, "test-1"); err != nil {
// lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err)
// }
// for i := 0; i < 10; i++ {
// <-time.After(time.Second * 5)
// if err := ctrl.Publish(ctx, "test-1", []byte(fmt.Sprintf("test-data-%d-%s", i, ctrl.ID()))); err != nil {
// lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err)
// }
// }
// }

return grpcapi.ListenGrpc(srv, c.Int("grpc-port"))

},
Expand Down
5 changes: 0 additions & 5 deletions cmd/pqclient/main.go

This file was deleted.

8 changes: 7 additions & 1 deletion commons/config_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type PubsubConfig struct {
Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"`
MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"`
MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"`
Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"`
Trace *PubsubTraceConfig `json:"trace,omitempty" yaml:"trace,omitempty"`
}

func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) {
Expand All @@ -30,6 +30,12 @@ func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) {
return TopicConfig{}, false
}

type PubsubTraceConfig struct {
Skiplist []string `json:"skiplist,omitempty" yaml:"skiplist,omitempty"`
JsonFile string `json:"jsonFile,omitempty" yaml:"jsonFile,omitempty"`
Debug bool `json:"debug,omitempty" yaml:"debug,omitempty"`
}

type MsgIDFnConfig struct {
Type string `json:"type,omitempty" yaml:"type,omitempty"`
Size int `json:"size,omitempty" yaml:"size,omitempty"`
Expand Down
54 changes: 40 additions & 14 deletions core/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"sync/atomic"

"github.com/amirylm/p2pmq/commons"
"github.com/amirylm/p2pmq/commons/utils"
Expand All @@ -12,6 +13,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
Expand All @@ -33,9 +36,11 @@ type Controller struct {
mdnsSvc mdns.Service
pubsub *pubsub.PubSub

topicManager *topicManager
denylist pubsub.Blacklist
subFilter pubsub.SubscriptionFilter
topicManager *topicManager
denylist pubsub.Blacklist
subFilter pubsub.SubscriptionFilter
psTracer *psTracer
pubsubRpcCounter *atomic.Uint64

valRouter MsgRouter[pubsub.ValidationResult]
msgRouter MsgRouter[error]
Expand All @@ -46,15 +51,16 @@ func NewController(
cfg commons.Config,
msgRouter MsgRouter[error],
valRouter MsgRouter[pubsub.ValidationResult],
lggrNS string,
name string,
) (*Controller, error) {
d := &Controller{
threadControl: utils.NewThreadControl(),
lggr: lggr.Named(lggrNS).Named("controller"),
cfg: cfg,
valRouter: valRouter,
msgRouter: msgRouter,
topicManager: newTopicManager(),
threadControl: utils.NewThreadControl(),
lggr: lggr.Named(name).Named("ctrl"),
cfg: cfg,
valRouter: valRouter,
msgRouter: msgRouter,
topicManager: newTopicManager(),
pubsubRpcCounter: new(atomic.Uint64),
}
err := d.setup(ctx, cfg)

Expand All @@ -65,6 +71,21 @@ func (c *Controller) ID() string {
return c.host.ID().String()
}

func (c *Controller) Connect(ctx context.Context, dest *Controller) error {
ai := peer.AddrInfo{
ID: dest.host.ID(),
Addrs: dest.host.Addrs(),
}
switch c.host.Network().Connectedness(ai.ID) {
case libp2pnetwork.Connected:
return nil
case libp2pnetwork.CannotConnect:
return fmt.Errorf("cannot connect to %s", ai.ID)
default:
}
return c.host.Connect(ctx, ai)
}

func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHandler func(*MsgWrapper[pubsub.ValidationResult])) {
if c.valRouter != nil {
c.valRouter.RefreshHandler(valHandler)
Expand All @@ -78,7 +99,7 @@ func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHand

func (c *Controller) Start(ctx context.Context) {
c.StartOnce(func() {
// d.lggr.Debugf("starting controller with host %s", d.host.ID())
c.lggr.Debugf("starting ctrl")

if c.msgRouter != nil {
c.threadControl.Go(c.msgRouter.Start)
Expand All @@ -98,7 +119,7 @@ func (c *Controller) Start(ctx context.Context) {
c.connect(b)
}
if err := c.dht.Bootstrap(ctx); err != nil {
c.lggr.Panicf("failed to start discovery: %w", err)
c.lggr.Panicf("failed to start dht: %w", err)
}
}
if c.mdnsSvc != nil {
Expand All @@ -111,7 +132,8 @@ func (c *Controller) Start(ctx context.Context) {

func (c *Controller) Close() {
c.StopOnce(func() {
c.lggr.Debugf("closing controller with host %s", c.host.ID())
h := c.host.ID()
c.lggr.Debugf("closing controller with host %s", h)
c.threadControl.Close()
if c.dht != nil {
if err := c.dht.Close(); err != nil {
Expand All @@ -126,6 +148,7 @@ func (c *Controller) Close() {
if err := c.host.Close(); err != nil {
c.lggr.Errorf("failed to close host: %w", err)
}
c.lggr.Debugf("closed controller with host %s", h)
})
}

Expand Down Expand Up @@ -194,7 +217,8 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error)
return err
}
c.host = h
c.lggr.Infow("created libp2p host", "peerID", h.ID(), "addrs", h.Addrs())
c.lggr = c.lggr.With("peerID", h.ID())
c.lggr.Debugw("created libp2p host", "addrs", h.Addrs())

if len(cfg.MdnsTag) > 0 {
c.setupMdnsDiscovery(ctx, h, cfg.MdnsTag)
Expand All @@ -207,5 +231,7 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error)
}
}

c.lggr.Infow("ctrl setup done", "addrs", h.Addrs())

return nil
}
3 changes: 1 addition & 2 deletions core/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (

"github.com/amirylm/p2pmq/commons"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dhtopts.Option) func(host.Host) (routing.PeerRouting, error) {
func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dht.Option) func(host.Host) (routing.PeerRouting, error) {
return func(h host.Host) (routing.PeerRouting, error) {
dhtInst, err := dht.New(ctx, h, opts...)
if err != nil {
Expand Down
Loading

0 comments on commit 72aefdc

Please sign in to comment.