Skip to content

Commit

Permalink
Use raw task in scanNetworkGraph()
Browse files Browse the repository at this point in the history
  • Loading branch information
cipres authored and cipres committed Sep 10, 2020
1 parent d226fb8 commit 9a5f90b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
31 changes: 23 additions & 8 deletions galacteek/core/ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import time
import os.path
import functools
from datetime import datetime

from PyQt5.QtWidgets import QApplication
Expand Down Expand Up @@ -162,7 +163,7 @@ async def watch(self, ipfsop):
))

self.debug(f'Ping OK: {pingAvg}')
ensureLater(120, self.watch)
ensureLater(180, self.watch)

await self.sStatusChanged.emit()
else:
Expand Down Expand Up @@ -248,7 +249,7 @@ def __init__(self, ctx):
self.evStopWatcher = asyncio.Event()
self._byPeerId = collections.OrderedDict()
self._byHandle = collections.OrderedDict()
self._didGraphLTasks = {}
self._didGraphLStatus = []
self._didAuthInp = {}
self._pgScanCount = 0

Expand Down Expand Up @@ -304,6 +305,15 @@ async def networkGraphChanged(self, dagCid):
async def scanNetworkGraph(self, ipfsop):
profile = ipfsop.ctx.currentProfile

def didLoadCallback(did, future):
try:
res = future.result()
self._didGraphLStatus.remove(did)
except Exception as err:
log.debug(f'didLoadCallback for {did}: error {err}')
else:
log.debug(f'loadDidFromGraph for {did} returned: {res}')

async with profile.dagNetwork.read() as ng:
for peerId, peerHandles in ng.d['peers'].items():
for handle, hData in peerHandles.items():
Expand All @@ -322,16 +332,19 @@ async def scanNetworkGraph(self, ipfsop):
log.debug(f'DID {did}: already in model')
continue

if did in self._didGraphLTasks:
if did in self._didGraphLStatus:
await ipfsop.sleep()
continue

self._didGraphLTasks[did] = await self.app.scheduler.spawn(
task = ensure(
self.loadDidFromGraph(ipfsop, peerId, did, sHandle))
task.add_done_callback(functools.partial(
didLoadCallback, did))
self._didGraphLStatus.append(did)

await ipfsop.sleep(0.1)

await ipfsop.sleep()
await ipfsop.sleep(0.05)

self._pgScanCount += 1

Expand Down Expand Up @@ -378,10 +391,11 @@ async def loadDidFromGraph(self, ipfsop, peerId: str, did: str,
if piCtx.peerId not in self._byPeerId:
self._byPeerId[piCtx.peerId] = piCtx

await self.peerAdded.emit(piCtx)

log.debug(f'Loaded IPID from graph: {did}')

await self.peerAdded.emit(piCtx)
await ipfsop.sleep(1)

return True

@ipfsOp
Expand Down Expand Up @@ -469,7 +483,8 @@ async def registerFromIdent(self, ipfsop, iMsg):
)
ipid.sChanged.connectTo(partialEnsure(
self.onPeerDidModified, piCtx))
piCtx.sInactive.connectTo(self.onUnresponsivePeer)
piCtx.sStatusChanged.connectTo(partialEnsure(
self.peerModified.emit, piCtx))

ensure(self.didPerformAuth(piCtx))

Expand Down
7 changes: 5 additions & 2 deletions galacteek/ipfs/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,11 @@ async def rewind(self, ipfsop):
self._dagRoot = pDag
await self.saveNewCid(newCid)

await ipfsop.pinUpdate(prevCid, newCid,
unpin=self._unpinOnUpdate)
await ipfsop.waitFor(
ipfsop.pinUpdate(prevCid, newCid,
unpin=self._unpinOnUpdate),
10
)
else:
raise DAGRewindException('No DAG history')

Expand Down
6 changes: 3 additions & 3 deletions galacteek/ui/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from galacteek.appsettings import *

from galacteek.core import modelhelpers
from galacteek.core import utcDatetimeIso
from galacteek.core import datetimeIsoH
from galacteek.core.models.mfs import MFSItem
from galacteek.core.models.mfs import MFSNameItem
from galacteek.core.models.mfs import MFSTimeFrameItem
Expand Down Expand Up @@ -2164,7 +2164,7 @@ async def onClose(self):

@ipfsOp
async def runGc(self, ipfsop):
self.log.append("GC run, start date: {d}\n".format(d=utcDatetimeIso()))
self.log.append("GC run, start date: {d}\n".format(d=datetimeIsoH()))
purgedCn = 0

async for entry in ipfsop.client.repo.gc():
Expand All @@ -2179,5 +2179,5 @@ async def runGc(self, ipfsop):
await ipfsop.sleep(0.08)

self.log.append("\n")
self.log.append("GC done, end date: {d}\n".format(d=utcDatetimeIso()))
self.log.append("GC done, end date: {d}\n".format(d=datetimeIsoH()))
self.log.append(f'Purged {purgedCn} CIDs')

0 comments on commit 9a5f90b

Please sign in to comment.