Skip to content

Commit

Permalink
Merge pull request #112 from WorldCereal/fix_s1_safe_conversion
Browse files Browse the repository at this point in the history
Fixx errors management
  • Loading branch information
Fahdben authored Oct 12, 2022
2 parents 88902e7 + aa88024 commit 4692a15
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 61 deletions.
31 changes: 27 additions & 4 deletions src/ewoc_dag/bucket/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,31 @@
import logging
import os
from pathlib import Path
import shutil
from tempfile import gettempdir
from typing import List

from ewoc_dag.bucket.eobucket import EOBucket
from ewoc_dag.bucket.eobucket import EOBucket, EOBucketException
from ewoc_dag.eo_prd_id.l8_prd_id import L8C2PrdIdInfo
from ewoc_dag.eo_prd_id.s1_prd_id import S1PrdIdInfo
from ewoc_dag.eo_prd_id.s2_prd_id import S2PrdIdInfo
from ewoc_dag.safe_format import aws_to_safe
from ewoc_dag.safe_format import S1SafeConversionError, aws_to_safe

logger = logging.getLogger(__name__)


class AWSDownloadError(Exception):
"""Exception raised for errors in the S1 SAFE conversion format on AWS."""

def __init__(self, error=None):
self._error = error
self._message = "Error while downloading from AWS:"
super().__init__(self._message)

def __str__(self):
return f"{self._message} {self._error}"


class AWSEOBucket(EOBucket):
"""Base class for EO public data bucket access on AWS"""

Expand Down Expand Up @@ -101,10 +114,20 @@ def download_prd(
+ "/"
)
logger.debug("prd_prefix: %s", prd_prefix)
super()._download_prd(prd_prefix, out_dirpath, request_payer=True)
try:
super()._download_prd(prd_prefix, out_dirpath, request_payer=True)
except EOBucketException as exc:
logger.error(exc)
out_dirpath.rmdir()
raise AWSDownloadError(exc) from exc

if safe_format:
return aws_to_safe(out_dirpath, prd_id)
try:
return aws_to_safe(out_dirpath, prd_id)
except S1SafeConversionError as exc:
shutil.rmtree(out_dirpath)
logger.error(exc)
raise AWSDownloadError(exc) from exc

return out_dirpath

Expand Down
93 changes: 58 additions & 35 deletions src/ewoc_dag/bucket/eobucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ def __str__(self):
return f"{self.prd_dirpath} fail to be uploaded to {self.object_prefix}!"


class EOBucketException(Exception):
"""Exception raised when product download failed"""

def __init__(
self,
prd_prefix,
response,
bucket_name,
):
self._prd_prefix = prd_prefix
self._bucket_name = bucket_name
self._response = response
self._message = "Error while downloading:"
super().__init__(self._message)

def __str__(self):
return f"{self._message} {self._prd_prefix} from {self._bucket_name} with {self._response}"


class DownloadFileError(Exception):
"""Exception raised when file download failed"""

Expand Down Expand Up @@ -275,42 +294,46 @@ def _download_prd(
Bucket=self._bucket_name,
Prefix=prd_prefix,
)

for obj in response["Contents"]:
# Should we use select this object?
is_selected = prd_items is None
if prd_items is not None:
for filter_band in prd_items:
if filter_band in obj["Key"]:
is_selected = True
if obj["Key"].endswith("/"):
is_file = False
else:
is_file = True
if is_selected and is_file:
logger.debug("obj.key: %s", obj["Key"])
filename = obj["Key"].split(
sep="/", maxsplit=len(prd_prefix.split("/")) - 1
)[-1]
output_filepath = out_dirpath / filename
(output_filepath.parent).mkdir(parents=True, exist_ok=True)
if not output_filepath.exists():
logging.info(
"Try to download from %s to %s", obj["Key"], output_filepath
)
self._s3_client.download_file(
Bucket=self._bucket_name,
Key=obj["Key"],
Filename=str(output_filepath),
ExtraArgs=extra_args,
)
logging.info(
"Download from %s to %s succeed!", obj["Key"], output_filepath
)
try:
for obj in response["Contents"]:
# Should we use select this object?
is_selected = prd_items is None
if prd_items is not None:
for filter_band in prd_items:
if filter_band in obj["Key"]:
is_selected = True
if obj["Key"].endswith("/"):
is_file = False
else:
logging.info(
"%s already available, skip downloading!", output_filepath
)
is_file = True
if is_selected and is_file:
logger.debug("obj.key: %s", obj["Key"])
filename = obj["Key"].split(
sep="/", maxsplit=len(prd_prefix.split("/")) - 1
)[-1]
output_filepath = out_dirpath / filename
(output_filepath.parent).mkdir(parents=True, exist_ok=True)
if not output_filepath.exists():
logging.info(
"Try to download from %s to %s", obj["Key"], output_filepath
)
self._s3_client.download_file(
Bucket=self._bucket_name,
Key=obj["Key"],
Filename=str(output_filepath),
ExtraArgs=extra_args,
)
logging.info(
"Download from %s to %s succeed!",
obj["Key"],
output_filepath,
)
else:
logging.info(
"%s already available, skip downloading!", output_filepath
)
except KeyError as exc:
raise EOBucketException(prd_prefix, response, self._bucket_name) from exc

def _upload_file(self, filepath: Path, key: str) -> int:
"""Upload a object to a bucket
Expand Down
55 changes: 46 additions & 9 deletions src/ewoc_dag/cli_eo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ewoc_dag.eo_prd_id.s1_prd_id import S1PrdIdInfo
from ewoc_dag.eo_prd_id.s2_prd_id import S2PrdIdInfo
from ewoc_dag.l8c2l2_dag import get_l8c2l2_product, _L8C2_SOURCES
from ewoc_dag.s1_dag import get_s1_product
from ewoc_dag.s1_dag import S1DagError, get_s1_product
from ewoc_dag.s2_dag import get_s2_product


Expand All @@ -23,13 +23,26 @@
logger = logging.getLogger(__name__)


class EwocEODagException(Exception):
"""Base Class for ewoc_dag package"""

def __init__(self, error=None):
self._error = error
self._message = "EwoC EO DAG error:"
super().__init__(self._message)

def __str__(self):
return f"{self._message} {self._error}"


def get_eo_data(
prd_id: str,
out_dirpath: Path = Path(gettempdir()),
eo_data_source: str = "eodag",
eodata_config_filepath: Path = None,
only_l2a_mask: bool = False,
use_s2_cogs: bool = False,
to_safe: bool = False,
) -> None:
"""Retrieve EO data from the product ID
Expand All @@ -50,12 +63,17 @@ def get_eo_data(
eodag_config_file=eodata_config_filepath,
)
elif S1PrdIdInfo.is_valid(prd_id):
get_s1_product(
prd_id,
out_dirpath,
source=eo_data_source,
eodag_config_file=eodata_config_filepath,
)
try:
get_s1_product(
prd_id,
out_dirpath,
source=eo_data_source,
eodag_config_file=eodata_config_filepath,
safe_format=to_safe,
)
except S1DagError as exc:
logger.error(exc)
raise EwocEODagException(exc) from exc
elif S2PrdIdInfo.is_valid(prd_id):
get_s2_product(
prd_id,
Expand Down Expand Up @@ -104,6 +122,12 @@ def parse_args(args):
type=str,
default="eodag",
)
parser.add_argument(
"--to-safe",
dest="to_safe",
help="Convert to SAFE format (for AWS S2 L1C and S1 L1 GRD data)",
action="store_true",
)
parser.add_argument(
"-v",
"--verbose",
Expand Down Expand Up @@ -153,8 +177,21 @@ def main(args):
args.data_source,
args.out_dirpath,
)
get_eo_data(args.prd_ids, args.out_dirpath, eo_data_source=args.data_source)
logger.info("Data are available at %s!", args.out_dirpath)
try:
get_eo_data(
args.prd_ids,
args.out_dirpath,
eo_data_source=args.data_source,
to_safe=args.to_safe,
)
except EwocEODagException as exc:
logger.error(exc)
sys.exit(2)
except BaseException as err:
logger.error(f"Unexpected {err=}, {type(err)=}")
sys.exit(1)
else:
logger.info("Data %s are available at %s!", args.prd_ids, args.out_dirpath)


def run():
Expand Down
30 changes: 23 additions & 7 deletions src/ewoc_dag/s1_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path
from tempfile import gettempdir

from ewoc_dag.bucket.aws import AWSS1Bucket
from ewoc_dag.bucket.aws import AWSDownloadError, AWSS1Bucket
from ewoc_dag.bucket.creodias import CreodiasBucket
from ewoc_dag.eodag_utils import get_product_by_id

Expand All @@ -15,6 +15,18 @@
_S1_SOURCES = ["eodag", "aws", "creodias"]


class S1DagError(Exception):
"""Exception raised for errors in the S1 SAFE conversion format on AWS."""

def __init__(self, error=None):
self._error = error
self.message = "Error while S1 downloading:"
super().__init__(self.message)

def __str__(self):
return f"{self.message} {self._error}"


def get_s1_default_provider() -> str:
"""Return the default provider according the computation of two env variables:
- EWOC_CLOUD_PROVIDER
Expand Down Expand Up @@ -67,7 +79,7 @@ def get_s1_product(
logging.info(
"Use EODAG to retrieve S1 product!",
)
out_prd_path = get_product_by_id(
s1_prd_path = get_product_by_id(
prd_id,
out_root_dirpath,
provider="creodias", # TODO Keep eodag manage
Expand All @@ -76,13 +88,17 @@ def get_s1_product(
)
elif s1_provider == "creodias":
logging.info("Use CREODIAS object storage to retrieve S1 product!")
out_prd_path = CreodiasBucket().download_s1_prd(prd_id, out_root_dirpath)
s1_prd_path = CreodiasBucket().download_s1_prd(prd_id, out_root_dirpath)
elif s1_provider == "aws":
logging.info("Use AWS object storage to retrieve S1 product!")
out_prd_path = AWSS1Bucket().download_prd(
prd_id, out_root_dirpath, safe_format=safe_format
)
try:
s1_prd_path = AWSS1Bucket().download_prd(
prd_id, out_root_dirpath, safe_format=safe_format
)
except AWSDownloadError as exc:
logger.error(exc)
raise S1DagError(exc) from exc
else:
raise ValueError(f"Source {s1_provider} is not supported")

return out_prd_path
return s1_prd_path
31 changes: 25 additions & 6 deletions src/ewoc_dag/safe_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,30 @@
import shutil
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional

from ewoc_dag.eo_prd_id.s1_prd_id import S1PrdIdInfo
from ewoc_dag.eo_prd_id.s2_prd_id import S2PrdIdInfo

logger = logging.getLogger(__name__)


class S1SafeConversionError(Exception):
"""Exception raised for errors in the S1 SAFE conversion format on AWS."""

def __init__(self, error):
self._error = error
self._message = "Error during S1 SAFE conversion:"
super().__init__(self._message)

def __str__(self):
return f"{self._message} {self._error} !"


def aws_to_safe(
out_dirpath: Path,
prd_id: str,
out_safe_dirroot: Path = None,
out_safe_dirroot: Optional[Path] = None,
) -> Path:
"""Translate from format used by some AWS buckets to SAFE format
Expand All @@ -41,18 +53,25 @@ def aws_to_safe(
out_safe_dirpath = out_dirpath.parent / safe_prd_id
else:
out_safe_dirpath = out_safe_dirroot / safe_prd_id

out_safe_dirpath.mkdir(exist_ok=True)

if S1PrdIdInfo.is_valid(prd_id):
out_safe_dirpath = aws_s1_to_safe(out_dirpath, out_safe_dirpath)
if (out_dirpath / "productInfo.json").exists():
prd_safe_dirpath = aws_s1_to_safe(out_dirpath, out_safe_dirpath)
else:
out_safe_dirpath.rmdir()
raise S1SafeConversionError(
f'No conversion file for {prd_id.split(".")[0]}'
)
elif S2PrdIdInfo.is_valid(prd_id) and S2PrdIdInfo.is_l1c(prd_id):
out_safe_dirpath = aws_s2_l1c_to_safe(out_dirpath, out_safe_dirpath)
prd_safe_dirpath = aws_s2_l1c_to_safe(out_dirpath, out_safe_dirpath)
else:
raise ValueError("Product ID not supported!")

shutil.rmtree(out_dirpath)

return out_safe_dirpath
return prd_safe_dirpath


def aws_s1_to_safe(
Expand Down Expand Up @@ -202,7 +221,7 @@ def aws_s2_l1c_to_safe(
if gr_elt.parts[-2] == "AUX_DATA":
safe_gr_aux_data_dir = gr_elt.parent
break
if 'safe_gr_aux_data_dir' not in locals():
if "safe_gr_aux_data_dir" not in locals():
safe_gr_aux_data_dir = safe_gr_qi_data_dir.parents[0] / "AUX_DATA"
try:
shutil.copy(
Expand Down

0 comments on commit 4692a15

Please sign in to comment.