diff --git a/sigmf/apps/convert_wav.py b/sigmf/apps/convert_wav.py index c694750..aad76d4 100755 --- a/sigmf/apps/convert_wav.py +++ b/sigmf/apps/convert_wav.py @@ -65,8 +65,8 @@ def convert_wav( if archive_filename is None: archive_filename = input_stem + archive.SIGMF_ARCHIVE_EXT - meta.tofile(archive_filename, toarchive=True) - return os.path.abspath(archive_filename) + out_path = meta.tofile(archive_filename, toarchive=True) + return out_path def main(): @@ -87,11 +87,11 @@ def main(): } logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - out_fname = convert_wav( + out_path = convert_wav( input_wav_filename=args.input, author=args.author, ) - log.info(f"Write {out_fname}") + log.info(f"Wrote {out_path}") if __name__ == "__main__": diff --git a/sigmf/archive.py b/sigmf/archive.py index 4424675..895e3d1 100644 --- a/sigmf/archive.py +++ b/sigmf/archive.py @@ -7,10 +7,10 @@ """Create and extract SigMF archives.""" import io -import os import shutil import tarfile import tempfile +from pathlib import Path from .error import SigMFFileError @@ -21,135 +21,134 @@ class SigMFArchive: - """Archive a SigMFFile. + """ + Archive a SigMFFile A `.sigmf` file must include both valid metadata and data. If `self.data_file` is not set or the requested output file - is not writable, raise `SigMFFileError`. - - Parameters: - - sigmffile -- A SigMFFile object with valid metadata and data_file - - name -- path to archive file to create. If file exists, overwrite. - If `name` doesn't end in .sigmf, it will be appended. - For example: if `name` == "/tmp/archive1", then the - following archive will be created: - /tmp/archive1.sigmf - - archive1/ - - archive1.sigmf-meta - - archive1.sigmf-data - - fileobj -- If `fileobj` is specified, it is used as an alternative to - a file object opened in binary mode for `name`. It is - supposed to be at position 0. `name` is not required, but - if specified will be used to determine the directory and - file names within the archive. `fileobj` won't be closed. - For example: if `name` == "archive1" and fileobj is given, - a tar archive will be written to fileobj with the - following structure: - - archive1/ - - archive1.sigmf-meta - - archive1.sigmf-data + is not writable, raises `SigMFFileError`. + + Parameters + ---------- + + sigmffile : SigMFFile + A SigMFFile object with valid metadata and data_file. + + name : PathLike | str | bytes + Path to archive file to create. If file exists, overwrite. + If `name` doesn't end in .sigmf, it will be appended. + For example: if `name` == "/tmp/archive1", then the + following archive will be created: + /tmp/archive1.sigmf + - archive1/ + - archive1.sigmf-meta + - archive1.sigmf-data + + fileobj : BufferedWriter + If `fileobj` is specified, it is used as an alternative to + a file object opened in binary mode for `name`. It is + supposed to be at position 0. `name` is not required, but + if specified will be used to determine the directory and + file names within the archive. `fileobj` won't be closed. + For example: if `name` == "archive1" and fileobj is given, + a tar archive will be written to fileobj with the + following structure: + - archive1/ + - archive1.sigmf-meta + - archive1.sigmf-data """ def __init__(self, sigmffile, name=None, fileobj=None): + is_buffer = fileobj is not None self.sigmffile = sigmffile - self.name = name - self.fileobj = fileobj - - self._check_input() + self.path, arcname, fileobj = self._resolve(name, fileobj) - archive_name = self._get_archive_name() - sigmf_fileobj = self._get_output_fileobj() - sigmf_archive = tarfile.TarFile(mode="w", fileobj=sigmf_fileobj, format=tarfile.PAX_FORMAT) - tmpdir = tempfile.mkdtemp() - sigmf_md_filename = archive_name + SIGMF_METADATA_EXT - sigmf_md_path = os.path.join(tmpdir, sigmf_md_filename) - sigmf_data_filename = archive_name + SIGMF_DATASET_EXT - sigmf_data_path = os.path.join(tmpdir, sigmf_data_filename) + self._ensure_data_file_set() + self._validate() - with open(sigmf_md_path, "w") as mdfile: - self.sigmffile.dump(mdfile, pretty=True) + tar = tarfile.TarFile(mode="w", fileobj=fileobj, format=tarfile.PAX_FORMAT) + tmpdir = Path(tempfile.mkdtemp()) + meta_path = tmpdir / (arcname + SIGMF_METADATA_EXT) + data_path = tmpdir / (arcname + SIGMF_DATASET_EXT) + # write files + with open(meta_path, "w") as handle: + self.sigmffile.dump(handle) if isinstance(self.sigmffile.data_buffer, io.BytesIO): - self.sigmffile.data_file = sigmf_data_path - with open(sigmf_data_path, "wb") as f: - f.write(self.sigmffile.data_buffer.getbuffer()) + # write data buffer to archive + self.sigmffile.data_file = data_path + with open(data_path, "wb") as handle: + handle.write(self.sigmffile.data_buffer.getbuffer()) else: - shutil.copy(self.sigmffile.data_file, sigmf_data_path) - - def chmod(tarinfo): - if tarinfo.isdir(): - tarinfo.mode = 0o755 # dwrxw-rw-r - else: - tarinfo.mode = 0o644 # -wr-r--r-- - return tarinfo - - sigmf_archive.add(tmpdir, arcname=archive_name, filter=chmod) - sigmf_archive.close() - if not fileobj: - sigmf_fileobj.close() - + # copy data to archive + shutil.copy(self.sigmffile.data_file, data_path) + tar.add(tmpdir, arcname=arcname, filter=self.chmod) + # close files & remove tmpdir + tar.close() + if not is_buffer: + # only close fileobj if we aren't working w/a buffer + fileobj.close() shutil.rmtree(tmpdir) - self.path = sigmf_archive.name - - def _check_input(self): - self._ensure_name_has_correct_extension() - self._ensure_data_file_set() - self._validate_sigmffile_metadata() - - def _ensure_name_has_correct_extension(self): - name = self.name - if name is None: - return - - has_extension = "." in name - has_correct_extension = name.endswith(SIGMF_ARCHIVE_EXT) - if has_extension and not has_correct_extension: - apparent_ext = os.path.splitext(name)[-1] - err = "extension {} != {}".format(apparent_ext, SIGMF_ARCHIVE_EXT) - raise SigMFFileError(err) - - self.name = name if has_correct_extension else name + SIGMF_ARCHIVE_EXT + @staticmethod + def chmod(tarinfo: tarfile.TarInfo): + """permission filter for writing tar files""" + if tarinfo.isdir(): + tarinfo.mode = 0o755 # dwrxw-rw-r + else: + tarinfo.mode = 0o644 # -wr-r--r-- + return tarinfo def _ensure_data_file_set(self): if not self.sigmffile.data_file and not isinstance(self.sigmffile.data_buffer, io.BytesIO): - err = "no data file - use `set_data_file`" - raise SigMFFileError(err) + raise SigMFFileError("No data file in SigMFFile; use `set_data_file` before archiving.") - def _validate_sigmffile_metadata(self): + def _validate(self): self.sigmffile.validate() - def _get_archive_name(self): - if self.fileobj and not self.name: - pathname = self.fileobj.name - else: - pathname = self.name - - filename = os.path.split(pathname)[-1] - archive_name, archive_ext = os.path.splitext(filename) - return archive_name - - def _get_output_fileobj(self): - try: - fileobj = self._get_open_fileobj() - except: - if self.fileobj: - err = "fileobj {!r} is not byte-writable".format(self.fileobj) - else: - err = "can't open {!r} for writing".format(self.name) - - raise SigMFFileError(err) - - return fileobj - - def _get_open_fileobj(self): - if self.fileobj: - fileobj = self.fileobj - fileobj.write(bytes()) # force exception if not byte-writable + def _resolve(self, name, fileobj): + """ + Resolve both (name, fileobj) into (path, arcname, fileobj) given either or both. + + Returns + ------- + path : PathLike + Path of the archive file. + arcname : str + Name of the sigmf object within the archive. + fileobj : BufferedWriter + Open file handle object. + """ + if fileobj: + try: + # exception if not byte-writable + fileobj.write(bytes()) + # exception if no name property of handle + path = Path(fileobj.name) + if not name: + arcname = path.stem + else: + arcname = name + except io.UnsupportedOperation: + raise SigMFFileError(f"fileobj {fileobj} is not byte-writable.") + except AttributeError: + raise SigMFFileError(f"fileobj {fileobj} is invalid.") + elif name: + path = Path(name) + # ensure name has correct suffix if it exists + if path.suffix == "": + # add extension if none was given + path = path.with_suffix(SIGMF_ARCHIVE_EXT) + elif path.suffix != SIGMF_ARCHIVE_EXT: + # ensure suffix is correct + raise SigMFFileError(f"Invalid extension ({path.suffix} != {SIGMF_ARCHIVE_EXT}).") + arcname = path.stem + + try: + fileobj = open(path, "wb") + except (OSError, IOError): + raise SigMFFileError(f"Can't open {name} for writing.") else: - fileobj = open(self.name, "wb") + raise SigMFFileError("Either `name` or `fileobj` needs to be defined.") - return fileobj + return path, arcname, fileobj diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 9ff5cb7..015036b 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -26,28 +26,40 @@ class SigMFArchiveReader: - """Access data within SigMF archive `tar` in-place without extracting. - - Parameters: - - name -- path to archive file to access. If file does not exist, - or if `name` doesn't end in .sigmf, SigMFFileError is raised. + """ + Access data within SigMF archive `tar` in-place without extracting. + + Parameters + ---------- + name : str | bytes | PathLike, optional + Optional path to archive file to access. + skip_checksum : bool, optional + Skip dataset checksum calculation. + map_readonly : bool, optional + Indicate whether assignments on the numpy.memmap are allowed. + archive_buffer : buffer, optional + + + Raises + ------ + SigMFError + Archive file does not exist or is improperly formatted. """ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None): - self.name = name - if self.name is not None: - if not name.endswith(SIGMF_ARCHIVE_EXT): + if name is not None: + path = Path(name) + if path.suffix != SIGMF_ARCHIVE_EXT: err = "archive extension != {}".format(SIGMF_ARCHIVE_EXT) raise SigMFFileError(err) - tar_obj = tarfile.open(self.name) + tar_obj = tarfile.open(path) elif archive_buffer is not None: tar_obj = tarfile.open(fileobj=archive_buffer, mode="r:") else: - raise ValueError("In sigmf.archivereader.__init__(), either `name` or `archive_buffer` must be not None") + raise ValueError("Either `name` or `archive_buffer` must be not None.") json_contents = None data_offset = None diff --git a/sigmf/schema.py b/sigmf/schema.py index 250acad..6660ef7 100644 --- a/sigmf/schema.py +++ b/sigmf/schema.py @@ -7,21 +7,22 @@ """Schema IO""" import json -import os +from pathlib import Path +from . import __version__ as toolversion from . import utils SCHEMA_META = "schema-meta.json" SCHEMA_COLLECTION = "schema-collection.json" -def get_schema(version=None, schema_file=SCHEMA_META): +def get_schema(version=toolversion, schema_file=SCHEMA_META): """ Load JSON Schema to for either a `sigmf-meta` or `sigmf-collection`. TODO: In the future load specific schema versions. """ - schema_path = os.path.join(utils.get_schema_path(os.path.dirname(utils.__file__)), schema_file) - with open(schema_path, "rb") as handle: + schema_dir = Path(__file__).parent + with open(schema_dir / schema_file, "rb") as handle: schema = json.load(handle) return schema diff --git a/sigmf/sigmf_hash.py b/sigmf/sigmf_hash.py index 9206464..9482c35 100644 --- a/sigmf/sigmf_hash.py +++ b/sigmf/sigmf_hash.py @@ -7,7 +7,7 @@ """Hashing Functions""" import hashlib -import os +from pathlib import Path def calculate_sha512(filename=None, fileobj=None, offset=None, size=None): @@ -21,7 +21,7 @@ def calculate_sha512(filename=None, fileobj=None, offset=None, size=None): if filename is not None: fileobj = open(filename, "rb") if size is None: - bytes_to_hash = os.path.getsize(filename) + bytes_to_hash = Path(filename).stat().st_size else: fileobj.seek(offset) diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index f1b34c2..272dcc3 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -13,7 +13,6 @@ import tempfile import warnings from collections import OrderedDict -from os import path from pathlib import Path import numpy as np @@ -260,7 +259,7 @@ def _is_conforming_dataset(self): # check for any non-zero `header_bytes` fields in captures segments if capture.get(self.HEADER_BYTES_KEY, 0): return False - if self.data_file is not None and not path.isfile(self.data_file): + if self.data_file is not None and not self.data_file.is_file: return False # if we get here, the file exists and is conforming return True @@ -403,7 +402,7 @@ def get_capture_byte_boundarys(self, index): end_byte = start_byte if index == len(self.get_captures()) - 1: # last captures...data is the rest of the file - end_byte = path.getsize(self.data_file) - self.get_global_field(self.TRAILING_BYTES_KEY, 0) + end_byte = self.data_file.stat().st_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) else: end_byte += ( (self.get_capture_start(index + 1) - self.get_capture_start(index)) @@ -483,7 +482,7 @@ def _count_samples(self): sample_count = self._get_sample_count_from_annotations() else: header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()]) - file_size = path.getsize(self.data_file) if self.data_size_bytes is None else self.data_size_bytes + file_size = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes file_data_size = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes # bytes sample_size = self.get_sample_size() # size of a sample in bytes num_channels = self.get_num_channels() @@ -555,7 +554,7 @@ def set_data_file( if self.get_global_field(self.DATATYPE_KEY) is None: raise SigMFFileError("Error setting data file, the DATATYPE_KEY must be set in the global metadata first.") - self.data_file = data_file + self.data_file = Path(data_file) if data_file else None self.data_buffer = data_buffer self.data_offset = offset self.data_size_bytes = size_bytes @@ -594,8 +593,8 @@ def set_data_file( self.shape = self._memmap.shape if (self._return_type is None) else self._memmap.shape[:-1] if self.data_file is not None: - file_name = path.split(self.data_file)[1] - ext = path.splitext(file_name)[1] + file_name = self.data_file.name + ext = self.data_file.suffix if ext.lower() != SIGMF_DATASET_EXT: self.set_global_field(SigMFFile.DATASET_KEY, file_name) @@ -839,7 +838,7 @@ def verify_stream_hashes(self) -> None: old_hash = stream.get("hash") metafile_name = get_sigmf_filenames(stream.get("name"))["meta_fn"] metafile_path = self.base_path / metafile_name - if path.isfile(metafile_path): + if Path.is_file(metafile_path): new_hash = sigmf_hash.calculate_sha512(filename=metafile_path) if old_hash != new_hash: raise SigMFFileError( @@ -854,9 +853,10 @@ def set_streams(self, metafiles) -> None: streams = [] for metafile in self.metafiles: metafile_path = self.base_path / metafile - if metafile.endswith(".sigmf-meta") and path.isfile(metafile_path): + if metafile.endswith(".sigmf-meta") and Path.is_file(metafile_path): stream = { - "name": get_sigmf_filenames(metafile)["base_fn"], + # name must be string here to be serializable later + "name": str(get_sigmf_filenames(metafile)["base_fn"]), "hash": sigmf_hash.calculate_sha512(filename=metafile_path), } streams.append(stream) @@ -1012,7 +1012,7 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): compliant_filename = get_sigmf_filenames(meta_fn)["data_fn"] noncompliant_filename = metadata["global"].get(SigMFFile.DATASET_KEY, None) - if path.isfile(compliant_filename): + if Path.is_file(compliant_filename): if noncompliant_filename: warnings.warn( f"Compliant Dataset `{compliant_filename}` exists but " @@ -1021,9 +1021,9 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): return compliant_filename elif noncompliant_filename: - dir_path = path.split(meta_fn)[0] - noncompliant_data_file_path = path.join(dir_path, noncompliant_filename) - if path.isfile(noncompliant_data_file_path): + dir_path = Path(meta_fn).parent + noncompliant_data_file_path = Path.joinpath(dir_path, noncompliant_filename) + if Path.is_file(noncompliant_data_file_path): if metadata["global"].get(SigMFFile.METADATA_ONLY_KEY, False): raise SigMFFileError( f"Schema defines {SigMFFile.DATASET_KEY} " @@ -1045,7 +1045,6 @@ def fromarchive(archive_path, dir=None, skip_checksum=False): access SigMF archives without extracting them. """ from .archivereader import SigMFArchiveReader - return SigMFArchiveReader(archive_path, skip_checksum=skip_checksum).sigmffile @@ -1058,15 +1057,15 @@ def fromfile(filename, skip_checksum=False): Parameters ---------- - filename: str + filename: str | bytes | PathLike Path for SigMF Metadata, Dataset, Archive or Collection (with or without extension). skip_checksum: bool, default False - When True will not read entire dataset to caculate hash. + When True will not read entire dataset to calculate hash. Returns ------- object - SigMFFile object with dataset & metadata or a SigMFCollection depending on the type of file + SigMFFile with dataset & metadata or a SigMFCollection depending on file type. """ fns = get_sigmf_filenames(filename) meta_fn = fns["meta_fn"] @@ -1074,19 +1073,20 @@ def fromfile(filename, skip_checksum=False): collection_fn = fns["collection_fn"] # extract the extension to check whether we are dealing with an archive, collection, etc. - file_path, ext = path.splitext(filename) # works with Pathlib - ext contains a dot + file_path = Path(filename) + ext = file_path.suffix - if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not path.isfile(meta_fn)) and path.isfile(archive_fn): + if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): return fromarchive(archive_fn, skip_checksum=skip_checksum) - if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not path.isfile(meta_fn)) and path.isfile(collection_fn): + if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): collection_fp = open(collection_fn, "rb") bytestream_reader = codecs.getreader("utf-8") mdfile_reader = bytestream_reader(collection_fp) metadata = json.load(mdfile_reader) collection_fp.close() - dir_path = path.split(meta_fn)[0] + dir_path = meta_fn.parent return SigMFCollection(metadata=metadata, base_path=dir_path, skip_checksums=skip_checksum) else: @@ -1103,16 +1103,21 @@ def fromfile(filename, skip_checksum=False): def get_sigmf_filenames(filename): """ Safely returns a set of SigMF file paths given an input filename. - Returned as dict with 'data_fn', 'meta_fn', and 'archive_fn' as keys. - Keyword arguments: - filename -- the SigMF filename + Parameters + ---------- + filename : str | bytes | PathLike + The SigMF filename with any extension. + + Returns + ------- + dict with 'data_fn', 'meta_fn', and 'archive_fn' as keys. """ - filename = path.splitext(filename)[0] + stem_path = Path(filename).with_suffix("") return { - "base_fn": filename, - "data_fn": filename + SIGMF_DATASET_EXT, - "meta_fn": filename + SIGMF_METADATA_EXT, - "archive_fn": filename + SIGMF_ARCHIVE_EXT, - "collection_fn": filename + SIGMF_COLLECTION_EXT, + "base_fn": stem_path, + "data_fn": stem_path.with_suffix(SIGMF_DATASET_EXT), + "meta_fn": stem_path.with_suffix(SIGMF_METADATA_EXT), + "archive_fn": stem_path.with_suffix(SIGMF_ARCHIVE_EXT), + "collection_fn": stem_path.with_suffix(SIGMF_COLLECTION_EXT), } diff --git a/sigmf/utils.py b/sigmf/utils.py index 935d64e..7d805c3 100644 --- a/sigmf/utils.py +++ b/sigmf/utils.py @@ -10,6 +10,7 @@ import sys from copy import deepcopy from datetime import datetime, timezone +from pathlib import Path import numpy as np @@ -71,13 +72,6 @@ def dict_merge(a_dict: dict, b_dict: dict) -> dict: return result -def get_schema_path(module_path: str) -> str: - """ - TODO: Allow getting different schemas for specific SigMF versions - """ - return module_path - - def get_endian_str(ray: np.ndarray) -> str: """Return SigMF compatible endianness string for a numpy array""" if not isinstance(ray, np.ndarray): diff --git a/tests/test_archive.py b/tests/test_archive.py index eccb1b4..1db92e0 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -10,7 +10,7 @@ import json import tarfile import tempfile -from os import path +from pathlib import Path import jsonschema import numpy as np @@ -61,8 +61,7 @@ def test_name_used_in_fileobj(test_sigmffile): assert basedir.name == "testarchive" def filename(tarinfo): - path_root, _ = path.splitext(tarinfo.name) - return path.split(path_root)[-1] + return Path(tarinfo.name).stem assert filename(file1) == "testarchive" assert filename(file2) == "testarchive" @@ -102,17 +101,17 @@ def test_tarfile_names_and_extensions(test_sigmffile): sigmf_tarfile = create_test_archive(test_sigmffile, temp) basedir, file1, file2 = sigmf_tarfile.getmembers() archive_name = basedir.name - assert archive_name == path.split(temp.name)[-1] + assert archive_name == Path(temp.name).name file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT} - file1_name, file1_ext = path.splitext(file1.name) + file1_name, file1_ext = Path(file1.name).stem, Path(file1.name).suffix + assert file1_name == archive_name assert file1_ext in file_extensions - assert path.split(file1_name)[-1] == archive_name file_extensions.remove(file1_ext) - file2_name, file2_ext = path.splitext(file2.name) - assert path.split(file2_name)[-1] == archive_name + file2_name, file2_ext = Path(file2.name).stem, Path(file2.name).suffix + assert file2_name == archive_name assert file2_ext in file_extensions diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index a7bdc67..bc04476 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -87,7 +87,6 @@ def test_archiveread_data_file_unchanged(test_sigmffile): with tempfile.NamedTemporaryFile(suffix=".sigmf") as temp: input_samples = test_sigmffile.read_samples() test_sigmffile.archive(temp.name) - arc = sigmf.sigmffile.fromfile(temp.name) output_samples = arc.read_samples() diff --git a/tests/test_ncd.py b/tests/test_ncd.py index 5978cbf..a3e2ba7 100644 --- a/tests/test_ncd.py +++ b/tests/test_ncd.py @@ -7,7 +7,6 @@ """Tests for Non-Conforming Datasets""" import copy -import os import shutil import tempfile import unittest @@ -39,7 +38,7 @@ def test_load_ncd(self, subdir: str) -> None: """test loading non-conforming dataset""" data_path = self.temp_dir / subdir / "dat.bin" meta_path = self.temp_dir / subdir / "dat.sigmf-meta" - os.makedirs(data_path.parent, exist_ok=True) + Path.mkdir(data_path.parent, parents=True, exist_ok=True) # create data file TEST_FLOAT32_DATA.tofile(data_path) @@ -59,6 +58,6 @@ def test_load_ncd(self, subdir: str) -> None: # otherwise the np.memmap instances (stored in self._memmap) block the deletion meta = None meta_loopback = None - os.remove(data_path) + Path.unlink(data_path) with self.assertRaises(SigMFFileError): _ = fromfile(meta_path) diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index d688c0e..dccfcb6 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -8,7 +8,6 @@ import copy import json -import os import shutil import tempfile import unittest @@ -99,7 +98,7 @@ def test_set_data_file_without_annotations(self): smf = SigMFFile(copy.deepcopy(TEST_METADATA)) smf._metadata[SigMFFile.ANNOTATION_KEY].clear() with tempfile.TemporaryDirectory() as tmpdir: - temp_path_data = os.path.join(tmpdir, "datafile") + temp_path_data = Path(tmpdir) / "datafile" TEST_FLOAT32_DATA.tofile(temp_path_data) smf.set_data_file(temp_path_data) samples = smf.read_samples() @@ -114,7 +113,7 @@ def test_set_data_file_with_annotations(self): smf = SigMFFile(copy.deepcopy(TEST_METADATA)) smf.add_annotation(start_index=0, length=32) with tempfile.TemporaryDirectory() as tmpdir: - temp_path_data = os.path.join(tmpdir, "datafile") + temp_path_data = Path(tmpdir) / "datafile" TEST_FLOAT32_DATA.tofile(temp_path_data) with self.assertWarns(Warning): # Issues warning since file ends before the final annotatio @@ -160,14 +159,10 @@ def test_add_annotation(): def test_fromarchive(test_sigmffile): - print("test_sigmffile is:\n", test_sigmffile) - tf = tempfile.mkstemp()[1] - td = tempfile.mkdtemp() + _, tf = tempfile.mkstemp() archive_path = test_sigmffile.archive(name=tf) - result = sigmffile.fromarchive(archive_path=archive_path, dir=td) + result = sigmffile.fromarchive(archive_path=archive_path) assert result._metadata == test_sigmffile._metadata == TEST_METADATA - os.remove(tf) - shutil.rmtree(td) def test_add_multiple_captures_and_annotations():