Kubernetes experimentation #1054 (Draft)

Wants to merge 39 commits into base: main

Changes from all commits (39 commits):
19007a5  Unstructured configs that configure and deploy things to k8s (gregschohn, Sep 1, 2024)
3e7d858  more k8s & helm experimentation. (gregschohn, Sep 2, 2024)
785312e  Another checkpoint in slowly (& inconsistently) getting more services… (gregschohn, Sep 5, 2024)
10fac7d  K8s Checkpoint. With minimal testing, the proxy, ES/OS, grafana seem… (gregschohn, Sep 5, 2024)
f79142e  K8s checkpoint. Working on getting kafka resources deployed as a hel… (gregschohn, Sep 7, 2024)
0bffeca  Merge branch 'main' into KubernetesExperimentation (gregschohn, Sep 30, 2024)
0e201c2  Minor cleanup (gregschohn, Oct 1, 2024)
9c6f6f4  another checkpoint (gregschohn, Oct 3, 2024)
e33f89a  Merge remote-tracking branch 'schohn/KubernetesExperimentation' into … (lewijacn, Oct 10, 2024)
1cd8782  Experiment with k8 structure pattern (lewijacn, Oct 11, 2024)
4abd50e  Partial working state for test environment (lewijacn, Oct 12, 2024)
cdbe2b5  Checkpoint - Basic replayer functionality (lewijacn, Oct 14, 2024)
03e3431  Update services.yaml endpoint (lewijacn, Oct 14, 2024)
032227b  Basic working Replayer and RFS in local k8 (lewijacn, Oct 16, 2024)
8054cdc  Swap out to use Kafka operator (lewijacn, Oct 16, 2024)
53057ae  Add opensearch operator (not working) (lewijacn, Oct 17, 2024)
8614014  Checkpoint after moving to shared volume charts, otel collector in sh… (lewijacn, Oct 20, 2024)
e174444  Add documentation for getting started with K8 and minor changes (lewijacn, Oct 22, 2024)
29ff913  another checkpoint (gregschohn, Oct 3, 2024)
cc31d0d  Start structure change (lewijacn, Oct 24, 2024)
9b6115e  Merge branch 'main' into KubernetesExperimentation (gregschohn, Oct 24, 2024)
5981d40  Merge branch 'KubernetesExperimentation' of github.com:gregschohn/ope… (gregschohn, Oct 24, 2024)
233ac38  Checkpoint 10-24 fill in structure more, a couple more services need … (lewijacn, Oct 24, 2024)
b1567d4  Modify volume structure and checkpoint (lewijacn, Oct 25, 2024)
76cab7f  Checkpoint probably working state after refactoring (lewijacn, Oct 25, 2024)
00001e4  Reorder MA helm chart dependencies (gregschohn, Oct 27, 2024)
c70cf6d  Merge branch 'tanners-initial-k8' into KubernetesExperimentation (gregschohn, Oct 27, 2024)
f9751f9  Move original captureProxy chart from k8s and fold the k8 version int… (gregschohn, Oct 30, 2024)
38799c3  Cleanup to refactor the common helm helpers and to try to support ins… (gregschohn, Oct 31, 2024)
e6aa8eb  Simple but useful template that can be used JUST to make values templ… (gregschohn, Oct 31, 2024)
59b0c62  Canonicalize/merge the charts for replayer, PVC components, and MA (o… (gregschohn, Oct 31, 2024)
4ccc3f1  Checking bugfixes so that the proxy can startup with helm install com… (gregschohn, Oct 31, 2024)
296886d  excise the roots for the jaeger and prometheus localTesting values so… (gregschohn, Oct 31, 2024)
ff16308  Checkpoint - Further changes that destabilize the helm charts but are… (gregschohn, Nov 5, 2024)
ef3018a  Checkpoint that's working on introducing global/shared configuration … (gregschohn, Nov 5, 2024)
901cbfa  Move helmCommon into sharedResources directory (gregschohn, Nov 6, 2024)
bc5ff89  Add camel case parameter names in addition to snake case ones. (gregschohn, Nov 9, 2024)
156142c  Checkpoint - BulkLoad, Console, and Replayer are coming along as K8s … (gregschohn, Nov 9, 2024)
2ab2578  Checkpoint to start cleaning up namespaces and unifying namespace man… (gregschohn, Nov 11, 2024)
@@ -65,93 +65,93 @@ public static class Args {
private boolean help;

@Parameter(required = true,
names = { "--snapshot-name" },
names = { "--snapshot-name", "--snapshotName" },
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(required = false,
names = { "--snapshot-local-dir" },
names = { "--snapshot-local-dir", "--snapshotLocalDir" },
description = ("The absolute path to the directory on local disk where the snapshot exists. " +
"Use this parameter if have a copy of the snapshot disk. Mutually exclusive with " +
"--s3-local-dir, --s3-repo-uri, and --s3-region."))
public String snapshotLocalDir = null;

@Parameter(required = false,
names = { "--s3-local-dir" },
names = { "--s3-local-dir", "--s3LocalDir" },
description = ("The absolute path to the directory on local disk to download S3 files to. " +
"If you supply this, you must also supply --s3-repo-uri and --s3-region. " +
"Mutually exclusive with --snapshot-local-dir."))
public String s3LocalDir = null;

@Parameter(required = false,
names = {"--s3-repo-uri" },
names = {"--s3-repo-uri", "--s3RepoUri" },
description = ("The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2. " +
"If you supply this, you must also supply --s3-local-dir and --s3-region. " +
"Mutually exclusive with --snapshot-local-dir."))
public String s3RepoUri = null;

@Parameter(required = false,
names = { "--s3-region" },
names = { "--s3-region", "--s3Region" },
description = ("The AWS Region the S3 bucket is in, like: us-east-2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-repo-uri. Mutually exclusive with --snapshot-local-dir."))
public String s3Region = null;

@Parameter(required = true,
names = { "--lucene-dir" },
names = { "--lucene-dir", "--luceneDir" },
description = "The absolute path to the directory where we'll put the Lucene docs")
public String luceneDir;

@ParametersDelegate
public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

@Parameter(required = false,
names = { "--index-allowlist" },
names = { "--index-allowlist", "--indexAllowlist" },
description = ("Optional. List of index names to migrate (e.g. 'logs_2024_01, logs_2024_02'). " +
"Default: all non-system indices (e.g. those not starting with '.')"))
public List<String> indexAllowlist = List.of();

@Parameter(required = false,
names = { "--max-shard-size-bytes" },
names = { "--max-shard-size-bytes", "--maxShardSizeBytes" },
description = ("Optional. The maximum shard size, in bytes, to allow when " +
"performing the document migration. " +
"Useful for preventing disk overflow. Default: 80 * 1024 * 1024 * 1024 (80 GB)"))
public long maxShardSizeBytes = 80 * 1024 * 1024 * 1024L;

@Parameter(required = false,
names = { "--initial-lease-duration" },
names = { "--initial-lease-duration", "--initialLeaseDuration" },
converter = DurationConverter.class,
description = "Optional. The time that the first attempt to migrate a shard's documents should take. " +
"If a process takes longer than this the process will terminate, allowing another process to " +
"attempt the migration, but with double the amount of time than the last time. Default: PT10M")
public Duration initialLeaseDuration = Duration.ofMinutes(10);

@Parameter(required = false,
names = { "--otel-collector-endpoint" },
names = { "--otel-collector-endpoint", "--otelCollectorEndpoint" },
arity = 1,
description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
+ "forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;

@Parameter(required = false,
names = "--documents-per-bulk-request",
names = {"--documents-per-bulk-request", "--documentsPerBulkRequest"},
description = "Optional. The number of documents to be included within each bulk request sent. " +
"Default no max (controlled by documents size)")
int numDocsPerBulkRequest = Integer.MAX_VALUE;

@Parameter(required = false,
names = "--documents-size-per-bulk-request",
names = { "--documents-size-per-bulk-request", "--documentsSizePerBulkRequest" },
description = "Optional. The maximum aggregate document size to be used in bulk requests in bytes. " +
"Note does not apply to single document requests. Default 10 MiB")
long numBytesPerBulkRequest = 10 * 1024L * 1024L;

@Parameter(required = false,
names = "--max-connections",
names = {"--max-connections", "--maxConnections" },
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(required = true,
names = { "--source-version" },
names = { "--source-version", "--sourceVersion" },
converter = VersionConverter.class,
description = ("Version of the source cluster."))
public Version sourceVersion = Version.fromString("ES 7.10");
@@ -191,7 +191,7 @@ public static void validateArgs(Args args) {
public static void main(String[] args) throws Exception {
// TODO: Add back arg printing after not consuming plaintext password MIGRATIONS-1915
var workerId = ProcessHelpers.getNodeInstanceName();
log.info("Starting RfsMigrateDocuments with workerId =" + workerId);
log.info("Starting RfsMigrateDocuments with workerId=" + workerId);

Args arguments = new Args();
JCommander jCommander = JCommander.newBuilder().addObject(arguments).build();
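Commit bc5ff89 ("Add camel case parameter names in addition to snake case ones.") drives every change in this file: each flag keeps its kebab-case name and gains a camelCase alias, presumably so templated values can use either spelling. As a hedged aside (argparse standing in for JCommander; nothing below is from this PR), the same dual-alias idea looks like this in Python:

```python
import argparse

# Both spellings bind to the same destination, mirroring the Java change
# @Parameter(names = { "--snapshot-name", "--snapshotName" }) above.
parser = argparse.ArgumentParser()
parser.add_argument("--snapshot-name", "--snapshotName", dest="snapshot_name")

args = parser.parse_args(["--snapshotName", "rfs-snapshot"])
print(args.snapshot_name)  # -> rfs-snapshot
```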
TrafficCapture/dockerSolution/build.gradle (2 changes: 2 additions & 0 deletions)

@@ -18,6 +18,7 @@ dependencies {
def dockerFilesForExternalServices = [
"elasticsearch_searchguard": "elasticsearchWithSearchGuard",
"capture_proxy_base": "captureProxyBase",
"service_yaml_from_config_maps": "serviceYamlFromConfigMaps",
"elasticsearch_client_test_console": "elasticsearchTestConsole",
"migration_console": "migrationConsole",
"otel_collector": "otelCollector",
@@ -125,6 +126,7 @@ dockerCompose {
}

task buildDockerImages {
+ dependsOn buildDockerImage_service_yaml_from_config_maps
dependsOn buildDockerImage_elasticsearch_searchguard
dependsOn buildDockerImage_migration_console
dependsOn buildDockerImage_otel_collector
@@ -1,11 +1,11 @@
source_cluster:
-  endpoint: "https://capture-proxy:9200"
+  endpoint: "https://capture-proxy:9201"
  allow_insecure: true
  basic_auth:
    username: "admin"
    password: "admin"
target_cluster:
-  endpoint: "https://opensearchtarget:9200"
+  endpoint: "https://opensearch-cluster-master:9200"
  allow_insecure: true
  basic_auth:
    username: "admin"
@@ -19,14 +19,14 @@ backfill:
replay:
  docker:
snapshot:
-  snapshot_name: "snapshot_2023_01_01"
+  snapshot_name: "rfs-snapshot"
  fs:
-    repo_path: "/snapshot/test-console"
-  otel_endpoint: "http://otel-collector:4317"
+    repo_path: "/storage/snapshot"
+  otel_endpoint: "http://localhost:4317"
metadata_migration:
  from_snapshot:
  min_replicas: 0
-  otel_endpoint: "http://otel-collector:4317"
+  otel_endpoint: "http://localhost:4317"
kafka:
-  broker_endpoints: "kafka:9092"
+  broker_endpoints: "kafka-cluster-kafka-bootstrap:9092"
  standard: ""
@@ -0,0 +1,39 @@
FROM amazonlinux:2023

ENV PIP_ROOT_USER_ACTION ignore
ENV LANG C.UTF-8

RUN dnf install -y \
jq \
less \
python3.11 \
python3.11-devel \
python3.11-pip \
python3.11-wheel \
tar \
unzip \
vim \
wget \
&& \
dnf clean all && \
rm -rf /var/cache/dnf

# Define the virtual environment path to use for all pipenv runs
ENV WORKON_HOME=/
ENV PIPENV_CUSTOM_VENV_NAME=.venv
ENV PIPENV_DEFAULT_PYTHON_VERSION=3.11
ENV PIPENV_MAX_DEPTH=1

RUN python3.11 -m pip install pipenv
WORKDIR /
RUN python3.11 -m venv .venv

WORKDIR /root
COPY Pipfile .
COPY Pipfile.lock .
RUN pipenv install --deploy

COPY configmap2yaml/* /root/
RUN chmod ug+x /root/*.py
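# With WORKON_HOME=/ and PIPENV_CUSTOM_VENV_NAME=.venv set above, the pipenv
# virtualenv lives at /.venv, which is why the entrypoint below invokes that
# interpreter directly.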

ENTRYPOINT ["/.venv/bin/python", "config_watcher.py"]
@@ -0,0 +1,14 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
kubernetes = ">=30.1.0"
pyyaml = ">=6.0.2"
Jinja2 = ">=3.1.4"

[dev-packages]

[requires]
python_version = "3.11"
@@ -0,0 +1,23 @@
```mermaid
graph
Console --> SharedConfigs
Proxy[Proxy Chart] --> SharedConfigs
Proxy --> singleUseConfigs[Direct Single Use ConfigsListenPort]
Replayer[Replayer Chart] --> | c | SharedConfigs
SharedConfigs --> Kafka
Kafka --> KafkaExample[kafkaBroker:\n..brokerEndpoints: kafka-cluster-kafka-bootstrap:9092\n..auth: aws ]
style KafkaExample text-align:left
SharedConfigs --> SourceCluster
SharedConfigs --> OtelEndpoint
RegistrySidecar --> SharedConfigs
subgraph SharedConfigs[Global Configs Chart]
subgraph ConfigMaps
SourceCluster
OtelEndpoint
Kafka
end
end
Console[Migration Console] --> else
```
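The RegistrySidecar edge in the diagram is what the config watcher below implements: list and watch ConfigMaps by label, then materialize them into a single services.yaml. A minimal sketch of the listing half, using the official kubernetes Python client (the label value is illustrative, not from this PR):

```python
from kubernetes import client, config

config.load_incluster_config()  # assumes this runs inside the cluster
v1 = client.CoreV1Api()

# Fetch the shared config ConfigMaps the charts above publish
# ("app=shared-configs" is a hypothetical label, not from this PR).
for cm in v1.list_namespaced_config_map(
        namespace="default", label_selector="app=shared-configs").items:
    print(cm.metadata.name, cm.data)
```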

@@ -0,0 +1,146 @@
import argparse
from format_services_yaml import YAMLTemplateConverter
from io import StringIO
from kubernetes import client, config, watch
import logging
import os
import signal
import tempfile
from typing import Dict, Any
import sys
import yaml


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ConfigMapWatcher:
def __init__(self, label_selector: str, namespace: str, output_file: str):
self.label_selector = label_selector
self.namespace = namespace
self.output_file = output_file
self.current_data: Dict[str, Any] = {}
self.formatter = YAMLTemplateConverter()

# Validate output file path
output_dir = os.path.dirname(output_file)
if not os.path.exists(output_dir):
raise ValueError(f"Output directory does not exist: {output_dir}")
if not os.access(output_dir, os.W_OK):
raise ValueError(f"Output directory is not writable: {output_dir}")

try:
config.load_incluster_config()
except config.ConfigException:
logger.warning("Unable to load in-cluster config, falling back to local kubeconfig")
config.load_kube_config()

self.v1 = client.CoreV1Api()

def update_yaml_file(self) -> None:
"""Update the output YAML file with new ConfigMap data"""
try:
# Create a temporary file in the same directory as the target file
output_dir = os.path.dirname(self.output_file)
with tempfile.NamedTemporaryFile(mode='w', dir=output_dir, delete=False) as temp_file:
self.formatter.convert(StringIO(yaml.safe_dump(self.current_data)), temp_file)
temp_file.flush()
os.fsync(temp_file.fileno()) # Ensure all data is written to disk

# Atomic rename
os.rename(temp_file.name, self.output_file)
logger.info(f"Updated {self.output_file} with new configuration")
except Exception as e:
logger.error(f"Error updating YAML file: {e}")
# Clean up temporary file if it exists
if 'temp_file' in locals():
try:
os.unlink(temp_file.name)
except OSError:
pass
raise

def watch_configmaps(self) -> None:
"""Watch ConfigMaps for changes and write the contents at startup and upon an configMap changes"""
w = watch.Watch()

# First, get existing ConfigMaps
logger.info(f"Loading existing ConfigMaps for {self.namespace} and {self.label_selector}")
existing_configmaps = self.v1.list_namespaced_config_map(
namespace=self.namespace,
label_selector=self.label_selector
)
logger.info(f"Got configmaps: {existing_configmaps}")
for configmap in existing_configmaps.items:
logger.info(f"configmap={configmap}")
self.current_data[configmap.metadata.name] = configmap.data if configmap.data else {}

self.update_yaml_file()

# Then watch for changes
try:
for event in w.stream(
self.v1.list_namespaced_config_map,
namespace=self.namespace,
label_selector=self.label_selector
):
configmap = event['object']
event_type = event['type']

if event_type in ['ADDED', 'MODIFIED']:
self.current_data[configmap.metadata.name] = configmap.data if configmap.data else {}
elif event_type == 'DELETED':
name = configmap.metadata.name
if name in self.current_data:
logger.info(f"Removing ConfigMap: {name}")
del self.current_data[name]

self.update_yaml_file()

except Exception as e:
logger.error(f"Error watching ConfigMaps: {e}")
raise

def parse_args():
parser = argparse.ArgumentParser(
description='Watch Kubernetes ConfigMaps and update a YAML file'
)
parser.add_argument(
'--outfile',
required=True,
help='Path to output YAML file (required)'
)
parser.add_argument(
'--label-selector',
default=os.getenv('LABEL_SELECTOR', ''),
help='Label selector for ConfigMaps'
)
parser.add_argument(
'--namespace',
default=os.getenv('NAMESPACE', 'default'),
help='Kubernetes namespace (default: default)'
)
return parser.parse_args()

def sigterm_handler(signum, frame):
# Clean exit without traceback
sys.exit(0)

if __name__ == "__main__":
args = parse_args()

# Register the signal handler
try:
signal.signal(signal.SIGTERM, sigterm_handler)
watcher = ConfigMapWatcher(
label_selector=args.label_selector,
namespace=args.namespace,
output_file=args.outfile
)
watcher.watch_configmaps()
except KeyboardInterrupt:
# Handle Ctrl+C cleanly too
sys.exit(0)
except Exception as e:
logger.error(f"Fatal error: {e}")
sys.exit(1)
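For local experimentation, the watcher falls back to ~/.kube/config when in-cluster config is unavailable, so it can be driven directly; a hypothetical smoke test (label and output path are illustrative, not from this PR):

```python
# Assumes a cluster reachable via ~/.kube/config and a writable /tmp.
from config_watcher import ConfigMapWatcher

watcher = ConfigMapWatcher(
    label_selector="app=migration-config",  # illustrative label
    namespace="default",
    output_file="/tmp/migration_services.yaml",
)
# Writes the merged ConfigMap data once at startup, then blocks,
# rewriting the file atomically on every ADDED/MODIFIED/DELETED event.
watcher.watch_configmaps()
```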