Skip to content

Commit

Permalink
Update lint config (#1136)
Browse files Browse the repository at this point in the history
* add new linters and fix schema issues

* fix new lint issues
  • Loading branch information
weeco authored Feb 27, 2024
1 parent 06df03f commit c6ac52d
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 69 deletions.
13 changes: 7 additions & 6 deletions backend/.golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ linters:
- staticcheck
- unused
# Not enabled by default: we want a good chunk
- typecheck
- asasalint
- asciicheck
- bidichk
- bodyclose
- containedctx
- cyclop
- durationcheck
- errname
- errorlint
- exhaustive
- exportloopref
- gci
- gocheckcompilerdirectives
- gocognit
- goconst
- gocritic
Expand All @@ -64,13 +66,16 @@ linters:
- nilerr
- noctx
- nolintlint
- reassign
- revive
- rowserrcheck
- sqlclosecheck
- stylecheck
- tenv
- typecheck
- unconvert
- unparam
- usestdlibvars
- wastedassign
- whitespace
linters-settings:
Expand All @@ -80,7 +85,6 @@ linters-settings:
default-signifies-exhaustive: true
# If we want to opt out of a lint, we require an explanation.
nolintlint:
allow-leading-space: true
allow-unused: false
require-explanation: true
require-specific: true
Expand All @@ -92,7 +96,7 @@ linters-settings:
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.21"
lang-version: "1.22"
gosec:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
Expand All @@ -113,7 +117,6 @@ linters-settings:
gocognit:
min-complexity: 30
gci:
no-prefix-comments: true # no leading comment; we allow inline for nolint
sections:
- standard # stdlib
- default # everything not std, not within project
Expand Down Expand Up @@ -149,8 +152,6 @@ linters-settings:
enable-all-rules: true
severity: warning
confidence: 0.7
error-code: 1
warning-code: 1
rules:
# removed because replacing the version of a proto is easier if we use it
# as alias
Expand Down
61 changes: 39 additions & 22 deletions backend/pkg/api/connect/service/console/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type Service struct {
}

// NewService creates a new Console service handler.
func NewService(logger *zap.Logger,
func NewService(
logger *zap.Logger,
consoleSvc console.Servicer,
authHooks hooks.AuthorizationHooks,
) *Service {
Expand All @@ -52,7 +53,11 @@ func NewService(logger *zap.Logger,
}

// ListMessages consumes a Kafka topic and streams the Kafka records back.
func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
func (api *Service) ListMessages(
ctx context.Context,
req *connect.Request[v1alpha.ListMessagesRequest],
stream *connect.ServerStream[v1alpha.ListMessagesResponse],
) error {
lmq := httptypes.ListMessagesRequest{
TopicName: req.Msg.GetTopic(),
StartOffset: req.Msg.GetStartOffset(),
Expand Down Expand Up @@ -143,22 +148,24 @@ func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alp
defer cancel()

progress := &streamProgressReporter{
ctx: ctx,
logger: api.logger,
request: &listReq,
stream: stream,
messagesConsumed: atomic.Int64{},
bytesConsumed: atomic.Int64{},
}
progress.Start()
progress.Start(ctx)

return api.consoleSvc.ListMessages(ctx, listReq, progress)
}

// PublishMessage serialized and produces the records.
//
//nolint:gocognit,cyclop // complicated response logic
func (api *Service) PublishMessage(ctx context.Context, req *connect.Request[v1alpha.PublishMessageRequest]) (*connect.Response[v1alpha.PublishMessageResponse], error) {
func (api *Service) PublishMessage(
ctx context.Context,
req *connect.Request[v1alpha.PublishMessageRequest],
) (*connect.Response[v1alpha.PublishMessageResponse], error) {
msg := req.Msg

canPublish, restErr := api.authHooks.CanPublishTopicRecords(ctx, msg.GetTopic())
Expand All @@ -178,18 +185,22 @@ func (api *Service) PublishMessage(ctx context.Context, req *connect.Request[v1a

recordHeaders := make([]kgo.RecordHeader, 0, len(req.Msg.GetHeaders()))
for _, h := range req.Msg.GetHeaders() {
recordHeaders = append(recordHeaders, kgo.RecordHeader{
Key: h.GetKey(),
Value: h.GetValue(),
})
recordHeaders = append(
recordHeaders, kgo.RecordHeader{
Key: h.GetKey(),
Value: h.GetValue(),
},
)
}

keyInput := rpcPublishMessagePayloadOptionsToSerializeInput(msg.GetKey())
valueInput := rpcPublishMessagePayloadOptionsToSerializeInput(msg.GetValue())
compression := rpcCompressionTypeToKgoCodec(msg.GetCompression())

prRes, prErr := api.consoleSvc.PublishRecord(ctx, msg.GetTopic(), msg.GetPartitionId(), recordHeaders,
keyInput, valueInput, req.Msg.GetUseTransactions(), compression)
prRes, prErr := api.consoleSvc.PublishRecord(
ctx, msg.GetTopic(), msg.GetPartitionId(), recordHeaders,
keyInput, valueInput, req.Msg.GetUseTransactions(), compression,
)

if prErr == nil && prRes != nil && prRes.Error != "" {
prErr = errors.New(prRes.Error)
Expand All @@ -205,9 +216,11 @@ func (api *Service) PublishMessage(ctx context.Context, req *connect.Request[v1a
code = connect.CodeInvalidArgument

for _, ktr := range prRes.KeyTroubleshooting {
errInfo := apierrors.NewErrorInfo(dataplane.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: ktr.SerdeName, Value: ktr.Message,
})
errInfo := apierrors.NewErrorInfo(
dataplane.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: ktr.SerdeName, Value: ktr.Message,
},
)

if detail, detailErr := connect.NewErrorDetail(errInfo); detailErr == nil {
details = append(details, detail)
Expand All @@ -219,9 +232,11 @@ func (api *Service) PublishMessage(ctx context.Context, req *connect.Request[v1a
code = connect.CodeInvalidArgument

for _, vtr := range prRes.ValueTroubleshooting {
errInfo := apierrors.NewErrorInfo(dataplane.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: vtr.SerdeName, Value: vtr.Message,
})
errInfo := apierrors.NewErrorInfo(
dataplane.Reason_REASON_CONSOLE_ERROR.String(), apierrors.KeyVal{
Key: vtr.SerdeName, Value: vtr.Message,
},
)

if detail, detailErr := connect.NewErrorDetail(errInfo); detailErr == nil {
details = append(details, detail)
Expand All @@ -243,9 +258,11 @@ func (api *Service) PublishMessage(ctx context.Context, req *connect.Request[v1a
return nil, err
}

return connect.NewResponse(&v1alpha.PublishMessageResponse{
Topic: prRes.TopicName,
PartitionId: prRes.PartitionID,
Offset: prRes.Offset,
}), nil
return connect.NewResponse(
&v1alpha.PublishMessageResponse{
Topic: prRes.TopicName,
PartitionId: prRes.PartitionID,
Offset: prRes.Offset,
},
), nil
}
85 changes: 50 additions & 35 deletions backend/pkg/api/connect/service/console/stream_progress_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

// streamProgressReporter is in charge of sending status updates and messages regularly to the frontend.
type streamProgressReporter struct {
ctx context.Context
logger *zap.Logger
request *console.ListMessageRequest
stream *connect.ServerStream[v1alpha.ListMessagesResponse]
Expand All @@ -36,7 +35,7 @@ type streamProgressReporter struct {
writeMutex sync.Mutex
}

func (p *streamProgressReporter) Start() {
func (p *streamProgressReporter) Start(ctx context.Context) {
// We should report progress in two scenarios.
// If filter is enabled it could take a while to find the matching record(s).
// If search is for newest records it could take a while to get new records.
Expand All @@ -62,7 +61,7 @@ func (p *streamProgressReporter) Start() {

for {
select {
case <-p.ctx.Done():
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
Expand All @@ -81,11 +80,13 @@ func (p *streamProgressReporter) reportProgress() {
BytesConsumed: p.bytesConsumed.Load(),
}

if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Progress{
Progress: msg,
if err := p.stream.Send(
&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Progress{
Progress: msg,
},
},
}); err != nil {
); err != nil {
p.logger.Error("send error in stream reportProgress", zap.Error(err))
}
}
Expand All @@ -98,11 +99,13 @@ func (p *streamProgressReporter) OnPhase(name string) {
Phase: name,
}

if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Phase{
Phase: msg,
if err := p.stream.Send(
&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Phase{
Phase: msg,
},
},
}); err != nil {
); err != nil {
p.logger.Error("send error in stream OnPhase", zap.Error(err))
}
}
Expand All @@ -124,10 +127,12 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {

for _, mh := range message.Headers {
mh := mh
headers = append(headers, &v1alpha.KafkaRecordHeader{
Key: mh.Key,
Value: mh.Value,
})
headers = append(
headers, &v1alpha.KafkaRecordHeader{
Key: mh.Key,
Value: mh.Value,
},
)
}

compression := v1alpha.CompressionType_COMPRESSION_TYPE_UNSPECIFIED
Expand Down Expand Up @@ -182,26 +187,32 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
data.Key.TroubleshootReport = make([]*v1alpha.TroubleshootReport, 0, len(message.Key.Troubleshooting))
for _, ts := range message.Key.Troubleshooting {
ts := ts
data.Key.TroubleshootReport = append(data.Key.TroubleshootReport, &v1alpha.TroubleshootReport{
SerdeName: ts.SerdeName,
Message: ts.Message,
})
data.Key.TroubleshootReport = append(
data.Key.TroubleshootReport, &v1alpha.TroubleshootReport{
SerdeName: ts.SerdeName,
Message: ts.Message,
},
)
}

data.Value.TroubleshootReport = make([]*v1alpha.TroubleshootReport, 0, len(message.Value.Troubleshooting))
for _, ts := range message.Value.Troubleshooting {
ts := ts
data.Value.TroubleshootReport = append(data.Value.TroubleshootReport, &v1alpha.TroubleshootReport{
SerdeName: ts.SerdeName,
Message: ts.Message,
})
data.Value.TroubleshootReport = append(
data.Value.TroubleshootReport, &v1alpha.TroubleshootReport{
SerdeName: ts.SerdeName,
Message: ts.Message,
},
)
}

if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Data{
Data: data,
if err := p.stream.Send(
&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Data{
Data: data,
},
},
}); err != nil {
); err != nil {
p.logger.Error("send error in stream OnMessage", zap.Error(err))
}
}
Expand All @@ -217,11 +228,13 @@ func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
BytesConsumed: p.bytesConsumed.Load(),
}

if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Done{
Done: msg,
if err := p.stream.Send(
&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Done{
Done: msg,
},
},
}); err != nil {
); err != nil {
p.logger.Error("send error in stream OnComplete", zap.Error(err))
}
}
Expand All @@ -234,11 +247,13 @@ func (p *streamProgressReporter) OnError(message string) {
Message: message,
}

if err := p.stream.Send(&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Error{
Error: msg,
if err := p.stream.Send(
&v1alpha.ListMessagesResponse{
ControlMessage: &v1alpha.ListMessagesResponse_Error{
Error: msg,
},
},
}); err != nil {
); err != nil {
p.logger.Error("send error in stream OnError", zap.Error(err))
}
}
16 changes: 11 additions & 5 deletions backend/pkg/api/handle_schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ func Test_getSubjectFromRequestPath(t *testing.T) {
// demonstration of the issue
// see comment in code

r := httptest.NewRequest("GET", "http://example.com/api/schema-registry/subjects/%252F/versions/last", http.NoBody)
r := httptest.NewRequest(
http.MethodGet,
"http://example.com/api/schema-registry/subjects/%252F/versions/last",
http.NoBody,
)
assert.Equal(t, "/api/schema-registry/subjects/%2F/versions/last", r.URL.Path)
assert.Equal(t, "", r.URL.RawPath)

Expand Down Expand Up @@ -98,9 +102,11 @@ func Test_getSubjectFromRequestPath(t *testing.T) {

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
r := httptest.NewRequest("GET", tt.target, http.NoBody)
assert.Equal(t, tt.expected, getSubjectFromRequestPath(r))
})
t.Run(
tt.name, func(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, tt.target, http.NoBody)
assert.Equal(t, tt.expected, getSubjectFromRequestPath(r))
},
)
}
}
Loading

0 comments on commit c6ac52d

Please sign in to comment.