Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking: Replace Callbacks interface by Callbacks struct (server) #326

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/examples/server/opampsrv/opampsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func NewServer(agents *data.Agents) *Server {
func (srv *Server) Start() {
settings := server.StartSettings{
Settings: server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
Callbacks: types.Callbacks{
OnConnecting: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: srv.onMessage,
OnConnectionCloseFunc: srv.onDisconnect,
ConnectionCallbacks: types.ConnectionCallbacks{
OnMessage: srv.onMessage,
OnConnectionClose: srv.onDisconnect,
},
}
},
Expand Down
63 changes: 0 additions & 63 deletions server/callbacks.go

This file was deleted.

71 changes: 30 additions & 41 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

func (s *server) Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error) {
s.settings = settings
s.settings.Callbacks.SetDefaults()
s.wsUpgrader = websocket.Upgrader{
EnableCompression: settings.EnableCompression,
}
Expand Down Expand Up @@ -169,26 +170,25 @@

func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) {
var connectionCallbacks serverTypes.ConnectionCallbacks
if s.settings.Callbacks != nil {
resp := s.settings.Callbacks.OnConnecting(req)
if !resp.Accept {
// HTTP connection is not accepted. Set the response headers.
for k, v := range resp.HTTPResponseHeader {
w.Header().Set(k, v)
}
// And write the response status code.
w.WriteHeader(resp.HTTPStatusCode)
return
resp := s.settings.Callbacks.OnConnecting(req)
if !resp.Accept {
// HTTP connection is not accepted. Set the response headers.
for k, v := range resp.HTTPResponseHeader {
w.Header().Set(k, v)
}
// use connection-specific handler provided by ConnectionResponse
connectionCallbacks = resp.ConnectionCallbacks
// And write the response status code.
w.WriteHeader(resp.HTTPStatusCode)
return
}
// use connection-specific handler provided by ConnectionResponse
connectionCallbacks = resp.ConnectionCallbacks
connectionCallbacks.SetDefaults()

// HTTP connection is accepted. Check if it is a plain HTTP request.

if req.Header.Get(headerContentType) == contentTypeProtobuf {
// Yes, a plain HTTP request.
s.handlePlainHTTPRequest(req, w, connectionCallbacks)
s.handlePlainHTTPRequest(req, w, &connectionCallbacks)
return
}

Expand All @@ -201,10 +201,10 @@

// Return from this func to reduce memory usage.
// Handle the connection on a separate goroutine.
go s.handleWSConnection(req.Context(), conn, connectionCallbacks)
go s.handleWSConnection(req.Context(), conn, &connectionCallbacks)
}

func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Conn, connectionCallbacks serverTypes.ConnectionCallbacks) {
func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Conn, connectionCallbacks *serverTypes.ConnectionCallbacks) {
agentConn := wsConnection{wsConn: wsConn, connMutex: &sync.Mutex{}}

defer func() {
Expand All @@ -216,14 +216,10 @@
}
}()

if connectionCallbacks != nil {
connectionCallbacks.OnConnectionClose(agentConn)
}
connectionCallbacks.OnConnectionClose(agentConn)
}()

if connectionCallbacks != nil {
connectionCallbacks.OnConnected(reqCtx, agentConn)
}
connectionCallbacks.OnConnected(reqCtx, agentConn)

sentCustomCapabilities := false

Expand Down Expand Up @@ -254,21 +250,19 @@
continue
}

if connectionCallbacks != nil {
response := connectionCallbacks.OnMessage(msgContext, agentConn, &request)
if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
}
if !sentCustomCapabilities {
response.CustomCapabilities = &protobufs.CustomCapabilities{
Capabilities: s.settings.CustomCapabilities,
}
sentCustomCapabilities = true
}
err = agentConn.Send(msgContext, response)
if err != nil {
s.logger.Errorf(msgContext, "Cannot send message to WebSocket: %v", err)
response := connectionCallbacks.OnMessage(msgContext, agentConn, &request)
if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
}

Check warning on line 256 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L255-L256

Added lines #L255 - L256 were not covered by tests
if !sentCustomCapabilities {
response.CustomCapabilities = &protobufs.CustomCapabilities{
Capabilities: s.settings.CustomCapabilities,
}
sentCustomCapabilities = true
}
err = agentConn.Send(msgContext, response)
if err != nil {
s.logger.Errorf(msgContext, "Cannot send message to WebSocket: %v", err)

Check warning on line 265 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L265

Added line #L265 was not covered by tests
}
}
}
Expand Down Expand Up @@ -310,7 +304,7 @@
return buf.Bytes(), nil
}

func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter, connectionCallbacks serverTypes.ConnectionCallbacks) {
func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter, connectionCallbacks *serverTypes.ConnectionCallbacks) {
bodyBytes, err := s.readReqBody(req)
if err != nil {
s.logger.Debugf(req.Context(), "Cannot read HTTP body: %v", err)
Expand All @@ -331,11 +325,6 @@
conn: connFromRequest(req),
}

if connectionCallbacks == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

connectionCallbacks.OnConnected(req.Context(), agentConn)

defer func() {
Expand Down
Loading
Loading