diff --git a/tests/internal/test_ipfs.py b/tests/internal/test_ipfs.py index 18fdb182..e6af920a 100644 --- a/tests/internal/test_ipfs.py +++ b/tests/internal/test_ipfs.py @@ -114,10 +114,16 @@ def test_fetch_cid_status_from_ipfs(): status = fetch_cid_status_from_ipfs("bafkreigvk6oenx6mp4mca4at4znujzgljywcfghuvrcxxkhye5b7ghutbm") assert status == 200 status = fetch_cid_status_from_ipfs("bafkreigdsodbw6dlajnk7xyudw52cutzioovt7r7mrdf3t3cx7xfzz3eou") - assert status == 404 + assert status == 404 or status == 504 # depends on service def test_upload_vote_ipfs_description(): + result = upload_vote_ipfs_description("test string", True) + + assert result["cid"] == "bafkreigvk6oenx6mp4mca4at4znujzgljywcfghuvrcxxkhye5b7ghutbm" + assert result["text"] == "test string" + assert len(result["messages"]) == 0 + result = upload_vote_ipfs_description("test string") assert result["cid"] == "bafkreigvk6oenx6mp4mca4at4znujzgljywcfghuvrcxxkhye5b7ghutbm" diff --git a/utils/config.py b/utils/config.py index 4a7c3593..8392727a 100644 --- a/utils/config.py +++ b/utils/config.py @@ -61,9 +61,9 @@ def get_deployer_account() -> Union[LocalAccount, Account]: return accounts.load(os.environ["DEPLOYER"]) if (is_live or "DEPLOYER" in os.environ) else accounts[4] -def get_web3_storage_token() -> str: +def get_web3_storage_token(silent=False) -> str: is_live = get_is_live() - if is_live and "WEB3_STORAGE_TOKEN" not in os.environ: + if is_live and not silent and "WEB3_STORAGE_TOKEN" not in os.environ: raise EnvironmentError( "Please set WEB3_STORAGE_TOKEN env variable to the web3.storage API token to be able to " "upload the vote description to IPFS by calling upload_vote_ipfs_description. Alternatively, " @@ -73,6 +73,38 @@ def get_web3_storage_token() -> str: return os.environ["WEB3_STORAGE_TOKEN"] if (is_live or "WEB3_STORAGE_TOKEN" in os.environ) else "" +def get_pinata_cloud_token(silent=False) -> str: + is_live = get_is_live() + if is_live and not silent and "PINATA_CLOUD_TOKEN" not in os.environ: + raise EnvironmentError( + "Please set PINATA_CLOUD_TOKEN env variable to the pinata.cloud API token to be able to " + "upload the vote description to IPFS by calling upload_vote_ipfs_description. Alternatively, " + "you can only calculate cid without uploading to IPFS by calling calculate_vote_ipfs_description" + ) + + return os.environ["PINATA_CLOUD_TOKEN"] if (is_live or "PINATA_CLOUD_TOKEN" in os.environ) else "" + + +def get_infura_io_keys(silent=False) -> Tuple[str, str]: + is_live = get_is_live() + if is_live and not silent and ( + "WEB3_INFURA_IPFS_PROJECT_ID" not in os.environ or "WEB3_INFURA_IPFS_PROJECT_SECRET" not in os.environ + ): + raise EnvironmentError( + "Please set WEB3_INFURA_IPFS_PROJECT_ID and WEB3_INFURA_IPFS_PROJECT_SECRET env variable " + "to the web3.storage api token" + ) + project_id = ( + os.environ["WEB3_INFURA_IPFS_PROJECT_ID"] if (is_live or "WEB3_INFURA_IPFS_PROJECT_ID" in os.environ) else "" + ) + project_secret = ( + os.environ["WEB3_INFURA_IPFS_PROJECT_SECRET"] + if (is_live or "WEB3_INFURA_IPFS_PROJECT_SECRET" in os.environ) + else "" + ) + return project_id, project_secret + + def prompt_bool() -> Optional[bool]: choice = input().lower() if choice in {"yes", "y"}: @@ -227,7 +259,7 @@ def dai_token(self) -> interface.ERC20: @property def usdt_token(self) -> interface.ERC20: return interface.ERC20(USDT_TOKEN) - + @property def usdc_token(self) -> interface.ERC20: return interface.ERC20(USDC_TOKEN) diff --git a/utils/ipfs.py b/utils/ipfs.py index a38c9692..5bbf4835 100644 --- a/utils/ipfs.py +++ b/utils/ipfs.py @@ -5,10 +5,11 @@ import requests from typing import Tuple, TypedDict from os import linesep +import json from ipfs_cid import cid_sha256_hash -from utils.config import get_web3_storage_token +from utils.config import get_pinata_cloud_token, get_infura_io_keys, get_web3_storage_token from utils.checksummed_address import checksum_verify # https://github.com/multiformats/multibase/blob/master/multibase.csv @@ -48,6 +49,46 @@ class IPFSUploadResult(TypedDict): messages: list[Tuple[str, str]] +# alternative for upload_str_to_web3_storage +def _upload_str_to_infura_io(text: str) -> str: + text_bytes = text.encode("utf-8") + text_file = io.BytesIO(text_bytes) + files = {"file": text_file} + (projectId, projectSecret) = get_infura_io_keys() + + endpoint = "https://ipfs.infura.io:5001" + + response = requests.post(endpoint + "/api/v0/add?cid-version=1", files=files, auth=(projectId, projectSecret)) + response.raise_for_status() + response_json = response.json() + + return response_json.get("Hash") + + +# alternative for upload_str_to_web3_storage +def _upload_str_to_pinata_cloud(text: str) -> str: + text_bytes = text.encode("utf-8") + text_file = io.BytesIO(text_bytes) + files = {"file": text_file} + pinata_cloud_token = get_pinata_cloud_token() + + endpoint = "https://api.pinata.cloud" + + pinata_options = {"cidVersion": 1, "wrapWithDirectory": False} + payload = {"pinataOptions": json.dumps(pinata_options, separators=(",", ":"))} + + headers = { + "accept": "application/json", + "authorization": f"Bearer {pinata_cloud_token}" + } + + response = requests.post(endpoint + "/pinning/pinFileToIPFS", data=payload, files=files, headers=headers) + response.raise_for_status() + response_json = response.json() + + return response_json.get("IpfsHash") + + # upload text to web3.storage ipfs def _upload_str_to_web3_storage(text: str) -> str: text_bytes = text.encode("utf-8") @@ -65,6 +106,15 @@ def _upload_str_to_web3_storage(text: str) -> str: def _upload_str_to_ipfs(text: str) -> str: + if get_pinata_cloud_token(silent=True): + print(f"Uploading to pinata.cloud IPFS") + return _upload_str_to_pinata_cloud(text) + + if get_infura_io_keys(silent=True): + print(f"Uploading to infura.io IPFS") + return _upload_str_to_infura_io(text) + + print(f"Uploading to web3.storage IPFS") return _upload_str_to_web3_storage(text) @@ -93,7 +143,6 @@ async def _fetch_cid_status_from_ipfs_async(cid: str) -> int: request_urls = [ get_url_by_cid(cid), # faster for uploaded files - f"https://api.web3.storage/status/{cid}", # much faster for not uploaded files ] async with aiohttp.ClientSession() as session: @@ -186,7 +235,7 @@ def calculate_vote_ipfs_description(text: str) -> IPFSUploadResult: return IPFSUploadResult(cid=calculated_cid, messages=messages, text=text) -def upload_vote_ipfs_description(text: str) -> IPFSUploadResult: +def upload_vote_ipfs_description(text: str, force_upload = False) -> IPFSUploadResult: messages = verify_ipfs_description(text) calculated_cid = "" if not text: @@ -198,7 +247,8 @@ def upload_vote_ipfs_description(text: str) -> IPFSUploadResult: raise Exception("Couldn't calculate the ipfs hash for description.") status = fetch_cid_status_from_ipfs(calculated_cid) - if status < 400: + + if status < 400 and not force_upload: # have found file so CID is good return IPFSUploadResult(cid=calculated_cid, messages=messages, text=text)