Skip to content

Commit

Permalink
Allow rabbitmq configurator to run on default VHost (#259)
Browse files Browse the repository at this point in the history
Default vhost / was not url-encoded, so caused attempts to access api///

Also, reraise original exception instead of new one. This caused failures to be obscured.
  • Loading branch information
ndevenish authored Sep 25, 2024
1 parent d830c1f commit 87b2b60
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/zocalo/cli/configure_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def run():
logger.error(f"Error 404: VHost not found for url: {e.response.url}")
sys.exit(1)
except Exception:
raise
raise e
logger.error(e)
sys.exit(1)

Expand Down
35 changes: 22 additions & 13 deletions src/zocalo/util/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pathlib
import secrets
import urllib
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple, Union

Expand All @@ -20,6 +21,11 @@
logger = logging.getLogger("zocalo.util.rabbitmq")


def _quote(arg: str) -> str:
"""URL-quote a VHost name (which can contain /)"""
return urllib.parse.quote(arg, safe=[])


class MessageStats(BaseModel):
publish: Optional[int] = Field(None, description="Count of messages published.")

Expand Down Expand Up @@ -750,7 +756,7 @@ def bindings(
) -> List[BindingInfo]:
endpoint = "bindings"
if vhost is not None:
endpoint = f"{endpoint}/{vhost}"
endpoint = f"{endpoint}/{_quote(vhost)}"
_check = {source, destination, destination_type}
if None in _check and len(_check) > 1:
raise ValueError(
Expand Down Expand Up @@ -789,7 +795,9 @@ def bindings_delete(
):
# If properties_key is not specified then all bindings between the specified
# source and destination are deleted
endpoint = f"bindings/{vhost}/e/{source}/{destination_type}/{destination}"
endpoint = (
f"bindings/{_quote(vhost)}/e/{source}/{destination_type}/{destination}"
)
if properties_key is None:
dest_map = {"queue": "q", "exchange": "e"}

Expand Down Expand Up @@ -834,11 +842,11 @@ def exchanges(
) -> Union[List[ExchangeInfo], ExchangeInfo]:
endpoint = "exchanges"
if vhost is not None and name is not None:
endpoint = f"{endpoint}/{vhost}/{name}/"
endpoint = f"{endpoint}/{_quote(vhost)}/{name}/"
response = self.get(endpoint)
return ExchangeInfo(**response.json())
elif vhost is not None:
endpoint = f"{endpoint}/{vhost}/"
endpoint = f"{endpoint}/{_quote(vhost)}/"
elif name is not None:
raise ValueError("name can not be set without vhost")
response = self.get(endpoint)
Expand All @@ -853,19 +861,19 @@ def exchange_declare(self, exchange: ExchangeSpec):
response.raise_for_status()

def exchange_delete(self, vhost: str, name: str, if_unused: bool = False):
endpoint = f"exchanges/{vhost}/{name}"
endpoint = f"exchanges/{_quote(vhost)}/{name}"
response = self.delete(endpoint, params={"if-unused": if_unused})
response.raise_for_status()

def policies(self, vhost: Optional[str] = None) -> List[PolicySpec]:
endpoint = "policies"
if vhost is not None:
endpoint = f"{endpoint}/{vhost}/"
endpoint = f"{endpoint}/{_quote(vhost)}/"
response = self.get(endpoint)
return [PolicySpec(**p) for p in response.json()]

def policy(self, vhost: str, name: str) -> PolicySpec:
endpoint = f"policies/{vhost}/{name}/"
endpoint = f"policies/{_quote(vhost)}/{name}/"
response = self.get(endpoint)
return PolicySpec(**response.json())

Expand All @@ -880,7 +888,7 @@ def set_policy(self, policy: PolicySpec):
response.raise_for_status()

def clear_policy(self, vhost: str, name: str):
endpoint = f"policies/{vhost}/{name}/"
endpoint = f"policies/{_quote(vhost)}/{name}/"
response = self.delete(endpoint)
response.raise_for_status()

Expand All @@ -889,11 +897,11 @@ def queues(
) -> Union[List[QueueInfo], QueueInfo]:
endpoint = "queues"
if vhost is not None and name is not None:
endpoint = f"{endpoint}/{vhost}/{name}"
endpoint = f"{endpoint}/{_quote(vhost)}/{name}"
response = self.get(endpoint)
return QueueInfo(**response.json())
elif vhost is not None:
endpoint = f"{endpoint}/{vhost}"
endpoint = f"{endpoint}/{_quote(vhost)}"
elif name is not None:
raise ValueError("name can not be set without vhost")
response = self.get(endpoint)
Expand All @@ -910,7 +918,8 @@ def queue_declare(self, queue: QueueSpec):
def queue_delete(
self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False
):
endpoint = f"queues/{vhost}/{name}"
logger.debug(f"Deleting queue {_quote(vhost)}/{name}")
endpoint = f"queues/{_quote(vhost)}/{name}"
response = self.delete(
endpoint, params={"if-unused": if_unused, "if-empty": if_empty}
)
Expand All @@ -931,7 +940,7 @@ def permissions(
) -> List[PermissionSpec] | PermissionSpec:
endpoint = "permissions"
if vhost is not None and user is not None:
endpoint = f"{endpoint}/{vhost}/{user}/"
endpoint = f"{endpoint}/{_quote(vhost)}/{user}/"
response = self.get(endpoint)
return PermissionSpec(**response.json())
elif vhost is not None or user is not None:
Expand All @@ -950,7 +959,7 @@ def set_permissions(self, permission: PermissionSpec):
response.raise_for_status()

def clear_permissions(self, vhost: str, user: str):
endpoint = f"permissions/{vhost}/{user}/"
endpoint = f"permissions/{_quote(vhost)}/{user}/"
response = self.delete(endpoint)
response.raise_for_status()

Expand Down

0 comments on commit 87b2b60

Please sign in to comment.