Skip to content

Commit

Permalink
importer: filter out user join slack message (#5)
Browse files Browse the repository at this point in the history
* importer: filter out user join slack message

Signed-off-by: Artem Navoiev <tenmozes@gmail.com>

* refactor a little

* cleanup

* rename package

* cleanup

* refactor

* rename transporter to transport

* fix cli

---------

Signed-off-by: Artem Navoiev <tenmozes@gmail.com>
Co-authored-by: dmitryk-dk <kozlovdmitriyy@gmail.com>
tenmozes and dmitryk-dk authored Jun 12, 2024
1 parent 68821eb commit 948aa73
Showing 8 changed files with 134 additions and 65 deletions.
6 changes: 3 additions & 3 deletions cli/main.go
Original file line number Diff line number Diff line change
@@ -11,8 +11,8 @@ import (
"syscall"
"time"

"slack2logs/importer"
"slack2logs/slack"
"slack2logs/transporter"
"slack2logs/vmlogs"
)

@@ -40,7 +40,7 @@ func main() {
log.Fatalf("error initialize VictoriaLogs client: %s", err)
}

processor := importer.New(slackClient, logs)
trns := transporter.New(slackClient, logs)

c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
@@ -51,7 +51,7 @@ func main() {
cancel()
}()

processor.Run(ctx)
trns.Run(ctx)

log.Println("Process stopped successfully")
log.Printf("Elapsed time: %s", time.Since(startTime))
2 changes: 1 addition & 1 deletion httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ func handleHealth() http.Handler {
}

func respondWithError(w http.ResponseWriter, r *http.Request, statusCode int, err error) bool {
fmt.Errorf(err.Error(), r.URL.Path)
log.Printf("error - %s, path - %s", err, r.URL.Path)
w.WriteHeader(statusCode)
_, _ = w.Write([]byte(err.Error()))
return true
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
@@ -13,8 +13,8 @@ import (
"slack2logs/envflag"
"slack2logs/flagutil"
"slack2logs/httpserver"
"slack2logs/importer"
"slack2logs/slack"
"slack2logs/transporter"
"slack2logs/vmlogs"
)

@@ -40,7 +40,7 @@ func main() {
log.Fatalf("error initialize VictoriaLogs client: %s", err)
}

processor := importer.New(slackClient, logs)
trp := transporter.New(slackClient, logs)

go httpserver.Serve()

@@ -53,7 +53,7 @@ func main() {
cancel()
}()

processor.Run(ctx)
trp.Run(ctx)

err = httpserver.Stop()
if err != nil {
42 changes: 16 additions & 26 deletions slack/client.go
Original file line number Diff line number Diff line change
@@ -20,11 +20,12 @@ import (
"github.com/slack-go/slack/socketmode"

"slack2logs/flagutil"
"slack2logs/transporter"
)

const (
historicalRequestLimit = 500
joinedChannelMessage = "has joined the channel"
joinedChannelMessage = "> has joined the channel"
idLength = 10
)

@@ -44,38 +45,22 @@ var (
// Client represents slack client
type Client struct {
socketClient *socketmode.Client
messageC chan Message
messageC chan transporter.Message
threadC chan ThreadRequest
listeningChannels map[string]struct{}

mx sync.Mutex
batch Messages
}

// Message represents a slack message
// which would be sent to the additional service
type Message struct {
ThreadID string `json:"thread_id"`
Type string `json:"type"`
User string `json:"user"`
Text string `json:"text"`
ThreadTimeStamp string `json:"thread_ts"`
TimeStamp string `json:"ts"`
ChannelID string `json:"channel_id"`
ChannelName string `json:"channel_name"`
UserID string `json:"user_id"`
DisplayName string `json:"display_name"`
DisplayNameNormalized string `json:"display_name_normalized"`
}

// ThreadRequest represents request for getting
// historical thread messages
type ThreadRequest struct {
ChannelID string
Timestamp string
}

type Messages map[string]Message
type Messages map[string]transporter.Message

func New() *Client {
if len(*listeningChannels) == 0 {
@@ -92,7 +77,7 @@ func New() *Client {

c := Client{
socketClient: socketClient,
messageC: make(chan Message, 1),
messageC: make(chan transporter.Message, 1),
threadC: make(chan ThreadRequest, 1),
listeningChannels: make(map[string]struct{}, len(*listeningChannels)),
batch: make(Messages),
@@ -122,7 +107,7 @@ func (c *Client) RunHistoricalBackfilling(ctx context.Context) error {
}

// Export sends slack message to the additional service via callback
func (c *Client) Export(ctx context.Context, cb func(m Message)) {
func (c *Client) Export(ctx context.Context, cb func(m transporter.Message)) {
ticker := time.NewTicker(*batchFlushInterval)
defer ticker.Stop()
for {
@@ -142,7 +127,7 @@ func (c *Client) Export(ctx context.Context, cb func(m Message)) {
}
}

func (c *Client) flush(cb func(m Message)) {
func (c *Client) flush(cb func(m transporter.Message)) {
if len(c.batch) == 0 {
return
}
@@ -209,7 +194,7 @@ func (c *Client) handleEventMessage(ctx context.Context, event slackevents.Event
return fmt.Errorf("got message from unsupported channel id: %s", ev.Channel)
}
// skip messages like join channel
if strings.Contains(ev.Text, joinedChannelMessage) {
if filterOutLogMessage(ev.Text) {
return nil
}
if ev.SubType == slack.MsgSubTypeMessageChanged {
@@ -242,7 +227,7 @@ func (c *Client) handleEventMessage(ctx context.Context, event slackevents.Event

id := generateMessageID(threadTS)

m := Message{
m := transporter.Message{
ThreadID: id,
Type: ev.Type,
User: ev.User,
@@ -334,7 +319,7 @@ func (c *Client) collectHistoricalMessages(ctx context.Context) {
if m.ThreadTimestamp == "" {
m.ThreadTimestamp = m.Timestamp
}
c.messageC <- Message{
c.messageC <- transporter.Message{
Type: m.Type,
User: m.User,
Text: m.Text,
@@ -396,7 +381,7 @@ func (c *Client) collectThreadMessages(ctx context.Context) {
log.Printf("fail to parse timestamp:%q: %s", rp.Timestamp, err)
continue
}
c.messageC <- Message{
c.messageC <- transporter.Message{
Type: rp.Type,
User: rp.User,
Text: rp.Text,
@@ -433,3 +418,8 @@ func generateMessageID(threadTs string) string {
id := encoded[:idLength]
return id
}

func filterOutLogMessage(msg string) bool {
// filter out "user joined Slack channel messages", msg example "<@U0787V2AW9W> has joined the channel"
return strings.HasSuffix(msg, joinedChannelMessage)
}
20 changes: 20 additions & 0 deletions slack/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package slack

import "testing"

// Test for filterOutLogMessage function
func TestFilterOutLogMessage(t *testing.T) {
tests := []struct {
message string
want bool
}{
{"<@U0787V2AW9W> has joined the channel", true},
{"User message content", false},
{"Another message", false},
}
for _, tt := range tests {
if got := filterOutLogMessage(tt.message); got != tt.want {
t.Errorf("filterOutLogMessage(%q) = %v, want %v", tt.message, got, tt.want)
}
}
}
57 changes: 27 additions & 30 deletions importer/importer.go → transporter/transporter.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
package importer
package transporter

import (
"context"
"log"

"slack2logs/slack"
)

// Importer defines importer interface
// which should be implemented for each importer
type Importer interface {
Import(ctx context.Context, message LogMessage) error
}

// Exporter defines exporter interface
// which should be implemented for each exporter
type Exporter interface {
Export(context.Context, func(slack.Message))
}

// Processor defines object with exporter and importer
type Processor struct {
exporter Exporter
importer Importer
}

// LogMessage represents data for storing in the logs
type LogMessage struct {
// Message represents data for storing in the logs
type Message struct {
ThreadID string `json:"thread_id"`
Type string `json:"type"`
User string `json:"user"`
@@ -40,17 +20,34 @@ type LogMessage struct {
DisplayNameNormalized string `json:"display_name_normalized"`
}

func New(exporter Exporter, importer Importer) *Processor {
p := Processor{exporter: exporter, importer: importer}
return &p
// Importer defines importer interface
// which should be implemented for each importer
type Importer interface {
Import(ctx context.Context, message Message) error
}

// Exporter defines exporter interface
// which should be implemented for each exporter
type Exporter interface {
Export(context.Context, func(Message))
}

// Transport defines object with exporter and importer
type Transport struct {
exporter Exporter
importer Importer
}

// Run starts export import process
func (p *Processor) Run(ctx context.Context) {
p.exporter.Export(ctx, func(m slack.Message) {
logMsg := LogMessage(m)
if err := p.importer.Import(ctx, logMsg); err != nil {
func (p *Transport) Run(ctx context.Context) {
p.exporter.Export(ctx, func(m Message) {
if err := p.importer.Import(ctx, m); err != nil {
log.Printf("error import message to the importer: %s", err)
}
})
}

func New(exporter Exporter, importer Importer) *Transport {
p := Transport{exporter: exporter, importer: importer}
return &p
}
62 changes: 62 additions & 0 deletions transporter/transporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package transporter

import (
"context"
"errors"
"testing"
)

// Mock implementations of Importer and Exporter for testing purposes
type mockImporter struct {
importFunc func(ctx context.Context, message Message) error
}

func (m *mockImporter) Import(ctx context.Context, message Message) error {
return m.importFunc(ctx, message)
}

type mockExporter struct {
exportFunc func(ctx context.Context, processMessage func(Message))
}

func (m *mockExporter) Export(ctx context.Context, processMessage func(Message)) {
m.exportFunc(ctx, processMessage)
}

// Test for Transport.Run method
func TestProcessorRun(t *testing.T) {
mockImp := &mockImporter{
importFunc: func(ctx context.Context, message Message) error {
if message.Text == "test message" {
return nil
}
return errors.New("import error")
},
}
mockExp := &mockExporter{
exportFunc: func(ctx context.Context, processMessage func(Message)) {
processMessage(Message{Text: "test message"})
processMessage(Message{Text: "<@U0787V2AW9W> has joined the channel"}) // This should be filtered out
},
}
processor := New(mockExp, mockImp)
ctx := context.Background()
processor.Run(ctx)
}

// Test for Importer and Exporter interaction
func TestProcessorImportError(t *testing.T) {
mockImp := &mockImporter{
importFunc: func(ctx context.Context, message Message) error {
return errors.New("import error")
},
}
mockExp := &mockExporter{
exportFunc: func(ctx context.Context, processMessage func(Message)) {
processMessage(Message{Text: "test message"})
},
}
processor := New(mockExp, mockImp)
ctx := context.Background()
processor.Run(ctx)
}
4 changes: 2 additions & 2 deletions vmlogs/vmlogs.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ import (
"github.com/VictoriaMetrics/metrics"

"slack2logs/auth"
"slack2logs/importer"
"slack2logs/transporter"
)

const jsonLinePath = "insert/jsonline"
@@ -80,7 +80,7 @@ func New() (*Client, error) {

// Import make request to the VictoriaLogs server with
// given message
func (c *Client) Import(ctx context.Context, message importer.LogMessage) error {
func (c *Client) Import(ctx context.Context, message transporter.Message) error {
messagesDeliveryCount.Inc()
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(message)

0 comments on commit 948aa73

Please sign in to comment.