Skip to content

Commit

Permalink
upgrade to 0.8.2
Browse files Browse the repository at this point in the history
  • Loading branch information
继盛 committed Jul 1, 2019
1 parent 4b0de18 commit 39dd4db
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 22 deletions.
2 changes: 1 addition & 1 deletion odps/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version_info = (0, 8, 1)
version_info = (0, 8, 2)
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
Expand Down
16 changes: 16 additions & 0 deletions odps/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def _do_POST(self):
self.end_headers()
return

self._sign(postvars)

def _sign(self, postvars):
if self.server._token is not None:
auth = self.headers.get('Authorization')
if not auth:
Expand Down Expand Up @@ -267,3 +270,16 @@ def sign_request(self, req, endpoint):
LOG.debug('headers after signing: ' + repr(req.headers))
else:
raise SignServerError('Sign server returned error code: %d' % resp.status_code, resp.status_code)


class BearerTokenAccount(BaseAccount):
    """Account that authenticates requests with a pre-acquired ODPS
    bearer token instead of an access-key pair.
    """

    def __init__(self, token):
        self._token = token

    @property
    def token(self):
        """The bearer token used to sign outgoing requests."""
        return self._token

    def sign_request(self, req, endpoint):
        # Bearer-token auth only needs one extra header on the request;
        # ``endpoint`` is part of the common signing interface and is
        # intentionally unused here.
        headers = req.headers
        headers['x-odps-bearer-token'] = self._token
        LOG.debug('headers after signing: ' + repr(headers))
4 changes: 2 additions & 2 deletions odps/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

DEFAULT_CHUNK_SIZE = 1496
DEFAULT_CONNECT_RETRY_TIMES = 4
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_CONNECT_TIMEOUT = 10
DEFAULT_READ_TIMEOUT = 120
DEFAULT_POOL_CONNECTIONS = 10
DEFAULT_POOL_MAXSIZE = 10
Expand Down Expand Up @@ -329,7 +329,7 @@ def validate(x):
options.register_option('allow_antique_date', False)
options.register_option('user_agent_pattern', '$pyodps_version $python_version $os_version')
options.register_option('log_view_host', None)
options.register_option('log_view_hours', 24, validator=is_integer)
options.register_option('log_view_hours', 24 * 30, validator=is_integer)
options.register_option('api_proxy', None)
options.register_option('data_proxy', None)
options.redirect_option('tunnel_proxy', 'data_proxy')
Expand Down
7 changes: 3 additions & 4 deletions odps/df/backends/odpssql/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,12 +1398,11 @@ def visit_join(self, expr):
self._ctx.add_expr_compiled(expr, from_clause)

def visit_union(self, expr):
if expr._distinct:
raise CompileError("Distinct union is not supported here.")

union_type = 'UNION ALL' if not expr._distinct else 'UNION'
left_compiled, right_compiled = tuple(self._sub_compiles[expr])

from_clause = '{0} \nUNION ALL\n{1}'.format(left_compiled, utils.indent(right_compiled, self._indent_size))
from_clause = '{0} \n{1}\n{2}'.format(left_compiled, union_type,
utils.indent(right_compiled, self._indent_size))

compiled = from_clause
if not self._union_no_alias.get(expr, False):
Expand Down
14 changes: 14 additions & 0 deletions odps/df/backends/odpssql/tests/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2546,6 +2546,20 @@ def testUnion(self):

self.assertEqual(to_str(expected), to_str(ODPSEngine(self.odps).compile(expr, prettify=False)))

expr = e.union(e3['const', 'id', 'name'], distinct=True)

expected = "SELECT * \n" \
"FROM (\n" \
" SELECT t1.`name`, t1.`id`, 'cst' AS `const` \n" \
" FROM mocked_project.`pyodps_test_expr_table` t1 \n" \
" UNION\n" \
" SELECT t2.`name`, SUM(t2.`id`) AS `id`, 'cst' AS `const` \n" \
" FROM mocked_project.`pyodps_test_expr_table1` t2 \n" \
" GROUP BY t2.`name`\n" \
") t3"

self.assertEqual(to_str(expected), to_str(ODPSEngine(self.odps).compile(expr, prettify=False)))

def testAliases(self):
df = self.expr
df = df[(df.id == 1) | (df.id == 2)].exclude(['fid'])
Expand Down
28 changes: 23 additions & 5 deletions odps/df/backends/odpssql/types.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2017 Alibaba Group Holding Ltd.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,6 +18,7 @@
from .... import types as odps_types
from ... import types as df_types
from ....compat import six
from ....config import options


_odps_to_df_types = {
Expand Down Expand Up @@ -46,6 +47,19 @@
df_types.binary: odps_types.string,
}

# DataFrame -> ODPS type mapping used when ``options.sql.use_odps2_extension``
# is enabled: narrow integer and float widths are kept as the ODPS2 extended
# types (tinyint / smallint / int / float) and binary maps to the native
# binary type, rather than widening as the legacy mapping does.
_df_to_odps_types2 = {
    df_types.int8: odps_types.tinyint,
    df_types.int16: odps_types.smallint,
    df_types.int32: odps_types.int_,
    df_types.int64: odps_types.bigint,
    df_types.float32: odps_types.float_,
    df_types.float64: odps_types.double,
    df_types.boolean: odps_types.boolean,
    df_types.string: odps_types.string,
    df_types.datetime: odps_types.datetime,
    df_types.binary: odps_types.binary,
}


def odps_type_to_df_type(odps_type):
if isinstance(odps_type, six.string_types):
Expand Down Expand Up @@ -74,11 +88,15 @@ def odps_schema_to_df_schema(odps_schema):


def df_type_to_odps_type(df_type):
if options.sql.use_odps2_extension:
df_to_odps_types = _df_to_odps_types2
else:
df_to_odps_types = _df_to_odps_types
if isinstance(df_type, six.string_types):
df_type = df_types.validate_data_type(df_type)

if df_type in _df_to_odps_types:
return _df_to_odps_types[df_type]
if df_type in df_to_odps_types:
return df_to_odps_types[df_type]
elif df_type == df_types.decimal:
return odps_types.Decimal()
elif isinstance(df_type, df_types.List):
Expand Down
29 changes: 27 additions & 2 deletions odps/df/backends/tests/test_mixed_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import time
import uuid

from odps.tests.core import tn, pandas_case
from odps import types as odps_types
from odps.tests.core import tn, pandas_case, odps2_typed_case
from odps.df.backends.tests.core import TestBase
from odps.config import options
from odps.compat import unittest
from odps.models import Schema, Instance
from odps.errors import ODPSError
from odps.df.backends.engine import MixedEngine
from odps.df.backends.odpssql.engine import ODPSSQLEngine
from odps.df.backends.pd.engine import PandasEngine
Expand Down Expand Up @@ -194,6 +194,31 @@ def testPandasPersist(self):
finally:
self.odps.delete_table(tmp_table_name)

    @odps2_typed_case
    def testPandasPersistODPS2(self):
        """Persist a pandas DataFrame with narrow numeric dtypes and verify
        the resulting ODPS table keeps the corresponding ODPS2 extended
        types instead of widening them to bigint/double.
        """
        import pandas as pd
        import numpy as np

        # One random value per numpy dtype so every column carries a
        # distinct integer/float width.
        data_int8 = np.random.randint(0, 10, (1,), dtype=np.int8)
        data_int16 = np.random.randint(0, 10, (1,), dtype=np.int16)
        data_int32 = np.random.randint(0, 10, (1,), dtype=np.int32)
        data_int64 = np.random.randint(0, 10, (1,), dtype=np.int64)
        data_float32 = np.random.random((1,)).astype(np.float32)
        data_float64 = np.random.random((1,)).astype(np.float64)

        df = DataFrame(pd.DataFrame(dict(data_int8=data_int8, data_int16=data_int16,
                                         data_int32=data_int32, data_int64=data_int64,
                                         data_float32=data_float32, data_float64=data_float64)))
        tmp_table_name = tn('pyodps_test_mixed_persist_odps2_types')

        # Recreate the target table from scratch so leftover schemas from
        # earlier runs cannot affect the type check below.
        self.odps.delete_table(tmp_table_name, if_exists=True)
        df.persist(tmp_table_name, lifecycle=1, drop_table=True, odps=self.odps)

        t = self.odps.get_table(tmp_table_name)
        # Expected column types, matching the order the columns were
        # declared above — presumably column order survives the round
        # trip; verify against the persist implementation if this flakes.
        expected_types = [odps_types.tinyint, odps_types.smallint, odps_types.int_,
                          odps_types.bigint, odps_types.float_, odps_types.double]
        self.assertEqual(expected_types, t.schema.types)

def testExecuteCacheTable(self):
df = self.odps_df[self.odps_df.name == 'name1']
result = df.execute().values
Expand Down
2 changes: 2 additions & 0 deletions odps/models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,8 @@ def open_reader(self, *args, **kwargs):
if false, use conventional routine.
if absent, `options.tunnel.use_instance_tunnel` will be used and automatic fallback
is enabled.
:param limit: if True, enable the limit on the amount of data that can be read
:type limit: bool
:param reopen: the reader will reuse last one, reopen is true means open a new reader.
:type reopen: bool
:param endpoint: the tunnel service URL
Expand Down
4 changes: 3 additions & 1 deletion odps/models/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def _get_submit_instance_content(cls, job):
return Instance.AnonymousSubmitInstance(job=job).serialize()

def create(self, xml=None, job=None, task=None, priority=None, running_cluster=None,
headers=None, create_callback=None):
headers=None, create_callback=None, encoding=None):
if xml is None:
job = self._create_job(job=job, task=task, priority=priority,
running_cluster=running_cluster)
Expand All @@ -155,6 +155,8 @@ def create(self, xml=None, job=None, task=None, priority=None, running_cluster=N
if create_callback is not None:
create_callback(instance_id)

if encoding is not None:
resp.encoding = encoding
body = resp.text
if body:
instance_result = Instance.InstanceResult.parse(self._client, resp)
Expand Down
12 changes: 8 additions & 4 deletions odps/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ def gen_create_table_sql(table_name, table_schema, comment=None, if_not_exists=F
project = utils.to_text(project)
comment = utils.to_text(comment)

store_as = kw.get('store_as')
stored_as = kw.get('stored_as')
external_stored_as = kw.get('external_stored_as')
storage_handler = kw.get('storage_handler')

buf.write(u'CREATE%s TABLE ' % (' EXTERNAL' if storage_handler or store_as else ''))
buf.write(u'CREATE%s TABLE ' % (' EXTERNAL' if storage_handler or external_stored_as else ''))
if if_not_exists:
buf.write(u'IF NOT EXISTS ')
if project is not None:
Expand Down Expand Up @@ -319,11 +320,11 @@ def write_columns(col_array):
serde_properties = kw.get('serde_properties')
location = kw.get('location')
resources = kw.get('resources')
if storage_handler or store_as:
if storage_handler or external_stored_as:
if storage_handler:
buf.write("STORED BY '%s'\n" % escape_odps_string(storage_handler))
else:
buf.write("STORED AS %s\n" % escape_odps_string(store_as))
buf.write("STORED AS %s\n" % escape_odps_string(external_stored_as))
if serde_properties:
buf.write('WITH SERDEPROPERTIES (\n')
for idx, k in enumerate(serde_properties):
Expand All @@ -344,6 +345,9 @@ def write_columns(col_array):
if hub_lifecycle is not None:
buf.write(u' HUBLIFECYCLE %s\n' % hub_lifecycle)

if stored_as:
buf.write("STORED AS %s\n" % escape_odps_string(stored_as))

return buf.getvalue().strip()

def get_ddl(self, with_comments=True, if_not_exists=False):
Expand Down
4 changes: 4 additions & 0 deletions odps/models/xflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ def get_xflow_sub_instances(self, instance):
six.itervalues(self.get_xflow_results(instance))):
if x_result.node_type == 'Instance':
inst_dict[x_result.name] = self.odps.get_instance(x_result.instance_id)
elif x_result.node_type == 'SubWorkflow':
sub_instance = self.odps.get_instance(x_result.instance_id)
sub_inst_dict = self.odps.get_xflow_sub_instances(sub_instance)
inst_dict.update(**sub_inst_dict)
return inst_dict

def iter_xflow_sub_instances(self, instance, interval=1):
Expand Down
6 changes: 5 additions & 1 deletion odps/tempobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,11 @@ def clean_thread():


def _gen_repository_key(odps):
return hashlib.md5('####'.join([odps.account.access_id, odps.endpoint, odps.project]).encode('utf-8')).hexdigest()
if hasattr(odps.account, 'access_id'):
keys = [odps.account.access_id, odps.endpoint, odps.project]
elif hasattr(odps.account, 'token'):
keys = [odps.account.token, odps.endpoint, odps.project]
return hashlib.md5('####'.join(keys).encode('utf-8')).hexdigest()


def _put_objects(odps, objs):
Expand Down
36 changes: 35 additions & 1 deletion odps/tests/test_accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import uuid

from odps import ODPS
from odps import errors
from odps.tests.core import TestBase, tn
from odps.accounts import SignServer, SignServerAccount, SignServerError
from odps.accounts import SignServer, SignServerAccount, SignServerError, BearerTokenAccount


class Test(TestBase):
Expand Down Expand Up @@ -54,3 +55,36 @@ def testTokenizedSignServerAccount(self):
t.drop(async_=True)
finally:
server.stop()

    def testBearerTokenAccount(self):
        """Exercise BearerTokenAccount against a live service: a token taken
        from an instance's logview URL should grant read access to that
        instance, while table creation and fake tokens must be rejected.
        """
        self.odps.delete_table(tn('test_bearer_token_account_table'), if_exists=True)
        t = self.odps.create_table(tn('test_bearer_token_account_table'), 'col string', lifecycle=1)
        with t.open_writer() as writer:
            records = [['val1'], ['val2'], ['val3']]
            writer.write(records)

        inst = self.odps.execute_sql('select count(*) from {0}'.format(tn('test_bearer_token_account_table')), async_=True)
        inst.wait_for_success()
        task_name = inst.get_task_names()[0]

        # The logview URL ends with a ``token=`` query parameter; slice it
        # out and build a token-based account from it.
        logview_address = inst.get_logview_address()
        token = logview_address[logview_address.find('token=') + len('token='):]
        bearer_token_account = BearerTokenAccount(token=token)
        bearer_token_odps = ODPS(None, None, self.odps.project, self.odps.endpoint, account=bearer_token_account)
        bearer_token_instance = bearer_token_odps.get_instance(inst.id)

        # Token-authenticated reads must match those of the original account.
        self.assertEqual(inst.get_task_result(task_name),
                         bearer_token_instance.get_task_result(task_name))
        self.assertEqual(inst.get_task_summary(task_name),
                         bearer_token_instance.get_task_summary(task_name))

        # The instance-scoped token must not permit creating new tables.
        with self.assertRaises(errors.NoPermission):
            bearer_token_odps.create_table(tn('test_bearer_token_account_table_test1'),
                                           'col string', lifecycle=1)

        # An invalid token must be rejected with an ODPS error.
        fake_token_account = BearerTokenAccount(token='fake-token')
        bearer_token_odps = ODPS(None, None, self.odps.project, self.odps.endpoint, account=fake_token_account)

        with self.assertRaises(errors.ODPSError):
            bearer_token_odps.create_table(tn('test_bearer_token_account_table_test2'),
                                           'col string', lifecycle=1)
2 changes: 1 addition & 1 deletion odps/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def _repr(self):
return buf.getvalue()

def build_snapshot(self):
if self._snapshot is None and not options.force_py:
if not options.force_py:
if not self._columns:
return None

Expand Down
20 changes: 20 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Parts of this file were taken from the pandas project
# (https://github.com/pandas-dev/pandas), which is permitted for use under
# the BSD 3-Clause License

from setuptools import setup, find_packages, Extension
from setuptools.command.install import install
from distutils.cmd import Command
from distutils.sysconfig import get_config_var
from distutils.version import LooseVersion

import sys
import os
import platform
import shutil


# From https://github.com/pandas-dev/pandas/pull/24274:
# For mac, ensure extensions are built for macos 10.9 when compiling on a
# 10.9 system or above, overriding distuitls behaviour which is to target
# the version that python was built for. This may be overridden by setting
# MACOSX_DEPLOYMENT_TARGET before calling setup.py
# On macOS, bump the build target to 10.9 when compiling on a 10.9+ host
# with a Python that was built for an older target; an explicit
# MACOSX_DEPLOYMENT_TARGET set by the user always wins.
if sys.platform == 'darwin' and 'MACOSX_DEPLOYMENT_TARGET' not in os.environ:
    host_version = LooseVersion(platform.mac_ver()[0])
    build_target = LooseVersion(get_config_var('MACOSX_DEPLOYMENT_TARGET'))
    if build_target < '10.9' and host_version >= '10.9':
        os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.9'

repo_root = os.path.dirname(os.path.abspath(__file__))

try:
Expand Down

0 comments on commit 39dd4db

Please sign in to comment.