diff --git a/aiooss2/adapter.py b/aiooss2/adapter.py index ee468d9..421a63a 100644 --- a/aiooss2/adapter.py +++ b/aiooss2/adapter.py @@ -1,15 +1,26 @@ """ Adapter (crc check and progress bar call backs) for data types """ +import asyncio import logging -from typing import Any, Optional +from abc import ABC, abstractmethod +from typing import ( + IO, + Any, + AsyncIterator, + Awaitable, + Callable, + Iterator, + Optional, + Union, +) from aiohttp.abc import AbstractStreamWriter from aiohttp.payload import PAYLOAD_REGISTRY, TOO_LARGE_BYTES_BODY, Payload from oss2.compat import to_bytes from oss2.utils import ( - _BytesAndFileAdapter, - _FileLikeAdapter, + _CHUNK_SIZE, + _get_data_size, _invoke_cipher_callback, _invoke_crc_callback, _invoke_progress_callback, @@ -18,55 +29,229 @@ logger = logging.getLogger(__name__) -class BytesAndStringAdapter(_BytesAndFileAdapter): - """Adapter for data with a length attributes""" +class StreamAdapter: # pylint: disable=too-few-public-methods + """Adapter for data types""" - def __len__(self) -> int: - return self.size + def __init__( + self, + stream: Union[bytes, IO], + progress_callback: Optional[Callable] = None, + crc_callback: Optional[Callable] = None, + cipher_callback: Optional[Callable] = None, + ): + """ + Args: + stream (Union[bytes, IO]): + original data stream can be bytes or file like object. + progress_callback (Optional[Callable], optional): + function used for progress bar. + crc_callback (Optional[Callable], optional): + function used for crc calculation. + cipher_callback (Optional[Callable], optional): + function used for cipher calculation. + """ + self.stream = to_bytes(stream) + self.progress_callback: Optional[Callable] = progress_callback + self.crc_callback: Optional[Callable] = crc_callback + self.cipher_callback: Optional[Callable] = cipher_callback + self.offset = 0 + + @property + def crc(self): + """return crc value of the data""" + if self.crc_callback: + return self.crc_callback.crc + return None - async def aread(self, amt: Optional[int] = None): - """read data + +class SyncAdapter(StreamAdapter, ABC): + """Adapter for data with sync read method""" + + def __iter__(self) -> Iterator: + return self + + def __next__(self) -> bytes: + content = self.read(_CHUNK_SIZE) + if content: + return content + raise StopIteration + + @abstractmethod + def read(self, amt=None) -> Awaitable[bytes]: + """sync api to read a chunk from the data Args: amt (int, optional): batch size of the data to read """ - if self.offset >= self.size: - return to_bytes("") - if amt is None or amt < 0: - bytes_to_read = self.size - self.offset - else: - bytes_to_read = min(amt, self.size - self.offset) - content = await self.data.read(bytes_to_read) +class AsyncAdapter(StreamAdapter, ABC): + """Adapter for data with async read method""" + + def __aiter__(self) -> AsyncIterator: + return self + + async def __anext__(self) -> Awaitable[bytes]: + content = await self.read(_CHUNK_SIZE) + if content: + return content + raise StopAsyncIteration - self.offset += bytes_to_read + @abstractmethod + async def read(self, amt=None) -> Awaitable[bytes]: + """async api to read a chunk from the data + Args: + amt (int, optional): batch size of the data to read + """ + + +class SizedAdapter(StreamAdapter, ABC): + """Adapter for data that can have a fixed size""" + + def __init__( + self, + stream: Union[bytes, str, IO], + size: Optional[int] = None, + **kwargs, + ): + """ + Args: + size (Optional[int]): + size of the data stream. + """ + self.size = size or _get_data_size(stream) + super().__init__(stream, **kwargs) + + def __len__(self) -> int: + return self.size + + def _length_to_read(self, amt: Optional[int]) -> int: + length_to_read = self.size - self.offset + if amt and amt > 0: + length_to_read = min(amt, length_to_read) + return length_to_read + + def _invoke_callbacks(self, content: bytes): + self.offset += len(content) _invoke_progress_callback( self.progress_callback, min(self.offset, self.size), self.size ) + if content: + _invoke_crc_callback(self.crc_callback, content) + content = _invoke_cipher_callback(self.cipher_callback, content) + return content - _invoke_crc_callback(self.crc_callback, content) - content = _invoke_cipher_callback(self.cipher_callback, content) +class SyncSizedAdapter(SyncAdapter, SizedAdapter): + """Adapter for data can get its length via `len(size)`""" - return content + def read(self, amt: Optional[int] = None) -> bytes: + if self.offset >= self.size: + return b"" + length_to_read = self._length_to_read(amt) + if hasattr(self.stream, "read"): + content = self.stream.read(length_to_read) + else: + content = self.stream[self.offset : self.offset + length_to_read] + return self._invoke_callbacks(content) -class BytesOrStringPayload(Payload): - """Payload of data with a length attributes""" +class AsyncSizedAdapter(AsyncAdapter, SizedAdapter): + """Adapter for file-like object""" + + async def read(self, amt: Optional[int] = None) -> bytes: + if self.offset >= self.size: + return b"" + length_to_read = self._length_to_read(amt) + if hasattr(self.stream, "read"): + content = await self.stream.read(length_to_read) + else: + content = self.stream[self.offset : self.offset + length_to_read] + return self._invoke_callbacks(content) + - _value: BytesAndStringAdapter +class UnsizedAdapter( + StreamAdapter, ABC +): # pylint: disable=too-few-public-methods + """Adapter for data that do not know its size.""" def __init__( - self, value: BytesAndStringAdapter, *args: Any, **kwargs: Any - ) -> None: - if not isinstance(value, BytesAndStringAdapter): - raise TypeError( - "value argument must be aiooss2.utils.BytesAndStringAdapter" - f", not {type(value)!r}" + self, stream: Union[bytes, str, IO], discard: int = 0, **kwargs + ): + super().__init__(stream, **kwargs) + self._read_all = False + self.discard = discard + self.size = None + + def _invoke_callbacks(self, content): + if not content: + self._read_all = True + else: + + self.offset += len(content) + + real_discard = 0 + if self.offset < self.discard: + if len(content) <= self.discard: + real_discard = len(content) + else: + real_discard = self.discard + + _invoke_progress_callback( + self.progress_callback, self.offset, None + ) + _invoke_crc_callback(self.crc_callback, content, real_discard) + content = _invoke_cipher_callback( + self.cipher_callback, content, real_discard ) + self.discard -= real_discard + return content + + +class SyncUnsizedAdapter(SyncAdapter, UnsizedAdapter): + """ + Adapter for data that have a `read` method to get its + data but do not know its size. + """ + + def __init__( + self, stream: Union[bytes, str, IO], discard: int = 0, **kwargs + ): + SyncAdapter.__init__(self, stream, **kwargs) + UnsizedAdapter.__init__(stream, discard, **kwargs) + + def read(self, amt: Optional[int] = None) -> bytes: + if self._read_all: + return b"" + if self.offset < self.discard and amt and self.cipher_callback: + amt += self.discard + + content = self.stream.read(amt) + return self._invoke_callbacks(content) + + +class AsyncUnsizedAdapter(AsyncAdapter, UnsizedAdapter): + """ + Adapter for data that have a async `read` method to get its + data but do not know its size. + """ + + async def read(self, amt: Optional[int] = None) -> bytes: + if self._read_all: + return b"" + if self.offset < self.discard and amt and self.cipher_callback: + amt += self.discard + + content = await self.stream.read(amt) + return self._invoke_callbacks(content) + + +class SizedPayload(Payload, ABC): + """Payload of data with a fixed length""" + + def __init__(self, value: SizedAdapter, *args: Any, **kwargs: Any) -> None: if "content_type" not in kwargs: kwargs["content_type"] = "application/octet-stream" @@ -74,6 +259,20 @@ def __init__( self._size = len(value) + +class SyncSizedPayload(SizedPayload): + """Payload of data with a length attributes""" + + _value: SyncSizedAdapter + + def __init__(self, value: SyncSizedAdapter, *args, **kwargs): + if not isinstance(value, SyncSizedAdapter): + raise TypeError( + "value argument must be SyncSizedAdapter" + f", not {type(value)!r}" + ) + super().__init__(value, *args, **kwargs) + if self._size > TOO_LARGE_BYTES_BODY: kwargs = {"source": self} logger.warning( @@ -85,57 +284,62 @@ def __init__( ) async def write(self, writer: AbstractStreamWriter) -> None: - """payload data writer - - Args: - writer (AbstractStreamWriter): _description_ - """ await writer.write(self._value.read()) -class FileAdapter(_FileLikeAdapter): - """adapter for those data do not know its size.""" +class AsyncSizedPayload(SizedPayload): + """Payload of data with a length attributes""" - async def aread(self, amt=None): - """async read read from the fileobj + _value: AsyncSizedAdapter - Args: - amt (_type_, optional): _description_. Defaults to None. + def __init__(self, value: AsyncSizedAdapter, *args, **kwargs): + if not isinstance(value, AsyncSizedAdapter): + raise TypeError( + "value argument must be AsyncSizedAdapter" + f", not {type(value)!r}" + ) + super().__init__(value, *args, **kwargs) - Returns: - _type_: _description_ - """ - offset_start = self.offset - if offset_start < self.discard and amt and self.cipher_callback: - amt += self.discard + async def write(self, writer: AbstractStreamWriter) -> None: + await writer.write(await self._value.read()) - content = await self.fileobj.read(amt) - if not content: - self.read_all = True - _invoke_progress_callback( - self.progress_callback, self.offset, None - ) - else: - _invoke_progress_callback( - self.progress_callback, self.offset, None - ) - self.offset += len(content) +class UnsizedPayload(Payload, ABC): + """Payload of data of unknown length""" - real_discard = 0 - if offset_start < self.discard: - if len(content) <= self.discard: - real_discard = len(content) - else: - real_discard = self.discard + def __init__(self, value: SizedAdapter, *args: Any, **kwargs: Any) -> None: + if "content_type" not in kwargs: + kwargs["content_type"] = "application/octet-stream" - _invoke_crc_callback(self.crc_callback, content, real_discard) - content = _invoke_cipher_callback( - self.cipher_callback, content, real_discard - ) + super().__init__(value, *args, **kwargs) - self.discard -= real_discard - return content + +class SyncUnsizedPayload(UnsizedPayload): + """Payload of sync data of unknown length""" + + _value: SyncUnsizedAdapter + + async def write(self, writer: AbstractStreamWriter) -> None: + loop = asyncio.get_event_loop() + chunk = await loop.run_in_executor(None, self._value.read, 2**16) + while chunk: + await writer.write(chunk) + chunk = await loop.run_in_executor(None, self._value.read, 2**16) + + +class AsyncUnsizedPayload(UnsizedPayload): + """Payload of async data of unknown length""" + + _value: AsyncUnsizedAdapter + + async def write(self, writer: AbstractStreamWriter) -> None: + chunk = await self._value.read(2**16) + while chunk: + await writer.write(chunk) + chunk = await self._value.read(2**16) -PAYLOAD_REGISTRY.register(BytesOrStringPayload, _BytesAndFileAdapter) +PAYLOAD_REGISTRY.register(SyncSizedPayload, SyncSizedAdapter) +PAYLOAD_REGISTRY.register(AsyncSizedPayload, AsyncSizedAdapter) +PAYLOAD_REGISTRY.register(SyncUnsizedPayload, SyncUnsizedAdapter) +PAYLOAD_REGISTRY.register(AsyncUnsizedPayload, AsyncUnsizedAdapter) diff --git a/aiooss2/http.py b/aiooss2/http.py index 341725b..c128cfc 100644 --- a/aiooss2/http.py +++ b/aiooss2/http.py @@ -1,7 +1,6 @@ """ aiooss2.http """ -# pylint: disable=invalid-overridden-method import logging from typing import TYPE_CHECKING, Optional @@ -9,7 +8,7 @@ from aiohttp.client_exceptions import ClientResponseError from oss2 import defaults from oss2.exceptions import RequestError -from oss2.http import _CHUNK_SIZE, USER_AGENT, CaseInsensitiveDict, Response +from oss2.http import _CHUNK_SIZE, USER_AGENT, CaseInsensitiveDict if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse @@ -58,7 +57,7 @@ def __init__( # pylint: disable=too-many-arguments ) -class AioResponse(Response): +class AioResponse: """Async Version of the response wrapper adapting to the aiooss2 api """ @@ -66,11 +65,19 @@ class AioResponse(Response): response: "ClientResponse" def __init__(self, response: "ClientResponse"): - response.status_code = response.status # type: ignore[attr-defined] - super().__init__(response) + self.response = response + self.status = response.status + self.headers = response.headers + self.request_id = response.headers.get("x-oss-request-id", "") self.__all_read = False + logger.debug( + "Get response headers, req-id: %s, status: %d, headers: %s", + self.request_id, + self.status, + self.headers, + ) - async def aread(self, amt=None) -> bytes: + async def read(self, amt=None) -> bytes: """read the contents from the response""" if self.__all_read: return b"" @@ -89,10 +96,7 @@ async def aread(self, amt=None) -> bytes: self.__all_read = True return content - async def read(self, amt=None) -> bytes: - return await self.aread(amt) - - async def __iter__(self): + async def __aiter__(self): """content iterator""" return await self.response.content.iter_chunked(_CHUNK_SIZE) diff --git a/aiooss2/models.py b/aiooss2/models.py index d189f16..41c3082 100644 --- a/aiooss2/models.py +++ b/aiooss2/models.py @@ -3,7 +3,7 @@ """ import copy import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Awaitable, Optional from oss2.exceptions import ClientError from oss2.headers import ( @@ -11,7 +11,7 @@ KMS_ALI_WRAP_ALGORITHM, OSS_CLIENT_SIDE_ENCRYPTION_KEY, ) -from oss2.models import ContentCryptoMaterial, GetObjectResult, _hget +from oss2.models import ContentCryptoMaterial, HeadObjectResult, _hget from aiooss2.utils import make_adapter @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -class AioGetObjectResult(GetObjectResult): +class AioGetObjectResult(HeadObjectResult): """class for the result of api get_object""" resp: "AioResponse" @@ -35,7 +35,7 @@ def __init__( # pylint: disable=too-many-arguments crypto_provider=None, discard=0, ): - super(GetObjectResult, self).__init__(resp) + super().__init__(resp) self.__crypto_provider = crypto_provider self.__crc_enabled = crc_enabled @@ -144,13 +144,35 @@ async def __aenter__(self): async def __aexit__(self, *args): self.resp.release() + async def __aiter__(self): + async for data in self.stream: + return data + + @staticmethod + def _parse_range_str(content_range): + # :param str content_range: sample 'bytes 0-128/1024' + range_data = (content_range.split(" ", 2)[1]).split("/", 2)[0] + range_start, range_end = range_data.split("-", 2) + return int(range_start), int(range_end) + + def close(self): + """close the response response""" + self.resp.response.close() + @property def client_crc(self): + """the client crc""" if self.__crc_enabled: return self.stream.crc return None - async def read( - self, amt=None - ): # pylint: disable=invalid-overridden-method - return await self.stream.aread(amt) + async def read(self, amt: Optional[int] = None) -> Awaitable[bytes]: + """async read data from stream + + Args: + amt (int, optional): batch size of the data to read + + Returns: + Awaitable[bytes]: + """ + return await self.stream.read(amt) diff --git a/aiooss2/utils.py b/aiooss2/utils.py index d9d60ab..29ca4e7 100644 --- a/aiooss2/utils.py +++ b/aiooss2/utils.py @@ -1,6 +1,7 @@ """ Utils used in project. """ +import inspect import logging from typing import Callable, Optional @@ -8,7 +9,12 @@ from oss2.exceptions import ClientError, InconsistentError from oss2.utils import Crc64, _get_data_size, _IterableAdapter -from aiooss2.adapter import BytesAndStringAdapter, FileAdapter +from aiooss2.adapter import ( + AsyncSizedAdapter, + AsyncUnsizedAdapter, + SyncSizedAdapter, + SyncUnsizedAdapter, +) logger = logging.getLogger(__name__) @@ -75,7 +81,15 @@ def make_adapter( # pylint: disable=too-many-arguments raise ClientError( "Bytes of file object adapter does not support discard bytes" ) - return BytesAndStringAdapter( + if hasattr(data, "read") and inspect.iscoroutinefunction(data.read): + return AsyncSizedAdapter( + data, + progress_callback=progress_callback, + size=size, + crc_callback=crc_callback, + ) + + return SyncSizedAdapter( data, progress_callback=progress_callback, size=size, @@ -83,11 +97,18 @@ def make_adapter( # pylint: disable=too-many-arguments ) if hasattr(data, "read"): - return FileAdapter( + if inspect.iscoroutinefunction(data.read): + return AsyncUnsizedAdapter( + data, + progress_callback=progress_callback, + discard=discard, + crc_callback=crc_callback, + ) + return SyncUnsizedAdapter( data, - progress_callback, - crc_callback=crc_callback, + progress_callback=progress_callback, discard=discard, + crc_callback=crc_callback, ) if hasattr(data, "__iter__"):