Skip to content

Commit

Permalink
Feat: 레이블링 브릿지 개발 임시저장
Browse files Browse the repository at this point in the history
  • Loading branch information
ProtossDragoon committed Jun 15, 2024
1 parent f030102 commit 0f4af9c
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
53 changes: 53 additions & 0 deletions autosink_data_elt/pipe/autosink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from autosink_data_elt.path.autosink import AutosinkPath, LOCAL_BACKEND
from autosink_data_elt.pipe.base import BasePipe
from autosink_data_elt.pipe.tools.unzip import unzip
from sparse_to_dense.cli.web import main


class LabelingPipe(BasePipe):
    """Pipeline stage that hands a directory to the labeling entry point.

    The stage is a thin wrapper around ``main`` imported from
    ``sparse_to_dense.cli.web``.
    """

    def __init__(self) -> None:
        """Initialise the stage; nothing beyond the base class is set up."""
        super().__init__()

    def __call__(self, directory_path):
        """Invoke ``main`` on ``directory_path``.

        Args:
            directory_path: Forwarded unchanged as the sole argument of
                ``main``.

        Returns:
            None. Whatever ``main`` returns is discarded.
        """
        main(directory_path)
        return None


class ELTPipeBeforeLabeling(BasePipe):
    """ELT stage that runs before labeling; currently delegates to ``unzip``."""

    def __init__(self) -> None:
        """Initialise the stage; defers entirely to ``BasePipe``."""
        super().__init__()

    def __call__(self, directory_path):
        """Unzip the archives under ``directory_path``.

        Planned work (only step 1 is implemented so far):
            1. Run ``unzip()`` on ``volume/data-lake``; the extracted results
               are stored under ``volume/data-lake/extract``.
            2. Run the labeling pipeline on every folder inside
               ``volume/data-lake/extract``.
               2-1. Arrange the data in a form the labeling pipeline can read
                    (details still undecided: ???).
               2-2. (not yet specified)

        Args:
            directory_path: Directory passed straight through to ``unzip``.

        Returns:
            The value returned by ``unzip(directory_path)``.
        """
        extract_root = unzip(directory_path)
        return extract_root


class ELTPipeAfterLabeling(BasePipe):
    """ELT stage meant to run after labeling; it has no logic of its own yet."""

    def __init__(self) -> None:
        """Initialise the stage; defers entirely to ``BasePipe``."""
        super().__init__()

    def __call__(self):
        """Delegate to the parent ``__call__`` and return its result.

        NOTE(review): if the parent ``__call__`` is the abstract stub that
        raises ``NotImplementedError``, calling this stage raises as well --
        confirm that this placeholder behaviour is intended.
        """
        outcome = super().__call__()
        return outcome


if __name__ == '__main__':
    # Path configuration: local backend mounted at the current directory.
    autosink_path = AutosinkPath(
        backend=LOCAL_BACKEND,
        mount_dir='.',
        data_lake_rel_dir='data-lake',
        feature_store_rel_dir='feature-store',
    )

    # Stage 1: pre-labeling ELT; its return value is the labeling input.
    before_labeling = ELTPipeBeforeLabeling()
    target_dir = before_labeling(autosink_path.data_lake_dir)

    # Stage 2: labeling on the directory produced by stage 1.
    labeling = LabelingPipe()
    labeling(target_dir)

    # Stage 3: post-labeling ELT (takes no arguments).
    after_labeling = ELTPipeAfterLabeling()
    after_labeling()
11 changes: 11 additions & 0 deletions autosink_data_elt/pipe/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod


class BasePipe(ABC):
    """Abstract base for pipeline stages.

    Concrete stages must override ``__call__``; the class itself cannot be
    instantiated because ``__call__`` is abstract.
    """

    def __init__(self) -> None:
        """Do nothing; exists so subclasses can call ``super().__init__()``."""

    @abstractmethod
    def __call__(self):
        """Run the stage.

        Raises:
            NotImplementedError: Always, when this base implementation is
                reached (for example through ``super().__call__()``).
        """
        raise NotImplementedError()
44 changes: 44 additions & 0 deletions autosink_data_elt/pipe/tools/unzip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
import shutil
import zipfile
from typing import Union, Optional


def unzip(
    directory_path: Union[os.PathLike, str],
    extract_root: Optional[Union[os.PathLike, str]] = None,
) -> Union[os.PathLike, str]:
    """Extract every ``.zip`` archive found directly inside a directory.

    Each archive ``<name>.zip`` is extracted into ``<extract_root>/<name>``.
    An archive whose target directory already exists is treated as already
    extracted and skipped. If extraction fails part-way, the partially
    populated target directory is removed before the error propagates, so a
    later run retries the archive instead of reporting it as extracted.

    Args:
        directory_path: Directory to scan (non-recursively) for ``.zip``
            files.
        extract_root: Top-level directory that receives the extracted files.
            Defaults to ``<directory_path>/extract`` when falsy.

    Returns:
        The extraction root that was used.

    Raises:
        zipfile.BadZipFile: If a ``.zip`` file is not a valid archive.
        OSError: If the directory cannot be listed or a target cannot be
            created or written.
    """
    if not extract_root:
        extract_root = os.path.join(directory_path, 'extract')

    # Look at every entry directly inside the archive directory.
    for file in os.listdir(directory_path):
        zip_file_path = os.path.join(directory_path, file)
        # Only regular files count: a directory that merely happens to be
        # named ``*.zip`` cannot be opened as an archive.
        if not file.endswith('.zip') or not os.path.isfile(zip_file_path):
            continue

        # Target sub-directory is named after the archive (extension dropped).
        extract_to_path = os.path.join(extract_root, file.rsplit('.', 1)[0])

        # An existing target directory marks the archive as already handled.
        if os.path.exists(extract_to_path):
            print(f'{file} is already extracted to {extract_to_path}')
            continue

        os.makedirs(extract_to_path)
        extracted = False
        try:
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to_path)
            extracted = True
        finally:
            if not extracted:
                # Do not leave a half-filled directory behind; its mere
                # existence would make the next run skip this archive.
                shutil.rmtree(extract_to_path, ignore_errors=True)
        print(f'Extracted {file} to {extract_to_path}')
    return extract_root


if __name__ == '__main__':
    # Manual run: extract the archives under the data lake into its
    # ``extract`` sub-directory.
    unzip(
        directory_path='volume/data-lake',
        extract_root='volume/data-lake/extract',
    )

0 comments on commit 0f4af9c

Please sign in to comment.