Skip to content

Commit

Permalink
log broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Sep 7, 2021
1 parent 110ad09 commit 43c0a94
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 116 deletions.
8 changes: 7 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -84,7 +85,12 @@ func serve(c *cli.Context) error {
errChan <- nodeManager.Run(ctx)
}()

go api.Serve(config.API.Addr)
additionalHandler := map[string]map[string]func(http.ResponseWriter, *http.Request){
"GET": {
"/log/": workloadManager.LogHandler(),
},
}
go api.Serve(config.API.Addr, additionalHandler)

select {
case err := <-errChan:
Expand Down
40 changes: 7 additions & 33 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import (
// enable api
_ "net/http/pprof" // nolint

"github.com/projecteru2/agent/manager/workload"
"github.com/projecteru2/agent/types"
"github.com/projecteru2/agent/version"
coreutils "github.com/projecteru2/core/utils"

"github.com/bmizerany/pat"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -42,38 +39,10 @@ func (h *Handler) profile(w http.ResponseWriter, _ *http.Request) {
_ = json.NewEncoder(w).Encode(r)
}

// URL /log/
func (h *Handler) log(w http.ResponseWriter, req *http.Request) {
app := req.URL.Query().Get("app")
if app == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
if workload.LogMonitor == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// fuck httpie
w.WriteHeader(http.StatusOK)
if hijack, ok := w.(http.Hijacker); ok {
conn, buf, err := hijack.Hijack()
if err != nil {
log.Errorf("[apiLog] connect failed %v", err)
return
}
logConsumer := &types.LogConsumer{
ID: coreutils.RandomString(8),
App: app, Conn: conn, Buf: buf,
}
workload.LogMonitor.RegisterLogConsumer(logConsumer)
log.Infof("[apiLog] %s %s log attached", app, logConsumer.ID)
}
}

// Serve start a api service
// blocks by http.ListenAndServe
// run this in a separated goroutine
func Serve(addr string) {
func Serve(addr string, additionalHandlers map[string]map[string]func(http.ResponseWriter, *http.Request)) {
if addr == "" {
return
}
Expand All @@ -84,7 +53,6 @@ func Serve(addr string) {
"GET": {
"/profile/": h.profile,
"/version/": h.version,
"/log/": h.log,
},
}

Expand All @@ -94,6 +62,12 @@ func Serve(addr string) {
}
}

for method, routes := range additionalHandlers {
for route, handler := range routes {
restfulAPIServer.Add(method, route, http.HandlerFunc(handler))
}
}

http.Handle("/", restfulAPIServer)
http.Handle("/metrics", promhttp.Handler())
log.Infof("[apiServe] http api started %s", addr)
Expand Down
4 changes: 2 additions & 2 deletions manager/workload/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func (m *Manager) attach(ctx context.Context, ID string) {
Datetime: time.Now().Format(common.DateTimeFormat),
Extra: extra,
}
if LogMonitor != nil && LogMonitor.LogC != nil {
LogMonitor.LogC <- l
if m.logBroadcaster != nil && m.logBroadcaster.logC != nil {
m.logBroadcaster.logC <- l
}
if err := writer.Write(l); err != nil && !(entryPoint == "agent" && utils.IsDockerized()) {
log.Errorf("[attach] %s workload %s_%s write failed %v", workloadName, entryPoint, coreutils.ShortID(ID), err)
Expand Down
12 changes: 5 additions & 7 deletions manager/workload/attach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (
"testing"
"time"

"github.com/projecteru2/agent/types"

"github.com/stretchr/testify/assert"
)

func TestAttach(t *testing.T) {
LogMonitor = &Watcher{LogC: make(chan *types.Log)}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := newMockWorkloadManager(t)
go func() {
for {
log := <-LogMonitor.LogC

log := <-manager.logBroadcaster.logC
// see: runtime.FromTemplate
switch log.Type {
case "stdout":
Expand All @@ -26,7 +25,6 @@ func TestAttach(t *testing.T) {
}
}()

manager := newMockWorkloadManager(t)
manager.attach(context.Background(), "Rei")
manager.attach(ctx, "Rei")
time.Sleep(2 * time.Second)
}
104 changes: 104 additions & 0 deletions manager/workload/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package workload

import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/projecteru2/agent/types"
coreutils "github.com/projecteru2/core/utils"

"github.com/sirupsen/logrus"
)

type subscriber struct {
buf *bufio.ReadWriter
unsubscribe func()
}

// logBroadcaster receives log and broadcasts to subscribers
type logBroadcaster struct {
logC chan *types.Log
subscribers map[string]map[string]*subscriber
}

func newLogBroadcaster() *logBroadcaster {
return &logBroadcaster{
logC: make(chan *types.Log),
subscribers: map[string]map[string]*subscriber{},
}
}

// subscribe subscribes logs of the specific app.
func (l *logBroadcaster) subscribe(app string, buf *bufio.ReadWriter) {
if _, ok := l.subscribers[app]; !ok {
l.subscribers[app] = map[string]*subscriber{}
}

ID := coreutils.RandomString(8)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

l.subscribers[app][ID] = &subscriber{buf, cancel}
logrus.Infof("%s %s log subscribed", app, ID)
<-ctx.Done()

delete(l.subscribers[app], ID)
if len(l.subscribers[app]) == 0 {
delete(l.subscribers, app)
}
}

func (l *logBroadcaster) broadcast(log *types.Log) {
if _, ok := l.subscribers[log.Name]; !ok {
return
}
data, err := json.Marshal(log)
if err != nil {
logrus.Error(err)
return
}
line := fmt.Sprintf("%X\r\n%s\r\n\r\n", len(data)+2, string(data))
for ID, subscriber := range l.subscribers[log.Name] {
if _, err := subscriber.buf.WriteString(line); err != nil {
logrus.Error(err)
logrus.Infof("%s %s detached", log.Name, ID)
subscriber.unsubscribe()
}
subscriber.buf.Flush()
logrus.Debugf("sub %s get %s", ID, line)
}
}

func (l *logBroadcaster) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
logrus.Infof("[logBroadcaster] stops")
return
case log := <-l.logC:
l.broadcast(log)
}
}
}

func (l *logBroadcaster) handler(w http.ResponseWriter, req *http.Request) {
app := req.URL.Query().Get("app")
if app == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
// fuck httpie
w.WriteHeader(http.StatusOK)
if hijack, ok := w.(http.Hijacker); ok {
conn, buf, err := hijack.Hijack()
if err != nil {
logrus.Errorf("[apiLog] connect failed %v", err)
return
}
defer conn.Close()
l.subscribe(app, buf)
}
}
60 changes: 60 additions & 0 deletions manager/workload/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package workload

import (
"bufio"
"context"
"net/http"
"testing"
"time"

"github.com/projecteru2/agent/types"

"github.com/bmizerany/pat"
"github.com/stretchr/testify/assert"
)

func TestLogBroadcaster(t *testing.T) {
l := newLogBroadcaster()

go func() {
restfulAPIServer := pat.New()
restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(l.handler))
http.Handle("/", restfulAPIServer)
http.ListenAndServe(":12310", nil)
}()

go func() {
time.Sleep(3 * time.Second)
l.logC <- &types.Log{
ID: "Rei",
Name: "nerv",
Type: "stdout",
EntryPoint: "eva0",
Ident: "",
Data: "data0",
}
l.logC <- &types.Log{
ID: "Rei",
Name: "nerv",
Type: "stdout",
EntryPoint: "eva0",
Ident: "",
Data: "data1",
}
}()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
go l.run(ctx)

time.Sleep(2 * time.Second)
resp, err := http.Get("http://127.0.0.1:12310/log/?app=nerv")
assert.Nil(t, err)

reader := bufio.NewReader(resp.Body)
for i := 0; i < 2; i++ {
line, err := reader.ReadBytes('\n')
assert.Nil(t, err)
t.Log(string(line))
}
}
15 changes: 12 additions & 3 deletions manager/workload/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workload

import (
"context"
"net/http"

"github.com/projecteru2/agent/common"
"github.com/projecteru2/agent/runtime"
Expand All @@ -25,6 +26,8 @@ type Manager struct {
nodeIP string
forwards *utils.HashBackends

logBroadcaster *logBroadcaster

// storeIdentifier indicates which eru this agent belongs to
// it can be used to identify the corresponding core
// and all containers that belong to this core
Expand Down Expand Up @@ -81,16 +84,17 @@ func NewManager(ctx context.Context, config *types.Config) (*Manager, error) {
return nil, err
}

manager.logBroadcaster = newLogBroadcaster()

return manager, nil
}

// Run will start agent
// blocks by ctx.Done()
// either call this in a separated goroutine, or used in main to block main goroutine
func (m *Manager) Run(ctx context.Context) error {
// start log watcher
InitMonitor()
go LogMonitor.Serve(ctx)
// start log broadcaster
go m.logBroadcaster.run(ctx)

// load container
if err := m.load(ctx); err != nil {
Expand All @@ -113,3 +117,8 @@ func (m *Manager) Run(ctx context.Context) error {
return err
}
}

// LogHandler returns the http handler for /log/
func (m *Manager) LogHandler() func(w http.ResponseWriter, req *http.Request) {
return m.logBroadcaster.handler
}
Loading

0 comments on commit 43c0a94

Please sign in to comment.