Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

getting rid of some old mess #30

Merged
merged 8 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 0 additions & 34 deletions .github/workflows/default_gateways.yml

This file was deleted.

7 changes: 1 addition & 6 deletions .github/workflows/local_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ jobs:
max-parallel: 4
matrix:
python-version: ["3.8", "3.9", "3.10"]
ipfs-version: ["0.12.0"]
include:
- python-version: "3.10"
ipfs-version: "0.9.1"
env:
IPFSSPEC_GATEWAYS: "http://127.0.0.1:8080" # use only localhost as gateway
ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28
steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ with fsspec.open("ipfs://QmZ4tDuvesekSs4qM5ZBKpXiZGun7S2CYtEZRB3DYXkjGx", "r") a
print(f.read())
```

The current implementation uses a HTTP gateway to access the data. It tries to use a local one (which is expected to be found at `http://127.0.0.1:8080`) and falls back to `ipfs.io` if the local gateway is not available.
The current implementation uses a HTTP gateway to access the data. It uses [IPIP-280](https://github.com/ipfs/specs/pull/280) to determine which gateway to use. If you have a current installation of an IPFS node (e.g. kubo, IPFS Desktop etc...), you should be fine. In case you want to use a different gateway, you can use any of the methods specified in IPIP-280, e.g.:

You can modify the list of gateways using the space separated environment variable `IPFSSPEC_GATEWAYS`.
* create the file `~/.ipfs/gateway` with the gateway address as first line
* define the environment variable `IPFS_GATEWAY` to the gateway address
* create the file `/etc/ipfs/gateway` with the gateway address as first line

No matter which option you use, the gateway has to be specified as an HTTP(S) url, e.g.: `http://127.0.0.1:8080`.
3 changes: 1 addition & 2 deletions ipfsspec/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from .core import IPFSFileSystem
from .async_ipfs import AsyncIPFSFileSystem

from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

__all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"]
__all__ = ["__version__", "AsyncIPFSFileSystem"]
206 changes: 88 additions & 118 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import io
import time
import os
import platform
import weakref
from functools import lru_cache
from pathlib import Path
import warnings

import asyncio
import aiohttp

from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError

from .utils import get_default_gateways

import logging

logger = logging.getLogger("ipfsspec")
Expand Down Expand Up @@ -138,129 +139,98 @@ def __str__(self):
return f"GW({self.url})"


class GatewayState:
def __init__(self):
self.reachable = True
self.next_request_time = 0
self.backoff_time = 0
self.start_backoff = 1e-5
self.max_backoff = 5

def schedule_next(self):
self.next_request_time = time.monotonic() + self.backoff_time

def backoff(self):
if self.backoff_time < self.start_backoff:
self.backoff_time = self.start_backoff
else:
self.backoff_time *= 2
self.reachable = True
self.schedule_next()

def speedup(self, not_below=0):
did_speed_up = False
if self.backoff_time > not_below:
self.backoff_time *= 0.9
did_speed_up = True
self.reachable = True
self.schedule_next()
return did_speed_up

def broken(self):
self.backoff_time = self.max_backoff
self.reachable = False
self.schedule_next()

def trying_to_reach(self):
self.next_request_time = time.monotonic() + 1


class MultiGateway(AsyncIPFSGatewayBase):
def __init__(self, gws, max_backoff_rounds=50):
self.gws = [(GatewayState(), gw) for gw in gws]
self.max_backoff_rounds = max_backoff_rounds

@property
def _gws_in_priority_order(self):
now = time.monotonic()
return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time))

async def _gw_op(self, op):
for _ in range(self.max_backoff_rounds):
for state, gw in self._gws_in_priority_order:
not_before = state.next_request_time
if not state.reachable:
state.trying_to_reach()
else:
state.schedule_next()
now = time.monotonic()
if not_before > now:
await asyncio.sleep(not_before - now)
logger.debug("tring %s", gw)
try:
res = await op(gw)
if state.speedup(time.monotonic() - now):
logger.debug("%s speedup", gw)
return res
except FileNotFoundError: # early exit if object doesn't exist
raise
except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e:
state.backoff()
logger.debug("%s backoff %s", gw, e)
break
except IOError as e:
exception = e
state.broken()
logger.debug("%s broken", gw)
continue
else:
raise exception
raise RequestsTooQuick()

async def api_get(self, endpoint, session, **kwargs):
return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs))

async def api_post(self, endpoint, session, **kwargs):
return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs))

async def cid_head(self, path, session, headers=None, **kwargs):
return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs))

async def cid_get(self, path, session, headers=None, **kwargs):
return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs))

async def cat(self, path, session):
return await self._gw_op(lambda gw: gw.cat(path, session))

async def ls(self, path, session):
return await self._gw_op(lambda gw: gw.ls(path, session))

def state_report(self):
return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws)

def __str__(self):
return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")"


async def get_client(**kwargs):
timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5)
kwargs = {"timeout": timeout, **kwargs}
return aiohttp.ClientSession(**kwargs)


DEFAULT_GATEWAY = None
def gateway_from_file(gateway_path):
if gateway_path.exists():
with open(gateway_path) as gw_file:
ipfs_gateway = gw_file.readline().strip()
logger.debug("using IPFS gateway from %s: %s", gateway_path, ipfs_gateway)
return AsyncIPFSGateway(ipfs_gateway)
return None


@lru_cache
def get_gateway():
global DEFAULT_GATEWAY
if DEFAULT_GATEWAY is None:
use_gateway(*get_default_gateways())
return DEFAULT_GATEWAY


def use_gateway(*urls):
global DEFAULT_GATEWAY
DEFAULT_GATEWAY = MultiGateway([AsyncIPFSGateway(url) for url in urls])
"""
Get IPFS gateway according to IPIP-280

see: https://github.com/ipfs/specs/pull/280
"""

# IPFS_GATEWAY environment variable should override everything
ipfs_gateway = os.environ.get("IPFS_GATEWAY", "")
if ipfs_gateway:
logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway)
return AsyncIPFSGateway(ipfs_gateway)

# internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility
if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""):
ipfs_gateway = ipfsspec_gateways.split()[0]
logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway)
warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning)
return AsyncIPFSGateway(ipfs_gateway)

# check various well-known files for possible gateway configurations
if ipfs_path := os.environ.get("IPFS_PATH", ""):
if ipfs_gateway := gateway_from_file(Path(ipfs_path) / "gateway"):
return ipfs_gateway

if home := os.environ.get("HOME", ""):
if ipfs_gateway := gateway_from_file(Path(home) / ".ipfs" / "gateway"):
return ipfs_gateway

if config_home := os.environ.get("XDG_CONFIG_HOME", ""):
if ipfs_gateway := gateway_from_file(Path(config_home) / "ipfs" / "gateway"):
return ipfs_gateway

if ipfs_gateway := gateway_from_file(Path("/etc") / "ipfs" / "gateway"):
return ipfs_gateway

system = platform.system()

if system == "Windows":
candidates = [
Path(os.environ.get("LOCALAPPDATA")) / "ipfs" / "gateway",
Path(os.environ.get("APPDATA")) / "ipfs" / "gateway",
Path(os.environ.get("PROGRAMDATA")) / "ipfs" / "gateway",
]
elif system == "Darwin":
candidates = [
Path(os.environ.get("HOME")) / "Library" / "Application Support" / "ipfs" / "gateway",
Path("/Library") / "Application Support" / "ipfs" / "gateway",
]
elif system == "Linux":
candidates = [
Path(os.environ.get("HOME")) / ".config" / "ipfs" / "gateway",
Path("/etc") / "ipfs" / "gateway",
]
else:
candidates = []

for candidate in candidates:
if ipfs_gateway := gateway_from_file(candidate):
return ipfs_gateway

# if we reach this point, no gateway is configured
raise RuntimeError("IPFS Gateway could not be found!\n"
"In order to access IPFS, you must configure an "
"IPFS Gateway using a IPIP-280 configuration method. "
"Possible options are: \n"
" * set the environment variable IPFS_GATEWAY\n"
" * write a gateway in the first line of the file ~/.ipfs/gateway\n"
"\n"
"It's always best to run your own IPFS gateway, e.g. by using "
"IPFS Desktop (https://docs.ipfs.tech/install/ipfs-desktop/) or "
"the command line version Kubo (https://docs.ipfs.tech/install/command-line/). "
"If you can't run your own gateway, you may also try using the "
"public IPFS gateway at https://ipfs.io or https://dweb.link . "
"However, this is not recommended for productive use and you may experience "
"severe performance issues.")


class AsyncIPFSFileSystem(AsyncFileSystem):
Expand Down
Loading
Loading