Remove 'dir_type' column and Entry.dir_type attribute
rlamy authored and skshetry committed Sep 11, 2024
1 parent 0a7852f commit d30f420
Showing 9 changed files with 15 additions and 58 deletions.
3 changes: 1 addition & 2 deletions src/datachain/catalog/catalog.py
@@ -65,7 +65,7 @@
 from datachain.node import DirType, Node, NodeWithPath
 from datachain.nodes_thread_pool import NodesThreadPool
 from datachain.remote.studio import StudioClient
-from datachain.sql.types import JSON, Boolean, DateTime, Int, Int64, SQLType, String
+from datachain.sql.types import JSON, Boolean, DateTime, Int64, SQLType, String
 from datachain.storage import Storage, StorageStatus, StorageURI
 from datachain.utils import (
     DataChainDir,
@@ -699,7 +699,6 @@ def enlist_source(
         source_metastore = self.metastore.clone(client.uri)

         columns = [
-            Column("dir_type", Int),
             Column("path", String),
             Column("etag", String),
             Column("version", String),
5 changes: 2 additions & 3 deletions src/datachain/data_storage/schema.py
@@ -10,9 +10,8 @@

 import sqlalchemy as sa
 from sqlalchemy.sql import func as f
-from sqlalchemy.sql.expression import null, true
+from sqlalchemy.sql.expression import false, null, true

-from datachain.node import DirType
 from datachain.sql.functions import path
 from datachain.sql.types import Int, SQLType, UInt64

@@ -81,7 +80,7 @@ class DirExpansion:
     def base_select(q):
         return sa.select(
             q.c.sys__id,
-            (q.c.dir_type == DirType.DIR).label("is_dir"),
+            false().label("is_dir"),
             q.c.source,
             q.c.path,
             q.c.version,
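With `dir_type` gone from stored rows, `DirExpansion.base_select` can no longer derive `is_dir` from the data; it labels a constant `false()` instead, since every stored row now represents a file and directory rows are synthesized later in the expansion. A minimal sketch of the SQLAlchemy pattern, using an illustrative table that is not datachain's actual schema:

```python
# Sketch of labeling a constant column in SQLAlchemy Core; the table and
# columns here are illustrative stand-ins, not datachain's real schema.
import sqlalchemy as sa
from sqlalchemy.sql.expression import false

metadata = sa.MetaData()
entries = sa.Table(
    "entries",
    metadata,
    sa.Column("sys__id", sa.Integer, primary_key=True),
    sa.Column("path", sa.String),
)

# Every selected row carries a constant is_dir = FALSE, mirroring the change.
query = sa.select(entries.c.sys__id, false().label("is_dir"), entries.c.path)
print(query)
# SELECT entries.sys__id, false AS is_dir, entries.path FROM entries
```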
18 changes: 5 additions & 13 deletions src/datachain/data_storage/warehouse.py
@@ -28,7 +28,6 @@

 if TYPE_CHECKING:
     from sqlalchemy.sql._typing import _ColumnsClauseArgument
-    from sqlalchemy.sql.elements import ColumnElement
     from sqlalchemy.sql.selectable import Select
     from sqlalchemy.types import TypeEngine

@@ -341,9 +340,7 @@ def _is_glob(path: str) -> bool:

         column_objects = [dr.c[c] for c in column_names]
         # include all object types - file, tar archive, tar file (subobject)
-        select_query = dr.select(*column_objects).where(
-            dr.c.dir_type.in_(DirTypeGroup.FILE) & (dr.c.is_latest == true())
-        )
+        select_query = dr.select(*column_objects).where(dr.c.is_latest == true())
         if path is None:
             return select_query
         if recursive:
@@ -420,7 +417,6 @@ def prepare_entries(
         """

         def _prepare_entry(entry: Entry):
-            assert entry.dir_type is not None
             return attrs.asdict(entry) | {"source": uri}

         return [_prepare_entry(e) for e in entries]
@@ -440,7 +436,7 @@ def insert_dataset_rows(self, df, dataset: DatasetRecord, version: int) -> int:
         """Inserts dataset rows directly into dataset table"""

     @abstractmethod
-    def instr(self, source, target) -> "ColumnElement":
+    def instr(self, source, target) -> sa.ColumnElement:
         """
         Return SQLAlchemy Boolean determining if a target substring is present in
         source string column
@@ -624,7 +620,7 @@ def with_default(column):

         return sa.select(
             de.c.sys__id,
-            case((de.c.is_dir == true(), DirType.DIR), else_=dr.c.dir_type).label(
+            case((de.c.is_dir == true(), DirType.DIR), else_=DirType.FILE).label(
                 "dir_type"
             ),
             de.c.path,
@@ -649,7 +645,6 @@ def get_node_by_path(self, dataset_rows: "DataTable", path: str) -> Node:
         query = dr.select().where(
             self.path_expr(dr) == path,
             dr.c.is_latest == true(),
-            dr.c.dir_type != DirType.DIR,
         )
         row = next(self.db.execute(query), None)
         if row is not None:
@@ -659,7 +654,6 @@ def get_node_by_path(self, dataset_rows: "DataTable", path: str) -> Node:
             dr.select()
             .where(
                 dr.c.is_latest == true(),
-                dr.c.dir_type != DirType.DIR,
                 dr.c.path.startswith(path),
             )
             .exists()
@@ -760,13 +754,11 @@ def size(

         sub_glob = posixpath.join(path, "*")
         dr = dataset_rows
-        selections = [
+        selections: list[sa.ColumnElement] = [
             func.sum(dr.c.size),
         ]
         if count_files:
-            selections.append(
-                func.sum(dr.c.dir_type.in_(DirTypeGroup.FILE)),
-            )
+            selections.append(func.count())
         results = next(
             self.db.execute(
                 dr.select(*selections).where(
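The `size` aggregation follows the same logic: once only file rows are stored, summing a `dir_type` membership test is equivalent to a plain row count, so the file counter becomes `func.count()`. A self-contained sketch under the same caveat (illustrative table, not the project's schema):

```python
# Sketch of the new aggregation shape in size(); the table is illustrative.
import sqlalchemy as sa
from sqlalchemy.sql import func

metadata = sa.MetaData()
rows = sa.Table(
    "rows",
    metadata,
    sa.Column("path", sa.String),
    sa.Column("size", sa.Integer),
)

count_files = True
selections: list[sa.ColumnElement] = [func.sum(rows.c.size)]
if count_files:
    # Before: func.sum(rows.c.dir_type.in_(DirTypeGroup.FILE)) counted only
    # file-typed rows; with directories no longer stored, count() suffices.
    selections.append(func.count())

query = sa.select(*selections).where(rows.c.path.startswith("dogs/"))
```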
19 changes: 2 additions & 17 deletions src/datachain/node.py
@@ -49,7 +49,6 @@ class DirTypeGroup:
 class Node:
     sys__id: int = 0
     sys__rand: int = 0
-    dir_type: Optional[int] = None
     path: str = ""
     etag: str = ""
     version: Optional[str] = None
@@ -60,6 +59,7 @@ class Node:
     owner_id: str = ""
     location: Optional[str] = None
     source: StorageURI = StorageURI("")
+    dir_type: int = DirType.FILE

     @property
     def is_dir(self) -> bool:
@@ -143,7 +143,6 @@ def parent(self):

 @attrs.define
 class Entry:
-    dir_type: Optional[int] = None
     path: str = ""
     etag: str = ""
     version: str = ""
@@ -154,26 +153,12 @@ class Entry:
     owner_id: str = ""
     location: Optional[str] = None

-    @property
-    def is_dir(self) -> bool:
-        return self.dir_type == DirType.DIR
-
-    @classmethod
-    def from_dir(cls, path: str, **kwargs) -> "Entry":
-        return cls(dir_type=DirType.DIR, path=path, **kwargs)
-
     @classmethod
     def from_file(cls, path: str, **kwargs) -> "Entry":
-        return cls(dir_type=DirType.FILE, path=path, **kwargs)
-
-    @classmethod
-    def root(cls):
-        return cls(dir_type=DirType.DIR)
+        return cls(path=path, **kwargs)

     @property
     def full_path(self) -> str:
-        if self.is_dir and self.path:
-            return self.path + "/"
         return self.path

     @property
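The net effect on the data model: `Entry` always describes a file (the `is_dir`, `from_dir`, and `root` helpers are gone, and `full_path` no longer appends a trailing slash), while `Node` keeps `dir_type` as a plain field defaulting to `DirType.FILE`, with directory nodes synthesized at query time. Roughly, based only on the fields visible in this diff:

```python
# Based on the diff above; the keyword values are made-up examples.
from datachain.node import DirType, Entry, Node

entry = Entry.from_file("dogs/dog1", etag="abc123")
assert entry.full_path == "dogs/dog1"  # no trailing-slash logic anymore

node = Node(path="dogs/dog1")
assert node.dir_type == DirType.FILE  # new default on Node
assert not node.is_dir
```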
3 changes: 0 additions & 3 deletions src/datachain/query/builtins.py
@@ -22,7 +22,6 @@ def load_tar(raw):
     C.source,
     C.path,
     C.size,
-    C.dir_type,
     C.owner_name,
     C.owner_id,
     C.is_latest,
@@ -37,7 +36,6 @@ def index_tar(
     source,
     parent_path,
     size,
-    dir_type,
     owner_name,
     owner_id,
     is_latest,
@@ -51,7 +49,6 @@
         source=source,
         path=parent_path,
         size=size,
-        dir_type=dir_type,
         owner_name=owner_name,
         owner_id=owner_id,
         is_latest=bool(is_latest),
5 changes: 1 addition & 4 deletions src/datachain/query/schema.py
@@ -9,7 +9,7 @@
 import sqlalchemy as sa
 from fsspec.callbacks import DEFAULT_CALLBACK, Callback

-from datachain.sql.types import JSON, Boolean, DateTime, Int, Int64, SQLType, String
+from datachain.sql.types import JSON, Boolean, DateTime, Int64, SQLType, String

 if TYPE_CHECKING:
     from datachain.catalog import Catalog
@@ -222,7 +222,6 @@ class DatasetRow:
         "path": String,
         "size": Int64,
         "location": JSON,
-        "dir_type": Int,
         "owner_name": String,
         "owner_id": String,
         "is_latest": Boolean,
@@ -237,7 +236,6 @@ def create(
         source: str = "",
         size: int = 0,
         location: Optional[dict[str, Any]] = None,
-        dir_type: int = 0,
         owner_name: str = "",
         owner_id: str = "",
         is_latest: bool = True,
@@ -268,7 +266,6 @@ def create(
             path,
             size,
             location,
-            dir_type,
             owner_name,
             owner_id,
             is_latest,
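Callers of `DatasetRow.create` simply drop the argument; the remaining parameters are unchanged. An illustrative call, assuming `path` is the leading positional parameter (it is not shown in the diff) and using made-up values:

```python
# Hypothetical values; only parameters visible in the diff are used, and the
# leading positional path argument is an assumption.
from datachain.query.schema import DatasetRow

row = DatasetRow.create(
    "dogs/dog1",        # path; dir_type is no longer accepted
    source="s3://dogs",
    size=4,
    owner_name="aws",
    is_latest=True,
)
```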
4 changes: 2 additions & 2 deletions tests/func/test_dataset_query.py
@@ -506,7 +506,7 @@ def test_mutate(cloud_test_catalog, save):
     else:
         result = q.db_results(row_factory=lambda c, v: dict(zip(c, v)))
         assert len(result) == 4
-        assert len(result[0]) == 18
+        assert len(result[0]) == 17
     cols = {"size10x", "size1000x", "s2", "s3", "s4"}
     new_data = [[v for k, v in r.items() if k in cols] for r in result]
     assert new_data == [
@@ -568,7 +568,7 @@ def test_order_by_limit(cloud_test_catalog, save):
         assert dataset_record.status == DatasetStatus.COMPLETE
     else:
         result = q.db_results()
-        assert [posixpath.basename(r[3]) for r in result] == [
+        assert [posixpath.basename(r[2]) for r in result] == [
             "dog4",
             "dog3",
             "dog2",
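These test updates follow mechanically from the schema change: each row loses one column (18 becomes 17), and positional indexes past the removed column shift down by one. Consistent with the field order shown in node.py above (`sys__id`, `sys__rand`, `dir_type`, `path`), `path` moves from index 3 to index 2:

```python
# Illustration of the positional shift; the tuples are mock rows, not real data.
old_row = ("sys__id", "sys__rand", "dir_type", "dogs/dog4")  # path at r[3]
new_row = ("sys__id", "sys__rand", "dogs/dog4")              # path at r[2]
assert old_row[3] == new_row[2]
```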
7 changes: 1 addition & 6 deletions tests/func/test_pull.py
@@ -9,17 +9,14 @@

 from datachain.dataset import DatasetStatus
 from datachain.error import DataChainError
-from datachain.node import DirType
 from datachain.utils import JSONSerialize
 from tests.data import ENTRIES
 from tests.utils import assert_row_names, skip_if_not_sqlite


 @pytest.fixture
 def dog_entries():
-    return [
-        attrs.asdict(e) for e in ENTRIES if e.name.startswith("dog") and not e.is_dir
-    ]
+    return [attrs.asdict(e) for e in ENTRIES if e.name.startswith("dog")]


 @pytest.fixture
@@ -47,7 +44,6 @@ def _adapt_row(row):
         adapted["sys__rand"] = 1
         adapted["location"] = b""
         adapted["source"] = b"s3://dogs"
-        adapted["dir_type"] = DirType.FILE
         return adapted

     dog_entries = [_adapt_row(e) for e in dog_entries]
@@ -62,7 +58,6 @@ def _adapt_row(row):
 def schema():
     return {
         "id": {"type": "UInt64"},
-        "dir_type": {"type": "Int32"},
         "path": {"type": "String"},
         "etag": {"type": "String"},
         "version": {"type": "String"},
9 changes: 1 addition & 8 deletions tests/utils.py
@@ -158,43 +158,36 @@ def text_embedding(text: str) -> list[float]:
 SIMPLE_DS_QUERY_RECORDS = [
     {
         "path": "cats/cat1",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 4,
     },
     {
         "path": "cats/cat2",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 4,
     },
     {
         "path": "description",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 13,
     },
     {
         "path": "dogs/dog1",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 4,
     },
     {
         "path": "dogs/dog2",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 3,
     },
     {
         "path": "dogs/dog3",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 4,
     },
     {
         "path": "dogs/others/dog4",
-        "dir_type": 0,
         "is_latest": 1,
         "size": 4,
     },
@@ -204,7 +197,7 @@ def text_embedding(text: str) -> list[float]:
 def get_simple_ds_query(path, catalog):
     return (
         DatasetQuery(path=path, catalog=catalog)
-        .select(C.path, C.dir_type, C.is_latest, C.size)
+        .select(C.path, C.is_latest, C.size)
         .order_by(C.source, C.path)
     )
