diff --git a/README.md b/README.md index 57a27f305c..d728344090 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,8 @@ Flags: -r, --resources strings firewall,networks or * for all services -s, --state string local or bucket (default "local") -v, --verbose verbose mode + -n, --retry-number number of retries to perform if refresh fails + -m, --retry-sleep-ms time in ms to sleep between retries Use " import [provider] [command] --help" for more information about a command. ``` diff --git a/cmd/import.go b/cmd/import.go index 8e8ed0034e..f3331a14e5 100644 --- a/cmd/import.go +++ b/cmd/import.go @@ -20,6 +20,7 @@ import ( "os" "sort" "strings" + "sync" "github.com/GoogleCloudPlatform/terraformer/terraformutils/terraformerstring" @@ -51,6 +52,8 @@ type ImportOptions struct { Filter []string Plan bool `json:"-"` Output string + RetryCount int + RetrySleepMs int } const DefaultPathPattern = "{output}/{provider}/{service}/" @@ -78,16 +81,37 @@ func newImportCmd() *cobra.Command { } func Import(provider terraformutils.ProviderGenerator, options ImportOptions, args []string) error { - err := provider.Init(args) + + providerWrapper, options, err := initOptionsAndWrapper(provider, options, args) if err != nil { return err } + defer providerWrapper.Kill() + providerMapping := terraformutils.NewProvidersMapping(provider) - plan := &ImportPlan{ - Provider: provider.GetName(), - Options: options, - Args: args, - ImportedResource: map[string][]terraformutils.Resource{}, + err = initAllServicesResources(providerMapping, options, args, providerWrapper) + if err != nil { + return err + } + + err = terraformutils.RefreshResourcesByProvider(providerMapping, providerWrapper) + if err != nil { + return err + } + + providerMapping.ConvertTFStates(providerWrapper) + // change structs with additional data for each resource + providerMapping.CleanupProviders() + + err = importFromPlan(providerMapping, options, args) + + return err +} + +func initOptionsAndWrapper(provider terraformutils.ProviderGenerator, options ImportOptions, args []string) (*providerwrapper.ProviderWrapper, ImportOptions, error) { + err := provider.Init(args) + if err != nil { + return nil, options, err } if terraformerstring.ContainsString(options.Resources, "*") { @@ -112,64 +136,81 @@ func Import(provider terraformutils.ProviderGenerator, options ImportOptions, ar options.Resources = localSlice } - providerWrapper, err := providerwrapper.NewProviderWrapper(provider.GetName(), provider.GetConfig(), options.Verbose) + providerWrapper, err := providerwrapper.NewProviderWrapper(provider.GetName(), provider.GetConfig(), options.Verbose, map[string]int{"retryCount": options.RetryCount, "retrySleepMs": options.RetrySleepMs}) if err != nil { - return err + return nil, options, err } - defer providerWrapper.Kill() + return providerWrapper, options, nil +} + +func initAllServicesResources(providersMapping *terraformutils.ProvidersMapping, options ImportOptions, args []string, providerWrapper *providerwrapper.ProviderWrapper) error { + numOfResources := len(options.Resources) + var wg sync.WaitGroup + wg.Add(numOfResources) + + failedServices := []string{} for _, service := range options.Resources { - resources, err := buildServiceResources(service, provider, options, providerWrapper) + serviceProvider := providersMapping.AddServiceToProvider(service) + err := serviceProvider.Init(args) if err != nil { - log.Println(err) - continue + return err } - plan.ImportedResource[service] = append(plan.ImportedResource[service], resources...) + err = initServiceResources(service, serviceProvider, options, providerWrapper) + if err != nil { + failedServices = append(failedServices, service) + } + } + + // remove providers that failed to init their service + providersMapping.RemoveServices(failedServices) + providersMapping.ProcessResources() + + return nil +} + +func importFromPlan(providerMapping *terraformutils.ProvidersMapping, options ImportOptions, args []string) error { + plan := &ImportPlan{ + Provider: providerMapping.GetBaseProvider().GetName(), + Options: options, + Args: args, + ImportedResource: map[string][]terraformutils.Resource{}, } + + resourcesByService := providerMapping.GetResourcesByService() + for service := range resourcesByService { + plan.ImportedResource[service] = append(plan.ImportedResource[service], resourcesByService[service]...) + } + if options.Plan { - path := Path(options.PathPattern, provider.GetName(), "terraformer", options.PathOutput) + path := Path(options.PathPattern, providerMapping.GetBaseProvider().GetName(), "terraformer", options.PathOutput) return ExportPlanFile(plan, path, "plan.json") } - return ImportFromPlan(provider, plan) + + return ImportFromPlan(providerMapping.GetBaseProvider(), plan) } -func buildServiceResources(service string, provider terraformutils.ProviderGenerator, - options ImportOptions, providerWrapper *providerwrapper.ProviderWrapper) ([]terraformutils.Resource, error) { +func initServiceResources(service string, provider terraformutils.ProviderGenerator, + options ImportOptions, providerWrapper *providerwrapper.ProviderWrapper) error { log.Println(provider.GetName() + " importing... " + service) err := provider.InitService(service, options.Verbose) if err != nil { - return nil, err + log.Printf("%s error importing %s, err: %s\n", provider.GetName(), service, err) + return err } provider.GetService().ParseFilters(options.Filter) err = provider.GetService().InitResources() if err != nil { - return nil, err + log.Printf("%s error initializing resources in service %s, err: %s\n", provider.GetName(), service, err) + return err } provider.GetService().PopulateIgnoreKeys(providerWrapper) provider.GetService().InitialCleanup() + log.Println(provider.GetName() + " done importing " + service) - refreshedResources, err := terraformutils.RefreshResources(provider.GetService().GetResources(), providerWrapper) - if err != nil { - return nil, err - } - provider.GetService().SetResources(refreshedResources) - - for i := range provider.GetService().GetResources() { - err = provider.GetService().GetResources()[i].ConvertTFstate(providerWrapper) - if err != nil { - return nil, err - } - } - provider.GetService().PostRefreshCleanup() - - // change structs with additional data for each resource - err = provider.GetService().PostConvertHook() - if err != nil { - return nil, err - } - return provider.GetService().GetResources(), nil + return nil } func ImportFromPlan(provider terraformutils.ProviderGenerator, plan *ImportPlan) error { @@ -358,4 +399,6 @@ func baseProviderFlags(flag *pflag.FlagSet, options *ImportOptions, sampleRes, s flag.StringSliceVarP(&options.Filter, "filter", "f", []string{}, sampleFilters) flag.BoolVarP(&options.Verbose, "verbose", "v", false, "") flag.StringVarP(&options.Output, "output", "O", "hcl", "output format hcl or json") + flag.IntVarP(&options.RetryCount, "retry-number", "n", 5, "number of retries to perform when refresh fails") + flag.IntVarP(&options.RetrySleepMs, "retry-sleep-ms", "m", 300, "time in ms to sleep between retries") } diff --git a/terraformutils/providers_mapping.go b/terraformutils/providers_mapping.go new file mode 100644 index 0000000000..ce9fd3dd3d --- /dev/null +++ b/terraformutils/providers_mapping.go @@ -0,0 +1,165 @@ +package terraformutils + +import ( + "log" + "math/rand" + "reflect" + "time" + + "github.com/GoogleCloudPlatform/terraformer/terraformutils/providerwrapper" +) + +type ProvidersMapping struct { + baseProvider ProviderGenerator + Resources map[*Resource]bool + Services map[string]bool + Providers map[ProviderGenerator]bool + providerToService map[ProviderGenerator]string + serviceToProvider map[string]ProviderGenerator + resourceToProvider map[*Resource]ProviderGenerator +} + +func NewProvidersMapping(baseProvider ProviderGenerator) *ProvidersMapping { + providersMapping := &ProvidersMapping{ + baseProvider: baseProvider, + Resources: map[*Resource]bool{}, + Services: map[string]bool{}, + Providers: map[ProviderGenerator]bool{}, + providerToService: map[ProviderGenerator]string{}, + serviceToProvider: map[string]ProviderGenerator{}, + resourceToProvider: map[*Resource]ProviderGenerator{}, + } + + return providersMapping +} + +func deepCopyProvider(provider ProviderGenerator) ProviderGenerator { + return reflect.New(reflect.ValueOf(provider).Elem().Type()).Interface().(ProviderGenerator) +} + +func (p *ProvidersMapping) GetBaseProvider() ProviderGenerator { + return p.baseProvider +} + +func (p *ProvidersMapping) AddServiceToProvider(service string) ProviderGenerator { + newProvider := deepCopyProvider(p.baseProvider) + p.Providers[newProvider] = true + p.Services[service] = true + p.providerToService[newProvider] = service + p.serviceToProvider[service] = newProvider + + return newProvider +} + +func (p *ProvidersMapping) GetServices() []string { + services := make([]string, len(p.Services)) + for service := range p.Services { + services = append(services, service) + } + + return services +} + +func (p *ProvidersMapping) RemoveServices(services []string) { + for _, service := range services { + delete(p.Services, service) + + matchingProvider := p.serviceToProvider[service] + delete(p.Providers, matchingProvider) + delete(p.providerToService, matchingProvider) + delete(p.serviceToProvider, service) + } +} + +func (p *ProvidersMapping) ShuffleResources() []*Resource { + resources := []*Resource{} + for resource := range p.Resources { + resources = append(resources, resource) + } + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(resources), func(i, j int) { resources[i], resources[j] = resources[j], resources[i] }) + + return resources +} + +func (p *ProvidersMapping) ProcessResources() { + for provider := range p.Providers { + resources := provider.GetService().GetResources() + log.Printf("num of resources for service %s: %d", p.providerToService[provider], len(provider.GetService().GetResources())) + for i := range resources { + resource := resources[i] + p.Resources[&resource] = true + p.resourceToProvider[&resource] = provider + } + } +} + +func (p *ProvidersMapping) MatchProvider(resource *Resource) ProviderGenerator { + return p.resourceToProvider[resource] +} + +func (p *ProvidersMapping) SetResources(resourceToKeep []*Resource) { + p.Resources = map[*Resource]bool{} + resourcesGroupsByProviders := map[ProviderGenerator][]Resource{} + for i := range resourceToKeep { + resource := resourceToKeep[i] + provider := p.resourceToProvider[resource] + if resourcesGroupsByProviders[provider] == nil { + resourcesGroupsByProviders[provider] = []Resource{} + } + resourcesGroupsByProviders[provider] = append(resourcesGroupsByProviders[provider], *resource) + p.Resources[resource] = true + } + + for provider := range p.Providers { + provider.GetService().SetResources(resourcesGroupsByProviders[provider]) + } +} + +func (p *ProvidersMapping) GetResourcesByService() map[string][]Resource { + mapping := map[string][]Resource{} + for service := range p.Services { + mapping[service] = []Resource{} + } + + for resource := range p.Resources { + provider := p.resourceToProvider[resource] + service := p.providerToService[provider] + mapping[service] = append(mapping[service], *resource) + } + + return mapping +} + +func (p *ProvidersMapping) ConvertTFStates(providerWrapper *providerwrapper.ProviderWrapper) { + for resource := range p.Resources { + err := resource.ConvertTFstate(providerWrapper) + if err != nil { + log.Printf("failed to convert resources %s because of error %s", resource.InstanceInfo.Id, err) + } + } + + resourcesGroupsByProviders := map[ProviderGenerator][]Resource{} + for resource := range p.Resources { + provider := p.resourceToProvider[resource] + if resourcesGroupsByProviders[provider] == nil { + resourcesGroupsByProviders[provider] = []Resource{} + } + resourcesGroupsByProviders[provider] = append(resourcesGroupsByProviders[provider], *resource) + } + + for provider := range p.Providers { + provider.GetService().SetResources(resourcesGroupsByProviders[provider]) + } + +} + +func (p *ProvidersMapping) CleanupProviders() { + for provider := range p.Providers { + provider.GetService().PostRefreshCleanup() + err := provider.GetService().PostConvertHook() + if err != nil { + log.Printf("failed run PostConvertHook because of error %s", err) + } + } +} diff --git a/terraformutils/providerwrapper/provider.go b/terraformutils/providerwrapper/provider.go index 428621f237..f85d5cedd9 100644 --- a/terraformutils/providerwrapper/provider.go +++ b/terraformutils/providerwrapper/provider.go @@ -56,12 +56,26 @@ type ProviderWrapper struct { providerName string config cty.Value schema *providers.GetSchemaResponse + retryCount int + retrySleepMs int } -func NewProviderWrapper(providerName string, providerConfig cty.Value, verbose bool) (*ProviderWrapper, error) { - p := &ProviderWrapper{} +func NewProviderWrapper(providerName string, providerConfig cty.Value, verbose bool, options ...map[string]int) (*ProviderWrapper, error) { + p := &ProviderWrapper{retryCount: 5, retrySleepMs: 300} p.providerName = providerName p.config = providerConfig + + if len(options) > 0 { + retryCount, hasOption := options[0]["retryCount"] + if hasOption { + p.retryCount = retryCount + } + retrySleepMs, hasOption := options[0]["retrySleepMs"] + if hasOption { + p.retrySleepMs = retrySleepMs + } + } + err := p.initProvider(verbose) return p, err @@ -153,15 +167,15 @@ func (p *ProviderWrapper) Refresh(info *terraform.InstanceInfo, state *terraform } successReadResource := false resp := providers.ReadResourceResponse{} - for i := 0; i < 5; i++ { + for i := 0; i < p.retryCount; i++ { resp = p.Provider.ReadResource(providers.ReadResourceRequest{ TypeName: info.Type, PriorState: priorState, Private: []byte{}, }) if resp.Diagnostics.HasErrors() { - log.Println("WARN: Fail read resource from provider, wait 300ms before retry") - time.Sleep(300 * time.Millisecond) + log.Printf("WARN: Fail read resource from provider, wait %dms before retry\n", p.retrySleepMs) + time.Sleep(time.Duration(p.retrySleepMs) * time.Millisecond) continue } else { successReadResource = true diff --git a/terraformutils/utils.go b/terraformutils/utils.go index 0ac39c72fa..684a9f7e09 100644 --- a/terraformutils/utils.go +++ b/terraformutils/utils.go @@ -65,23 +65,36 @@ func PrintTfState(resources []Resource) ([]byte, error) { return buf.Bytes(), err } -func RefreshResources(resources []Resource, provider *providerwrapper.ProviderWrapper) ([]Resource, error) { - refreshedResources := []Resource{} - input := make(chan *Resource, 100) +func RefreshResources(resources []*Resource, provider *providerwrapper.ProviderWrapper, slowProcessingResources [][]*Resource) ([]*Resource, error) { + refreshedResources := []*Resource{} + input := make(chan *Resource, len(resources)) var wg sync.WaitGroup poolSize := 15 - if slowProcessingRequired(resources) { - poolSize = 1 + for i := range resources { + wg.Add(1) + input <- resources[i] } + close(input) + for i := 0; i < poolSize; i++ { go RefreshResourceWorker(input, &wg, provider) } - for i := range resources { - wg.Add(1) - input <- &resources[i] + + spInputs := []chan *Resource{} + for i, resourceGroup := range slowProcessingResources { + spInputs = append(spInputs, make(chan *Resource, len(resourceGroup))) + for j := range resourceGroup { + spInputs[i] <- resourceGroup[j] + } + close(spInputs[i]) + } + + for i := 0; i < len(spInputs); i++ { + wg.Add(len(slowProcessingResources[i])) + go RefreshResourceWorker(spInputs[i], &wg, provider) } + wg.Wait() - close(input) for _, r := range resources { if r.InstanceState != nil && r.InstanceState.ID != "" { refreshedResources = append(refreshedResources, r) @@ -89,16 +102,49 @@ func RefreshResources(resources []Resource, provider *providerwrapper.ProviderWr log.Printf("ERROR: Unable to refresh resource %s", r.ResourceName) } } + + for _, resourceGroup := range slowProcessingResources { + for i := range resourceGroup { + r := resourceGroup[i] + if r.InstanceState != nil && r.InstanceState.ID != "" { + refreshedResources = append(refreshedResources, r) + } else { + log.Printf("ERROR: Unable to refresh resource %s", r.ResourceName) + } + } + } return refreshedResources, nil } -func slowProcessingRequired(resources []Resource) bool { - for _, r := range resources { - if r.SlowQueryRequired { - return true +func RefreshResourcesByProvider(providersMapping *ProvidersMapping, providerWrapper *providerwrapper.ProviderWrapper) error { + allResources := providersMapping.ShuffleResources() + slowProcessingResources := make(map[ProviderGenerator][]*Resource) + regularResources := []*Resource{} + for i := range allResources { + resource := allResources[i] + if resource.SlowQueryRequired { + provider := providersMapping.MatchProvider(resource) + if slowProcessingResources[provider] == nil { + slowProcessingResources[provider] = []*Resource{} + } + slowProcessingResources[provider] = append(slowProcessingResources[provider], resource) + } else { + regularResources = append(regularResources, resource) } } - return false + + var spResourcesList [][]*Resource + for p := range slowProcessingResources { + spResourcesList = append(spResourcesList, slowProcessingResources[p]) + } + + refreshedResources, err := RefreshResources(regularResources, providerWrapper, spResourcesList) + if err != nil { + return err + } + + providersMapping.SetResources(refreshedResources) + return nil } func RefreshResourceWorker(input chan *Resource, wg *sync.WaitGroup, provider *providerwrapper.ProviderWrapper) {