From 885771ad457901d2382325f14714aa582af1f6c2 Mon Sep 17 00:00:00 2001 From: CaiWanting Date: Sat, 30 Nov 2024 17:52:49 +0800 Subject: [PATCH] feat(dms): add group details for kafka data source (#5959) --- .../data-sources/dms_kafka_consumer_groups.md | 48 ++++- ...weicloud_dms_kafka_consumer_groups_test.go | 32 ++++ ...e_huaweicloud_dms_kafka_consumer_groups.go | 172 +++++++++++++++++- 3 files changed, 246 insertions(+), 6 deletions(-) diff --git a/docs/data-sources/dms_kafka_consumer_groups.md b/docs/data-sources/dms_kafka_consumer_groups.md index 4cb95da54f..3c75e5a178 100644 --- a/docs/data-sources/dms_kafka_consumer_groups.md +++ b/docs/data-sources/dms_kafka_consumer_groups.md @@ -13,10 +13,10 @@ Use this data source to get the list of Kafka consumer groups. ## Example Usage ```hcl -variable "instance_id" {} - -data "huaweicloud_dms_kafka_consumer_groups" "test" { - instance_id = var.instance_id +variable "instance_id" {} + +data "huaweicloud_dms_kafka_consumer_groups" "test" { + instance_id = var.instance_id } ``` @@ -69,3 +69,43 @@ The `groups` block supports: * `state` - Indicates the consumer group status. * `created_at` - Indicates the create time. + +* `group_message_offsets` - Indicates the group message offsets. + The [group_message_offsets](#attrblock--groups--group_message_offsets) structure is documented below. + +* `members` - Indicates the consumer group members. + The [members](#attrblock--groups--members) structure is documented below. + +* `assignment_strategy` - Indicates the partition assignment strategy. + + +The `group_message_offsets` block supports: + +* `lag` - Indicates the number of accumulated messages. + +* `message_current_offset` - Indicates the message current offset. + +* `message_log_end_offset` - Indicates the message log end offset. + +* `partition` - Indicates the partition. + +* `topic` - Indicates the topic name. + + +The `members` block supports: + +* `assignment` - Indicates the details about the partition assigned to the consumer. + The [assignment](#attrblock--groups--members--assignment) structure is documented below. + +* `client_id` - Indicates the client ID. + +* `host` - Indicates the consumer address. + +* `member_id` - Indicates the member ID. + + +The `assignment` block supports: + +* `partitions` - Indicates the partitions. + +* `topic` - Indicates the topic name. diff --git a/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_consumer_groups_test.go b/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_consumer_groups_test.go index 4b52371b03..7b634d1630 100644 --- a/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_consumer_groups_test.go +++ b/huaweicloud/services/acceptance/dms/data_source_huaweicloud_dms_kafka_consumer_groups_test.go @@ -81,3 +81,35 @@ output "state_validation" { } `, testDmsKafkaConsumerGroup_basic(name)) } + +func TestAccDataSourceDmsKafkaConsumerGroups_consumers(t *testing.T) { + dataSource := "data.huaweicloud_dms_kafka_consumer_groups.test" + dc := acceptance.InitDataSourceCheck(dataSource) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acceptance.TestAccPreCheck(t) + acceptance.TestAccPreCheckDMSKafkaInstanceID(t) + }, + ProviderFactories: acceptance.TestAccProviderFactories, + Steps: []resource.TestStep{ + { + Config: testDataSourceDmsKafkaConsumerGroups_consumers(), + Check: resource.ComposeTestCheckFunc( + dc.CheckResourceExists(), + resource.TestCheckResourceAttrSet(dataSource, "groups.#"), + resource.TestCheckResourceAttrSet(dataSource, "groups.0.members.#"), + resource.TestCheckResourceAttrSet(dataSource, "groups.0.group_message_offsets.#"), + ), + }, + }, + }) +} + +func testDataSourceDmsKafkaConsumerGroups_consumers() string { + return fmt.Sprintf(` +data "huaweicloud_dms_kafka_consumer_groups" "test" { + instance_id = "%[1]s" +} +`, acceptance.HW_DMS_KAFKA_INSTANCE_ID) +} diff --git a/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_consumer_groups.go b/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_consumer_groups.go index feae8389ba..6898ccec70 100644 --- a/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_consumer_groups.go +++ b/huaweicloud/services/dms/data_source_huaweicloud_dms_kafka_consumer_groups.go @@ -1,4 +1,3 @@ -// Generated by PMS #199 package dms import ( @@ -20,6 +19,7 @@ import ( const pageLimit = 10 // @API Kafka GET /v2/{project_id}/instances/{instance_id}/groups +// @API Kafka GET /v2/{project_id}/instances/{instance_id}/management/groups/{group} func DataSourceDmsKafkaConsumerGroups() *schema.Resource { return &schema.Resource{ ReadContext: dataSourceDmsKafkaConsumerGroupsRead, @@ -97,6 +97,89 @@ func DataSourceDmsKafkaConsumerGroups() *schema.Resource { Computed: true, Description: `Indicates the create time.`, }, + "assignment_strategy": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the partition assignment strategy.`, + }, + "members": { + Type: schema.TypeList, + Computed: true, + Description: `Indicates the consumer group members.`, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "host": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the consumer address.`, + }, + "member_id": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the member ID.`, + }, + "client_id": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the client ID.`, + }, + "assignment": { + Type: schema.TypeList, + Computed: true, + Description: `Indicates the details about the partition assigned to the consumer.`, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "topic": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the topic name.`, + }, + "partitions": { + Type: schema.TypeList, + Computed: true, + Description: `Indicates the partitions.`, + Elem: &schema.Schema{Type: schema.TypeInt}, + }, + }, + }, + }, + }, + }, + }, + "group_message_offsets": { + Type: schema.TypeList, + Computed: true, + Description: `Indicates the group message offsets.`, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "partition": { + Type: schema.TypeInt, + Computed: true, + Description: `Indicates the partition.`, + }, + "lag": { + Type: schema.TypeInt, + Computed: true, + Description: `Indicates the number of accumulated messages.`, + }, + "topic": { + Type: schema.TypeString, + Computed: true, + Description: `Indicates the topic name.`, + }, + "message_current_offset": { + Type: schema.TypeInt, + Computed: true, + Description: `Indicates the message current offset.`, + }, + "message_log_end_offset": { + Type: schema.TypeInt, + Computed: true, + Description: `Indicates the message log end offset.`, + }, + }, + }, + }, }, }, }, @@ -157,14 +240,28 @@ func dataSourceDmsKafkaConsumerGroupsRead(_ context.Context, d *schema.ResourceD if val, ok := d.GetOk("description"); ok && state != val { continue } + + // get group details + name := utils.PathSearch("group_id", group, "").(string) + groupDetails, err := getKafkaConsumerGroupDetails(client, d, name) + if err != nil { + return diag.FromErr(err) + } + results = append(results, map[string]interface{}{ - "name": utils.PathSearch("group_id", group, nil), + "name": name, "state": utils.PathSearch("state", group, nil), "lag": utils.PathSearch("lag", group, 0), "coordinator_id": utils.PathSearch("coordinator_id", group, 0), "description": utils.PathSearch("group_desc", group, nil), "created_at": utils.FormatTimeStampRFC3339( int64(utils.PathSearch("createdAt", group, float64(0)).(float64))/1000, true), + + "assignment_strategy": utils.PathSearch("assignment_strategy", groupDetails, nil), + "members": flattenGroupMembers( + utils.PathSearch("members", groupDetails, make([]interface{}, 0)).([]interface{})), + "group_message_offsets": flattenGroupMessageOffsets( + utils.PathSearch("group_message_offsets", groupDetails, make([]interface{}, 0)).([]interface{})), }) } @@ -196,3 +293,74 @@ func buildQueryGroupsListPath(d *schema.ResourceData, listGroupsPath string) str } return listGroupsPath } + +func getKafkaConsumerGroupDetails(client *golangsdk.ServiceClient, d *schema.ResourceData, group string) (interface{}, error) { + getGroupHttpUrl := "v2/{project_id}/instances/{instance_id}/management/groups/{group}" + getGroupPath := client.Endpoint + getGroupHttpUrl + getGroupPath = strings.ReplaceAll(getGroupPath, "{project_id}", client.ProjectID) + getGroupPath = strings.ReplaceAll(getGroupPath, "{instance_id}", d.Get("instance_id").(string)) + getGroupPath = strings.ReplaceAll(getGroupPath, "{group}", group) + getGroupOpt := golangsdk.RequestOpts{ + KeepResponseBody: true, + MoreHeaders: map[string]string{"Content-Type": "application/json"}, + } + + getGroupResp, err := client.Request("GET", getGroupPath, &getGroupOpt) + if err != nil { + return nil, fmt.Errorf("error retrieving group: %s", err) + } + getGroupRespBody, err := utils.FlattenResponse(getGroupResp) + if err != nil { + return nil, fmt.Errorf("error flatten response: %s", err) + } + + return utils.PathSearch("group", getGroupRespBody, nil), nil +} + +func flattenGroupMembers(paramsList []interface{}) interface{} { + if len(paramsList) == 0 { + return nil + } + rst := make([]interface{}, 0, len(paramsList)) + for _, params := range paramsList { + rst = append(rst, map[string]interface{}{ + "host": utils.PathSearch("host", params, nil), + "member_id": utils.PathSearch("member_id", params, nil), + "client_id": utils.PathSearch("client_id", params, nil), + "assignment": flattenGroupMembersAssignment( + utils.PathSearch("assignment", params, make([]interface{}, 0)).([]interface{})), + }) + } + return rst +} + +func flattenGroupMembersAssignment(paramsList []interface{}) interface{} { + if len(paramsList) == 0 { + return nil + } + rst := make([]interface{}, 0, len(paramsList)) + for _, params := range paramsList { + rst = append(rst, map[string]interface{}{ + "topic": utils.PathSearch("topic", params, nil), + "partitions": utils.PathSearch("partitions", params, nil), + }) + } + return rst +} + +func flattenGroupMessageOffsets(paramsList []interface{}) interface{} { + if len(paramsList) == 0 { + return nil + } + rst := make([]interface{}, 0, len(paramsList)) + for _, params := range paramsList { + rst = append(rst, map[string]interface{}{ + "partition": utils.PathSearch("partition", params, nil), + "lag": utils.PathSearch("lag", params, nil), + "topic": utils.PathSearch("topic", params, nil), + "message_current_offset": utils.PathSearch("message_current_offset", params, nil), + "message_log_end_offset": utils.PathSearch("message_log_end_offset", params, nil), + }) + } + return rst +}