Skip to content

Commit

Permalink
Implement backoff, state check between updates & regionId checking
Browse files Browse the repository at this point in the history
  • Loading branch information
ducvm29 committed Sep 5, 2024
1 parent 67ef978 commit 2123eb7
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 19 deletions.
191 changes: 173 additions & 18 deletions fptcloud/dfke/resource_dfke.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fptcloud_dfke
import (
"context"
"encoding/json"
"errors"
"fmt"
diag2 "github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/resource"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/hashicorp/terraform-plugin-log/tflog"
"strings"
"terraform-provider-fptcloud/commons"
"time"
)

var (
Expand All @@ -31,8 +33,9 @@ var (
)

type resourceDedicatedKubernetesEngine struct {
client *commons.Client
dfkeClient *dfkeApiClient
client *commons.Client
dfkeClient *dfkeApiClient
tenancyApiClient *tenancyApiClient
}

func (r *resourceDedicatedKubernetesEngine) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) {
Expand Down Expand Up @@ -74,7 +77,7 @@ func (r *resourceDedicatedKubernetesEngine) Create(ctx context.Context, request

tflog.Info(ctx, "Created cluster with id "+createResponse.Cluster.ID)

if err = r.internalRead(ctx, createResponse.Cluster.ID, &state); err != nil {
if _, err = r.internalRead(ctx, createResponse.Cluster.ID, &state); err != nil {
response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error reading cluster state", err.Error()))
return
}
Expand All @@ -95,7 +98,7 @@ func (r *resourceDedicatedKubernetesEngine) Read(ctx context.Context, request re
return
}

err := r.internalRead(ctx, state.Id.ValueString(), &state)
_, err := r.internalRead(ctx, state.Id.ValueString(), &state)
if err != nil {
response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error calling API", err.Error()))
return
Expand All @@ -117,21 +120,28 @@ func (r *resourceDedicatedKubernetesEngine) Update(ctx context.Context, request
return
}

var existing dedicatedKubernetesEngine
var plan dedicatedKubernetesEngine
request.Plan.Get(ctx, &plan)

err := r.internalRead(ctx, state.Id.ValueString(), &existing)
if err != nil {
response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error getting existing state", err.Error()))
response.Diagnostics.Append(diags...)
if response.Diagnostics.HasError() {
return
}

errDiag := r.diff(ctx, &existing, &state)
//tflog.Info(ctx, "Reading existing state of cluster ID "+state.Id.ValueString()+", VPC "+state.vpcId())
//err := r.internalRead(ctx, state.Id.ValueString(), &existing)
//if err != nil {
// response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error getting existing state", err.Error()))
// return
//}

errDiag := r.diff(ctx, &state, &plan)
if errDiag != nil {
response.Diagnostics.Append(errDiag)
return
}

err = r.internalRead(ctx, state.Id.ValueString(), &existing)
_, err := r.internalRead(ctx, state.Id.ValueString(), &state)
if err != nil {
response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error refreshing state", err.Error()))
return
Expand Down Expand Up @@ -167,7 +177,7 @@ func (r *resourceDedicatedKubernetesEngine) ImportState(ctx context.Context, req
state.VpcId = types.StringValue("188af427-269b-418a-90bb-0cb27afc6c1e")

state.Id = types.StringValue(request.ID)
err := r.internalRead(ctx, request.ID, &state)
_, err := r.internalRead(ctx, request.ID, &state)
if err != nil {
response.Diagnostics.Append(diag2.NewErrorDiagnostic("Error calling API", err.Error()))
return
Expand Down Expand Up @@ -330,36 +340,39 @@ func (r *resourceDedicatedKubernetesEngine) Configure(ctx context.Context, reque
}

r.dfkeClient = a

t := newTenancyApiClient(client)
r.tenancyApiClient = t
}

func (r *resourceDedicatedKubernetesEngine) internalRead(ctx context.Context, clusterId string, state *dedicatedKubernetesEngine) error {
func (r *resourceDedicatedKubernetesEngine) internalRead(ctx context.Context, clusterId string, state *dedicatedKubernetesEngine) (*dedicatedKubernetesEngineReadResponse, error) {
vpcId := state.VpcId.ValueString()
tflog.Info(ctx, "Reading state of cluster ID "+clusterId+", VPC ID "+vpcId)

a, err := r.client.SendGetRequest(fmt.Sprintf("/v1/xplat/fke/vpc/%s/cluster/%s?page=1&page_size=25", vpcId, clusterId))

if err != nil {
return err
return nil, err
}

var d dedicatedKubernetesEngineReadResponse
err = json.Unmarshal(a, &d)
data := d.Cluster
if err != nil {
return err
return nil, err
}

var awx dedicatedKubernetesEngineParams
err = json.Unmarshal([]byte(d.Cluster.AwxParams), &awx)

if err != nil {
return err
return nil, err
}

// resolve edge ID
edge, err := r.dfkeClient.FindEdgeByEdgeGatewayId(ctx, vpcId, data.EdgeID)
if err != nil {
return err
return nil, err
}

state.ClusterId = types.StringValue(data.ClusterID)
Expand Down Expand Up @@ -388,8 +401,13 @@ func (r *resourceDedicatedKubernetesEngine) internalRead(ctx context.Context, cl
state.VpcId = types.StringValue(data.VpcID)
//state.CustomScript = awx.CustomScript
//state.EnableCustomScript = awx.EnableCustomScript
region, err := r.getRegionFromVpcId(ctx, vpcId)
if err != nil {
return nil, err
}
state.RegionId = types.StringValue(region)

return nil
return &d, nil
}

func (r *resourceDedicatedKubernetesEngine) checkForError(a []byte) *diag2.ErrorDiagnostic {
Expand All @@ -403,10 +421,17 @@ func (r *resourceDedicatedKubernetesEngine) checkForError(a []byte) *diag2.Error
if errorField, ok := re["error"]; ok {
e2, isBool := errorField.(bool)
if isBool && e2 != false {
res := diag2.NewErrorDiagnostic("Response contained an error field", "Response body was "+string(a))
res := diag2.NewErrorDiagnostic(
fmt.Sprintf("Response contained an error field and value was %t", e2),
"Response body was "+string(a),
)
return &res
}

if isBool {
return nil
}

if errorField != nil {
res := diag2.NewErrorDiagnostic("Response contained an error field", "Response body was "+string(a))
return &res
Expand Down Expand Up @@ -447,13 +472,16 @@ func (r *resourceDedicatedKubernetesEngine) remap(from *dedicatedKubernetesEngin
func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedicatedKubernetesEngine, to *dedicatedKubernetesEngine) *diag2.ErrorDiagnostic {
master := from.MasterDiskSize.ValueInt64()
master2 := to.MasterDiskSize.ValueInt64()
// status: EXTENDING
if master != master2 {
if master2 < master {
d := diag2.NewErrorDiagnostic("Wrong master disk size", "Disk cannot be shrinked")
return &d
}

tflog.Info(ctx, fmt.Sprintf("Resizing master from %d to %d", master, master2))

time.Sleep(5 * time.Second)
management := dedicatedKubernetesEngineManagement{
ClusterId: to.clusterUUID(),
MgmtAction: "",
Expand All @@ -467,10 +495,17 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
return err
}
tflog.Info(ctx, fmt.Sprintf("Resized master from %d to %d", master, master2))

err := r.waitForSucceeded(ctx, from, 5*time.Minute)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after resizing master disk to return to SUCCEEDED state", err.Error())
return &d
}
}

worker := from.WorkerDiskSize.ValueInt64()
worker2 := to.WorkerDiskSize.ValueInt64()
// status: EXTENDING
if worker != worker2 {
if worker2 < worker {
d := diag2.NewErrorDiagnostic("Wrong worker disk size", "Disk cannot be shrinked")
Expand All @@ -491,6 +526,12 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
return err
}
tflog.Info(ctx, fmt.Sprintf("Resized worker from %d to %d", worker, worker2))

err := r.waitForSucceeded(ctx, from, 5*time.Minute)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after resizing worker disk to return to SUCCEEDED state", err.Error())
return &d
}
}

masterType := from.MasterType.ValueString()
Expand All @@ -511,6 +552,12 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
return err
}
tflog.Info(ctx, fmt.Sprintf("Changed master from %s to %s", masterType, master2Type))

err := r.waitForSucceeded(ctx, from, 20*time.Minute)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after changing master type to return to SUCCEEDED state", err.Error())
return &d
}
}

workerType := from.WorkerType.ValueString()
Expand All @@ -532,6 +579,12 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
}

tflog.Info(ctx, fmt.Sprintf("Changed worker from %s to %s", workerType, worker2Type))

err := r.waitForSucceeded(ctx, from, 20*time.Minute)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after changing worker type to return to SUCCEEDED state", err.Error())
return &d
}
}

if (from.ScaleMin.ValueInt64() != to.ScaleMin.ValueInt64()) || (from.ScaleMax.ValueInt64() != to.ScaleMax.ValueInt64()) {
Expand All @@ -555,6 +608,12 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
"Changed autoscale to to (%d-%d)",
to.ScaleMin.ValueInt64(), to.ScaleMax.ValueInt64(),
))

err := r.waitForSucceeded(ctx, from, 5*time.Minute)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after updating autoscale to return to SUCCEEDED state", err.Error())
return &d
}
}

if from.Version.ValueString() != to.Version.ValueString() {
Expand All @@ -581,6 +640,12 @@ func (r *resourceDedicatedKubernetesEngine) diff(ctx context.Context, from *dedi
if diagErr2 := r.checkForError(a); diagErr2 != nil {
return diagErr2
}

err := r.waitForSucceeded(ctx, from, 1*time.Hour)
if err != nil {
d := diag2.NewErrorDiagnostic("Error waiting for cluster after upgrading to return to SUCCEEDED state", err.Error())
return &d
}
}

return nil
Expand All @@ -602,12 +667,102 @@ func (r *resourceDedicatedKubernetesEngine) manage(state *dedicatedKubernetesEng
return nil
}

// waitForSucceeded polls the cluster referenced by state until its reported
// status is "SUCCEEDED".
//
// The cluster is read once every 5 seconds. A failing read is retried up to
// five times per poll with exponential backoff (starting at 200ms, doubling,
// capped at 30s); if every attempt fails, the last read error is returned.
// A successful read that reports a non-terminal status ends the poll and the
// next read happens on the following tick.
//
// It returns nil once the status is "SUCCEEDED", and an error when the status
// is "ERROR" or "STOPPED", when timeout elapses, or when ctx is cancelled
// (in which case ctx.Err() is returned).
func (r *resourceDedicatedKubernetesEngine) waitForSucceeded(ctx context.Context, state *dedicatedKubernetesEngine, timeout time.Duration) error {
	clusterId := state.clusterUUID()
	tflog.Info(ctx, "Waiting for cluster "+clusterId+" to succeed, duration "+timeout.String())

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for {
		select {
		case <-ctx.Done():
			// Stop promptly when the caller cancels instead of polling until timeout.
			return ctx.Err()
		case <-deadline.C:
			return errors.New("Timed out waiting for cluster " + clusterId + " to return to success state")
		case <-ticker.C:
		}

		tflog.Info(ctx, "Checking status of cluster "+clusterId)

		var readErr error
		backoff := 200 * time.Millisecond

		for attempt := 0; attempt < 5; attempt++ {
			// Read into a scratch value so the caller's state is not modified while waiting.
			status, err := r.internalRead(ctx, clusterId, &dedicatedKubernetesEngine{
				ClusterId: state.ClusterId,
				VpcId:     state.VpcId,
			})
			readErr = err

			if err != nil {
				// Back off before retrying, but remain responsive to cancellation.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(backoff):
				}
				backoff = min(backoff*2, 30*time.Second)
				continue
			}

			clusterStatus := status.Cluster.Status
			tflog.Info(ctx, "Status of cluster "+clusterId+" is currently "+clusterStatus)

			switch clusterStatus {
			case "SUCCEEDED":
				return nil
			case "ERROR":
				return errors.New("cluster in error state")
			case "STOPPED":
				return errors.New("cluster is stopped")
			}

			// The read succeeded and the cluster is still transitioning: the
			// retries exist only for failed reads, so wait for the next tick
			// rather than re-reading immediately.
			break
		}

		if readErr != nil {
			return readErr
		}
	}
}

// vpcId returns the string value held in the VpcId attribute.
func (e *dedicatedKubernetesEngine) vpcId() string {
	id := e.VpcId
	return id.ValueString()
}
// clusterUUID returns the string value held in the Id attribute
// (the Id field, not the separate ClusterId field).
func (e *dedicatedKubernetesEngine) clusterUUID() string {
	id := e.Id
	return id.ValueString()
}
// getRegionFromVpcId looks up which region the VPC with the given ID lives in.
//
// It fetches the tenancy through the tenancy API client, then walks every
// tenant, every region of that tenant, and every VPC listed for that
// tenant/user/region combination. The Abbr of the first region containing a
// VPC whose Id equals vpcId is returned. Any API error aborts the search and
// is returned unchanged; if no VPC matches, an error naming vpcId is returned.
func (r *resourceDedicatedKubernetesEngine) getRegionFromVpcId(ctx context.Context, vpcId string) (string, error) {
	api := r.tenancyApiClient

	tenancy, err := api.GetTenancy(ctx)
	if err != nil {
		return "", err
	}

	for _, tenant := range tenancy.Tenants {
		regionList, regionErr := api.GetRegions(ctx, tenant.Id)
		if regionErr != nil {
			return "", regionErr
		}

		for _, candidate := range regionList {
			vpcList, vpcErr := api.ListVpcs(ctx, tenant.Id, tenancy.UserId, candidate.Id)
			if vpcErr != nil {
				return "", vpcErr
			}

			for _, v := range vpcList {
				if v.Id != vpcId {
					continue
				}
				return candidate.Abbr, nil
			}
		}
	}

	return "", errors.New("no VPC found under this account with vpcId " + vpcId)
}

type dedicatedKubernetesEngine struct {
ClusterName types.String `tfsdk:"cluster_name" json:"cluster_name"`
Expand Down
Loading

0 comments on commit 2123eb7

Please sign in to comment.