Skip to content

Commit

Permalink
fix topic not found namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet committed Dec 18, 2024
1 parent fccccad commit 5fb24b3
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pulsar/resource_pulsar_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -162,6 +163,10 @@ func resourcePulsarTopicRead(ctx context.Context, d *schema.ResourceData, meta i

topicName, found, err := getTopic(d, meta)
if err != nil {
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
d.SetId("")
return nil
}
return diag.Errorf("%v", err)
}
if !found {
Expand Down
151 changes: 151 additions & 0 deletions pulsar/resource_pulsar_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,54 @@ func TestPartionedTopicWithPermissionGrantUpdate(t *testing.T) {
testTopicWithPermissionGrantUpdate(t, 10)
}

func TestTopicNamespaceExternallyRemoved(t *testing.T) {

resourceName := "pulsar_topic.test"
cName := acctest.RandString(10)
tName := acctest.RandString(10)
nsName := acctest.RandString(10)
topicName := acctest.RandString(10)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
IDRefreshName: resourceName,
CheckDestroy: testPulsarNamespaceDestroy,
Steps: []resource.TestStep{
{
Config: testPulsarNamespaceWithTopic(testWebServiceURL, cName, tName, nsName, topicName),
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists(resourceName),
),
},
{
PreConfig: func() {
client, err := sharedClient(testWebServiceURL)
if err != nil {
t.Fatalf("ERROR_GETTING_PULSAR_CLIENT: %v", err)
}

conn := client.(admin.Client)
topicName, err := utils.GetTopicName(fmt.Sprintf("persistent://%s/%s/%s", tName, nsName, topicName))
if err != nil {
t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err)
}
if err = conn.Topics().Delete(*topicName, false, false); err != nil {
t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err)
}
if err = conn.Namespaces().DeleteNamespace(tName + "/" + nsName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_NS: %v", err)
}
},
Config: testPulsarNamespaceWithTopic(testWebServiceURL, cName, tName, nsName, topicName),
PlanOnly: true,
ExpectNonEmptyPlan: true,
ExpectError: nil,
},
},
})
}

func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
resourceName := "pulsar_topic.test"
tname := acctest.RandString(10)
Expand Down Expand Up @@ -338,3 +386,106 @@ resource "pulsar_topic" "test" {
}
`, url, ttype, tname, pnum, permissionGrants)
}

func testPulsarNamespaceWithTopic(wsURL, cluster, tenant, ns, topicName string) string {
return fmt.Sprintf(`
provider "pulsar" {
web_service_url = "%s"
}
resource "pulsar_cluster" "test_cluster" {
cluster = "%s"
cluster_data {
web_service_url = "http://localhost:8080"
broker_service_url = "http://localhost:6050"
peer_clusters = ["standalone"]
}
}
resource "pulsar_tenant" "test_tenant" {
tenant = "%s"
allowed_clusters = [pulsar_cluster.test_cluster.cluster, "standalone"]
}
resource "pulsar_namespace" "test" {
tenant = pulsar_tenant.test_tenant.tenant
namespace = "%s"
enable_deduplication = true
namespace_config {
anti_affinity = "anti-aff"
max_consumers_per_subscription = "50"
max_consumers_per_topic = "50"
max_producers_per_topic = "50"
message_ttl_seconds = "86400"
replication_clusters = ["standalone"]
is_allow_auto_update_schema = false
offload_threshold_size_in_mb = "100"
}
dispatch_rate {
dispatch_msg_throttling_rate = 50
rate_period_seconds = 50
dispatch_byte_throttling_rate = 2048
}
subscription_dispatch_rate {
dispatch_msg_throttling_rate = 50
rate_period_seconds = 50
dispatch_byte_throttling_rate = 2048
}
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
}
persistence_policies {
bookkeeper_ensemble = 2
bookkeeper_write_quorum = 2
bookkeeper_ack_quorum = 2
managed_ledger_max_mark_delete_rate = 0.0
}
backlog_quota {
limit_bytes = "10000000000"
limit_seconds = "-1"
policy = "producer_request_hold"
type = "destination_storage"
}
permission_grant {
role = "some-role-1"
actions = ["produce", "consume", "functions"]
}
permission_grant {
role = "some-role-2"
actions = ["produce", "consume"]
}
topic_auto_creation {
enable = false
}
depends_on = [
pulsar_tenant.test_tenant
]
}
resource "pulsar_topic" "test" {
tenant = "%s"
namespace = "%s"
topic_type = "persistent"
topic_name = "%s"
partitions = 0
depends_on = [
pulsar_namespace.test,
pulsar_tenant.test_tenant
]
}
`, wsURL, cluster, tenant, ns, tenant, ns, topicName)
}

0 comments on commit 5fb24b3

Please sign in to comment.