Skip to content

Commit

Permalink
Send takes to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
PapePathe committed Oct 15, 2023
1 parent 2337dda commit ccc0fc1
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions cmd/training/training.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/segmentio/kafka-go"
"pathe.co/zinx/gametake"
"pathe.co/zinx/pkg/broker"
"pathe.co/zinx/pkg/cards"
"pathe.co/zinx/pkg/game"
"pathe.co/zinx/pkg/player"
Expand All @@ -17,27 +19,44 @@ func main() {
playerTakes(g)
fmt.Println(g.GetTake().Name())
fmt.Println("\n\n\n -------------")
time.Sleep(10 * time.Second)
}
}

func playerTakes(g *game.Game) (takes []constrainedTake) {
_ptk := []gametake.GameTake{}
_kakfa_messages := []kafka.Message{}
publisher := broker.NewPublisher([]string{"localhost:9092", "localhost:9093", "localhost:9093"}, true)

for _, playerObj := range g.GetPlayers() {
if g.GetTake() == gametake.TOUT {
fmt.Println("Going to start the game")
break
}

oldTake := g.GetTake()
g.AddTake(playerObj.GetID(), playerObj.GetBestTake())
ctk := constrainedTake{take: *playerObj.Take, takes: []gametake.GameTake{}, cards: playerObj.OrderedCardsForTake(g.GetTake())}
ctk := constrainedTake{Take: *playerObj.Take, Takes: []gametake.GameTake{oldTake}, Cards: playerObj.OrderedCardsForTake(g.GetTake())}
msg, err := ctk.AsKafkaMessage("Player.Auto.Take")
if err != nil {
fmt.Println(msg)
}

_kakfa_messages = append(_kakfa_messages, msg)

fmt.Println(ctk)
for _, t := range _ptk {
ctk.takes = append(ctk.takes, t)
ctk.Takes = append(ctk.Takes, t)
}
_ptk = append(_ptk, *playerObj.Take)
takes = append(takes, ctk)
}

err := publisher.Publish(_kakfa_messages)

if err != nil {
fmt.Println(err)
}

return takes
}

Expand All @@ -56,23 +75,33 @@ func newSampleGame() *game.Game {
}

type constrainedTake struct {
cards [5]cards.Card
take gametake.GameTake
takes []gametake.GameTake
Cards [5]cards.Card
Take gametake.GameTake
Takes []gametake.GameTake
}

func (c constrainedTake) AsKafkaMessage(topic string) (kafka.Message, error) {
b, err := json.Marshal(c)
if err != nil {
return kafka.Message{}, err
}
msg := kafka.Message{Key: []byte(c.Take.Name()), Topic: topic, Value: b}

return msg, nil
}

func (c constrainedTake) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Take: %s, ", c.take.Name()))
sb.WriteString(fmt.Sprintf("Take: %s, ", c.Take.Name()))

sb.WriteString("Constraints: ")
for _, tk := range c.takes {
for _, tk := range c.Takes {
if tk != nil {
sb.WriteString(fmt.Sprintf("%s, ", tk.Name()))
}
}
sb.WriteString("Cards: ")
for _, c := range c.cards {
for _, c := range c.Cards {
sb.WriteString(fmt.Sprintf("%s, ", c.String()))
}

Expand Down

0 comments on commit ccc0fc1

Please sign in to comment.