Skip to content

Commit

Permalink
Update boost_create_deals.py (#225)
Browse files: browse the repository at this point in the history
  • Loading branch information
anjor authored Jan 29, 2025
1 parent fdc24d0 commit 0460177
Showing 1 changed file with 81 additions and 82 deletions.
163 changes: 81 additions & 82 deletions dataprep-tools/filecoin/boost_create_deals.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -157,7 +157,7 @@ def get_providers(
"startEpoch": start_epoch,
"duration": 1468800,
"storagePricePerEpoch": 0,
"providerCollateral": provider_collateral,
#"providerCollateral": provider_collateral,
"verifiedDeal": True,
"transferSize": size,
"transferType": "http",
Expand Down Expand Up @@ -206,7 +206,6 @@ def create_deals(metadata_obj):
commp_piece_cid = line[1]
payload_cid = line[2]
padded_size = line[3]
file_size = line[4]

check_obj = client.check_exists(epoch + "/" + file_name)
if not check_obj[0]:
Expand Down Expand Up @@ -235,7 +234,7 @@ def create_deals(metadata_obj):
"file_name": file_name,
"url": public_url,
"commp_piece_cid": commp_piece_cid,
"file_size": file_size,
"file_size": check_obj[1],
"padded_size": padded_size,
"payload_cid": payload_cid,
}
Expand Down Expand Up @@ -269,6 +268,7 @@ def create_deals(metadata_obj):
# Create a lock file for the epoch to ensure that no one else is working on it
if not client.check_exists(lockfile)[0]:
client.upload_obj(StringIO(socket.gethostname() + "_" + filetime), lockfile)
logging.info("wrote lock file")
else:
lock_data = client.read_object(lockfile)
logging.error("lock file exists, exiting: " + lock_data)
Expand Down Expand Up @@ -300,91 +300,89 @@ def create_deals(metadata_obj):
logging.info("creating deal for ")
logging.info(file_item)

if file_item["commp_piece_cid"] not in replications:
replications[file_item["commp_piece_cid"]] = []

while len(replications[file_item["commp_piece_cid"]]) < replication_factor:

providers = get_providers(
piece_cid=file_item["commp_piece_cid"],
size=file_item["file_size"],
padded_size=file_item["padded_size"],
)
logging.info(f"found providers: {providers}")

providers = get_providers(
piece_cid=file_item["commp_piece_cid"],
size=file_item["file_size"],
padded_size=file_item["padded_size"],
)
logging.info(f"found providers: {providers}")

for provider in providers:
if file_item["commp_piece_cid"] in replications:
if provider in replications[file_item["commp_piece_cid"]]:
logging.info(
"skipping %s, already have deal with %s"
% (file_item["commp_piece_cid"], provider)
)
continue
for provider in providers:

if (len(replications[file_item["commp_piece_cid"]])
>= replication_factor
):
if file_item["commp_piece_cid"] in replications:
if provider in replications[file_item["commp_piece_cid"]]:
logging.info(
"skipping %s, already replicated %s times"
% (
file_item["commp_piece_cid"],
replications[file_item["commp_piece_cid"]],
)
"skipping %s, already have deal with %s"
% (file_item["commp_piece_cid"], provider)
)
continue

if file_item["commp_piece_cid"] not in replications:
replications[file_item["commp_piece_cid"]] = []
elif (
len(replications[file_item["commp_piece_cid"]])
>= replication_factor
):
logging.info(
"skipping %s, already replicated %s times"
% (
file_item["commp_piece_cid"],
replications[file_item["commp_piece_cid"]],
)
)
continue

params = {
"provider": provider,
"commp": file_item["commp_piece_cid"],
"piece-size": file_item["padded_size"],
#"car-size": file_item["file_size"],
"payload-cid": file_item["payload_cid"],
"storage-price": "0",
"start-epoch-head-offset": start_epoch_head_offset,
"verified": "true",
"duration": 1468800,
"wallet": environ.get("WALLET"),
}
deal_arg = "offline-deal"
if online:
params["http-url"] = file_item["url"]
else:
deal_arg = "offline-deal"

logging.info(params)
cmd = ["boost", "--vv", "--json=1", deal_arg] + [
f"--{k}={v}" for k, v in params.items()
]

logging.info(cmd)

if dry_run:
out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}'
else:
try:
out = check_output(cmd, text=True).strip()
except CalledProcessError as e:
logging.warning(f"WARNING: boost deal failed for {provider}:")
logging.warning(e)
continue

params = {
"provider": provider,
"commp": file_item["commp_piece_cid"],
"piece-size": file_item["padded_size"],
"car-size": file_item["file_size"],
"payload-cid": file_item["payload_cid"],
"storage-price": "0",
"start-epoch-head-offset": start_epoch_head_offset,
"verified": "true",
"duration": 1468800,
"wallet": environ.get("WALLET"),
}
deal_arg = "deal"
if online:
params["http-url"] = file_item["url"]
else:
deal_arg = "offline-deal"

logging.info(params)
cmd = ["boost", "--vv", "--json=1", deal_arg] + [
f"--{k}={v}" for k, v in params.items()
]

logging.info(cmd)

if dry_run:
out = '{ "dealUuid": "dry-run-uuid", "dealState": "dry-run-state"}'
else:
try:
out = check_output(cmd, text=True).strip()
except CalledProcessError as e:
logging.warning(f"WARNING: boost deal failed for {provider}:")
logging.warning(e)
continue

logging.info(out)
res = json.loads(out)

deal_output = {
"provider": provider,
"deal_uuid": res.get("dealUuid"),
}

replications[file_item["commp_piece_cid"]].append(provider)

deal_output.update(file_item)
csv_writer.writerow(deal_output)
if provider not in deals_providers:
deals_providers[provider] = []
deals_providers[provider].append(deal_output)
log_file.close()
logging.info(out)
res = json.loads(out)

deal_output = {
"provider": provider,
"deal_uuid": res.get("dealUuid"),
}

replications[file_item["commp_piece_cid"]].append(provider)

deal_output.update(file_item)
csv_writer.writerow(deal_output)
if provider not in deals_providers:
deals_providers[provider] = []
deals_providers[provider].append(deal_output)
log_file.close()

if dry_run:
logging.info("completed processing dry run mode")
Expand Down Expand Up @@ -428,6 +426,7 @@ def create_deals(metadata_obj):
deal_type = sys.argv[2]

client.connect()
logging.info(f"Connected to client: {client}")

# Load the payload CID
epoch_cid = client.read_object("%s/epoch-%s.cid" % (epoch, epoch)).strip()
Expand Down

0 comments on commit 0460177

Please sign in to comment.