Skip to content

Commit

Permalink
Releases v0.11.6.1 (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored May 13, 2024
1 parent 01a8f38 commit af6e1e4
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 47 deletions.
2 changes: 1 addition & 1 deletion odps/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version_info = (0, 11, 6)
version_info = (0, 11, 6, 1)
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
Expand Down
11 changes: 6 additions & 5 deletions odps/apis/storage_api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@


@pytest.fixture
def storage_api_client(odps_daily):
def storage_api_client(odps):
global _test_table_id

options.always_enable_schema = True

test_table_name = tn("test_halo_common_table_" + str(_test_table_id))
_test_table_id += 1
odps_daily.delete_table(test_table_name, if_exists=True)
table = odps_daily.create_table(
odps.delete_table(test_table_name, if_exists=True)
table = odps.create_table(
test_table_name,
("a BIGINT, b BIGINT, c BIGINT, d BIGINT", "pt string"),
if_not_exists=True,
)
try:
yield StorageApiArrowClient(odps_daily, table)
yield StorageApiArrowClient(odps, table)
finally:
table.drop(async_=True)
options.always_enable_schema = False
options.always_enable_schema = False

4 changes: 2 additions & 2 deletions odps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ def exist_resource(self, name, project=None, schema=None):

def open_resource(
self, name, project=None, mode='r+', encoding='utf-8', schema=None,
type="file", stream=False, comment=None
type="file", stream=False, comment=None, temp=False
):
"""
Open a file resource as file-like object.
Expand Down Expand Up @@ -831,7 +831,7 @@ def open_resource(
return name.open(mode=mode)

parent = self._get_project_or_schema(project, schema)
return parent.resources.get_typed(name, type=type, comment=comment).open(
return parent.resources.get_typed(name, type=type, comment=comment, temp=temp).open(
mode=mode, encoding=encoding, stream=stream
)

Expand Down
5 changes: 1 addition & 4 deletions odps/models/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ def get_resource_name(res):
schema_name = res._get_schema_name()
if res.project.name == self.project.name and schema_name is None:
return res.name
elif schema_name is not None:
return '%s/schemas/%s/resources/%s' % (res.project.name, schema_name, res.name)
else:
return '%s/resources/%s' % (res.project.name, res.name)
return res.full_resource_name
else:
return res

Expand Down
40 changes: 36 additions & 4 deletions odps/models/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from .core import LazyLoad
from .cache import cache, cache_parent

_RESOURCE_SPLITTER = '/resources/'
_SCHEMA_SPLITTER = '/schemas/'

if sys.version_info[0] < 3:
_StringIOType = type(compat.StringIO())
else:
Expand Down Expand Up @@ -118,6 +121,32 @@ def __init__(self, **kwargs):
kwargs['type'] = Resource.Type(typo.upper())
super(Resource, self).__init__(**kwargs)

@classmethod
def build_full_resource_name(cls, name, project_name, schema_name=None):
if project_name is None:
return name
elif schema_name is not None:
return project_name + _SCHEMA_SPLITTER + schema_name + _RESOURCE_SPLITTER + name
else:
return project_name + _RESOURCE_SPLITTER + name

@classmethod
def split_resource_name(cls, name):
project_name, schema_name = None, None
if _RESOURCE_SPLITTER in name:
project_schema_name, name = name.split(_RESOURCE_SPLITTER, 1)

if _SCHEMA_SPLITTER not in project_schema_name:
project_name, schema_name = project_schema_name, None
else:
project_name, schema_name = project_schema_name.split(_SCHEMA_SPLITTER, 1)
return project_name, schema_name, name

@property
def full_resource_name(self):
schema_name = self._get_schema_name()
return self.build_full_resource_name(self.name, self.project.name, schema_name)

def reload(self):
params = {}
schema_name = self._get_schema_name()
Expand All @@ -141,6 +170,9 @@ def reload(self):
self.last_modified_time = utils.parse_rfc822(
resp.headers.get('Last-Modified'))

is_temp_resource_header = resp.headers.get("x-odps-resource-istemp") or ""
self.is_temp_resource = is_temp_resource_header.lower() == "true"

self.source_table_name = resp.headers.get('x-odps-copy-table-source')
self.volume_path = resp.headers.get('x-odps-copy-file-source')
self.content_md5 = resp.headers.get('Content-MD5')
Expand Down Expand Up @@ -487,8 +519,8 @@ def __enter__(self):
def __exit__(self, *_):
self.close()

def update(self, file_obj):
return self._parent.update(self, file_obj=file_obj)
def update(self, file_obj, temp=None):
return self._parent.update(self, file_obj=file_obj, temp=temp)


@cache_parent
Expand Down Expand Up @@ -712,8 +744,8 @@ def open_writer(self, **kwargs):
)

def update(
self,
table_project_name=None, table_schema_name=None, table_name=None, *args, **kw):
self, table_project_name=None, table_schema_name=None, table_name=None, *args, **kw
):
"""
Update this resource.
Expand Down
26 changes: 9 additions & 17 deletions odps/models/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
from .. import serializers, errors
from ..compat import six

_RESOURCE_SPLITTER = '/resources/'
_SCHEMA_SPLITTER = '/schemas/'

DEFAULT_RESOURCE_CHUNK_SIZE = 64 << 20


Expand All @@ -33,20 +30,17 @@ class Resources(Iterable):

def get_typed(self, name, type, **kw):
type_cls = Resource._get_cls(type)
parent = self

if _RESOURCE_SPLITTER in name:
project_schema_name, name = name.split(_RESOURCE_SPLITTER, 1)

if _SCHEMA_SPLITTER not in project_schema_name:
project_name, schema_name = project_schema_name, None
else:
project_name, schema_name = project_schema_name.split(_SCHEMA_SPLITTER, 1)

project_name, schema_name, name = Resource.split_resource_name(name)
if project_name is not None:
parent = self.parent.project.parent[project_name]
if schema_name is not None:
parent = parent.schemas[schema_name]
return parent.resources[name]
return type_cls(client=self._client, parent=self, name=name, **kw)
parent = parent.resources
if "temp" in kw:
kw["is_temp_resource"] = kw.pop("temp")
return type_cls(client=self._client, parent=parent, name=name, **kw)

def _get(self, name):
return self.get_typed(name, None)
Expand All @@ -72,7 +66,7 @@ def __iter__(self):
return self.iterate()

def get(self, name, type=None):
Resource._get_cls(type)
return self.get_typed(name, type)

def iterate(self, name=None, owner=None):
params = {'expectmarker': 'true'}
Expand Down Expand Up @@ -187,9 +181,7 @@ def read_resource(
def merge_part_files(self, resource, part_resources, md5_hex, overwrite=False):
content = md5_hex + "|" + ",".join(res.name for res in part_resources)
total_bytes = sum(res.size for res in part_resources)
resource_args = resource.extract(

)
resource_args = resource.extract()
resource_args.update(
{"parent": self, "client": self._client, "merge_total_bytes": total_bytes}
)
Expand Down
46 changes: 45 additions & 1 deletion odps/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from .core import AbstractXMLRemoteModel
from .. import serializers, errors, utils
from ..compat import six
from ..compat import enum, six
from ..config import options


Expand Down Expand Up @@ -290,6 +290,50 @@ def update_sql_rt_settings(self, value=None, glob=True):
self.update_settings(settings)


class MaxFrameTask(Task):
__slots__ = ("_output_format", "_major_version", "_service_endpoint")
_root = "MaxFrame"
_anonymous_task_name = "AnonymousMaxFrameTask"

class CommandType(enum.Enum):
CREATE_SESSION = "CREATE_SESSION"
PYTHON_PACK = "PYTHON_PACK"

command = serializers.XMLNodeField(
"Command",
default=CommandType.CREATE_SESSION,
parse_callback=lambda t: MaxFrameTask.CommandType(t.upper()),
serialize_callback=lambda t: t.value,
)

def __init__(self, **kwargs):
kwargs["name"] = kwargs.get("name") or self._anonymous_task_name
self._major_version = kwargs.pop("major_version", None)
self._service_endpoint = kwargs.pop("service_endpoint", None)
super(MaxFrameTask, self).__init__(**kwargs)

if self.properties is None:
self.properties = OrderedDict()
self.properties["settings"] = "{}"

def serial(self):
if options.default_task_settings:
settings = options.default_task_settings.copy()
else:
settings = OrderedDict()

if self._major_version is not None:
settings["odps.task.major.version"] = self._major_version
if self._service_endpoint is not None:
settings["odps.service.endpoint"] = self._service_endpoint

if "settings" in self.properties:
settings.update(json.loads(self.properties["settings"]))

self.properties["settings"] = json.dumps(settings)
return super(MaxFrameTask, self).serial()


try:
from ..internal.models.tasks import * # noqa: F401
except ImportError:
Expand Down
37 changes: 29 additions & 8 deletions odps/models/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,26 @@ def test_table_resource(config, odps):
odps.delete_table(test_table_name, project=secondary_project)


def test_temp_file_resource(odps):
def test_temp_file_resource(odps_daily):
odps = odps_daily
resource_name = tn('pyodps_t_tmp_file_resource')

try:
odps.delete_resource(resource_name)
except errors.ODPSError:
pass

resource = odps.create_resource(resource_name, 'file', file_obj=FILE_CONTENT, temp=True)
resource = odps.create_resource(resource_name, 'file', fileobj=FILE_CONTENT, temp=True)
assert isinstance(resource, FileResource)
assert resource.is_temp_resource is True
assert resource.is_temp_resource
resource.reload()
assert resource.is_temp_resource

odps.delete_resource(resource_name)


def test_stream_file_resource(odps):
def test_stream_file_resource(odps_daily):
odps = odps_daily
options.resource_chunk_size = 1024
content = OVERWRITE_FILE_CONTENT * 32
resource_name = tn('pyodps_t_tmp_file_resource')
Expand Down Expand Up @@ -254,33 +258,50 @@ def test_stream_file_resource(odps):
sio.write(line)
assert sio.getvalue() == content

with odps.open_resource(resource_name, mode="w", stream=True) as res:
odps.delete_resource(resource_name)

with odps.open_resource(resource_name, mode="w", stream=True, temp=True) as res:
lines = content.splitlines(True)
for offset in range(0, len(lines), 50):
res.writelines(lines[offset:offset + 50])

with odps.open_resource(resource_name, mode="r", stream=True) as res:
lines = res.readlines()
res.reload()
assert res.is_temp_resource
assert "".join(lines) == content


def test_file_resource(odps):
def test_file_resource(odps_daily):
odps = odps_daily
resource_name = tn('pyodps_t_tmp_file_resource')

try:
odps.delete_resource(resource_name)
except errors.ODPSError:
pass

resource = odps.create_resource(resource_name, 'file', file_obj=FILE_CONTENT)
resource = odps.create_resource(resource_name, 'file', fileobj=FILE_CONTENT)
assert isinstance(resource, FileResource)
resource.drop()

# create resource with open_resource and write
with odps.open_resource(
resource_name, mode='w', type='file', comment="comment_data"
resource_name, mode='w', type='file', comment="comment_data", temp=True
) as resource:
resource.write(FILE_CONTENT)
resource.reload()
assert resource.is_temp_resource
resource.drop()

# create resource with full resource path
with odps.open_resource(
odps.project + "/resources/" + resource_name, mode='w', type='file',
comment="comment_data", temp=True,
) as resource:
resource.write(FILE_CONTENT)
resource.reload()
assert resource.is_temp_resource

resource.reload()
assert resource.comment == "comment_data"
Expand Down
28 changes: 27 additions & 1 deletion odps/models/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ...config import options
from ...tests.core import tn, wait_filled
from ...utils import get_zone_name, to_text
from .. import SQLTask, MergeTask, CupidTask, SQLCostTask, Task
from .. import CupidTask, MaxFrameTask, MergeTask, SQLCostTask, SQLTask, Task

try:
import zoneinfo
Expand Down Expand Up @@ -96,6 +96,19 @@
</SQLCost>
'''

maxframe_template = '''<?xml version="1.0" encoding="utf-8"?>
<MaxFrame>
<Name>AnonymousMaxFrameTask</Name>
<Config>
<Property>
<Name>settings</Name>
<Value>{"odps.service.endpoint": "%(endpoint)s", "odps.maxframe.output_format": "maxframe_v1"}</Value>
</Property>
</Config>
<Command>CREATE_SESSION</Command>
</MaxFrame>
'''


def test_task_class_type():
typed = Task(type='SQL', query='select * from dual')
Expand Down Expand Up @@ -242,3 +255,16 @@ def test_sql_cost_task_to_xml():

task = Task.parse(None, to_xml)
assert isinstance(task, SQLCostTask)


def test_maxframe_task_to_xml(odps):
task = MaxFrameTask(service_endpoint=odps.endpoint)
task.update_settings({"odps.maxframe.output_format": "maxframe_v1"})
to_xml = task.serialize()
right_xml = maxframe_template % {'endpoint': odps.endpoint}

assert to_text(to_xml) == to_text(right_xml)

task = Task.parse(None, to_xml)
assert isinstance(task, MaxFrameTask)
assert task.command == MaxFrameTask.CommandType.CREATE_SESSION
Loading

0 comments on commit af6e1e4

Please sign in to comment.