From 187d2bdd1e6110a40da965ac4fc54e2513cb0738 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Sun, 1 Sep 2024 00:25:27 -0700 Subject: [PATCH 1/2] Update resource operator documentation 1. Improve comments 2. Add clarification around geo-replication 3. Add clarification around resource lifecycle 4. Add clarification around connection ref --- README.md | 42 ++- api/v1alpha1/common.go | 61 +++- api/v1alpha1/pulsarconnection_types.go | 47 ++- api/v1alpha1/pulsarnamespace_types.go | 58 +++- api/v1alpha1/pulsarpermission_types.go | 48 ++- api/v1alpha1/pulsartenant_types.go | 31 +- api/v1alpha1/pulsartopic_types.go | 73 ++++- api/v1alpha1/zz_generated.deepcopy.go | 14 - docs/pulsar_connection.md | 437 ++++++++++++++++++------- docs/pulsar_geo_replication.md | 132 ++++++-- docs/pulsar_namespace.md | 103 ++++-- docs/pulsar_permission.md | 123 +++++-- docs/pulsar_resource_lifecycle.md | 130 ++++++++ docs/pulsar_tenant.md | 129 +++++++- docs/pulsar_topic.md | 155 ++++++--- 15 files changed, 1219 insertions(+), 364 deletions(-) create mode 100644 docs/pulsar_resource_lifecycle.md diff --git a/README.md b/README.md index 0ff61ae0..ccf7f752 100644 --- a/README.md +++ b/README.md @@ -4,21 +4,41 @@ Authored by [StreamNative](https://streamnative.io), this Pulsar Resources Oper Currently, the Pulsar Resources Operator provides full lifecycle management for the following Pulsar resources, including creation, update, and deletion. 
-- Tenants -- Namespaces -- Topics -- Permissions -- Packages -- Functions -- Sinks -- Sources +- [Tenants](docs/pulsar_tenant.md) +- [Namespaces](docs/pulsar_namespace.md) +- [Topics](docs/pulsar_topic.md) +- [Permissions](docs/pulsar_permission.md) +- [Packages](docs/pulsar_package.md) +- [Functions](docs/pulsar_function.md) +- [Sinks](docs/pulsar_sink.md) +- [Sources](docs/pulsar_source.md) +- [Geo-Replication](docs/pulsar_geo_replication.md) +## Lifecycle Management + +The Pulsar Resources Operator provides a flexible approach to managing the lifecycle of Pulsar resources through the `PulsarResourceLifeCyclePolicy`. This policy determines how Pulsar resources are handled when their corresponding Kubernetes custom resources are deleted. For more details on lifecycle management, please refer to the [PulsarResourceLifeCyclePolicy documentation](docs/pulsar_resource_lifecycle.md). + +There are two available options for the lifecycle policy: + +1. `CleanUpAfterDeletion`: When set, the Pulsar resource (such as a tenant, namespace, or topic) will be deleted from the Pulsar cluster when its corresponding Kubernetes custom resource is deleted. This is the default policy. + +2. `KeepAfterDeletion`: When set, the Pulsar resource will remain in the Pulsar cluster even after its corresponding Kubernetes custom resource is deleted. + +You can specify the lifecycle policy in the custom resource: + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarTenant +metadata: + name: my-tenant +spec: + lifecyclePolicy: KeepAfterDeletion +``` # Installation The Pulsar Resources Operator is an independent controller; it doesn't need to be installed with the Pulsar operator, and you can install it whenever you need the feature. It is built with the [Operator SDK](https://github.com/operator-framework/operator-sdk), which is part of the [Operator framework](https://github.com/operator-framework/).
- You can install the Pulsar Resources Operator using the officially supported `pulsar-resources-operator` Helm [chart](https://github.com/streamnative/charts/tree/master/charts/pulsar-resources-operator). It provides Custom Resource Definitions (CRDs) and Controllers to manage the Pulsar resources. ## Prerequisites @@ -98,6 +118,7 @@ Before creating Pulsar resources, you must create a resource called `PulsarConne In this tutorial, a Kubernetes namespace called `test` is used for examples, which is the namespace in which the Pulsar cluster is installed. - [PulsarConnection](docs/pulsar_connection.md) +- [PulsarResourceLifeCyclePolicy](docs/pulsar_resource_lifecycle.md) - [PulsarTenant](docs/pulsar_tenant.md) - [PulsarNamespace](docs/pulsar_namespace.md) - [PulsarTopic](docs/pulsar_topic.md) @@ -106,8 +127,9 @@ In this tutorial, a Kubernetes namespace called `test` is used for examples, whi - [PulsarFunction](docs/pulsar_function.md) - [PulsarSink](docs/pulsar_sink.md) - [PulsarSource](docs/pulsar_source.md) +- [PulsarGeoReplication](docs/pulsar_geo_replication.md) -## Contributing +# Contributing Contributions are warmly welcomed and greatly appreciated! The project follows the typical GitHub pull request model. diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 6e36e803..521c5dfc 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -38,34 +38,71 @@ type ValueOrSecretRef struct { SecretRef *SecretKeyRef `json:"secretRef,omitempty"` } -// PulsarAuthentication use the token or OAuth2 for pulsar authentication +// PulsarAuthentication defines the authentication configuration for Pulsar resources. +// It supports two authentication methods: Token-based and OAuth2-based. +// Only one authentication method should be specified at a time. type PulsarAuthentication struct { + // Token specifies the configuration for token-based authentication. + // This can be either a direct token value or a reference to a secret containing the token.
+ // If using a secret, the token should be stored under the specified key in the secret. // +optional Token *ValueOrSecretRef `json:"token,omitempty"` + // OAuth2 specifies the configuration for OAuth2-based authentication. + // This includes all necessary parameters for setting up OAuth2 authentication with Pulsar. + // For detailed information on the OAuth2 fields, refer to the PulsarAuthenticationOAuth2 struct. // +optional OAuth2 *PulsarAuthenticationOAuth2 `json:"oauth2,omitempty"` } -// PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource -// in pulsar cluster after resource is deleted by controller -// KeepAfterDeletion or CleanUpAfterDeletion +// PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources +// when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. +// This policy allows users to control whether Pulsar resources should be retained or +// removed from the Pulsar cluster after the CR is deleted. type PulsarResourceLifeCyclePolicy string const ( - // KeepAfterDeletion keeps the resource in pulsar cluster when cr is deleted + // KeepAfterDeletion instructs the operator to retain the Pulsar resource + // in the Pulsar cluster even after the corresponding CR is deleted from Kubernetes. + // Use this option when: + // - You want to preserve data or configurations in Pulsar for backup or future use. + // - You're performing temporary maintenance on Kubernetes resources without affecting Pulsar. + // - You need to migrate or recreate Kubernetes resources without losing Pulsar data. KeepAfterDeletion PulsarResourceLifeCyclePolicy = "KeepAfterDeletion" - // CleanUpAfterDeletion deletes the resource in pulsar cluster when cr is deleted + + // CleanUpAfterDeletion instructs the operator to remove the Pulsar resource + // from the Pulsar cluster when the corresponding CR is deleted from Kubernetes. 
+ // Use this option when: + // - You want to ensure complete removal of resources to free up Pulsar cluster capacity. + // - You're decommissioning services or environments and need to clean up all associated resources. + // - You want to maintain strict synchronization between Kubernetes and Pulsar resources. CleanUpAfterDeletion PulsarResourceLifeCyclePolicy = "CleanUpAfterDeletion" ) -// PulsarAuthenticationOAuth2 indicates the parameters which are need by pulsar OAuth2 +// PulsarAuthenticationOAuth2 represents the configuration for Pulsar OAuth2 authentication. +// This struct aligns with Pulsar's OAuth2 authentication mechanism as described in +// https://pulsar.apache.org/docs/3.3.x/security-oauth2/ and +// https://docs.streamnative.io/docs/access-cloud-clusters-oauth type PulsarAuthenticationOAuth2 struct { - IssuerEndpoint string `json:"issuerEndpoint"` - ClientID string `json:"clientID"` - Audience string `json:"audience"` - Key *ValueOrSecretRef `json:"key"` - Scope string `json:"scope,omitempty"` + // IssuerEndpoint is the URL of the OAuth2 authorization server. + // This is typically the base URL of your identity provider's OAuth2 service. + IssuerEndpoint string `json:"issuerEndpoint"` + + // ClientID is the OAuth2 client identifier issued to the client during the registration process. + ClientID string `json:"clientID"` + + // Audience is the intended recipient of the token. In Pulsar's context, this is usually + // the URL of your Pulsar cluster or a specific identifier for your Pulsar service. + Audience string `json:"audience"` + + // Key is either the client secret or the path to a JSON credentials file. + // For confidential clients, this would be the client secret. + // For public clients using JWT authentication, this would be the path to the JSON credentials file. + Key *ValueOrSecretRef `json:"key"` + + // Scope is an optional field to request specific permissions from the OAuth2 server. 
+ // If not specified, the default scope defined by the OAuth2 server will be used. Scope string `json:"scope,omitempty"` } // IsPulsarResourceReady returns true if resource satisfies with these condition diff --git a/api/v1alpha1/pulsarconnection_types.go b/api/v1alpha1/pulsarconnection_types.go index c40d8249..56fb7f00 100644 --- a/api/v1alpha1/pulsarconnection_types.go +++ b/api/v1alpha1/pulsarconnection_types.go @@ -22,59 +22,81 @@ import ( // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // PulsarConnectionSpec defines the desired state of PulsarConnection +// It specifies the configuration for connecting to a Pulsar cluster. +// +// For plaintext (non-TLS) Pulsar clusters: +// - Set AdminServiceURL to "http://<hostname>:<port>" +// - Set BrokerServiceURL to "pulsar://<hostname>:<port>" +// +// For TLS-enabled Pulsar clusters: +// - Set AdminServiceSecureURL to "https://<hostname>:<port>" +// - Set BrokerServiceSecureURL to "pulsar+ssl://<hostname>:<port>" +// - Optionally set BrokerClientTrustCertsFilePath if using custom CA certificates type PulsarConnectionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - // AdminServiceURL is the admin service url of the pulsar cluster + // AdminServiceURL is the HTTP(S) URL for the Pulsar cluster's admin service. + // This URL is used for administrative operations. // +optional // +kubebuilder:validation:Pattern="^https?://.+$" AdminServiceURL string `json:"adminServiceURL"` - // Authentication defines authentication configurations + // Authentication defines the authentication configuration for connecting to the Pulsar cluster. + // It supports both token-based and OAuth2-based authentication methods. // +optional Authentication *PulsarAuthentication `json:"authentication,omitempty"` - // BrokerServiceURL is the broker service url of the pulsar cluster + // BrokerServiceURL is the non-TLS URL for connecting to Pulsar brokers.
+ Use this for non-secure connections to the Pulsar cluster. // +optional // +kubebuilder:validation:Pattern="^pulsar?://.+$" BrokerServiceURL string `json:"brokerServiceURL,omitempty"` - // BrokerServiceSecureURL is the broker service url for secure connection. + // BrokerServiceSecureURL is the TLS-enabled URL for secure connections to Pulsar brokers. + // Use this for encrypted communications with the Pulsar cluster. // +optional // +kubebuilder:validation:Pattern="^pulsar\\+ssl://.+$" BrokerServiceSecureURL string `json:"brokerServiceSecureURL,omitempty"` - // AdminServiceSecureURL is the admin service url for secure connection. + // AdminServiceSecureURL is the HTTPS URL for secure connections to the Pulsar admin service. + // Use this for encrypted administrative operations. // +optional // +kubebuilder:validation:Pattern="^https://.+$" AdminServiceSecureURL string `json:"adminServiceSecureURL,omitempty"` - // BrokerClientTrustCertsFilePath Path for the trusted TLS certificate file for outgoing connection to a server (broker) + // BrokerClientTrustCertsFilePath is the file path to the trusted TLS certificate + // for outgoing connections to Pulsar brokers. This is used for TLS verification. // +optional BrokerClientTrustCertsFilePath string `json:"brokerClientTrustCertsFilePath,omitempty"` - // ClusterName indicates the local cluster name of the pulsar cluster. It should - // set when enabling the Geo Replication + // ClusterName specifies the name of the local Pulsar cluster. + // This field must be set when enabling Geo-Replication so the operator can identify the local cluster. // +optional ClusterName string `json:"clusterName,omitempty"` } -// PulsarConnectionStatus defines the observed state of PulsarConnection +// PulsarConnectionStatus defines the observed state of PulsarConnection. +// It provides information about the current status of the Pulsar connection.
type PulsarConnectionStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file // ObservedGeneration is the most recent generation observed for this resource. // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - // SecretKeyHash is the hash of the secret ref + // SecretKeyHash is the hash of the secret reference used for authentication. + // This is used to detect changes in the secret without exposing sensitive information. + // The controller should update this hash when the secret changes. // +optional SecretKeyHash string `json:"secretKeyHash,omitempty"` - // Represents the observations of a connection's current state. + // Conditions represent the latest available observations of the connection's current state. + // It follows the Kubernetes conventions for condition types and status. + // The "Ready" condition type is typically used to indicate the overall status. // +patchMergeKey=type // +patchStrategy=merge // +listType=map @@ -95,6 +117,8 @@ type PulsarConnectionStatus struct { //+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` // PulsarConnection is the Schema for the pulsarconnections API +// It represents a connection to a Pulsar cluster and includes both the desired state (Spec) +// and the observed state (Status) of the connection. type PulsarConnection struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -106,6 +130,7 @@ type PulsarConnection struct { //+kubebuilder:object:root=true // PulsarConnectionList contains a list of PulsarConnection +// This type is used by the Kubernetes API to return multiple PulsarConnection objects. 
type PulsarConnectionList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/pulsarnamespace_types.go b/api/v1alpha1/pulsarnamespace_types.go index 5203a50f..4dd46282 100644 --- a/api/v1alpha1/pulsarnamespace_types.go +++ b/api/v1alpha1/pulsarnamespace_types.go @@ -25,71 +25,92 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. -// PulsarNamespaceSpec defines the desired state of PulsarNamespace +// PulsarNamespaceSpec defines the desired state of a Pulsar namespace. +// It corresponds to the configuration options available in Pulsar's namespace admin API. type PulsarNamespaceSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file // TODO make these fields immutable - // Name is the namespace name + // Name is the fully qualified namespace name in the format "tenant/namespace". Name string `json:"name"` + // Bundles specifies the number of bundles to split the namespace into. + // This affects how the namespace is distributed across the cluster. Bundles *int32 `json:"bundles,omitempty"` // ConnectionRef is the reference to the PulsarConnection resource + // used to connect to the Pulsar cluster for this namespace. ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` + // LifecyclePolicy determines whether to keep or delete the Pulsar namespace + // when the Kubernetes resource is deleted. // +kubebuilder:validation:Enum=CleanUpAfterDeletion;KeepAfterDeletion // +optional LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"` - // Tenant Policy Setting + // MaxProducersPerTopic sets the maximum number of producers allowed on a single topic in the namespace. 
// +optional MaxProducersPerTopic *int32 `json:"maxProducersPerTopic,omitempty"` + // MaxConsumersPerTopic sets the maximum number of consumers allowed on a single topic in the namespace. // +optional MaxConsumersPerTopic *int32 `json:"maxConsumersPerTopic,omitempty"` + // MaxConsumersPerSubscription sets the maximum number of consumers allowed on a single subscription in the namespace. // +optional MaxConsumersPerSubscription *int32 `json:"maxConsumersPerSubscription,omitempty"` - // MessageTTL indicates the message ttl for the namespace + // MessageTTL specifies the Time to Live (TTL) for messages in the namespace. + // Messages older than this TTL will be automatically marked as consumed. // +optional MessageTTL *utils.Duration `json:"messageTTL,omitempty"` - // Retention - // Should set at least one of them if setting retention + // RetentionTime specifies the minimum time to retain messages in the namespace. + // Should be set in conjunction with RetentionSize for effective retention policy. // Retention Quota must exceed configured backlog quota for namespace // +optional RetentionTime *utils.Duration `json:"retentionTime,omitempty"` + // RetentionSize specifies the maximum size of backlog retained in the namespace. + // Should be set in conjunction with RetentionTime for effective retention policy. // +optional RetentionSize *resource.Quantity `json:"retentionSize,omitempty"` - // Backlog - // Should set at least one of them if setting backlog + // BacklogQuotaLimitTime specifies the time limit for message backlog. + // Messages older than this limit will be removed or handled according to the retention policy. // +optional BacklogQuotaLimitTime *utils.Duration `json:"backlogQuotaLimitTime,omitempty"` + // BacklogQuotaLimitSize specifies the size limit for message backlog. + // When the limit is reached, older messages will be removed or handled according to the retention policy. 
// +optional BacklogQuotaLimitSize *resource.Quantity `json:"backlogQuotaLimitSize,omitempty"` + // BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. + // Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". // +optional BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"` - // BacklogQuotaType controls the backlog by setting the type to destination_storage or message_age - // destination_storage limits backlog by size (in bytes). message_age limits backlog by time, - // that is, message timestamp (broker or publish timestamp) + // BacklogQuotaType controls how the backlog quota is enforced. + // "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time. // +kubebuilder:validation:Enum=destination_storage;message_age // +optional BacklogQuotaType *string `json:"backlogQuotaType,omitempty"` - // GeoReplicationRefs is the reference list to the PulsarGeoReplication resource + // GeoReplicationRefs is a list of references to PulsarGeoReplication resources, + // used to configure geo-replication for this namespace. + // This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication + // between two Pulsar instances. + // Please use `ReplicationClusters` instead if you are replicating clusters within the same Pulsar instance. // +optional GeoReplicationRefs []*corev1.LocalObjectReference `json:"geoReplicationRefs,omitempty"` // ReplicationClusters is the list of clusters to which the namespace is replicated + // This is **ONLY** used if you are replicating clusters within the same Pulsar instance. + // Please use `GeoReplicationRefs` instead if you are setting up geo-replication + // between two Pulsar instances. 
// +optional ReplicationClusters []string `json:"replicationClusters,omitempty"` } @@ -101,10 +122,13 @@ type PulsarNamespaceStatus struct { // ObservedGeneration is the most recent generation observed for this resource. // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - // Represents the observations of a connection's current state. + // Conditions represent the latest available observations of the namespace's current state. + // It follows the Kubernetes conventions for condition types and status. + // The "Ready" condition type is typically used to indicate the overall status of the namespace. // +patchMergeKey=type // +patchStrategy=merge // +listType=map @@ -112,7 +136,8 @@ type PulsarNamespaceStatus struct { // +optional Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - // GeoReplicationEnabled indicates whether geo-replication is enabled for the namespace + // GeoReplicationEnabled indicates whether geo-replication between two Pulsar instances (via PulsarGeoReplication) + // is enabled for the namespace // +optional GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"` } @@ -126,6 +151,8 @@ type PulsarNamespaceStatus struct { //+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` // PulsarNamespace is the Schema for the pulsarnamespaces API +// It represents a Pulsar namespace in the Kubernetes cluster and includes both +// the desired state (Spec) and the observed state (Status) of the namespace. 
type PulsarNamespace struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -136,7 +163,8 @@ type PulsarNamespace struct { //+kubebuilder:object:root=true -// PulsarNamespaceList contains a list of PulsarNamespace +// PulsarNamespaceList contains a list of PulsarNamespace resources. +// It is used by the Kubernetes API to return multiple PulsarNamespace objects. type PulsarNamespaceList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/pulsarpermission_types.go b/api/v1alpha1/pulsarpermission_types.go index 525bb081..f64be124 100644 --- a/api/v1alpha1/pulsarpermission_types.go +++ b/api/v1alpha1/pulsarpermission_types.go @@ -19,54 +19,68 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// PulsarPermissionSpec defines the desired state of PulsarPermission +// PulsarPermissionSpec defines the desired state of PulsarPermission. +// It specifies the configuration for granting permissions to Pulsar resources. type PulsarPermissionSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file // ConnectionRef is the reference to the PulsarConnection resource + // used to connect to the Pulsar cluster for this permission. ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` - // ResourceName name of the target resource which will be granted the permssions + // ResourceName is the name of the target resource (namespace or topic) + // to which the permissions will be granted. ResourceName string `json:"resourceName"` + // ResourceType indicates whether the permission is for a namespace or a topic. 
// +kubebuilder:validation:Enum=namespace;topic - // ResourceType indicates the resource type, the options include namespace and topic ResoureType PulsarResourceType `json:"resourceType"` - // Roles contains a list of role which will be granted the same permissions - // for the same target + + // Roles is a list of role names that will be granted the specified permissions + // for the target resource. Roles []string `json:"roles"` - // Actions contains a list of action to grant. - // the options include produce,consume,functions + + // Actions is a list of permissions to grant. + // Valid options include "produce", "consume", and "functions". + // +optional Actions []string `json:"actions,omitempty"` - // LifecyclePolicy is the policy that how to deal with pulsar resource when - // PulsarPermission is deleted + // LifecyclePolicy determines how to handle the Pulsar permissions + // when the PulsarPermission resource is deleted. // +optional LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"` } -// PulsarResourceType indicates the resource type, the options include namespace and topic +// PulsarResourceType indicates the type of Pulsar resource for which permissions can be granted. +// Currently, it supports namespace and topic level permissions. type PulsarResourceType string const ( - // PulsarResourceTypeNamespace resource type namespace + // PulsarResourceTypeNamespace represents a Pulsar namespace resource. + // Use this when granting permissions at the namespace level. PulsarResourceTypeNamespace PulsarResourceType = "namespace" - // PulsarResourceTypeTopic resource type topic + + // PulsarResourceTypeTopic represents a Pulsar topic resource. + // Use this when granting permissions at the individual topic level. PulsarResourceTypeTopic PulsarResourceType = "topic" ) -// PulsarPermissionStatus defines the observed state of PulsarPermission +// PulsarPermissionStatus defines the observed state of PulsarPermission. 
+// It provides information about the current status of the Pulsar permission configuration. type PulsarPermissionStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file // ObservedGeneration is the most recent generation observed for this resource. // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - // Represents the observations of a connection's current state. + // Conditions represent the latest available observations of the PulsarPermission's current state. + // It follows the Kubernetes conventions for condition types and status. + // The "Ready" condition type is typically used to indicate the overall status of the permission configuration. // +patchMergeKey=type // +patchStrategy=merge // +listType=map @@ -86,7 +100,8 @@ type PulsarPermissionStatus struct { //+kubebuilder:printcolumn:name="OBSERVED GENERATION",type=string,JSONPath=`.status.observedGeneration` //+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` -// PulsarPermission is the Schema for the pulsarpermissions API +// PulsarPermission is the Schema for the pulsarpermissions API. +// It represents a set of permissions granted to specific roles for a Pulsar resource (namespace or topic). type PulsarPermission struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -97,7 +112,8 @@ type PulsarPermission struct { //+kubebuilder:object:root=true -// PulsarPermissionList contains a list of PulsarPermission +// PulsarPermissionList contains a list of PulsarPermission resources. +// It is used by the Kubernetes API to return multiple PulsarPermission objects. 
type PulsarPermissionList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/pulsartenant_types.go b/api/v1alpha1/pulsartenant_types.go index 1640db38..454e4928 100644 --- a/api/v1alpha1/pulsartenant_types.go +++ b/api/v1alpha1/pulsartenant_types.go @@ -22,45 +22,66 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. -// PulsarTenantSpec defines the desired state of PulsarTenant +// PulsarTenantSpec defines the desired state of PulsarTenant. +// It corresponds to the configuration options available in Pulsar's tenant admin API. type PulsarTenantSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file // TODO make these fields immutable - // Name is the tenant name + // Name is the tenant name. + // This field is required and must be unique within the Pulsar cluster. Name string `json:"name"` // ConnectionRef is the reference to the PulsarConnection resource + // used to connect to the Pulsar cluster for this tenant. ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` + // AdminRoles is a list of roles that have administrative privileges for this tenant. + // These roles can perform actions like creating namespaces, topics, and managing permissions. // +optional AdminRoles []string `json:"adminRoles,omitempty"` + // AllowedClusters is a list of clusters that this tenant is allowed to access. + // This field is optional and can be used to restrict the clusters a tenant can connect to. + // Please use `GeoReplicationRefs` instead if you are setting up geo-replication + // between multiple Pulsar instances. 
// +optional AllowedClusters []string `json:"allowedClusters,omitempty"` + // LifecyclePolicy determines whether to keep or delete the Pulsar tenant + // when the Kubernetes resource is deleted. // +kubebuilder:validation:Enum=CleanUpAfterDeletion;KeepAfterDeletion // +optional LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"` - // GeoReplicationRefs is the reference list to the PulsarGeoReplication resource + // GeoReplicationRefs is a list of references to PulsarGeoReplication resources, + // used to configure geo-replication for this tenant across multiple Pulsar instances. + // This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication + // between multiple Pulsar instances. + // Please use `AllowedClusters` instead if you are allowing a tenant to be available within + // specific clusters in the same Pulsar instance. // +optional GeoReplicationRefs []*corev1.LocalObjectReference `json:"geoReplicationRefs,omitempty"` } -// PulsarTenantStatus defines the observed state of PulsarTenant +// PulsarTenantStatus defines the observed state of PulsarTenant. +// It contains information about the current state of the Pulsar tenant. type PulsarTenantStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file // ObservedGeneration is the most recent generation observed for this resource. // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - // Represents the observations of a connection's current state. + // Conditions represent the latest available observations of the PulsarTenant's current state. + // It follows the Kubernetes conventions for condition types and status.
+ // The "Ready" condition type is typically used to indicate the overall status of the tenant. + // Other condition types may be used to provide more detailed status information. // +patchMergeKey=type // +patchStrategy=merge // +listType=map diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index 29cef96b..29dbe431 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -25,7 +25,8 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. -// PulsarTopicSpec defines the desired state of PulsarTopic +// PulsarTopicSpec defines the desired state of PulsarTopic. +// It corresponds to the configuration options available in Pulsar's topic admin API. type PulsarTopicSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file @@ -35,75 +36,109 @@ type PulsarTopicSpec struct { // Name is the topic name Name string `json:"name"` + // Persistent determines if the topic is persistent (true) or non-persistent (false). + // Defaults to true if not specified. // +kubebuilder:default=true // +optional Persistent *bool `json:"persistent,omitempty"` + // Partitions specifies the number of partitions for a partitioned topic. + // Set to 0 for a non-partitioned topic. // +kubebuilder:default=0 // +optional Partitions *int32 `json:"partitions,omitempty"` // ConnectionRef is the reference to the PulsarConnection resource + // used to connect to the Pulsar cluster for this topic. ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` + // LifecyclePolicy determines whether to keep or delete the Pulsar topic + // when the Kubernetes resource is deleted. 
// +kubebuilder:validation:Enum=CleanUpAfterDeletion;KeepAfterDeletion // +optional LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"` // Topic Policy Setting + + // MaxProducers sets the maximum number of producers allowed on the topic. // +optional MaxProducers *int32 `json:"maxProducers,omitempty"` + // MaxConsumers sets the maximum number of consumers allowed on the topic. // +optional MaxConsumers *int32 `json:"maxConsumers,omitempty"` - // MessageTTL indicates the message ttl for the topic + // MessageTTL specifies the Time to Live (TTL) for messages on the topic. + // Messages older than this TTL will be automatically marked as deleted. // +optional MessageTTL *utils.Duration `json:"messageTTL,omitempty"` - // Max unacked messages + // MaxUnAckedMessagesPerConsumer sets the maximum number of unacknowledged + // messages allowed for a consumer before it's blocked from receiving more messages. // +optional MaxUnAckedMessagesPerConsumer *int32 `json:"maxUnAckedMessagesPerConsumer,omitempty"` + // MaxUnAckedMessagesPerSubscription sets the maximum number of unacknowledged + // messages allowed for a subscription before it's blocked from receiving more messages. // +optional MaxUnAckedMessagesPerSubscription *int32 `json:"maxUnAckedMessagesPerSubscription,omitempty"` - // Retention - // Should set at least one of them if setting retention + // RetentionTime specifies the minimum time to retain messages on the topic. + // Should be set in conjunction with RetentionSize for effective retention policy. // Retention Quota must exceed configured backlog quota for topic // +optional RetentionTime *utils.Duration `json:"retentionTime,omitempty"` + // RetentionSize specifies the maximum size of backlog retained on the topic. + // Should be set in conjunction with RetentionTime for effective retention policy. 
+	// Retention Quota must exceed configured backlog quota for topic
	// +optional
	RetentionSize *resource.Quantity `json:"retentionSize,omitempty"`

-	// Backlog
-	// Should set at least one of them if setting backlog
+	// BacklogQuotaLimitTime specifies the time limit for message backlog.
+	// Messages older than this limit will be removed or handled according to the backlog quota retention policy.
	// +optional
	BacklogQuotaLimitTime *utils.Duration `json:"backlogQuotaLimitTime,omitempty"`

+	// BacklogQuotaLimitSize specifies the size limit for message backlog.
+	// When the limit is reached, older messages will be removed or handled according to the backlog quota retention policy.
	// +optional
	BacklogQuotaLimitSize *resource.Quantity `json:"backlogQuotaLimitSize,omitempty"`

+	// BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
+	// Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
	// +optional
	BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"`

+	// SchemaInfo defines the schema for the topic, if any.
	// +optional
	SchemaInfo *SchemaInfo `json:"schemaInfo,omitempty"`

-	// GeoReplicationRefs is the reference list to the PulsarGeoReplication resource
+	// GeoReplicationRefs is a list of references to PulsarGeoReplication resources,
+	// used to configure geo-replication for this topic across multiple Pulsar instances.
+	// This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication
+	// between two Pulsar instances.
	// +optional
	GeoReplicationRefs []*corev1.LocalObjectReference `json:"geoReplicationRefs,omitempty"`
}

-// SchemaInfo defines the Pulsar Schema.
+// SchemaInfo defines the Pulsar Schema for a topic.
// It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level.
type SchemaInfo struct { - // Type determines how to interpret the schema data + // Type determines how to interpret the schema data. + // Valid values include: "AVRO", "JSON", "PROTOBUF", "PROTOBUF_NATIVE", "KEY_VALUE", "BYTES", or "NONE". + // For KEY_VALUE schemas, use the format "KEY_VALUE(KeyType,ValueType)" where KeyType and ValueType + // are one of the other schema types. Type string `json:"type,omitempty"` - // Schema is schema data + + // Schema contains the actual schema definition. + // For AVRO and JSON schemas, this should be a JSON string of the schema definition. + // For PROTOBUF schemas, this should be the protobuf definition string. + // For BYTES or NONE schemas, this field can be empty. Schema string `json:"schema,omitempty"` - // Properties is a user defined properties as a string/string map + + // Properties is a map of user-defined properties associated with the schema. + // These can be used to store additional metadata about the schema. Properties map[string]string `json:"properties,omitempty"` } @@ -114,10 +149,14 @@ type PulsarTopicStatus struct { // ObservedGeneration is the most recent generation observed for this resource. // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - // Represents the observations of a connection's current state. + // Conditions represent the latest available observations of the PulsarTopic's current state. + // It follows the Kubernetes conventions for condition types and status. + // The "Ready" condition type indicates the overall status of the topic. + // The "PolicyReady" condition type indicates whether the topic policies have been successfully applied. 
// +patchMergeKey=type // +patchStrategy=merge // +listType=map @@ -125,7 +164,8 @@ type PulsarTopicStatus struct { // +optional Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - // GeoReplicationEnabled + // GeoReplicationEnabled indicates whether geo-replication is enabled for this topic. + // This is set to true when GeoReplicationRefs are configured in the spec and successfully applied. // +optional GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"` } @@ -140,6 +180,8 @@ type PulsarTopicStatus struct { //+kubebuilder:printcolumn:name="POLICY_READY",type=string,JSONPath=`.status.conditions[?(@.type=="PolicyReady")].status` // PulsarTopic is the Schema for the pulsartopics API +// It represents a Pulsar topic in the Kubernetes cluster and includes both +// the desired state (Spec) and the observed state (Status) of the topic. type PulsarTopic struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -150,7 +192,8 @@ type PulsarTopic struct { //+kubebuilder:object:root=true -// PulsarTopicList contains a list of PulsarTopic +// PulsarTopicList contains a list of PulsarTopic resources. +// It is used by the Kubernetes API to return multiple PulsarTopic objects. type PulsarTopicList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 9f971246..03ae3f40 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1,17 +1,3 @@ -// Copyright 2024 StreamNative -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - //go:build !ignore_autogenerated // Code generated by controller-gen. DO NOT EDIT. diff --git a/docs/pulsar_connection.md b/docs/pulsar_connection.md index 7817dad6..8b72ead2 100644 --- a/docs/pulsar_connection.md +++ b/docs/pulsar_connection.md @@ -1,8 +1,167 @@ # PulsarConnection -## Create PulsarConnection +## Overview -1. Define a connection named `test-pulsar-connection` by using the YAML file and save the YAML file `connection.yaml`. +The `PulsarConnection` resource defines the connection details for a Pulsar cluster. It can be used to configure various connection parameters including service URLs, authentication methods, and cluster information. + +## Specifications + +| Field | Description | Required | Version | +|-------|-------------|----------|---------| +| `adminServiceURL` | The admin service URL of the Pulsar cluster (e.g., `http://cluster-broker.test.svc.cluster.local:8080`). | No | All | +| `adminServiceSecureURL` | The admin service URL for secure connection (HTTPS) to the Pulsar cluster (e.g., `https://cluster-broker.test.svc.cluster.local:443`). | No | ≥ 0.3.0 | +| `brokerServiceURL` | The broker service URL of the Pulsar cluster (e.g., `pulsar://cluster-broker.test.svc.cluster.local:6650`). | No | ≥ 0.3.0 | +| `brokerServiceSecureURL` | The broker service URL for secure connection (TLS) to the Pulsar cluster (e.g., `pulsar+ssl://cluster-broker.test.svc.cluster.local:6651`). | No | ≥ 0.3.0 | +| `clusterName` | The Pulsar cluster name. Use `pulsar-admin clusters list` to retrieve. Required for configuring Geo-Replication. 
| No | ≥ 0.3.0 | +| `authentication` | Authentication configuration. Required when authentication is enabled for the Pulsar cluster. Supports JWT Token and OAuth2 methods. | No | All | +| `brokerClientTrustCertsFilePath` | The file path to the trusted TLS certificate for outgoing connections to Pulsar brokers. Used for TLS verification. | No | ≥ 0.3.0 | + +Note: Fields marked with version ≥ 0.3.0 are only available in that version and above. + +## Authentication Methods + +The `authentication` field supports two methods: JWT Token and OAuth2. Each method can use either a Kubernetes Secret reference or a direct value. + +### Specification + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `token` | JWT Token authentication configuration | `ValueOrSecretRef` | No | +| `oauth2` | OAuth2 authentication configuration | `PulsarAuthenticationOAuth2` | No | + +#### ValueOrSecretRef + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `value` | Direct string value | `*string` | No | +| `secretRef` | Reference to a Kubernetes Secret | `*SecretKeyRef` | No | + +#### SecretKeyRef + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `name` | Name of the Kubernetes Secret | `string` | Yes | +| `key` | Key in the Kubernetes Secret | `string` | Yes | + +#### PulsarAuthenticationOAuth2 + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `issuerEndpoint` | URL of the OAuth2 authorization server | `string` | Yes | +| `clientID` | OAuth2 client identifier | `string` | Yes | +| `audience` | Intended recipient of the token | `string` | Yes | +| `key` | Client secret or path to JSON credentials file | `ValueOrSecretRef` | Yes | +| `scope` | Requested permissions from the OAuth2 server | `string` | No | + +Note: Only one authentication method (either `token` or `oauth2`) should be specified at a time. 
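+
+For illustration, the tables above can be combined into a single connection resource. The following sketch uses JWT Token authentication with a Secret reference; the resource name, Secret name, and key are placeholders:
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: PulsarConnection
+metadata:
+  name: my-connection           # placeholder name
+spec:
+  adminServiceURL: http://cluster-broker.test.svc.cluster.local:8080
+  brokerServiceURL: pulsar://cluster-broker.test.svc.cluster.local:6650
+  clusterName: pulsar-cluster
+  authentication:
+    # Specify exactly one of `token` or `oauth2`
+    token:
+      secretRef:
+        name: my-token-secret   # placeholder Secret name
+        key: token              # placeholder key within the Secret
+```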
+
+### JWT Token Authentication
+
+JWT Token authentication can be configured using either a direct value or a Kubernetes Secret reference.
+
+#### Using a direct value
+
+To use JWT Token authentication with a direct value, you can set the `token` field to the raw JWT token string (not base64-encoded).
+
+```yaml
+authentication:
+  token:
+    value: 
+```
+
+#### Using a Kubernetes Secret reference
+
+To use JWT Token authentication with a Kubernetes Secret reference, you need to create a Kubernetes Secret containing the JWT token. The secret should have the following structure:
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: 
+type: Opaque
+stringData:
+  : 
+```
+
+The `` can be any name. It will be referenced in the `PulsarConnection` resource with the `token.secretRef` field.
+
+```yaml
+authentication:
+  token:
+    secretRef:
+      name: 
+      key: 
+```
+
+### OAuth2 Authentication
+
+OAuth2 authentication can be configured using either a direct value or a Kubernetes Secret reference.
+
+#### Using a direct value
+
+To use OAuth2 authentication with a direct value, you can set the `issuerEndpoint`, `clientID`, `audience`, and `key` fields to the OAuth2 configuration.
+
+```yaml
+authentication:
+  oauth2:
+    issuerEndpoint: https://auth.streamnative.cloud
+    clientID: 
+    audience: urn:sn:pulsar:sndev:us-west
+    key:
+      value: |
+        {
+        "type":"sn_service_account",
+        "client_id":"",
+        "grant_type":"client_credentials",
+        "client_secret":"",
+        "issuer_url":"https://auth.streamnative.cloud"
+        }
+    scope: 
+```
+
+#### Using a Kubernetes Secret reference
+
+To use OAuth2 authentication with a Kubernetes Secret reference, you need to create a Kubernetes Secret containing the OAuth2 configuration.
The secret should have the following structure: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: +type: Opaque +stringData: + : | + { + "type":"sn_service_account", + "client_id":"", + "grant_type":"client_credentials", + "client_secret":"", + "issuer_url":"https://auth.streamnative.cloud" + } +``` + +The `` should contain the OAuth2 configuration. + +```yaml +authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: + audience: urn:sn:pulsar:sndev:us-west + key: + secretRef: + name: + key: + scope: +``` + +## PlainText Connection vs TLS Connection + +The `PulsarConnection` resource supports both plaintext and TLS connections. Plaintext connections are used for non-secure connections to the Pulsar cluster, while TLS connections are used for secure connections with TLS enabled. + +### Plaintext Connection + +To create a plaintext connection, you need to set the `adminServiceURL`, `brokerServiceURL`, and `clusterName` fields. ```yaml apiVersion: resource.streamnative.io/v1alpha1 @@ -15,126 +174,37 @@ spec: clusterName: pulsar-cluster ``` -Other `PulsarConnection` configuration examples: +### TLS Connection -* TLS connection - - ```yaml - apiVersion: resource.streamnative.io/v1alpha1 - kind: PulsarConnection - metadata: - name: pulsar-connection-tls - namespace: test - spec: - adminServiceSecureURL: https://pulsar-sn-platform-broker.test.svc.cluster.local:443 - brokerServiceSecureURL: pulsar+ssl//pulsar-sn-platform-broker.test.svc.cluster.local:6651 - clusterName: pulsar-cluster - ``` - -* JWT Token authentication with Secret - - ```yaml - apiVersion: resource.streamnative.io/v1alpha1 - kind: PulsarConnection - metadata: - name: pulsar-connection-jwt-secret - spec: - adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 - brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 - clusterName: pulsar-cluster - authentication: - token: - # Use a Kubernetes Secret to store the JWT 
Token. https://kubernetes.io/docs/concepts/configuration/secret/ - # Secret data field have to be base64-encoded strings. https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data - secretRef: - name: test-pulsar-sn-platform-vault-secret-env-injection - key: brokerClientAuthenticationParameters - ``` - -* JWT Token authentication with value - - ```yaml - apiVersion: resource.streamnative.io/v1alpha1 - kind: PulsarConnection - metadata: - name: pulsar-connection-jwt-value - spec: - adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 - brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 - clusterName: pulsar-cluster - authentication: - token: - # Use the JWT Token raw data as the token value - value: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY - ``` - -* OAuth2 authentication with Secret - - ```bash - kubectl create secret generic oauth2-key-file --from-file=sndev-admin.json - ``` - - ```yaml - apiVersion: resource.streamnative.io/v1alpha1 - kind: PulsarConnection - metadata: - name: pulsar-connection-oauth2-secret - spec: - adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 - brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 - clusterName: pulsar-cluster - authentication: - oauth2: - issuerEndpoint: https://auth.streamnative.cloud - clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ - audience: urn:sn:pulsar:sndev:us-west - key: - secretRef: - name: oauth2-key-file - key: sndev-admin.json - ``` - -* OAuth2 authentication with value - - ```yaml - apiVersion: resource.streamnative.io/v1alpha1 - kind: PulsarConnection - metadata: - name: pulsar-connection-oauth2-values - spec: - adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 - brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 - clusterName: pulsar-cluster - authentication: - oauth2: - 
issuerEndpoint: https://auth.streamnative.cloud - clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ - audience: urn:sn:pulsar:sndev:us-west - # Use the keyFile contents as the oauth2 key value - key: - value: | - { - "type":"sn_service_account", - "client_id":"pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ", - "grant_type":"client_credentials", - "client_secret":"zZr_adLu4LuPrN5FwYWH7was07-23nlzBgK50l_Rfsl2hjzUXKHsbKt", - "issuer_url":"https://auth.streamnative.cloud" - } - ``` - -This table lists specifications available for the `PulsarConnection` resource. +To create a TLS connection, you need to set the `adminServiceSecureURL`, `brokerServiceSecureURL`, and `clusterName` fields. -| Option | Description | Required or not | -| ---| --- |--- | -| `adminServiceURL` | The admin service URL of the Pulsar cluster, such as `http://cluster-broker.test.svc.cluster.local:8080`. | No | -| `authentication` | A secret that stores authentication configurations. This option is required when you enable authentication for your Pulsar cluster. Support JWT Token and OAuth2 authentication methods. | No | -| `brokerServiceURL` | The broker service URL of the Pulsar cluster, such as `pulsar://cluster-broker.test.svc.cluster.local:6650`. This option is required for configuring Geo-replication. This option is available for version `0.3.0` or above. | No | -| `brokerServiceSecureURL` | The broker service URL for secure connection to the Pulsar cluster, such as `pulsar+ssl://cluster-broker.test.svc.cluster.local:6651`. This option is required for configuring Geo-replication when TLS is enabled. This option is available for version `0.3.0` or above. | No | -| `adminServiceSecureURL` | The admin service URL for secure connection to the Pulsar cluster, such as `https://cluster-broker.test.svc.cluster.local:443`. This option is available for version `0.3.0` or above. | No | -| `clusterName` | The Pulsar cluster name. You can use the `pulsar-admin clusters list` command to get the Pulsar cluster name. 
This option is required for configuring Geo-replication. Provided from `0.3.0` | No | - +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-tls + namespace: test +spec: + adminServiceSecureURL: https://pulsar-sn-platform-broker.test.svc.cluster.local:443 + brokerServiceSecureURL: pulsar+ssl://pulsar-sn-platform-broker.test.svc.cluster.local:6651 +``` + +## Create A Pulsar Connection -1. Apply the YAML file to create the Pulsar Connection. +1. Create a YAML file for the Pulsar Connection. + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster +``` + +2. Apply the YAML file to create the Pulsar Connection. ```shell kubectl apply -f connection.yaml @@ -151,7 +221,7 @@ NAME ADMIN_SERVICE_URL test-pulsar-connection http://ok-sn-platform-broker.test.svc.cluster.local:8080 1 1 True ``` -## Update PulsarConnection +## Update A Pulsar Connection You can update the connection by editing the connection.yaml, then apply it again. 
For example, if pulsar cluster doesn’t setup the authentication, then you don’t need the authentication part in the spec @@ -169,10 +239,135 @@ spec: kubectl apply -f connection.yaml ``` -## Delete PulsarConnection +## Delete A Pulsar Connection ```shell kubectl -n test delete pulsarconnection.resource.streamnative.io test-pulsar-connection ``` Please be noticed, because the Pulsar Resources Operator are using the connection to manage pulsar resources, If you delete the pulsar connection, it will only be deleted after the resources CRs are deleted + + +## More PulsarConnection Examples + +### PlainText Connection + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster +``` + +### TLS Connection + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-tls + namespace: test +spec: + adminServiceSecureURL: https://pulsar-sn-platform-broker.test.svc.cluster.local:443 + brokerServiceSecureURL: pulsar+ssl//pulsar-sn-platform-broker.test.svc.cluster.local:6651 + clusterName: pulsar-cluster +``` + +### PlainText Connection with JWT Token Authentication with Secret + +```bash +kubectl create secret generic test-pulsar-sn-platform-vault-secret-env-injection --from-literal=brokerClientAuthenticationParameters=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY +``` + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-jwt-secret +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + token: + # Use a 
Kubernetes Secret to store the JWT Token. https://kubernetes.io/docs/concepts/configuration/secret/ + # Secret data field have to be base64-encoded strings. https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data + secretRef: + name: test-pulsar-sn-platform-vault-secret-env-injection + key: brokerClientAuthenticationParameters +``` + +### PlainText Connection with JWT Token Authentication with value + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-jwt-value +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + token: + # Use the JWT Token raw data as the token value + value: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY +``` + +### PlainText Connection with OAuth2 Authentication with Secret + +```bash +kubectl create secret generic oauth2-key-file --from-file=sndev-admin.json +``` + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-oauth2-secret +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ + audience: urn:sn:pulsar:sndev:us-west + key: + secretRef: + name: oauth2-key-file + key: sndev-admin.json +``` + +### PlainText Connection with OAuth2 Authentication with value + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection-oauth2-values +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: 
pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ + audience: urn:sn:pulsar:sndev:us-west + # Use the keyFile contents as the oauth2 key value + key: + value: | + { + "type":"sn_service_account", + "client_id":"pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ", + "grant_type":"client_credentials", + "client_secret":"zZr_adLu4LuPrN5FwYWH7was07-23nlzBgK50l_Rfsl2hjzUXKHsbKt", + "issuer_url":"https://auth.streamnative.cloud" + } +``` \ No newline at end of file diff --git a/docs/pulsar_geo_replication.md b/docs/pulsar_geo_replication.md index 4b31c40e..8f1d3fa5 100644 --- a/docs/pulsar_geo_replication.md +++ b/docs/pulsar_geo_replication.md @@ -1,12 +1,70 @@ # PulsarGeoReplication -## Create PulsarGeoReplication +## Overview -The PulsarGeoReplication is a unidirectional setup. When you create a PulsarGeoReplication for `us-east` only, data will be replicated from `us-east` to `us-west`. If you need to replicate data between clusters `us-east` and `us-west`, you need to create PulsarGeoReplication for both `us-east` and `us-west`. +PulsarGeoReplication is a custom resource that enables the configuration of geo-replication between different Pulsar instances. It allows you to set up unidirectional replication of data from one Pulsar cluster to another, even when these clusters are geographically distributed or in separate Pulsar instances. + +Key points about PulsarGeoReplication: + +1. It's used for configuring replication between separate Pulsar instances. +2. The replication is unidirectional. To set up bidirectional replication, you need to create two PulsarGeoReplication resources, one for each direction. +3. It creates a new cluster in the destination Pulsar instance for each PulsarGeoReplication resource. +4. It's different from configuring geo-replication between clusters within a single Pulsar instance. 
For that purpose, use the `replicationClusters` field in the `PulsarNamespace` resource instead. + +PulsarGeoReplication is particularly useful for scenarios where you need to replicate data across different Pulsar deployments, such as disaster recovery, data locality, or compliance with data residency requirements. + +## Warning + +This resource should be used specifically for configuring geo-replication between clusters in different Pulsar instances. If you're looking to set up geo-replication between clusters within the same Pulsar instance, you should use the `replicationClusters` field in the `PulsarNamespace` resource instead. + +Using PulsarGeoReplication for clusters within the same Pulsar instance may lead to unexpected behavior and is not the intended use case for this resource. Always ensure you're dealing with separate Pulsar instances before utilizing PulsarGeoReplication. + +## Specifications + +The `PulsarGeoReplication` resource has the following specifications: + +| Field | Description | Required | +|-------|-------------|----------| +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the source Pulsar cluster. | Yes | +| `destinationConnectionRef` | Reference to the PulsarConnection resource used to connect to the destination Pulsar cluster. | Yes | +| `lifecyclePolicy` | Determines whether to keep or delete the geo-replication configuration when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | + +The `PulsarGeoReplication` resource is designed to configure geo-replication between separate Pulsar instances. It creates a new "Cluster" in the destination Pulsar cluster identified by `destinationConnectionRef`. This setup allows configuring the replication of data from the source cluster (identified by `connectionRef`) to the destination cluster. 
By establishing this connection, the brokers in the source cluster can communicate with and replicate data to the brokers in the destination cluster, enabling geo-replication between the two separate Pulsar instances. + +### Deletion Behavior + +The `lifecyclePolicy` field not only affects the geo-replication configuration but also determines how the Destination Cluster is handled in the source cluster when the PulsarGeoReplication resource is deleted: + +- `CleanUpAfterDeletion` (default): When the PulsarGeoReplication resource is deleted, the operator will remove the Destination Cluster configuration from the source cluster. This means that the cluster entry created in the source cluster for the destination will be deleted using the Pulsar admin API's [DELETE /admin/v2/clusters/{cluster}](https://pulsar.apache.org/admin-rest-api/?apiversion=v2#operation/ClustersBase_deleteCluster) endpoint, effectively removing all traces of the geo-replication setup. + +- `KeepAfterDeletion`: If this policy is set, the Destination Cluster configuration will remain in the source cluster even after the PulsarGeoReplication resource is deleted. This can be useful if you want to temporarily remove the Kubernetes resource while maintaining the ability to quickly re-establish the geo-replication later. The cluster configuration can be viewed using the Pulsar admin API's [GET /admin/v2/clusters/{cluster}](https://pulsar.apache.org/admin-rest-api/?apiversion=v2#operation/ClustersBase_getCluster) endpoint. + +It's important to note that this deletion behavior applies to the cluster configuration in the source Pulsar instance. The actual destination Pulsar instance and its data are not affected by this deletion process. 
The operator only manages the configuration that enables communication between the two Pulsar instances for geo-replication purposes, which is set up using the Pulsar admin API's [PUT /admin/v2/clusters/{cluster}](https://pulsar.apache.org/admin-rest-api/?apiversion=v2#operation/ClustersBase_updateCluster) endpoint when the PulsarGeoReplication resource is created.
+
+### Connection References
+
+Both `connectionRef` and `destinationConnectionRef` are of type `corev1.LocalObjectReference`, which means they should reference existing PulsarConnection resources in the same namespace. For detailed information on how to create and manage PulsarConnection resources, please refer to the [PulsarConnection documentation](pulsar_connection.md).
+
+Note: When configuring geo-replication between `connectionRef` and `destinationConnectionRef`, ensure that the brokers in the `connectionRef` cluster can communicate with the `destinationConnectionRef` cluster, and that the `destinationConnectionRef` cluster can authenticate the connections from the `connectionRef` cluster.
+
+### Lifecycle Policy
+
+The `lifecyclePolicy` field determines what happens to the geo-replication configuration when the Kubernetes PulsarGeoReplication resource is deleted:

-The Pulsar Resource Operator will create a new cluster named `clusterName` in the destination connection for each PulsarGeoReplication.
+- `CleanUpAfterDeletion` (default): The geo-replication configuration will be removed from both Pulsar clusters when the Kubernetes resource is deleted.
+- `KeepAfterDeletion`: The geo-replication configuration will remain in both Pulsar clusters even after the Kubernetes resource is deleted.
+For more information about lifecycle policies, refer to the [PulsarResourceLifeCyclePolicy documentation](pulsar_resource_lifecycle.md).
+ +For more detailed information about geo-replication in Pulsar, refer to the [Pulsar Geo-replication documentation](https://pulsar.apache.org/docs/administration-geo/). + +## Create PulsarGeoReplication + +The PulsarGeoReplication is a unidirectional setup. When you create a PulsarGeoReplication for `us-east` only, data will be replicated from `us-east` to `us-west`. If you need to replicate data between clusters `us-east` and `us-west`, you need to create PulsarGeoReplication for both `us-east` and `us-west`. + +The Pulsar Resource Operator will create a new cluster in the source connection for each PulsarGeoReplication. This new cluster represents the destination cluster and is created using the information from the PulsarConnection resource referenced by `destinationConnectionRef`. The name of this cluster will be derived from the destination cluster's name, which is specified in the `clusterName` field of the destination PulsarConnection. This setup allows the source cluster to recognize and replicate data to the destination cluster, enabling geo-replication between the two separate Pulsar instances. 1. Define a Geo-replication named `us-east-geo` and save the YAML file as `us-east-geo.yaml`. @@ -24,16 +82,25 @@ spec: lifecyclePolicy: CleanUpAfterDeletion ``` -This table lists specifications available for the `PulsarGeoReplication ` resource. +2. Create the PulsarGeoReplication resource. -| Option | Description | Required or not | -| ---| --- |--- | -| `connectionRef` | The reference to a PulsarConnection. | Yes | -| `destinationConnectionRef` | The reference to a destination PulsarConnection. | Yes | -| `lifecyclePolicy` | The resource lifecycle policy. Available options are `CleanUpAfterDeletion` and `KeepAfterDeletion`. By default, it is set to `CleanUpAfterDeletion`. | Optional | +```bash +kubectl apply -f us-east-geo.yaml +``` + +3. Verify that the PulsarGeoReplication resource is created successfully. 
+
+```bash
+kubectl get pulsargeoreplication us-east-geo -n us-east
+```
+4. Verify that the new cluster is created in the source Pulsar instance.
+
+```bash
+pulsar-admin clusters list --url http://:8080
+```
 
-## How to configure Geo-replication
+## Tutorial: How to configure Geo-replication
 
 This section describes how to configure Geo-replication between clusters `us-east-sn-platform` and `us-west-sn-platform` in different namespaces of the same Kubernetes cluster.
 
@@ -45,10 +112,12 @@ graph TD;
 ```
 
 ### Prerequisites
-1. Deploy two pulsar clusters.
+
+1. Deploy two separate Pulsar clusters, each in a different namespace. Each cluster has its own configuration store.
 2. Ensure that both clusters can access each other.
 
-### Update the existing PulsarConnection
+### Create a source PulsarConnection for `us-east`
+
 Add the Pulsar cluster `us-east-sn-platform` information through the `clusterName` and `brokerServiceURL` fields to the existing PulsarConnection.
 
 ```yaml
@@ -65,19 +134,16 @@ spec:
   brokerServiceURL: pulsar://us-east-sn-platform-broker.us-east.svc.cluster.local:6650
 ```
 
+### Create a destination PulsarConnection for `us-west`
-
-
-### Create a destination PulsarConnection
-The destination PulsarConnection has the information of the Pulsar cluster`us-west-sn-platform`.
-Add the Pulsar cluster `us-west-sn-platform` information through the `clusterName` and `brokerServiceURL` fields to the destination PulsarConnection.
+The destination PulsarConnection contains the information of the Pulsar cluster `us-west-sn-platform`. Add the Pulsar cluster `us-west-sn-platform` information through the `clusterName` and `brokerServiceURL` fields to the destination PulsarConnection.
```yaml
 apiVersion: resource.streamnative.io/v1alpha1
 kind: PulsarConnection
 metadata:
-  name: us-east-dest-connection
-  namespace: us-east
+  name: us-west-dest-connection
+  namespace: us-west
 spec:
   # The destination us-west cluster name
   clusterName: us-west-sn-platform
@@ -87,9 +153,11 @@ spec:
 ```
 
 #### Use tls connection
+
 When you want to use tls to connect remote cluster, you need to do some extra steps.
 
-1. For a selfsigning cert, you need to create a secret to store the cert file of connecting the remote broker.
+1. For a self-signed cert, you need to create a secret to store the certificate used to connect to the `us-west` brokers.
+
 ```yaml
 apiVersion: v1
 data:
@@ -100,7 +168,8 @@ metadata:
   namespace: us-esat
 type: Opaque
 ```
-2. Mount the secret to pulsarbroker by adding these line to the `pulsarbroker.spec.pod.secretRefs`. The mount path will be used in pulsar connection.
+
+2. Mount the secret to the `us-west` pulsarbroker by adding these lines to `pulsarbroker.spec.pod.secretRefs`. The mount path will be used in the `us-west` PulsarConnection.
 ```yaml
 spec:
   pod:
@@ -108,12 +177,13 @@ spec:
     - mountPath: /etc/tls/us-west
       secretName: us-west-tls-broker
 ```
+
 3. Add `adminServiceSecureURL` and `brokerServiceSecureURL` to the destination connection
 ```yaml
 apiVersion: resource.streamnative.io/v1alpha1
 kind: PulsarConnection
 metadata:
-  name: us-east-dest-connection
+  name: us-east-to-west-connection
   namespace: us-east
 spec:
   # The destination us-west cluster name
@@ -129,15 +199,15 @@ spec:
   brokerClientTrustCertsFilePath: /etc/tls/us-west/ca.crt # Optional. The cert path is the mountPath in the above step if you are using selfsigning cert.
 ```
 
-### Create a PulsarGeoReplication
-This section enabled Geo-replication on `us-east`, which replicates data from `us-east` to `us-west`. The operator will create a new cluster called `us-west-sn-platform`.
+
+### Create a PulsarGeoReplication
+
+This section enables Geo-replication on `us-east`, which replicates data from `us-east` to `us-west`.
The operator will create a new cluster entry called `us-west-sn-platform` in `us-east` cluster. ```yaml apiVersion: resource.streamnative.io/v1alpha1 kind: PulsarGeoReplication metadata: - name: us-east-geo-replication + name: us-east-to-west-geo-replication namespace: us-east spec: # The local us-east cluster connection @@ -145,7 +215,7 @@ spec: name: us-east-local-connection # The destination us-west cluster connection destinationConnectionRef: - name: us-east-dest-connection + name: us-east-to-west-connection lifecyclePolicy: CleanUpAfterDeletion ``` @@ -164,7 +234,7 @@ spec: connectionRef: name: us-east-local-connection geoReplicationRefs: - - name: us-east-geo-replication + - name: us-east-to-west-geo-replication lifecyclePolicy: CleanUpAfterDeletion ``` @@ -187,13 +257,13 @@ spec: connectionRef: name: us-east-local-connection geoReplicationRefs: - - name: us-east-geo-replication + - name: us-east-to-west-geo-replication lifecyclePolicy: CleanUpAfterDeletion ``` ### Enable Geo-replication at the topic level -You can create a new topic or update an existing topic by adding the field `geoReplicationRefs`. It will add the topic to `cluster2-sn-platform`. +You can create a new topic or update an existing topic by adding the field `geoReplicationRefs`. It will add the topic to `us-west-sn-platform`. ```yaml apiVersion: resource.streamnative.io/v1alpha1 @@ -207,7 +277,7 @@ spec: connectionRef: name: us-east-local-connection geoReplicationRefs: - - name: us-east-geo-replication + - name: us-east-to-west-geo-replication lifecyclePolicy: CleanUpAfterDeletion ``` @@ -215,4 +285,4 @@ spec: After the resources are ready, you can test Geo-replication by producing and consuming messages. - Open a terminal and run the command `./bin/pulsar-client produce geo-test/geo-namespace/geo-topic -m "hello" -n 10` to produce messages to `us-east`. 
-- Open another terminal and run the command `./bin/pulsar-client consume geo-test/geo-namespace/geo-topic -s sub -n 0` to consume messages from `ue-west`. \ No newline at end of file +- Open another terminal and run the command `./bin/pulsar-client consume geo-test/geo-namespace/geo-topic -s sub -n 0` to consume messages from `us-west`. \ No newline at end of file diff --git a/docs/pulsar_namespace.md b/docs/pulsar_namespace.md index 91cf90e7..2e991faa 100644 --- a/docs/pulsar_namespace.md +++ b/docs/pulsar_namespace.md @@ -1,6 +1,51 @@ # PulsarNamespace -## Create PulsarNamespace +## Overview + +The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allows you to configure various namespace-level settings such as backlog quotas, message TTL, and replication. + +## Specifications + +| Field | Description | Required | +|-------|-------------|----------| +| `name` | The fully qualified namespace name in the format "tenant/namespace". | Yes | +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this namespace. | Yes | +| `bundles` | Number of bundles to split the namespace into. This affects how the namespace is distributed across the cluster. | No | +| `lifecyclePolicy` | Determines whether to keep or delete the Pulsar namespace when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. | No | +| `maxProducersPerTopic` | Maximum number of producers allowed on a single topic in the namespace. | No | +| `maxConsumersPerTopic` | Maximum number of consumers allowed on a single topic in the namespace. | No | +| `maxConsumersPerSubscription` | Maximum number of consumers allowed on a single subscription in the namespace. | No | +| `messageTTL` | Time to Live (TTL) for messages in the namespace. Messages older than this TTL will be automatically marked as consumed. | No | +| `retentionTime` | Minimum time to retain messages in the namespace. 
Should be set in conjunction with RetentionSize for effective retention policy. | No | +| `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. | No | +| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No | +| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No | +| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | +| `backlogQuotaType` | Controls how the backlog quota is enforced. Options: "destination_storage" (limits backlog by size in bytes), "message_age" (limits by time). | No | +| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to configure geo-replication for this namespace. Use only when using PulsarGeoReplication for setting up geo-replication between two Pulsar instances. | No | +| `replicationClusters` | List of clusters to which the namespace is replicated. Use only if replicating clusters within the same Pulsar instance. | No | + +Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). + +## replicationClusters vs geoReplicationRefs + +The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace: + +1. `replicationClusters`: + - Use this when replicating data between clusters within the same Pulsar instance. + - It's a simple list of cluster names to which the namespace should be replicated. + - This is suitable for scenarios where all clusters are managed by the same Pulsar instance and have direct connectivity. 
+   - Example use case: Replicating data between regions within a single Pulsar instance.
+
+2. `geoReplicationRefs`:
+   - Use this when setting up geo-replication between separate Pulsar instances.
+   - It references PulsarGeoReplication resources, which contain more detailed configuration for connecting to external Pulsar clusters.
+   - This is appropriate for scenarios involving separate Pulsar deployments, possibly in different data centers or cloud providers.
+   - Example use case: Replicating data between two independent Pulsar instances in different geographical locations.
+
+Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements.
+
+## Create A Pulsar Namespace
 
 1. Define a namespace named `test-tenant/testns` by using the YAML file and save the YAML file `namespace.yaml`.
 ```yaml
@@ -27,28 +72,7 @@ spec:
   # lifecyclePolicy: CleanUpAfterDeletion
 ```
 
-This table lists specifications available for the `PulsarNamespace` resource.
-
-| Option | Description | Required or not |
-| ---|-------------------------------------------------------------------------------------------------------------------------------------------------------|--- |
-| `name` | The namespace name. | Yes |
-| `connectionRef` | The reference to a PulsarConnection. | Yes |
-| `backlogQuotaLimitTime` | The Backlog quota time limit (valid time units are "s", "m", "h", "d", "w"). | Optional |
-| `backlogQuotaLimitSize` | The Backlog quota size limit (such as 10Mi, 10Gi). | Optional |
-| `backlogQuotaRetentionPolicy` | The Retention policy to be enforced when the limit is reached. options: producer_exception,producer_request_hold,consumer_backlog_eviction | Optional |
-| `messageTTL` | The TTL time duration of messages(valid time units are "s", "m", "h", "d", "w").
| Optional | -| `bundles` | The number of activated bundles. By default, it is set to 4. It couldn’t be updated after namespace is created | Optional | -| `maxProducersPerTopic` | The maximum number of producers per topic for a namespace. | Optional | -| `maxConsumersPerTopic` | The maximum number of consumers per topic for a namespace. | Optional | -| `maxConsumersPerSubscription` | The maximum number of consumers per subscription for a namespace. | Optional | -| `retentionTime` | The retention time (valid time units are "s", "m", "h", "d", "w"). | Optional | -| `retentionSize` | The retention size limit(such as 800Mi, 10Gi). | Optional | -| `lifecyclePolicy` | The resource lifecycle policy. Available options are `CleanUpAfterDeletion` and `KeepAfterDeletion`. By default, it is set to `CleanUpAfterDeletion`. | Optional | -| `geoReplicationRefs` | The reference list of the PulsarGeoReplication. Enable Geo-replication at the namespace level. It will add the namespace to the clusters. | No | -| `replicationClusters` | The list of clusters to which the namespace replicates messages. | Optional | - - -1. Apply the YAML file to create the namespace. +2. Apply the YAML file to create the namespace. ```shell kubectl apply -f namespace.yaml @@ -65,14 +89,37 @@ NAME RESOURCE_NAME GENERATION OBSERVED_GENERATION test-pulsar-namespace test-tenant/testns 1 1 True ``` -## Update PulsarNamespace +## Update A Pulsar Namespace + +You can update the namespace policies by editing the `namespace.yaml` file and then applying it again using `kubectl apply -f namespace.yaml`. This allows you to modify various settings of the Pulsar namespace. + +Please note the following important points: + +1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace. + +2. 
Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, and `retentionSize` can be modified. + +3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: + + - If the new PulsarConnection refers to the same Pulsar cluster (i.e., the admin and broker URLs are the same), the namespace will remain in its original location. The operator will simply use the new connection details to manage the existing namespace. + + - If the new PulsarConnection points to a different Pulsar cluster (i.e., different admin and broker URLs), the operator will attempt to create a new namespace with the same configuration in the new cluster. The original namespace in the old cluster will not be automatically deleted. + + Be cautious when changing the `connectionRef`, especially if it points to a new cluster, as this can lead to namespace duplication across clusters. Always verify the intended behavior and manage any cleanup of the old namespace if necessary. + +4. Changes to `lifecyclePolicy` will only affect what happens when the PulsarNamespace resource is deleted, not the current state of the namespace. + +5. After applying changes, you can check the status of the update using: + ```shell + kubectl -n test get pulsarnamespace.resource.streamnative.io test-pulsar-namespace + ``` + The `OBSERVED_GENERATION` should increment, and `READY` should become `True` when the update is complete. -You can update the namespace policies by editing the namespace.yaml, then apply if again. +6. Be cautious when updating namespace policies, as changes may affect existing producers and consumers. It's recommended to test changes in a non-production environment first. -Please be noticed: -1. The field `name` and `bundles` couldn’t be updated. 
+## Delete A Pulsar Namespace -## Delete PulsarNamespace +To delete a PulsarNamespace resource, use the following kubectl command: ```shell kubectl -n test delete pulsarnamespace.resource.streamnative.io test-pulsar-namespace diff --git a/docs/pulsar_permission.md b/docs/pulsar_permission.md index f988e1e0..3f51eaf3 100644 --- a/docs/pulsar_permission.md +++ b/docs/pulsar_permission.md @@ -1,6 +1,54 @@ # PulsarPermission -## Create PulsarPermission +## Overview + +The `PulsarPermission` resource is a custom resource in the Pulsar Resources Operator that allows you to manage access control for Pulsar resources declaratively using Kubernetes. It provides a way to grant or revoke permissions for specific roles on Pulsar resources such as namespaces and topics. + +With `PulsarPermission`, you can: + +1. Define fine-grained access control policies for your Pulsar resources. +2. Specify which roles have permissions to perform certain actions (like produce, consume, or manage functions) on specific namespaces or topics. +3. Manage permissions across your entire Pulsar cluster using Kubernetes-native tools and workflows. +4. Automate the process of granting and revoking permissions as part of your CI/CD pipeline or GitOps practices. + +This resource is particularly useful for maintaining consistent and auditable access control across complex Pulsar deployments, ensuring that the right roles have the appropriate levels of access to the right resources. + +## Specifications + +The `PulsarPermission` resource has the following specifications: + +| Field | Description | Required | +|-------|-------------|----------| +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster. | Yes | +| `resourceType` | The type of Pulsar resource to which the permission applies. Can be either "namespace" or "topic". | Yes | +| `resourceName` | The name of the Pulsar resource. For namespaces, use the format "tenant/namespace". 
For topics, use the full topic name including persistence and namespace, e.g., "persistent://tenant/namespace/topic". | Yes | +| `roles` | A list of roles to which the permissions will be granted. | Yes | +| `actions` | A list of actions to be permitted. Can include "produce", "consume", "functions", "sinks", "sources", and "packages". | No | +| `lifecyclePolicy` | Determines whether to keep or delete the Pulsar permissions when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | + +### Actions + +The `actions` field can include one or more of the following (for more details, see the [Pulsar documentation on authorization and ACLs](https://pulsar.apache.org/docs/security-authorization/)): + +- `produce`: Allows the role to produce messages to the specified resource. +- `consume`: Allows the role to consume messages from the specified resource. +- `functions`: Allows the role to manage Pulsar Functions on the specified resource. +- `sinks`: Allows the role to manage Pulsar IO Sinks on the specified resource. +- `sources`: Allows the role to manage Pulsar IO Sources on the specified resource. +- `packages`: Allows the role to manage Pulsar Packages on the specified resource. + +If the `actions` field is omitted, no specific actions will be granted, effectively revoking all permissions for the specified roles on the resource. + +### Lifecycle Policy + +The `lifecyclePolicy` field determines what happens to the Pulsar permissions when the Kubernetes `PulsarPermission` resource is deleted: + +- `CleanUpAfterDeletion` (default): The permissions will be removed from the Pulsar cluster when the Kubernetes resource is deleted. +- `KeepAfterDeletion`: The permissions will remain in the Pulsar cluster even after the Kubernetes resource is deleted. + +For more information about lifecycle policies, refer to the [PulsarResourceLifeCyclePolicy documentation](pulsar_resource_lifecycle.md). 
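As a sketch of the policy described above, a `PulsarPermission` whose grant should outlive its Kubernetes representation can set `KeepAfterDeletion` explicitly (the metadata names below are illustrative placeholders, not taken from this patch):

```yaml
apiVersion: resource.streamnative.io/v1alpha1
kind: PulsarPermission
metadata:
  name: example-permission        # illustrative name
  namespace: test
spec:
  connectionRef:
    name: test-pulsar-connection
  resourceType: namespace
  resourceName: test-tenant/testns
  roles:
    - ironman
  actions:
    - produce
    - consume
  # The grant stays in Pulsar even after this resource is deleted
  lifecyclePolicy: KeepAfterDeletion
```

Deleting this resource with `kubectl delete` would then leave the grant in place in the Pulsar cluster; to remove it through the operator, switch the policy back to `CleanUpAfterDeletion` first.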
+ +## Create a Pulsar Permission 1. Define a permission by using the YAML file and save the YAML file `permission.yaml`.t This example grants the `ironman` with `consume`, `produce`, `functions`, and `sink` permissions on the namespace `test-tenant/testns`. @@ -25,16 +73,6 @@ spec: # lifecyclePolicy: CleanUpAfterDeletion ``` -This table lists specifications available for the `PulsarPermission` resource. -| Option | Description | Required or not | -| ---| --- |--- | -| `connectionRef` | The reference to a PulsarConnection. | Yes | -| `resourceType` | The type of the target resource which will be granted the permissions.. It can be at the namespace or topic level. | Yes | -| `resourceName` | The name of the target resource which will be granted the permissions. | Yes | -| `roles` | The list of roles which will be granted with the same permissions. | Yes -| `actions` | The list of actions to be granted. The options include `produce`,`consume`,`functions`, `sinks`, `sources`, `packages`. | Optional | -| `lifecyclePolicy` | The resource lifecycle policy. Available options are `CleanUpAfterDeletion` and `KeepAfterDeletion`. By default, it is set to `CleanUpAfterDeletion`. | Optional | - 2. Apply the YAML file to create the permission. ```shell @@ -52,9 +90,35 @@ NAME RESOURCE NAME RESOURCE TYPE ROLES pulsarpermission-sample-topic-error test-tenant/testn1 namespace ["ironman"] ["consume","produce","functions","sinks","sources"] 2 2 True ``` -## Update PulsarPermission +## Update A Pulsar Permission + +You can update the permission by editing the `permission.yaml` file and then applying it again using `kubectl apply -f permission.yaml`. This allows you to modify various settings of the Pulsar permission. + +Important notes about side effects of updating a Pulsar permission: + +1. Changing permissions can immediately affect access to Pulsar resources. Be cautious when modifying permissions in production environments. + +2. 
Removing permissions may disrupt ongoing operations for affected roles. Ensure all stakeholders are aware of permission changes. + +3. Adding new permissions doesn't automatically grant access to existing data. Users may need to reconnect or refresh their sessions to utilize new permissions. + +4. Modifying the `resourceType` or `resourceName` effectively creates a new permission set rather than updating the existing one. The old permissions will remain unless explicitly removed. + +5. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: -You can update the permission roles or actions by editing the permission.yaml, then apply if again. For example, add action `sources`. + - If the new PulsarConnection refers to the same Pulsar cluster (i.e., the admin and broker URLs are the same), the permissions will remain in their original location. The operator will simply use the new connection details to manage the existing permissions. + + - If the new PulsarConnection points to a different Pulsar cluster (i.e., different admin and broker URLs), the operator will attempt to create new permissions with the same configuration in the new cluster. The original permissions in the old cluster will not be automatically deleted. + + Be cautious when changing the `connectionRef`, especially if it points to a new cluster, as this can lead to permission duplication across clusters. Always verify the intended behavior and manage any cleanup of the old permissions if necessary. + +6. Updating `lifecyclePolicy` only affects future deletion behavior, not the current state of the permission. For more detailed information about the lifecycle policies and their implications, please refer to the [PulsarResourceLifeCyclePolicy documentation](pulsar_resource_lifecycle.md). + +7. 
If you're using role-based access control (RBAC) in conjunction with these permissions, ensure that changes here don't conflict with RBAC policies. + +Here's an example of how to update a permission: + +1. Edit the `permission.yaml` file: ```yaml apiVersion: resource.streamnative.io/v1alpha1 @@ -62,32 +126,43 @@ kind: PulsarPermission metadata: name: test-pulsar-namespace-permission namespace: test -spec: +spec: connectionRef: name: "test-pulsar-connection" resourceType: namespace resourceName: test-tenant/testn1 - # lifecyclePolicy: CleanUpAfterDeletion roles: - ironman actions: - - consume - - produce - - functions - - sinks - - sources -# lifecyclePolicy: CleanUpAfterDeletion + - consume # add consume action + - produce # add produce action + - functions # add functions action + - sinks # add sinks action + - sources # add sources action + - packages # add packages action ``` - + +2. Apply the updated YAML file: + ```shell kubectl apply -f permission.yaml ``` +3. Check the resource status: + +```shell +kubectl -n test get pulsarpermission.resource.streamnative.io +``` + +```shell +NAME RESOURCE NAME RESOURCE TYPE ROLES ACTIONS GENERATION OBSERVED GENERATION READY +pulsarpermission-sample-topic-error test-tenant/testn1 namespace ["ironman"] ["consume","produce","functions","sinks","sources"] 2 2 True +``` -## Delete PulsarPermission +## Delete a Pulsar Permission ``` kubectl -n test delete pulsarpermission.resource.streamnative.io test-pulsar-permission ``` -Please be noticed, when you delete the permission, the real permission will still exist if the `lifecyclePolicy` is `KeepAfterDeletion` +Please be noticed, when you delete the permission, the real permission will still exist if the `lifecyclePolicy` is `KeepAfterDeletion`. 
\ No newline at end of file
diff --git a/docs/pulsar_resource_lifecycle.md b/docs/pulsar_resource_lifecycle.md
new file mode 100644
index 00000000..defce3d1
--- /dev/null
+++ b/docs/pulsar_resource_lifecycle.md
@@ -0,0 +1,130 @@
+# PulsarResourceLifeCyclePolicy
+
+## Overview
+
+The `PulsarResourceLifeCyclePolicy` is a configuration option that determines the behavior of Pulsar resources when their corresponding Kubernetes custom resources are deleted. This policy helps manage the lifecycle of Pulsar resources in relation to their Kubernetes representations.
+
+## Available Options
+
+The `PulsarResourceLifeCyclePolicy` can be set to one of two values:
+
+1. `CleanUpAfterDeletion`
+2. `KeepAfterDeletion`
+
+### CleanUpAfterDeletion
+
+When set to `CleanUpAfterDeletion`, the Pulsar resource (such as a tenant, namespace, or topic) will be deleted from the Pulsar cluster when its corresponding Kubernetes custom resource is deleted.
+
+Example:
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: PulsarTenant
+metadata:
+  name: my-tenant
+spec:
+  lifecyclePolicy: CleanUpAfterDeletion
+  <...>
+```
+
+In this example, when the Kubernetes custom resource for the Pulsar tenant is deleted, the corresponding Pulsar tenant will be deleted from the Pulsar cluster.
+
+### KeepAfterDeletion
+
+When set to `KeepAfterDeletion`, the Pulsar resource will not be deleted from the Pulsar cluster when its corresponding Kubernetes custom resource is deleted. The resource will remain in the Pulsar cluster after the Kubernetes custom resource is deleted.
+
+Example:
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: PulsarNamespace
+metadata:
+  name: my-namespace
+spec:
+  lifecyclePolicy: KeepAfterDeletion
+  <...>
+```
+
+In this example, when the Kubernetes custom resource for the Pulsar namespace is deleted, the corresponding Pulsar namespace will not be deleted from the Pulsar cluster.
The namespace will remain in the Pulsar cluster after the Kubernetes custom resource is deleted.
+
+## Default Policy
+
+The default policy for Pulsar resources is `CleanUpAfterDeletion`. This means that the Pulsar resource will be deleted from the Pulsar cluster when its corresponding Kubernetes custom resource is deleted.
+
+## Deleting the Actual Pulsar Resource
+
+When you need to delete the actual Pulsar resource (tenant, namespace, or topic) from the Pulsar cluster, regardless of the `lifecyclePolicy` setting, you can follow these steps:
+
+1. **For resources with `CleanUpAfterDeletion` policy:**
+   Simply delete the Kubernetes custom resource, and the corresponding Pulsar resource will be automatically deleted from the Pulsar cluster.
+
+   ```shell
+   kubectl delete pulsartenant my-tenant
+   ```
+
+2. **For resources with `KeepAfterDeletion` policy:**
+   a. First, update the custom resource to change the policy to `CleanUpAfterDeletion`:
+
+   ```yaml
+   apiVersion: resource.streamnative.io/v1alpha1
+   kind: PulsarTenant
+   metadata:
+     name: my-tenant
+   spec:
+     lifecyclePolicy: CleanUpAfterDeletion
+     # ... other fields ...
+   ```
+
+   Apply the updated resource:
+
+   ```shell
+   kubectl apply -f updated-tenant.yaml
+   ```
+
+   b. Then, delete the Kubernetes custom resource:
+
+   ```shell
+   kubectl delete pulsartenant my-tenant
+   ```
+
+   This two-step process ensures that the Pulsar resource is deleted from both Kubernetes and the Pulsar cluster.
+
+3. **Manual deletion using Pulsar admin tools:**
+   If you need to delete the Pulsar resource directly without involving Kubernetes, you can use Pulsar's admin tools. For more detailed information on using these tools, refer to the [Pulsar Admin CLI documentation](https://pulsar.apache.org/docs/admin-api-overview/) or [pulsarctl documentation](https://github.com/streamnative/pulsarctl).
For example: + + ```shell + # Delete a tenant + pulsarctl tenants delete my-tenant + + # Delete a namespace + pulsarctl namespaces delete my-tenant/my-namespace + + # Delete a topic + pulsarctl topics delete persistent://my-tenant/my-namespace/my-topic + ``` + + Note: Be cautious when using this method, as it may create inconsistencies between Kubernetes and Pulsar if the corresponding Kubernetes resources are not also deleted. + +Always ensure you have the necessary permissions and have considered the implications of deleting resources before proceeding with any deletion operation. + +## Changing the Policy + +You can change the policy of a Pulsar resource by updating the `lifecyclePolicy` field in the corresponding Kubernetes custom resource. However, there are important considerations to keep in mind when changing the policy: + +1. **Timing**: The policy change takes effect immediately upon updating the custom resource. However, it only affects future deletion attempts, not any ongoing deletion processes. + +2. **From CleanUpAfterDeletion to KeepAfterDeletion**: + - This change prevents the Pulsar resource from being deleted when the Kubernetes resource is removed. + - Ensure you have a plan to manage the retained Pulsar resource outside of Kubernetes. + +3. **From KeepAfterDeletion to CleanUpAfterDeletion**: + - This change means the Pulsar resource will be deleted when the Kubernetes resource is removed. + - Be cautious, as this could lead to unintended data loss if not managed properly. + +4. **Consistency**: After changing the policy, verify that the behavior aligns with your expectations by attempting a delete operation. + +5. **Resource Management**: When changing to `KeepAfterDeletion`, consider how you will manage and potentially clean up these resources in the future to avoid clutter in your Pulsar cluster. + +6. 
**Documentation**: It's advisable to document any policy changes, especially in production environments, to maintain clarity on resource management strategies. + +Always test policy changes in a non-production environment first to understand their full implications. \ No newline at end of file diff --git a/docs/pulsar_tenant.md b/docs/pulsar_tenant.md index c51b32da..3d4971e9 100644 --- a/docs/pulsar_tenant.md +++ b/docs/pulsar_tenant.md @@ -1,6 +1,45 @@ # PulsarTenant -## Create PulsarTenant +## Overview + +The `PulsarTenant` resource defines a tenant in a Pulsar cluster. It allows you to configure tenant-level settings such as admin roles, allowed clusters, and lifecycle policies. + +## Specifications + +| Field | Description | Required | +|-------|-------------|----------| +| `name` | The tenant name. | Yes | +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this tenant. | Yes | +| `adminRoles` | List of authentication principals allowed to manage the tenant. | No | +| `allowedClusters` | List of clusters the tenant is allowed to access. If not specified, tenant has access to all clusters. | No | +| `lifecyclePolicy` | Determines whether to keep or delete the Pulsar tenant when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. | No | +| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to grant permissions to the tenant for geo-replication. | No | + +## Allowed Clusters vs GeoReplicationRefs + +The `allowedClusters` and `geoReplicationRefs` fields in the PulsarTenant resource serve different purposes and are used in different scenarios: + +1. `allowedClusters`: + - Use this when you want to restrict a tenant's access to specific clusters within a single Pulsar instance. + - It's a simple list of cluster names that the tenant is allowed to access. 
+ - This is suitable for scenarios where you have multiple clusters in a single Pulsar deployment and want to control tenant access. + - If not specified, the tenant has access to all clusters in the Pulsar instance. + - Example use case: Limiting a tenant to only use clusters in certain regions for data locality or compliance reasons. + +2. `geoReplicationRefs`: + - Use this when setting up geo-replication between separate Pulsar instances for a tenant. + - It references PulsarGeoReplication resources, which contain more detailed configuration for connecting to external Pulsar clusters. + - This is appropriate for scenarios involving separate Pulsar deployments, possibly in different data centers or cloud providers. + - It grants the tenant permission to replicate data between the specified Pulsar instances. + - Example use case: Allowing a tenant to replicate their data between two independent Pulsar instances in different geographical locations. + +When to use which: +- Use `allowedClusters` when you want to restrict a tenant's access within a single Pulsar instance. +- Use `geoReplicationRefs` when you need to set up geo-replication for a tenant across separate Pulsar instances. + +These fields can be used independently or together, depending on your specific requirements for tenant management and data replication. + +## Create A Pulsar Tenant 1. Define a tenant named `test-tenant` by using the YAML file and save the YAML file `tenant.yaml`. @@ -20,17 +59,6 @@ spec: # lifecyclePolicy: CleanUpAfterDeletion ``` -This table lists specifications available for the `PulsarTenant` resource. - -| Option | Description | Required or not | -| ---| --- |--- | -| `name` | The tenant name. | Yes | -| `connectionRef` | The reference to a PulsarConnection. | Yes | -| `adminRoles` | The list of authentication principals allowed to manage the tenant. | Optional | -| `allowedClusters` | The list of allowed clusters. 
If no cluster is specified, the tenant will have access to all clusters. If you specify a list of clusters, ensure that these clusters exist.| Optional | -| `lifecyclePolicy` | The resource lifecycle policy. Available options are `CleanUpAfterDeletion` and `KeepAfterDeletion`. By default, it is set to `CleanUpAfterDeletion`. | Optional | -| `geoReplicationRefs` | The reference list of the PulsarGeoReplication. It will grant permission to the tenant. | No | - 2. Apply the YAML file to create the tenant. ```shell @@ -48,14 +76,83 @@ NAME RESOURCE_NAME GENERATION OBSERVED_GENERATION READY test-pulsar-tenant test-tenant 1 1 True ``` -## Update PulsarTenant +## Update A Pulsar Tenant + +You can update the tenant by editing the `tenant.yaml` file and then applying it again using kubectl. This allows you to modify various settings of the Pulsar tenant. + +Here's an example of how to update a tenant: + +1. Edit the `tenant.yaml` file: + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarTenant +metadata: + name: test-pulsar-tenant + namespace: test +spec: + name: test-tenant + connectionRef: + name: test-pulsar-connection + adminRoles: + - admin + - ops + allowedClusters: + - cluster1 + - cluster2 +``` + +2. Apply the updated YAML file: + +```shell +kubectl apply -f tenant.yaml +``` + +3. Check the updated status: + +```shell +kubectl -n test get pulsartenant.resource.streamnative.io +``` + +After applying changes, always verify the status of the update using: +```shell +kubectl -n test get pulsartenant.resource.streamnative.io test-pulsar-tenant +``` +The `OBSERVED_GENERATION` should increment, and `READY` should become `True` when the update is complete. + +Please note the following important points when updating a Pulsar tenant: + +1. The `name` field is immutable and cannot be changed after the tenant is created. If you need to rename a tenant, you'll need to create a new one and migrate the resources. + +2. 
Changes to `adminRoles` will affect who has administrative access to the tenant. Be cautious when modifying this field to avoid accidentally revoking necessary permissions. + +3. Updating `allowedClusters` will change which Pulsar clusters the tenant can use. Ensure that any namespaces and topics within the tenant are compatible with the new cluster list. + +4. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: + + - If the new PulsarConnection refers to the same Pulsar cluster (i.e., the admin and broker URLs are the same), the tenant will remain in its original location. The operator will simply use the new connection details to manage the existing tenant. + + - If the new PulsarConnection points to a different Pulsar cluster (i.e., different admin and broker URLs), the operator will attempt to create a new tenant with the same configuration in the new cluster. The original tenant in the old cluster will not be automatically deleted. + + Be cautious when changing the `connectionRef`, especially if it points to a new cluster, as this can lead to tenant duplication across clusters. Always verify the intended behavior and manage any cleanup of the old tenant if necessary. -You can update the tenant by editing the tenant.yaml, then apply if again +5. If you're adding the `lifecyclePolicy` field, remember that it only affects what happens when the PulsarTenant resource is deleted, not the current state of the tenant. +6. Be cautious when updating tenant configurations, as changes may affect existing namespaces and topics within the tenant. It's recommended to test changes in a non-production environment first. + +7. If you're using quota management features, remember that changes to the tenant configuration might affect resource allocation across the Pulsar cluster. 
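+As an illustration of point 5, the following sketch (field values are hypothetical and mirror the `test-pulsar-tenant` examples above) sets `lifecyclePolicy` so that a later deletion of the Kubernetes resource leaves the tenant in Pulsar untouched:
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: PulsarTenant
+metadata:
+  name: test-pulsar-tenant
+  namespace: test
+spec:
+  name: test-tenant
+  connectionRef:
+    name: test-pulsar-connection
+  # Keep the tenant in the Pulsar cluster even after this
+  # Kubernetes resource is deleted
+  lifecyclePolicy: KeepAfterDeletion
+```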
+ + +## Delete A Pulsar Tenant - -## Delete PulsarTenant ```shell kubectl -n test delete pulsartenant.resource.streamnative.io test-pulsar-tenant ``` -Please be noticed, when you delete the tenant, the real tenant will still exist if the `lifecyclePolicy` is `KeepAfterDeletion` +Please be aware that when you delete the PulsarTenant resource, the actual tenant will still exist in the Pulsar cluster if the `lifecyclePolicy` is set to `KeepAfterDeletion`. + +If you want to delete the tenant in the Pulsar cluster, you can use the following command: + +```shell +pulsarctl tenants delete test-tenant +``` diff --git a/docs/pulsar_topic.md b/docs/pulsar_topic.md index 3d756dab..3a3d0638 100644 --- a/docs/pulsar_topic.md +++ b/docs/pulsar_topic.md @@ -1,6 +1,36 @@ # PulsarTopic -## Create PulsarTopic +## Overview + +The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to configure various topic-level settings such as persistence, partitions, retention policies, and schema information. This resource is part of the Pulsar Resources Operator, which enables declarative management of Pulsar resources using Kubernetes custom resources. + +## Specifications + +| Field | Description | Required | +|-------|-------------|----------| +| `name` | The fully qualified topic name in the format "persistent://tenant/namespace/topic" or "non-persistent://tenant/namespace/topic". | Yes | +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this topic. | Yes | +| `persistent` | Whether the topic is persistent or non-persistent. Default is false. Can also be set by topic name prefix. | No | +| `partitions` | Number of partitions for the topic. Default is 0. | No | +| `maxProducers` | Maximum number of producers allowed on the topic. | No | +| `maxConsumers` | Maximum number of consumers allowed on the topic. | No | +| `messageTTL` | Time to Live (TTL) for messages in the topic.
Messages older than this TTL will be automatically marked as consumed. | No | +| `maxUnAckedMessagesPerConsumer` | Maximum number of unacknowledged messages allowed per consumer. | No | +| `maxUnAckedMessagesPerSubscription` | Maximum number of unacknowledged messages allowed per subscription. | No | +| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. | No | +| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. | No | +| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No | +| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No | +| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | +| `lifecyclePolicy` | Determines whether to keep or delete the Pulsar topic when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | +| `schemaInfo` | Schema information for the topic. See [schemaInfo](#schemainfo) for more details. | No | +| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | + +Note: Valid time units for duration fields are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). + +## Create A Pulsar Topic 1. Define a topic named `persistent://test-tenant/testns/topic123` by using the YAML file and save the YAML file `topic.yaml`. 
```yaml @@ -28,28 +58,7 @@ spec: # lifecyclePolicy: CleanUpAfterDeletion ``` -This table lists specifications available for the `PulsarTopic` resource. - -| Option | Description | Required or not | -| ---| --- |--- | -| `name` | The topic name. | Yes | -| `connectionRef` | The reference to a PulsarConnection. | Yes | -| `persistent` | Set whether it is a persistent or non-persistent topic, you also can set it by topic name prefix with persistent or non-persistent. default is false| Optional | -| `partitions` | The number of partitions for the topic. default is 0 | Optional | -| `maxProducers` | The maximum number of producers for a topic. | Optional | -| `messageTTL` | The TTL time duration of messages (valid time units are "s", "m", "h", "d", "w").| Optional | -| `maxUnAckedMessagesPerConsumer` | The maximum number of unacked messages per consumer. | Optional | -| `maxUnAckedMessagesPerSubscription` | The maximum number of unacked messages per subscription. | Optional | -| `retentionTime` | The retention time (valid time units are "s", "m", "h", "d", "w"). | Optional | -| `retentionSize` | The retention size limit. | Optional | -| `backlogQuotaLimitTime` | The Backlog quota time limit (valid time units are "s", "m", "h", "d", "w"). | Optional | -| `backlogQuotaLimitSize` | The Backlog quota size limit (such as 10Mi, 10Gi). | Optional | -| `backlogQuotaRetentionPolicy` | The Retention policy to be enforced when the limit is reached. | Optional | -| `lifecyclePolicy` | The resource lifecycle policy. Available options are `CleanUpAfterDeletion` and `KeepAfterDeletion`. By default, it is set to `CleanUpAfterDeletion`. | Optional | -| `schemaInfo` | The schema of pulsar topic, default is nil. More details you can find in [schemaInfo](#schemainfo) Optional | -| `geoReplicationRefs` | The reference list of the PulsarGeoReplication. Enable Geo-replication at the topic level. It will add the topic to the clusters. | No | - -1. Apply the YAML file to create the topic. +2. 
Apply the YAML file to create the topic. ```shell kubectl apply -f topic.yaml @@ -66,43 +75,95 @@ NAME RESOURCE_NAME GENERATION O test-pulsar-topic123 persistent://test-tenant/testns/topic123 1 1 True ``` -## Update PulsarTopic -You can update the topic policies by editing the topic.yaml, then apply if again. +## Update A Pulsar Topic -Please be noticed: -1. The field `name`, `persistent` couldn’t be updated. +You can update the topic policies by editing the `topic.yaml` file and then applying it again using `kubectl apply -f topic.yaml`. This allows you to modify various settings of the Pulsar topic. +Important notes when updating a Pulsar topic: -## Delete PulsarTopic +1. The fields `name` and `persistent` are immutable and cannot be updated after the topic is created. + +2. Other fields such as `partitions`, `maxProducers`, `maxConsumers`, `messageTTL`, `retentionTime`, `retentionSize`, `backlogQuotaLimitTime`, `backlogQuotaLimitSize`, and `backlogQuotaRetentionPolicy` can be modified. + +3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: + + - If the new PulsarConnection refers to the same Pulsar cluster (i.e., the admin and broker URLs are the same), the topic will remain in its original location. The operator will simply use the new connection details to manage the existing topic. + + - If the new PulsarConnection points to a different Pulsar cluster (i.e., different admin and broker URLs), the operator will attempt to create a new topic with the same configuration in the new cluster. The original topic in the old cluster will not be automatically deleted. + + Be cautious when changing the `connectionRef`, especially if it points to a new cluster, as this can lead to topic duplication across clusters. Always verify the intended behavior and manage any cleanup of the old topic if necessary. + +4. 
Changes to `lifecyclePolicy` will only affect what happens when the PulsarTopic resource is deleted, not the current state of the topic. + +5. Be cautious when updating topic policies, as changes may affect existing producers and consumers. It's recommended to test changes in a non-production environment first. + +6. After applying changes, you can check the status of the update using: + ```shell + kubectl -n test get pulsartopic.resource.streamnative.io test-pulsar-topic123 + ``` + The `OBSERVED_GENERATION` should increment, and `READY` should become `True` when the update is complete. + +7. Updating the `schemaInfo` field may have implications for existing producers and consumers. Ensure that any schema changes adhere to Pulsar's schema compatibility strategies. For more information on schema evolution and compatibility, refer to the [Pulsar Schema Evolution and Compatibility](https://pulsar.apache.org/docs/schema-understand#schema-evolution) documentation. + +## Delete A PulsarTopic + +To delete a PulsarTopic resource, use the following kubectl command: ```shell -kubectl -n test delete pulsartopic.resource.streamnative.io test-pulsar-topic +kubectl delete pulsartopic.resource.streamnative.io test-pulsar-topic123 ``` -Please be noticed, when you delete the permission, the real permission will still exist if the `lifecyclePolicy` is `KeepAfterDeletion` +Please be aware that when you delete the topic, the actual topic will still exist in the Pulsar cluster if the `lifecyclePolicy` is set to `KeepAfterDeletion`. For more detailed information about the lifecycle policies and their implications, please refer to the [PulsarResourceLifeCyclePolicy documentation](pulsar_resource_lifecycle.md). 
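+For instance, before removing the Kubernetes resource while keeping the topic in Pulsar, you could apply a manifest like the following sketch (values are illustrative and mirror the `test-pulsar-topic123` example above):
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: PulsarTopic
+metadata:
+  name: test-pulsar-topic123
+  namespace: test
+spec:
+  name: persistent://test-tenant/testns/topic123
+  connectionRef:
+    name: test-pulsar-connection
+  # Retain the topic in the Pulsar cluster when this
+  # Kubernetes resource is deleted
+  lifecyclePolicy: KeepAfterDeletion
+```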
+If you want to delete the topic in the Pulsar cluster, you can use the following command: +```shell +pulsarctl topics delete persistent://test-tenant/testns/topic123 +``` ## SchemaInfo -More details about pulsar schema, you can check the [official document](https://pulsar.apache.org/docs/2.10.x/schema-understand/) - -| Option | Description | -| ---| --- | -| `type` | Schema type, which determines how to interpret the schema data | -| `schema` | Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific | -| `properties` | It is a user defined properties as a string/string map. Applications can use this bag for carrying any application specific logics. | +The `schemaInfo` field in the PulsarTopic specification allows you to define the schema for the topic. For more details about Pulsar schemas, refer to the [official documentation](https://pulsar.apache.org/docs/schema-understand/). + +The `schemaInfo` field has the following structure: + +| Field | Description | Required | +|-------|-------------|----------| +| `type` | The schema type, which determines how to interpret the schema data. | Yes | +| `schema` | The schema definition, provided as a string; for schema types such as JSON or AVRO this is a JSON schema definition (see the example below). | Yes | +| `properties` | A map of user-defined properties as key-value pairs. Applications can use this for carrying any application-specific logic. | No | + +### Supported Schema Types + +Pulsar supports various schema types, including: + +- AVRO +- JSON +- PROTOBUF +- PROTOBUF_NATIVE +- THRIFT +- BOOLEAN +- INT8 +- INT16 +- INT32 +- INT64 +- FLOAT +- DOUBLE +- STRING +- BYTES +- DATE +- TIME +- TIMESTAMP +- INSTANT +- LOCAL_DATE +- LOCAL_TIME +- LOCAL_DATE_TIME + +For more detailed information about these schema types and their usage, please refer to the [Pulsar Schema documentation](https://pulsar.apache.org/docs/schema-understand/). ### Example -This is a demo that uses the json type schema.
- -```golang -type testJSON struct { - ID int `json:"id"` - Name string `json:"name"` -} -``` +Here's an example of a PulsarTopic resource with a JSON schema: ```yaml apiVersion: resource.streamnative.io/v1alpha1 @@ -120,4 +181,6 @@ spec: schema: "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\",\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}" properties: "owner": "pulsar" -``` \ No newline at end of file +``` + +This example defines a JSON schema with two fields, `ID` and `Name`, both of which are required. The `type` field is set to `JSON`, indicating that the schema is in JSON format. The `schema` field contains the actual JSON schema definition. The `properties` field is optional and can be used to add any application-specific logic. \ No newline at end of file From 16e111aa420cf575809a4bcb0070bb669a5ba096 Mon Sep 17 00:00:00 2001 From: Eric Shen Date: Sun, 1 Sep 2024 16:12:31 +0800 Subject: [PATCH 2/2] Update api/v1alpha1/zz_generated.deepcopy.go Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- api/v1alpha1/zz_generated.deepcopy.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 03ae3f40..9f971246 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,17 @@ +// Copyright 2024 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + //go:build !ignore_autogenerated // Code generated by controller-gen. DO NOT EDIT.