Skip to content

Commit

Permalink
feat: kafka lab
Browse files Browse the repository at this point in the history
  • Loading branch information
albert037037037 authored and alice890308 committed Mar 5, 2024
1 parent 4698cac commit cdff00e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 26 deletions.
13 changes: 4 additions & 9 deletions modules/video/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,8 @@ func (s *service) UploadVideo(stream pb.Video_UploadVideoServer) error {
return err
}

if err := s.produceVideoCreatedEvent(&pb.HandleVideoCreatedRequest{
Id: id.Hex(),
Url: path.Join(s.storage.Endpoint(), s.storage.Bucket(), objectName),
}); err != nil {
return err
}
// [Kafka TODO]
// [Describe] Video now is uploaded successfully, try to take advantage of produceVideoCreatedEvent here to send messages.

if err := stream.SendAndClose(&pb.UploadVideoResponse{
Id: id.Hex(),
Expand Down Expand Up @@ -169,9 +165,8 @@ func (s *service) produceVideoCreatedEvent(req *pb.HandleVideoCreatedRequest) er
{Value: valueBytes},
}

if err := s.producer.SendMessages(msgs); err != nil {
return err
}
// [Kafka TODO]
// [Describe] Send message to kafka via sarama producer

return nil
}
23 changes: 6 additions & 17 deletions modules/video/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,14 @@ func (s *stream) HandleVideoCreated(ctx context.Context, req *pb.HandleVideoCrea
if req.GetScale() != 0 {
variant := strconv.Itoa(int(req.GetScale()))

if err := s.handleVideoWithVariant(ctx, id, variant, req.GetUrl()); err != nil {
return nil, &saramakit.HandlerError{Retry: true, Err: err}
}
// [Kafka TODO]
// [Describe] Transcode video if get message with scale != 0, you can handle error occurance like above primitive.ObjectIDFromHex(req.GetId()).

return &emptypb.Empty{}, nil
}

// fanout create events to each variant
variants := []int32{1080, 720, 480, 320}
for _, scale := range variants {
if err := s.produceVideoCreatedWithScaleEvent(&pb.HandleVideoCreatedRequest{
Id: req.GetId(),
Url: req.GetUrl(),
Scale: scale,
}); err != nil {
return nil, &saramakit.HandlerError{Retry: true, Err: err}
}
}
// [Kafka TODO]
// [Describe] Fanout create events to each variant [1080, 720, 480, 320], you can handle error occurance like above primitive.ObjectIDFromHex(req.GetId()).

return &emptypb.Empty{}, nil
}
Expand All @@ -80,9 +70,8 @@ func (s *stream) produceVideoCreatedWithScaleEvent(req *pb.HandleVideoCreatedReq
{Value: valueBytes},
}

if err := s.producer.SendMessages(msgs); err != nil {
return err
}
// [Kafka TODO]
// [Describe] Send message to kafka

return nil
}
49 changes: 49 additions & 0 deletions pkg/otelkit/pgkit/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pgkit

import (
"context"
"os"

"github.com/NTHU-LSALAB/NTHU-Distributed-System/pkg/logkit"
"github.com/go-pg/pg/v10"
"go.uber.org/zap"
)

type PGConfig struct {
URL string `long:"url" env:"URL" description:"the URL of PostgreSQL" required:"true"`
}

type PGClient struct {
*pg.DB
closeFunc func()
}

func (c *PGClient) Close() error {
if c.closeFunc != nil {
c.closeFunc()
}
return c.DB.Close()
}

func NewPGClient(ctx context.Context, conf *PGConfig) *PGClient {
if url := os.ExpandEnv(conf.URL); url != "" {
conf.URL = url
}

logger := logkit.FromContext(ctx).With(zap.String("url", conf.URL))
opts, err := pg.ParseURL(conf.URL)
if err != nil {
logger.Fatal("failed to parse PostgreSQL url", zap.Error(err))
}

db := pg.Connect(opts).WithContext(ctx)
if err := db.Ping(ctx); err != nil {
logger.Fatal("failed to ping PostgreSQL", zap.Error(err))
}

logger.Info("create PostgreSQL client successfully")

return &PGClient{
DB: db,
}
}
47 changes: 47 additions & 0 deletions pkg/otelkit/pgkit/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pgkit

import (
"context"
"os"

"github.com/NTHU-LSALAB/NTHU-Distributed-System/pkg/logkit"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("PGClient", func() {
Describe("NewPGClient", func() {
var (
ctx context.Context
pgConf *PGConfig
pgClient *PGClient
)

BeforeEach(func() {
ctx = logkit.NewLogger(&logkit.LoggerConfig{
Development: true,
}).WithContext(context.Background())

pgConf = &PGConfig{
URL: "postgres://postgres@postgres:5432/postgres?sslmode=disable",
}
if url := os.Getenv("POSTGRES_URL"); url != "" {
pgConf.URL = url
}
})

JustBeforeEach(func() {
pgClient = NewPGClient(ctx, pgConf)
})

AfterEach(func() {
Expect(pgClient.Close()).NotTo(HaveOccurred())
})

When("success", func() {
It("returns new PGClient without error", func() {
Expect(pgClient).NotTo(BeNil())
})
})
})
})
13 changes: 13 additions & 0 deletions pkg/otelkit/pgkit/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pgkit

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestPGKit(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Test PG Kit")
}

0 comments on commit cdff00e

Please sign in to comment.