From abcda32bf56d2d9853dc81fb1289ef0eca861e61 Mon Sep 17 00:00:00 2001
From: Lucian
Date: Fri, 25 Aug 2023 17:11:49 -0600
Subject: [PATCH] Revert "feat(api): scheduled stamp data dump (#365)"

This reverts commit b68e68cd41822d2400db184ae7973b82d13e33fb.
---
 api/ceramic_cache/api.py                      |  10 +-
 .../management/commands/dump_stamp_data.py    |  59 ++-----
 .../0014_ceramiccache_updated_at_and_more.py  |  35 ----
 api/ceramic_cache/models.py                   |  11 +-
 infra/lib/scorer/scheduledTasks.ts            | 167 ------------------
 infra/prod/index.ts                           |  57 +++---
 infra/review/index.ts                         |  73 ++++----
 infra/staging/index.ts                        |  55 ++----
 8 files changed, 94 insertions(+), 373 deletions(-)
 delete mode 100644 api/ceramic_cache/migrations/0014_ceramiccache_updated_at_and_more.py
 delete mode 100644 infra/lib/scorer/scheduledTasks.ts

diff --git a/api/ceramic_cache/api.py b/api/ceramic_cache/api.py
index ef8437a49..da64c8b75 100644
--- a/api/ceramic_cache/api.py
+++ b/api/ceramic_cache/api.py
@@ -150,20 +150,17 @@ def cache_stamps(request, payload: List[CacheStampPayload]):
         address = get_address_from_did(request.did)
 
         stamp_objects = []
-        now = get_utc_time()
         for p in payload:
             stamp_object = CeramicCache(
                 address=address,
                 provider=p.provider,
                 stamp=p.stamp,
-                updated_at=now,
             )
             stamp_objects.append(stamp_object)
-
         created = CeramicCache.objects.bulk_create(
             stamp_objects,
             update_conflicts=True,
-            update_fields=["stamp", "updated_at"],
+            update_fields=["stamp"],
             unique_fields=["address", "provider"],
         )
 
@@ -186,7 +183,6 @@ def patch_stamps(request, payload: List[CacheStampPayload]):
         stamp_objects = []
         providers_to_delete = []
         updated = []
-        now = get_utc_time()
 
         for p in payload:
             if p.stamp:
@@ -194,7 +190,6 @@
                     address=address,
                     provider=p.provider,
                     stamp=p.stamp,
-                    updated_at=now,
                 )
                 stamp_objects.append(stamp_object)
             else:
@@ -204,7 +199,7 @@
             updated = CeramicCache.objects.bulk_create(
                 stamp_objects,
                 update_conflicts=True,
-                update_fields=["stamp", "updated_at"],
+                update_fields=["stamp"],
                 unique_fields=["address", "provider"],
             )
 
@@ -263,7 +258,6 @@ def cache_stamp(request, payload: CacheStampPayload):
             provider=payload.provider,
             defaults=dict(
                 stamp=payload.stamp,
-                updated_at=get_utc_time(),
             ),
         )
 
diff --git a/api/ceramic_cache/management/commands/dump_stamp_data.py b/api/ceramic_cache/management/commands/dump_stamp_data.py
index b647ed8c7..3ca0197d0 100644
--- a/api/ceramic_cache/management/commands/dump_stamp_data.py
+++ b/api/ceramic_cache/management/commands/dump_stamp_data.py
@@ -8,7 +8,6 @@
 from django.core.management.base import BaseCommand
 from django.core.paginator import Paginator
 from django.utils import timezone
-from tqdm import tqdm
 
 s3 = boto3.client(
     "s3",
@@ -27,57 +26,25 @@ def handle(self, *args, **options):
 
         if not latest_export:
             print("No previous exports found. Exporting all data.")
-            latest_export = StampExports(
-                last_export_ts=datetime.date.fromisoformat("1970-01-01")
+            latest_export = StampExports.objects.create(
+                last_export_ts=timezone.now() - datetime.timedelta(days=7)
             )
 
-        print(f"Getting Stamps updated since {latest_export.last_export_ts}")
-
-        query = (
-            CeramicCache.objects.values("stamp", "updated_at")
-            .order_by("updated_at")
-            .using("read_replica_0")
+        paginator = Paginator(
+            CeramicCache.objects.filter(
+                created_at__gt=latest_export.last_export_ts
+            ).values_list("stamp", flat=True),
+            1000,
         )
 
         # Generate the dump file name
         file_name = f'stamps_{latest_export.last_export_ts.strftime("%Y%m%d_%H%M%S")}_{timezone.now().strftime("%Y%m%d_%H%M%S")}.jsonl'
 
-        last_updated_at = latest_export.last_export_ts
-        chunk_size = 1000
-
-        try:
-            # Write serialized data to the file
-            with open(file_name, "w") as f:
-                with tqdm(
-                    unit="items", unit_scale=None, desc="Exporting stamps"
-                ) as progress_bar:
-                    has_more = True
-                    while has_more:
-                        objects = list(
-                            query.filter(updated_at__gt=last_updated_at)[:chunk_size]
-                        )
-                        if objects:
-                            num_objects = len(objects)
-                            progress_bar.update(num_objects)
-
-                            for cache_obj in objects:
-                                f.write(
-                                    json.dumps({"stamp": cache_obj["stamp"]}) + "\n"
-                                )
-
-                                last_updated_at = cache_obj["updated_at"]
-
-                            # If we get less than the chunk size, we've reached the end
-                            # No need to keep querying which could result in querying forever
-                            if num_objects < chunk_size:
-                                has_more = False
-                        else:
-                            has_more = False
-
-        finally:
-            self.stdout.write(
-                self.style.SUCCESS(f'Last stamp updated at "{last_updated_at}"')
-            )
+        # Write serialized data to the file
+        with open(file_name, "w") as f:
+            for page in paginator.page_range:
+                for stamp in paginator.page(page).object_list:
+                    f.write(json.dumps({"stamp": stamp}) + "\n")
 
         # Upload to S3 bucket
         s3.upload_file(file_name, settings.S3_WEEKLY_BACKUP_BUCKET_NAME, file_name)
 
@@ -86,7 +53,7 @@ def handle(self, *args, **options):
         os.remove(file_name)
 
         StampExports.objects.create(
-            last_export_ts=last_updated_at, stamp_total=progress_bar.n
+            last_export_ts=timezone.now(), stamp_total=paginator.count
         )
 
         print(f"Data dump completed and uploaded to S3 as {file_name}")
diff --git a/api/ceramic_cache/migrations/0014_ceramiccache_updated_at_and_more.py b/api/ceramic_cache/migrations/0014_ceramiccache_updated_at_and_more.py
deleted file mode 100644
index fb41a738b..000000000
--- a/api/ceramic_cache/migrations/0014_ceramiccache_updated_at_and_more.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Generated by Django 4.2.3 on 2023-08-21 21:42
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-    dependencies = [
-        ("ceramic_cache", "0013_alter_ceramiccachelegacy_stamp"),
-    ]
-
-    operations = [
-        migrations.AddField(
-            model_name="ceramiccache",
-            name="updated_at",
-            field=models.DateTimeField(
-                auto_now=True,
-                default="1970-01-01T00:00:00Z",
-                help_text="This is the timestamp that this DB record was updated (it is not necessarily the stamp issuance timestamp)",
-            ),
-            preserve_default=False,
-        ),
-        migrations.AlterField(
-            model_name="ceramiccache",
-            name="created_at",
-            field=models.DateTimeField(
-                auto_now_add=True,
-                help_text="This is the timestamp that this DB record was created (it is not necessarily the stamp issuance timestamp)",
-                null=True,
-            ),
-        ),
-        migrations.RunSQL(
-            "UPDATE ceramic_cache_ceramiccache SET updated_at = created_at where created_at is not null",
-            reverse_sql="",
-        ),
-    ]
diff --git a/api/ceramic_cache/models.py b/api/ceramic_cache/models.py
index 8de265080..cde8519b8 100644
--- a/api/ceramic_cache/models.py
+++ b/api/ceramic_cache/models.py
@@ -14,16 +14,7 @@ class CeramicCache(models.Model):
         auto_now_add=True,
         blank=True,
         null=True,
-        help_text="This is the timestamp that this DB record was created (it is not necessarily the stamp issuance timestamp)",
-    )
-
-    # NOTE! auto_now is here to make tests easier, but it is not
-    # supported for bulk updates so it should be set explicitly
-    updated_at = models.DateTimeField(
-        blank=False,
-        null=False,
-        auto_now=True,
-        help_text="This is the timestamp that this DB record was updated (it is not necessarily the stamp issuance timestamp)",
+        help_text="This is the timestamp that this DB record was created (it is not necesarily the stamp issuance timestamp)",
     )
 
     class Meta:
diff --git a/infra/lib/scorer/scheduledTasks.ts b/infra/lib/scorer/scheduledTasks.ts
deleted file mode 100644
index 5d9a46a93..000000000
--- a/infra/lib/scorer/scheduledTasks.ts
+++ /dev/null
@@ -1,167 +0,0 @@
-import * as awsx from "@pulumi/awsx";
-import { all } from "@pulumi/pulumi";
-import * as aws from "@pulumi/aws";
-import {
-  secrets,
-  ScorerService,
-  ScorerEnvironmentConfig,
-  getEnvironment,
-} from "./service";
-
-let SCORER_SERVER_SSM_ARN = `${process.env["SCORER_SERVER_SSM_ARN"]}`;
-
-export type ScheduledTaskConfig = Pick<
-  ScorerService,
-  "dockerImageScorer" | "executionRole" | "cluster" | "subnets"
-> & {
-  securityGroup: aws.ec2.SecurityGroup;
-  command: string[];
-  scheduleExpression: string;
-};
-
-export function createScheduledTask(
-  name: string,
-  config: ScheduledTaskConfig,
-  envConfig: ScorerEnvironmentConfig
-) {
-  const {
-    executionRole,
-    subnets,
-    dockerImageScorer,
-    cluster,
-    securityGroup,
-    command,
-    scheduleExpression,
-  } = config;
-
-  const task = new awsx.ecs.FargateTaskDefinition(name, {
-    executionRole,
-    containers: {
-      web: {
-        image: dockerImageScorer,
-        cpu: 256,
-        memory: 2048,
-        secrets,
-        environment: getEnvironment(envConfig),
-        command,
-      },
-    },
-  });
-
-  const scheduledEventRule = new aws.cloudwatch.EventRule(
-    "scheduledEventRule",
-    { scheduleExpression }
-  );
-
-  const eventsStsAssumeRole = new aws.iam.Role("eventsStsAssumeRole", {
-    assumeRolePolicy: JSON.stringify({
-      Version: "2012-10-17",
-      Statement: [
-        {
-          Action: "sts:AssumeRole",
-          Effect: "Allow",
-          Sid: "",
-          Principal: {
-            Service: "ecs-tasks.amazonaws.com",
-          },
-        },
-        {
-          Action: "sts:AssumeRole",
-          Effect: "Allow",
-          Sid: "",
-          Principal: {
-            Service: "events.amazonaws.com",
-          },
-        },
-      ],
-    }),
-    inlinePolicies: [
-      {
-        name: "allow_exec",
-        policy: JSON.stringify({
-          Version: "2012-10-17",
-          Statement: [
-            {
-              Effect: "Allow",
-              Action: [
-                "ssmmessages:CreateControlChannel",
-                "ssmmessages:CreateDataChannel",
-                "ssmmessages:OpenControlChannel",
-                "ssmmessages:OpenDataChannel",
-              ],
-              Resource: "*",
-            },
-          ],
-        }),
-      },
-      {
-        name: "allow_iam_secrets_access",
-        policy: JSON.stringify({
-          Version: "2012-10-17",
-          Statement: [
-            {
-              Action: ["secretsmanager:GetSecretValue"],
-              Effect: "Allow",
-              Resource: SCORER_SERVER_SSM_ARN,
-            },
-          ],
-        }),
-      },
-      {
-        name: "allow_run_task",
-        policy: task.taskDefinition.arn.apply((Resource) =>
-          JSON.stringify({
-            Version: "2012-10-17",
-            Statement: [
-              {
-                Action: ["ecs:RunTask"],
-                Effect: "Allow",
-                Resource: Resource,
-              },
-            ],
-          })
-        ),
-      },
-      {
-        name: "allow_pass_role",
-        policy: all([executionRole.arn, task.taskDefinition.taskRoleArn]).apply(
-          ([dpoppEcsRoleArn, weeklyDataDumpTaskRoleArn]) =>
-            JSON.stringify({
-              Version: "2012-10-17",
-              Statement: [
-                {
-                  Action: ["iam:PassRole"],
-                  Effect: "Allow",
-                  Resource: [dpoppEcsRoleArn, weeklyDataDumpTaskRoleArn],
-                },
-              ],
-            })
-        ),
-      },
-    ],
-    managedPolicyArns: [
-      "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
-    ],
-    tags: {
-      dpopp: "",
-    },
-  });
-
-  new aws.cloudwatch.EventTarget("scheduledEventTarget", {
-    rule: scheduledEventRule.name,
-    arn: cluster.cluster.arn,
-    roleArn: eventsStsAssumeRole.arn,
-    ecsTarget: {
-      taskCount: 1,
-      taskDefinitionArn: task.taskDefinition.arn,
-      launchType: "FARGATE",
-      networkConfiguration: {
-        assignPublicIp: false,
-        securityGroups: [securityGroup.id],
-        subnets,
-      },
-    },
-  });
-
-  return task.taskDefinition.id;
-}
diff --git a/infra/prod/index.ts b/infra/prod/index.ts
index 75eb778c3..f71c3b07e 100644
--- a/infra/prod/index.ts
+++ b/infra/prod/index.ts
@@ -10,7 +10,6 @@ import {
   getEnvironment,
   secrets,
 } from "../lib/scorer/service";
-import { createScheduledTask } from "../lib/scorer/scheduledTasks";
 
 // The following vars are not allowed to be undefined, hence the `${...}` magic
@@ -751,6 +750,7 @@ const flower = new awsx.ecs.FargateService("flower", {
   },
 });
 
+
 const secgrp = new aws.ec2.SecurityGroup(`scorer-run-migrations-task`, {
   description: "gitcoin-ecs-task",
   vpcId: vpc.id,
@@ -842,6 +842,7 @@ export const dockrRunCmd = pulumi.secret(
   pulumi.interpolate`docker run -it -e 'DATABASE_URL=${rdsConnectionUrl}' -e 'CELERY_BROKER_URL=${redisCacheOpsConnectionUrl}' '${dockerGtcPassportScorerImage}' bash`
 );
 
+
 ///////////////////////
 // Redash instance
 ///////////////////////
@@ -872,25 +873,21 @@ let redashDbPassword = pulumi.secret(`${process.env["REDASH_DB_PASSWORD"]}`);
 let redashDbName = `${process.env["REDASH_DB_NAME"]}`;
 
 // Create an RDS instance
-const redashDb = new aws.rds.Instance(
-  "redash-db",
-  {
-    allocatedStorage: 20,
-    maxAllocatedStorage: 20,
-    engine: "postgres",
-    engineVersion: "13.10",
-    instanceClass: "db.t3.micro",
-    dbName: redashDbName,
-    password: redashDbPassword,
-    username: redashDbUsername,
-    skipFinalSnapshot: true,
-    dbSubnetGroupName: dbSubnetGroup.id,
-    vpcSecurityGroupIds: [redashDbSecgrp.id],
-    backupRetentionPeriod: 5,
-    performanceInsightsEnabled: true,
-  },
-  { protect: true }
-);
+const redashDb = new aws.rds.Instance("redash-db", {
+  allocatedStorage: 20,
+  maxAllocatedStorage: 20,
+  engine: "postgres",
+  engineVersion: "13.10",
+  instanceClass: "db.t3.micro",
+  dbName: redashDbName,
+  password: redashDbPassword,
+  username: redashDbUsername,
+  skipFinalSnapshot: true,
+  dbSubnetGroupName: dbSubnetGroup.id,
+  vpcSecurityGroupIds: [redashDbSecgrp.id],
+  backupRetentionPeriod: 5,
+  performanceInsightsEnabled: true,
+}, { protect: true });
 
 const dbUrl = redashDb.endpoint;
 export const redashDbUrl = pulumi.secret(
@@ -938,8 +935,7 @@ const redashSecurityGroup = new aws.ec2.SecurityGroup(
 
 // const redashDbUrlString = redashDbUrl.apply((url) => url).toString();
 
-const redashInitScript = redashDbUrl.apply(
-  (url: any) => `#!/bin/bash
+const redashInitScript = redashDbUrl.apply((url: any) => `#!/bin/bash
 echo "Setting environment variables..."
 export POSTGRES_PASSWORD="${redashDbPassword}"
 export REDASH_DATABASE_URL="${url}"
@@ -952,11 +948,11 @@ sudo chmod +x ./setup.sh
 cd data
 sudo docker-compose run --rm server create_db
 sudo docker-compose up -d
-`
-);
+`);
+
 
 const redashinstance = new aws.ec2.Instance("redashinstance", {
-  ami: ubuntu.then((ubuntu: { id: any }) => ubuntu.id),
+  ami: ubuntu.then((ubuntu: { id: any; }) => ubuntu.id),
   associatePublicIpAddress: true,
   instanceType: "t3.medium",
   subnetId: vpcPublicSubnetId2.then(),
@@ -1049,14 +1045,3 @@ new aws.lb.TargetGroupAttachment("redashTargetAttachment", {
   targetId: redashinstance.privateIp,
   targetGroupArn: redashTarget.targetGroup.arn,
 });
-
-export const weeklyDataDumpTaskDefinition = createScheduledTask(
-  "weekly-data-dump",
-  {
-    ...baseScorerServiceConfig,
-    securityGroup: secgrp,
-    command: ["python", "manage.py", "dump_stamp_data"],
-    scheduleExpression: "cron(30 23 ? * FRI *)", // Run the task every friday at 23:30 UTC
-  },
-  envConfig
-);
diff --git a/infra/review/index.ts b/infra/review/index.ts
index 51def1645..ddca388c4 100644
--- a/infra/review/index.ts
+++ b/infra/review/index.ts
@@ -2,9 +2,6 @@
 import * as pulumi from "@pulumi/pulumi";
 import * as aws from "@pulumi/aws";
 import * as awsx from "@pulumi/awsx";
 
-import { ScorerEnvironmentConfig } from "../lib/scorer/service";
-import { createScheduledTask } from "../lib/scorer/scheduledTasks";
-
 // The following vars are not allowed to be undefined, hence the `${...}` magic
 let route53Zone = `${process.env["ROUTE_53_ZONE"]}`;
@@ -685,6 +682,48 @@ const secgrp = new aws.ec2.SecurityGroup(`scorer-run-migrations-task`, {
 
 export const securityGroupForTaskDefinition = secgrp.id;
 
+//////////////////////////////////////////////////////////////
+// ECS Scheduled Task
+//////////////////////////////////////////////////////////////
+// const weeklyDataDump = new awsx.ecs.FargateTaskDefinition("weekly-data-dump", {
+//   containers: {
+//     web: {
+//       image: dockerGtcPassportScorerImage,
+//       cpu: 256,
+//       memory: 2048,
+//       secrets,
+//       command: ["python", "manage.py", "dump_stamp_data"],
+//     },
+//   },
+// });
+
+// const scheduledEventRule = new aws.cloudwatch.EventRule("scheduledEventRule", {
+//   // scheduleExpression: "cron(0 12 * * ? *)", // Run the task every day at 12 UTC
+//   scheduleExpression: "cron(0/5 * ? * * *)", // Run the task every 5 min
+//   // scheduleExpression: "cron(0 12 ? * FRI *)", // Run the task every friday at 12 UTC
+// });
+
+// // const serviceLinkRoler = new aws.iam.ServiceLinkedRole("ecs_service_link_roler", {
+// //   customSuffix: "ecs_scheduled_event",
+// //   awsServiceName: "ecs.amazonaws.com",
+// // })
+
+// new aws.cloudwatch.EventTarget("scheduledEventTarget", {
+//   rule: scheduledEventRule.name,
+//   arn: cluster.cluster.arn,
+//   roleArn: dpoppEcsRole.arn,
+//   ecsTarget: {
+//     taskCount: 1,
+//     taskDefinitionArn: weeklyDataDump.taskDefinition.arn,
+//     launchType: "FARGATE",
+//     networkConfiguration: {
+//       assignPublicIp: true,
+//       subnets: vpcPublicSubnetIds,
+//       securityGroups: [secgrp.id],
+//     },
+//   },
+// });
+
 //////////////////////////////////////////////////////////////
 // Set up EC2 instance
 // - it is intended to be used for troubleshooting
@@ -757,31 +796,3 @@ export const ec2PublicIp = web.publicIp;
 export const dockrRunCmd = pulumi.secret(
   pulumi.interpolate`docker run -it -e 'DATABASE_URL=${rdsConnectionUrl}' -e 'CELERY_BROKER_URL=${redisCacheOpsConnectionUrl}' '${dockerGtcPassportScorerImage}' bash`
 );
-
-const envConfig: ScorerEnvironmentConfig = {
-  allowedHosts: JSON.stringify([domain, "*"]),
-  domain: domain,
-  csrfTrustedOrigins: JSON.stringify([`https://${domain}`]),
-  rdsConnectionUrl: rdsConnectionUrl,
-  redisCacheOpsConnectionUrl: redisCacheOpsConnectionUrl,
-  uiDomains: JSON.stringify([
-    "scorer." + process.env["DOMAIN"],
-    "www.scorer." + process.env["DOMAIN"],
-  ]),
-  debug: "off",
-  passportPublicUrl: "https://staging.passport.gitcoin.co/",
-};
-
-export const weeklyDataDumpTaskDefinition = createScheduledTask(
-  "weekly-data-dump",
-  {
-    cluster,
-    executionRole: dpoppEcsRole,
-    subnets: vpcPrivateSubnetIds,
-    dockerImageScorer: dockerGtcPassportScorerImage,
-    securityGroup: secgrp,
-    command: ["python", "manage.py", "dump_stamp_data"],
-    scheduleExpression: "cron(30 23 ? * FRI *)", // Run the task every friday at 23:30 UTC
-  },
-  envConfig
-);
diff --git a/infra/staging/index.ts b/infra/staging/index.ts
index a0ccaac2a..1163166b8 100644
--- a/infra/staging/index.ts
+++ b/infra/staging/index.ts
@@ -2,16 +2,6 @@
 import * as pulumi from "@pulumi/pulumi";
 import * as aws from "@pulumi/aws";
 import * as awsx from "@pulumi/awsx";
-import {
-  ScorerEnvironmentConfig,
-  ScorerService,
-  createScorerECSService,
-  createTargetGroup,
-  getEnvironment,
-  secrets,
-} from "../lib/scorer/service";
-import { createScheduledTask } from "../lib/scorer/scheduledTasks";
-
 import {
   ScorerEnvironmentConfig,
   ScorerService,
@@ -807,25 +797,21 @@ let redashDbPassword = pulumi.secret(`${process.env["REDASH_DB_PASSWORD"]}`);
 let redashDbName = `${process.env["REDASH_DB_NAME"]}`;
 
 // Create an RDS instance
-const redashDb = new aws.rds.Instance(
-  "redash-db",
-  {
-    allocatedStorage: 20,
-    maxAllocatedStorage: 20,
-    engine: "postgres",
-    engineVersion: "13.10",
-    instanceClass: "db.t3.micro",
-    dbName: redashDbName,
-    password: redashDbPassword,
-    username: redashDbUsername,
-    skipFinalSnapshot: true,
-    dbSubnetGroupName: dbSubnetGroup.id,
-    vpcSecurityGroupIds: [redashDbSecgrp.id],
-    backupRetentionPeriod: 5,
-    performanceInsightsEnabled: true,
-  },
-  { protect: true }
-);
+const redashDb = new aws.rds.Instance("redash-db", {
+  allocatedStorage: 20,
+  maxAllocatedStorage: 20,
+  engine: "postgres",
+  engineVersion: "13.10",
+  instanceClass: "db.t3.micro",
+  dbName: redashDbName,
+  password: redashDbPassword,
+  username: redashDbUsername,
+  skipFinalSnapshot: true,
+  dbSubnetGroupName: dbSubnetGroup.id,
+  vpcSecurityGroupIds: [redashDbSecgrp.id],
+  backupRetentionPeriod: 5,
+  performanceInsightsEnabled: true,
+}, { protect: true });
 
 const dbUrl = redashDb.endpoint;
 export const redashDbUrl = pulumi.secret(
@@ -989,14 +975,3 @@ new aws.lb.TargetGroupAttachment("redashTargetAttachment", {
   targetId: redashinstance.privateIp,
   targetGroupArn: redashTarget.targetGroup.arn,
 });
-
-export const weeklyDataDumpTaskDefinition = createScheduledTask(
-  "weekly-data-dump",
-  {
-    ...baseScorerServiceConfig,
-    securityGroup: secgrp,
-    command: ["python", "manage.py", "dump_stamp_data"],
-    scheduleExpression: "cron(30 23 ? * FRI *)", // Run the task every friday at 23:30 UTC
-  },
-  envConfig
-);