Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mcamou committed Feb 4, 2025
1 parent 46ebc60 commit 5c94dba
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 11 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ RUN useradd -m -s /bin/bash masa && mkdir -p /home/masa && chown -R masa:masa /h
USER masa

WORKDIR /home/masa
ENV DATA_DIR=/home/masa

# Expose necessary ports
EXPOSE 8080

# Set default command to start the Go application
CMD ego run /usr/bin/masa-tee-worker
CMD ego run /usr/bin/masa-tee-worker
33 changes: 33 additions & 0 deletions cmd/tee-worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/joho/godotenv"
"github.com/masa-finance/tee-worker/api/types"
"github.com/sirupsen/logrus"
)

func readConfig() types.JobConfiguration {
Expand Down Expand Up @@ -48,9 +50,40 @@ func readConfig() types.JobConfiguration {
jc["twitter_accounts"] = twitterAccounts
}

jc["stats_buf_size"] = statsBufSize()

logLevel := os.Getenv("LOG_LEVEL")
switch strings.ToLower(logLevel) {
case "debug":
logrus.SetLevel(logrus.DebugLevel)
case "info":
logrus.SetLevel(logrus.InfoLevel)
case "warn":
logrus.SetLevel(logrus.WarnLevel)
case "error":
logrus.SetLevel(logrus.ErrorLevel)
default:
logrus.SetLevel(logrus.InfoLevel)
}

return jc
}

// statsBufSize returns the size of the stats channel buffer
func statsBufSize() uint {
bufSizeStr := os.Getenv("STATS_BUF_SIZE")
if bufSizeStr == "" {
bufSizeStr = "128"
}

bufSize, err := strconv.Atoi(bufSizeStr)
if err != nil {
logrus.Errorf("Error parsing STATS_BUF_SIZE: %s. Setting to default.", err)
bufSize = 128
}
return uint(bufSize)
}

func listenAddress() string {
listenAddress := os.Getenv("LISTEN_ADDRESS")
if listenAddress == "" {
Expand Down
9 changes: 5 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
services:
masa-tee-worker:
# network_mode: "host"
image: masaengineering/tee-worker:main
network_mode: "host"
# image: masaengineering/tee-worker:main
# Uncomment to build from source
# build: .
build: .
ports:
- "8080:8080"
environment:
LISTEN_ADDRESS: ":8080"
# uncomment if not running with Intel SGX HW
# OE_SIMULATION: "1"
LOG_LEVEL: "info"
restart: always
# uncomment if running with Intel SGX
# devices:
# - /dev/sgx_enclave
# - /dev/sgx_provision
# - /dev/sgx_provision
14 changes: 10 additions & 4 deletions internal/jobs/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"encoding/json"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -51,6 +52,7 @@ type stats struct {
LastOperationUnix int64 `json:"last_operation_time"`
CurrentTimeUnix int64 `json:"current_time"`
Stats map[statType]uint `json:"stats"`
sync.Mutex
}

// StatsCollector is the object used to collect statistics
Expand All @@ -60,7 +62,7 @@ type StatsCollector struct {
}

// StartCollector starts a goroutine that listens to a channel for AddStat messages and updates the stats accordingly.
func StartCollector() *StatsCollector {
func StartCollector(bufSize uint) *StatsCollector {
logrus.Info("Starting stats collector")

s := stats{
Expand All @@ -71,17 +73,19 @@ func StartCollector() *StatsCollector {
s.Stats[t] = 0
}

ch := make(chan AddStat, 128)
ch := make(chan AddStat, bufSize)

go func(s *stats, ch chan AddStat) {
for {
stat := <-ch
s.Lock()
s.LastOperationUnix = time.Now().Unix()
if _, ok := s.Stats[stat.Type]; ok {
s.Stats[stat.Type] += stat.Num
} else {
s.Stats[stat.Type] = stat.Num
}
s.Unlock()
logrus.Debugf("Added %d to stat %s. Current stats: %#v", stat.Num, stat.Type, s)
}
}(&s, ch)
Expand All @@ -90,12 +94,14 @@ func StartCollector() *StatsCollector {
}

// Json returns the current statistics as a JSON byte array
func (s StatsCollector) Json() ([]byte, error) {
func (s *StatsCollector) Json() ([]byte, error) {
s.Stats.Lock()
defer s.Stats.Unlock()
s.Stats.CurrentTimeUnix = time.Now().Unix()
return json.Marshal(s.Stats)
}

// Add is a convenience method to add a number to a statistic
func (s StatsCollector) Add(typ statType, num uint) {
func (s *StatsCollector) Add(typ statType, num uint) {
s.Chan <- AddStat{Type: typ, Num: num}
}
4 changes: 3 additions & 1 deletion internal/jobs/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

const TelemetryJobType = "telemetry"

// A TelemetryJob connects to a StatsCollector, and receives requests to return the current stats
type TelemetryJob struct {
collector *stats.StatsCollector
}
Expand All @@ -20,6 +19,9 @@ func NewTelemetryJob(jc types.JobConfiguration, c *stats.StatsCollector) Telemet
func (t TelemetryJob) ExecuteJob(j types.Job) (types.JobResult, error) {
logrus.Debug("Executing telemetry job")

if t.collector == nil {
return types.JobResult{Error: "No StatsCollector configured", Job: j}, nil
}
data, err := t.collector.Json()
if err != nil {
return types.JobResult{
Expand Down
6 changes: 5 additions & 1 deletion internal/jobserver/jobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func NewJobServer(workers int, jc types.JobConfiguration) *JobServer {
workers = 1
}

s := stats.StartCollector()
bufSize, ok := jc["stats_buf_size"].(uint)
if !ok {
bufSize = 128
}
s := stats.StartCollector(bufSize)

jobworkers := map[string]*jobWorkerEntry{
jobs.WebScraperType: {
Expand Down

0 comments on commit 5c94dba

Please sign in to comment.