diff --git a/cli/grants-ingest/ffisImport/cmd.go b/cli/grants-ingest/ffisImport/cmd.go index 7b3af3cc..467982a8 100644 --- a/cli/grants-ingest/ffisImport/cmd.go +++ b/cli/grants-ingest/ffisImport/cmd.go @@ -44,7 +44,7 @@ func (cmd *Cmd) Run(app *kong.Kong, logger *log.Logger) error { if !cmd.DryRun { return err } - app.Errorf(err.Error()) + app.Errorf("%s", err.Error()) } log.Debug(*logger, "Mapping files in directory to S3 keys...", "directory", cmd.SourceDirectory) diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 2a359251..686ea4ea 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -10,7 +10,6 @@ import ( "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/go-kit/log/level" "github.com/hashicorp/go-multierror" "github.com/usdigitalresponse/grants-ingest/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -125,7 +124,8 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error span, ctx := tracer.StartSpanFromContext(ctx, "read.xml") // Count records sent to ch - countSentRecords := 0 + countSentOpportunityRecords := 0 + countSentForecastRecords := 0 d := xml.NewDecoder(r) for { @@ -136,8 +136,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error return err } - // End early if we have reached any configured limit on the number of records sent to ch - if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords { + // End early if a configured limit on the number of records sent to ch is reached + // OR if both record types have configured limits and both have been reached + if (env.MaxSplitRecords > -1 && + countSentOpportunityRecords+countSentForecastRecords >= env.MaxSplitRecords) || + (env.MaxSplitForecastRecords > -1 && env.MaxSplitOpportunityRecords > -1 && + countSentForecastRecords >= env.MaxSplitForecastRecords && + countSentOpportunityRecords >= env.MaxSplitOpportunityRecords) { break } @@ -147,7 +152,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error // EOF means that we're done reading break } - level.Error(logger).Log("msg", "Error reading XML token", "error", err) + log.Error(logger, "Error reading XML token", err) span.Finish(tracer.WithError(err)) return err } @@ -158,14 +163,18 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { var o opportunity if err = d.DecodeElement(&o, &se); err == nil { - countSentRecords++ - ch <- &o + if env.MaxSplitOpportunityRecords < 0 || countSentOpportunityRecords < env.MaxSplitOpportunityRecords { + ch <- &o + countSentOpportunityRecords++ + } } } else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled { var f forecast if err = d.DecodeElement(&f, &se); err == nil { - countSentRecords++ - ch <- &f + if env.MaxSplitForecastRecords < 0 || countSentForecastRecords < env.MaxSplitForecastRecords { + ch <- &f + countSentForecastRecords++ + } } } diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index 6007269b..dbc8e107 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -10,6 +10,8 @@ import ( "io" "net/http" "net/http/httptest" + "reflect" + "strings" "testing" "text/template" "time" @@ -516,7 +518,6 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceData.Bytes()), }) require.NoError(t, err) - // err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ err = handleS3Event(context.TODO(), s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ @@ -580,6 +581,7 @@ func (r *MockReader) Read(p []byte) (int, error) { } func TestReadRecords(t *testing.T) { + setupLambdaEnvForTesting(t) t.Run("Context cancelled between reads", func(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) err := readRecords(ctx, &MockReader{func(p []byte) (int, error) { @@ -588,6 +590,78 @@ func TestReadRecords(t *testing.T) { }}, make(chan<- grantRecord, 10)) assert.ErrorIs(t, err, context.Canceled) }) + + t.Run("max record limits", func(t *testing.T) { + for _, tt := range []struct { + name string + maxSplitRecords, maxSplitOpportunityRecords, maxSplitForecastRecords int + expOpportunityRecords, expForecastRecords int + }{ + { + "no limits processes all records", + -1, -1, -1, + 10, 10, + }, + { + "opportunity limit does not limit forecasts", + -1, 2, -1, + 2, 10, + }, + { + "forecast limit does not limit opportunities", + -1, -1, 2, + 10, 2, + }, + { + "hard limit takes precedent over no type limits", + 2, -1, -1, + 2, 0, + }, + { + "hard limit takes precedent over type limits", + 2, 3, 5, + 2, 0, + }, + { + "mix of limits", + 5, 3, -1, + 3, 2, + }, + } { + t.Run(tt.name, func(t *testing.T) { + env.MaxSplitRecords = tt.maxSplitRecords + env.MaxSplitOpportunityRecords = tt.maxSplitOpportunityRecords + env.MaxSplitForecastRecords = tt.maxSplitForecastRecords + env.IsForecastedGrantsEnabled = true + + xmlData := "\n" + + // Content of records doesn't matter since we're just looking at the tag + strings.Repeat("\n", 10) + + strings.Repeat("\n", 10) + + "" + ch := make(chan grantRecord, 20) + require.NoError(t, readRecords(context.TODO(), strings.NewReader(xmlData), ch)) + close(ch) + var countSentOpportunityRecords, countSentForecastRecords int + for rec := range ch { + switch reflect.Indirect(reflect.ValueOf(rec)).Type().Name() { + case "opportunity": + countSentOpportunityRecords++ + case "forecast": + countSentForecastRecords++ + default: + require.Fail(t, + "Unknown grantRecord type sent to channel during test setup", + "type %T unrecognized", rec) + } + } + assert.Equalf(t, tt.expOpportunityRecords, countSentOpportunityRecords, + "Unexpected number of opportunity records sent to channel") + assert.Equalf(t, tt.expForecastRecords, countSentForecastRecords, + "Unexpected number of forecast records sent to channel") + }) + } + }) } func TestProcessRecord(t *testing.T) { diff --git a/cmd/SplitGrantsGovXMLDB/main.go b/cmd/SplitGrantsGovXMLDB/main.go index 50979f6d..9822a314 100644 --- a/cmd/SplitGrantsGovXMLDB/main.go +++ b/cmd/SplitGrantsGovXMLDB/main.go @@ -29,15 +29,17 @@ import ( ) type Environment struct { - LogLevel string `env:"LOG_LEVEL,default=INFO"` - DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` - DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` - DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"` - MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` - MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` - UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` - IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` - Extras goenv.EnvSet + LogLevel string `env:"LOG_LEVEL,default=INFO"` + DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` + DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` + DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"` + MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` + UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` + IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` + MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` // Hard limit of records to process, regardless of type. -1 for no limit. + MaxSplitOpportunityRecords int `env:"MAX_SPLIT_OPPORTUNITY_RECORDS,default=-1"` // Limit opportunity-type records to process. -1 for no limit. + MaxSplitForecastRecords int `env:"MAX_SPLIT_FORECAST_RECORDS,default=-1"` // Limit forecast-type records to process. -1 for no limit. + Extras goenv.EnvSet } var ( diff --git a/terraform/local.tfvars b/terraform/local.tfvars index b43cbc3c..7b1eaf74 100644 --- a/terraform/local.tfvars +++ b/terraform/local.tfvars @@ -1,19 +1,21 @@ -namespace = "grants-ingest" -environment = "sandbox" -version_identifier = "dev" -permissions_boundary_policy_name = "" -datadog_enabled = false -datadog_dashboards_enabled = false -datadog_lambda_extension_version = "38" -lambda_binaries_autobuild = true -lambda_default_log_retention_in_days = 7 -lambda_default_log_level = "DEBUG" -eventbridge_scheduler_enabled = false -ssm_deployment_parameters_path_prefix = "/grants-ingest/local" -dynamodb_contributor_insights_enabled = false -ffis_ingest_email_address = "ffis-ingest@localhost.grants.usdr.dev" -max_split_grantsgov_records = 10 -ses_active_receipt_rule_set_enabled = false +namespace = "grants-ingest" +environment = "sandbox" +version_identifier = "dev" +permissions_boundary_policy_name = "" +datadog_enabled = false +datadog_dashboards_enabled = false +datadog_lambda_extension_version = "38" +lambda_binaries_autobuild = true +lambda_default_log_retention_in_days = 7 +lambda_default_log_level = "DEBUG" +eventbridge_scheduler_enabled = false +ssm_deployment_parameters_path_prefix = "/grants-ingest/local" +dynamodb_contributor_insights_enabled = false +ffis_ingest_email_address = "ffis-ingest@localhost.grants.usdr.dev" +is_forecasted_grants_enabled = true +max_split_grantsgov_opportunity_records = 10 +max_split_grantsgov_forecast_records = 10 +ses_active_receipt_rule_set_enabled = false additional_lambda_environment_variables = { S3_USE_PATH_STYLE = "true" diff --git a/terraform/main.tf b/terraform/main.tf index 0a41c9b3..3250df6e 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -520,7 +520,10 @@ module "SplitGrantsGovXMLDB" { grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn + is_forecasted_grants_enabled = var.is_forecasted_grants_enabled max_split_records = var.max_split_grantsgov_records + max_split_opportunity_records = var.max_split_grantsgov_opportunity_records + max_split_forecast_records = var.max_split_grantsgov_forecast_records } module "ReceiveFFISEmail" { diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 0765c2b6..d752096c 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -107,6 +107,8 @@ module "lambda_function" { LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" MAX_SPLIT_RECORDS = tostring(var.max_split_records) + MAX_SPLIT_OPPORTUNITY_RECORDS = tostring(var.max_split_opportunity_records) + MAX_SPLIT_FORECAST_RECORDS = tostring(var.max_split_forecast_records) IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled }) diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index de28c068..29f238f8 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -104,7 +104,19 @@ variable "is_forecasted_grants_enabled" { } variable "max_split_records" { - description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation." + description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation. This setting is a hard cap on top of opportunity- and forecast-specific limits." + type = number + default = -1 +} + +variable "max_split_opportunity_records" { + description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation." + type = number + default = -1 +} + +variable "max_split_forecast_records" { + description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation." type = number default = -1 } diff --git a/terraform/variables.tf b/terraform/variables.tf index 25700fd6..e5e88e6c 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -226,8 +226,26 @@ variable "ses_active_receipt_rule_set_enabled" { default = true } +variable "is_forecasted_grants_enabled" { + description = "When true, enables processing of forecasted grant records from Grants.gov." + type = bool + default = false +} + variable "max_split_grantsgov_records" { - description = "Optional limit (i.e. for testing) on the number of records that SplitGrantsGovXMLDB handler will process during a single invocation." + description = "Optional hard limit (i.e. for testing) on the number of records (of any type) that SplitGrantsGovXMLDB handler will process during a single invocation." + type = number + default = -1 +} + +variable "max_split_grantsgov_opportunity_records" { + description = "Optional limit (i.e. for testing) on the number of opportunity records that SplitGrantsGovXMLDB handler will process during a single invocation." + type = number + default = -1 +} + +variable "max_split_grantsgov_forecast_records" { + description = "Optional limit (i.e. for testing) on the number of forecast records that SplitGrantsGovXMLDB handler will process during a single invocation." type = number default = -1 }