Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1beta1 mapper example #112

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.17-alpine AS builder

WORKDIR /build

ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct

COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -o main cmd/main.go


FROM ubuntu:16.04

RUN mkdir -p kubeedge

COPY --from=builder /build/main kubeedge/
COPY ./config.yaml kubeedge/

WORKDIR kubeedge
34 changes: 34 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
SHELL := /bin/bash

curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST)))))
rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS))
$(eval $(rest_args):;@:)

help:
#
# Usage:
# make generate : generate a mapper based on a template.
# make mapper {mapper-name} <action> <parameter>: execute mapper building process.
#
# Actions:
# - mod, m : download code dependencies.
# - lint, l : verify code via go fmt and `golangci-lint`.
# - build, b : compile code.
# - package, p : package docker image.
# - clean, c : clean output binary.
#
# Parameters:
# ARM : true or undefined
# ARM64 : true or undefined
#
# Example:
# - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64.
# - make mapper modbus test : execute `test` "modbus" mapper.
@echo

make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g')
$(make_rules):
@$(curr_dir)/hack/make-rules/[email protected] $(rest_args)

.DEFAULT_GOAL := help
.PHONY: $(make_rules) build test package
67 changes: 67 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"errors"
"os"

"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/device"
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/config"
"github.com/kubeedge/virtualdevice/pkg/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/grpcserver"
"github.com/kubeedge/virtualdevice/pkg/httpserver"
"github.com/kubeedge/virtualdevice/pkg/util/parse"
)

func main() {
var err error
var c config.Config

klog.InitFlags(nil)
defer klog.Flush()

if err = c.Parse(); err != nil {
klog.Fatal(err)
os.Exit(1)
}
klog.Infof("config: %+v", c)

grpcclient.Init(&c)

// start grpc server
grpcServer := grpcserver.NewServer(
grpcserver.Config{
SockPath: c.GrpcServer.SocketPath,
Protocol: common.ProtocolCustomized,
},
device.NewDevPanel(),
)

panel := device.NewDevPanel()
err = panel.DevInit(&c)
if err != nil && !errors.Is(err, parse.ErrEmptyData) {
klog.Fatal(err)
}
klog.Infoln("devInit finished")

// register to edgecore
// if dev init mode is register, mapper's dev will init when registry to edgecore
if c.DevInit.Mode != common.DevInitModeRegister {
klog.Infoln("======dev init mode is not register, will register to edgecore")
if _, _, err = grpcclient.RegisterMapper(false); err != nil {
klog.Fatal(err)
}
klog.Infoln("registerMapper finished")
}
go panel.DevStart()

httpServer := httpserver.NewRestServer(panel)
go httpServer.StartServer()

defer grpcServer.Stop()
if err = grpcServer.Start(); err != nil {
klog.Fatal(err)
}
}
12 changes: 12 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
grpc_server:
socket_path: /etc/kubeedge/virtualdevice.sock
common:
name: Virtualdevice-mapper
version: v1.13.0
api_version: v1.0.0
protocol: virtualProtocol # replace by your protocol name
address: 127.0.0.1
edgecore_sock: /etc/kubeedge/dmi.sock
dev_init:
mode: register

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package influxdb2

import (
"context"
"encoding/json"
"os"
"time"

"k8s.io/klog/v2"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/kubeedge/virtualdevice/pkg/common"
)

type DataBaseConfig struct {
Influxdb2ClientConfig *Influxdb2ClientConfig `json:"influxdb2ClientConfig,omitempty"`
Influxdb2DataConfig *Influxdb2DataConfig `json:"influxdb2DataConfig,omitempty"`
}

type Influxdb2ClientConfig struct {
Url string `json:"url,omitempty"`
Org string `json:"org,omitempty"`
Bucket string `json:"bucket,omitempty"`
}

type Influxdb2DataConfig struct {
Measurement string `json:"measurement,omitempty"`
Tag map[string]string `json:"tag,omitempty"`
FieldKey string `json:"fieldKey,omitempty"`
}

func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) {
// parse influx database config data
influxdb2ClientConfig := new(Influxdb2ClientConfig)
influxdb2DataConfig := new(Influxdb2DataConfig)
err := json.Unmarshal(clientConfig, influxdb2ClientConfig)
if err != nil {
return nil, err
}
err = json.Unmarshal(dataConfig, influxdb2DataConfig)
if err != nil {
return nil, err
}
return &DataBaseConfig{
Influxdb2ClientConfig: influxdb2ClientConfig,
Influxdb2DataConfig: influxdb2DataConfig,
}, nil
}

func (d *DataBaseConfig) InitDbClient() influxdb2.Client {
var usrtoken string
usrtoken = os.Getenv("TOKEN")
client := influxdb2.NewClient(d.Influxdb2ClientConfig.Url, usrtoken)

return client
}

func (d *DataBaseConfig) CloseSession(client influxdb2.Client) {
client.Close()
}

func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error {
// write device data to influx database
writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket)
p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value},
time.Now())
// write point immediately
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
klog.V(4).Info("Exit AddData")
return err
}
return nil
}
73 changes: 73 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/data/publish/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package http

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/global"
)

type PushMethod struct {
HTTP *HTTPConfig `json:"http"`
}

type HTTPConfig struct {
HostName string `json:"hostName,omitempty"`
Port int `json:"port,omitempty"`
RequestPath string `json:"requestPath,omitempty"`
Timeout int `json:"timeout,omitempty"`
}

func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
httpConfig := new(HTTPConfig)
err := json.Unmarshal(config, httpConfig)
if err != nil {
return nil, err
}
return &PushMethod{
HTTP: httpConfig,
}, nil
}

func (pm *PushMethod) InitPushMethod() error {
klog.V(1).Info("Init HTTP")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(2).Info("Publish device data by HTTP")

targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

klog.V(3).Infof("Publish %v to %s", payload, targetUrl)

resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))

if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
klog.Errorf("Publish device data by HTTP failed, err = %v", err)
return
}
klog.V(1).Info("############### Message published. ###############")
klog.V(3).Infof("HTTP reviced %s", string(body))

}
63 changes: 63 additions & 0 deletions mappers/kubeedge-v1.15.0/virtualdevice/data/publish/mqtt/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package mqtt

import (
"encoding/json"
"fmt"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/global"
)

type PushMethod struct {
MQTT *MQTTConfig `json:"http"`
}

type MQTTConfig struct {
Address string `json:"address,omitempty"`
Topic string `json:"topic,omitempty"`
QoS int `json:"qos,omitempty"`
Retained bool `json:"retained,omitempty"`
}

func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
mqttConfig := new(MQTTConfig)
err := json.Unmarshal(config, mqttConfig)
if err != nil {
return nil, err
}
return &PushMethod{
MQTT: mqttConfig,
}, nil
}

func (pm *PushMethod) InitPushMethod() error {
klog.V(1).Info("Init MQTT")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(1).Infof("Publish %v to %s on topic: %s, Qos: %d, Retained: %v",
data.Value, pm.MQTT.Address, pm.MQTT.Topic, pm.MQTT.QoS, pm.MQTT.Retained)

opts := mqtt.NewClientOptions().AddBroker(pm.MQTT.Address)
client := mqtt.NewClient(opts)

if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
str_time := "time is " + formatTimeStr + " "
str_publish := str_time + pm.MQTT.Topic + ": " + data.Value

token := client.Publish(pm.MQTT.Topic, byte(pm.MQTT.QoS), pm.MQTT.Retained, str_publish)
token.Wait()

client.Disconnect(250)
klog.V(2).Info("############### Message published. ###############")
}
Loading
Loading