Skip to content

Commit

Permalink
Fix for tail -f on 1st mount while 2nd mount being written to
Browse files Browse the repository at this point in the history
  • Loading branch information
edmc-ss committed Jan 24, 2022
1 parent e69ec09 commit 2b55063
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
86 changes: 86 additions & 0 deletions imgr/imgrpkg/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (inodeLease *inodeLeaseStruct) handleOperation(leaseRequestOperation *lease
logFatalf("(*inodeLeaseStruct).handleOperation() unable to json.Marshal(rpcInterrupt: %#v): %v [case 1]", rpcInterrupt, err)
}
globals.retryrpcServer.SendCallback(inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterruptBuf)
logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterrupt)
inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent = 1
inodeLease.interruptTimer = time.NewTimer(globals.config.LeaseInterruptInterval)
Expand Down Expand Up @@ -178,6 +179,7 @@ func (inodeLease *inodeLeaseStruct) handleOperation(leaseRequestOperation *lease
leaseRequest.listElement = inodeLease.releasingHoldersList.PushBack(leaseRequest)
leaseRequest.requestState = leaseRequestStateSharedReleasing
globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)
logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)
}
inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent = 1
Expand Down Expand Up @@ -248,6 +250,7 @@ func (inodeLease *inodeLeaseStruct) handleOperation(leaseRequestOperation *lease
logFatalf("(*inodeLeaseStruct).handleOperation() unable to json.Marshal(rpcInterrupt: %#v): %v [case 3]", rpcInterrupt, err)
}
globals.retryrpcServer.SendCallback(sharedHolderLeaseRequest.mount.retryRPCClientID, rpcInterruptBuf)
logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", sharedHolderLeaseRequest.mount.retryRPCClientID, rpcInterrupt)
}
inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent = 1
Expand All @@ -274,6 +277,7 @@ func (inodeLease *inodeLeaseStruct) handleOperation(leaseRequestOperation *lease
logFatalf("(*inodeLeaseStruct).handleOperation() unable to json.Marshal(rpcInterrupt: %#v): %v [case 4]", rpcInterrupt, err)
}
globals.retryrpcServer.SendCallback(inodeLease.exclusiveHolder.mount.retryRPCClientID, rpcInterruptBuf)
logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", inodeLease.exclusiveHolder.mount.retryRPCClientID, rpcInterrupt)
inodeLease.exclusiveHolder = nil
inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent = 1
Expand Down Expand Up @@ -713,6 +717,8 @@ func (inodeLease *inodeLeaseStruct) handleLongAgoTimerPop() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

leaseRequestElement = inodeLease.sharedHoldersList.Front()
}

Expand Down Expand Up @@ -746,6 +752,8 @@ func (inodeLease *inodeLeaseStruct) handleLongAgoTimerPop() {
}

globals.retryrpcServer.SendCallback(inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterrupt)
case leaseRequestStateExclusiveRequested:
inodeLease.leaseState = inodeLeaseStateExclusiveReleasing

Expand All @@ -764,6 +772,8 @@ func (inodeLease *inodeLeaseStruct) handleLongAgoTimerPop() {

globals.retryrpcServer.SendCallback(inodeLease.exclusiveHolder.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", inodeLease.exclusiveHolder.mount.retryRPCClientID, rpcInterrupt)

inodeLease.exclusiveHolder = nil
default:
logFatalf("(*inodeLeaseStruct).handleLongAgoTimerPop() found requestedList with unexpected leaseRequest.requestState: %v", leaseRequest.requestState)
Expand Down Expand Up @@ -978,6 +988,8 @@ func (inodeLease *inodeLeaseStruct) handleInterruptTimerPop() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

leaseRequestElement = leaseRequestElement.Next()
}
case inodeLeaseStateSharedReleasing:
Expand All @@ -1000,6 +1012,8 @@ func (inodeLease *inodeLeaseStruct) handleInterruptTimerPop() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

leaseRequestElement = leaseRequestElement.Next()
}
case inodeLeaseStateExclusiveReleasing:
Expand All @@ -1021,6 +1035,8 @@ func (inodeLease *inodeLeaseStruct) handleInterruptTimerPop() {
}

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)
case inodeLeaseStateExclusiveDemoting:
if nil == inodeLease.demotingHolder {
logFatalf("(*inodeLeaseStruct).handleInterruptTimerPop() found empty demotingHolder [case 2]")
Expand All @@ -1037,6 +1053,8 @@ func (inodeLease *inodeLeaseStruct) handleInterruptTimerPop() {
}

globals.retryrpcServer.SendCallback(inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", inodeLease.demotingHolder.mount.retryRPCClientID, rpcInterrupt)
default:
logFatalf("(*inodeLeaseStruct).handleInterruptTimerPop() found unexpected leaseState: %v [case 2]", inodeLease.leaseState)
}
Expand Down Expand Up @@ -1098,6 +1116,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

inodeLease.leaseState = inodeLeaseStateSharedReleasing
}

Expand Down Expand Up @@ -1150,6 +1170,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {
}

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)
}

inodeLease.leaseState = inodeLeaseStateSharedReleasing
Expand Down Expand Up @@ -1178,6 +1200,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

inodeLease.leaseState = inodeLeaseStateExclusiveReleasing

inodeLease.lastInterruptTime = time.Now()
Expand Down Expand Up @@ -1233,6 +1257,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

inodeLease.leaseState = inodeLeaseStateExclusiveReleasing

if !inodeLease.interruptTimer.Stop() {
Expand Down Expand Up @@ -1380,6 +1406,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

leaseRequestElement = leaseRequestElement.Next()
}

Expand Down Expand Up @@ -1419,6 +1447,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent++

Expand Down Expand Up @@ -1456,6 +1486,8 @@ func (inodeLease *inodeLeaseStruct) handleStopChanClose() {

globals.retryrpcServer.SendCallback(leaseRequest.mount.retryRPCClientID, rpcInterruptBuf)

logTracef("<== [RPC] SendCallback(clientID: 0x%016X, rpcInterrupt: %+v)", leaseRequest.mount.retryRPCClientID, rpcInterrupt)

inodeLease.lastInterruptTime = time.Now()
inodeLease.interruptsSent++

Expand Down Expand Up @@ -1490,6 +1522,60 @@ RequestChanDrained:
runtime.Goexit()
}

func (leaseRequest *leaseRequestStruct) okToRead() (ok bool) {
switch leaseRequest.requestState {
case leaseRequestStateNone:
ok = false
case leaseRequestStateSharedRequested:
ok = false
case leaseRequestStateSharedGranted:
ok = true
case leaseRequestStateSharedPromoting:
ok = true
case leaseRequestStateSharedReleasing:
ok = true
case leaseRequestStateExclusiveRequested:
ok = false
case leaseRequestStateExclusiveGranted:
ok = true
case leaseRequestStateExclusiveDemoting:
ok = true
case leaseRequestStateExclusiveReleasing:
ok = true
default:
logFatalf("(*leaseRequestStruct).okToRead() called while for unknown leaseRequest.requestState: %v", leaseRequest.requestState)
}

return
}

func (leaseRequest *leaseRequestStruct) okToWrite() (ok bool) {
switch leaseRequest.requestState {
case leaseRequestStateNone:
ok = false
case leaseRequestStateSharedRequested:
ok = false
case leaseRequestStateSharedGranted:
ok = false
case leaseRequestStateSharedPromoting:
ok = false
case leaseRequestStateSharedReleasing:
ok = false
case leaseRequestStateExclusiveRequested:
ok = false
case leaseRequestStateExclusiveGranted:
ok = true
case leaseRequestStateExclusiveDemoting:
ok = true
case leaseRequestStateExclusiveReleasing:
ok = true
default:
logFatalf("(*leaseRequestStruct).okToWrite() called while for unknown leaseRequest.requestState: %v", leaseRequest.requestState)
}

return
}

// armReleaseOfAllLeasesWhileLocked is called to schedule releasing of all held leases
// for a specific mountStruct. It is called while inodeLease.volume is locked. The
// leaseReleaseStartWG is assumed to be a sync.WaitGroup with a count of 1 such that
Expand Down
8 changes: 4 additions & 4 deletions imgr/imgrpkg/retry-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func getInodeTableEntry(getInodeTableEntryRequest *GetInodeTableEntryRequestStru
}

leaseRequest, ok = mount.leaseRequestMap[getInodeTableEntryRequest.InodeNumber]
if !ok || ((leaseRequestStateSharedGranted != leaseRequest.requestState) && (leaseRequestStateExclusiveGranted != leaseRequest.requestState)) {
if !ok || !leaseRequest.okToRead() {
globals.Unlock()
err = fmt.Errorf("%s %016X", EMissingLease, getInodeTableEntryRequest.InodeNumber)
return
Expand Down Expand Up @@ -524,7 +524,7 @@ func putInodeTableEntries(putInodeTableEntriesRequest *PutInodeTableEntriesReque

for _, putInodeTableEntry = range putInodeTableEntriesRequest.UpdatedInodeTableEntryArray {
leaseRequest, ok = mount.leaseRequestMap[putInodeTableEntry.InodeNumber]
if !ok || (leaseRequestStateExclusiveGranted != leaseRequest.requestState) {
if !ok || !leaseRequest.okToWrite() {
globals.Unlock()
err = fmt.Errorf("%s %016X", EMissingLease, putInodeTableEntry.InodeNumber)
return
Expand Down Expand Up @@ -608,7 +608,7 @@ func deleteInodeTableEntry(deleteInodeTableEntryRequest *DeleteInodeTableEntryRe
}

leaseRequest, ok = mount.leaseRequestMap[deleteInodeTableEntryRequest.InodeNumber]
if !ok || (leaseRequestStateExclusiveGranted != leaseRequest.requestState) {
if !ok || !leaseRequest.okToWrite() {
globals.Unlock()
err = fmt.Errorf("%s %016X", EMissingLease, deleteInodeTableEntryRequest.InodeNumber)
return
Expand Down Expand Up @@ -671,7 +671,7 @@ func adjustInodeTableEntryOpenCount(adjustInodeTableEntryOpenCountRequest *Adjus
}

leaseRequest, ok = mount.leaseRequestMap[adjustInodeTableEntryOpenCountRequest.InodeNumber]
if !ok || ((leaseRequestStateSharedGranted != leaseRequest.requestState) && (leaseRequestStateExclusiveGranted != leaseRequest.requestState)) {
if !ok || !leaseRequest.okToRead() {
globals.Unlock()
err = fmt.Errorf("%s %016X", EMissingLease, adjustInodeTableEntryOpenCountRequest.InodeNumber)
return
Expand Down

0 comments on commit 2b55063

Please sign in to comment.