From 0625a72a580ef61e0dff55135e94780d5ca304fa Mon Sep 17 00:00:00 2001
From: SaltyChiang
Date: Tue, 26 Nov 2024 19:52:21 +0800
Subject: [PATCH] Use the standalone `file.py` to implement the HDF5 format.

---
 pyquda_core/pyquda/__init__.py | 162 +++++-----
 pyquda_core/pyquda/field.py    | 558 +++++++++++++++------------------
 pyquda_core/pyquda/file.py     | 348 ++++++++++++++++++++
 pyquda_utils/io/chroma.py      |   6 +-
 pyquda_utils/io/milc.py        |   6 +-
 tests/test.io.py               |   1 +
 6 files changed, 688 insertions(+), 393 deletions(-)
 create mode 100644 pyquda_core/pyquda/file.py

diff --git a/pyquda_core/pyquda/__init__.py b/pyquda_core/pyquda/__init__.py
index ea9d433..99fdd85 100644
--- a/pyquda_core/pyquda/__init__.py
+++ b/pyquda_core/pyquda/__init__.py
@@ -179,7 +179,84 @@ def _setEnviron(env, key, value):
         _setEnviron(f"QUDA_{key.upper()}", key, kwargs[key])
 
 
-def _initQUDA(grid_size, gpuid):
+def initGPU(backend: Literal["numpy", "cupy", "torch"] = None, gpuid: int = -1):
+    global _CUDA_BACKEND, _HIP, _GPUID, _COMPUTE_CAPABILITY
+
+    if isGridInitialized():
+        _MPI_LOGGER.critical("initGPU should be called before init", RuntimeError)
+    if _GPUID < 0:
+        from platform import node as gethostname
+
+        if backend is None:
+            backend = environ["PYQUDA_BACKEND"] if "PYQUDA_BACKEND" in environ else "cupy"
+        if backend == "numpy":
+            cudaGetDeviceCount: Callable[[], int] = lambda: 0x7FFFFFFF
+            cudaGetDeviceProperties: Callable[[int], Dict[str, Any]] = lambda device: {"major": 0, "minor": 0}
+            cudaSetDevice: Callable[[int], None] = lambda device: None
+        elif backend == "cupy":
+            import cupy
+            from cupy.cuda.runtime import getDeviceCount as cudaGetDeviceCount
+            from cupy.cuda.runtime import getDeviceProperties as cudaGetDeviceProperties
+            from cupy.cuda.runtime import is_hip
+
+            cudaSetDevice: Callable[[int], None] = lambda device: cupy.cuda.Device(device).use()
+            _HIP = is_hip
+        elif backend == "torch":
+            import torch
+            from torch.cuda import device_count as cudaGetDeviceCount
+            from torch.cuda import get_device_properties as cudaGetDeviceProperties
+            from torch.version import hip
+
+            cudaSetDevice: Callable[[int], None] = lambda device: torch.set_default_device(f"cuda:{device}")
+            _HIP = hip is not None
+        else:
+            _MPI_LOGGER.critical(f"Unsupported CUDA backend {backend}", ValueError)
+        _CUDA_BACKEND = backend
+        _MPI_LOGGER.info(f"Using CUDA backend {backend}")
+
+        # if backend == "cupy":
+        #     from . import malloc_pyquda
+
+        #     allocator = cupy.cuda.PythonFunctionAllocator(
+        #         malloc_pyquda.pyquda_device_malloc, malloc_pyquda.pyquda_device_free
+        #     )
+        #     cupy.cuda.set_allocator(allocator.malloc)
+
+        # quda/include/communicator_quda.h
+        # determine which GPU this rank will use
+        hostname = gethostname()
+        hostname_recv_buf = _MPI_COMM.allgather(hostname)
+
+        if gpuid < 0:
+            device_count = cudaGetDeviceCount()
+            if device_count == 0:
+                _MPI_LOGGER.critical("No devices found", RuntimeError)
+
+            gpuid = 0
+            for i in range(_MPI_RANK):
+                if hostname == hostname_recv_buf[i]:
+                    gpuid += 1
+
+            if gpuid >= device_count:
+                if "QUDA_ENABLE_MPS" in environ and environ["QUDA_ENABLE_MPS"] == "1":
+                    gpuid %= device_count
+                    print(f"MPS enabled, rank={_MPI_RANK} -> gpu={gpuid}")
+                else:
+                    _MPI_LOGGER.critical(f"Too few GPUs available on {hostname}", RuntimeError)
+        _GPUID = gpuid
+
+        props = cudaGetDeviceProperties(gpuid)
+        if hasattr(props, "major") and hasattr(props, "minor"):
+            _COMPUTE_CAPABILITY = _ComputeCapability(int(props.major), int(props.minor))
+        else:
+            _COMPUTE_CAPABILITY = _ComputeCapability(int(props["major"]), int(props["minor"]))
+
+        cudaSetDevice(gpuid)
+    else:
+        _MPI_LOGGER.warning("GPU is already initialized", RuntimeWarning)
+
+
+def initQUDA(grid_size: List[int], gpuid: int):
     import atexit
 
     quda.initCommsGridQuda(4, grid_size)
@@ -226,7 +303,7 @@ def init(
     """
     global _GRID_SIZE, _GRID_COORD, _DEFAULT_LATTICE
     if _GRID_SIZE is None:
-        from platform import node as gethostname
+        initGPU(backend)
 
         use_default_grid = grid_size is None and latt_size is not None
         use_default_latt = latt_size is not None and t_boundary is not None and anisotropy is not None
@@ -274,74 +351,8 @@ def init(
         device_reset="1" if device_reset else None,
     )
 
-    global _CUDA_BACKEND, _HIP, _GPUID, _COMPUTE_CAPABILITY
-
-    if backend is None:
-        backend = environ["PYQUDA_BACKEND"] if "PYQUDA_BACKEND" in environ else "cupy"
-    if backend == "numpy":
-        cudaGetDeviceCount: Callable[[], int] = lambda: 0x7FFFFFFF
-        cudaGetDeviceProperties: Callable[[int], Dict[str, Any]] = lambda device: {"major": 0, "minor": 0}
-        cudaSetDevice: Callable[[int], None] = lambda device: None
-    elif backend == "cupy":
-        import cupy
-        from cupy.cuda.runtime import getDeviceCount as cudaGetDeviceCount
-        from cupy.cuda.runtime import getDeviceProperties as cudaGetDeviceProperties
-        from cupy.cuda.runtime import is_hip
-
-        cudaSetDevice: Callable[[int], None] = lambda device: cupy.cuda.Device(device).use()
-        _HIP = is_hip
-    elif backend == "torch":
-        import torch
-        from torch.cuda import device_count as cudaGetDeviceCount
-        from torch.cuda import get_device_properties as cudaGetDeviceProperties
-        from torch.version import hip
-
-        cudaSetDevice: Callable[[int], None] = lambda device: torch.set_default_device(f"cuda:{device}")
-        _HIP = hip is not None
-    else:
-        _MPI_LOGGER.critical(f"Unsupported CUDA backend {backend}", ValueError)
-    _CUDA_BACKEND = backend
-    _MPI_LOGGER.info(f"Using CUDA backend {backend}")
-
-    # if _CUDA_BACKEND == "cupy":
-    #     from . import malloc_pyquda
-
-    #     allocator = cupy.cuda.PythonFunctionAllocator(
-    #         malloc_pyquda.pyquda_device_malloc, malloc_pyquda.pyquda_device_free
-    #     )
-    #     cupy.cuda.set_allocator(allocator.malloc)
-
-    # quda/include/communicator_quda.h
-    # determine which GPU this rank will use
-    hostname = gethostname()
-    hostname_recv_buf = _MPI_COMM.allgather(hostname)
-
-    if _GPUID < 0:
-        device_count = cudaGetDeviceCount()
-        if device_count == 0:
-            _MPI_LOGGER.critical("No devices found", RuntimeError)
-
-        _GPUID = 0
-        for i in range(_MPI_RANK):
-            if hostname == hostname_recv_buf[i]:
-                _GPUID += 1
-
-        if _GPUID >= device_count:
-            if "QUDA_ENABLE_MPS" in environ and environ["QUDA_ENABLE_MPS"] == "1":
-                _GPUID %= device_count
-                print(f"MPS enabled, rank={_MPI_RANK} -> gpu={_GPUID}")
-            else:
-                _MPI_LOGGER.critical(f"Too few GPUs available on {hostname}", RuntimeError)
-
-    props = cudaGetDeviceProperties(_GPUID)
-    if hasattr(props, "major") and hasattr(props, "minor"):
-        _COMPUTE_CAPABILITY = _ComputeCapability(int(props.major), int(props.minor))
-    else:
-        _COMPUTE_CAPABILITY = _ComputeCapability(int(props["major"]), int(props["minor"]))
-
-    cudaSetDevice(_GPUID)
     if init_quda:
-        _initQUDA(_GRID_SIZE, _GPUID)
+        initQUDA(_GRID_SIZE, _GPUID)
     else:
         _MPI_LOGGER.warning("PyQUDA is already initialized", RuntimeWarning)
 
@@ -354,7 +365,11 @@ def setLoggerLevel(level: Literal["debug", "info", "warning", "error", "critical
     _MPI_LOGGER.logger.setLevel(level.upper())
 
 
-def isInitialized():
+def isGPUInitialized():
+    return _GPUID >= 0
+
+
+def isGridInitialized():
     return _GRID_SIZE is not None
 
 
@@ -398,13 +413,6 @@ def isHIP():
     return _HIP
 
 
-def setGPUID(gpuid: int):
-    global _GPUID
-    assert _GRID_SIZE is None, "setGPUID() should be called before init()"
-    assert gpuid >= 0
-    _GPUID = gpuid
-
-
 def getGPUID():
     return _GPUID
 
diff --git a/pyquda_core/pyquda/field.py b/pyquda_core/pyquda/field.py
index 1c682a1..1d06af2 100644
--- a/pyquda_core/pyquda/field.py
+++ b/pyquda_core/pyquda/field.py
@@ -9,18 +9,18 @@
 class LatticeInfo:
+    Nd: int = 4
     Ns: int = 4
     Nc: int = 3
-    Nd: int = 4
 
     def __init__(self, latt_size: List[int], t_boundary: Literal[1, -1] = 1, anisotropy: float = 1.0) -> None:
         self._checkLattice(latt_size)
         self._setLattice(latt_size, t_boundary, anisotropy)
 
     def _checkLattice(self, latt_size: List[int]):
-        from . import init, isInitialized, getLogger, getGridSize
+        from . import init, isGridInitialized, getLogger, getGridSize
 
-        if not isInitialized():
+        if not isGridInitialized():
             init(None, latt_size)
         Gx, Gy, Gz, Gt = getGridSize()
         Lx, Ly, Lz, Lt = latt_size
@@ -65,7 +65,47 @@ def _setLattice(self, latt_size: List[int], t_boundary: Literal[1, -1], anisotro
         self.anisotropy = anisotropy
 
 
-Ns, Nc, Nd = LatticeInfo.Ns, LatticeInfo.Nc, LatticeInfo.Nd
+Nd, Ns, Nc = LatticeInfo.Nd, LatticeInfo.Ns, LatticeInfo.Nc
+
+
+class GeneralInfo:
+    def __init__(self, latt_size: List[int], grid_size: List[int], Ns: int = 4, Nc: int = 3) -> None:
+        self._checkLattice(latt_size, grid_size)
+        self._setLattice(latt_size, grid_size)
+        self.Nd = len(latt_size)
+        self.Ns = Ns
+        self.Nc = Nc
+
+    def _checkLattice(self, latt_size: List[int], grid_size: List[int]):
+        from . import getLogger
+
+        assert len(latt_size) == len(grid_size)
+        for GL, G in zip(latt_size, grid_size):
+            if not (GL % G == 0):
+                getLogger().critical("lattice size must be divisible by grid size", ValueError)
+
+    def _setLattice(self, latt_size: List[int], grid_size: List[int]):
+        from . import initGPU, isGPUInitialized, getLogger, getMPIComm, getMPISize, getMPIRank
+
+        if not isGPUInitialized():
+            initGPU()
+        if getMPISize() != int(numpy.prod(grid_size)):
+            getLogger().critical(f"The MPI size {getMPISize()} does not match the grid size {grid_size}", ValueError)
+        self.mpi_comm = getMPIComm()
+        self.mpi_size = getMPISize()
+        self.mpi_rank = getMPIRank()
+        self.grid_size = grid_size
+        grid_coord = []
+        mpi_rank = getMPIRank()
+        for G in grid_size[::-1]:
+            grid_coord.append(mpi_rank % G)
+            mpi_rank //= G
+        self.grid_coord = grid_coord[::-1]
+
+        self.global_size = latt_size
+        self.global_volume = int(numpy.prod(latt_size))
+        self.size = [GL // G for GL, G in zip(latt_size, grid_size)]
+        self.volume = int(numpy.prod(self.size))
 
 
 class _Direction(int):
@@ -126,24 +166,21 @@ def cb2(data: numpy.ndarray, axes: List[int], dtype=None):
     return data_cb2.reshape(*shape[: axes[0]], 2, Lt, Lz, Ly, Lx // 2, *shape[axes[-1] + 1 :])
 
 
-def checksum(latt_info, data: numpy.ndarray) -> Tuple[int, int]:
+def checksum(latt_info: Union[LatticeInfo, GeneralInfo], data: numpy.ndarray) -> Tuple[int, int]:
     import zlib
     from mpi4py import MPI
 
-    gx, gy, gz, gt = latt_info.grid_coord
-    Lx, Ly, Lz, Lt = latt_info.size
-    gLx, gLy, gLz, gLt = gx * Lx, gy * Ly, gz * Lz, gt * Lt
-    GLx, GLy, GLz, GLt = latt_info.global_size
     work = numpy.empty((latt_info.volume), "> (32 - rank29)).item(), MPI.BXOR
     )
@@ -153,35 +190,34 @@ def checksum(latt_info, data: numpy.ndarray) -> Tuple[int, int]:
     return sum29, sum31
 
 
-def _field_shape_dtype(field: str, Ns: int, Nc: int, int_nbytes: int = 4, float_nbytes: int = 8):
+def _field_shape_dtype(field: str, Ns: int, Nc: int, use_fp32: bool = False):
     from . import getLogger
 
+    float_nbytes = 4 if use_fp32 else 8
     if field in ["Int"]:
-        return [], f" None:
+    def __init__(self, latt_info: Union[LatticeInfo, GeneralInfo]) -> None:
         from . import getCUDABackend
 
         self.latt_info = latt_info
@@ -189,6 +225,117 @@ def __init__(self, latt_info: LatticeInfo) -> None:
         self.backend: Literal["numpy", "cupy", "torch"] = getCUDABackend()
         self.L5 = None
 
+    @abstractmethod
+    def _shape(self):
+        from . import getLogger
+
+        getLogger().critical("_setShape method must be implemented", NotImplementedError)
+
+    @classmethod
+    def _groupName(cls):
+        from . import getLogger
+
+        if cls.__name__ == "LatticeMom":
+            getLogger().critical("LatticeMom is not supported for save/load", ValueError)
+        elif cls.__name__ == "LatticeClover":
+            getLogger().critical("LatticeClover is not supported for save/load", ValueError)
+
+        return (
+            cls.__name__.replace("Multi", "")
+            .replace("General", "")
+            .replace("Link", "ColorMatrix")
+            .replace("Gauge", "ColorMatrix")
+            .replace("Fermion", "SpinColorVector")
+            .replace("Propagator", "SpinColorMatrix")
+            .replace("StaggeredFermion", "ColorVector")
+            .replace("StaggeredPropagator", "ColorMatrix")
+        )
+
+    def save(
+        self,
+        filename: str,
+        label: Union[int, str, Sequence[int], Sequence[str]],
+        *,
+        annotation: str = "",
+        check: bool = True,
+        use_fp32: bool = False,
+    ):
+        from . import getLogger
+        from .file import File
+
+        assert hasattr(self, "lexico")
+        s = perf_counter()
+        gbytes = 0
+        filename = path.expanduser(path.expandvars(filename))
+        with File(filename, "w") as f:
+            f.save(
+                self._groupName(),
+                label,
+                self.lexico(),
+                self.latt_info.grid_size,
+                annotation=annotation,
+                check=check,
+                use_fp32=use_fp32,
+            )
+        secs = perf_counter() - s
+        getLogger().debug(f"Saved {filename} in {secs:.3f} secs, {gbytes / secs:.3f} GB/s")
+
+    def append(
+        self,
+        filename: str,
+        label: Union[int, str, Sequence[int], Sequence[str]],
+        *,
+        annotation: str = "",
+        check: bool = True,
+        use_fp32: bool = False,
+    ):
+        from . import getLogger
+        from .file import File
+
+        assert hasattr(self, "lexico")
+        s = perf_counter()
+        gbytes = 0
+        filename = path.expanduser(path.expandvars(filename))
+        with File(filename, "r+") as f:
+            f.append(
+                self._groupName(),
+                label,
+                self.lexico(),
+                self.latt_info.grid_size,
+                annotation=annotation,
+                check=check,
+                use_fp32=use_fp32,
+            )
+        secs = perf_counter() - s
+        getLogger().debug(f"Appended {filename} in {secs:.3f} secs, {gbytes / secs:.3f} GB/s")
+
+    def update(
+        self,
+        filename: str,
+        label: Union[int, str, Sequence[int], Sequence[str]],
+        *,
+        annotation: str = "",
+        check: bool = True,
+    ):
+        from . import getLogger
+        from .file import File
+
+        assert hasattr(self, "lexico")
+        s = perf_counter()
+        gbytes = 0
+        filename = path.expanduser(path.expandvars(filename))
+        with File(filename, "r+") as f:
+            f.update(
+                self._groupName(),
+                label,
+                self.lexico(),
+                self.latt_info.grid_size,
+                annotation=annotation,
+                check=check,
+            )
+        secs = perf_counter() - s
+        getLogger().debug(f"Updated {filename} in {secs:.3f} secs, {gbytes / secs:.3f} GB/s")
+
     @property
     def data(self):
         return self._data
@@ -208,17 +355,10 @@ def data(self, value):
     def data_ptr(self) -> Pointer:
         return ndarrayPointer(self.data.reshape(-1), True)
 
-    @abstractmethod
-    def _field(self):
-        from . import getLogger
-
-        getLogger().critical("_field method must be implemented", NotImplementedError)
-
-    @abstractmethod
-    def _shape(self):
-        from . import getLogger
-
-        getLogger().critical("_setShape method must be implemented", NotImplementedError)
+    @classmethod
+    def _field(cls) -> str:
+        group_name = cls._groupName()
+        return group_name[group_name.index("Lattice") + len("Lattice") :]
 
     def _setField(self):
         field_shape, field_dtype = _field_shape_dtype(self._field(), self.latt_info.Ns, self.latt_info.Nc)
@@ -373,77 +513,62 @@ def __itruediv__(self, other):
         return self
 
 
-class SpatialField(BaseField):
-    def __init__(self, latt_info: LatticeInfo, value: Any = None, init_data: bool = True) -> None:
-        super().__init__(latt_info)
-        if init_data:
-            self._initData(value)
-
-    @classmethod
-    def _field(cls) -> str:
-        return cls.__name__[cls.__name__.index("Space") + len("Space") :]
-
-    def _shape(self):
-        Lx, Ly, Lz, Lt = self.latt_info.size
-        self.space_shape = [Lz, Ly, Lx]
-        if self.L5 is None:
-            return (*self.space_shape, *self.field_shape)
-        else:
-            return (self.L5, *self.space_shape, *self.field_shape)
-
-
-class TemporalField(BaseField):
-    def __init__(self, latt_info: LatticeInfo, value: Any = None, init_data: bool = True) -> None:
+class GeneralField(BaseField):
+    def __init__(self, latt_info: GeneralInfo, value: Any = None, init_data: bool = True) -> None:
         super().__init__(latt_info)
         if init_data:
             self._initData(value)
 
-    @classmethod
-    def _field(cls) -> str:
-        return cls.__name__[cls.__name__.index("Time") + len("Time") :]
-
     def _shape(self):
-        Lx, Ly, Lz, Lt = self.latt_info.size
-        self.time_shape = [Lt]
+        self.lattice_shape = self.latt_info.size[::-1]
         if self.L5 is None:
-            return (*self.time_shape, *self.field_shape)
+            return (*self.lattice_shape, *self.field_shape)
         else:
-            return (self.L5, *self.time_shape, *self.field_shape)
+            return (self.L5, *self.lattice_shape, *self.field_shape)
 
+    def lexico(self, dtype=None):
+        return self.getHost().astype(dtype)
 
-class SpatiotemporalField(BaseField):
-    def __init__(self, latt_info: LatticeInfo, value: Any = None, init_data: bool = True) -> None:
-        super().__init__(latt_info)
-        if init_data:
-            self._initData(value)
+    def checksum(self) -> Tuple[int, int]:
+        return checksum(self.latt_info, self.lexico().reshape(self.latt_info.volume, self.field_size).view("
 
     @classmethod
-    def _field(cls) -> str:
-        return cls.__name__[cls.__name__.index("Spacetime") + len("Spacetime") :]
+    def load(
+        cls,
+        filename: str,
+        label: Union[int, str, Sequence[int], Sequence[str]],
+        *,
+        check: bool = True,
+        grid_size: List[int] = None,
+    ):
+        from . import getLogger
+        from .file import File
 
-    def _shape(self):
-        Lx, Ly, Lz, Lt = self.latt_info.size
-        self.spacetime_shape = [Lt, Lz, Ly, Lx]
-        if self.L5 is None:
-            return (*self.spacetime_shape, *self.field_shape)
+        s = perf_counter()
+        gbytes = 0
+        filename = path.expanduser(path.expandvars(filename))
+        with File(filename, "r") as f:
+            latt_size, Ns, Nc, value = f.load(cls._groupName(), label, grid_size, check=check)
+        latt_info = GeneralInfo(latt_size, grid_size, Ns, Nc)
+        if not issubclass(cls, MultiField):
+            retval = cls(latt_info, value)
         else:
-            return (self.L5, *self.spacetime_shape, *self.field_shape)
+            retval = cls(latt_info, len(label), numpy.asarray(value))
+        secs = perf_counter() - s
+        getLogger().debug(f"Loaded {filename} in {secs:.3f} secs, {gbytes / secs:.3f} GB/s")
+        return retval
 
 
 class ParityField(BaseField):
     def __init__(self, latt_info: LatticeInfo, value: Any = None, init_data: bool = True) -> None:
         super().__init__(latt_info)
-        self.full_lattice = False
+        self.full_field = False
         if init_data:
             self._initData(value)
 
-    @classmethod
-    def _field(cls) -> str:
-        return cls.__name__[cls.__name__.index("Lattice") + len("Lattice") :]
-
     def _shape(self):
         Lx, Ly, Lz, Lt = self.latt_info.size
-        self.lattice_shape = [2, Lt, Lz, Ly, Lx // 2] if self.full_lattice else [Lt, Lz, Ly, Lx // 2]
+        self.lattice_shape = [2, Lt, Lz, Ly, Lx // 2] if self.full_field else [Lt, Lz, Ly, Lx // 2]
         if self.L5 is None:
             return (*self.lattice_shape, *self.field_shape)
         else:
@@ -469,17 +594,17 @@ def timeslice(self, start: int, stop: int = None, step: int = None, return_field
             x = self.__class__(self.latt_info)
         else:
             x = self.__class__(self.latt_info, self.L5)
-        if self.full_lattice and self.L5 is not None:
+        if self.full_field and self.L5 is not None:
             x.data[:, :, start:stop:step, :, :, :] = self.data[:, :, start:stop:step, :, :, :]
-        elif self.full_lattice or self.L5 is not None:
+        elif self.full_field or self.L5 is not None:
             x.data[:, start:stop:step, :, :, :] = self.data[:, start:stop:step, :, :, :]
         else:
             x.data[start:stop:step, :, :, :] = self.data[start:stop:step, :, :, :]
         return x
     else:
-        if self.full_lattice and self.L5 is not None:
+        if self.full_field and self.L5 is not None:
             return self.data[:, :, start:stop:step, :, :, :]
-        elif self.full_lattice or self.L5 is not None:
+        elif self.full_field or self.L5 is not None:
             return self.data[:, start:stop:step, :, :, :]
         else:
             return self.data[start:stop:step, :, :, :]
@@ -494,7 +619,7 @@ def __init__(self, latt_info: LatticeInfo, value: Any = None, init_data: bool =
             s.__field_class__.__base__.__init__(self, latt_info, value, False)
         else:
             s.__init__(latt_info, value, False)
-        self.full_lattice = True
+        self.full_field = True
         if init_data:
             self._initData(value)
 
@@ -526,221 +651,34 @@ def lexico(self, dtype=None):
         return lexico(self.getHost(), [0, 1, 2, 3, 4], dtype)
 
     def checksum(self) -> Tuple[int, int]:
-        return checksum(self.latt_info, self.lexico().reshape(self.latt_info.volume, self.field_size))
-
-    @classmethod
-    def _load(cls, latt_info: LatticeInfo, dataset):
-        gx, gy, gz, gt = latt_info.grid_coord
-        Lx, Ly, Lz, Lt = latt_info.size
-        data: numpy.ndarray = dataset[
-            gt * Lt : (gt + 1) * Lt,
-            gz * Lz : (gz + 1) * Lz,
-            gy * Ly : (gy + 1) * Ly,
-            gx * Lx : (gx + 1) * Lx,
-        ]
-        sum29, sum31 = checksum(latt_info, data.reshape(latt_info.volume, -1))
-        assert dataset.attrs["sum29"] == f"0x{sum29:08x}"
-        assert dataset.attrs["sum31"] == f"0x{sum31:08x}"
-        return data
+        return checksum(self.latt_info, self.lexico().reshape(self.latt_info.volume, self.field_size).view("
 Pointer:
         return ndarrayPointer(self.data.reshape(self.L5, -1)[index], True)
 
     def even_ptr(self, index: int) -> Pointer:
-        assert self.full_lattice
+        assert self.full_field
         return ndarrayPointer(self.data.reshape(self.L5, 2, -1)[index, 0], True)
 
     def odd_ptr(self, index: int) -> Pointer:
-        assert self.full_lattice
+        assert self.full_field
        return ndarrayPointer(self.data.reshape(self.L5, 2, -1)[index, 1], True)
 
     @property
@@ -819,12 +757,12 @@ def data_ptrs(self) -> Pointers:
 
     @property
     def even_ptrs(self) -> Pointers:
-        assert self.full_lattice
+        assert self.full_field
         return ndarrayPointer(self.data.reshape(self.L5, 2, -1)[:, 0], True)
 
     @property
     def odd_ptrs(self) -> Pointers:
-        assert self.full_lattice
+        assert self.full_field
         return ndarrayPointer(self.data.reshape(self.L5, 2, -1)[:, 1], True)
 
     def copy(self):
@@ -918,17 +856,17 @@ def __init__(self, latt_info: LatticeInfo, L5: Union[int, Any] = Nd, value=None)
         self._gauge_dirac = None
 
     @classmethod
-    def load(cls, filename: str) -> "LatticeGauge":
-        return super().load(filename, range(Nd))
+    def load(cls, filename: str, *, check: bool = True) -> "LatticeGauge":
+        return super().load(filename, ["X", "Y", "Z", "T"], check=check)
 
-    def save(self, filename: str, *, annotation: str = "", int_nbytes: int = 4, float_nbytes: int = 8):
-        super().save(filename, range(Nd), annotation=annotation, int_nbytes=int_nbytes, float_nbytes=float_nbytes)
+    def save(self, filename: str, *, annotation: str = "", check: bool = True, use_fp32: bool = False):
+        super().save(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check, use_fp32=use_fp32)
 
-    def append(self, filename: str, *, annotation: str = "", int_nbytes: int = 4, float_nbytes: int = 8):
-        super().append(filename, range(Nd), annotation=annotation, int_nbytes=int_nbytes, float_nbytes=float_nbytes)
+    def append(self, filename: str, *, annotation: str = "", check: bool = True, use_fp32: bool = False):
+        super().append(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check, use_fp32=use_fp32)
 
-    def update(self, filename: str, *, annotation: str = ""):
-        super().update(filename, range(Nd), annotation=annotation)
+    def update(self, filename: str, *, annotation: str = "", check: bool = True):
+        super().update(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check)
 
     @property
     def gauge_dirac(self):
@@ -1253,17 +1191,17 @@ def __init__(self, latt_info: LatticeInfo, L5: Union[int, Any] = 4, value=None)
         self._gauge_dirac = None
 
     @classmethod
-    def load(cls, filename: str) -> "LatticeMom":
-        return super().load(filename, range(Nd))
+    def load(cls, filename: str, *, check: bool = True) -> "LatticeMom":
+        return super().load(filename, ["X", "Y", "Z", "T"], check=check)
 
-    def save(self, filename: str, *, annotation: str = "", int_nbytes: int = 4, float_nbytes: int = 8):
-        super().save(filename, range(Nd), annotation=annotation, int_nbytes=int_nbytes, float_nbytes=float_nbytes)
+    def save(self, filename: str, *, annotation: str = "", check: bool = True, use_fp32: bool = False):
+        super().save(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check, use_fp32=use_fp32)
 
-    def append(self, filename: str, *, annotation: str = "", int_nbytes: int = 4, float_nbytes: int = 8):
-        super().append(filename, range(Nd), annotation=annotation, int_nbytes=int_nbytes, float_nbytes=float_nbytes)
+    def append(self, filename: str, *, annotation: str = "", check: bool = True, use_fp32: bool = False):
+        super().append(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check, use_fp32=use_fp32)
 
-    def update(self, filename: str, *, annotation: str = ""):
-        super().update(filename, range(Nd), annotation=annotation)
+    def update(self, filename: str, *, annotation: str = "", check: bool = True):
+        super().update(filename, ["X", "Y", "Z", "T"], annotation=annotation, check=check)
 
     @property
     def gauge_dirac(self):
diff --git a/pyquda_core/pyquda/file.py b/pyquda_core/pyquda/file.py
new file mode 100644
index 0000000..e2dde87
--- /dev/null
+++ b/pyquda_core/pyquda/file.py
@@ -0,0 +1,348 @@
+from time import perf_counter
+from typing import List, Sequence, Tuple, Union
+
+import numpy
+from mpi4py import MPI
+import h5py
+
+
+class _LatticeInfo:
+    def __init__(self, latt_size: List[int], grid_size: List[int]) -> None:
+        self._checkLattice(latt_size, grid_size)
+        self._setLattice(latt_size, grid_size)
+
+    def _checkLattice(self, latt_size: List[int], grid_size: List[int]):
+        assert len(latt_size) == len(grid_size), "lattice size and grid size must have the same dimension"
+        for GL, G in zip(latt_size, grid_size):
+            if not (GL % G == 0):
+                raise ValueError("lattice size must be divisible by grid size")
+
+    def _setLattice(self, latt_size: List[int], grid_size: List[int]):
+        if MPI.COMM_WORLD.Get_size() != int(numpy.prod(grid_size)):
+            raise ValueError(f"The MPI size {MPI.COMM_WORLD.Get_size()} does not match the grid size {grid_size}")
+        sublatt_size = [GL // G for GL, G in zip(latt_size, grid_size)]
+        sublatt_slice = []
+        mpi_rank = MPI.COMM_WORLD.Get_rank()
+        for G, L in zip(grid_size[::-1], sublatt_size[::-1]):
+            g = mpi_rank % G
+            mpi_rank //= G
+            sublatt_slice.append(slice(g * L, (g + 1) * L))
+
+        self.global_size = latt_size
+        self.global_volume = int(numpy.prod(latt_size))
+        self.size = sublatt_size
+        self.volume = int(numpy.prod(sublatt_size))
+        self.slice = tuple(sublatt_slice)
+
+
+# CRC32LUT = numpy.empty((4, 256), dtype="> 8) ^ CRC32LUT[0, CRC32LUT[0, i] & 0xFF]
+#     CRC32LUT[2, i] = (CRC32LUT[1, i] >> 8) ^ CRC32LUT[0, CRC32LUT[1, i] & 0xFF]
+#     CRC32LUT[3, i] = (CRC32LUT[2, i] >> 8) ^ CRC32LUT[0, CRC32LUT[2, i] & 0xFF]
+#     # CRC32LUT[4, i] = (CRC32LUT[3, i] >> 8) ^ CRC32LUT[0, CRC32LUT[3, i] & 0xFF]
+#     # CRC32LUT[5, i] = (CRC32LUT[4, i] >> 8) ^ CRC32LUT[0, CRC32LUT[4, i] & 0xFF]
+#     # CRC32LUT[6, i] = (CRC32LUT[5, i] >> 8) ^ CRC32LUT[0, CRC32LUT[5, i] & 0xFF]
+#     # CRC32LUT[7, i] = (CRC32LUT[6, i] >> 8) ^ CRC32LUT[0, CRC32LUT[6, i] & 0xFF]
+
+
+def checksum(latt_info, data: numpy.ndarray) -> Tuple[int, int]:
+    import zlib
+
+    work = numpy.empty((latt_info.volume), "> (32 - rank29)).item(), MPI.BXOR)
+    sum31 = MPI.COMM_WORLD.allreduce(numpy.bitwise_xor.reduce(work << rank31 | work >> (32 - rank31)).item(), MPI.BXOR)
+    return sum29, sum31
+
+
+def _spin_color_dtype(name: str, shape: Sequence[int], use_fp32: bool = True) -> Tuple[int, int]:
+    float_nbytes = 4 if use_fp32 else 8
+    Ns, Nc, dtype = 4, 3, f"> (32 - rank29)).item(), MPI.BXOR)
     sum31 = getMPIComm().allreduce(numpy.bitwise_xor.reduce(work << rank31 | work >> (32 - rank31)).item(), MPI.BXOR)
     return sum29, sum31
diff --git a/pyquda_utils/io/milc.py b/pyquda_utils/io/milc.py
index b6d39d8..375c9d2 100644
--- a/pyquda_utils/io/milc.py
+++ b/pyquda_utils/io/milc.py
@@ -24,12 +24,12 @@ def checksum_milc(latt_size: List[int], data):
     work = data.view("> (32 - rank29)).item(), MPI.BXOR)
     sum31 = getMPIComm().allreduce(numpy.bitwise_xor.reduce(work << rank31 | work >> (32 - rank31)).item(), MPI.BXOR)
     return sum29, sum31
diff --git a/tests/test.io.py b/tests/test.io.py
index b8a94f7..1e93db7 100644
--- a/tests/test.io.py
+++ b/tests/test.io.py
@@ -16,6 +16,7 @@
 
 propagator = core.invert(dslash, "point", [0, 0, 0, 0])
 dslash.destroy()
 
+print([(f"{i:08x}", f"{j:08x}") for i, j in gauge.checksum()])
 gauge.save("pt_prop_1.h5")
 propagator.append("pt_prop_1.h5", 0)
 convert.propagatorToMultiFermion(propagator).append("pt_prop_1.h5", range(12))
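
The test above drives the new HDF5 round trip end to end. For orientation, here is a minimal sketch of the same API from user code; the lattice size and file name are illustrative only, and the defaults introduced by this patch (check=True, double precision unless use_fp32=True) are assumed. The init(None, latt_size) call mirrors what LatticeInfo._checkLattice itself does when the grid is not yet initialized.

    from pyquda import init
    from pyquda.field import LatticeInfo, LatticeGauge

    init(None, [4, 4, 4, 8])               # illustrative lattice size; grid defaults for this MPI layout
    latt_info = LatticeInfo([4, 4, 4, 8])
    gauge = LatticeGauge(latt_info)        # freshly constructed gauge field

    gauge.save("cfg.h5")                   # group "LatticeColorMatrix", labels "X", "Y", "Z", "T"
    print([(f"{i:08x}", f"{j:08x}") for i, j in gauge.checksum()])  # (sum29, sum31) pairs, as in tests/test.io.py
    loaded = LatticeGauge.load("cfg.h5")   # with check=True, verifies the stored checksums

Note that the per-direction labels change from range(Nd) to ["X", "Y", "Z", "T"] in this patch, and int_nbytes/float_nbytes are replaced by the single use_fp32 flag, so files written by earlier versions presumably will not match the new label layout.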