Skip to content

Commit

Permalink
IPID: cache local DID in nscache
Browse files Browse the repository at this point in the history
Better aiojobs scheduler shutdown

ipfsops: nameResolveStream() uses NS cache as last option
if DHT returns nothing
  • Loading branch information
cipres authored and cipres committed Jul 5, 2020
1 parent d27bc8f commit c1dd756
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 46 deletions.
4 changes: 1 addition & 3 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@

.. image:: https://raw.githubusercontent.com/pinnaculum/galacteek/master/share/icons/galacteek.png
.. image:: https://raw.githubusercontent.com/pinnaculum/galacteek/master/share/icons/galacteek-128.png
:align: center
:width: 64
:height: 64

:info: A multi-platform browser for the distributed web

Expand Down
32 changes: 25 additions & 7 deletions galacteek/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ def __init__(self, debug=False, profile='main', sslverify=True,
def cmdArgs(self):
return self._cmdArgs

@property
def shuttingDown(self):
    """Whether the application has entered its shutdown sequence."""
    return self._shuttingDown

@property
def offline(self):
    """True when the application was launched in offline mode."""
    args = self.cmdArgs
    return args.offline
Expand Down Expand Up @@ -735,9 +739,12 @@ async def stopIpfsServices(self):
def setupDb(self):
    # Fire-and-forget: schedule the async ORM database setup on the loop
    ensure(self.setupOrmDb(self._mainDbLocation))

def jobsExceptionHandler(self, scheduler, context):
    """aiojobs scheduler exception handler: deliberately ignores
    job exceptions (no-op)."""

async def setupOrmDb(self, dbpath):
self.scheduler = await aiojobs.create_scheduler(
close_timeout=1.5,
close_timeout=1.0,
limit=150,
pending_limit=1000
)
Expand Down Expand Up @@ -1134,6 +1141,21 @@ async def restartApp(self):
def onExit(self):
    # Exit hook: schedule the asynchronous application shutdown
    ensure(self.exitApp())

async def shutdownScheduler(self):
    """
    Close the aiojobs scheduler, retrying when the close times out.

    Each attempt is bounded by a 2-second timeout; on success we log
    and return. Previously, exhausting all attempts fell through
    silently -- now it logs an error so a hung shutdown is visible.
    """
    attempts = 12

    for stry in range(attempts):
        try:
            async with async_timeout.timeout(2):
                await self.scheduler.close()
        except asyncio.TimeoutError:
            log.warning(
                'Timeout shutting down the scheduler (not fooled)')
            continue
        else:
            log.debug(f'Scheduler went down (try: {stry})')
            return

    # All attempts timed out: make the failure explicit
    log.error(
        f'Could not close the scheduler after {attempts} attempts')

async def exitApp(self):
self._shuttingDown = True

Expand All @@ -1152,12 +1174,11 @@ async def exitApp(self):
except:
pass

await self.scheduler.close()

await self.stopIpfsServices()

await cancelAllTasks(timeout=5)
await self.loop.shutdown_asyncgens()
await self.shutdownScheduler()
await cancelAllTasks()

await self.ethereum.stop()

Expand All @@ -1167,9 +1188,6 @@ async def exitApp(self):
if self.ipfsd:
self.ipfsd.stop()

if self.ipfsCtx.inOrbit:
await self.ipfsCtx.orbitConnector.stop()

self.mainWindow.close()

if self.debug:
Expand Down
9 changes: 9 additions & 0 deletions galacteek/core/asynclib.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ def connectTo(self, callback):
async def emit(self, *args, **kwargs):
from galacteek import log

app = QApplication.instance()

if app.shuttingDown:
# Prevent emitting signals during the app's shutdown
log.debug(
'{!r}: Application is shutting down, not emitting'.format(
self))
return

if len(args) != len(self._sig):
log.debug(
'{!r}: does not match signature: {} !'.format(
Expand Down
10 changes: 6 additions & 4 deletions galacteek/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ async def preload(self):
async def processTask(self):
await self.preload()

while True:
try:
try:
while True:
feeds = await self.allFeeds()

for feed in feeds:
Expand All @@ -444,5 +444,7 @@ async def processTask(self):
e=str(err)))

await asyncio.sleep(60)
except asyncio.CancelledError:
return
except asyncio.CancelledError:
return
except Exception:
return
17 changes: 14 additions & 3 deletions galacteek/did/ipid.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from galacteek import AsyncSignal
from galacteek import ensure
from galacteek import log
from galacteek.core import SingletonDecorator

from galacteek.ipfs.wrappers import ipfsOp
from galacteek.ipfs.cidhelpers import stripIpfs
Expand Down Expand Up @@ -525,14 +526,16 @@ async def updateDocument(self, ipfsop, document, publish=False):
@ipfsOp
async def resolve(self, ipfsop, resolveTimeout=30):
useCache = 'always' if self.local else 'never'
maxLifetime = 86400 * 7 if self.local else 60 * 10

self.message('DID resolve: {did} (using cache: {usecache})'.format(
did=self.ipnsKey, usecache=useCache))

return await ipfsop.nameResolveStreamFirst(
joinIpns(self.ipnsKey),
timeout=resolveTimeout,
useCache=useCache
useCache=useCache,
maxCacheLifetime=maxLifetime
)

async def refresh(self):
Expand Down Expand Up @@ -712,12 +715,13 @@ async def searchServices(self, query: str):

async def searchServiceById(self, _id: str):
    """
    Return the first service instance whose 'id' matches *_id*,
    or None (implicitly) when no service node matches.
    """
    for srvNode in await self.getServices():
        if srvNode['id'] == _id:
            _inst = self._serviceInst(srvNode)
            if _inst:
                return _inst

        # Yield control to the event loop between service nodes
        # NOTE(review): indentation lost in this capture; assumed to be
        # inside the for loop -- confirm against the repository
        await asyncio.sleep(0)

async def removeServiceById(self, _id: str):
"""
Remove the service with the given DID identifier
Expand Down Expand Up @@ -753,6 +757,7 @@ def __str__(self):
return 'IP Identifier: {did}'.format(did=self.did)


@SingletonDecorator
class IPIDManager:
def __init__(self, resolveTimeout=60 * 5):
self._managedIdentifiers = {}
Expand All @@ -773,20 +778,26 @@ async def searchServices(self, term: str):
async for srv in ipid.searchServices(term):
yield srv

await asyncio.sleep(0)

async def getServiceById(self, _id: str):
    """
    Search every managed identifier for a service matching *_id*;
    return the first match, or None (implicitly) if none is found.
    """
    # NOTE(review): 'with await self._lock' is the legacy lock-acquire
    # idiom; 'async with self._lock' is the modern form -- confirm the
    # lock type before changing
    with await self._lock:
        for did, ipid in self._managedIdentifiers.items():
            srv = await ipid.searchServiceById(_id)
            if srv:
                return srv

            # Cooperative yield between identifiers
            await asyncio.sleep(0)

async def trackingTask(self):
    """
    Long-running task: every 5 minutes, reload each managed IPID
    and log its current document CID.
    """
    while True:
        await asyncio.sleep(60 * 5)

        with await self._lock:
            for did, ipId in self._managedIdentifiers.items():
                await ipId.load()
                log.debug('tracker: {0} => {1}'.format(
                    ipId, ipId.docCid))

@ipfsOp
async def load(self, ipfsop,
Expand Down
22 changes: 14 additions & 8 deletions galacteek/hashmarks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,22 @@ class HashmarksSynchronizer:
syncing = AsyncSignal(bool)

async def syncTask(self):
while True:
await asyncio.sleep(60)
try:
while True:
await asyncio.sleep(60)

sources = await database.hashmarkSourcesNeedSync(
minutes=60 * 3)
count = len(sources)
sources = await database.hashmarkSourcesNeedSync(
minutes=60 * 3)
count = len(sources)

if count > 0:
log.debug('Unsynced sources: {}, syncing now'.format(count))
await self.sync()
if count > 0:
log.debug(
'Unsynced sources: {}, syncing now'.format(count))
await self.sync()
except asyncio.CancelledError:
log.debug('Sync task cancelled')
except Exception as err:
log.debug(f'Sync task error: {err}')

async def sync(self):
_count, _scount = 0, 0
Expand Down
102 changes: 87 additions & 15 deletions galacteek/ipfs/ipfsops.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ def availCommands(self):
def debug(self, msg):
    """Log *msg* at debug level, prefixed with this operator's uid."""
    prefixed = 'IPFSOp({0}): {1}'.format(self.uid, msg)
    log.debug(prefixed)

def info(self, msg):
    """Log *msg* at info level, prefixed with this operator's uid."""
    prefixed = 'IPFSOp({0}): {1}'.format(self.uid, msg)
    log.info(prefixed)

def setRsaAgent(self, agent):
self._rsaAgent = agent

Expand Down Expand Up @@ -684,13 +687,14 @@ def nsCacheLoad(self):

if not jsonSchemaValidate(cache, nsCacheSchema):
raise Exception('Invalid NS cache schema')
except Exception as e:
self.debug(str(e))
except Exception as err:
self.debug(f'Error loading NS cache: {err}')
else:
self.info(f'NS cache: loaded from {self._nsCachePath}')
self._nsCache = cache

async def nsCacheSave(self):
if not self._nsCachePath:
if not self._nsCachePath or not isinstance(self.nsCache, dict):
return

async with self._lock:
Expand Down Expand Up @@ -763,13 +767,13 @@ async def nameResolve(self, path, timeout=20, recursive=False,

return resolved

async def nameResolveStream(self, path, count=3,
timeout=20,
useCache='never',
cache='never',
cacheOrigin='unknown',
recursive=True,
maxCacheLifetime=60 * 10):
async def nameResolveStreamLegacy(self, path, count=3,
timeout=20,
useCache='never',
cache='never',
cacheOrigin='unknown',
recursive=True,
maxCacheLifetime=60 * 10):
usingCache = useCache == 'always' or \
(useCache == 'offline' and self.noPeers and 0)
cache = cache == 'always' or (cache == 'offline' and self.noPeers)
Expand Down Expand Up @@ -809,17 +813,84 @@ async def nameResolveStream(self, path, count=3,
except aioipfs.APIError as e:
self.debug('streamed resolve error: {}'.format(e.message))

async def nameResolveStream(self, path, count=3,
                            timeout=20,
                            useCache='never',
                            cache='never',
                            cacheOrigin='unknown',
                            recursive=True,
                            maxCacheLifetime=60 * 10,
                            debug=True):
    """
    Resolve an IPNS *path* as an async generator of result dicts.

    DHT is used first for resolution.
    NS cache is used as last option (used by local IPID).

    :param path: IPNS path to resolve
    :param count: DHT record count passed to the daemon
    :param timeout: DHT timeout; int values are converted to a
        duration string (e.g. '20s')
    :param useCache: 'always', 'offline' or 'never'
    :param cache: unused in this implementation (kept for signature
        compatibility with the legacy version)
    :param cacheOrigin: unused here (signature compatibility)
    :param maxCacheLifetime: max age (secs) of an acceptable NS
        cache entry
    :param debug: log each streamed entry when True
    """

    # NOTE(review): the trailing "and 0" makes the 'offline' branch
    # always evaluate False, so only useCache == 'always' ever enables
    # the cache fallback. Looks like deliberate (or leftover) disabling
    # of the offline case -- confirm intent before "fixing".
    usingCache = useCache == 'always' or \
        (useCache == 'offline' and self.noPeers and 0)
    rTimeout = '{t}s'.format(t=timeout) if isinstance(timeout, int) else \
        timeout
    _yieldedcn = 0  # how many entries the DHT stream produced

    try:
        async for nentry in self.client.name.resolve_stream(
                name=path,
                recursive=recursive,
                stream=True,
                dht_record_count=count,
                dht_timeout=rTimeout):
            if debug:
                self.debug(
                    f'nameResolveStream (timeout: {timeout}) '
                    f'({path}): {nentry}')

            yield nentry
            _yieldedcn += 1
    except asyncio.TimeoutError:
        self.debug(
            f'nameResolveStream (timeout: {timeout}) '
            f'({path}): Timed out')
    except aioipfs.APIError as e:
        self.debug('nameResolveStream API error: {}'.format(e.message))
    except Exception as gerr:
        self.debug(f'nameResolveStream ({path}) unknown error: {gerr}')

    # Fall back to the NS cache only when the DHT yielded nothing
    # and caching is enabled
    if _yieldedcn == 0 and usingCache:
        # The NS cache is used only for IPIDs when offline
        rPath = self.nsCacheGet(
            path, maxLifetime=maxCacheLifetime,
            knownOrigin=True
        )

        if rPath and IPFSPath(rPath).valid:
            self.debug(
                'nameResolveStream: from cache: {0} for {1}'.format(
                    rPath, path))
            yield {
                'Path': rPath
            }

async def nameResolveStreamFirst(self, path, count=1,
timeout=8,
timeout=10,
cache='never',
cacheOrigin='unknown',
useCache='never'):
useCache='never',
maxCacheLifetime=60 * 10,
debug=True):
"""
A wrapper around the nameResolveStream async gen,
returning the last result of the yielded values
:rtype: dict
"""
matches = []
async for entry in self.nameResolveStream(
path,
timeout=timeout,
cache=cache, cacheOrigin=cacheOrigin,
useCache=useCache):
maxCacheLifetime=maxCacheLifetime,
useCache=useCache,
debug=debug):
found = entry.get('Path')
if found:
matches.append(found)
Expand Down Expand Up @@ -1092,7 +1163,8 @@ async def addPath(self, path, recursive=True, wrap=False,
if callbackvalid:
await callback(entry)
except aioipfs.APIError as err:
self.debug('addPath: API error: {}'.format(err.message))
self.debug('addPath: {path}: API error: {e}'.format(
path=path, e=err.message))
return None
except asyncio.CancelledError:
self.debug('addPath: cancelled')
Expand Down Expand Up @@ -1257,7 +1329,7 @@ async def objectPathMapCacheResolve(self, path):
"""
Simple async TTL cache for results from nameResolveStreamFirst()
"""
return await self.nameResolveStreamFirst(path)
return await self.nameResolveStreamFirst(path, debug=False)

def sResolveCacheClear(self, time=None):
    # Expire stale entries from the module-level resolve cache;
    # presumably a cachetools-style TTLCache where time=None means
    # "use the current time" -- confirm against the cache definition
    sResolveCache.expire(time)
Expand Down
5 changes: 4 additions & 1 deletion galacteek/ipfs/pubsub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,11 @@ async def sendIdent(self, op, profile):
)

if not ipid:
logger.debug('Failed to load local DID')
logger.info('Failed to load local DID')
return
else:
logger.debug('Local IPID ({did}) load: OK, dagCID is {cid}'.format(
did=profile.userInfo.personDid, cid=ipid.docCid))

msg = await PeerIdentMessageV3.make(
nodeId,
Expand Down
Loading

0 comments on commit c1dd756

Please sign in to comment.