Skip to content

Commit

Permalink
- Add rpc_client
Browse files Browse the repository at this point in the history
- Adapt rpc_server to rpc_client interface
- Add call to tm1637 from controller
  • Loading branch information
torchiaf committed Feb 2, 2025
1 parent 57c39d4 commit 95e248b
Show file tree
Hide file tree
Showing 22 changed files with 549 additions and 106 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
- main
paths:
- 'controller/**'
- 'rpc_client/**'
- 'scripts/**'
- '.github/**'

Expand Down
1 change: 0 additions & 1 deletion controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ func isDevEnv() bool {
}

func initConfig() models.Config {

modules := utils.ParseYamlFile[[]models.Module]("/sensors/modules.yaml")

c := models.Config{
Expand Down
6 changes: 4 additions & 2 deletions controller/go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
module github.com/torchiaf/Sensors/controller

go 1.23.0
go 1.23.1

toolchain go1.23.4

require (
github.com/fatih/structs v1.1.0
github.com/itchyny/gojq v0.12.17
github.com/rabbitmq/amqp091-go v1.10.0
github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202022913-2a42e8d8bd20
gopkg.in/yaml.v2 v2.4.0
k8s.io/client-go v0.32.0
)

require github.com/rabbitmq/amqp091-go v1.10.0 // indirect

require (
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202022913-2a42e8d8bd20 h1:Ym/PM/TlNRyxyCyJp4u0AIB6zk+oPhrRtgm026niDIc=
github.com/torchiaf/Sensors/rpc_client v0.0.0-20250202022913-2a42e8d8bd20/go.mod h1:/wPjrnUN+OPaRpEyoe4YHUTSfXNBVsXGBTLowDX96AM=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
5 changes: 3 additions & 2 deletions controller/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
}

type Message struct {
Device string `json:"device"`
// Args map[string]interface{} `json:"args"`
Device string `json:"device"`
Action string `json:"action"`
Args []string `json:"args"`
}
137 changes: 44 additions & 93 deletions controller/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,71 @@ package main

import (
"context"
"fmt"
"encoding/json"
"log"
"math/rand"
"math"
"strconv"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/torchiaf/Sensors/controller/config"
"github.com/torchiaf/Sensors/controller/models"
"github.com/torchiaf/Sensors/controller/utils"
"github.com/torchiaf/Sensors/rpc_client"
)

type Dht11 struct {
T float64 `json:"t"`
H float64 `json:"h"`
}

func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(randInt(65, 90))
}
return string(bytes)
}

func randInt(min int, max int) int {
return min + rand.Intn(max-min)
}

func exec(routingKey string, message models.Message) (res string, err error) {

address := fmt.Sprintf("amqp://%s:%s@%s:%s/", config.Config.RabbitMQ.Username, config.Config.RabbitMQ.Password, config.Config.RabbitMQ.Host, config.Config.RabbitMQ.Port)

conn, err := amqp.Dial(address)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

corrId := randomString(32)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

log.Printf("Msg: %s:", utils.ToString(message))

err = ch.PublishWithContext(ctx,
"", // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(utils.ToString(message)),
})
failOnError(err, "Failed to publish a message")

for d := range msgs {
if corrId == d.CorrelationId {
res = string(d.Body)
failOnError(err, "Error msgs")
break
}
}

return
}

func main() {
rand.Seed(time.Now().UTC().UnixNano())

log.Printf("Config %+v", config.Config)

client := rpc_client.New(context.Background())

for {

temperature := ""

for _, module := range config.Config.Modules {
log.Printf(" [x] Requesting on {%s, %s, %s}", module.Name, module.Type, module.RoutingKey)

res, err := exec(
res, err := client.Read(
module.RoutingKey,
models.Message{
Device: "dht11",
// Args: map[string]interface{}{
// "foo": "bar",
// },
},
"dht11",
[]string{},
)
failOnError(err, "Failed to handle RPC request")

log.Printf(" [%s] Got %+v", module.Name, res)
failOnError(err, "Failed to handle RPC request: dht11")

log.Printf(" [%s] [%s] Got %+v", module.Name, "dht11", res)

if module.Name == "raspberrypi-0" {
var obj Dht11
err = json.Unmarshal([]byte(res), &obj)
if err != nil {
log.Fatalf("error: %v", err)
}

temperature = strconv.Itoa(int(math.Round(obj.T)))
}

if module.Name == "raspberrypi-1" {
res1, err := client.Write(
module.RoutingKey,
"tm1637",
[]string{
"temperature",
temperature,
},
)
failOnError(err, "Failed to handle RPC request: tm1637")

log.Printf(" [%s] [%s] Got %+v", module.Name, "tm1637", res1)
}
}

time.Sleep(time.Second)
Expand Down
3 changes: 2 additions & 1 deletion modules/raspberrypi3b/devices/dht11/app/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import json
import Adafruit_DHT
from config import device

Expand All @@ -14,7 +15,7 @@
d['t'] = temperature
d['h'] = humidity

print(d)
print(json.dumps(d))

except:
print("read() error")
4 changes: 3 additions & 1 deletion modules/raspberrypi5/devices/dht11/app/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import json
import board
import adafruit_dht
from config import device, pin
Expand All @@ -15,7 +17,7 @@
d['t'] = dhtDevice.temperature
d['h'] = dhtDevice.humidity

print(d)
print(json.dumps(d))

except Exception as error:
# dhtDevice.exit()
Expand Down
10 changes: 6 additions & 4 deletions modules/raspberrypi5/devices/tm1637/app/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import sys
import tm1637
from time import sleep
from config import print, clk, dio

tm = tm1637.TM1637(clk=clk, dio=dio)

try:
arg = sys.argv[1]
print('Received: {}'.format(arg))
print('Received: {}'.format(sys.argv))

action = sys.argv[1]
fn = getattr(tm, sys.argv[2])
arg = sys.argv[3]

t = int(arg)
tm.temperature(t) # show temperature 't*C'
fn(t) # show temperature 't*C'
except Exception as error:
print(error)
pass
Expand Down
35 changes: 35 additions & 0 deletions rpc_client/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config

import (
"os"

"github.com/torchiaf/Sensors/rpc_client/models"
"github.com/torchiaf/Sensors/rpc_client/utils"
)

func isDevEnv() bool {
env := os.Getenv("DEV_ENV")

return len(env) > 0
}

func initConfig() models.Config {

modules := utils.ParseYamlFile[[]models.Module]("/sensors/modules.yaml")
modulesMap := utils.Map(modules, func(m models.Module) string { return m.Name })

c := models.Config{
IsDev: isDevEnv(),
RabbitMQ: models.RabbitMQ{
Host: os.Getenv("RABBITMQ_CLUSTER_SERVICE_HOST"),
Port: os.Getenv("RABBITMQ_CLUSTER_SERVICE_PORT_AMQP"),
Username: os.Getenv("RABBITMQ_USERNAME"),
Password: os.Getenv("RABBITMQ_PASSWORD"),
},
Modules: modulesMap,
}

return c
}

var Config = initConfig()
12 changes: 12 additions & 0 deletions rpc_client/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/torchiaf/Sensors/rpc_client

go 1.23.1

require (
github.com/fatih/structs v1.1.0
github.com/itchyny/gojq v0.12.17
github.com/rabbitmq/amqp091-go v1.10.0
gopkg.in/yaml.v2 v2.4.0
)

require github.com/itchyny/timefmt-go v0.1.6 // indirect
14 changes: 14 additions & 0 deletions rpc_client/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg=
github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
39 changes: 39 additions & 0 deletions rpc_client/models/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package models

type RabbitMQ struct {
Host string
Port string
Username string
Password string
}

type Config struct {
IsDev bool
RabbitMQ RabbitMQ
Modules map[string]Module
}

type Module struct {
Name string `yaml:"name"`
NodeName string `yaml:"nodeName"`
Type string `yaml:"type"`
RoutingKey string `yaml:"routingKey"`
Devices []Device
}

type Device struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Config []DeviceConfig
}

type DeviceConfig struct {
Name string `yaml:"name"`
Value string `yaml:"value"`
}

type Message struct {
Device string `json:"device"`
Action string `json:"action"`
Args []string `json:"args"`
}
Loading

0 comments on commit 95e248b

Please sign in to comment.