From be411d69425017622502db85f7288c173b87af47 Mon Sep 17 00:00:00 2001 From: Francesco Torchia Date: Sun, 2 Feb 2025 02:03:31 +0100 Subject: [PATCH] Add call to tm1637 from controller --- .github/workflows/publish-controller.yaml | 1 + controller/go.mod | 6 +- controller/go.sum | 2 + controller/models/models.go | 5 +- controller/rpc_client.go | 116 +++++----------------- 5 files changed, 33 insertions(+), 97 deletions(-) diff --git a/.github/workflows/publish-controller.yaml b/.github/workflows/publish-controller.yaml index 3f2400b..3d94837 100644 --- a/.github/workflows/publish-controller.yaml +++ b/.github/workflows/publish-controller.yaml @@ -8,6 +8,7 @@ on: - main paths: - 'controller/**' + - 'rpc_client/**' - 'scripts/**' - '.github/**' diff --git a/controller/go.mod b/controller/go.mod index f732892..dd382a9 100644 --- a/controller/go.mod +++ b/controller/go.mod @@ -1,17 +1,18 @@ module github.com/torchiaf/Sensors/controller -go 1.23.0 +go 1.23.1 toolchain go1.23.4 require ( github.com/fatih/structs v1.1.0 github.com/itchyny/gojq v0.12.17 - github.com/rabbitmq/amqp091-go v1.10.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/client-go v0.32.0 ) +require github.com/rabbitmq/amqp091-go v1.10.0 // indirect + require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -22,6 +23,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202005215-c296b8b6cb2b github.com/x448/float16 v0.8.4 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/text v0.19.0 // indirect diff --git a/controller/go.sum b/controller/go.sum index f892023..8508310 100644 --- a/controller/go.sum +++ b/controller/go.sum @@ -47,6 +47,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202005215-c296b8b6cb2b h1:Gov23VN7l20GFoGg6CCmHbI/90mcUjiZKMizWjRbxTI= +github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202005215-c296b8b6cb2b/go.mod h1:/wPjrnUN+OPaRpEyoe4YHUTSfXNBVsXGBTLowDX96AM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/controller/models/models.go b/controller/models/models.go index 36ab01d..4284542 100644 --- a/controller/models/models.go +++ b/controller/models/models.go @@ -33,6 +33,7 @@ type Config struct { } type Message struct { - Device string `json:"device"` - // Args map[string]interface{} `json:"args"` + Device string `json:"device"` + Action string `json:"action"` + Args []string `json:"args"` } diff --git a/controller/rpc_client.go b/controller/rpc_client.go index 215bc7f..056ccbc 100644 --- a/controller/rpc_client.go +++ b/controller/rpc_client.go @@ -2,15 +2,11 @@ package main import ( "context" - "fmt" "log" - "math/rand" "time" - amqp "github.com/rabbitmq/amqp091-go" "github.com/torchiaf/Sensors/controller/config" - "github.com/torchiaf/Sensors/controller/models" - "github.com/torchiaf/Sensors/controller/utils" + "github.com/torchiaf/Sensors/rpc_client" ) func failOnError(err error, msg string) { @@ -19,103 +15,37 @@ func failOnError(err error, msg string) { } } -func randomString(l int) string { - bytes := make([]byte, l) - for i := 0; i < l; i++ { - bytes[i] = byte(randInt(65, 90)) - } - return string(bytes) -} - -func randInt(min int, max int) int { - return min + rand.Intn(max-min) -} - -func exec(routingKey string, message models.Message) (res string, err error) { - - address := fmt.Sprintf("amqp://%s:%s@%s:%s/", config.Config.RabbitMQ.Username, config.Config.RabbitMQ.Password, config.Config.RabbitMQ.Host, config.Config.RabbitMQ.Port) - - conn, err := amqp.Dial(address) - failOnError(err, "Failed to connect to RabbitMQ") - defer conn.Close() - - ch, err := conn.Channel() - failOnError(err, "Failed to open a channel") - defer ch.Close() - - q, err := ch.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // noWait - nil, // arguments - ) - failOnError(err, "Failed to declare a queue") - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, "Failed to register a consumer") - - corrId := randomString(32) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - log.Printf("Msg: %s:", utils.ToString(message)) - - err = ch.PublishWithContext(ctx, - "", // exchange - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "text/plain", - CorrelationId: corrId, - ReplyTo: q.Name, - Body: []byte(utils.ToString(message)), - }) - failOnError(err, "Failed to publish a message") - - for d := range msgs { - if corrId == d.CorrelationId { - res = string(d.Body) - failOnError(err, "Error msgs") - break - } - } - - return -} - func main() { - rand.Seed(time.Now().UTC().UnixNano()) - log.Printf("Config %+v", config.Config) + client := rpc_client.New(context.Background()) + for { for _, module := range config.Config.Modules { log.Printf(" [x] Requesting on {%s, %s, %s}", module.Name, module.Type, module.RoutingKey) - res, err := exec( + res, err := client.Read( module.RoutingKey, - models.Message{ - Device: "dht11", - // Args: map[string]interface{}{ - // "foo": "bar", - // }, - }, + "dht11", + []string{}, ) - failOnError(err, "Failed to handle RPC request") - - log.Printf(" [%s] Got %+v", module.Name, res) + failOnError(err, "Failed to handle RPC request: dht11") + + log.Printf(" [%s] [%s] Got %+v", module.Name, "dht11", res) + + if module.Name == "raspberrypi-1" { + res1, err := client.Read( + module.RoutingKey, + "tm1637", + []string{ + "temperature", + "12", + }, + ) + failOnError(err, "Failed to handle RPC request: tm1637") + + log.Printf(" [%s] [%s] Got %+v", module.Name, "tm1637", res1) + } } time.Sleep(time.Second)