Merge branch 'main' into ilongin/420-fix-storage-dataset-dependencies
ilongin authored Sep 11, 2024
2 parents 187370b + 43f5b21 commit b40c1b9
Showing 38 changed files with 180 additions and 333 deletions.
8 changes: 7 additions & 1 deletion examples/computer_vision/iptc_exif_xmp_lib.py
@@ -1,4 +1,10 @@
-# pip install defusedxml
+"""
+To install the required dependencies:
+pip install datachain[examples]
+"""
+
 import json
 
 from PIL import (
8 changes: 7 additions & 1 deletion examples/computer_vision/llava2_image_desc_lib.py
@@ -1,4 +1,10 @@
-# pip install accelerate torch
+"""
+To install the required dependencies:
+pip install datachain[examples]
+"""
+
 import torch
 from transformers import (
     AutoProcessor,
2 changes: 0 additions & 2 deletions examples/get_started/json-csv-reader.py
@@ -1,5 +1,3 @@
-# pip install datamodel-code-generator jmespath
-
 from typing import Optional
 
 from pydantic import BaseModel
7 changes: 6 additions & 1 deletion examples/get_started/torch-loader.py
@@ -1,4 +1,9 @@
-# pip install Pillow torchvision
+"""
+To install the required dependencies:
+pip install datachain[torch]
+"""
+
 import os
 from posixpath import basename
4 changes: 2 additions & 2 deletions examples/get_started/udfs/stateful.py
@@ -1,7 +1,7 @@
 """
-To install dependencies:
-pip install open_clip_torch
+To install the required dependencies:
+pip install datachain[examples]
 """
 
1 change: 0 additions & 1 deletion src/datachain/cache.py
@@ -29,7 +29,6 @@ class UniqueId:
     etag: str
     version: str = ""
     is_latest: bool = True
-    vtype: str = ""
     location: Optional[str] = None
     last_modified: datetime = TIME_ZERO
15 changes: 3 additions & 12 deletions src/datachain/catalog/catalog.py
@@ -62,7 +62,7 @@
 from datachain.node import DirType, Node, NodeWithPath
 from datachain.nodes_thread_pool import NodesThreadPool
 from datachain.remote.studio import StudioClient
-from datachain.sql.types import JSON, Boolean, DateTime, Int, Int64, SQLType, String
+from datachain.sql.types import JSON, Boolean, DateTime, Int64, SQLType, String
 from datachain.storage import Storage, StorageStatus, StorageURI
 from datachain.utils import (
     DataChainDir,
@@ -513,8 +513,6 @@ def find_column_to_str(  # noqa: PLR0911
     )
     if column == "name":
         return posixpath.basename(row[field_lookup["path"]]) or ""
-    if column == "owner":
-        return row[field_lookup["owner_name"]] or ""
     if column == "path":
         is_dir = row[field_lookup["dir_type"]] == DirType.DIR
         path = row[field_lookup["path"]]
@@ -666,16 +664,12 @@ def enlist_source(
         source_metastore = self.metastore.clone(client.uri)
 
         columns = [
-            Column("vtype", String),
-            Column("dir_type", Int),
             Column("path", String),
             Column("etag", String),
             Column("version", String),
             Column("is_latest", Boolean),
             Column("last_modified", DateTime(timezone=True)),
             Column("size", Int64),
-            Column("owner_name", String),
-            Column("owner_id", String),
             Column("location", JSON),
             Column("source", String),
         ]
@@ -1396,12 +1390,12 @@ def dataset_table_export_file_names(self, name: str, version: int) -> list[str]:
         dataset = self.get_dataset(name)
         return self.warehouse.dataset_table_export_file_names(dataset, version)
 
-    def dataset_stats(self, name: str, version: int) -> DatasetStats:
+    def dataset_stats(self, name: str, version: Optional[int]) -> DatasetStats:
         """
         Returns tuple with dataset stats: total number of rows and total dataset size.
         """
         dataset = self.get_dataset(name)
-        dataset_version = dataset.get_version(version)
+        dataset_version = dataset.get_version(version or dataset.latest_version)
         return DatasetStats(
             num_objects=dataset_version.num_objects,
             size=dataset_version.size,
@@ -1516,7 +1510,6 @@ def _get_row_uid(self, row: RowDict) -> UniqueId:
             row["etag"],
             row["version"],
             row["is_latest"],
-            row["vtype"],
             row["location"],
             row["last_modified"],
         )
@@ -1987,8 +1980,6 @@ def find(
                 field_set.add("path")
             elif column == "name":
                 field_set.add("path")
-            elif column == "owner":
-                field_set.add("owner_name")
             elif column == "path":
                 field_set.add("dir_type")
                 field_set.add("path")
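With this change, `Catalog.dataset_stats` treats the version as optional and falls back to the dataset's latest version. A minimal usage sketch; the `catalog` object and the "my-dataset" name are assumed for illustration, not taken from this commit:

    # Hypothetical objects: `catalog` is a datachain Catalog instance and
    # "my-dataset" a registered dataset.
    stats = catalog.dataset_stats("my-dataset", version=None)  # resolves to dataset.latest_version
    print(stats.num_objects, stats.size)
    stats_v1 = catalog.dataset_stats("my-dataset", version=1)  # explicit versions behave as before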
10 changes: 4 additions & 6 deletions src/datachain/cli.py
@@ -24,7 +24,7 @@
 
 TTL_HUMAN = "4h"
 TTL_INT = 4 * 60 * 60
-FIND_COLUMNS = ["du", "name", "owner", "path", "size", "type"]
+FIND_COLUMNS = ["du", "name", "path", "size", "type"]
 
 
 def human_time_type(value_str: str, can_be_none: bool = False) -> Optional[int]:
@@ -579,9 +579,8 @@ def _node_data_to_ls_values(row, long_format=False):
     value = name + ending
     if long_format:
         last_modified = row[2]
-        owner_name = row[3]
         timestamp = last_modified if not is_dir else None
-        return long_line_str(value, timestamp, owner_name)
+        return long_line_str(value, timestamp)
     return value
 
@@ -599,15 +598,15 @@ def _ls_urls_flat(
     if client_cls.is_root_url(source):
         buckets = client_cls.ls_buckets(**catalog.client_config)
         if long:
-            values = (long_line_str(b.name, b.created, "") for b in buckets)
+            values = (long_line_str(b.name, b.created) for b in buckets)
         else:
             values = (b.name for b in buckets)
         yield source, values
     else:
         found = False
         fields = ["name", "dir_type"]
         if long:
-            fields.extend(["last_modified", "owner_name"])
+            fields.append("last_modified")
         for data_source, results in catalog.ls([source], fields=fields, **kwargs):
             values = (_node_data_to_ls_values(r, long) for r in results)
             found = True
@@ -683,7 +682,6 @@ def ls_remote(
             entry = long_line_str(
                 row["name"] + ("/" if row["dir_type"] else ""),
                 row["last_modified"],
-                row["owner_name"],
             )
             print(format_ls_entry(entry))
         else:
1 change: 0 additions & 1 deletion src/datachain/client/fsspec.py
@@ -363,7 +363,6 @@ def _open_tar(self, uid: UniqueId, use_cache: bool = True):
             parent["path"],
             parent["size"],
             parent["etag"],
-            vtype=parent["vtype"],
             location=parent["location"],
         )
         f = self.open_object(parent_uid, use_cache=use_cache)
4 changes: 0 additions & 4 deletions src/datachain/client/s3.py
@@ -119,8 +119,6 @@ def _entry_from_boto(self, v, bucket, versions=False):
             is_latest=v.get("IsLatest", True),
             last_modified=v.get("LastModified", ""),
             size=v["Size"],
-            owner_name=v.get("Owner", {}).get("DisplayName", ""),
-            owner_id=v.get("Owner", {}).get("ID", ""),
         )
 
     async def _fetch_dir(
@@ -165,8 +163,6 @@ def convert_info(self, v: dict[str, Any], path: str) -> Entry:
             is_latest=v.get("IsLatest", True),
             last_modified=v.get("LastModified", ""),
             size=v["size"],
-            owner_name=v.get("Owner", {}).get("DisplayName", ""),
-            owner_id=v.get("Owner", {}).get("ID", ""),
         )
 
     def info_to_file(self, v: dict[str, Any], path: str) -> File:
12 changes: 4 additions & 8 deletions src/datachain/data_storage/schema.py
@@ -10,9 +10,8 @@
 
 import sqlalchemy as sa
 from sqlalchemy.sql import func as f
-from sqlalchemy.sql.expression import null, true
+from sqlalchemy.sql.expression import false, null, true
 
-from datachain.node import DirType
 from datachain.sql.functions import path
 from datachain.sql.types import Int, SQLType, UInt64
 
@@ -81,8 +80,7 @@ class DirExpansion:
     def base_select(q):
         return sa.select(
             q.c.sys__id,
-            q.c.vtype,
-            (q.c.dir_type == DirType.DIR).label("is_dir"),
+            false().label("is_dir"),
             q.c.source,
             q.c.path,
             q.c.version,
@@ -94,16 +92,15 @@ def apply_group_by(q):
         return (
             sa.select(
                 f.min(q.c.sys__id).label("sys__id"),
-                q.c.vtype,
                 q.c.is_dir,
                 q.c.source,
                 q.c.path,
                 q.c.version,
                 f.max(q.c.location).label("location"),
             )
             .select_from(q)
-            .group_by(q.c.source, q.c.path, q.c.vtype, q.c.is_dir, q.c.version)
-            .order_by(q.c.source, q.c.path, q.c.vtype, q.c.is_dir, q.c.version)
+            .group_by(q.c.source, q.c.path, q.c.is_dir, q.c.version)
+            .order_by(q.c.source, q.c.path, q.c.is_dir, q.c.version)
         )
 
     @classmethod
@@ -113,7 +110,6 @@ def query(cls, q):
         q = q.union_all(
             sa.select(
                 sa.literal(-1).label("sys__id"),
-                sa.literal("").label("vtype"),
                 true().label("is_dir"),
                 q.c.source,
                 parent.label("path"),
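After this change, `DirExpansion.base_select` labels every stored row as a file (`false().label("is_dir")`), and directory rows exist only as the synthesized parents that `query` adds with `true().label("is_dir")`. A minimal sketch of that constant-label-plus-union pattern; the table and columns here are illustrative, not the real schema:

    import sqlalchemy as sa
    from sqlalchemy.sql.expression import false, true

    # Illustrative table; only the labeling pattern matters.
    t = sa.table("rows", sa.column("source"), sa.column("path"))
    # Stored rows are always files; directories are generated, not stored.
    files = sa.select(false().label("is_dir"), t.c.source, t.c.path)
    dirs = sa.select(true().label("is_dir"), t.c.source, t.c.path)
    listing = files.union_all(dirs)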
34 changes: 12 additions & 22 deletions src/datachain/data_storage/warehouse.py
@@ -28,7 +28,6 @@
 
 if TYPE_CHECKING:
     from sqlalchemy.sql._typing import _ColumnsClauseArgument
-    from sqlalchemy.sql.elements import ColumnElement
     from sqlalchemy.sql.selectable import Select
     from sqlalchemy.types import TypeEngine
 
@@ -341,9 +340,7 @@ def _is_glob(path: str) -> bool:
 
         column_objects = [dr.c[c] for c in column_names]
         # include all object types - file, tar archive, tar file (subobject)
-        select_query = dr.select(*column_objects).where(
-            dr.c.dir_type.in_(DirTypeGroup.FILE) & (dr.c.is_latest == true())
-        )
+        select_query = dr.select(*column_objects).where(dr.c.is_latest == true())
         if path is None:
             return select_query
         if recursive:
@@ -404,13 +401,14 @@ def dataset_stats(
         expressions: tuple[_ColumnsClauseArgument[Any], ...] = (
             sa.func.count(table.c.sys__id),
         )
-        if "file__size" in table.columns:
-            expressions = (*expressions, sa.func.sum(table.c.file__size))
-        elif "size" in table.columns:
-            expressions = (*expressions, sa.func.sum(table.c.size))
+        size_columns = [
+            c for c in table.columns if c.name == "size" or c.name.endswith("__size")
+        ]
+        if size_columns:
+            expressions = (*expressions, sa.func.sum(sum(size_columns)))
         query = select(*expressions)
         ((nrows, *rest),) = self.db.execute(query)
-        return nrows, rest[0] if rest else None
+        return nrows, rest[0] if rest else 0

     def prepare_entries(
         self, uri: str, entries: Iterable[Entry]
@@ -420,7 +418,6 @@ def prepare_entries(
         """
 
         def _prepare_entry(entry: Entry):
-            assert entry.dir_type is not None
             return attrs.asdict(entry) | {"source": uri}
 
         return [_prepare_entry(e) for e in entries]
@@ -440,7 +437,7 @@ def insert_dataset_rows(self, df, dataset: DatasetRecord, version: int) -> int:
         """Inserts dataset rows directly into dataset table"""
 
     @abstractmethod
-    def instr(self, source, target) -> "ColumnElement":
+    def instr(self, source, target) -> sa.ColumnElement:
         """
         Return SQLAlchemy Boolean determining if a target substring is present in
         source string column
@@ -500,7 +497,7 @@ def add_node_type_where(
         c = query.selected_columns
         q = query.where(c.dir_type.in_(file_group))
         if not include_subobjects:
-            q = q.where(c.vtype == "")
+            q = q.where((c.location == "") | (c.location.is_(None)))
         return q
 
     def get_nodes(self, query) -> Iterator[Node]:
@@ -624,8 +621,7 @@ def with_default(column):
 
         return sa.select(
             de.c.sys__id,
-            with_default(dr.c.vtype),
-            case((de.c.is_dir == true(), DirType.DIR), else_=dr.c.dir_type).label(
+            case((de.c.is_dir == true(), DirType.DIR), else_=DirType.FILE).label(
                 "dir_type"
             ),
             de.c.path,
@@ -634,8 +630,6 @@ def with_default(column):
             with_default(dr.c.is_latest),
             dr.c.last_modified,
             with_default(dr.c.size),
-            with_default(dr.c.owner_name),
-            with_default(dr.c.owner_id),
             with_default(dr.c.sys__rand),
             dr.c.location,
             de.c.source,
@@ -650,7 +644,6 @@ def get_node_by_path(self, dataset_rows: "DataTable", path: str) -> Node:
         query = dr.select().where(
             self.path_expr(dr) == path,
             dr.c.is_latest == true(),
-            dr.c.dir_type != DirType.DIR,
         )
         row = next(self.db.execute(query), None)
         if row is not None:
@@ -660,7 +653,6 @@
             dr.select()
             .where(
                 dr.c.is_latest == true(),
-                dr.c.dir_type != DirType.DIR,
                 dr.c.path.startswith(path),
             )
             .exists()
@@ -761,13 +753,11 @@ def size(
 
         sub_glob = posixpath.join(path, "*")
         dr = dataset_rows
-        selections = [
+        selections: list[sa.ColumnElement] = [
             func.sum(dr.c.size),
         ]
         if count_files:
-            selections.append(
-                func.sum(dr.c.dir_type.in_(DirTypeGroup.FILE)),
-            )
+            selections.append(func.count())
         results = next(
             self.db.execute(
                 dr.select(*selections).where(
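The reworked `dataset_stats` above no longer special-cases `file__size` versus `size`: it sums every column named `size` or ending in `__size`. A self-contained sketch of the idiom, with an illustrative table definition (the column names are assumptions, not the real schema):

    import sqlalchemy as sa

    # Illustrative stand-in for a dataset table.
    metadata = sa.MetaData()
    table = sa.Table(
        "dataset_rows",
        metadata,
        sa.Column("sys__id", sa.Integer),
        sa.Column("file__size", sa.BigInteger),
    )

    size_columns = [
        c for c in table.columns if c.name == "size" or c.name.endswith("__size")
    ]
    expressions = (sa.func.count(table.c.sys__id),)
    if size_columns:
        # Python's built-in sum() folds the columns into a single SQL
        # expression (0 + file__size + ...), which SUM() then aggregates.
        expressions = (*expressions, sa.func.sum(sum(size_columns)))
    query = sa.select(*expressions)

Because `sum()` starts from 0, SQLAlchemy's operator overloading turns `0 + file__size + ...` into a SQL addition rather than a Python one, so one SUM covers all size columns in a single query.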
1 change: 0 additions & 1 deletion src/datachain/lib/dc.py
@@ -234,7 +234,6 @@ class DataChain(DatasetQuery):
     DEFAULT_FILE_RECORD: ClassVar[dict] = {
         "source": "",
         "path": "",
-        "vtype": "",
         "size": 0,
     }
 
3 changes: 0 additions & 3 deletions src/datachain/lib/file.py
@@ -118,7 +118,6 @@ class File(DataModel):
     is_latest: bool = Field(default=True)
     last_modified: datetime = Field(default=TIME_ZERO)
     location: Optional[Union[dict, list[dict]]] = Field(default=None)
-    vtype: str = Field(default="")
 
     _datachain_column_types: ClassVar[dict[str, Any]] = {
         "source": String,
@@ -129,7 +128,6 @@
         "is_latest": Boolean,
         "last_modified": DateTime,
         "location": JSON,
-        "vtype": String,
     }
 
     _unique_id_keys: ClassVar[list[str]] = [
@@ -139,7 +137,6 @@
         "etag",
         "version",
         "is_latest",
-        "vtype",
         "location",
         "last_modified",
     ]
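With `vtype` gone from the `File` model, a subobject (for example a file stored inside a tar archive) is distinguished only by a non-empty `location`, matching the new `add_node_type_where` check in warehouse.py above. A sketch of the two shapes; the field values and the location payload are illustrative, not taken from this commit:

    from datachain.lib.file import File

    # A regular object carries no location payload.
    plain = File(source="s3://bucket", path="images/cat.jpg", size=1024)
    # A subobject is identified solely by its location metadata.
    member = File(
        source="s3://bucket",
        path="archive.tar/cat.jpg",
        size=1024,
        location=[{"offset": 512, "size": 1024}],  # illustrative structure
    )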