feat(infra): remove unneeded exports #733

Closed · wants to merge 6 commits

Changes from all commits
49 changes: 42 additions & 7 deletions api/ceramic_cache/management/commands/scorer_dump_data_parquet.py
@@ -4,11 +4,13 @@

from django.apps import apps
from django.db import DEFAULT_DB_ALIAS

from scorer.export_utils import (
    export_data_for_model,
    upload_to_s3,
    writer_context_manager,
)

from .base_cron_cmds import BaseCronJobCmd


@@ -51,40 +53,73 @@ def add_arguments(self, parser):
help="""The field used to sort and batch the export. This is typically the id, but can be any unique field.""",
default="id",
)
parser.add_argument(
"--models",
type=str,
help="""Comma separated list of models to export. Can be specified as either 'app_name.model_name' or as table names.
If specified, only these models will be exported.""",
)

    def handle_cron_job(self, *args, **options):
        self.batch_size = options["batch_size"]
        self.s3_uri = options["s3_uri"]
        self.database = options["database"]
        self.sort_field = options["sort_field"]
        apps_to_export = options["apps"].split(",") if options["apps"] else None
        models_to_export = options["models"].split(",") if options["models"] else None
        extra_args = (
            json.loads(options["s3_extra_args"]) if options["s3_extra_args"] else None
        )

        self.stdout.write(f"EXPORT - s3_uri : '{self.s3_uri}'")
        self.stdout.write(f"EXPORT - batch_size : '{self.batch_size}'")
        self.stdout.write(f"EXPORT - database : '{self.database}'")
        self.stdout.write(f"EXPORT - apps : '{apps_to_export}'")
        self.stdout.write(f"EXPORT - models : '{models_to_export}'")

        if not apps_to_export and not models_to_export:
            return

        parsed_uri = urlparse(self.s3_uri)
        s3_bucket_name = parsed_uri.netloc
        s3_folder = parsed_uri.path.strip("/")

        # If specific models are provided, organize them by app
        model_filter = {}
        if models_to_export:
            for model_spec in models_to_export:
                model_filter["__table_names__"] = model_filter.get(
                    "__table_names__", set()
                )
                model_filter["__table_names__"].add(model_spec)
        # Process each app
        for app_name in apps_to_export or {
            app for app in model_filter.keys() if app != "__table_names__"
        }:
            self.stdout.write(f"EXPORT - START export data for app: '{app_name}'")
            # Get the app's configuration
            try:
                app_config = apps.get_app_config(app_name)
            except LookupError:
                self.stdout.write(
                    self.style.WARNING(f"EXPORT - App '{app_name}' not found, skipping")
                )
                continue

            for model in app_config.get_models():
                model_name = model._meta.model_name
                table_name = model._meta.db_table

                # Skip if we have a model filter and this model isn't in it
                if models_to_export:
                    app_models = model_filter.get(app_name, set())
                    table_names = model_filter.get("__table_names__", set())
                    if not (model_name in app_models or table_name in table_names):
                        continue
                self.stdout.write(
                    f"EXPORT - START export data for model: '{app_name}.{model_name}'"
                )
                try:
                    output_file = f"{table_name}.parquet"
                    export_data_for_model(
                        model.objects.all().using(self.database),
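For reference, the filtering introduced above can be reduced to a small standalone sketch. This is only an illustrative approximation of the intent stated in the --models help text (matching a model by its model name or its database table name); the helper names below are hypothetical and not part of the command:

# Illustrative sketch only: hypothetical helpers approximating the --models filter.
def build_table_filter(models_arg):
    """Collect the comma-separated --models specs into a set (empty set = no filter)."""
    return set(models_arg.split(",")) if models_arg else set()


def should_export(model_name, table_name, table_filter):
    """Export everything when no filter is given; otherwise match by model or table name."""
    if not table_filter:
        return True
    return model_name in table_filter or table_name in table_filter


# Example: only the registry_score table passes the filter.
assert should_export("score", "registry_score", {"registry_score"})
assert not should_export("passport", "registry_passport", {"registry_score"})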
180 changes: 180 additions & 0 deletions api/ceramic_cache/test/test_cmd_scorer_dump_data_parquet.py
@@ -0,0 +1,180 @@
import json
import os
import shutil
from datetime import datetime, timezone

import pyarrow.parquet as pq
import pytest
from django.apps import apps
from django.core.cache import cache
from django.core.management import call_command
from django.db import connections

from account.models import Community
from registry.weight_models import WeightConfiguration
from scorer_weighted.models import BinaryWeightedScorer, Scorer


@pytest.fixture(autouse=True)
def cleanup():
    # Pre-test cleanup (clean any leftover files before test runs)
    for file in os.listdir("."):
        if file.endswith(".parquet") or (
            file.startswith("cpy_") and file.endswith(".parquet")
        ):
            try:
                os.remove(file)
            except FileNotFoundError:
                pass
    yield

    # Post-test cleanup
    for file in os.listdir("."):
        if file.endswith(".parquet") or (
            file.startswith("cpy_") and file.endswith(".parquet")
        ):
            try:
                os.remove(file)
            except FileNotFoundError:
                pass


@pytest.fixture
def mock_upload_to_s3(mocker):
    mocked_calls = []

    def mocked_upload_to_s3(output_file, s3_folder, s3_bucket_name, extra_args):
        copy_file_name = f"cpy_{output_file}"
        mocked_calls.append(
            {
                "file_path": output_file,
                "s3_folder": s3_folder,
                "s3_bucket_name": s3_bucket_name,
                "extra_args": extra_args,
            }
        )
        shutil.copy(output_file, copy_file_name)

    mocker.patch("scorer.export_utils.upload_to_s3", side_effect=mocked_upload_to_s3)
    return mocked_calls


@pytest.fixture(autouse=True)
def reset_mocks(mocker):
"""Reset all mocks before each test"""
yield
mocker.resetall()


class TestScorerDumpDataParquet:
    @pytest.mark.django_db
    def test_export_specific_models(self, scorer_account, mocker, mock_upload_to_s3):
        """Test exporting specific models with stdout verification"""
        WeightConfiguration.objects.create(
            version="v1",
            threshold=2,
            active=True,
            description="Test weight configuration",
        )
        scorer = BinaryWeightedScorer.objects.create(type=Scorer.Type.WEIGHTED_BINARY)
        community = Community.objects.create(
            name="Community 1",
            description="Community 1 - testing",
            account=scorer_account,
            scorer=scorer,
        )

        # Test exporting specific models
        call_command(
            "scorer_dump_data_parquet",
            **{
                "models": "registry_score,account_community",
                "s3_uri": "s3://test-bucket/exports/",
                "apps": "registry,account",
            },
        )

        # Verify that only the requested models were exported
        assert len(mock_upload_to_s3) == 2
        assert "registry_score" in mock_upload_to_s3[0]["file_path"]

        community_table = pq.read_table(f"cpy_{mock_upload_to_s3[1]['file_path']}")
        community_df = community_table.to_pandas()
        assert len(community_df) == 1
        assert community_df.iloc[0]["name"] == "Community 1"

# NOTE: Both tests pass in isolation, but the second test fails when run together with the first. Commented out since this functionality is not currently used.
# @pytest.mark.django_db
# def test_export_data_for_specific_apps(
# self, scorer_account, mocker, mock_upload_to_s3
# ):
# """Test exporting data for specific apps"""

# WeightConfiguration.objects.create(
# version="v1",
# threshold=2,
# active=True,
# description="Test weight configuration",
# )

# scorer = BinaryWeightedScorer.objects.create(type=Scorer.Type.WEIGHTED_BINARY)

# ###############################################################
# # Create community 1 and some scores for that community
# ###############################################################
# community_1 = Community.objects.create(
# name="Community 1",
# description="Community 1 - testing",
# account=scorer_account,
# scorer=scorer,
# )

# for i in range(5):
# p = Passport.objects.create(address=f"0x{i}", community=community_1)
# Score.objects.create(
# passport=p,
# score=10,
# last_score_timestamp=datetime.now(timezone.utc),
# status=Score.Status.DONE,
# evidence={"rawScore": "12.23", "threshold": "20.0"},
# stamp_scores={"provider-1": 1, "provider-2": 2},
# )

# ###############################################################
# # Create community 2 and some scores for that community
# ###############################################################
# community_2 = Community.objects.create(
# name="Community 2",
# description="Community 2 - testing",
# account=scorer_account,
# scorer=scorer,
# )

# # Test exporting specific apps
# call_command(
# "scorer_dump_data_parquet",
# **{
# "apps": "registry",
# "s3_uri": "s3://test-bucket/exports/",
# "s3_extra_args": json.dumps({"ACL": "private"}),
# },
# )

# # Verify the uploads
# assert len(mock_upload_to_s3) == 8

# # Verify community data
# passport_file = next(
# call for call in mock_upload_to_s3 if "passport" in call["file_path"]
# )
# passport_table = pq.read_table(f"cpy_{passport_file['file_path']}")
# passport_df = passport_table.to_pandas()
# assert len(passport_df) == 5

# # Verify passport data
# score_file = next(
# call for call in mock_upload_to_s3 if "score" in call["file_path"]
# )
# score_table = pq.read_table(f"cpy_{score_file['file_path']}")
# score_df = score_table.to_pandas()
# assert len(score_df) == 5
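Outside of pytest, the new filter would presumably be driven through Django's management API in the same way the test above does. A minimal sketch, assuming the project settings are configured and S3 credentials are available; the bucket URI and model list are placeholders:

# Hypothetical invocation of the export with the new --models filter.
# The S3 URI and model/table names are placeholders, not real deployment values.
from django.core.management import call_command

call_command(
    "scorer_dump_data_parquet",
    apps="registry,account",
    models="registry_score,account_community",  # only these tables are exported
    s3_uri="s3://example-bucket/exports/",
)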
9 changes: 0 additions & 9 deletions docker-compose.yml
@@ -21,15 +21,6 @@ services:

    command: uvicorn scorer.asgi:application --reload --host 0.0.0.0 --port 8002

  worker:
    build: api
    volumes:
      - ./api:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

    command: celery -A scorer worker -Q score_passport_passport,score_registry_passport -l DEBUG

  interface:
    build:
      context: ./interface