Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use wrabbit 0.2.4 #97

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ruamel.yaml >= 0.16
sevenbridges-python >= 2.0
nf-core==2.1
wrabbit==0.2.3
wrabbit==0.2.4
cwlformat
packaging
41 changes: 41 additions & 0 deletions sbpack/noncwl/manifest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from sevenbridges.models.project import Project
from sevenbridges import Api

import logging
import sbpack.lib as lib
Expand Down Expand Up @@ -42,6 +43,46 @@ def paths_to_check(file_name: str) -> list:
return rtrn


def get_path_from_id(api: Api, file: str) -> str:
"""
Extracts the full path of a file from ID
:param api: Initialized SevenBridges API
:param file: id of a file
:return: Path to the File
"""
file = api.files.get(file)
temp = file
full_path = [file.name]

project_root = api.projects.get(file.project)
project_root_name = api.files.get(project_root).name

while temp.parent != project_root:
temp = api.files.get(temp.parent)
full_path.append(temp.name)

full_path.append(project_root_name)
return "vs:///Projects/" + "/".join(full_path[::-1])


def get_path_from_name(api: Api, file_name: str, project: Project) -> str:
"""
Extract the full path of a file from File Name
:param api: Initialized SevenBridges API
:param file_name: Name of the file
:param project: SevenBridges Project
:return:
"""

file = api.files.query(project=project, names=[file_name])
if file:
return get_path_from_id(api, file[0].id)
else:
raise FileNotFoundError(
f"Unable to find file with name {file_name} in {project}"
)


def remap_cell(project_root: str, path: str) -> str:
"""
Remaps a file path to the 'vs:' file system.
Expand Down
49 changes: 38 additions & 11 deletions sbpack/noncwl/nextflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from sbpack.noncwl.utils import (
zip_and_push_to_sb,
install_or_upgrade_app,
remove_local_file,
get_git_repo,
)

from wrabbit.parser.utils import (
Expand Down Expand Up @@ -82,16 +84,24 @@ def main():
help="Takes the form {user or division}/{project}/{app_id}.",
)
parser.add_argument(
"--workflow-path", required=True,
help="Path to the main workflow directory",
"--workflow-path", required=False,
help="Path to the main workflow directory.",
)
parser.add_argument(
"--git-url", required=False,
help="URL to the git repository.",
)
parser.add_argument(
"--branch", required=False,
help="Used with --git-url. If git url is provided, branch to clone.",
)
parser.add_argument(
"--entrypoint", required=False,
help="Relative path to the workflow from the main workflow directory. "
"If not provided, 'main.nf' will be used if available. "
"If not available, but a single '*.nf' is located in the "
"workflow-path will be used. If more than one '*.nf' script is "
"detected, an error is raised.",
"workflow-path (or git-url) will be used. If more than one '*.nf'"
" script is detected, an error is raised.",
)
parser.add_argument(
"--sb-package-id", required=False,
Expand Down Expand Up @@ -158,7 +168,7 @@ def main():
parser.add_argument(
"--auto", action="store_true", required=False,
help="Automatically detect all possible inputs directly from the "
"--workflow-path location",
"--workflow-path or --git-url location",
)

args = parser.parse_args()
Expand All @@ -175,8 +185,18 @@ def main():
label = args.app_name or None
dump_sb_app = args.dump_sb_app or False
sb_package_id = args.sb_package_id or None
workflow_path = args.workflow_path or None
git_url = args.git_url or None
branch = args.branch or None
cleanup_workflow_path = False # changes to True if temp git dir is created

# Input validation
if (not workflow_path and not git_url) or \
(workflow_path and git_url):
raise Exception(
"Either --workflow_path OR --git_url must be provided."
)

if not dump_sb_app and not args.appid and not args.auto:
raise Exception(
"The --appid argument is required if "
Expand All @@ -203,18 +223,22 @@ def main():
"Using --sb-schema option overwrites --entrypoint"
)

if git_url:
cleanup_workflow_path = True
workflow_path = get_git_repo(git_url, branch)

sb_doc = None
if args.sb_doc:
with open(args.sb_doc, 'r') as f:
sb_doc = f.read()
elif get_readme(args.workflow_path):
with open(get_readme(args.workflow_path), 'r') as f:
elif get_readme(workflow_path):
with open(get_readme(workflow_path), 'r') as f:
sb_doc = f.read()

if args.auto:
# This is where the magic happens
if not sb_schema:
sb_schema = get_latest_sb_schema(args.workflow_path)
sb_schema = get_latest_sb_schema(workflow_path)
if sb_schema:
logger.info(f'Using sb schema <{sb_schema}>')

Expand All @@ -225,7 +249,7 @@ def main():

# locate sample sheet
if not sample_sheet_schema:
sample_sheet_schema = get_sample_sheet_schema(args.workflow_path)
sample_sheet_schema = get_sample_sheet_schema(workflow_path)
if sample_sheet_schema:
logger.info(
f'Using sample sheet schema <{sample_sheet_schema}>'
Expand All @@ -239,7 +263,7 @@ def main():
)

nf_wrapper = SBNextflowWrapper(
workflow_path=args.workflow_path,
workflow_path=workflow_path,
sb_doc=sb_doc,
label=label,
entrypoint=entrypoint,
Expand Down Expand Up @@ -275,7 +299,7 @@ def main():
project_id = '/'.join(args.appid.split('/')[:2])
sb_package_id = zip_and_push_to_sb(
api=api,
workflow_path=args.workflow_path,
workflow_path=workflow_path,
project_id=project_id,
folder_name='nextflow_workflows',
exclude_patterns=args.exclude,
Expand All @@ -293,6 +317,9 @@ def main():
nf_wrapper.dump_sb_wrapper(out_format=out_format)
install_or_upgrade_app(api, args.appid, nf_wrapper.sb_wrapper.dump())

if cleanup_workflow_path:
remove_local_file(workflow_path)


if __name__ == "__main__":
main()
54 changes: 52 additions & 2 deletions sbpack/noncwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import yaml
import re
import subprocess

from typing import Optional
import fnmatch
Expand Down Expand Up @@ -97,12 +98,16 @@ def zip_directory(workflow_path, exclude_patterns: Optional[list] = None):
base_dir='./'
)

shutil.rmtree(intermediary_dir)
print(f'Temporary local folder {intermediary_dir} deleted.')
remove_local_file(intermediary_dir)

return intermediary_dir + '.zip'


def remove_local_file(directory):
shutil.rmtree(directory)
print(f'Temporary local folder {directory} deleted.')


def push_zip(api, zip_path, project_id, folder_name=None):
if os.path.getsize(zip_path) > PACKAGE_SIZE_LIMIT:
logger.error(f"File size too big: {os.path.getsize(zip_path)}")
Expand Down Expand Up @@ -184,3 +189,48 @@ def install_or_upgrade_app(api, app_id, sb_app_raw):
raw=sb_app_raw
)
print(f"App created successfully!")


def get_git_repo(url, branch=None, max_retries=3, delay=2):
"""
Clones a Git repository to a specified directory,
optionally checking out a specific branch.

:param url: The URL of the Git repository to clone.
:param branch: The specific branch or tag to check out after cloning.
:param max_retries: Maximum number of retry attempts for cloning.
:param delay: Delay (in seconds) between retry attempts.
:return: Temporary directory where git is cloned.
"""
# TBD change this to git.Repo with gitpython

clone_dir = update_timestamp(os.path.basename(url))
attempts = 0

while attempts < max_retries:
try:
# Clone the repository
if branch:
subprocess.run(
['git', 'clone', '--branch', branch, url, clone_dir],
check=True
)
print(f"Successfully cloned {url}:{branch} into {clone_dir}")
else:
subprocess.run(
['git', 'clone', url, clone_dir],
check=True
)
print(f"Successfully cloned {url} into {clone_dir}")
return clone_dir # Return upon successful cloning

except subprocess.CalledProcessError as e:
attempts += 1
print(f"Error during cloning: {e}. "
f"Attempt {attempts} of {max_retries}.")
if attempts < max_retries:
print(f"Retrying in {delay} seconds...")
time.sleep(delay)

raise RuntimeError(f"Failed to clone repository {url} after "
f"{max_retries} attempts.")
Loading