Skip to content

Commit

Permalink
feat(dms): add group details for kafka data source (#5959)
Browse files Browse the repository at this point in the history
  • Loading branch information
saf3dfsa authored Nov 30, 2024
1 parent b4aa0c6 commit 885771a
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 6 deletions.
48 changes: 44 additions & 4 deletions docs/data-sources/dms_kafka_consumer_groups.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ Use this data source to get the list of Kafka consumer groups.
## Example Usage

```hcl
variable "instance_id" {}
data "huaweicloud_dms_kafka_consumer_groups" "test" {
instance_id = var.instance_id
variable "instance_id" {}
data "huaweicloud_dms_kafka_consumer_groups" "test" {
instance_id = var.instance_id
}
```

Expand Down Expand Up @@ -69,3 +69,43 @@ The `groups` block supports:
* `state` - Indicates the consumer group status.

* `created_at` - Indicates the create time.

* `group_message_offsets` - Indicates the group message offsets.
The [group_message_offsets](#attrblock--groups--group_message_offsets) structure is documented below.

* `members` - Indicates the consumer group members.
The [members](#attrblock--groups--members) structure is documented below.

* `assignment_strategy` - Indicates the partition assignment strategy.

<a name="attrblock--groups--group_message_offsets"></a>
The `group_message_offsets` block supports:

* `lag` - Indicates the number of accumulated messages.

* `message_current_offset` - Indicates the message current offset.

* `message_log_end_offset` - Indicates the message log end offset.

* `partition` - Indicates the partition.

* `topic` - Indicates the topic name.

<a name="attrblock--groups--members"></a>
The `members` block supports:

* `assignment` - Indicates the details about the partition assigned to the consumer.
The [assignment](#attrblock--groups--members--assignment) structure is documented below.

* `client_id` - Indicates the client ID.

* `host` - Indicates the consumer address.

* `member_id` - Indicates the member ID.

<a name="attrblock--groups--members--assignment"></a>
The `assignment` block supports:

* `partitions` - Indicates the partitions.

* `topic` - Indicates the topic name.
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,35 @@ output "state_validation" {
}
`, testDmsKafkaConsumerGroup_basic(name))
}

func TestAccDataSourceDmsKafkaConsumerGroups_consumers(t *testing.T) {
dataSource := "data.huaweicloud_dms_kafka_consumer_groups.test"
dc := acceptance.InitDataSourceCheck(dataSource)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acceptance.TestAccPreCheck(t)
acceptance.TestAccPreCheckDMSKafkaInstanceID(t)
},
ProviderFactories: acceptance.TestAccProviderFactories,
Steps: []resource.TestStep{
{
Config: testDataSourceDmsKafkaConsumerGroups_consumers(),
Check: resource.ComposeTestCheckFunc(
dc.CheckResourceExists(),
resource.TestCheckResourceAttrSet(dataSource, "groups.#"),
resource.TestCheckResourceAttrSet(dataSource, "groups.0.members.#"),
resource.TestCheckResourceAttrSet(dataSource, "groups.0.group_message_offsets.#"),
),
},
},
})
}

func testDataSourceDmsKafkaConsumerGroups_consumers() string {
return fmt.Sprintf(`
data "huaweicloud_dms_kafka_consumer_groups" "test" {
instance_id = "%[1]s"
}
`, acceptance.HW_DMS_KAFKA_INSTANCE_ID)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Generated by PMS #199
package dms

import (
Expand All @@ -20,6 +19,7 @@ import (
const pageLimit = 10

// @API Kafka GET /v2/{project_id}/instances/{instance_id}/groups
// @API Kafka GET /v2/{project_id}/instances/{instance_id}/management/groups/{group}
func DataSourceDmsKafkaConsumerGroups() *schema.Resource {
return &schema.Resource{
ReadContext: dataSourceDmsKafkaConsumerGroupsRead,
Expand Down Expand Up @@ -97,6 +97,89 @@ func DataSourceDmsKafkaConsumerGroups() *schema.Resource {
Computed: true,
Description: `Indicates the create time.`,
},
"assignment_strategy": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the partition assignment strategy.`,
},
"members": {
Type: schema.TypeList,
Computed: true,
Description: `Indicates the consumer group members.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"host": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the consumer address.`,
},
"member_id": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the member ID.`,
},
"client_id": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the client ID.`,
},
"assignment": {
Type: schema.TypeList,
Computed: true,
Description: `Indicates the details about the partition assigned to the consumer.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"topic": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the topic name.`,
},
"partitions": {
Type: schema.TypeList,
Computed: true,
Description: `Indicates the partitions.`,
Elem: &schema.Schema{Type: schema.TypeInt},
},
},
},
},
},
},
},
"group_message_offsets": {
Type: schema.TypeList,
Computed: true,
Description: `Indicates the group message offsets.`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"partition": {
Type: schema.TypeInt,
Computed: true,
Description: `Indicates the partition.`,
},
"lag": {
Type: schema.TypeInt,
Computed: true,
Description: `Indicates the number of accumulated messages.`,
},
"topic": {
Type: schema.TypeString,
Computed: true,
Description: `Indicates the topic name.`,
},
"message_current_offset": {
Type: schema.TypeInt,
Computed: true,
Description: `Indicates the message current offset.`,
},
"message_log_end_offset": {
Type: schema.TypeInt,
Computed: true,
Description: `Indicates the message log end offset.`,
},
},
},
},
},
},
},
Expand Down Expand Up @@ -157,14 +240,28 @@ func dataSourceDmsKafkaConsumerGroupsRead(_ context.Context, d *schema.ResourceD
if val, ok := d.GetOk("description"); ok && state != val {
continue
}

// get group details
name := utils.PathSearch("group_id", group, "").(string)
groupDetails, err := getKafkaConsumerGroupDetails(client, d, name)
if err != nil {
return diag.FromErr(err)
}

results = append(results, map[string]interface{}{
"name": utils.PathSearch("group_id", group, nil),
"name": name,
"state": utils.PathSearch("state", group, nil),
"lag": utils.PathSearch("lag", group, 0),
"coordinator_id": utils.PathSearch("coordinator_id", group, 0),
"description": utils.PathSearch("group_desc", group, nil),
"created_at": utils.FormatTimeStampRFC3339(
int64(utils.PathSearch("createdAt", group, float64(0)).(float64))/1000, true),

"assignment_strategy": utils.PathSearch("assignment_strategy", groupDetails, nil),
"members": flattenGroupMembers(
utils.PathSearch("members", groupDetails, make([]interface{}, 0)).([]interface{})),
"group_message_offsets": flattenGroupMessageOffsets(
utils.PathSearch("group_message_offsets", groupDetails, make([]interface{}, 0)).([]interface{})),
})
}

Expand Down Expand Up @@ -196,3 +293,74 @@ func buildQueryGroupsListPath(d *schema.ResourceData, listGroupsPath string) str
}
return listGroupsPath
}

func getKafkaConsumerGroupDetails(client *golangsdk.ServiceClient, d *schema.ResourceData, group string) (interface{}, error) {
getGroupHttpUrl := "v2/{project_id}/instances/{instance_id}/management/groups/{group}"
getGroupPath := client.Endpoint + getGroupHttpUrl
getGroupPath = strings.ReplaceAll(getGroupPath, "{project_id}", client.ProjectID)
getGroupPath = strings.ReplaceAll(getGroupPath, "{instance_id}", d.Get("instance_id").(string))
getGroupPath = strings.ReplaceAll(getGroupPath, "{group}", group)
getGroupOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: map[string]string{"Content-Type": "application/json"},
}

getGroupResp, err := client.Request("GET", getGroupPath, &getGroupOpt)
if err != nil {
return nil, fmt.Errorf("error retrieving group: %s", err)
}
getGroupRespBody, err := utils.FlattenResponse(getGroupResp)
if err != nil {
return nil, fmt.Errorf("error flatten response: %s", err)
}

return utils.PathSearch("group", getGroupRespBody, nil), nil
}

func flattenGroupMembers(paramsList []interface{}) interface{} {
if len(paramsList) == 0 {
return nil
}
rst := make([]interface{}, 0, len(paramsList))
for _, params := range paramsList {
rst = append(rst, map[string]interface{}{
"host": utils.PathSearch("host", params, nil),
"member_id": utils.PathSearch("member_id", params, nil),
"client_id": utils.PathSearch("client_id", params, nil),
"assignment": flattenGroupMembersAssignment(
utils.PathSearch("assignment", params, make([]interface{}, 0)).([]interface{})),
})
}
return rst
}

func flattenGroupMembersAssignment(paramsList []interface{}) interface{} {
if len(paramsList) == 0 {
return nil
}
rst := make([]interface{}, 0, len(paramsList))
for _, params := range paramsList {
rst = append(rst, map[string]interface{}{
"topic": utils.PathSearch("topic", params, nil),
"partitions": utils.PathSearch("partitions", params, nil),
})
}
return rst
}

func flattenGroupMessageOffsets(paramsList []interface{}) interface{} {
if len(paramsList) == 0 {
return nil
}
rst := make([]interface{}, 0, len(paramsList))
for _, params := range paramsList {
rst = append(rst, map[string]interface{}{
"partition": utils.PathSearch("partition", params, nil),
"lag": utils.PathSearch("lag", params, nil),
"topic": utils.PathSearch("topic", params, nil),
"message_current_offset": utils.PathSearch("message_current_offset", params, nil),
"message_log_end_offset": utils.PathSearch("message_log_end_offset", params, nil),
})
}
return rst
}

0 comments on commit 885771a

Please sign in to comment.