diff --git a/.gitignore b/.gitignore index 2ce613a..319d825 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,6 @@ dependency-reduced-pom.xml release.properties *.srl - +test/build +*.egg-info +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md index e494b2e..cf24cda 100644 --- a/README.md +++ b/README.md @@ -35,13 +35,17 @@ For example, the following PromQL query will return an estimate of the number of ## Compatibility -*cassandra-exporter* is has been tested with: +*cassandra-exporter* is now Cassandra 4.0+ compatible, but the change is not a backwards compatible. Support for older Cassandra versions is via the older releases, as follows: -| Component | Version | +| Cassandra Version | Compatible Exporter Version | |-----------------|---------------| -| Apache Cassandra| 3.0.17 (experimental), 3.11.2, 3.11.3 | -| Prometheus | 2.0 and later | +| Apache Cassandra 4.x | 0.9.12 | +| Apache Cassandra 3.0.17, 3.11.2, 3.11.3 | 0.9.11 | +| Prometheus Version | +|-----------------| +| 2.42.0 +| Other Cassandra and Prometheus versions will be tested for compatibility in the future. ## Usage @@ -409,6 +413,137 @@ We suggest viewing the metrics endpoint (e.g., ) are exported by your Cassandra node. +## Testing + +### Java +There are unit tests in the various projects which will get executed with the maven commands. + +### Integration test harness + +There is an integration test harness available in the */test/* folder. +This harness is a work in progress, and is currently only useful for manual verification. + +#### Requirements + +The test harness uses Python (tested with 3.10). + +Initialise the project by using the pyproject.toml file + + pip install . + +The tool can be launched via + + python test_tool.py + +#### Operation + +There are four modes of operation: + +- `benchmark` + + Not Implemented - TBA - Intended to test the speed of collection. + +- `demo` + + Usage: test_tool.py demo [OPTIONS] + + Start a Cassandra cluster with cassandra-exporter installed (agent or + standalone). Optionally setup a schema. Wait for ctrl-c to shut everything + down. + + Working Directory: + -C, --working-directory PATH location to install Cassandra and/or Prometheus. + Must be empty or not exist. Defaults to a + temporary directory. + --cleanup-working-directory [on-error|always|never] + how to delete the working directory on exit: + "on-error": delete working directory on exit + unless an error occurs, "always": always delete + working directory on exit, "never": never delete + working directory. [default: on-error] + + Cassandra: + --cluster-name TEXT name of the Cassandra cluster [default: test- + cluster] + --cassandra-version TEXT Cassandra version to run [default: 4.1.0] + --topology DCS RACKS NODES number of data centers, racks per data center, + and nodes per rack. [default: 2, 3, 1] + -j, --exporter-jar PATH path of the cassandra-exporter jar to use, + either agent or standalone builds, or one of + "agent" or "standalone" for the currently built + jar of that type in the project directory + (assumes that the sources for this test tool are + in the standard location within the project, and + that the jar(s) have been built). [default: + agent] + -s, --schema PATH path of the CQL schema YAML file to apply on + cluster start. The YAML file must contain a list + of CQL statement strings, which are applied in + order. [default: /root/source/forks/cassandra- + exporter/test/schema.yaml] + +- `dump` + + Usage: test_tool.py dump [OPTIONS] COMMAND [ARGS]... + + Commands to capture, validate and diff metrics dumps + + Options: + --help Show this message and exit. + + Commands: + capture Start a Cassandra cluster, capture metrics from each node's... + diff Compare two metrics dumps and output the difference + validate Validate a metrics dump using Prometheus's promtool. + +- `e2e` - *Note no tests are run at the moment* + + Usage: test_tool.py e2e [OPTIONS] + + Run cassandra-exporter end-to-end tests. + + - Start C* with the exporter JAR (agent or standalone). + - Setup a schema. + - Configure and start prometheus. + - Wait for all scrape targets to get healthy. + - Run some tests. + + Working Directory: + -C, --working-directory PATH location to install Cassandra and/or + Prometheus. Must be empty or not exist. + Defaults to a temporary directory. + --cleanup-working-directory [on-error|always|never] + how to delete the working directory on exit: + "on-error": delete working directory on exit + unless an error occurs, "always": always delete + working directory on exit, "never": never + delete working directory. [default: on-error] + + Cassandra: + --cluster-name TEXT name of the Cassandra cluster [default: test- + cluster] + --cassandra-version TEXT Cassandra version to run [default: 4.1.0] + --topology DCS RACKS NODES number of data centers, racks per data center, + and nodes per rack. [default: 2, 3, 1] + -j, --exporter-jar PATH path of the cassandra-exporter jar to use, + either agent or standalone builds, or one of + "agent" or "standalone" for the currently built + jar of that type in the project directory + (assumes that the sources for this test tool + are in the standard location within the + project, and that the jar(s) have been built). + [default: agent] + -s, --schema PATH path of the CQL schema YAML file to apply on + cluster start. The YAML file must contain a + list of CQL statement strings, which are + applied in order. [default: + /root/source/forks/cassandra- + exporter/test/schema.yaml] + + Prometheus Archive: [mutually exclusive] + --prometheus-version TAG + --prometheus-archive PATH/URL + ## Unstable, Missing & Future Features See the [project issue tracker](https://github.com/instaclustr/cassandra-exporter/issues) for a complete list. diff --git a/agent/pom.xml b/agent/pom.xml index 73f3384..9092467 100644 --- a/agent/pom.xml +++ b/agent/pom.xml @@ -5,11 +5,11 @@ com.zegelin.cassandra-exporter exporter-parent - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT agent - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT Cassandra Exporter Agent diff --git a/agent/src/main/java/com/zegelin/cassandra/exporter/InternalMetadataFactory.java b/agent/src/main/java/com/zegelin/cassandra/exporter/InternalMetadataFactory.java index 7eb07f2..ceb4094 100644 --- a/agent/src/main/java/com/zegelin/cassandra/exporter/InternalMetadataFactory.java +++ b/agent/src/main/java/com/zegelin/cassandra/exporter/InternalMetadataFactory.java @@ -1,29 +1,33 @@ package com.zegelin.cassandra.exporter; import com.zegelin.cassandra.exporter.MetadataFactory; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.schema.Schema; import java.net.InetAddress; import java.util.Optional; import java.util.Set; public class InternalMetadataFactory extends MetadataFactory { - private static Optional getCFMetaData(final String keyspaceName, final String tableName) { - return Optional.ofNullable(Schema.instance.getCFMetaData(keyspaceName, tableName)); + private static Optional getTableMetaData(final String keyspaceName, final String tableName) { + return Optional.ofNullable(Schema.instance.getTableMetadata(keyspaceName, tableName)); + } + + private static Optional getIndexMetadata(final String keyspaceName, final String indexName) { + return Optional.ofNullable(Schema.instance.getIndexTableMetadataRef(keyspaceName, indexName)); } @Override public Optional indexMetadata(final String keyspaceName, final String tableName, final String indexName) { - return getCFMetaData(keyspaceName, tableName) - .flatMap(m -> m.getIndexes().get(indexName)) + return getIndexMetadata(keyspaceName, indexName) + .flatMap(m -> m.get().indexName()) .map(m -> { - final IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(m.kind.toString()); - final Optional className = Optional.ofNullable(m.options.get("class_name")); + final IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(m); + final Optional className = Optional.ofNullable(m); return new IndexMetadata() { @Override @@ -41,7 +45,7 @@ public Optional customClassName() { @Override public Optional tableOrViewMetadata(final String keyspaceName, final String tableOrViewName) { - return getCFMetaData(keyspaceName, tableOrViewName) + return getTableMetaData(keyspaceName, tableOrViewName) .map(m -> new TableMetadata() { @Override public String compactionStrategyClassName() { @@ -67,12 +71,12 @@ public Optional endpointMetadata(final InetAddress endpoint) { return Optional.of(new EndpointMetadata() { @Override public String dataCenter() { - return endpointSnitch.getDatacenter(endpoint); + return endpointSnitch.getDatacenter(InetAddressAndPort.getByAddress(endpoint)); } @Override public String rack() { - return endpointSnitch.getRack(endpoint); + return endpointSnitch.getRack(InetAddressAndPort.getByAddress(endpoint)); } }); } @@ -84,6 +88,6 @@ public String clusterName() { @Override public InetAddress localBroadcastAddress() { - return FBUtilities.getBroadcastAddress(); + return FBUtilities.getBroadcastAddressAndPort().getAddress(); } } diff --git a/agent/src/main/java/com/zegelin/cassandra/exporter/collector/InternalGossiperMBeanMetricFamilyCollector.java b/agent/src/main/java/com/zegelin/cassandra/exporter/collector/InternalGossiperMBeanMetricFamilyCollector.java index 0ccbb84..9b378a6 100644 --- a/agent/src/main/java/com/zegelin/cassandra/exporter/collector/InternalGossiperMBeanMetricFamilyCollector.java +++ b/agent/src/main/java/com/zegelin/cassandra/exporter/collector/InternalGossiperMBeanMetricFamilyCollector.java @@ -1,10 +1,12 @@ package com.zegelin.cassandra.exporter.collector; +import com.google.common.collect.ImmutableSet; import com.zegelin.cassandra.exporter.MetadataFactory; import com.zegelin.prometheus.domain.Labels; import com.zegelin.prometheus.domain.NumericMetric; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.InetAddressAndPort; import java.net.InetAddress; import java.util.Map; @@ -34,13 +36,11 @@ private InternalGossiperMBeanMetricFamilyCollector(final Gossiper gossiper, fina @Override protected void collect(final Stream.Builder generationNumberMetrics, final Stream.Builder downtimeMetrics, final Stream.Builder activeMetrics) { - final Set> endpointStates = gossiper.getEndpointStates(); + for (InetAddressAndPort endpoint : gossiper.getEndpoints()) { + final InetAddress endpointAddress = endpoint.getAddress(); + final EndpointState state = gossiper.getEndpointStateForEndpoint(endpoint); - for (final Map.Entry endpointState : endpointStates) { - final InetAddress endpoint = endpointState.getKey(); - final EndpointState state = endpointState.getValue(); - - final Labels labels = metadataFactory.endpointLabels(endpoint); + final Labels labels = metadataFactory.endpointLabels(endpointAddress); generationNumberMetrics.add(new NumericMetric(labels, gossiper.getCurrentGenerationNumber(endpoint))); downtimeMetrics.add(new NumericMetric(labels, millisecondsToSeconds(gossiper.getEndpointDowntime(endpoint)))); diff --git a/common/pom.xml b/common/pom.xml index 2267c33..152b36f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -5,11 +5,11 @@ com.zegelin.cassandra-exporter exporter-parent - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT common - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT Cassandra Exporter Common diff --git a/pom.xml b/pom.xml index 28fdf30..584aaa0 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.zegelin.cassandra-exporter exporter-parent - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT pom Cassandra Exporter Parent @@ -15,7 +15,7 @@ - 3.11.2 + 4.1.0 2.5.3 3.1.1 @@ -41,7 +41,7 @@ com.zegelin.cassandra-exporter common - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT org.glassfish.hk2.external diff --git a/standalone/pom.xml b/standalone/pom.xml index d539c16..a8d8d2d 100644 --- a/standalone/pom.xml +++ b/standalone/pom.xml @@ -5,11 +5,11 @@ com.zegelin.cassandra-exporter exporter-parent - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT standalone - 0.9.11-SNAPSHOT + 0.9.12-SNAPSHOT Cassandra Exporter Standalone/CLI diff --git a/test/lib/ccm.py b/test/lib/ccm.py new file mode 100644 index 0000000..aaabcc8 --- /dev/null +++ b/test/lib/ccm.py @@ -0,0 +1,197 @@ +import signal +import subprocess +import typing as t +from functools import wraps +from pathlib import Path +from typing import List, Optional + +import click +import cloup +from ccmlib.cluster import Cluster +from ccmlib.common import check_socket_listening + +from lib.click_helpers import fixup_kwargs, ppstrlist +from lib.jar_utils import ExporterJar, ExporterJarParamType +from lib.net import SocketAddress +from lib.schema import CqlSchema, CqlSchemaParamType + +import cassandra.cluster +import cassandra.connection + +import logging + +logger = logging.getLogger('ccm') + + +class TestCluster(Cluster): + logger = logging.getLogger(f'{__name__}.{__qualname__}') + + standalone_processes: List[subprocess.Popen] = [] + + def __init__(self, cluster_directory: Path, cassandra_version: str, + nodes: int, racks: int, datacenters: int, + exporter_jar: ExporterJar, + initial_schema: Optional[CqlSchema]): + + if cluster_directory.exists(): + raise RuntimeError(f'Cluster directory {cluster_directory} must not exist.') # CCM wants to create this + + super().__init__( + path=cluster_directory.parent, + name=cluster_directory.name, + version=cassandra_version, + create_directory=True # if this is false, various config files wont be created... + ) + + self.exporter_jar = exporter_jar + self.initial_schema = initial_schema + + self.populate(nodes, racks, datacenters) + + def populate(self, nodes: int, racks: int = 1, datacenters: int = 1, + debug=False, tokens=None, use_vnodes=False, ipprefix='127.0.0.', ipformat=None, + install_byteman=False): + result = super().populate(nodes, debug, tokens, use_vnodes, ipprefix, ipformat, install_byteman) + + for i, node in enumerate(self.nodelist()): + node.exporter_address = SocketAddress(node.ip_addr, 9500 + i) + + node.rack = f'rack-{(int(i / nodes) % racks) + 1}' + node.data_center = f'dc-{(int(i / nodes * racks)) + 1}' + + if self.exporter_jar.type == ExporterJar.ExporterType.AGENT: + node.set_environment_variable('JVM_OPTS', f'-javaagent:{self.exporter_jar.path}=-l{node.exporter_address}') + + # set dc/rack manually, since CCM doesn't support custom racks + node.set_configuration_options({ + 'endpoint_snitch': 'GossipingPropertyFileSnitch' + }) + + with (Path(node.get_conf_dir()) / 'cassandra-rackdc.properties').open('w') as f: + print(f'dc={node.data_center}', file=f) + print(f'rack={node.rack}', file=f) + + return result + + def start(self, verbose=False, wait_for_binary_proto=True, wait_other_notice=True, jvm_args=None, + profile_options=None, quiet_start=False, allow_root=False, **kwargs): + + self.logger.info('Starting Cassandra cluster...') + result = super().start(False, verbose, wait_for_binary_proto, wait_other_notice, jvm_args, profile_options, + quiet_start, allow_root, **kwargs) + self.logger.info('Cassandra cluster started successfully') + + # start the standalone exporters, if requested + if self.exporter_jar.type == ExporterJar.ExporterType.STANDALONE: + for node in self.nodelist(): + self.logger.info('Starting standalone cassandra-exporter for node %s...', node.ip_addr) + + process = self.exporter_jar.start_standalone( + logfile_path=Path(node.get_path()) / 'logs' / 'cassandra-exporter.log', + listen_address=node.exporter_address, + jmx_address=SocketAddress('localhost', node.jmx_port), + cql_address=SocketAddress(*node.network_interfaces["binary"]) + ) + + self.standalone_processes.append(process) + + self.logger.info('Standalone cassandra-exporters started successfully') + + if self.initial_schema: + self.logger.info('Applying initial CQL schema...') + self.apply_schema(self.initial_schema) + + # wait for the exporters to accept connections + for node in self.nodelist(): + check_socket_listening(node.exporter_address) + + return result + + def stop(self, wait=True, signal_event=signal.SIGTERM, **kwargs): + if len(self.standalone_processes): + # shutdown standalone exporters, if they're still running + self.logger.info('Stopping standalone cassandra-exporters...') + for p in self.standalone_processes: + p.terminate() + + if wait: + p.wait() + self.logger.info('Standalone cassandra-exporters stopped') + + self.logger.info('Stopping Cassandra cluster...') + result = super().stop(wait, signal_event, **kwargs) + self.logger.info('Cassandra cluster stopped') + + return result + + def apply_schema(self, schema: CqlSchema): + contact_points = map(lambda n: cassandra.connection.DefaultEndPoint(*n.network_interfaces['binary']), self.nodelist()) + + with cassandra.cluster.Cluster(list(contact_points)) as cql_cluster: + with cql_cluster.connect() as cql_session: + for stmt in schema.statements: + self.logger.debug('Executing CQL statement "{}".'.format(stmt.split('\n')[0])) + cql_session.execute(stmt) + + # # the collector defers registrations by a second or two. + # # See com.zegelin.cassandra.exporter.Harvester.defer() + # self.logger.info('Pausing to wait for deferred MBean registrations to complete.') + # time.sleep(5) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + + +def with_ccm_cluster(): + def decorator(func: t.Callable) -> t.Callable: + jar_types = [type.name.lower() for type in ExporterJar.ExporterType] + + @cloup.option_group( + "Cassandra", + cloup.option('--cluster-name', 'cassandra_cluster_name', default='test-cluster', show_default=True, + help='name of the Cassandra cluster'), + cloup.option('--cassandra-version', default='4.1.0', show_default=True, + help='Cassandra version to run'), + cloup.option('--topology', 'cassandra_topology', + type=(int, int, int), default=(2, 3, 1), show_default=True, + metavar='DCS RACKS NODES', help='number of data centers, racks per data center, and nodes per rack.'), + cloup.option('-j', '--exporter-jar', default='agent', show_default=True, type=ExporterJarParamType(), + help=f'path of the cassandra-exporter jar to use, either {ppstrlist(jar_types)} builds, ' + f'or one of {ppstrlist(jar_types, quote=True)} for the currently built jar of that type in the project directory ' + f'(assumes that the sources for this test tool are in the standard location within the project, and that the jar(s) have been built).'), + cloup.option('-s', '--schema', 'cql_schema', default=CqlSchema.default_schema_path(), show_default=True, type=CqlSchemaParamType(), + help='path of the CQL schema YAML file to apply on cluster start. The YAML file must contain a list of CQL statement strings, which are applied in order.') + ) + @click.pass_context + @wraps(func) + def wrapper(ctx: click.Context, + cassandra_version: str, cassandra_cluster_name: str, cassandra_topology: t.Tuple[int, int, int], + exporter_jar: ExporterJar, + cql_schema: t.Optional[CqlSchema], + working_directory: Path, **kwargs): + + datacenters, racks, nodes, = cassandra_topology + + logger.info('Creating Cassandra %s cluster, with:', cassandra_version) + logger.info(' CCM working directory %s:', working_directory) + logger.info(' Topology: %s data center(s), %s rack(s) per DC, %s node(s) per rack (%s node(s) total)', datacenters, racks, nodes, (nodes * racks * datacenters)) + logger.info(' cassandra-exporter: %s', exporter_jar) + + ccm_cluster = ctx.with_resource(TestCluster( + cluster_directory=(working_directory / cassandra_cluster_name), + cassandra_version=cassandra_version, + nodes=nodes*racks*datacenters, racks=racks, datacenters=datacenters, + exporter_jar=exporter_jar, + initial_schema=cql_schema + )) + + fixup_kwargs() + + func(ccm_cluster=ccm_cluster, **kwargs) + + return wrapper + + return decorator diff --git a/test/lib/click_helpers.py b/test/lib/click_helpers.py new file mode 100644 index 0000000..32c717f --- /dev/null +++ b/test/lib/click_helpers.py @@ -0,0 +1,150 @@ +import inspect +import shutil +import tempfile +import typing as t +from enum import Enum +from functools import wraps +from itertools import chain +from pathlib import Path + +import click +import cloup + +from lib.path_utils import nonexistent_or_empty_directory_arg + + +def fixup_kwargs(*skip: str): + """ + inspect the caller's frame, grab any arguments and shove them back into kwargs + + this is useful when the caller is a wrapper and wants to pass on the majority its arguments to the wrapped function + """ + + caller_frame = inspect.stack()[1].frame + args, _, kwvar, values = inspect.getargvalues(caller_frame) + + args: t.List[str] = [a for a in args if a not in skip] + + kwargs: t.Dict[str, t.Any] = values[kwvar] + + for a in args: + v = values[a] + if isinstance(v, click.Context): + continue + + kwargs[a] = v + + +def ppstrlist(sl: t.List[t.Any], conj: str = 'or', quote: bool = False): + joins = [', '] * len(sl) + joins += [f' {conj} ', ''] + + joins = joins[-len(sl):] + + if quote: + sl = [f'"{s}"' for s in sl] + + return ''.join(chain.from_iterable(zip(sl, joins))) + + + +class DictChoice(click.Choice): + """like Choice except takes a Dict[str, Any]. + + The choices are the string keys of the dict. + convert() returns the value for the chosen key.""" + + dict_choices: t.Dict[str, t.Any] + + def __init__(self, choices: t.Dict[str, t.Any], case_sensitive: bool = True) -> None: + self.dict_choices = choices + super().__init__(list(choices.keys()), case_sensitive) + + def convert(self, value: t.Any, param: t.Optional[click.Parameter], ctx: t.Optional[click.Context]) -> t.Any: + return self.dict_choices[super().convert(value, param, ctx)] + + +class WorkingDirectory: + class CleanupMode(Enum): + KEEP_ON_ERROR = (True, False) + KEEP_ALWAYS = (False, False) + DELETE_ALWAYS = (True, True) + + def __init__(self, delete_normally: bool, delete_on_exception: bool): + self.delete_normally = delete_normally + self.delete_on_exception = delete_on_exception + + def should_delete(self, has_exception: bool) -> bool: + return self.delete_on_exception if has_exception else self.delete_normally + + def __init__(self, cleanup_mode: CleanupMode, directory: t.Optional[Path] = None): + self.cleanup_mode = cleanup_mode + self.directory = directory + + def __enter__(self) -> Path: + if self.directory is None: + self.directory = Path(tempfile.mkdtemp()) + + self.directory.mkdir(exist_ok=True) + + return self.directory + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + has_e = exc_type is not None + + if self.cleanup_mode.should_delete(has_e): + shutil.rmtree(self.directory) + + return False + + +class DirectoryPathType(click.Path): + def __init__(self, empty: bool = False): + super().__init__(path_type=Path) + self.empty = empty + + def convert(self, value: t.Any, param: t.Optional[click.Parameter], ctx: t.Optional[click.Context]) -> t.Any: + path: Path = super().convert(value, param, ctx) + + if path.exists(): + if not path.is_dir(): + self.fail(f'{path}: must be a directory', param, ctx) + + if self.empty and next(path.iterdir(), None) is not None: + self.fail(f'{path}: must be an empty directory', param, ctx) + + return path + + +def with_working_directory(): + keep_option_choices = { + 'on-error': WorkingDirectory.CleanupMode.KEEP_ON_ERROR, + 'always': WorkingDirectory.CleanupMode.KEEP_ALWAYS, + 'never': WorkingDirectory.CleanupMode.DELETE_ALWAYS + } + + def decorator(func: t.Callable) -> t.Callable: + @cloup.option_group( + "Working Directory", + cloup.option('-C', '--working-directory', type=DirectoryPathType(empty=True), + help='location to install Cassandra and/or Prometheus. Must be empty or not exist. Defaults to a temporary directory.'), + cloup.option('--cleanup-working-directory', type=DictChoice(keep_option_choices, case_sensitive=False), + default='on-error', show_default=True, + help='how to delete the working directory on exit: ' + '"on-error": delete working directory on exit unless an error occurs, ' + '"always": always delete working directory on exit, ' + '"never": never delete working directory.') + ) + @click.pass_context + @wraps(func) + def wrapper(ctx: click.Context, working_directory: Path, + cleanup_working_directory: WorkingDirectory.CleanupMode, **kwargs): + working_directory = ctx.with_resource(WorkingDirectory(cleanup_working_directory, working_directory)) + + fixup_kwargs() + + func(**kwargs) + + return wrapper + + return decorator diff --git a/test/lib/dump.py b/test/lib/dump.py new file mode 100644 index 0000000..da88e16 --- /dev/null +++ b/test/lib/dump.py @@ -0,0 +1,77 @@ +import itertools +from pathlib import Path +from typing import NamedTuple, Any, Union, Iterable, List + +import io + +from frozendict import frozendict +from prometheus_client import Metric +from prometheus_client.parser import text_fd_to_metric_families +import prometheus_client.samples + + +class ValidationResult(NamedTuple): + untyped_families: Any + duplicate_families: Any + duplicate_samples: Any + + # = namedtuple('ValidationResult', ['duplicate_families', 'duplicate_samples']) +#DiffResult = namedtuple('DiffResult', ['added_families', 'removed_families', 'added_samples', 'removed_samples']) + + +class MetricsDump(NamedTuple): + path: Union[str, Path] + metric_families: List[Metric] + + @classmethod + def from_file(cls, path: Path) -> 'MetricsDump': + with open(path, 'rt', encoding='utf-8') as fd: + return MetricsDump.from_lines(fd) + + @classmethod + def from_str(cls, s: str) -> 'MetricsDump': + with io.StringIO(s) as fd: + return MetricsDump.from_lines(fd) + + @classmethod + def from_lines(cls, lines: Iterable[str]) -> 'MetricsDump': + def parse_lines(): + for family in text_fd_to_metric_families(lines): + # freeze the labels dict so its hashable and the keys can be used as a set + #family.samples = [sample._replace(labels=frozendict(sample.labels)) for sample in family.samples] + + yield family + + metric_families = list(parse_lines()) + + path = '' + if isinstance(lines, io.BufferedReader): + path = lines.name + + return MetricsDump(path, metric_families) + + def validate(self) -> ValidationResult: + def find_duplicate_families(): + def family_name_key_fn(f): + return f.name + + families = sorted(self.metric_families, key=family_name_key_fn) # sort by name + family_groups = itertools.groupby(families, key=family_name_key_fn) # group by name + family_groups = [(k, list(group)) for k, group in family_groups] # convert groups to lists + + return {name: group for name, group in family_groups if len(group) > 1} + + def find_duplicate_samples(): + samples = itertools.chain(family.samples for family in self.metric_families) + #sample_groups = + + return + + + return ValidationResult( + duplicate_families=find_duplicate_families(), + duplicate_samples=find_duplicate_samples() + ) + + def diff(self, other: 'MetricsDump'): + pass \ No newline at end of file diff --git a/test/lib/dump_tests.py b/test/lib/dump_tests.py new file mode 100644 index 0000000..159bb95 --- /dev/null +++ b/test/lib/dump_tests.py @@ -0,0 +1,115 @@ +import unittest + +from lib.dump import MetricsDump + + +class Tests(unittest.TestCase): + def test(self): + dump1 = MetricsDump.from_str(""" +# the following are duplicate families +test_family_d {abc="123"} 0 0 +test_family_d {abc="456"} 0 0 +""") + + dump2 = MetricsDump.from_str(""" +# the following are duplicate families +# TYPE test_family_d counter +test_family_d {abc="123"} 0 0 +test_family_d {abc="456"} 0 0 +""") + + pass + + +class ValidationTests(unittest.TestCase): + # def test_invalid_input(self): + # """ + # Test the + # """ + # data = """ + # busted busted busted + # """ + # + # with self.assertRaises(ValueError): + # metric_dump_tool.MetricsDump.from_lines(data) + + def test_duplicate_families(self): + """ + Test that validation finds duplicated metric families + """ + dump = MetricsDump.from_str(""" +# TYPE test_family_a counter +test_family_a {} 1234 1234 + +test_family_b {} 0 0 + +# TYPE test_family_a gauge +test_family_a {} 5678 1234 + +# the following are duplicate samples, not duplicate families +# TYPE test_family_c gauge +test_family_c {} 1234 1234 +test_family_c {} 1234 1234 + +# the following are duplicate families +test_family_d {abc="123"} 0 0 +test_family_d {abc="456"} 0 0 + """) + + result = dump.validate() + + self.assertIn('test_family_a', result.duplicate_families) + self.assertIn('test_family_d', result.duplicate_families) + self.assertNotIn('test_family_b', result.duplicate_families) + self.assertNotIn('test_family_c', result.duplicate_families) + + def test_duplicate_samples(self): + """ + Test that validation finds duplicated metric families + """ + dump = MetricsDump.from_lines(""" +# TYPE test_family_a gauge +test_family_a {hello="world"} 1234 1234 +test_family_a {hello="world"} 1234 1234 + """) + + result = dump.validate() + + self.assertIn('test_family_a', result.duplicate_families) + self.assertNotIn('test_family_b', result.duplicate_families) + + +class DiffTests(unittest.TestCase): + def test_added_families(self): + from_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 + """) + + to_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 +test_family_a {hello="universe"} 0 0 + +test_family_b {} 0 0 + """) + + result = from_dump.diff(to_dump) + + self.assertIn('test_family_b', result.added_families) + self.assertNotIn('test_family_a', result.added_families) + + def test_removed_families(self): + from_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 +test_family_a {hello="universe"} 0 0 + +test_family_b {} 0 0 + """) + + to_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 + """) + + result = from_dump.diff(to_dump) + + self.assertIn('test_family_b', result.removed_families) + self.assertNotIn('test_family_a', result.removed_families) \ No newline at end of file diff --git a/test/lib/experiment.py b/test/lib/experiment.py new file mode 100644 index 0000000..b61b069 --- /dev/null +++ b/test/lib/experiment.py @@ -0,0 +1,79 @@ +import contextlib +import http.server +import logging +import random +import socketserver +import tempfile +import threading +import time +import typing +import unittest +from collections import defaultdict +from datetime import datetime +from enum import Enum, auto +from functools import partial +from pathlib import Path +from typing import Dict + +from frozendict import frozendict + +from lib.net import SocketAddress +from lib.prometheus import PrometheusInstance, RemotePrometheusArchive + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(f'{__name__}') + + +ENDPOINT_ADDRESS = SocketAddress('localhost', 9500) + + +class TestMetricsHTTPHandler(http.server.BaseHTTPRequestHandler): + """A test HTTP endpoint for Prometheus to scrape.""" + + + def do_GET(self): + if self.path != '/metrics': + self.send_error(404) + + self.send_response(200) + self.end_headers() + + self.wfile.write(b""" +# TYPE test_counter counter +test_counter {abc="123"} 0 +test_counter {abc="456"} 0 + +test_untyped {abc="123"} 0 +test_untyped {abc="456"} 0 +""") + + +cm = contextlib.ExitStack() + +work_dir = Path(cm.enter_context(tempfile.TemporaryDirectory())) + +archive = RemotePrometheusArchive.for_tag('latest').download() +prometheus: PrometheusInstance = cm.enter_context(PrometheusInstance(archive, work_dir)) + +prometheus.start() + + + +httpd = http.server.HTTPServer(ENDPOINT_ADDRESS, TestMetricsHTTPHandler) +thread = threading.Thread(target=httpd.serve_forever, daemon=True) + +prometheus.set_static_scrape_config('test', [ENDPOINT_ADDRESS]) + +thread.start() + +input('Press any key...') + + +httpd.shutdown() +thread.join() + + +cm.close() + + diff --git a/test/lib/jar_utils.py b/test/lib/jar_utils.py new file mode 100644 index 0000000..a70440c --- /dev/null +++ b/test/lib/jar_utils.py @@ -0,0 +1,106 @@ +from dataclasses import dataclass +import logging +import re +import subprocess +import typing as t +import zipfile +from enum import Enum +from os import PathLike +from pathlib import Path +from xml.etree import ElementTree + +import click + +from lib.net import SocketAddress +from lib.path_utils import existing_file_arg + +@dataclass +class ExporterJar: + logger = logging.getLogger(f'{__name__}.{__qualname__}') + + class ExporterType(Enum): + AGENT = ('Premain-Class', 'com.zegelin.cassandra.exporter.Agent') + STANDALONE = ('Main-Class', 'com.zegelin.cassandra.exporter.Application') + + def path(self, version: str): + lname = self.name.lower() + return f'{lname}/target/cassandra-exporter-{lname}-{version}.jar' + + path: Path + type: ExporterType + + @classmethod + def from_path(cls, path: PathLike) -> 'ExporterJar': + path = existing_file_arg(path) + + # determine the JAR type (agent or standalone) via the Main/Premain class name listed in the manifest + try: + with zipfile.ZipFile(path) as zf: + manifest = zf.open('META-INF/MANIFEST.MF').readlines() + + def parse_line(line): + m = re.match('(.+): (.+)', line.decode("utf-8").strip()) + return None if m is None else m.groups() + + manifest = dict(filter(None, map(parse_line, manifest))) + + type = next(iter([t for t in ExporterJar.ExporterType if t.value in manifest.items()]), None) + if type is None: + raise ValueError(f'no manifest attribute found that matches known values') + + return cls(path, type) + + except Exception as e: + raise ValueError(f'{path} is not a valid cassandra-exporter jar: {e}') + + @staticmethod + def default_jar_path(type: ExporterType = ExporterType.AGENT) -> Path: + project_dir = Path(__file__).parents[2] + + root_pom = ElementTree.parse(project_dir / 'pom.xml').getroot() + project_version = root_pom.find('{http://maven.apache.org/POM/4.0.0}version').text + + return project_dir / type.path(project_version) + + def __str__(self) -> str: + return f'{self.path} ({self.type.name})' + + def start_standalone(self, listen_address: SocketAddress, + jmx_address: SocketAddress, + cql_address: SocketAddress, + logfile_path: Path): + + self.logger.info('Standalone log file: %s', logfile_path) + + logfile = logfile_path.open('w') # TODO: cleanup + + command = ['java', + '-jar', self.path, + '--listen', listen_address, + '--jmx-service-url', f'service:jmx:rmi:///jndi/rmi://{jmx_address}/jmxrmi', + '--cql-address', cql_address + ] + command = [str(v) for v in command] + + self.logger.debug('Standalone exec(%s)', ' '.join(command)) + + return subprocess.Popen(command, stdout=logfile, stderr=subprocess.STDOUT) + + +class ExporterJarParamType(click.ParamType): + name = "path" + + def convert(self, value: t.Any, param: t.Optional[click.Parameter], ctx: t.Optional[click.Context]) -> ExporterJar: + if isinstance(value, ExporterJar): + return value + + try: + if isinstance(value, str): + for t in ExporterJar.ExporterType: + if t.name.lower() == value.lower(): + return ExporterJar.from_path(ExporterJar.default_jar_path(t)) + + return ExporterJar.from_path(value) + + except Exception as e: + self.fail(str(e), param, ctx) diff --git a/test/lib/net.py b/test/lib/net.py new file mode 100644 index 0000000..0200c2c --- /dev/null +++ b/test/lib/net.py @@ -0,0 +1,9 @@ +import typing + + +class SocketAddress(typing.NamedTuple): + host: str + port: int + + def __str__(self) -> str: + return f'{self.host}:{self.port}' diff --git a/test/lib/path_utils.py b/test/lib/path_utils.py new file mode 100644 index 0000000..3133a35 --- /dev/null +++ b/test/lib/path_utils.py @@ -0,0 +1,26 @@ +from os import PathLike +from pathlib import Path + + +def existing_file_arg(path: PathLike): + path = Path(path) + if not path.exists(): + raise ValueError(f'{path}: file does not exist.') + + if not path.is_file(): + raise ValueError(f'{path}: not a regular file.') + + return path + + +def nonexistent_or_empty_directory_arg(path): + path = Path(path) + + if path.exists(): + if not path.is_dir(): + raise ValueError(f'"{path}" must be a directory.') + + if next(path.iterdir(), None) is not None: + raise ValueError(f'"{path}" must be an empty directory.') + + return path \ No newline at end of file diff --git a/test/lib/prometheus.py b/test/lib/prometheus.py new file mode 100644 index 0000000..d0f15da --- /dev/null +++ b/test/lib/prometheus.py @@ -0,0 +1,393 @@ +import http.client +import json +import platform +import re +import signal +import ssl +import subprocess +import tarfile +import time +import typing as t +import urllib.request +import urllib.error +from contextlib import contextmanager +from datetime import datetime, timedelta +from functools import wraps +from io import TextIOWrapper +from pathlib import Path +from typing import List, NamedTuple, Optional, Union, TextIO +from urllib.parse import urlparse + +import appdirs +import click +import cloup +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization +import yaml +from tqdm import tqdm + +import logging + +from lib.ccm import TestCluster +from lib.click_helpers import fixup_kwargs + +from lib.net import SocketAddress + + +class _TqdmIOStream(object): + def __init__(self, stream, t): + self._stream = stream + self._t = t + + def read(self, size): + buf = self._stream.read(size) + self._t.update(len(buf)) + return buf + + def __enter__(self, *args, **kwargs): + self._stream.__enter__(*args, **kwargs) + return self + + def __exit__(self, *args, **kwargs): + self._stream.__exit__(*args, **kwargs) + + def __getattr__(self, attr): + return getattr(self._stream, attr) + + +class LocalPrometheusArchive(NamedTuple): + path: Path + + def extract(self, destination_directory: Path) -> Path: + archive_roots = set() + + with tarfile.open(self.path, mode='r') as archive: + for member in archive: + archive_roots.add(Path(member.name).parts[0]) + + archive.extract(member, destination_directory) + + return destination_directory / next(iter(archive_roots)) + + +class RemotePrometheusArchive(NamedTuple): + url: str + + logger = logging.getLogger(f'{__name__}.{__qualname__}') + + @classmethod + def for_tag(cls, tag: str): + def architecture_str(): + machine_aliases = { + 'x86_64': 'amd64' + } + + machine = platform.machine() + machine = machine_aliases.get(machine, machine) + + system = platform.system().lower() + + return f'{system}-{machine}' + + asset_pattern = re.compile(r'prometheus-.+\.' + architecture_str() + '\.tar\..+') + + with urllib.request.urlopen(f'https://api.github.com/repos/prometheus/prometheus/releases/{tag}') as response: + release_info = json.load(response) + + for asset in release_info['assets']: + if asset_pattern.fullmatch(asset['name']) is not None: + return RemotePrometheusArchive(asset['browser_download_url'], ) + + + @staticmethod + def default_download_cache_directory() -> Path: + return Path(appdirs.user_cache_dir('cassandra-exporter-e2e')) / 'prometheus' + + def download(self, download_cache_directory: Path = None) -> LocalPrometheusArchive: + if download_cache_directory is None: + download_cache_directory = RemotePrometheusArchive.default_download_cache_directory() + + url_parts = urlparse(self.url) + url_path = Path(url_parts.path) + + destination = download_cache_directory / url_path.name + destination.parent.mkdir(parents=True, exist_ok=True) + + if destination.exists(): + return LocalPrometheusArchive(destination) + + self.logger.info(f'Downloading {self.url} to {destination}...') + + try: + with tqdm(unit='bytes', unit_scale=True, miniters=1) as t: + def report(block_idx: int, block_size: int, file_size: int): + if t.total is None: + t.reset(file_size) + + t.update(block_size) + + urllib.request.urlretrieve(self.url, destination, report) + + except: + destination.unlink(missing_ok=True) # don't leave half-download files around + raise + + return LocalPrometheusArchive(destination) + + +def archive_from_path_or_url(purl: str) -> Union[LocalPrometheusArchive, RemotePrometheusArchive]: + url_parts = urlparse(purl) + + if url_parts.netloc == '': + return LocalPrometheusArchive(Path(purl)) + + return RemotePrometheusArchive(purl) + + +class PrometheusApi: + def __init__(self, address: SocketAddress, ssl_context: ssl.SSLContext): + self.address = address + self.ssl_context = ssl_context + + def _api_call(self, path): + with urllib.request.urlopen(f'https://{self.address}{path}', context=self.ssl_context) as response: + response_envelope = json.load(response) + + if response_envelope['status'] != 'success': + raise Exception(response.url, response.status, response_envelope) + + return response_envelope['data'] + + def get_targets(self): + return self._api_call('/api/v1/targets') + + def query(self, q): + return self._api_call(f'/api/v1/query?query={q}') + + +class PrometheusInstance: + logger = logging.getLogger(f'{__name__}.{__qualname__}') + + listen_address: SocketAddress + directory: Path = None + + process: subprocess.Popen = None + log_file: TextIO + + tls_key_path: Path + tls_cert_path: Path + ssl_context: ssl.SSLContext + + api: PrometheusApi + + def __init__(self, archive: LocalPrometheusArchive, working_directory: Path, + listen_address: SocketAddress = SocketAddress('localhost', 9090)): + self.directory = archive.extract(working_directory) + self.listen_address = listen_address + + self.setup_tls() + + self.api = PrometheusApi(listen_address, self.ssl_context) + + def setup_tls(self): + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048 + ) + + self.tls_key_path = (self.directory / 'tls_key.pem') + with self.tls_key_path.open('wb') as f: + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + )) + + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"AU"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Australian Capital Territory"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Canberra"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Instaclustr Pty Ltd"), + x509.NameAttribute(NameOID.COMMON_NAME, u"Temporary Prometheus Server Certificate"), + ]) + + cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + private_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.utcnow() + ).not_valid_after( + datetime.utcnow() + timedelta(days=1) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName(self.listen_address.host)]), + critical=False, + ).sign(private_key, hashes.SHA256()) # Sign certificate with private key + + self.tls_cert_path = (self.directory / 'tls_cert.pem') + with self.tls_cert_path.open('wb') as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + self.ssl_context = ssl.SSLContext() + self.ssl_context.load_verify_locations(self.tls_cert_path) + self.ssl_context.verify_mode = ssl.VerifyMode.CERT_REQUIRED + + def start(self, wait=True): + web_config_path = (self.directory / 'web-config.yaml') + with web_config_path.open('w') as f: + config = { + 'tls_server_config': { + 'cert_file': str(self.tls_cert_path), + 'key_file': str(self.tls_key_path) + } + } + + yaml.safe_dump(config, f) + + self.log_file = (self.directory / 'prometheus.log').open('w') + + self.logger.info('Starting Prometheus...') + self.process = subprocess.Popen( + args=[str(self.directory / 'prometheus'), + f'--web.config.file={web_config_path}', + f'--web.listen-address={self.listen_address}'], + cwd=str(self.directory), + stdout=self.log_file, + stderr=subprocess.STDOUT + ) + + if wait: + self.wait_ready() + + self.logger.info('Prometheus started successfully') + + def stop(self): + self.logger.info('Stopping Prometheus...') + + if self.process is not None: + self.process.terminate() + + self.logger.info('Prometheus stopped successfully') + + def wait_ready(self): + self.logger.info('Waiting for Prometheus to become ready...') + while not self.is_ready(): + rc = self.process.poll() + if rc is not None: + raise Exception(f'Prometheus process {self.process.pid} exited unexpectedly with rc {rc} while waiting for ready state!') + + time.sleep(1) + + @contextmanager + def _modify_config(self): + config_file_path = self.directory / 'prometheus.yml' + + with config_file_path.open('r+') as stream: + config = yaml.safe_load(stream) + + yield config + + stream.seek(0) + stream.truncate() + + yaml.safe_dump(config, stream) + + if self.process is not None: + self.process.send_signal(signal.SIGHUP) + self.wait_ready() + + def set_static_scrape_config(self, job_name: str, static_targets: List[Union[str, SocketAddress]]): + with self._modify_config() as config: + config['scrape_configs'] = [{ + 'job_name': job_name, + 'scrape_interval': '10s', + 'static_configs': [{ + 'targets': [str(t) for t in static_targets] + }] + }] + + def is_ready(self): + try: + with urllib.request.urlopen(f'https://{self.listen_address}/-/ready', context=self.ssl_context) as response: + return response.status == 200 + + except urllib.error.HTTPError as e: + self.logger.debug('HTTP error while checking for ready state: %s', e) + return False + + except urllib.error.URLError as e: + self.logger.debug('urllib error while checking for ready state: %s', e) + if isinstance(e.reason, ConnectionRefusedError): + return False + + if isinstance(e.reason, ssl.SSLError): + self.logger.warning('SSL/TLS errors may mean that an instance of Prometheus (or some other server) is already listening on %s. Check the port.', self.listen_address) + + raise e + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + + if self.process is not None: + self.process.__exit__(exc_type, exc_val, exc_tb) + + if self.log_file is not None: + self.log_file.close() + + +def with_prometheus(): + def decorator(func: t.Callable) -> t.Callable: + @cloup.option_group( + "Prometheus Archive", + cloup.option('--prometheus-version', metavar='TAG'), + cloup.option('--prometheus-archive', metavar='PATH/URL'), + constraint=cloup.constraints.mutually_exclusive + ) + @click.pass_context + @wraps(func) + def wrapper(ctx: click.Context, + prometheus_version: str, + prometheus_archive: str, + working_directory: Path, + ccm_cluster: t.Optional[TestCluster] = None, + **kwargs): + + if prometheus_version is None and prometheus_archive is None: + prometheus_version = 'latest' + + if prometheus_version is not None: + archive = RemotePrometheusArchive.for_tag(prometheus_version) + + else: + archive = archive_from_path_or_url(prometheus_archive) + + if isinstance(archive, RemotePrometheusArchive): + archive = archive.download() + + prometheus = ctx.with_resource(PrometheusInstance( + archive=archive, + working_directory=working_directory + )) + + if ccm_cluster: + prometheus.set_static_scrape_config('cassandra', + [str(n.exporter_address) for n in ccm_cluster.nodelist()] + ) + + fixup_kwargs() + + func(prometheus=prometheus, **kwargs) + + return wrapper + + return decorator diff --git a/test/lib/prometheus_tests.py b/test/lib/prometheus_tests.py new file mode 100644 index 0000000..81a9eef --- /dev/null +++ b/test/lib/prometheus_tests.py @@ -0,0 +1,166 @@ +import contextlib +import http.server +import logging +import random +import socketserver +import tempfile +import threading +import time +import typing +import unittest +from collections import defaultdict +from datetime import datetime +from enum import Enum, auto +from functools import partial +from pathlib import Path +from typing import Dict + +from frozendict import frozendict + +from lib.net import SocketAddress +from lib.prometheus import PrometheusInstance, RemotePrometheusArchive + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(f'{__name__}') + + +ENDPOINT_ADDRESS = SocketAddress('localhost', 9500) + + +class EndpointMode(Enum): + RETURN_VALID_RESPONSE = auto() + RETURN_INVALID_RESPONSE = auto() + + +class TestMetricsHTTPHandler(http.server.BaseHTTPRequestHandler): + """A test HTTP endpoint for Prometheus to scrape.""" + + mode: EndpointMode + + def __init__(self, mode: EndpointMode, *args) -> None: + self.mode = mode + super().__init__(*args) + + def do_GET(self): + if self.path != '/metrics': + self.send_error(404) + + self.send_response(200) + self.end_headers() + + if self.mode == EndpointMode.RETURN_VALID_RESPONSE: + self.wfile.write(b'# TYPE test_family gauge\n' + b'test_family 123\n') + + elif self.mode == EndpointMode.RETURN_INVALID_RESPONSE: + self.wfile.write(b'# TYPE test_family gauge\n' + b'test_family123\n') + + else: + raise NotImplementedError(f'unknown mode {self.mode}') + + + + +# class TargetScrapeStatus(typing.NamedTuple): +# health: str +# lastError: str +# +# +# TargetsScrapeHistory = Dict[str, Dict[str, TargetScrapeStatus]] +# +# +# def collect_target_scrape_history(min_scrapes: int = 5) -> TargetsScrapeHistory: +# target_histories = defaultdict(dict) +# +# while True: +# targets = prometheus.api.get_targets() +# print(targets) +# +# for target in targets['activeTargets']: +# labels = frozendict(target['labels']) +# +# history = target_histories[labels] +# +# if target['health'] == 'unknown': +# # hasn't been scraped yet +# continue +# +# ts = target['lastScrape'] +# history[ts] = TargetScrapeStatus(target['health'], target['lastError']) +# +# # collect min_scrapes or more scrape statuses for each target +# if len(target_histories) > 0 and all([len(v) >= min_scrapes for v in target_histories.values()]): +# break +# +# time.sleep(1) +# +# return target_histories +# +# +# def is_target_healthy(target: str, scrape_history: TargetsScrapeHistory) -> bool: +# target_history = scrape_history[target] +# +# return len(target_history) and all([h.health == 'up' for h in target_history.values()]) + + + +# assert run_test(EndpointMode.RETURN_VALID_RESPONSE) is True +# assert run_test(EndpointMode.RETURN_INVALID_RESPONSE) is False + + +class TestMetricsHandlerTest(unittest.TestCase): + def test(self): + cm = contextlib.ExitStack() + + work_dir = Path(cm.enter_context(tempfile.TemporaryDirectory())) + + archive = RemotePrometheusArchive.for_tag('latest').download() + prometheus: PrometheusInstance = cm.enter_context(PrometheusInstance(archive, work_dir)) + + prometheus.start() + + + def run_test(mode: EndpointMode): + httpd = http.server.HTTPServer(ENDPOINT_ADDRESS, partial(TestMetricsHTTPHandler, mode)) + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + + thread.start() + + try: + pass + # prometheus.set_static_scrape_config('test', [ENDPOINT_ADDRESS]) + # + # history = collect_target_scrape_history() + # print(history) + # return is_target_healthy('test', history) + + finally: + httpd.shutdown() + thread.join() + + +class ConcurrentPrometheusInstancesTest(unittest.TestCase): + def test_concurrent_instances(self): + """verify that trying to start a 2nd copy of prometheus fails. + prometheus + this is handled by creating a unique server tls cert for each instance and requiring a valid cert on connections. + if the api client connects to the wrong instance cert verification will fail and """ + cm = contextlib.ExitStack() # TODO: clean this up + + work_dir1 = Path(cm.enter_context(tempfile.TemporaryDirectory())) # TODO: make these delete only if no exception occured + work_dir2 = Path(cm.enter_context(tempfile.TemporaryDirectory())) + + archive = RemotePrometheusArchive.for_tag('latest').download() + prometheus1: PrometheusInstance = cm.enter_context(PrometheusInstance(archive, work_dir1)) + prometheus2: PrometheusInstance = cm.enter_context(PrometheusInstance(archive, work_dir2)) + + prometheus1.start() + + with self.assertRaisesRegex(Exception, 'certificate verify failed'): + prometheus2.start() + + + cm.close() + diff --git a/test/lib/schema.py b/test/lib/schema.py new file mode 100644 index 0000000..6b3921c --- /dev/null +++ b/test/lib/schema.py @@ -0,0 +1,51 @@ +import argparse +from dataclasses import dataclass +from os import PathLike +import typing as t + +import click +import yaml +from pathlib import Path +from collections import namedtuple + +from lib.path_utils import existing_file_arg + +@dataclass +class CqlSchema: + path: Path + statements: t.List[str] + + @classmethod + def from_path(cls, path: PathLike) -> 'CqlSchema': + path = existing_file_arg(path) + + with open(path, 'r') as f: + schema = yaml.load(f, Loader=yaml.SafeLoader) + + if not isinstance(schema, list): + raise ValueError(f'root of the schema YAML must be a list. Got a {type(schema).__name__}.') + + for i, o in enumerate(schema): + if not isinstance(o, str): + raise ValueError(f'schema YAML must be a list of statement strings. Item {i} is a {type(o).__name__}.') + + return cls(path, schema) + + @staticmethod + def default_schema_path() -> Path: + test_dir = Path(__file__).parents[1] + return test_dir / "schema.yaml" + + +class CqlSchemaParamType(click.ParamType): + name = "path" + + def convert(self, value: t.Any, param: t.Optional[click.Parameter], ctx: t.Optional[click.Context]) -> CqlSchema: + if isinstance(value, CqlSchema): + return value + + try: + return CqlSchema.from_path(value) + + except Exception as e: + self.fail(str(e), param, ctx) diff --git a/test/old/capture_dump.py b/test/old/capture_dump.py new file mode 100644 index 0000000..1fa4707 --- /dev/null +++ b/test/old/capture_dump.py @@ -0,0 +1,95 @@ +# spin up a multi-node CCM cluster with cassandra-exporter installed, apply a schema, and capture the metrics output + +import argparse +import contextlib +import logging +import os +import tempfile +import urllib.request +from pathlib import Path + +from lib.ccm import TestCluster +from lib.jar_utils import ExporterJar +from lib.schema import CqlSchema + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +def cluster_directory(path): + path = Path(path) + + if path.exists(): + if not path.is_dir(): + raise argparse.ArgumentTypeError(f'"{path}" must be a directory.') + + if next(path.iterdir(), None) is not None: + raise argparse.ArgumentTypeError(f'"{path}" must be an empty directory.') + + return path + + +def output_directory(path): + path = Path(path) + + if path.exists(): + if not path.is_dir(): + raise argparse.ArgumentTypeError(f'"{path}" must be a directory.') + + # the empty directory check is done later, since it can be skipped with --overwrite-output + + return path + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('cassandra_version', type=str, help="version of Cassandra to run", metavar="CASSANDRA_VERSION") + parser.add_argument('output_directory', type=output_directory, help="location to write metrics dumps", metavar="OUTPUT_DIRECTORY") + + parser.add_argument('-o', '--overwrite-output', action='store_true', help="don't abort when the output directory exists or is not empty") + + parser.add_argument('--cluster-directory', type=cluster_directory, help="location to install Cassandra. Must be empty or not exist. (default is a temporary directory)") + parser.add_argument('--keep-cluster-directory', type=bool, help="don't delete the cluster directory on exit") + parser.add_argument('--keep-cluster-running', type=bool, help="don't stop the cluster on exit (implies --keep-cluster-directory)") + + parser.add_argument('-d', '--datacenters', type=int, help="number of data centers (default: %(default)s)", default=2) + parser.add_argument('-r', '--racks', type=int, help="number of racks per data center (default: %(default)s)", default=3) + parser.add_argument('-n', '--nodes', type=int, help="number of nodes (default: %(default)s)", default=6) + + parser.add_argument('-j', '--exporter-jar', type=ExporterJar.from_path, help="location of the cassandra-exporter jar, either agent or standalone (default: %(default)s)", default=str(ExporterJar.default_jar_path())) + parser.add_argument('-s', '--schema', type=CqlSchema.from_path, help="CQL schema to apply (default: %(default)s)", default=str(CqlSchema.default_schema_path())) + + args = parser.parse_args() + + if args.cluster_directory is None: + args.cluster_directory = Path(tempfile.mkdtemp()) / "test-cluster" + + if args.output_directory.exists() and not args.overwrite_output: + if next(args.output_directory.iterdir(), None) is not None: + raise argparse.ArgumentTypeError(f'"{args.output_directory}" must be an empty directory.') + + os.makedirs(args.output_directory, exist_ok=True) + + with contextlib.ExitStack() as defer: + logger.info('Setting up Cassandra cluster.') + ccm_cluster = defer.push(TestCluster( + cluster_directory=args.cluster_directory, + cassandra_version=args.cassandra_version, + exporter_jar=args.exporter_jar, + nodes=args.nodes, racks=args.racks, datacenters=args.datacenters, + delete_cluster_on_stop=not args.keep_cluster_directory, + )) + + logger.info('Starting cluster.') + ccm_cluster.start() + + logger.info('Applying CQL schema.') + ccm_cluster.apply_schema(args.schema) + + logger.info('Capturing metrics dump.') + for node in ccm_cluster.nodelist(): + url = f'http://{node.ip_addr}:{node.exporter_port}/metrics?x-accept=text/plain' + destination = args.output_directory / f'{node.name}.txt' + urllib.request.urlretrieve(url, destination) + + logger.info(f'Wrote {url} to {destination}') diff --git a/test/old/create_demo_cluster.py b/test/old/create_demo_cluster.py new file mode 100644 index 0000000..505adee --- /dev/null +++ b/test/old/create_demo_cluster.py @@ -0,0 +1,88 @@ +# spin up a CCM cluster of the specified C* version and Exporter build. +# Useful for testing and demos. + +import argparse +import contextlib +import http.server +import logging +import random +import shutil +import sys +import tempfile +import time +from collections import defaultdict +from pathlib import Path + +import yaml +from frozendict import frozendict + +from lib.ccm import TestCluster +from lib.jar_utils import ExporterJar +from lib.path_utils import nonexistent_or_empty_directory_arg +from lib.prometheus import PrometheusInstance, RemotePrometheusArchive +from lib.schema import CqlSchema + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('cassandra_version', type=str, help="version of Cassandra to run", metavar="CASSANDRA_VERSION") + + parser.add_argument('-C', '--working-directory', type=nonexistent_or_empty_directory_arg, + help="location to install Cassandra and Prometheus. Must be empty or not exist. (default is a temporary directory)") + parser.add_argument('--keep-working-directory', help="don't delete the cluster directory on exit", + action='store_true') + + parser.add_argument('-d', '--datacenters', type=int, help="number of data centers (default: %(default)s)", + default=1) + parser.add_argument('-r', '--racks', type=int, help="number of racks per data center (default: %(default)s)", + default=3) + parser.add_argument('-n', '--nodes', type=int, help="number of nodes per data center rack (default: %(default)s)", + default=3) + + ExporterJar.add_jar_argument('--exporter-jar', parser) + CqlSchema.add_schema_argument('--schema', parser) + RemotePrometheusArchive.add_archive_argument('--prometheus-archive', parser) + + args = parser.parse_args() + + if args.working_directory is None: + args.working_directory = Path(tempfile.mkdtemp()) + + + def delete_working_dir(): + shutil.rmtree(args.working_directory) + + + with contextlib.ExitStack() as defer: + if not args.keep_working_directory: + defer.callback(delete_working_dir) # LIFO order -- this gets called last + + logger.info('Setting up Cassandra cluster.') + ccm_cluster = defer.push(TestCluster( + cluster_directory=args.working_directory / 'test-cluster', + cassandra_version=args.cassandra_version, + exporter_jar=args.exporter_jar, + nodes=args.nodes, racks=args.racks, datacenters=args.datacenters, + delete_cluster_on_stop=not args.keep_working_directory, + )) + + + + print('Prometheus scrape config:') + config = {'scrape_configs': [{ + 'job_name': 'cassandra', + 'scrape_interval': '10s', + 'static_configs': [{ + 'targets': [f'http://localhost:{node.exporter_port}' for node in ccm_cluster.nodelist()] + }] + }]} + + yaml.safe_dump(config, sys.stdout) + + ccm_cluster.start() + logger.info("Cluster is now running.") + + input("Press any key to stop cluster...") \ No newline at end of file diff --git a/test/old/debug_agent.py b/test/old/debug_agent.py new file mode 100644 index 0000000..ed51a36 --- /dev/null +++ b/test/old/debug_agent.py @@ -0,0 +1,85 @@ +# simple script to launch a single-node CCM cluster with the exporter agent installed, and the C* JVM +# configured to start the remote debugger agent + +import argparse +import os +from pathlib import Path + +from ccmlib.cluster import Cluster +from ccmlib.cluster_factory import ClusterFactory + +from lib.jar_utils import ExporterJar + + +def create_ccm_cluster(cluster_directory: Path, cassandra_version: str, node_count: int): + if cluster_directory.exists(): + cluster_directory.rmdir() # CCM wants to create this + + print('Creating cluster...') + ccm_cluster = Cluster( + path=cluster_directory.parent, + name=cluster_directory.name, + version=cassandra_version, + create_directory=True # if this is false, various config files wont be created... + ) + + ccm_cluster.populate(nodes=node_count) + + return ccm_cluster + + +def yesno_bool(b: bool): + return ('n', 'y')[b] + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('cassandra_version', type=str, help="version of Cassandra to run", metavar="CASSANDRA_VERSION") + parser.add_argument('cluster_directory', type=Path, help="location", metavar="CLUSTER_DIRECTORY") + + parser.add_argument('--jvm-debug-wait-attach', dest='jvm_debug_wait_attach', help="suspend JVM on startup and wait for debugger to attach", action='store_true') + parser.add_argument('--no-jvm-debug-wait-attach', dest='jvm_debug_wait_attach', help="suspend JVM on startup and wait for debugger to attach", action='store_false') + parser.add_argument('--jvm-debug-address', type=str, help="address/port for JVM debug agent to listen on", default='5005') + + parser.add_argument('--exporter-args', type=str, help="exporter agent arguments", default='-l:9500') + + ExporterJar.add_jar_argument('--exporter-jar', parser) + + parser.set_defaults(jvm_debug_wait_attach=True) + + args = parser.parse_args() + + print(f'Cluster directory is: {args.cluster_directory}') + + if not args.cluster_directory.exists() or \ + (args.cluster_directory.exists() and next(args.cluster_directory.iterdir(), None) is None): + + # non-existent or empty directory -- new cluster + ccm_cluster = create_ccm_cluster(args.cluster_directory, args.cassandra_version, node_count=1) + + else: + # existing, non-empty directory -- assume existing cluster + print('Loading cluster...') + ccm_cluster = ClusterFactory.load(args.cluster_directory.parent, args.cluster_directory.name) + + node = ccm_cluster.nodelist()[0] + print(f'Configuring node {node.name}') + + node.set_environment_variable('JVM_OPTS', f'-javaagent:{args.exporter_jar.path}={args.exporter_args} ' + f'-agentlib:jdwp=transport=dt_socket,server=y,suspend={yesno_bool(args.jvm_debug_wait_attach)},address={args.jvm_debug_address}') + + print(f'JVM remote debugger listening on {args.jvm_debug_address}. JVM will suspend on start.') + print('Starting single node cluster...') + + launch_bin = node.get_launch_bin() + args = [launch_bin, '-f'] + env = node.get_env() + + os.execve(launch_bin, args, env) + + + + + + + diff --git a/test/old/e2e_test.py b/test/old/e2e_test.py new file mode 100644 index 0000000..4b11f06 --- /dev/null +++ b/test/old/e2e_test.py @@ -0,0 +1,129 @@ +# this end-to-end test does the following: +# 1. download Prometheus (for the current platform) +# 2. setup a multi-node Cassandra cluster with the exporter installed +# 3. configure Prometheus to scrape from the Cassandra nodes +# 4. verifies that Prometheus successfully scrapes the metrics +# 5. cleans up everything + +import argparse +import contextlib +import http.server +import logging +import random +import shutil +import sys +import tempfile +import time +from collections import defaultdict +from pathlib import Path + +from frozendict import frozendict + +from lib.ccm import TestCluster +from lib.jar_utils import ExporterJar +from lib.path_utils import nonexistent_or_empty_directory_arg +from lib.prometheus import PrometheusInstance, RemotePrometheusArchive +from lib.schema import CqlSchema + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('cassandra_version', type=str, help="version of Cassandra to run", metavar="CASSANDRA_VERSION") + + parser.add_argument('-C', '--working-directory', type=nonexistent_or_empty_directory_arg, + help="location to install Cassandra and Prometheus. Must be empty or not exist. (default is a temporary directory)") + parser.add_argument('--keep-working-directory', help="don't delete the cluster directory on exit", + action='store_true') + + parser.add_argument('-d', '--datacenters', type=int, help="number of data centers (default: %(default)s)", + default=2) + parser.add_argument('-r', '--racks', type=int, help="number of racks per data center (default: %(default)s)", + default=3) + parser.add_argument('-n', '--nodes', type=int, help="number of nodes per data center rack (default: %(default)s)", + default=3) + + ExporterJar.add_jar_argument('--exporter-jar', parser) + CqlSchema.add_schema_argument('--' + 'schema', parser) + RemotePrometheusArchive.add_archive_argument('--prometheus-archive', parser) + + args = parser.parse_args() + + if args.working_directory is None: + args.working_directory = Path(tempfile.mkdtemp()) + + def delete_working_dir(): + shutil.rmtree(args.working_directory) + + with contextlib.ExitStack() as defer: + if not args.keep_working_directory: + defer.callback(delete_working_dir) # LIFO order -- this gets called last + + logger.info('Setting up Prometheus.') + prometheus = defer.push(PrometheusInstance( + archive=args.prometheus_archive, + working_directory=args.working_directory + )) + + logger.info('Setting up Cassandra cluster.') + ccm_cluster = defer.push(TestCluster( + cluster_directory=args.working_directory / 'test-cluster', + cassandra_version=args.cassandra_version, + exporter_jar=args.exporter_jar, + nodes=args.nodes, racks=args.racks, datacenters=args.datacenters, + delete_cluster_on_stop=not args.keep_working_directory, + )) + + # httpd = http.server.HTTPServer(("", 9500), DummyPrometheusHTTPHandler) + # threading.Thread(target=httpd.serve_forever, daemon=True).start() + # + # httpd = http.server.HTTPServer(("", 9501), DummyPrometheusHTTPHandler) + # threading.Thread(target=httpd.serve_forever, daemon=True).start() + + prometheus.set_static_scrape_config('cassandra', + list(map(lambda n: f'localhost:{n.exporter_port}', ccm_cluster.nodelist()))) + # prometheus.set_scrape_config('cassandra', ['localhost:9500', 'localhost:9501']) + prometheus.start() + + logger.info('Starting Cassandra cluster.') + ccm_cluster.start() + + logger.info('Applying CQL schema.') + ccm_cluster.apply_schema(args.schema) + + target_histories = defaultdict(dict) + + while True: + targets = prometheus.get_targets() + + if len(targets['activeTargets']) > 0: + for target in targets['activeTargets']: + labels = frozendict(target['labels']) + + # even if the target health is unknown, ensure the key exists so the length check below + # is aware of the target + history = target_histories[labels] + + if target['health'] == 'unknown': + continue + + history[target['lastScrape']] = (target['health'], target['lastError']) + + if all([len(v) >= 5 for v in target_histories.values()]): + break + + time.sleep(1) + + unhealthy_targets = dict((target, history) for target, history in target_histories.items() + if any([health != 'up' for (health, error) in history.values()])) + + if len(unhealthy_targets): + logger.error('One or more Prometheus scrape targets was unhealthy.') + logger.error(unhealthy_targets) + sys.exit(-1) diff --git a/test/old/e2e_test_tests.py b/test/old/e2e_test_tests.py new file mode 100644 index 0000000..ef5b19c --- /dev/null +++ b/test/old/e2e_test_tests.py @@ -0,0 +1,10 @@ +# Tests for the End-to-End Test! + +import pprint +import unittest +from metric_dump_tool import MetricsDump +import metric_dump_tool + + +class ValidationTests(unittest.TestCase): + pass \ No newline at end of file diff --git a/test/old/metric_dump_tool.py b/test/old/metric_dump_tool.py new file mode 100644 index 0000000..e197c99 --- /dev/null +++ b/test/old/metric_dump_tool.py @@ -0,0 +1,228 @@ +import argparse +import io + +import ccmlib.cluster +import os +import urllib.request +import re +from collections import namedtuple, defaultdict, Counter +from enum import Enum, auto, unique +from frozendict import frozendict +import itertools + +from prometheus_client.parser import text_fd_to_metric_families +import prometheus_client.samples + + +class MetricsDump(namedtuple('MetricsDump', ['path', 'metric_families'])): + __slots__ = () + + @classmethod + def from_file(cls, path): + with open(path, 'rt', encoding='utf-8') as fd: + return MetricsDump.from_lines(fd) + + @classmethod + def from_lines(cls, lines): + with io.StringIO(lines) as fd: + return MetricsDump.from_fd(fd) + + @classmethod + def from_fd(cls, fd): + def parse_lines(): + for family in text_fd_to_metric_families(fd): + # freeze the labels dict so its hashable and the keys can be used as a set + family.samples = [sample._replace(labels=frozendict(sample.labels)) for sample in family.samples] + + yield family + + metric_families = list(parse_lines()) + + path = '' + if isinstance(fd, io.BufferedReader): + path = fd.name + + return MetricsDump(path, metric_families) + + +ValidationResult = namedtuple('ValidationResult', ['duplicate_families', 'duplicate_samples']) +DiffResult = namedtuple('DiffResult', ['added_families', 'removed_families', 'added_samples', 'removed_samples']) + +# patch Sample equality & hash so that only name + labels are the identity (ignore value, timestamp, etc) +prometheus_client.samples.Sample.__eq__ = lambda self, o: (isinstance(o, prometheus_client.samples.Sample) and self.name == o.name and self.labels == o.labels) +prometheus_client.samples.Sample.__hash__ = lambda self: hash((self.name, self.labels)) + + + + +def validate_dump(dump: MetricsDump) -> ValidationResult: + def find_duplicate_families(): + def family_name_key_fn(f): + return f.name + + families = sorted(dump.metric_families, key=family_name_key_fn) # sort by name + family_groups = itertools.groupby(families, key=family_name_key_fn) # group by name + family_groups = ((k, list(group)) for k, group in family_groups) # convert groups to lists + + return {name: group for name, group in family_groups if len(group) > 1} + + def find_duplicate_samples(): + samples = itertools.chain(family.samples for family in dump.metric_families) + #sample_groups = + + return + + + return ValidationResult( + duplicate_families=find_duplicate_families(), + duplicate_samples=find_duplicate_samples() + ) + + # duplicate_metric_families = [key for key, value + # in Counter([metric.name for metric in families]).items() + # if value > 1] + + # if len(duplicate_metric_families): + # print('The following metric families are duplicated:') + # for family_name in duplicate_metric_families: + # print(f'\t{family_name}') + + + # # find duplicate samples + # for family in args.dump.metric_families: + # duplicate_samples = [key for key, value + # in Counter(family.samples).items() + # if value > 1] + # + # if len(duplicate_samples) == 0: + # continue + # + # print(f'Metric family "{family.name}" contains duplicate samples:') + # + # for sample in duplicate_samples: + # print(f'\t{sample}') + + +def validate_dump_entrypoint(args): + result = validate_dump(args.dump) + + if len(result.duplicate_families): + print('The following metric families are duplicated:') + + for name, group in result.duplicate_families.items(): + print(f'\t{name}') + + pass + + + +def diff_dump(from_dump: MetricsDump, to_dump): + def diff_families(): + from_families = [(metric.name, metric.type) for metric in from_dump.metric_families] + to_families = [(metric.name, metric.type) for metric in to_dump.metric_families] + + pass + + diff_families() + + return DiffResult([], [], [], []) + +def diff_dump_entrypoint(args): + pass + + + + + + +def prometheus_metrics(path): + try: + return MetricsDump.from_file(path) + + except Exception as e: + raise argparse.ArgumentTypeError(f"error while parsing '{path}': {e}") from e + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + + validate_parser = subparsers.add_parser('validate', help='validate a metrics dump for common problems') + validate_parser.add_argument("dump", type=prometheus_metrics, metavar="DUMP") + validate_parser.set_defaults(entrypoint=validate_dump_entrypoint) + + diff_parser = subparsers.add_parser('diff', help='diff two metrics dumps') + diff_parser.add_argument("from", type=prometheus_metrics, metavar="FROM") + diff_parser.add_argument("to", type=prometheus_metrics, metavar="TO") + diff_parser.set_defaults(entrypoint=diff_dump_entrypoint) + + + args = parser.parse_args() + args.entrypoint(args) + + + + + + + + + + +# def index_metrics(metrics): +# for metric in metrics: +# metric.samples = {sample.labels: sample for sample in metric.samples} +# +# return {metric.name: metric for metric in metrics} +# +# +# # index the metrics (convert lists to dicts) -- this removes duplicated families/samples +# from_metric_families = index_metrics(known_metric_families) +# to_metric_families = index_metrics(latest_metric_families) +# +# # find differences +# known_names = set(known_metric_families.keys()) +# latest_names = set(latest_metric_families.keys()) +# +# removed_names = known_names.difference(latest_names) +# if len(removed_names): +# print('The following metric families no longer exist:') +# for name in removed_names: +# print(f'\t{name}') +# +# added_names = latest_names.difference(known_names) +# if len(added_names): +# print('The following metric families are new:') +# for name in added_names: +# print(f'\t{name}') +# +for name in latest_names.intersection(known_names): +# known_metric = known_metric_families[name] +# latest_metric = latest_metric_families[name] +# +# known_labels = set(known_metric.samples.keys()) +# latest_labels = set(latest_metric.samples.keys()) +# +# removed_labels = known_labels.difference(latest_labels) +# if len(removed_labels): +# print(f'The following samples no longer exist for metric family "{name}":') +# for labels in removed_labels: +# print(f'\t{labels}') +# +# added_labels = latest_labels.difference(known_labels) +# if len(added_labels): +# print(f'The following samples are new for metric family "{name}":') +# for labels in added_labels: +# print(f'\t{labels}') +# +# +# pass +# +# +# pass + + +# +# # cluster.stop() +# +# # cluster.set_environment_variable() \ No newline at end of file diff --git a/test/old/metric_dump_tool_tests.py b/test/old/metric_dump_tool_tests.py new file mode 100644 index 0000000..b3d1f88 --- /dev/null +++ b/test/old/metric_dump_tool_tests.py @@ -0,0 +1,102 @@ +import unittest +from metric_dump_tool import MetricsDump +import metric_dump_tool + + +class ValidationTests(unittest.TestCase): +# def test_invalid_input(self): +# """ +# Test the +# """ +# data = """ +# busted busted busted +# """ +# +# with self.assertRaises(ValueError): +# metric_dump_tool.MetricsDump.from_lines(data) + + def test_duplicate_families(self): + """ + Test that validation finds duplicated metric families + """ + dump = MetricsDump.from_lines(""" +# TYPE test_family_a counter +test_family_a {} 1234 1234 + +test_family_b {} 0 0 + +# TYPE test_family_a gauge +test_family_a {} 5678 1234 + +# the following are duplicate samples, not duplicate families +# TYPE test_family_c gauge +test_family_c {} 1234 1234 +test_family_c {} 1234 1234 + +# the following are duplicate families +test_family_d {abc="123"} 0 0 +test_family_d {abc="456"} 0 0 + """) + + result = metric_dump_tool.validate_dump(dump) + + self.assertIn('test_family_a', result.duplicate_families) + self.assertIn('test_family_d', result.duplicate_families) + self.assertNotIn('test_family_b', result.duplicate_families) + self.assertNotIn('test_family_c', result.duplicate_families) + + def test_duplicate_samples(self): + """ + Test that validation finds duplicated metric families + """ + dump = MetricsDump.from_lines(""" +# TYPE test_family_a gauge +test_family_a {hello="world"} 1234 1234 +test_family_a {hello="world"} 1234 1234 + """) + + result = metric_dump_tool.validate_dump(dump) + + self.assertIn('test_family_a', result.duplicate_families) + self.assertNotIn('test_family_b', result.duplicate_families) + + +class DiffTests(unittest.TestCase): + def test_added_families(self): + from_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 + """) + + to_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 +test_family_a {hello="universe"} 0 0 + +test_family_b {} 0 0 + """) + + result = metric_dump_tool.diff_dump(from_dump, to_dump) + + self.assertIn('test_family_b', result.added_families) + self.assertNotIn('test_family_a', result.added_families) + + def test_removed_families(self): + from_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 +test_family_a {hello="universe"} 0 0 + +test_family_b {} 0 0 + """) + + to_dump = MetricsDump.from_lines(""" +test_family_a {hello="world"} 0 0 + """) + + result = metric_dump_tool.diff_dump(from_dump, to_dump) + + self.assertIn('test_family_b', result.removed_families) + self.assertNotIn('test_family_a', result.removed_families) + + + +if __name__ == '__main__': + unittest.main() diff --git a/test/pyproject.toml b/test/pyproject.toml new file mode 100644 index 0000000..fa0eeec --- /dev/null +++ b/test/pyproject.toml @@ -0,0 +1,7 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] + +[project] +name = "cassandra-exporter" +requires-python = ">=3.8" +dynamic = ["version", "description", "authors", "dependencies"] \ No newline at end of file diff --git a/test/schema.yaml b/test/schema.yaml new file mode 100644 index 0000000..046f1ed --- /dev/null +++ b/test/schema.yaml @@ -0,0 +1,22 @@ +- > + CREATE KEYSPACE example WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +- > + CREATE TABLE example.metric_families ( + name text, + type text, + help text, + PRIMARY KEY (name) + ) +- > + CREATE INDEX ON example.metric_families (type) +- > + CREATE TABLE example.numeric_metrics ( + family text, + labels frozen>, + bucket date, + time timestamp, + value float, + + PRIMARY KEY ((family, labels, bucket), time) + ) + diff --git a/test/setup.py b/test/setup.py new file mode 100644 index 0000000..9a4a66c --- /dev/null +++ b/test/setup.py @@ -0,0 +1,13 @@ +from setuptools import setup + +setup( + name='cassandra-exporter-e2e-tests', + version='1.0', + description='End-to-end testing tools for cassandra-exporter', + author='Adam Zegelin', + author_email='adam@instaclustr.com', + packages=['lib', 'tools'], + install_requires=['ccm', 'prometheus_client', + 'cassandra-driver', 'frozendict', 'pyyaml', 'tqdm', 'click', + 'cloup', 'appdirs', 'cryptography'], +) diff --git a/test/test_tool.py b/test/test_tool.py new file mode 100644 index 0000000..198d2ae --- /dev/null +++ b/test/test_tool.py @@ -0,0 +1,132 @@ +import logging +import os +import sys +import time +import typing as t +from itertools import chain + +import pkg_resources + +import cloup + +from lib.ccm import TestCluster, with_ccm_cluster +from lib.click_helpers import with_working_directory +from lib.prometheus import PrometheusInstance, with_prometheus +from tools.dump import dump + +logger = logging.getLogger('test-tool') + + + + +@cloup.group() +def cli(): + pass + + +@cli.command('demo') +@with_working_directory() +@with_ccm_cluster() +def run_demo_cluster(ccm_cluster: TestCluster, **kwargs): + """ + Start a Cassandra cluster with cassandra-exporter installed (agent or standalone). + Optionally setup a schema. + Wait for ctrl-c to shut everything down. + """ + ccm_cluster.start() + + for node in ccm_cluster.nodelist(): + logger.info('Node %s cassandra-exporter running on http://%s', node.name, node.exporter_address) + + sys.stderr.flush() + sys.stdout.flush() + + input("Press any key to stop cluster...") + + + + +@cli.command() +@with_working_directory() +@with_ccm_cluster() +@with_prometheus() +def e2e(ccm_cluster: TestCluster, prometheus: PrometheusInstance, **kwargs): + """ + Run cassandra-exporter end-to-end tests. + + - Start C* with the exporter JAR (agent or standalone). + - Setup a schema. + - Configure and start prometheus. + - Wait for all scrape targets to get healthy. + - Run some tests. + """ + + ccm_cluster.start() + + prometheus.start() + + for node in ccm_cluster.nodelist(): + logger.info('Node %s cassandra-exporter running on http://%s', node.name, node.exporter_address) + + logger.info("Prometheus running on: https://%s", prometheus.listen_address) + + input("Press any key to stop cluster...") + + while True: + targets = prometheus.api.get_targets() + + pass + + # if len(targets['activeTargets']) > 0: + # for target in targets['activeTargets']: + # labels = frozendict(target['labels']) + # + # # even if the target health is unknown, ensure the key exists so the length check below + # # is aware of the target + # history = target_histories[labels] + # + # if target['health'] == 'unknown': + # continue + # + # history[target['lastScrape']] = (target['health'], target['lastError']) + # + # if all([len(v) >= 5 for v in target_histories.values()]): + # break + + time.sleep(1) + + # unhealthy_targets = dict((target, history) for target, history in target_histories.items() + # if any([health != 'up' for (health, error) in history.values()])) + # + # if len(unhealthy_targets): + # logger.error('One or more Prometheus scrape targets was unhealthy.') + # logger.error(unhealthy_targets) + # sys.exit(-1) + + + +@cli.command('benchmark') +@with_working_directory() +@with_ccm_cluster() +def benchmark(ccm_cluster: TestCluster, **kwargs): + """""" + pass + + + +cli.add_command(dump) + + +def main(): + os.environ['CCM_JAVA8_DEBUG'] = 'please' + logging.basicConfig(level=logging.DEBUG) + + # load ccm extensions (useful for ccm-java8, for example) + for entry_point in pkg_resources.iter_entry_points(group='ccm_extension'): + entry_point.load()() + + cli() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test/tools/dump.py b/test/tools/dump.py new file mode 100644 index 0000000..eb5c318 --- /dev/null +++ b/test/tools/dump.py @@ -0,0 +1,105 @@ +import logging +from pathlib import Path + +import yaml +import urllib.request +import typing as t + +from lib.ccm import TestCluster, with_ccm_cluster +from lib.click_helpers import with_working_directory + +import click +import cloup + +logger = logging.getLogger('dump') + +@cloup.group('dump') +def dump(): + """Commands to capture, validate and diff metrics dumps""" + + +DUMP_MANIFEST_NAME = 'dump-manifest.yaml' + + +@dump.command('capture') +@with_working_directory() +@with_ccm_cluster() +@click.argument('destination') +def dump_capture(ccm_cluster: TestCluster, destination: Path, **kwargs): + """Start a Cassandra cluster, capture metrics from each node's cassandra-exporter and save them to disk.""" + + ccm_cluster.start() + + destination = Path(destination) + destination.mkdir(exist_ok=True) + + logger.info('Capturing metrics dump to %s...', destination) + + with (destination / DUMP_MANIFEST_NAME).open('w') as f: + manifest = { + 'version': '20221207', + 'cassandra': { + 'version': ccm_cluster.version(), + 'topology': { + 'nodes': {n.name: { + 'rack': n.rack, + 'datacenter': n.data_center, + 'ip': n.ip_addr + } for n in ccm_cluster.nodelist()} + } + }, + 'exporter': { + 'version': 'unknown' + } + } + + yaml.safe_dump(manifest, f) + + for node in ccm_cluster.nodelist(): + for mimetype, ext in (('text/plain', 'txt'), ('application/json', 'json')): + url = f'http://{node.exporter_address}/metrics?x-accept={mimetype}' + download_path = destination / f'{node.name}-metrics.{ext}' + + urllib.request.urlretrieve(url, download_path) + + logger.info(f'Wrote {url} to {download_path}') + + +class DumpPathParamType(click.ParamType): + name = 'dump' + + def convert(self, value: t.Any, param: t.Optional[click.Parameter], ctx: t.Optional[click.Context]) -> t.Any: + if isinstance(value, Path): + return value + + p = Path(value) + if p.is_file(): + p = p.parent + + manifest = p / DUMP_MANIFEST_NAME + if not manifest.exists(): + self.fail(f'{p}: not a valid dump: {manifest} does not exist.', param, ctx) + + return p + + +@dump.command('validate') +@click.argument('dump', type=DumpPathParamType()) +def dump_validate(dump: Path, **kwargs): + """Validate a metrics dump using Prometheus's promtool""" + pass + + +@dump.command('diff') +@click.argument('dump1', type=DumpPathParamType()) +@click.argument('dump2', type=DumpPathParamType()) +def dump_diff(dump1: Path, dump2: Path): + """Compare two metrics dumps and output the difference""" + pass + + + +# capture dump (start C* with exporter, fetch and write metrics to file) +# this is every similar to the demo cmd +# validate dump (check for syntax errors, etc) +# compare/diff dump (list metrics added & removed)