Skip to content

Commit

Permalink
make fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
juniaoshaonian committed Mar 6, 2024
1 parent 065f859 commit 8e1f243
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 12 deletions.
7 changes: 4 additions & 3 deletions e2e/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package e2e

import (
"context"
"github.com/ecodeclub/mq-api/mqerr"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"log"
"sync"
"testing"
"time"

"github.com/ecodeclub/mq-api/mqerr"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

"github.com/ecodeclub/mq-api"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down
10 changes: 4 additions & 6 deletions memory/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package memory

import (
"fmt"
"github.com/ecodeclub/ekit/syncx"
"github.com/ecodeclub/mq-api"
"github.com/pkg/errors"
"log"
"sync"
"sync/atomic"
"time"

"github.com/ecodeclub/ekit/syncx"
"github.com/ecodeclub/mq-api"
"github.com/pkg/errors"
)

var ErrReportOffsetFail = errors.New("非平衡状态,无法上报偏移量")
Expand Down Expand Up @@ -119,7 +120,6 @@ func (c *ConsumerGroup) ExitGroup(name string, closeCh chan struct{}) {
return
}
}

}

// ReportOffset 上报偏移量
Expand All @@ -131,7 +131,6 @@ func (c *ConsumerGroup) ReportOffset(records []PartitionRecord) error {
c.partitionRecords.Store(record.Index, record)
}
return nil

}

func (c *ConsumerGroup) Close() {
Expand Down Expand Up @@ -202,7 +201,6 @@ func (c *ConsumerGroup) reBalance() {
}
}
log.Println("重平衡结束")

}

// JoinGroup 加入消费组
Expand Down
5 changes: 3 additions & 2 deletions memory/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package memory
import (
"context"
"fmt"
"github.com/ecodeclub/mq-api/internal/pkg/validator"
"log"
"sync"

"github.com/ecodeclub/mq-api/internal/pkg/validator"

"github.com/ecodeclub/ekit/syncx"
"github.com/ecodeclub/mq-api"
"github.com/ecodeclub/mq-api/mqerr"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) {
}
// 初始化分区消费进度
partitionRecords := syncx.Map[int, PartitionRecord]{}
for idx, _ := range t.partitions {
for idx := range t.partitions {
partitionRecords.Store(idx, PartitionRecord{
Index: idx,
Cursor: 0,
Expand Down
3 changes: 2 additions & 1 deletion memory/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package memory

import (
"context"
"github.com/ecodeclub/mq-api/mqerr"
"sync"

"github.com/ecodeclub/mq-api/mqerr"

"github.com/ecodeclub/mq-api"
)

Expand Down
1 change: 1 addition & 0 deletions memory/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewTopic(name string, partitions int) *Topic {
t.partitions = partitionList
return t
}

func (t *Topic) addProducer(producer mq.Producer) error {
t.locker.Lock()
defer t.locker.Unlock()
Expand Down

0 comments on commit 8e1f243

Please sign in to comment.