-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Autoscaling a CrateDB Cluster out and back (#333)
* update * renamed-to-readme * Updated-after-sync-with-Niklas * WH-latest * wh * latest * Update topic/autoscaling/README.md Co-authored-by: hlcianfagna <[email protected]> * Update topic/autoscaling/README.md Co-authored-by: hlcianfagna <[email protected]> * Update topic/autoscaling/README.md Co-authored-by: hlcianfagna <[email protected]> * Added sleep to autoscale.py Added sleep to the while loop checking the ops status. * Update topic/autoscaling/README.md Co-authored-by: hlcianfagna <[email protected]> * Update topic/autoscaling/README.md Co-authored-by: Niklas Schmidtmer <[email protected]> * Updated-WH * Update-scaling * removed-prometheus-client * Blacked * Latest-changes * Chore: Copy-edit README about autoscaling --------- Co-authored-by: hlcianfagna <[email protected]> Co-authored-by: Niklas Schmidtmer <[email protected]> Co-authored-by: Andreas Motl <[email protected]>
- Loading branch information
1 parent
dd23b28
commit fd4b257
Showing
2 changed files
with
183 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# Autoscaling | ||
|
||
## Introduction | ||
|
||
Autoscaling is the process of scaling up and down your CrateDB Cluster | ||
on demand. | ||
|
||
## About | ||
|
||
This Python program can be used to autoscale a CrateDB Cloud Cluster. | ||
|
||
The program monitors the number of shards that are on the nodes in the | ||
cluster. If that number crosses a threshold, a new node will be added | ||
to the database cluster. | ||
|
||
When the number of shards goes back down below another threshold, the | ||
cluster will be scaled-back, by decommissioning excess nodes again. | ||
|
||
For more depth and background on this please see the community post | ||
about autoscaling. \<ADD LINK\> | ||
|
||
## Usage | ||
|
||
Run a dedicated cluster in CrateDB Cloud: | ||
|
||
> <https://console.cratedb.cloud/> | ||
### Install | ||
|
||
The script uses the Requests library (`datetime` ships with Python's standard library and needs no installation). Make sure it is installed: | ||
```shell | ||
pip install requests | ||
``` | ||
|
||
```shell | ||
git clone https://github.com/crate/cratedb-examples | ||
cd cratedb-examples/topic/autoscaling | ||
``` | ||
|
||
### Configure | ||
|
||
Make sure to edit the script before running it. Fill in your API | ||
credentials, organization id, and cluster id in the placeholders near | ||
the top of the script. | ||
|
||
### Run | ||
```shell | ||
python autoscale.py | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
"""Autoscale a CrateDB Cloud cluster based on the average shard count per node."""
import logging
import time
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth

# API credentials (to be filled in by the user).
auth_data = HTTPBasicAuth(
    "YOUR API KEY",
    "YOUR API SECRET",
)

# Organization and cluster IDs (to be filled in by the user).
# These must be assigned, not commented out: the functions below reference
# them at call time and would otherwise raise NameError.
ORGANIZATION_ID = "FILL IN YOUR ORGANIZATION ID"
CLUSTER_ID = "FILL IN YOUR CLUSTER ID"

# Date format used by the CrateDB Cloud API for "last_seen" timestamps.
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

# Log INFO and above with a timestamped single-line format.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
|
||
|
||
def num_shards():
    """Return the average number of shards per node for the configured cluster.

    Scrapes the organization's Prometheus metrics endpoint and averages the
    per-node ``shard_stats`` totals belonging to ``CLUSTER_ID``.

    Returns:
        float | None: average shard count per node, or ``None`` when the
        request failed or no matching metric samples were found.
    """
    # The Prometheus exposition format is served as plain text, one metric
    # sample per line. (Fixed: the original sent the invalid MIME type
    # "plain/text" instead of "text/plain".)
    headers = {"accept": "text/plain"}
    api_url = f"https://console.cratedb.cloud/api/v2/organizations/{ORGANIZATION_ID}/metrics/prometheus/"
    response = requests.get(api_url, auth=auth_data, headers=headers, timeout=60)
    if response.status_code != 200:
        # Lazy %-style args avoid formatting when the level is filtered out.
        logging.info("Failed to retrieve metrics. Status code: %s", response.status_code)
        return None
    total_shard_count = 0.0
    shard_count_instances = 0
    for line in response.text.split("\n"):
        # Only consider the per-node "total" shard count of our cluster.
        if (
            "crate_node" in line
            and "shard_stats" in line
            and CLUSTER_ID in line
            and 'property="total"' in line
        ):
            # Sample lines look like "<metric>{<labels>} <value>".
            total_shard_count += float(line.split()[1])
            shard_count_instances += 1
    if shard_count_instances == 0:
        return None  # No matching samples -- cannot compute an average.
    return total_shard_count / shard_count_instances
|
||
|
||
def get_cluster_status():
    """Fetch and return the cluster's current status document from the Cloud API."""
    url = f"https://console.cratedb.cloud/api/v2/clusters/{CLUSTER_ID}/"
    response = requests.get(url, auth=auth_data, timeout=60)
    return response.json()
|
||
|
||
def get_cluster_running_op(cluster_status):
    """Return the name of the operation currently running on the cluster.

    Args:
        cluster_status: a cluster status document as returned by
            ``get_cluster_status()``.

    Returns:
        str: the ``health.running_operation`` value (empty string when the
        cluster is idle), re-fetched from the API when the supplied status
        document is older than 30 seconds.
    """
    last_seen = datetime.strptime(cluster_status["health"]["last_seen"], DATE_FORMAT)
    # Fixed: timedelta.seconds wraps at one day and is meaningless for
    # negative deltas, so a status older than 24h could look "fresh".
    # total_seconds() gives the true elapsed time.
    # NOTE(review): datetime.now() is naive local time -- this assumes the
    # API reports "last_seen" in the same timezone; confirm (likely UTC).
    if (datetime.now() - last_seen).total_seconds() > 30:
        cluster_status = get_cluster_status()
    return cluster_status["health"]["running_operation"]
|
||
|
||
def get_cluster_num_nodes(cluster_status):
    """Return the cluster's node count minus one.

    The ``- 1`` presumably converts the node count to the API's
    ``product_unit`` scale used by the scale endpoint -- TODO confirm
    against the CrateDB Cloud API documentation.

    Args:
        cluster_status: a cluster status document as returned by
            ``get_cluster_status()``.

    Returns:
        int: ``num_nodes - 1``, taken from a fresh status document when the
        supplied one is older than 30 seconds.
    """
    last_seen = datetime.strptime(cluster_status["health"]["last_seen"], DATE_FORMAT)
    # Fixed: use total_seconds() instead of .seconds, which wraps at one
    # day and misbehaves for negative deltas.
    if (datetime.now() - last_seen).total_seconds() > 30:
        cluster_status = get_cluster_status()
    return cluster_status["num_nodes"] - 1
|
||
|
||
def scale_cluster(num, cluster_status, max_num_shard):
    """Scale the cluster out or back when shard thresholds are crossed.

    Scales out by one unit when the average shard count exceeds 80% of
    ``max_num_shard``, scales back by one unit when it drops below 50%,
    and blocks until the triggered scaling operation has finished.

    Args:
        num: current average number of shards per node.
        cluster_status: a cluster status document (used for staleness checks).
        max_num_shard: shard count per node regarded as "full".
    """
    if num > 0.8 * max_num_shard:
        num_nodes = get_cluster_num_nodes(cluster_status)
        logging.info("Start scaling out from %s to %s", num_nodes + 1, num_nodes + 2)
        _request_scale_and_wait(num_nodes + 1, cluster_status)
        logging.info("Scaled up successfully!")
    elif num < 0.5 * max_num_shard:
        num_nodes = get_cluster_num_nodes(cluster_status)
        logging.info("Start scaling down from %s to %s", num_nodes + 1, num_nodes)
        _request_scale_and_wait(num_nodes - 1, cluster_status)
        logging.info("Scaled down successfully!")
    else:
        logging.info("Nothing to do!")


def _request_scale_and_wait(product_unit, cluster_status):
    """Submit a scale request for ``product_unit`` units and poll until done.

    Extracted from scale_cluster, where the request/poll sequence was
    duplicated verbatim in both the scale-out and scale-back branches.
    """
    requests.put(
        f"https://console.cratedb.cloud/api/v2/clusters/{CLUSTER_ID}/scale/",
        json={"product_unit": product_unit},
        auth=auth_data,
        timeout=60,
    )
    operation_in_progress_msg = "Scaling operation in progress."
    logging.info(operation_in_progress_msg)
    # Poll every 10 seconds until the API reports no running operation.
    # After ~30s the staleness check inside get_cluster_running_op makes it
    # re-fetch the status on every call.
    while get_cluster_running_op(cluster_status) != "":
        time.sleep(10)
        logging.info(operation_in_progress_msg)
|
||
|
||
DELAY_SECONDS = 5  # pause between monitoring iterations
MAX_NUM_SHARDS = 30  # per-node shard count regarded as "full"


def main():
    """Monitor the cluster forever and scale it based on the shard count."""
    while True:
        try:
            status = get_cluster_status()  # Fetch current cluster status.
            number_shards = num_shards()  # Average shard count per node.
            if number_shards is not None:
                logging.info("Current avg number of shards: %s", number_shards)
                scale_cluster(number_shards, status, MAX_NUM_SHARDS)
            else:
                logging.error("Failed to retrieve shard metrics.")
        except Exception:
            # Top-level boundary: log the full traceback and keep monitoring
            # (logging.exception preserves the stack, unlike str(e)).
            logging.exception("An error occurred")
        time.sleep(DELAY_SECONDS)


if __name__ == "__main__":
    # Guard so importing this module no longer starts the infinite loop.
    main()