Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aether Scaling #64

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'main' of github.com:mbilal92/util into main
Signed-off-by: Bilal Saleem <m.bilal.saleem92@gmail.com>
  • Loading branch information
mbilal92 committed Sep 29, 2024
commit 8d5d8a4d9db5b5c46b0051a14eef0ed59fe5386e
2 changes: 1 addition & 1 deletion drsm/chunk.go
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
d.globalChunkTblMutex.RLock()
_, found := d.globalChunkTbl[cn]
d.globalChunkTblMutex.RUnlock()
if found == true {
if found {
continue
}
logger.AppLog.Debugln("Found chunk Id block ", cn)
23 changes: 11 additions & 12 deletions drsm/claim.go
Original file line number Diff line number Diff line change
@@ -11,18 +11,17 @@ import (
)

func (d *Drsm) podDownDetected() {
fmt.Println("Started Pod Down goroutine")
for {
select {
case p := <-d.podDown:
logger.AppLog.Infoln("Pod Down detected ", p)
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k, _ := range pd.podChunks {
d.globalChunkTblMutex.RLock()
c, found := d.globalChunkTbl[k]
d.globalChunkTblMutex.RUnlock()
logger.AppLog.Debugf("Found : %v chunk : %v ", found, c)
fmt.Println("started Pod Down goroutine")
for p := range d.podDown {
logger.AppLog.Infoln("pod Down detected", p)
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k := range pd.podChunks {
d.globalChunkTblMutex.RLock()
c, found := d.globalChunkTbl[k]
d.globalChunkTblMutex.RUnlock()
logger.AppLog.Debugf("found: %v chunk: %v", found, c)
if found {
go c.claimChunk(d)
}
}
2 changes: 0 additions & 2 deletions drsm/drsm.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@
prefix map[string]*ipam.Prefix
mongo *MongoDBLibrary.MongoClient
globalChunkTblMutex sync.RWMutex
punchLivenessTime int

Check failure on line 62 in drsm/drsm.go

GitHub Actions / staticcheck

field punchLivenessTime is unused (U1000)
}

func (d *Drsm) DeletePod(podInstance string) {
@@ -87,8 +87,6 @@
d.podDown = make(chan string, 10)
d.scanChunks = make(map[int32]*chunk)
d.globalChunkTblMutex = sync.RWMutex{}
t := time.Now().UnixNano()
rand.Seed(t)
d.initIpam(opt)

//connect to DB
8 changes: 6 additions & 2 deletions drsm/updates.go
Original file line number Diff line number Diff line change
@@ -185,7 +185,10 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan

func (d *Drsm) punchLiveness() {
// write to DB - signature every 5 second
ticker := time.NewTicker(time.Duration(d.punchLivenessTime) * time.Millisecond)
// or make it parameterized
// ticker := time.NewTicker(time.Duration(d.punchLivenessTime) * time.Millisecond)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

logger.AppLog.Debugf("document expiry enabled")
ret := d.mongo.RestfulAPICreateTTLIndex(d.sharedPoolName, 0, "expireAt")
@@ -199,7 +202,8 @@ func (d *Drsm) punchLiveness() {
//logger.AppLog.Debugf(" update keepalive time")
filter := bson.M{"_id": d.clientId.PodName}

timein := time.Now().Local().Add(time.Millisecond * time.Duration(d.punchLivenessTime+500))
// timein := time.Now().Local().Add(time.Millisecond * time.Duration(d.punchLivenessTime+500))
timein := time.Now().Local().Add(20 * time.Second)

update := bson.D{
{"_id", d.clientId.PodName},
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.