Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dagster Data pipeline #798

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions python/tabby/tabby-data-pipeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# tabby_data_pipeline

This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project).

## Getting started

First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in ["editable mode"](https://pip.pypa.io/en/latest/topics/local-project-installs/#editable-installs) so that as you develop, local code changes will automatically apply.

```bash
pip install -e ".[dev]"
```

Then, start the Dagster UI web server:

```bash
dagster dev
```

Open http://localhost:3000 with your browser to see the project.

You can start writing assets in `tabby_data_pipeline/assets.py`. The assets are automatically loaded into the Dagster code location as you define them.

## Development


### Adding new Python dependencies

You can specify new Python dependencies in `setup.py`.

### Unit testing

Tests are in the `tabby_data_pipeline_tests` directory and you can run tests using `pytest`:

```bash
pytest tabby_data_pipeline_tests
```

### Schedules and sensors

If you want to enable Dagster [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) or [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors) for your jobs, the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) process must be running. This is done automatically when you run `dagster dev`.

Once your Dagster Daemon is running, you can start turning on schedules and sensors for your jobs.

## Deploy on Dagster Cloud

The easiest way to deploy your Dagster project is to use Dagster Cloud.

Check out the [Dagster Cloud Documentation](https://docs.dagster.cloud) to learn more.
6 changes: 6 additions & 0 deletions python/tabby/tabby-data-pipeline/log.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
model: TabbyML/StarCoder-1B; language: python; file: line_completion.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors

model: TabbyML/StarCoder-1B; language: python; file: line_completion_rg1_bm25.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors

model: TabbyML/StarCoder-1B; language: python; file: line_completion_oracle_bm25.jsonlSkipped 0 rows, 10 rows with predictions, 0 rows with errors

201 changes: 201 additions & 0 deletions python/tabby/tabby-data-pipeline/predict.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put prediction code under modal directory.

Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from pathlib import Path

import modal
from modal import Image, Mount, Secret, Stub, asgi_app, gpu, method
import os


import asyncio

GPU_CONFIG = gpu.A10G()

MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]


def download_model():
import subprocess
import os
MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
print(f'MODEL_ID={MODEL_ID}')
subprocess.run(
[
"/opt/tabby/bin/tabby",
"download",
"--model",
MODEL_ID,
]
)


image = (
Image.from_registry(
"tabbyml/tabby:0.5.5",
add_python="3.11",
)
.dockerfile_commands("ENTRYPOINT []")
.pip_install(
"git+https://github.com/TabbyML/tabby.git#egg=tabby-python-client&subdirectory=experimental/eval/tabby-python-client",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

embed this directory inside tabby-data-pipeline, use https://modal.com/docs/reference/modal.Image#copy_local_dir to copy and run pip install

"pandas"
)
.copy_local_file(local_path="/tmp/tabby_model_id", remote_path="/tmp/tabby_model_id")
.run_function(download_model)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use https://modal.com/docs/reference/modal.Image#env to pass MODEL_ID as environment variable, and in download_model you can access model id as os.environ.get("MODEL_ID")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.env("MODEL_ID", os.environ.get("MODEL_ID"))

)

stub = Stub("tabby-" + MODEL_ID.split("/")[-1], image=image)


@stub.cls(
gpu=GPU_CONFIG,
concurrency_limit=10,
allow_concurrent_inputs=2,
container_idle_timeout=60 * 10,
timeout=600,
)
class Model:
def __enter__(self):
import socket
import subprocess, os
import time

from tabby_python_client import Client

my_env = os.environ.copy()
my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1"
MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

print(f'MODEL_ID={MODEL_ID}')
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + LAUNCH_FLAGS, env=my_env)
self.client = Client("http://127.0.0.1:8000", timeout=240)

# Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs.
def webserver_ready():
try:
socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
return True
except (socket.timeout, ConnectionRefusedError):
# Check if launcher webserving process has exited.
# If so, a connection can never be made.
retcode = self.launcher.poll()
if retcode is not None:
raise RuntimeError(
f"launcher exited unexpectedly with code {retcode}"
)
return False

while not webserver_ready():
time.sleep(1.0)

print("Tabby server ready!")

def __exit__(self, _exc_type, _exc_value, _traceback):
self.launcher.terminate()

@method()
async def health(self):
from tabby_python_client.api.v1 import health

resp = await health.asyncio(client=self.client)
return resp.to_dict()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return resp?


@method()
async def complete(self, language, crossfile_context, index, row):
Copy link
Member

@wsxiaoys wsxiaoys Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type annotation for every argument. For row, define a named tuple: https://docs.python.org/3/library/typing.html#typing.NamedTuple

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to list all column name of the dataframe in the namedtuple?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For columns used for input / output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there's a dynamic column? For example, "prediction" didn't exist when you first run the file and it will be added to the file. Next time, we need to pass it through the "row"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from tabby_python_client.api.v1 import completion
from tabby_python_client.models import (
CompletionRequest,
DebugOptions,
CompletionResponse,
Segments,
)
from tabby_python_client.types import Response
from tabby_python_client import errors
import pandas as pd

if 'prediction' in row and not pd.isnull(row['prediction']):
return None, None, None

if crossfile_context:
prompt = row["crossfile_context"]["text"] + row["prompt"]
else:
prompt = row["prompt"]

groundtruth = row["groundtruth"]

request = CompletionRequest(
language=language, debug_options=DebugOptions(raw_prompt=prompt)
)
# resp: CompletionResponse = await completion.asyncio(
# client=self.client, json_body=request
# )
try:
resp: Response = await completion.asyncio_detailed(
client=self.client, json_body=request
)

if resp.parsed != None:
return index, resp.parsed.choices[0].text, None
else:
return index, None, f"<{resp.status_code}>"
except errors.UnexpectedStatus as e:
return index, None, f"error: code={e.status_code} content={e.content} error={e}"
except Exception as e:
return index, None, f"error type: {type(e)}"



@stub.local_entrypoint()
async def main(language, file):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type hints

import json
import pandas as pd


print(MODEL_ID)

model = Model()
print("model info:")
health_resp = model.health.remote()
print(health_resp)
assert(health_resp['model'] == MODEL_ID)

whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file

if file == 'line_completion.jsonl':
crossfile_context = False
else:
crossfile_context = True

objs = []
with open(whole_path_file) as fin:
for line in fin:
obj = json.loads(line)
objs.append(obj)

df = pd.DataFrame(objs)

outputs = await asyncio.gather(*[model.complete.remote.aio(language, crossfile_context, index, row) for index, row in df.iterrows()])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it still necessary to run this in chunks?


skipped = 0
success = 0
error = 0

for index, prediction, error_msg in outputs:
if index is None:
skipped += 1
elif prediction is not None:
df.loc[index, 'prediction'] = prediction
success += 1
else:
df.loc[index, 'error'] = error_msg
error += 1
print(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")

with open(whole_path_file, 'w') as fout:
for index, row in df.iterrows():
json.dump(row.to_dict(), fout)
fout.write('\n')


with open("log.txt", "a") as flog:
flog.write(f"model: {MODEL_ID}; language: {language}; file: {file}")
flog.write(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")
flog.write("\n\n")
6 changes: 6 additions & 0 deletions python/tabby/tabby-data-pipeline/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "tabby_data_pipeline"
2 changes: 2 additions & 0 deletions python/tabby/tabby-data-pipeline/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = tabby_data_pipeline
17 changes: 17 additions & 0 deletions python/tabby/tabby-data-pipeline/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from setuptools import find_packages, setup

setup(
name="tabby_data_pipeline",
packages=find_packages(exclude=["tabby_data_pipeline_tests"]),
install_requires=[
"dagster",
"dagster-cloud",
"dagstermill",
"papermill-origami>=0.0.8",
"pandas",
"matplotlib",
"seaborn",
"scikit-learn",
],
extras_require={"dev": ["dagster-webserver", "pytest"]},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Metadata-Version: 2.1
Name: tabby-data-pipeline
Version: 0.0.0
Requires-Dist: dagster
Requires-Dist: dagster-cloud
Provides-Extra: dev
Requires-Dist: dagster-webserver; extra == "dev"
Requires-Dist: pytest; extra == "dev"
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
README.md
pyproject.toml
setup.cfg
setup.py
tabby_data_pipeline/__init__.py
tabby_data_pipeline/analyze.py
tabby_data_pipeline/assets.py
tabby_data_pipeline/predict.py
tabby_data_pipeline.egg-info/PKG-INFO
tabby_data_pipeline.egg-info/SOURCES.txt
tabby_data_pipeline.egg-info/dependency_links.txt
tabby_data_pipeline.egg-info/requires.txt
tabby_data_pipeline.egg-info/top_level.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dagster
dagster-cloud

[dev]
dagster-webserver
pytest
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tabby_data_pipeline
18 changes: 18 additions & 0 deletions python/tabby/tabby-data-pipeline/tabby_data_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dagster import Definitions, load_assets_from_modules

from dagstermill import define_dagstermill_asset, ConfigurableLocalOutputNotebookIOManager

from dagster import AssetIn, Field, Int, asset, file_relative_path

from . import assets

all_assets = load_assets_from_modules([assets])

defs = Definitions(
assets=all_assets,
resources = {
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager()
}
)


Loading