diff --git a/docs/resources/concurrency_limit.md b/docs/resources/concurrency_limit.md new file mode 100644 index 00000000..40d21fff --- /dev/null +++ b/docs/resources/concurrency_limit.md @@ -0,0 +1,32 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "prefect_concurrency_limit Resource - prefect" +subcategory: "" +description: |- + The resource concurrency_limit represents a concurrency limit. Concurrency limits allow you to manage execution efficiently, controlling how many tasks, flows, or other operations can run simultaneously. They are ideal for optimizing resource usage, preventing bottlenecks, and customizing task execution. +--- + +# prefect_concurrency_limit (Resource) + +The resource `concurrency_limit` represents a concurrency limit. Concurrency limits allow you to manage execution efficiently, controlling how many tasks, flows, or other operations can run simultaneously. They are ideal for optimizing resource usage, preventing bottlenecks, and customizing task execution. + + + + +## Schema + +### Required + +- `concurrency_limit` (Number) The concurrency limit. +- `tag` (String) A tag the concurrency limit is applied to. + +### Optional + +- `account_id` (String) Account ID (UUID) +- `workspace_id` (String) Workspace ID (UUID) + +### Read-Only + +- `created` (String) Timestamp of when the resource was created (RFC3339) +- `id` (String) Concurrency limit ID (UUID) +- `updated` (String) Timestamp of when the resource was updated (RFC3339) diff --git a/internal/api/client.go b/internal/api/client.go index c8785bb1..dd18d531 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -18,6 +18,7 @@ type PrefectClient interface { BlockSchemas(accountID uuid.UUID, workspaceID uuid.UUID) (BlockSchemaClient, error) BlockTypes(accountID uuid.UUID, workspaceID uuid.UUID) (BlockTypeClient, error) Collections(accountID uuid.UUID, workspaceID uuid.UUID) (CollectionsClient, error) + ConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (ConcurrencyLimitsClient, error) Deployments(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentsClient, error) DeploymentAccess(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentAccessClient, error) DeploymentSchedule(accountID uuid.UUID, workspaceID uuid.UUID) (DeploymentScheduleClient, error) diff --git a/internal/api/concurrency_limits.go b/internal/api/concurrency_limits.go new file mode 100644 index 00000000..389f5e8b --- /dev/null +++ b/internal/api/concurrency_limits.go @@ -0,0 +1,25 @@ +package api + +import ( + "context" +) + +// ConcurrencyLimitsClient is a client for working with concurrency limits. +type ConcurrencyLimitsClient interface { + Create(ctx context.Context, concurrencyLimit ConcurrencyLimitCreate) (*ConcurrencyLimit, error) + Read(ctx context.Context, concurrencyLimitID string) (*ConcurrencyLimit, error) + Delete(ctx context.Context, concurrencyLimitID string) error +} + +// ConcurrencyLimit is a representation of a concurrency limit. +type ConcurrencyLimit struct { + BaseModel + Tag string `json:"tag"` + ConcurrencyLimit int64 `json:"concurrency_limit"` +} + +// ConcurrencyLimitCreate is a subset of ConcurrencyLimit used when creating concurrency limits. +type ConcurrencyLimitCreate struct { + Tag string `json:"tag"` + ConcurrencyLimit int64 `json:"concurrency_limit"` +} diff --git a/internal/client/concurrency_limits.go b/internal/client/concurrency_limits.go new file mode 100644 index 00000000..12bb1fd7 --- /dev/null +++ b/internal/client/concurrency_limits.go @@ -0,0 +1,95 @@ +package client + +import ( + "context" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/prefecthq/terraform-provider-prefect/internal/api" +) + +var _ = api.ConcurrencyLimitsClient(&ConcurrencyLimitsClient{}) + +// ConcurrencyLimitsClient is a client for working with concurrency limits. +type ConcurrencyLimitsClient struct { + hc *http.Client + routePrefix string + apiKey string +} + +// ConcurrencyLimits returns a ConcurrencyLimitsClient. +// +//nolint:ireturn // required to support PrefectClient mocking +func (c *Client) ConcurrencyLimits(accountID uuid.UUID, workspaceID uuid.UUID) (api.ConcurrencyLimitsClient, error) { + if accountID == uuid.Nil { + accountID = c.defaultAccountID + } + + if workspaceID == uuid.Nil { + workspaceID = c.defaultWorkspaceID + } + + if err := validateCloudEndpoint(c.endpoint, accountID, workspaceID); err != nil { + return nil, err + } + + return &ConcurrencyLimitsClient{ + hc: c.hc, + routePrefix: getWorkspaceScopedURL(c.endpoint, accountID, workspaceID, "concurrency_limits"), + apiKey: c.apiKey, + }, nil +} + +// Create creates a new concurrency limit. +func (c *ConcurrencyLimitsClient) Create(ctx context.Context, data api.ConcurrencyLimitCreate) (*api.ConcurrencyLimit, error) { + cfg := requestConfig{ + method: http.MethodPost, + url: c.routePrefix + "/", + body: &data, + apiKey: c.apiKey, + successCodes: successCodesStatusOK, + } + + var concurrencyLimit api.ConcurrencyLimit + if err := requestWithDecodeResponse(ctx, c.hc, cfg, &concurrencyLimit); err != nil { + return nil, fmt.Errorf("failed to create concurrency limit: %w", err) + } + + return &concurrencyLimit, nil +} + +// Read returns a concurrency limit. +func (c *ConcurrencyLimitsClient) Read(ctx context.Context, concurrencyLimitID string) (*api.ConcurrencyLimit, error) { + cfg := requestConfig{ + method: http.MethodGet, + url: fmt.Sprintf("%s/%s", c.routePrefix, concurrencyLimitID), + apiKey: c.apiKey, + successCodes: successCodesStatusOK, + } + + var concurrencyLimit api.ConcurrencyLimit + if err := requestWithDecodeResponse(ctx, c.hc, cfg, &concurrencyLimit); err != nil { + return nil, fmt.Errorf("failed to get concurrency limit: %w", err) + } + + return &concurrencyLimit, nil +} + +// Delete deletes a concurrency limit. +func (c *ConcurrencyLimitsClient) Delete(ctx context.Context, concurrencyLimitID string) error { + cfg := requestConfig{ + method: http.MethodDelete, + url: fmt.Sprintf("%s/%s", c.routePrefix, concurrencyLimitID), + apiKey: c.apiKey, + successCodes: successCodesStatusOK, + } + + resp, err := request(ctx, c.hc, cfg) + if err != nil { + return fmt.Errorf("failed to delete concurrency limit: %w", err) + } + defer resp.Body.Close() + + return nil +} diff --git a/internal/provider/datasources/account_role_test.go b/internal/provider/datasources/account_role_test.go index 903e05de..f1001ee8 100644 --- a/internal/provider/datasources/account_role_test.go +++ b/internal/provider/datasources/account_role_test.go @@ -26,7 +26,7 @@ func TestAccDatasource_account_role_defaults(t *testing.T) { } // Default account role names - these exist in every account - defaultAccountRoles := []defaultAccountRole{{"Admin", "39"}, {"Member", "11"}, {"Owner", "41"}} + defaultAccountRoles := []defaultAccountRole{{"Admin", "42"}, {"Member", "12"}, {"Owner", "44"}} testSteps := []resource.TestStep{} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 16b0bc6e..8577cc12 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -260,6 +260,7 @@ func (p *PrefectProvider) Resources(_ context.Context) []func() resource.Resourc resources.NewAutomationResource, resources.NewBlockAccessResource, resources.NewBlockResource, + resources.NewConcurrencyLimitResource, resources.NewDeploymentAccessResource, resources.NewDeploymentResource, resources.NewDeploymentScheduleResource, diff --git a/internal/provider/resources/concurrency_limit.go b/internal/provider/resources/concurrency_limit.go new file mode 100644 index 00000000..39d57ef4 --- /dev/null +++ b/internal/provider/resources/concurrency_limit.go @@ -0,0 +1,272 @@ +package resources + +import ( + "context" + "fmt" + "strings" + + "github.com/google/uuid" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/int64planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/prefecthq/terraform-provider-prefect/internal/api" + "github.com/prefecthq/terraform-provider-prefect/internal/provider/customtypes" + "github.com/prefecthq/terraform-provider-prefect/internal/provider/helpers" +) + +var ( + _ = resource.ResourceWithConfigure(&ConcurrencyLimitResource{}) + _ = resource.ResourceWithImportState(&ConcurrencyLimitResource{}) +) + +// ConcurrencyLimitResource contains state for the resource. +type ConcurrencyLimitResource struct { + client api.PrefectClient +} + +// ConcurrencyLimitResourceModel defines the Terraform resource model. +type ConcurrencyLimitResourceModel struct { + BaseModel + + AccountID customtypes.UUIDValue `tfsdk:"account_id"` + WorkspaceID customtypes.UUIDValue `tfsdk:"workspace_id"` + + Tag types.String `tfsdk:"tag"` + ConcurrencyLimit types.Int64 `tfsdk:"concurrency_limit"` +} + +// NewConcurrencyLimitResource returns a new ConcurrencyLimitResource. +// +//nolint:ireturn // required by Terraform API +func NewConcurrencyLimitResource() resource.Resource { + return &ConcurrencyLimitResource{} +} + +// Metadata returns the resource type name. +func (r *ConcurrencyLimitResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = req.ProviderTypeName + "_concurrency_limit" +} + +// Configure initializes runtime state for the resource. +func (r *ConcurrencyLimitResource) Configure(_ context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) { + if req.ProviderData == nil { + return + } + + client, ok := req.ProviderData.(api.PrefectClient) + if !ok { + resp.Diagnostics.Append(helpers.ConfigureTypeErrorDiagnostic("resource", req.ProviderData)) + + return + } + + r.client = client +} + +// Schema defines the schema for the resource. +func (r *ConcurrencyLimitResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = schema.Schema{ + Description: "The resource `concurrency_limit` represents a concurrency limit. Concurrency limits allow you to manage execution efficiently, controlling how many tasks, flows, or other operations can run simultaneously. They are ideal for optimizing resource usage, preventing bottlenecks, and customizing task execution.", + Version: 0, + Attributes: map[string]schema.Attribute{ + "id": schema.StringAttribute{ + Computed: true, + Description: "Concurrency limit ID (UUID)", + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "created": schema.StringAttribute{ + Computed: true, + CustomType: customtypes.TimestampType{}, + Description: "Timestamp of when the resource was created (RFC3339)", + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + }, + "updated": schema.StringAttribute{ + Computed: true, + CustomType: customtypes.TimestampType{}, + Description: "Timestamp of when the resource was updated (RFC3339)", + }, + "account_id": schema.StringAttribute{ + Optional: true, + Description: "Account ID (UUID)", + CustomType: customtypes.UUIDType{}, + }, + "workspace_id": schema.StringAttribute{ + Optional: true, + Description: "Workspace ID (UUID)", + CustomType: customtypes.UUIDType{}, + }, + "tag": schema.StringAttribute{ + Required: true, + Description: "A tag the concurrency limit is applied to.", + PlanModifiers: []planmodifier.String{ + // Concurrency limit updates are not supported so any changes to the tag will + // require a replacement of the resource. + stringplanmodifier.RequiresReplace(), + }, + }, + "concurrency_limit": schema.Int64Attribute{ + Required: true, + Description: "The concurrency limit.", + PlanModifiers: []planmodifier.Int64{ + // Concurrency limit updates are not supported so any changes to the concurrency limit will + // require a replacement of the resource. + int64planmodifier.RequiresReplace(), + }, + }, + }, + } +} + +// Create creates the resource and sets the initial Terraform state. +func (r *ConcurrencyLimitResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var plan ConcurrencyLimitResourceModel + + // Populate the model from resource configuration and emit diagnostics on error + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + if resp.Diagnostics.HasError() { + return + } + + client, err := r.client.ConcurrencyLimits(plan.AccountID.ValueUUID(), plan.WorkspaceID.ValueUUID()) + if err != nil { + resp.Diagnostics.Append(helpers.CreateClientErrorDiagnostic("Concurrency Limit", err)) + + return + } + + concurrencyLimit, err := client.Create(ctx, api.ConcurrencyLimitCreate{ + Tag: plan.Tag.ValueString(), + ConcurrencyLimit: plan.ConcurrencyLimit.ValueInt64(), + }) + if err != nil { + resp.Diagnostics.Append(helpers.ResourceClientErrorDiagnostic("Concurrency Limit", "create", err)) + + return + } + + copyConcurrencyLimitToModel(concurrencyLimit, &plan) + + resp.Diagnostics.Append(resp.State.Set(ctx, &plan)...) + if resp.Diagnostics.HasError() { + return + } +} + +func copyConcurrencyLimitToModel(concurrencyLimit *api.ConcurrencyLimit, model *ConcurrencyLimitResourceModel) diag.Diagnostics { + model.ID = types.StringValue(concurrencyLimit.ID.String()) + model.Created = customtypes.NewTimestampValue(*concurrencyLimit.Created) + model.Updated = customtypes.NewTimestampValue(*concurrencyLimit.Updated) + model.Tag = types.StringValue(concurrencyLimit.Tag) + model.ConcurrencyLimit = types.Int64Value(concurrencyLimit.ConcurrencyLimit) + + return nil +} + +// Delete deletes the resource. +func (r *ConcurrencyLimitResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + var state ConcurrencyLimitResourceModel + + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + client, err := r.client.ConcurrencyLimits(state.AccountID.ValueUUID(), state.WorkspaceID.ValueUUID()) + if err != nil { + resp.Diagnostics.Append(helpers.CreateClientErrorDiagnostic("Concurrency Limit", err)) + + return + } + + err = client.Delete(ctx, state.ID.ValueString()) + if err != nil { + resp.Diagnostics.Append(helpers.ResourceClientErrorDiagnostic("Concurrency Limit", "delete", err)) + + return + } +} + +// Read reads the resource state from the API. +func (r *ConcurrencyLimitResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var state ConcurrencyLimitResourceModel + + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + client, err := r.client.ConcurrencyLimits(state.AccountID.ValueUUID(), state.WorkspaceID.ValueUUID()) + if err != nil { + resp.Diagnostics.Append(helpers.CreateClientErrorDiagnostic("Concurrency Limit", err)) + + return + } + + concurrencyLimit, err := client.Read(ctx, state.ID.ValueString()) + if err != nil { + resp.Diagnostics.Append(helpers.ResourceClientErrorDiagnostic("Concurrency Limit", "get", err)) + + return + } + + copyConcurrencyLimitToModel(concurrencyLimit, &state) + + resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } +} + +// Update updates the resource state. +// This resource does not support updates. +func (r *ConcurrencyLimitResource) Update(_ context.Context, _ resource.UpdateRequest, _ *resource.UpdateResponse) { +} + +// ImportState imports the resource into Terraform state. +func (r *ConcurrencyLimitResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + // we'll allow input values in the form of: + // - "id,workspace_id" + // - "id" + maxInputCount := 2 + inputParts := strings.Split(req.ID, ",") + + // eg. ",foo" or "foo," + if len(inputParts) == maxInputCount && (inputParts[0] == "" || inputParts[1] == "") { + resp.Diagnostics.AddError( + "Unexpected Import Identifier", + fmt.Sprintf("Expected non-empty import identifiers, in the form of `id,workspace_id`. Got %q", req.ID), + ) + + return + } + if len(inputParts) > maxInputCount { + resp.Diagnostics.AddError( + "Unexpected Import Identifier", + fmt.Sprintf("Expected a maximum of 2 import identifiers, in the form of `id,workspace_id`. Got %q", req.ID), + ) + + return + } + + identifier := inputParts[0] + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), identifier)...) + + if len(inputParts) == 2 && inputParts[1] != "" { + workspaceID, err := uuid.Parse(inputParts[1]) + if err != nil { + resp.Diagnostics.Append(helpers.ParseUUIDErrorDiagnostic("Concurrency Limit", err)) + + return + } + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("workspace_id"), workspaceID.String())...) + } +} diff --git a/internal/provider/resources/concurrency_limit_test.go b/internal/provider/resources/concurrency_limit_test.go new file mode 100644 index 00000000..1244f995 --- /dev/null +++ b/internal/provider/resources/concurrency_limit_test.go @@ -0,0 +1,57 @@ +package resources_test + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/prefecthq/terraform-provider-prefect/internal/testutils" +) + +func fixtureAccConcurrencyLimitCreate(workspace, tag string, concurrencyLimit int64) string { + return fmt.Sprintf(` +%s + +resource "prefect_concurrency_limit" "concurrency_limit" { + workspace_id = prefect_workspace.test.id + tag = "%s" + concurrency_limit = %d +} +`, workspace, tag, concurrencyLimit) +} + +//nolint:paralleltest // we use the resource.ParallelTest helper instead +func TestAccResource_concurrency_limit(t *testing.T) { + resourceName := "prefect_concurrency_limit.concurrency_limit" + workspace := testutils.NewEphemeralWorkspace() + + resource.ParallelTest(t, resource.TestCase{ + ProtoV6ProviderFactories: testutils.TestAccProtoV6ProviderFactories, + PreCheck: func() { testutils.AccTestPreCheck(t) }, + Steps: []resource.TestStep{ + { + // Check creation + existence of the resource + Config: fixtureAccConcurrencyLimitCreate(workspace.Resource, "test1", 10), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "tag", "test1"), + resource.TestCheckResourceAttr(resourceName, "concurrency_limit", "10"), + ), + }, + { + // Check updating the resource + Config: fixtureAccConcurrencyLimitCreate(workspace.Resource, "test2", 20), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "tag", "test2"), + resource.TestCheckResourceAttr(resourceName, "concurrency_limit", "20"), + ), + }, + // Import State checks - import by ID (default) + { + ImportState: true, + ImportStateIdFunc: testutils.GetResourceWorkspaceImportStateID(resourceName), + ResourceName: resourceName, + ImportStateVerify: true, + }, + }, + }) +}