Skip to content

Commit

Permalink
Merge pull request #75 from 1nFrastr/golang
Browse files Browse the repository at this point in the history
Bug-fixes in Golang implementation
  • Loading branch information
josiahcarlson authored Jan 9, 2023
2 parents 51faee3 + 2abe1fe commit 81a3a52
Show file tree
Hide file tree
Showing 15 changed files with 300 additions and 63 deletions.
2 changes: 1 addition & 1 deletion golang/Chapter01/model/article.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (r *ArticleRepo) GetGroupArticles(group, order string, page int64) []map[st
if res <= 0 {
log.Println("ZInterStore return 0")
}
r.Conn.Expire(key, 60*time.Second)
}
r.Conn.Expire(key, 60*time.Second)
return r.GetArticles(page, key)
}

Expand Down
4 changes: 2 additions & 2 deletions golang/Chapter02/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *Client) UpdateToken(token, user, item string) {
r.Conn.HSet("login:", token, user)
r.Conn.ZAdd("recent:", &redis.Z{Score: float64(timestamp), Member: token})
if item != "" {
r.Conn.ZAdd("viewed:"+token, item, timestamp)
r.Conn.ZAdd("viewed:"+token, &redis.Z{Score: float64(timestamp), Member: item})
r.Conn.ZRemRangeByRank("viewed:"+token, 0, -26)
}
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func (r *Client) UpdateTokenModified(token, user string, item string) {
r.Conn.HSet("login:", token, user)
r.Conn.ZAdd("recent:", &redis.Z{Score: float64(timestamp), Member: token})
if item != "" {
r.Conn.ZAdd("viewed:"+token, item, timestamp)
r.Conn.ZAdd("viewed:"+token, &redis.Z{Score: float64(timestamp), Member: item})
r.Conn.ZRemRangeByRank("viewed:"+token, 0, -26)
r.Conn.ZIncrBy("viewed:", -1, item)
}
Expand Down
9 changes: 4 additions & 5 deletions golang/Chapter04/model/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package model

import (
"errors"
"fmt"
"github.com/go-redis/redis/v7"
"log"
Expand All @@ -24,8 +25,7 @@ func (c *Client) ListItem(itemid, sellerid string, price float64) bool {
err := c.Conn.Watch(func(tx *redis.Tx) error {
if _, err := tx.TxPipelined(func(pipeliner redis.Pipeliner) error {
if !tx.SIsMember(inventory, itemid).Val() {
tx.Unwatch(inventory)
return nil
return errors.New("item doesn't belong to user")
}
pipeliner.ZAdd("market:", &redis.Z{Member: item, Score: price})
pipeliner.SRem(inventory, itemid)
Expand Down Expand Up @@ -55,10 +55,10 @@ func (c *Client) PurchaseItem(buyerid, itemid, sellerid string, lprice int64) bo
for time.Now().Unix() < end {
err := c.Conn.Watch(func(tx *redis.Tx) error {
if _, err := tx.TxPipelined(func(pipeliner redis.Pipeliner) error {
price := int64(pipeliner.ZScore("market:", item).Val())
price := int64(tx.ZScore("market:", item).Val())
funds, _ := tx.HGet(buyer, "funds").Int64()
if price != lprice || price > funds {
tx.Unwatch()
return errors.New("can not afford this item")
}

pipeliner.HIncrBy(seller, "funds", price)
Expand Down Expand Up @@ -109,4 +109,3 @@ func (c *Client) UpdateTokenPipeline (token, user, item string) {
}

//TODO: lack the parts before 4.4

21 changes: 18 additions & 3 deletions golang/Chapter04/redisConn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"redisInAction/Chapter04/model"
"redisInAction/redisConn"
"redisInAction/utils"
"strconv"
"testing"
)

Expand All @@ -30,7 +31,10 @@ func Test(t *testing.T) {
})

t.Run("Test purchase item", func(t *testing.T) {
client.ListItem("itemX", "userX", 10)
seller := "userX"
item := "itemX"
client.Conn.SAdd("inventory:"+seller, item)
client.ListItem(item, seller, 10)
t.Log("We need to set up just enough state so a user can buy an item")
buyer := "userY"
client.Conn.HSet("users:userY", "funds", 125)
Expand All @@ -42,8 +46,19 @@ func Test(t *testing.T) {
t.Log("Purchasing an item succeeded?", p)
utils.AssertTrue(t, p)
r = client.Conn.HGetAll("users:userY").Val()
t.Log("Their money is now:", r)
utils.AssertTrue(t, len(r) > 0)
t.Log("UserY money is now:", r)
funds, ok := r["funds"]
utils.AssertTrue(t, ok)
money, _ := strconv.Atoi(funds)
utils.AssertTrue(t, money == 125-10)

r = client.Conn.HGetAll("users:userX").Val()
t.Log("UserX money is now:", r)
funds, ok = r["funds"]
utils.AssertTrue(t, ok)
money, _ = strconv.Atoi(funds)
utils.AssertTrue(t, money == 10)

i := client.Conn.SMembers("inventory:" + buyer).Val()
t.Log("Their inventory is now:", i)
utils.AssertTrue(t, len(i) > 0)
Expand Down
4 changes: 1 addition & 3 deletions golang/Chapter05/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,6 @@ func (c *Client) CleanCounters() {
return err
}
index--
} else {
tx.Unwatch()
}
return nil
}, hkey)
Expand Down Expand Up @@ -350,7 +348,7 @@ func (c *Client) ImportCityToRedis(filename string) {

func (c *Client) FindCityByIp(ip string) string {
ipAddress := strconv.Itoa(int(c.IpToScore(ip)))
res := c.Conn.ZRevRangeByScore("ip2cityid:", &redis.ZRangeBy{Max: ipAddress, Min: "0", Offset: 0, Count: 2}).Val()
res := c.Conn.ZRevRangeByScore("ip2cityid:", &redis.ZRangeBy{Max: ipAddress, Min: "0", Offset: 0, Count: 1}).Val()
if len(res) == 0 {
return ""
}
Expand Down
87 changes: 56 additions & 31 deletions golang/Chapter06/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"github.com/go-redis/redis/v7"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -127,6 +128,35 @@ func (c *Client) AcquireLock(lockname string, acquireTimeout float64) string {
return ""
}

func (c *Client) ListItem(itemid, sellerid string, price float64) bool {
inventory := fmt.Sprintf("inventory:%s", sellerid)
item := fmt.Sprintf("%s.%s", itemid, sellerid)
end := time.Now().Unix() + 5

for time.Now().Unix() < end {
err := c.Conn.Watch(func(tx *redis.Tx) error {
if _, err := tx.TxPipelined(func(pipeliner redis.Pipeliner) error {
if !tx.SIsMember(inventory, itemid).Val() {
return errors.New("item doesn't belong to user")
}
pipeliner.ZAdd("market:", &redis.Z{Member: item, Score: price})
pipeliner.SRem(inventory, itemid)
return nil
}); err != nil {
return err
}
return nil
}, inventory)

if err != nil {
log.Println("watch err: ", err)
return false
}
return true
}
return false
}

func (c *Client) PurchaseItemWithLock(buyerId, itemId, sellerId string) bool {
buyer := fmt.Sprintf("users:%s", buyerId)
seller := fmt.Sprintf("users:%s", sellerId)
Expand All @@ -148,23 +178,19 @@ func (c *Client) PurchaseItemWithLock(buyerId, itemId, sellerId string) bool {
resHget := &redis.StringCmd{}

pipe := c.Conn.TxPipeline()
if err := c.Conn.Watch(func(tx *redis.Tx) error {
resZscore = pipe.ZScore("market:", item)
resHget = tx.HGet(buyer, "funds")
if _, err := pipe.Exec(); err != nil {
log.Println("pipeline err in watch func of PurchaseItemWithLock: ", err)
return err
}
price = resZscore.Val()
funds, _ = strconv.ParseFloat(resHget.Val(), 64)
return nil
}); err != nil {
log.Println("tx err in PurchaseItemWithLock: ", err)
resZscore = pipe.ZScore("market:", item)
resHget = pipe.HGet(buyer, "funds")

if _, err := pipe.Exec(); err != nil {
log.Println("pipeline err in watch func of PurchaseItemWithLock: ", err)
return false
}

price = resZscore.Val()
funds, _ = strconv.ParseFloat(resHget.Val(), 64)

if price == 0 || price > funds {
log.Println("can not afford this")
return false
}

Expand All @@ -181,27 +207,26 @@ func (c *Client) PurchaseItemWithLock(buyerId, itemId, sellerId string) bool {

func (c *Client) ReleaseLock(lockname, identifier string) bool {
lockname = "lock:" + lockname
var flag = true
for flag {
lostLock := false
for {
err := c.Conn.Watch(func(tx *redis.Tx) error {
pipe := tx.TxPipeline()
if tx.Get(lockname).Val() == identifier {
pipe := tx.TxPipeline()
pipe.Del(lockname)
if _, err := pipe.Exec(); err != nil {
return err
}
flag = true
return nil
_, err := pipe.Exec()
return err
}

tx.Unwatch()
flag = false
// lock was grabbed by others
lostLock = true
return nil
},lockname)

}, lockname)
if err != nil {
log.Println("watch failed in ReleaseLock, err is: ", err)
return false
continue
}

if lostLock {
return true
}
}
return true
Expand Down Expand Up @@ -453,7 +478,7 @@ type Pack struct {
}

type Messages struct {
ChatId string
ChatId string
ChatMessages []Pack
}

Expand Down Expand Up @@ -511,7 +536,7 @@ func (c *Client) FetchPendingMessage(recipient string) []Messages {

messages := []Pack{}
for _, v := range chatInfo[i+1] {
message :=Pack{}
message := Pack{}
if err := json.Unmarshal([]byte(v), &message); err != nil {
log.Println("unmarshal err in FetchPendingMessage: ", err)
}
Expand All @@ -526,7 +551,7 @@ func (c *Client) FetchPendingMessage(recipient string) []Messages {

pipeline.ZAdd("seen:" + recipient, &redis.Z{Member:chatId, Score:seenId})
if minId != nil {
pipeline.ZRemRangeByScore("msgs:" + chatId, string(0), strconv.Itoa(int(minId[0].Score)))
pipeline.ZRemRangeByScore("msgs:" + chatId, "0", strconv.Itoa(int(minId[0].Score)))
}
result[i] = Messages{ChatId:chatId, ChatMessages:messages}
}
Expand All @@ -535,7 +560,7 @@ func (c *Client) FetchPendingMessage(recipient string) []Messages {
}

func (c *Client) JoinChat(chatId, user string) {
messageId, _ := c.Conn.Get("ids" + chatId).Float64()
messageId, _ := c.Conn.Get("ids:" + chatId).Float64()

pipeline := c.Conn.TxPipeline()
pipeline.ZAdd("chat:" + chatId, &redis.Z{Member:user, Score:messageId})
Expand Down Expand Up @@ -638,7 +663,7 @@ func (c *Client) clean(channel string, waiting *[]os.FileInfo, count int) int64
}

w0 := (*waiting)[0].Name()
res, err := c.Conn.Get(channel + w0 + ":donw").Int()
res, err := c.Conn.Get(channel + w0 + ":done").Int()
if err != nil {
//log.Println("Conn.Get err in clean: ", err)
return 0
Expand Down
60 changes: 58 additions & 2 deletions golang/Chapter06/redisConn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,63 @@ func Test(t *testing.T) {
defer client.Conn.FlushDB()
})

t.Run("Test list item", func(t *testing.T) {
t.Log("We need to set up just enough state so that a user can list an item")
seller := "seller"
item := "itemX"
client.Conn.SAdd("inventory:"+seller, item)
i := client.Conn.SMembers("inventory:" + seller).Val()
t.Log("The user's inventory has:", i)
utils.AssertTrue(t, len(i) > 0)

t.Log("Listing the item...")
l := client.ListItem(item, seller, 10)
t.Log("Listing the item succeeded?", l)
utils.AssertTrue(t, l)
r := client.Conn.ZRangeWithScores("market:", 0, -1).Val()
t.Log("The market contains:", r)
t.Log("The inventory: ", client.Conn.Get("inventory:"+seller).Val())
defer client.Conn.FlushDB()
})

t.Run("Test purchase item with lock", func(t *testing.T) {
seller := "seller"
item := "itemX"
client.Conn.SAdd("inventory:"+seller, item)
client.ListItem(item, seller, 10)
t.Log("We need to set up just enough state so a user can buy an item")

buyer := "buyer"
client.Conn.HSet("users:buyer", "funds", 125)
r := client.Conn.HGetAll("users:buyer").Val()
t.Log("The user has some money:", r)
utils.AssertTrue(t, len(r) != 0)

p := client.PurchaseItemWithLock(buyer, item, seller)
t.Log("Purchasing an item succeeded?", p)
utils.AssertTrue(t, p)

r = client.Conn.HGetAll("users:buyer").Val()
t.Log("buyer's money is now:", r)
funds, ok := r["funds"]
utils.AssertTrue(t, ok)
money, _ := strconv.Atoi(funds)
utils.AssertTrue(t, money == 125-10)

r = client.Conn.HGetAll("users:seller").Val()
t.Log("seller's money is now:", r)
funds, ok = r["funds"]
utils.AssertTrue(t, ok)
money, _ = strconv.Atoi(funds)
utils.AssertTrue(t, money == 10)

i := client.Conn.SMembers("inventory:" + buyer).Val()
t.Log("Their inventory is now:", i)
utils.AssertTrue(t, len(i) > 0)
utils.AssertTrue(t, client.Conn.ZScore("market:", "itemX.seller").Val() == 0)
defer client.Conn.FlushDB()
})

t.Run("Test distributed locking", func(t *testing.T) {
t.Log("Getting an initial lock...")
utils.AssertTrue(t, client.AcquireLockWithTimeout("testlock", 1, 1) != "")
Expand Down Expand Up @@ -165,7 +222,7 @@ func Test(t *testing.T) {
t.Log("Those messages are:", r1)
defer client.Conn.FlushDB()
})

t.Run("Test file distribution", func(t *testing.T) {
tempDirPath, err := ioutil.TempDir("", "myTempDir")
if err != nil {
Expand Down Expand Up @@ -224,4 +281,3 @@ func Test(t *testing.T) {
}()
})
}

2 changes: 1 addition & 1 deletion golang/Chapter07/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func Tokenize(content string) []string {
}
words.Add(match)
}
return words.Intersection(&common.STOPWORDS)
return words.Diff(&common.STOPWORDS)
}

func (c *Client) IndexDocument(docid, content string) int64 {
Expand Down
Loading

0 comments on commit 81a3a52

Please sign in to comment.