Skip to content

Commit

Permalink
disable wrapping last statement by default
Browse files Browse the repository at this point in the history
  • Loading branch information
skshetry committed Sep 11, 2024
1 parent ec06ad1 commit 01fcceb
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 257 deletions.
50 changes: 21 additions & 29 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
TYPE_CHECKING,
Any,
Callable,
NamedTuple,
NoReturn,
Optional,
Union,
Expand Down Expand Up @@ -128,12 +127,6 @@ def _process_stream(stream: "IO[bytes]", callback: Callable[[str], None]) -> Non
callback(line)

Check warning on line 127 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L126-L127

Added lines #L126 - L127 were not covered by tests


class QueryResult(NamedTuple):
dataset: Optional[DatasetRecord]
version: Optional[int]
output: str


class DatasetRowsFetcher(NodesThreadPool):
def __init__(
self,
Expand Down Expand Up @@ -1779,13 +1772,14 @@ def apply_udf(
def query(
self,
query_script: str,
envs: Optional[Mapping[str, str]] = None,
env: Optional[Mapping[str, str]] = None,
python_executable: str = sys.executable,
save: bool = False,
capture_output: bool = True,
output_hook: Callable[[str], None] = noop,
params: Optional[dict[str, str]] = None,
job_id: Optional[str] = None,
_execute_last_expression: bool = False,
) -> None:
"""
Method to run custom user Python script to run a query and, as result,
Expand All @@ -1809,17 +1803,21 @@ def query(
C.size > 1000
)
"""
try:
code_ast = ast.parse(query_script)
code_ast = self.attach_query_wrapper(code_ast)
query_script_compiled = ast.unparse(code_ast)
except Exception as exc:
raise QueryScriptCompileError(
f"Query script failed to compile, reason: {exc}"
) from exc
if _execute_last_expression:
try:
code_ast = ast.parse(query_script)
code_ast = self.attach_query_wrapper(code_ast)
query_script_compiled = ast.unparse(code_ast)

Check warning on line 1810 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1809-L1810

Added lines #L1809 - L1810 were not covered by tests
except Exception as exc:
raise QueryScriptCompileError(
f"Query script failed to compile, reason: {exc}"
) from exc
else:
query_script_compiled = query_script
assert not save

envs = dict(envs or os.environ)
envs.update(
env = dict(env or os.environ)
env.update(
{
"DATACHAIN_QUERY_PARAMS": json.dumps(params or {}),
"PYTHONPATH": os.getcwd(), # For local imports
Expand All @@ -1832,19 +1830,13 @@ def query(
if capture_output:
popen_kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.STDOUT}

with subprocess.Popen( # type: ignore[call-overload] # noqa: S603
[python_executable, "-c", query_script_compiled],
env=envs,
**popen_kwargs,
) as proc:
cmd = [python_executable, "-c", query_script_compiled]
with subprocess.Popen(cmd, env=env, **popen_kwargs) as proc: # type: ignore[call-overload] # noqa: S603
if capture_output:
thread = Thread(
target=_process_stream,
daemon=True,
args=(proc.stdout, output_hook),
)
args = (proc.stdout, output_hook)
thread = Thread(target=_process_stream, args=args, daemon=True)
thread.start()
thread.join()
thread.join() # wait for the reader thread

if proc.returncode == QUERY_SCRIPT_CANCELED_EXIT_CODE:
raise QueryScriptCancelError(

Check warning on line 1842 in src/datachain/catalog/catalog.py

View check run for this annotation

Codecov / codecov/patch

src/datachain/catalog/catalog.py#L1842

Added line #L1842 was not covered by tests
Expand Down
4 changes: 0 additions & 4 deletions src/datachain/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ def __init__(self, message: str, return_code: int = 0, output: str = ""):
super().__init__(self.message)


class QueryScriptDatasetNotFound(QueryScriptRunError): # noqa: N818
pass


class QueryScriptCancelError(QueryScriptRunError):
pass

Expand Down
2 changes: 1 addition & 1 deletion tests/func/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ def test_query_fail_to_compile(cloud_test_catalog):
query_script = "syntax error"

with pytest.raises(QueryScriptCompileError):
catalog.query(query_script)
catalog.query(query_script, _execute_last_expression=True)


def test_query_subprocess_wrong_return_code(mock_popen, cloud_test_catalog):
Expand Down
Loading

0 comments on commit 01fcceb

Please sign in to comment.