diff --git a/.gitignore b/.gitignore
index 5ce9226..d6a04c3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,3 @@
 .idea
-spcs/examples/llama7b-chat-hf/snow/__pycache__
-spcs/examples/llama7b-chat-hf/spcs_spec.yaml
 .DS_Store
 user-metrics/metrics-service.yml
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index dca9a90..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-click
diff --git a/spcs/examples/llama7b-chat-hf/Dockerfile b/spcs/examples/llama7b-chat-hf/Dockerfile
deleted file mode 100644
index 0cc6dac..0000000
--- a/spcs/examples/llama7b-chat-hf/Dockerfile
+++ /dev/null
@@ -1,9 +0,0 @@
-FROM nvcr.io/nvidia/pytorch:23.06-py3
-
-COPY requirements.txt ./
-
-RUN pip install -r requirements.txt
-
-ADD exec ./exec
-
-CMD ["./exec/controller.sh"]
diff --git a/spcs/examples/llama7b-chat-hf/README.md b/spcs/examples/llama7b-chat-hf/README.md
deleted file mode 100644
index b5051ea..0000000
--- a/spcs/examples/llama7b-chat-hf/README.md
+++ /dev/null
@@ -1,72 +0,0 @@
-# LLAMA2 + HF on SPCS
-
-Launch LLAMA2 + HF on SPCS in three simple steps:
-
-1. Configure Hugging Face account and get HF token
-2. Configure your Snowflake account
-3. Run a simple script
-
-**Note: You might have issues in running this tutorial due to resource capacity limitations.
-Make sure that you delete your service and compute pool after you finished!!!**
-
-### Configure Hugging Face account and get HF token
-
-In order to use LLAMA2 and HF you need to create account and get access to the LLAMA2 model. Use the following steps to
-do this:
-
-1. Follow https://ai.meta.com/resources/models-and-libraries/llama-downloads/ to request access to LLAMA2
-   **You Don't need to download anything locally**.
-2. Create HF account: https://huggingface.co
-3. Follow https://huggingface.co/meta-llama/Llama-2-7b-hf to access hugging face model
-4. Follow: https://huggingface.co/docs/hub/security-tokens to get token.
-
-**Note, it might take one or two days for HF to approve your request to use LLAMA2.
-You would need to wait for their email before proceeding further**
-
-### Configure Snowflake account and environment
-
-In order to launch LLAMA2 model you need to have the following:
-
-1. Create or retrieve your Snowflake account name, username and password
-2. Make sure that the default role that your user has will have the following permissions:
-
-* Create Compute Pool
-* Create SPCS service
-* Create Stage
-* Create SPCS repository
-
-
-3. Download this Github repository: `git clone https://github.com/Snowflake-Labs/spcs-templates.git`
-
-4. Make sure you have `python3` installed
-5. Run `pip install -r requirements.txt`
-6. **Make sure you have docker installed and running**
-
-### Run the command
-
-```
-python snow/main.py run-setup \
---account $YOUR_ACCOUNT \
---username $YOUR_USERNAME \
---password $YOUR_PASSWORD \
---db $YOUR_DATABASE \
---schema $YOUR_SCHEMA \
---compute-pool $NAME_OF_COMPUTE_POOL \
---service-name $NAME_OF_SERVICE \
---repo-name $NAME_OF_REPO \
---stage-name $NAME_OF_STAGE \
---hf-token $YOUR_HF_TOKEN
-```
-
-### Cleanup resources
-
-```
-python snow/main.py cleanup \
---account $YOUR_ACCOUNT \
---username $YOUR_USERNAME \
---password $YOUR_PASSWORD \
---db $YOUR_DATABASE \
---schema $YOUR_SCHEMA \
---compute-pool $NAME_OF_COMPUTE_POOL \
---service-name $NAME_OF_SERVICE
-```
diff --git a/spcs/examples/llama7b-chat-hf/exec/controller.sh b/spcs/examples/llama7b-chat-hf/exec/controller.sh
deleted file mode 100755
index 54443c0..0000000
--- a/spcs/examples/llama7b-chat-hf/exec/controller.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-
-exec python3 ./exec/exec.py \
-    --command 'python3 -m fastchat.serve.controller' \
-    --write_condition_location ${SYNC_DIR}/controller \
-    --write_condition 'Uvicorn running'
diff --git a/spcs/examples/llama7b-chat-hf/exec/exec.py b/spcs/examples/llama7b-chat-hf/exec/exec.py
deleted file mode 100755
index a20c058..0000000
--- a/spcs/examples/llama7b-chat-hf/exec/exec.py
+++ /dev/null
@@ -1,104 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-import logging
-import os
-import subprocess
-import time
-from pathlib import Path
-from typing import Optional
-
-
-def setup_logging(log_level=logging.INFO):
-    logger = logging.getLogger("setup")
-    logger.setLevel(log_level)
-    console_handler = logging.StreamHandler()
-    console_handler.setLevel(log_level)
-    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    console_handler.setFormatter(formatter)
-    logger.addHandler(console_handler)
-    return logger
-
-
-log = setup_logging()
-
-
-def _condition_fulfilled(line: str, condition: str) -> bool:
-    if not condition:
-        return False
-    return condition.lower() in line.lower()
-
-
-def _write_wait_condition(location: str) -> None:
-    path = Path(location)
-    os.makedirs(path.parent, exist_ok=True)
-    with open(path, 'w', encoding='utf-8') as file:
-        file.write("fulfilled")
-    log.info("Write condition fulfilled")
-
-
-def _process_stream(data: Optional[str], prefix: str, write_condition_location: str, write_condition: str) -> None:
-    if not data:
-        return
-    log.info(f"{prefix}: {data}")
-    if _condition_fulfilled(data, write_condition):
-        _write_wait_condition(write_condition_location)
-
-
-def run_cmd_and_stream(cmd: str, write_condition_location: str, write_condition: str) -> None:
-    log.info(f"Executing: {cmd}")
-    cmd = cmd.split(" ")
-    process = subprocess.Popen(cmd, stderr=subprocess.PIPE,
-                               universal_newlines=True, encoding='utf8',
-                               bufsize=1, close_fds=True)
-
-    try:
-        for stderr_line in iter(process.stderr.readline, b""):
-            if not stderr_line:
-                time.sleep(0.01)
-                continue
-            _process_stream(stderr_line, "[STDERR]", write_condition_location, write_condition)
-    except KeyboardInterrupt as e:
-        process.kill()
-        raise e
-
-
-def wait_for_condition(condition_location: str, timeout_seconds: int = 600) -> None:
-    path = Path(condition_location)
-    start_time_ms = round(time.time() * 1000)
-    end_time_ms = start_time_ms + timeout_seconds * 1000
-    while end_time_ms > start_time_ms:
-        if path.is_file():
-            return
-        else:
-            log.info(f"Waiting for condition: {condition_location}")
-            time.sleep(5)
-    raise Exception(
-        f"Timeout waiting for condition: {condition_location}, check logs of depending service")
-
-
-def main():
-    parser = argparse.ArgumentParser(description="Exec")
-
-    parser.add_argument("--command", required=True, help="Command to execute")
-    parser.add_argument("--wait_condition_location", required=False, help="Condition location to wait before execution")
-    parser.add_argument("--write_condition_location", required=False, help="Write condition location")
-    parser.add_argument("--write_condition", required=False, help="Write Condition")
-
-    args = parser.parse_args()
-
-    log.info("===============================")
-    log.info("Command: %s", args.command)
-    log.info("Condition: %s", args.wait_condition_location)
-    log.info("Condition: %s", args.write_condition)
-    log.info("Condition: %s", args.write_condition_location)
-    log.info("===============================")
-
-    if args.wait_condition_location:
-        wait_for_condition(args.wait_condition_location)
-
-    run_cmd_and_stream(args.command, args.write_condition_location, args.write_condition)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/spcs/examples/llama7b-chat-hf/exec/gradio_ui.sh b/spcs/examples/llama7b-chat-hf/exec/gradio_ui.sh
deleted file mode 100755
index 1ed3b88..0000000
--- a/spcs/examples/llama7b-chat-hf/exec/gradio_ui.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-
-exec python3 ./exec/exec.py \
-    --command 'python3 -m fastchat.serve.gradio_web_server' \
-    --wait_condition_location ${SYNC_DIR}/worker
\ No newline at end of file
diff --git a/spcs/examples/llama7b-chat-hf/exec/model_worker.sh b/spcs/examples/llama7b-chat-hf/exec/model_worker.sh
deleted file mode 100755
index 33596b4..0000000
--- a/spcs/examples/llama7b-chat-hf/exec/model_worker.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-exec python3 ./exec/exec.py \
-    --command 'python3 -m fastchat.serve.model_worker --model-path meta-llama/Llama-2-7b-chat-hf' \
-    --wait_condition_location ${SYNC_DIR}/controller \
-    --write_condition_location ${SYNC_DIR}/worker \
-    --write_condition 'Uvicorn running'
diff --git a/spcs/examples/llama7b-chat-hf/requirements.txt b/spcs/examples/llama7b-chat-hf/requirements.txt
deleted file mode 100644
index 53a655d..0000000
--- a/spcs/examples/llama7b-chat-hf/requirements.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-accelerate
-pyarrow==10.0.1
-snowflake-snowpark-python
-snowflake-connector-python==2.7.12
-streamlit==1.24.0
-Pillow==9.5.0
-fschat
-transformers
-gradio
\ No newline at end of file
diff --git a/spcs/examples/llama7b-chat-hf/snow/__init__.py b/spcs/examples/llama7b-chat-hf/snow/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/spcs/examples/llama7b-chat-hf/snow/cleanup.py b/spcs/examples/llama7b-chat-hf/snow/cleanup.py
deleted file mode 100644
index 5b2a4be..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/cleanup.py
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/usr/bin/env python3
-
-import click
-
-from logger import log
-from snow import get_snowflake_connection, SnowHandler
-
-IMG_TAG: str = "llm-base"
-BASE_IMAGE: str = f"public.ecr.aws/h9k5u5w6/aivanou-pub01:{IMG_TAG}"
-PROJECT_NAME: str = "llama2-hf-chat"
-COMPUTE_POOL_INSTANCE: str = "GPU_3"
-
-
-@click.command()
-@click.option("--account", required=True, type=str, help="Snowflake account")
-@click.option("--username", required=True, type=str, help="Snowflake username")
-@click.option("--password", required=True, type=str, help="Snowflake password")
-@click.option("--db", required=True, type=str, help="Snowflake database")
-@click.option("--schema", required=True, type=str, help="Snowflake schema")
-@click.option("--compute-pool", required=True, type=str, help="SPCS Compute Pool")
-@click.option("--service-name", required=True, type=str, help="SPCS Service Name")
-def cleanup(account: str, username: str, password: str,
-            db: str, schema: str, compute_pool: str, service_name: str) -> None:
-    log.info("===============================")
-    log.info("Account: %s", account)
-    log.info("Username: %s", username)
-    log.info("Password: %s", password)
-    log.info("Database: %s", db)
-    log.info("Schema: %s", schema)
-    log.info("Compute Pool: %s", compute_pool)
-    log.info("Service Name: %s", service_name)
-    log.info("===============================")
-
-    with get_snowflake_connection(account, username, password, db, schema) as ctx:
-        handler = SnowHandler(ctx)
-        log.info("Dropping service")
-        handler.drop_service(db, schema, service_name)
-        log.info("Dropping compute pool")
-        handler.drop_compute_pool(compute_pool)
diff --git a/spcs/examples/llama7b-chat-hf/snow/logger.py b/spcs/examples/llama7b-chat-hf/snow/logger.py
deleted file mode 100644
index 7a98834..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/logger.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/usr/bin/env python3
-
-import logging
-
-
-def setup_logging(log_level=logging.INFO):
-    logger = logging.getLogger("setup")
-    logger.setLevel(log_level)
-    console_handler = logging.StreamHandler()
-    console_handler.setLevel(log_level)
-    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    console_handler.setFormatter(formatter)
-    logger.addHandler(console_handler)
-    return logger
-
-
-log = setup_logging()
diff --git a/spcs/examples/llama7b-chat-hf/snow/main.py b/spcs/examples/llama7b-chat-hf/snow/main.py
deleted file mode 100644
index b0a7f97..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/main.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python3
-
-import click
-
-from cleanup import cleanup
-from setup import run_setup
-
-
-@click.group()
-def main():
-    pass
-
-
-main.add_command(run_setup)
-main.add_command(cleanup)
-
-if __name__ == "__main__":
-    main()
diff --git a/spcs/examples/llama7b-chat-hf/snow/setup.py b/spcs/examples/llama7b-chat-hf/snow/setup.py
deleted file mode 100644
index b20bf23..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/setup.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python3
-
-from pathlib import Path
-
-import click
-
-from logger import log
-from snow import get_snowflake_connection, SnowHandler
-from utils import get_hf_token, generate_spcs_spec, upload_image
-
-IMG_TAG: str = "llm-base"
-BASE_IMAGE: str = f"public.ecr.aws/h9k5u5w6/aivanou-pub01:{IMG_TAG}"
-PROJECT_NAME: str = "llama2-hf-chat"
-COMPUTE_POOL_INSTANCE: str = "GPU_3"
-
-
-@click.command()
-@click.option("--account", required=True, type=str, help="Snowflake account")
-@click.option("--username", required=True, type=str, help="Snowflake username")
-@click.option("--password", required=True, type=str, help="Snowflake password")
-@click.option("--db", required=True, type=str, help="Snowflake database")
-@click.option("--schema", required=True, type=str, help="Snowflake schema")
-@click.option("--compute-pool", required=True, type=str, help="SPCS Compute Pool")
-@click.option("--service-name", required=True, type=str, help="SPCS Service Name")
-@click.option("--repo-name", required=True, type=str, help="SPCS Repository name")
-@click.option("--stage-name", required=True, type=str, help="Snowflake Stage name")
-@click.option("--hf-token", required=False, type=str, help="Huggingface token")
-def run_setup(account: str, username: str, password: str,
-              db: str, schema: str, compute_pool: str, service_name: str,
-              repo_name: str, stage_name: str, hf_token: str) -> None:
-    log.info("===============================")
-    log.info("Account: %s", account)
-    log.info("Username: %s", username)
-    log.info("Password: %s", password)
-    log.info("Database: %s", db)
-    log.info("Schema: %s", schema)
-    log.info("Compute Pool: %s", compute_pool)
-    log.info("Service Name: %s", service_name)
-    log.info("Repository Name: %s", repo_name)
-    log.info("Stage Name: %s", stage_name)
-    log.info("HF Token: %s", hf_token)
-    log.info("===============================")
-
-    with get_snowflake_connection(account, username, password, db, schema) as ctx:
-        handler = SnowHandler(ctx)
-        log.info("Checking database")
-        handler.create_database(db)
-        log.info("Checking schema")
-        handler.create_schema(db, schema)
-        log.info(f"Creating compute pool: {compute_pool}")
-        handler.create_compute_pool(compute_pool, "GPU_3")
-        log.info("Creating repository")
-        handler.create_repository(db, schema, repo_name)
-        log.info("Getting repository")
-        repo = handler.get_repository(db, schema, repo_name)
-        if repo is None:
-            raise Exception(f"Repository: {db}.{schema}.{repo_name} not found")
-        log.info(f"Uploading image to: {repo.repository_url}")
-        image = upload_image(BASE_IMAGE, repo.repository_url, username, password)
-        log.info(f"Waiting for compute pool: {compute_pool}")
-        handler.wait_for_compute_pool(compute_pool)
-        log.info(f"Generating service spec")
-        hf_token = get_hf_token(hf_token)
-        spec_local_file = generate_spcs_spec(Path("./spcs_spec.yaml.j2"), image, hf_token)
-        log.info(f"Creating stage to upload spcs spec")
-        handler.create_stage(db, schema, stage_name)
-        log.info(f"Uploading spcs spec")
-        stage_path = handler.upload_service_spec(db, schema, stage_name, spec_local_file)
-        log.info(f"Creating service")
-        handler.create_service(compute_pool, stage_path, db, schema, service_name)
-        log.info(f"Waiting for service to be ready")
-        service = handler.wait_for_service(db, schema, service_name)
-        log.info(f"Created service: {service}")
-        result = handler.stream_service_logs(db, schema,
-                                             service_name, '0',
-                                             'model-worker', 'Uvicorn running')
-        if not result:
-            raise Exception("Something went wrong, `model-worker` did not start properly")
-        service = handler.get_service(db, schema, service_name)
-        service_endpoint = service.endpoints['service']
-        log.info(f"Created service: {db}.{schema}.{service_name}")
-        log.info(f"Created compute pool: {compute_pool}")
-        log.info(
-            f"Your services is ready!!! Access it via: https://{service_endpoint} "
-            f"Use {username}/{password} to log in")
diff --git a/spcs/examples/llama7b-chat-hf/snow/snow.py b/spcs/examples/llama7b-chat-hf/snow/snow.py
deleted file mode 100644
index b5d42b3..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/snow.py
+++ /dev/null
@@ -1,308 +0,0 @@
-#!/usr/bin/env python3
-
-import hashlib
-import json
-import os
-import sys
-import time
-from dataclasses import dataclass
-from typing import List, Dict, Optional
-
-import snowflake.connector
-from snowflake.connector.connection import SnowflakeConnection
-from snowflake.connector.cursor import SnowflakeCursor
-
-from logger import log
-
-IMG_TAG: str = "llm-base"
-BASE_IMAGE: str = f"public.ecr.aws/h9k5u5w6/aivanou-pub01:{IMG_TAG}"
-PROJECT_NAME: str = "llama2-hf-chat"
-COMPUTE_POOL_INSTANCE: str = "GPU_3"
-
-
-@dataclass
-class ImageRepository:
-    name: str
-    db: str
-    schema: str
-    repository_url: str
-
-
-@dataclass
-class ComputePool:
-    name: str
-    state: str
-    instance_family: str
-
-
-@dataclass
-class Container:
-    status: str
-    message: str
-    name: str
-    instanceId: str
-
-
-@dataclass
-class Service:
-    db: str
-    schema: str
-    name: str
-    owner: str
-    compute_pool: str
-    containers: List[Container]
-    endpoints: Dict[str, str]
-
-
-def _exec_sql(ctx: SnowflakeConnection, sql: str, log_results: bool = False) -> list[dict]:
-    log.info(f"Executing: {sql}")
-    output: List[SnowflakeCursor] = list(ctx.execute_string(sql))
-    result = output[0].fetchall()
-    if log_results:
-        log.info(result)
-    return result
-
-
-def _get_pending_containers(service: Service) -> List[Container]:
-    containers = list()
-    for container in service.containers:
-        if container.status.upper() == 'PENDING':
-            containers.append(container)
-    return containers
-
-
-def get_snowflake_connection(account: str, user: str, password: str, db: str, schema: str) -> SnowflakeConnection:
-    return snowflake.connector.connect(
-        host=f"sfengineering-{account}.snowflakecomputing.com",
-        protocol='https',
-        account=account,
-        user=user,
-        password=password,
-        database=db,
-        schema=schema,
-    )
-
-
-class SnowHandler:
-
-    def __init__(self, ctx: SnowflakeConnection):
-        self.ctx = ctx
-
-    def create_database(self, db: str):
-        sql = f"""
-        CREATE DATABASE IF NOT EXISTS {db};
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def drop_compute_pool(self, compute_pool_name: str) -> None:
-        try:
-            stop_sql = f"""
-            ALTER COMPUTE POOL {compute_pool_name} STOP ALL
-            """
-            _exec_sql(self.ctx, stop_sql, log_results=True)
-        except Exception as e:
-            log.info("Compute pool does not exist, skipping")
-        drop_sql = f"""
-        DROP COMPUTE POOL IF EXISTS {compute_pool_name}
-        """
-        _exec_sql(self.ctx, drop_sql, log_results=True)
-
-    def drop_service(self, db: str, schema: str, service_name: str) -> None:
-        sql = f"""
-        DROP SERVICE IF EXISTS {db}.{schema}.{service_name}
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def create_schema(self, db: str, schema: str):
-        sql = f"""
-        CREATE SCHEMA IF NOT EXISTS {db}.{schema};
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def create_compute_pool(self, compute_pool_name: str, instance_type: str) -> None:
-        sql = f"""
-        CREATE COMPUTE POOL IF NOT EXISTS {compute_pool_name}
-        MIN_NODES = 1
-        MAX_NODES = 1
-        INSTANCE_FAMILY = {instance_type}
-        """
-        result = _exec_sql(self.ctx, sql, log_results=True)
-        statement = result[0][0]
-        if 'already exists' in statement:
-            compute_pool = self.get_compute_pool(compute_pool_name)
-            if compute_pool.instance_family.upper() != COMPUTE_POOL_INSTANCE:
-                raise Exception(f"""
-                Compute pool with name: {compute_pool_name} exists with incorrect instance type.
-                Expected instance type: {COMPUTE_POOL_INSTANCE}, actual type: {compute_pool.instance_family}
-                Either drop compute pool or use different compute pool name
-                """)
-
-    def get_compute_pool(self, compute_pool_name: str) -> ComputePool:
-        sql = f"""
-        DESC COMPUTE POOL {compute_pool_name}
-        """
-        result = _exec_sql(self.ctx, sql)
-        if len(result) == 0:
-            raise Exception(f"compute pool: {compute_pool_name} not found")
-        cp_data = result[0]
-        return ComputePool(name=cp_data[0], state=cp_data[1], instance_family=cp_data[4])
-
-    def wait_for_compute_pool(self, compute_pool_name: str, timeout_seconds: int = 1200) -> None:
-        start_time_ms = round(time.time() * 1000)
-        end_time_ms = start_time_ms + timeout_seconds * 1000
-        while end_time_ms > round(time.time() * 1000):
-            compute_pool = self.get_compute_pool(compute_pool_name)
-            num_seconds = (round(time.time() * 1000) - start_time_ms) / 1000
-            print(f"Waiting {num_seconds} seconds, current compute pool status: {compute_pool.state}", end="")
-            if compute_pool.state.upper() in ['ACTIVE', 'IDLE']:
-                print("")
-                return
-            else:
-                time.sleep(10)  # wait 30 seconds
-            print("\r", end="")
-        status = self.get_compute_pool(compute_pool_name)
-        raise Exception(
-            f"Compute pool stuck in: {status}, this might be related to resource shortage. Contact SPCS oncall")
-
-    def list_compute_pools(self):
-        sql = f"""
-        SHOW COMPUTE POOLS;
-        """
-        output: List[SnowflakeCursor] = list(self.ctx.execute_string(sql))
-        if len(output) != 1:
-            raise Exception("Error")
-        compute_pools = output[0].fetchall()
-        for compute_pool in compute_pools:
-            log.info("%s - %s - %s", compute_pool[0], compute_pool[1], compute_pool[4])
-
-    def create_stage(self, db: str, schema: str, stage_name: str):
-        sql = f"""
-        CREATE STAGE IF NOT EXISTS {db}.{schema}.{stage_name} ENCRYPTION = (type = 'SNOWFLAKE_SSE');
-        """
-        repo = self.get_repository(db, schema, stage_name)
-        if repo is not None:
-            raise Exception(f"""
-            You have image repository with name {db}.{schema}.{stage_name}, please
-            use different name for stage.
-            """)
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def upload_service_spec(self, db: str, schema: str, stage: str, spec_local_file: str) -> str:
-        file_hash = hashlib.md5(open(spec_local_file, "rb").read()).hexdigest()
-        spec_filename = os.path.basename(spec_local_file)
-        stage_dir = os.path.join("services", file_hash)
-        sql: str = f"""
-        PUT file://{spec_local_file} @{db}.{schema}.{stage}/{stage_dir} auto_compress=false OVERWRITE = TRUE;
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-        return f"{stage}/{stage_dir}/{spec_filename}"
-
-    def get_repository(self, db: str, schema: str, repo_name: str) -> Optional[ImageRepository]:
-        sql = f"""
-        SHOW IMAGE REPOSITORIES;
-        """
-        output: List[SnowflakeCursor] = list(self.ctx.execute_string(sql))
-        data = output[0].fetchall()
-        for repo in data:
-            curr_repo_name, curr_db, curr_schema = repo[1], repo[2], repo[3]
-            if curr_repo_name.lower() == repo_name.lower() \
-                    and curr_db.lower() == db.lower() \
-                    and curr_schema.lower() == schema.lower():
-                return ImageRepository(repo_name, db, schema, repo[4])
-        return None
-
-    def create_repository(self, db: str, schema: str, repo_name: str):
-        sql = f"""
-        CREATE IMAGE REPOSITORY IF NOT EXISTS {db}.{schema}.{repo_name};
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def create_service(self,
-                       compute_pool_name: str,
-                       stage_path: str,
-                       db: str,
-                       schema: str,
-                       service_name: str):
-        sql = f"""
-        CREATE SERVICE {db}.{schema}.{service_name}
-        MIN_INSTANCES = 1
-        MAX_INSTANCES = 1
-        COMPUTE_POOL = {compute_pool_name}
-        spec=@{stage_path};
-        """
-        _exec_sql(self.ctx, sql, log_results=True)
-
-    def _get_service_status(self, db: str, schema: str, name: str) -> List[Container]:
-        sql = f"""
-        call SYSTEM$GET_SERVICE_STATUS('{db}.{schema}.{name}')
-        """
-        result = _exec_sql(self.ctx, sql)
-        data = json.loads(result[0][0])
-        containers = list()
-        for cell in data:
-            containers.append(Container(name=cell['containerName'],
-                                        status=cell['status'],
-                                        message=cell['message'],
-                                        instanceId=cell['instanceId']))
-        return containers
-
-    def get_service(self, db: str, schema: str, name: str) -> Service:
-        sql = f"""
-        DESC SERVICE {db}.{schema}.{name}
-        """
-        result = _exec_sql(self.ctx, sql)
-        if len(result) == 0:
-            raise Exception(f"service: {db}.{schema}.{name} not found")
-        data = result[0]
-        endpoints = json.loads(data[7])
-        containers = self._get_service_status(db, schema, name)
-        return Service(name=data[0], db=data[1], schema=data[2], owner=data[3],
-                       compute_pool=data[4], containers=containers, endpoints=endpoints)
-
-    def wait_for_service(self, db: str, schema: str, name: str,
-                         timeout_seconds: int = 600) -> Service:
-        start_time_ms = round(time.time() * 1000)
-        end_time_ms = start_time_ms + timeout_seconds * 1000
-        while end_time_ms > round(time.time() * 1000):
-            service = self.get_service(db, schema, name)
-            num_seconds = (round(time.time() * 1000) - start_time_ms) / 1000
-            pending_containers = _get_pending_containers(service)
-            print(f"Waiting {num_seconds} seconds for service to be ready. Pending containers: {pending_containers}",
-                  end="")
-            if len(pending_containers) == 0:
-                print("")
-                return service
-            else:
-                time.sleep(10)  # wait 30 seconds
-            print("\r", end="")
-        service = self.get_service(db, schema, name)
-        raise Exception(
-            f"Service is not ready, service: {service}. Contact SPCS oncall")
-
-    def _get_log_batch(self, db: str, schema: str,
-                       service_name: str, instance_id: str,
-                       container_name: str) -> List[str]:
-        sql = f"""
-        CALL SYSTEM$GET_SERVICE_LOGS('{db}.{schema}.{service_name}', '{instance_id}', '{container_name}', 100);
-        """
-        result = _exec_sql(self.ctx, sql, log_results=False)
-        if len(result[0]) != 1:
-            raise Exception(f"Incorrect result: {result} for service {db}.{schema}.{service_name}")
-        return result[0][0].split('\n')
-
-    def stream_service_logs(self, db: str, schema: str, service_name: str,
-                            instance_id: str, container_name: str, stop_condition_phrase: str,
-                            timeout_seconds: int = 300) -> bool:
-        start_time_ms = round(time.time() * 1000)
-        end_time_ms = start_time_ms + timeout_seconds * 1000
-        while end_time_ms > round(time.time() * 1000):
-            log_batch = self._get_log_batch(db, schema, service_name, instance_id, container_name)
-            for log_line in log_batch:
-                log.info(log_line)
-                if stop_condition_phrase.lower() in log_line.lower():
-                    return True
-            time.sleep(10)
-            for _ in log_batch:
-                sys.stdout.write("\033[F")
-                sys.stdout.write("\033[K")
-        return False
diff --git a/spcs/examples/llama7b-chat-hf/snow/utils.py b/spcs/examples/llama7b-chat-hf/snow/utils.py
deleted file mode 100644
index 2444ff2..0000000
--- a/spcs/examples/llama7b-chat-hf/snow/utils.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-import subprocess
-from jinja2 import Template
-from typing import Optional, Tuple, Dict
-from pathlib import Path
-
-from logger import log
-
-IMG_TAG: str = "llm-base"
-BASE_IMAGE: str = f"public.ecr.aws/h9k5u5w6/aivanou-pub01:{IMG_TAG}"
-PROJECT_NAME: str = "llama2-hf-chat"
-COMPUTE_POOL_INSTANCE: str = "GPU_3"
-
-
-def _get_std_or_none(stream) -> Optional[str]:
-    if stream is None:
-        return None
-    else:
-        return stream.decode('utf-8')
-
-
-def run_cmd(cmd: str, print_output: bool = False, stdout_channel: Optional[int] = subprocess.PIPE,
-            stderr_channel: Optional[int] = subprocess.PIPE) -> Tuple[Optional[str], Optional[str]]:
-    cmd = cmd.split(" ")
-    result = subprocess.run(cmd, stdout=stdout_channel, stderr=stderr_channel)
-    stdout = _get_std_or_none(result.stdout)
-    stderr = _get_std_or_none(result.stderr)
-    if result.returncode != 0 or print_output:
-        log.info(f"Printing output of: {cmd}, exitcode: {result.returncode}")
-        log.info("================STDOUT==============")
-        log.info(stdout)
-        log.info("================STDERR==============")
-        log.info(stderr)
-        log.info(("==================================="))
-    return stdout, stderr
-
-
-def upload_image(source_image: str, destination_repo: str, username: str, password: str) -> str:
-    run_cmd("docker logout public.ecr.aws")
-    run_cmd(f"docker login {destination_repo} -u {username} -p {password}")
-    run_cmd(f"docker pull {source_image}", stdout_channel=None, stderr_channel=None)
-    destination_img: str = f"{destination_repo}/images:{PROJECT_NAME}"
-    run_cmd(f"docker tag {source_image} {destination_img}")
-    run_cmd(f"docker push {destination_img}", stdout_channel=None, stderr_channel=None)
-    return destination_img
-
-
-def generate_spcs_spec(spec_template_file_path: Path, image_repo: str, hf_token: str) -> str:
-    with open(spec_template_file_path, 'r', encoding='utf-8') as file:
-        spec_template = file.read()
-    template = Template(spec_template, trim_blocks=True, keep_trailing_newline=True)
-    params: Dict[str, str] = {
-        "repo_image": image_repo,
-        "hf_token": hf_token,
-    }
-    rendered_text = template.render(params)
-
-    spec_file_path = os.path.join(spec_template_file_path.parent, spec_template_file_path.stem)
-    with open(spec_file_path, 'w', encoding='utf-8') as file:
-        file.write(rendered_text)
-    return spec_file_path
-
-
-def get_hf_token(args_hf_token: Optional[str]) -> str:
-    if args_hf_token is not None:
-        return args_hf_token
-    elif 'HUGGING_FACE_HUB_TOKEN' in os.environ:
-        return os.environ['HUGGING_FACE_HUB_TOKEN']
-    else:
-        raise Exception("""
-No Hugging Face token found.
-1. Create HF account: https://huggingface.co
-2. Follow: https://huggingface.co/docs/hub/security-tokens to get token.
-3. Follow https://ai.meta.com/resources/models-and-libraries/llama-downloads/ to request access to LLAMA2
-4. Follow https://huggingface.co/meta-llama/Llama-2-7b-hf to access hugging face model
-    """)
diff --git a/spcs/examples/llama7b-chat-hf/spcs_spec.yaml.j2 b/spcs/examples/llama7b-chat-hf/spcs_spec.yaml.j2
deleted file mode 100644
index 3e150e7..0000000
--- a/spcs/examples/llama7b-chat-hf/spcs_spec.yaml.j2
+++ /dev/null
@@ -1,43 +0,0 @@
-spec:
-  container:
-  - name: controller
-    image: {{ repo_image }}
-    env:
-      SYNC_DIR: /tmp/sync
-    command:
-    - ./exec/controller.sh
-    volumeMounts:
-    - name: sync
-      mountPath: /tmp/sync
-  - name: model-worker
-    image: {{ repo_image }}
-    env:
-      HUGGING_FACE_HUB_TOKEN: {{ hf_token }}
-      SYNC_DIR: /tmp/sync
-    command:
-    - ./exec/model_worker.sh
-    volumeMounts:
-    - name: sync
-      mountPath: /tmp/sync
-    resources:
-      limits:
-        nvidia.com/gpu: 1
-      requests:
-        nvidia.com/gpu: 1
-  - name: gradio-ui
-    image: {{ repo_image }}
-    env:
-      SYNC_DIR: /tmp/sync
-    command:
-    - ./exec/gradio_ui.sh
-    volumeMounts:
-    - name: sync
-      mountPath: /tmp/sync
-  volumes:
-  - name: sync
-    source: local
-  endpoint:
-  - name: service
-    port: 7860
-    public: true
-