Skip to content

Commit

Permalink
fix(infra): added updated_at to CeramicCache, modified dump script to…
Browse files Browse the repository at this point in the history
… use this
  • Loading branch information
lucianHymer committed Aug 21, 2023
1 parent 8c941a8 commit 0ffa0c3
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 24 deletions.
10 changes: 8 additions & 2 deletions api/ceramic_cache/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,20 @@ def cache_stamps(request, payload: List[CacheStampPayload]):

address = get_address_from_did(request.did)
stamp_objects = []
now = get_utc_time()
for p in payload:
stamp_object = CeramicCache(
address=address,
provider=p.provider,
stamp=p.stamp,
updated_at=now,
)
stamp_objects.append(stamp_object)

created = CeramicCache.objects.bulk_create(
stamp_objects,
update_conflicts=True,
update_fields=["stamp"],
update_fields=["stamp", "updated_at"],
unique_fields=["address", "provider"],
)

Expand All @@ -183,13 +186,15 @@ def patch_stamps(request, payload: List[CacheStampPayload]):
stamp_objects = []
providers_to_delete = []
updated = []
now = get_utc_time()

for p in payload:
if p.stamp:
stamp_object = CeramicCache(
address=address,
provider=p.provider,
stamp=p.stamp,
updated_at=now,
)
stamp_objects.append(stamp_object)
else:
Expand All @@ -199,7 +204,7 @@ def patch_stamps(request, payload: List[CacheStampPayload]):
updated = CeramicCache.objects.bulk_create(
stamp_objects,
update_conflicts=True,
update_fields=["stamp"],
update_fields=["stamp", "updated_at"],
unique_fields=["address", "provider"],
)

Expand Down Expand Up @@ -258,6 +263,7 @@ def cache_stamp(request, payload: CacheStampPayload):
provider=payload.provider,
defaults=dict(
stamp=payload.stamp,
updated_at=get_utc_time(),
),
)

Expand Down
69 changes: 48 additions & 21 deletions api/ceramic_cache/management/commands/dump_stamp_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.core.management.base import BaseCommand
from django.core.paginator import Paginator
from django.utils import timezone
from tqdm import tqdm

s3 = boto3.client(
"s3",
Expand All @@ -25,33 +26,59 @@ def handle(self, *args, **options):
latest_export = StampExports.objects.order_by("-last_export_ts").first()

if not latest_export:
queryset = CeramicCache.objects.all()
print("Getting all Stamps")
else:
queryset = CeramicCache.objects.filter(
created_at__gt=latest_export.last_export_ts
print("No previous exports found. Exporting all data.")
latest_export = StampExports(
last_export_ts=datetime.date.fromisoformat("1970-01-01")
)
print(f"Getting Stamps since {latest_export.last_export_ts}")

paginator = Paginator(
queryset.values_list("stamp", flat=True),
1000,
)
print(f"Getting Stamps updated since {latest_export.last_export_ts}")

start = (
latest_export.last_export_ts.strftime("%Y%m%d_%H%M%S")
if latest_export
else "beginng_of_stamp_creation"
query = (
CeramicCache.objects.filter(created_at__gt=latest_export.last_export_ts)
.values("stamp", "updated_at")
.order_by("updated_at")
.using("read_replica_0")
)

# Generate the dump file name
file_name = f'stamps_{start}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.jsonl'
file_name = f'stamps_{latest_export.last_export_ts.strftime("%Y%m%d_%H%M%S")}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.jsonl'

last_updated_at = latest_export.last_export_ts
chunk_size = 1000

try:
# Write serialized data to the file
with open(file_name, "w") as f:
with tqdm(
unit="items", unit_scale=None, desc="Exporting stamps"
) as progress_bar:
has_more = True
while has_more:
objects = list(
query.filter(updated_at__gt=last_updated_at)[:chunk_size]
)
if objects:
num_objects = len(objects)
progress_bar.update(num_objects)

for cache_obj in objects:
f.write(
json.dumps({"stamp": cache_obj["stamp"]}) + "\n"
)

# Write serialized data to the file
with open(file_name, "w") as f:
for page in paginator.page_range:
for stamp in paginator.page(page).object_list:
f.write(json.dumps({"stamp": stamp}) + "\n")
last_updated_at = cache_obj["updated_at"]

# If we get fewer rows than the chunk size, we've reached the end.
# Stop here rather than keep querying, which could result in querying forever.
if num_objects < chunk_size:
has_more = False
else:
has_more = False

finally:
self.stdout.write(
self.style.SUCCESS(f'Last stamp updated at "{last_updated_at}"')
)

# Upload to S3 bucket
s3.upload_file(file_name, settings.S3_WEEKLY_BACKUP_BUCKET_NAME, file_name)
Expand All @@ -60,7 +87,7 @@ def handle(self, *args, **options):
os.remove(file_name)

StampExports.objects.create(
last_export_ts=timezone.now(), stamp_total=paginator.count
last_export_ts=last_updated_at, stamp_total=progress_bar.n
)

print(f"Data dump completed and uploaded to S3 as {file_name}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 4.2.3 on 2023-08-21 21:42

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add ``updated_at`` to ``CeramicCache`` and backfill it from ``created_at``.

    Operations, in order:

    1. Add the non-null ``updated_at`` column, filling existing rows with the
       Unix epoch as a one-off default.
    2. Re-declare ``created_at`` (auto-populated on insert, nullable) with the
       corrected help text.
    3. Copy ``created_at`` into ``updated_at`` for every row that has a
       ``created_at`` value; rows without one keep the epoch placeholder.
    """

    dependencies = [
        ("ceramic_cache", "0012_alter_ceramiccache_created_at_ceramiccachelegacy"),
    ]

    operations = [
        migrations.AddField(
            model_name="ceramiccache",
            name="updated_at",
            field=models.DateTimeField(
                # One-off value used only to populate rows that already exist.
                default="1970-01-01T00:00:00Z",
                help_text="This is the timestamp that this DB record was updated (it is not necessarily the stamp issuance timestamp)",
            ),
            # The default is temporary: it is not kept on the field afterwards.
            preserve_default=False,
        ),
        migrations.AlterField(
            model_name="ceramiccache",
            name="created_at",
            field=models.DateTimeField(
                auto_now_add=True,
                help_text="This is the timestamp that this DB record was created (it is not necessarily the stamp issuance timestamp)",
                null=True,
            ),
        ),
        migrations.RunSQL(
            "UPDATE ceramic_cache_ceramiccache SET updated_at = created_at where created_at is not null",
            # Nothing to undo on rollback: reversing the AddField above drops
            # the updated_at column entirely. Declaring a no-op reverse keeps
            # the migration reversible instead of raising IrreversibleError.
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]
9 changes: 8 additions & 1 deletion api/ceramic_cache/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ class CeramicCache(models.Model):
auto_now_add=True,
blank=True,
null=True,
help_text="This is the timestamp that this DB record was created (it is not necesarily the stamp issuance timestamp)",
help_text="This is the timestamp that this DB record was created (it is not necessarily the stamp issuance timestamp)",
)

# Not auto_now because it does not work correctly with bulk updates
updated_at = models.DateTimeField(
blank=False,
null=False,
help_text="This is the timestamp that this DB record was updated (it is not necessarily the stamp issuance timestamp)",
)

class Meta:
Expand Down

0 comments on commit 0ffa0c3

Please sign in to comment.