diff --git a/odps/_version.py b/odps/_version.py index 933d164..c665693 100644 --- a/odps/_version.py +++ b/odps/_version.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -version_info = (0, 11, 6) +version_info = (0, 11, 6, 1) _num_index = max(idx if isinstance(v, int) else 0 for idx, v in enumerate(version_info)) __version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \ diff --git a/odps/apis/storage_api/conftest.py b/odps/apis/storage_api/conftest.py index 62f33a0..754f87c 100644 --- a/odps/apis/storage_api/conftest.py +++ b/odps/apis/storage_api/conftest.py @@ -29,21 +29,22 @@ @pytest.fixture -def storage_api_client(odps_daily): +def storage_api_client(odps): global _test_table_id options.always_enable_schema = True test_table_name = tn("test_halo_common_table_" + str(_test_table_id)) _test_table_id += 1 - odps_daily.delete_table(test_table_name, if_exists=True) - table = odps_daily.create_table( + odps.delete_table(test_table_name, if_exists=True) + table = odps.create_table( test_table_name, ("a BIGINT, b BIGINT, c BIGINT, d BIGINT", "pt string"), if_not_exists=True, ) try: - yield StorageApiArrowClient(odps_daily, table) + yield StorageApiArrowClient(odps, table) finally: table.drop(async_=True) - options.always_enable_schema = False \ No newline at end of file + options.always_enable_schema = False + diff --git a/odps/core.py b/odps/core.py index bfdbb9f..fe85aca 100644 --- a/odps/core.py +++ b/odps/core.py @@ -775,7 +775,7 @@ def exist_resource(self, name, project=None, schema=None): def open_resource( self, name, project=None, mode='r+', encoding='utf-8', schema=None, - type="file", stream=False, comment=None + type="file", stream=False, comment=None, temp=False ): """ Open a file resource as file-like object. @@ -831,7 +831,7 @@ def open_resource( return name.open(mode=mode) parent = self._get_project_or_schema(project, schema) - return parent.resources.get_typed(name, type=type, comment=comment).open( + return parent.resources.get_typed(name, type=type, comment=comment, temp=temp).open( mode=mode, encoding=encoding, stream=stream ) diff --git a/odps/models/function.py b/odps/models/function.py index da4da08..ef80522 100644 --- a/odps/models/function.py +++ b/odps/models/function.py @@ -81,10 +81,7 @@ def get_resource_name(res): schema_name = res._get_schema_name() if res.project.name == self.project.name and schema_name is None: return res.name - elif schema_name is not None: - return '%s/schemas/%s/resources/%s' % (res.project.name, schema_name, res.name) - else: - return '%s/resources/%s' % (res.project.name, res.name) + return res.full_resource_name else: return res diff --git a/odps/models/resource.py b/odps/models/resource.py index bf3e5af..c834f52 100644 --- a/odps/models/resource.py +++ b/odps/models/resource.py @@ -24,6 +24,9 @@ from .core import LazyLoad from .cache import cache, cache_parent +_RESOURCE_SPLITTER = '/resources/' +_SCHEMA_SPLITTER = '/schemas/' + if sys.version_info[0] < 3: _StringIOType = type(compat.StringIO()) else: @@ -118,6 +121,32 @@ def __init__(self, **kwargs): kwargs['type'] = Resource.Type(typo.upper()) super(Resource, self).__init__(**kwargs) + @classmethod + def build_full_resource_name(cls, name, project_name, schema_name=None): + if project_name is None: + return name + elif schema_name is not None: + return project_name + _SCHEMA_SPLITTER + schema_name + _RESOURCE_SPLITTER + name + else: + return project_name + _RESOURCE_SPLITTER + name + + @classmethod + def split_resource_name(cls, name): + project_name, schema_name = None, None + if _RESOURCE_SPLITTER in name: + project_schema_name, name = name.split(_RESOURCE_SPLITTER, 1) + + if _SCHEMA_SPLITTER not in project_schema_name: + project_name, schema_name = project_schema_name, None + else: + project_name, schema_name = project_schema_name.split(_SCHEMA_SPLITTER, 1) + return project_name, schema_name, name + + @property + def full_resource_name(self): + schema_name = self._get_schema_name() + return self.build_full_resource_name(self.name, self.project.name, schema_name) + def reload(self): params = {} schema_name = self._get_schema_name() @@ -141,6 +170,9 @@ def reload(self): self.last_modified_time = utils.parse_rfc822( resp.headers.get('Last-Modified')) + is_temp_resource_header = resp.headers.get("x-odps-resource-istemp") or "" + self.is_temp_resource = is_temp_resource_header.lower() == "true" + self.source_table_name = resp.headers.get('x-odps-copy-table-source') self.volume_path = resp.headers.get('x-odps-copy-file-source') self.content_md5 = resp.headers.get('Content-MD5') @@ -487,8 +519,8 @@ def __enter__(self): def __exit__(self, *_): self.close() - def update(self, file_obj): - return self._parent.update(self, file_obj=file_obj) + def update(self, file_obj, temp=None): + return self._parent.update(self, file_obj=file_obj, temp=temp) @cache_parent @@ -712,8 +744,8 @@ def open_writer(self, **kwargs): ) def update( - self, - table_project_name=None, table_schema_name=None, table_name=None, *args, **kw): + self, table_project_name=None, table_schema_name=None, table_name=None, *args, **kw + ): """ Update this resource. diff --git a/odps/models/resources.py b/odps/models/resources.py index afc6285..fef67a7 100644 --- a/odps/models/resources.py +++ b/odps/models/resources.py @@ -19,9 +19,6 @@ from .. import serializers, errors from ..compat import six -_RESOURCE_SPLITTER = '/resources/' -_SCHEMA_SPLITTER = '/schemas/' - DEFAULT_RESOURCE_CHUNK_SIZE = 64 << 20 @@ -33,20 +30,17 @@ class Resources(Iterable): def get_typed(self, name, type, **kw): type_cls = Resource._get_cls(type) + parent = self - if _RESOURCE_SPLITTER in name: - project_schema_name, name = name.split(_RESOURCE_SPLITTER, 1) - - if _SCHEMA_SPLITTER not in project_schema_name: - project_name, schema_name = project_schema_name, None - else: - project_name, schema_name = project_schema_name.split(_SCHEMA_SPLITTER, 1) - + project_name, schema_name, name = Resource.split_resource_name(name) + if project_name is not None: parent = self.parent.project.parent[project_name] if schema_name is not None: parent = parent.schemas[schema_name] - return parent.resources[name] - return type_cls(client=self._client, parent=self, name=name, **kw) + parent = parent.resources + if "temp" in kw: + kw["is_temp_resource"] = kw.pop("temp") + return type_cls(client=self._client, parent=parent, name=name, **kw) def _get(self, name): return self.get_typed(name, None) @@ -72,7 +66,7 @@ def __iter__(self): return self.iterate() def get(self, name, type=None): - Resource._get_cls(type) + return self.get_typed(name, type) def iterate(self, name=None, owner=None): params = {'expectmarker': 'true'} @@ -187,9 +181,7 @@ def read_resource( def merge_part_files(self, resource, part_resources, md5_hex, overwrite=False): content = md5_hex + "|" + ",".join(res.name for res in part_resources) total_bytes = sum(res.size for res in part_resources) - resource_args = resource.extract( - - ) + resource_args = resource.extract() resource_args.update( {"parent": self, "client": self._client, "merge_total_bytes": total_bytes} ) diff --git a/odps/models/tasks.py b/odps/models/tasks.py index 725495f..37868ac 100644 --- a/odps/models/tasks.py +++ b/odps/models/tasks.py @@ -23,7 +23,7 @@ from .core import AbstractXMLRemoteModel from .. import serializers, errors, utils -from ..compat import six +from ..compat import enum, six from ..config import options @@ -290,6 +290,50 @@ def update_sql_rt_settings(self, value=None, glob=True): self.update_settings(settings) +class MaxFrameTask(Task): + __slots__ = ("_output_format", "_major_version", "_service_endpoint") + _root = "MaxFrame" + _anonymous_task_name = "AnonymousMaxFrameTask" + + class CommandType(enum.Enum): + CREATE_SESSION = "CREATE_SESSION" + PYTHON_PACK = "PYTHON_PACK" + + command = serializers.XMLNodeField( + "Command", + default=CommandType.CREATE_SESSION, + parse_callback=lambda t: MaxFrameTask.CommandType(t.upper()), + serialize_callback=lambda t: t.value, + ) + + def __init__(self, **kwargs): + kwargs["name"] = kwargs.get("name") or self._anonymous_task_name + self._major_version = kwargs.pop("major_version", None) + self._service_endpoint = kwargs.pop("service_endpoint", None) + super(MaxFrameTask, self).__init__(**kwargs) + + if self.properties is None: + self.properties = OrderedDict() + self.properties["settings"] = "{}" + + def serial(self): + if options.default_task_settings: + settings = options.default_task_settings.copy() + else: + settings = OrderedDict() + + if self._major_version is not None: + settings["odps.task.major.version"] = self._major_version + if self._service_endpoint is not None: + settings["odps.service.endpoint"] = self._service_endpoint + + if "settings" in self.properties: + settings.update(json.loads(self.properties["settings"])) + + self.properties["settings"] = json.dumps(settings) + return super(MaxFrameTask, self).serial() + + try: from ..internal.models.tasks import * # noqa: F401 except ImportError: diff --git a/odps/models/tests/test_resources.py b/odps/models/tests/test_resources.py index 86e1a3d..d526b50 100644 --- a/odps/models/tests/test_resources.py +++ b/odps/models/tests/test_resources.py @@ -195,7 +195,8 @@ def test_table_resource(config, odps): odps.delete_table(test_table_name, project=secondary_project) -def test_temp_file_resource(odps): +def test_temp_file_resource(odps_daily): + odps = odps_daily resource_name = tn('pyodps_t_tmp_file_resource') try: @@ -203,14 +204,17 @@ def test_temp_file_resource(odps): except errors.ODPSError: pass - resource = odps.create_resource(resource_name, 'file', file_obj=FILE_CONTENT, temp=True) + resource = odps.create_resource(resource_name, 'file', fileobj=FILE_CONTENT, temp=True) assert isinstance(resource, FileResource) - assert resource.is_temp_resource is True + assert resource.is_temp_resource + resource.reload() + assert resource.is_temp_resource odps.delete_resource(resource_name) -def test_stream_file_resource(odps): +def test_stream_file_resource(odps_daily): + odps = odps_daily options.resource_chunk_size = 1024 content = OVERWRITE_FILE_CONTENT * 32 resource_name = tn('pyodps_t_tmp_file_resource') @@ -254,17 +258,22 @@ def test_stream_file_resource(odps): sio.write(line) assert sio.getvalue() == content - with odps.open_resource(resource_name, mode="w", stream=True) as res: + odps.delete_resource(resource_name) + + with odps.open_resource(resource_name, mode="w", stream=True, temp=True) as res: lines = content.splitlines(True) for offset in range(0, len(lines), 50): res.writelines(lines[offset:offset + 50]) with odps.open_resource(resource_name, mode="r", stream=True) as res: lines = res.readlines() + res.reload() + assert res.is_temp_resource assert "".join(lines) == content -def test_file_resource(odps): +def test_file_resource(odps_daily): + odps = odps_daily resource_name = tn('pyodps_t_tmp_file_resource') try: @@ -272,15 +281,27 @@ def test_file_resource(odps): except errors.ODPSError: pass - resource = odps.create_resource(resource_name, 'file', file_obj=FILE_CONTENT) + resource = odps.create_resource(resource_name, 'file', fileobj=FILE_CONTENT) assert isinstance(resource, FileResource) resource.drop() # create resource with open_resource and write with odps.open_resource( - resource_name, mode='w', type='file', comment="comment_data" + resource_name, mode='w', type='file', comment="comment_data", temp=True ) as resource: resource.write(FILE_CONTENT) + resource.reload() + assert resource.is_temp_resource + resource.drop() + + # create resource with full resource path + with odps.open_resource( + odps.project + "/resources/" + resource_name, mode='w', type='file', + comment="comment_data", temp=True, + ) as resource: + resource.write(FILE_CONTENT) + resource.reload() + assert resource.is_temp_resource resource.reload() assert resource.comment == "comment_data" diff --git a/odps/models/tests/test_tasks.py b/odps/models/tests/test_tasks.py index 2951542..bb70618 100644 --- a/odps/models/tests/test_tasks.py +++ b/odps/models/tests/test_tasks.py @@ -21,7 +21,7 @@ from ...config import options from ...tests.core import tn, wait_filled from ...utils import get_zone_name, to_text -from .. import SQLTask, MergeTask, CupidTask, SQLCostTask, Task +from .. import CupidTask, MaxFrameTask, MergeTask, SQLCostTask, SQLTask, Task try: import zoneinfo @@ -96,6 +96,19 @@ ''' +maxframe_template = ''' + + AnonymousMaxFrameTask + + + settings + {"odps.service.endpoint": "%(endpoint)s", "odps.maxframe.output_format": "maxframe_v1"} + + + CREATE_SESSION + +''' + def test_task_class_type(): typed = Task(type='SQL', query='select * from dual') @@ -242,3 +255,16 @@ def test_sql_cost_task_to_xml(): task = Task.parse(None, to_xml) assert isinstance(task, SQLCostTask) + + +def test_maxframe_task_to_xml(odps): + task = MaxFrameTask(service_endpoint=odps.endpoint) + task.update_settings({"odps.maxframe.output_format": "maxframe_v1"}) + to_xml = task.serialize() + right_xml = maxframe_template % {'endpoint': odps.endpoint} + + assert to_text(to_xml) == to_text(right_xml) + + task = Task.parse(None, to_xml) + assert isinstance(task, MaxFrameTask) + assert task.command == MaxFrameTask.CommandType.CREATE_SESSION diff --git a/odps/tunnel/tests/test_tabletunnel.py b/odps/tunnel/tests/test_tabletunnel.py index 6f1f3f7..dd6264d 100644 --- a/odps/tunnel/tests/test_tabletunnel.py +++ b/odps/tunnel/tests/test_tabletunnel.py @@ -56,9 +56,9 @@ @pytest.fixture(autouse=True) -def check_malicious_requests(): +def check_malicious_requests(odps): def _new_throw_if_parsable(resp, *args, **kw): - if resp.status_code in (400, 404): + if resp.status_code in (400, 404) and not resp.url.startswith(odps.endpoint): raise AssertionError("Malicious request detected.") throw_if_parsable(resp, *args, **kw) @@ -934,10 +934,11 @@ def test_decimal_with_complex_types(odps): def test_json_timestamp_types(odps_daily): import pandas as pd + odps = odps_daily table_name = tn("test_json_types") - odps_daily.delete_table(table_name, if_exists=True) + odps.delete_table(table_name, if_exists=True) hints = {"odps.sql.type.json.enable": "true"} - table = odps_daily.create_table( + table = odps.create_table( table_name, "col1 json, col2 timestamp_ntz, col3 string", hints=hints )