From 9a5f90b860baa01300b8d6b7ba43c5abc11ee563 Mon Sep 17 00:00:00 2001 From: cipres Date: Thu, 10 Sep 2020 12:46:14 +0200 Subject: [PATCH] Use raw task in scanNetworkGraph() --- galacteek/core/ctx.py | 31 +++++++++++++++++++++++-------- galacteek/ipfs/dag.py | 7 +++++-- galacteek/ui/files.py | 6 +++--- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/galacteek/core/ctx.py b/galacteek/core/ctx.py index 916fa9d1..8e495fe0 100644 --- a/galacteek/core/ctx.py +++ b/galacteek/core/ctx.py @@ -3,6 +3,7 @@ import collections import time import os.path +import functools from datetime import datetime from PyQt5.QtWidgets import QApplication @@ -162,7 +163,7 @@ async def watch(self, ipfsop): )) self.debug(f'Ping OK: {pingAvg}') - ensureLater(120, self.watch) + ensureLater(180, self.watch) await self.sStatusChanged.emit() else: @@ -248,7 +249,7 @@ def __init__(self, ctx): self.evStopWatcher = asyncio.Event() self._byPeerId = collections.OrderedDict() self._byHandle = collections.OrderedDict() - self._didGraphLTasks = {} + self._didGraphLStatus = [] self._didAuthInp = {} self._pgScanCount = 0 @@ -304,6 +305,15 @@ async def networkGraphChanged(self, dagCid): async def scanNetworkGraph(self, ipfsop): profile = ipfsop.ctx.currentProfile + def didLoadCallback(did, future): + try: + res = future.result() + self._didGraphLStatus.remove(did) + except Exception as err: + log.debug(f'didLoadCallback for {did}: error {err}') + else: + log.debug(f'loadDidFromGraph for {did} returned: {res}') + async with profile.dagNetwork.read() as ng: for peerId, peerHandles in ng.d['peers'].items(): for handle, hData in peerHandles.items(): @@ -322,16 +332,19 @@ async def scanNetworkGraph(self, ipfsop): log.debug(f'DID {did}: already in model') continue - if did in self._didGraphLTasks: + if did in self._didGraphLStatus: await ipfsop.sleep() continue - self._didGraphLTasks[did] = await self.app.scheduler.spawn( + task = ensure( self.loadDidFromGraph(ipfsop, peerId, did, sHandle)) + task.add_done_callback(functools.partial( + didLoadCallback, did)) + self._didGraphLStatus.append(did) await ipfsop.sleep(0.1) - await ipfsop.sleep() + await ipfsop.sleep(0.05) self._pgScanCount += 1 @@ -378,10 +391,11 @@ async def loadDidFromGraph(self, ipfsop, peerId: str, did: str, if piCtx.peerId not in self._byPeerId: self._byPeerId[piCtx.peerId] = piCtx - await self.peerAdded.emit(piCtx) - log.debug(f'Loaded IPID from graph: {did}') + await self.peerAdded.emit(piCtx) + await ipfsop.sleep(1) + return True @ipfsOp @@ -469,7 +483,8 @@ async def registerFromIdent(self, ipfsop, iMsg): ) ipid.sChanged.connectTo(partialEnsure( self.onPeerDidModified, piCtx)) - piCtx.sInactive.connectTo(self.onUnresponsivePeer) + piCtx.sStatusChanged.connectTo(partialEnsure( + self.peerModified.emit, piCtx)) ensure(self.didPerformAuth(piCtx)) diff --git a/galacteek/ipfs/dag.py b/galacteek/ipfs/dag.py index eb9d24c6..6facc242 100644 --- a/galacteek/ipfs/dag.py +++ b/galacteek/ipfs/dag.py @@ -499,8 +499,11 @@ async def rewind(self, ipfsop): self._dagRoot = pDag await self.saveNewCid(newCid) - await ipfsop.pinUpdate(prevCid, newCid, - unpin=self._unpinOnUpdate) + await ipfsop.waitFor( + ipfsop.pinUpdate(prevCid, newCid, + unpin=self._unpinOnUpdate), + 10 + ) else: raise DAGRewindException('No DAG history') diff --git a/galacteek/ui/files.py b/galacteek/ui/files.py index 13b7a76d..bf1d3caf 100644 --- a/galacteek/ui/files.py +++ b/galacteek/ui/files.py @@ -48,7 +48,7 @@ from galacteek.appsettings import * from galacteek.core import modelhelpers -from galacteek.core import utcDatetimeIso +from galacteek.core import datetimeIsoH from galacteek.core.models.mfs import MFSItem from galacteek.core.models.mfs import MFSNameItem from galacteek.core.models.mfs import MFSTimeFrameItem @@ -2164,7 +2164,7 @@ async def onClose(self): @ipfsOp async def runGc(self, ipfsop): - self.log.append("GC run, start date: {d}\n".format(d=utcDatetimeIso())) + self.log.append("GC run, start date: {d}\n".format(d=datetimeIsoH())) purgedCn = 0 async for entry in ipfsop.client.repo.gc(): @@ -2179,5 +2179,5 @@ async def runGc(self, ipfsop): await ipfsop.sleep(0.08) self.log.append("\n") - self.log.append("GC done, end date: {d}\n".format(d=utcDatetimeIso())) + self.log.append("GC done, end date: {d}\n".format(d=datetimeIsoH())) self.log.append(f'Purged {purgedCn} CIDs')