Skip to content

Commit

Permalink
Releases v0.11.5.beta1 (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Nov 10, 2023
1 parent c5b897f commit 6a13d98
Show file tree
Hide file tree
Showing 123 changed files with 6,250 additions and 1,703 deletions.
43 changes: 23 additions & 20 deletions cupid/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from odps import options
from odps.config import default_options, options

DEFAULT_PROXY_ENDPOINT = 'open.maxcompute.aliyun.com'
options = options

options.register_option('cupid.major_task_version', 'cupid_v2')
options.register_option('cupid.wait_am_start_time', 600)
options.register_option('cupid.use_bearer_token', None)
options.register_option('cupid.settings', None)
options.register_option('cupid.mp_buffer_size', 1024 * 64)
if not hasattr(default_options, "cupid"):
default_options.register_option('cupid.major_task_version', 'cupid_v2')
default_options.register_option('cupid.wait_am_start_time', 600)
default_options.register_option('cupid.use_bearer_token', None)
default_options.register_option('cupid.settings', None)
default_options.register_option('cupid.mp_buffer_size', 1024 * 64)

options.register_option('cupid.proxy_endpoint', 'open.maxcompute.aliyun.com')
options.register_option('cupid.worker.virtual_resource', None)
options.register_option('cupid.master.virtual_resource', None)
options.register_option('cupid.master_type', 'kubernetes')
options.register_option('cupid.application_type', 'mars')
options.register_option('cupid.engine_running_type', 'default')
options.register_option('cupid.container_node_label', None)
options.register_option('cupid.job_duration_hours', 25920)
options.register_option('cupid.channel_init_timeout_seconds', 120)
options.register_option('cupid.kube.master_mode', 'cupid')
options.register_option('cupid.runtime.endpoint', None)
default_options.register_option('cupid.proxy_endpoint', DEFAULT_PROXY_ENDPOINT)
default_options.register_option('cupid.worker.virtual_resource', None)
default_options.register_option('cupid.master.virtual_resource', None)
default_options.register_option('cupid.master_type', 'kubernetes')
default_options.register_option('cupid.application_type', 'mars')
default_options.register_option('cupid.engine_running_type', 'default')
default_options.register_option('cupid.container_node_label', None)
default_options.register_option('cupid.job_duration_hours', 25920)
default_options.register_option('cupid.channel_init_timeout_seconds', 120)
default_options.register_option('cupid.kube.master_mode', 'cupid')
default_options.register_option('cupid.runtime.endpoint', None)

# mars app config
options.register_option('cupid.image_prefix', None)
options.register_option('cupid.image_version', 'v0.11.1')
# mars app config
default_options.register_option('cupid.image_prefix', None)
default_options.register_option('cupid.image_version', 'v0.11.1')
2 changes: 1 addition & 1 deletion cupid/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_proxied_url(self, instance, app_name, expired_in_hours=None):
def start_kubernetes(self, async_=False, priority=None, running_cluster=None,
proxy_endpoint=None, major_task_version=None,
app_command=None, app_image=None, resources=None, **kw):
priority = priority or options.priority
priority = priority if priority is not None else options.priority
if priority is None and options.get_priority is not None:
priority = options.get_priority(self.odps)
menginetype = options.cupid.engine_running_type
Expand Down
19 changes: 14 additions & 5 deletions docs/source/base-instances.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,26 @@ Task如SQLTask是ODPS的基本计算单元,当一个Task在执行时会被实
基本操作
--------

可以调用 ``list_instances`` 来获取项目空间下的所有instance,``exist_instance`` 能判断是否存在某instance,
可以调用 ``list_instances`` 来获取项目空间下的所有instance, ``exist_instance`` 能判断是否存在某instance,
``get_instance`` 能获取实例。

.. code-block:: python
>>> for instance in o.list_instances():
>>> print(instance.id)
>>> o.exist_instance('my_instance_id')
>>> if o.exist_instance('<my_instance_id>'):
>>> print("Instance <my_instance_id> exists!")
停止一个instance可以在odps入口使用 ``stop_instance``,或者对instance对象调用 ``stop`` 方法。
停止一个instance可以在odps入口使用 ``stop_instance``,或者对 instance 对象调用 ``stop`` 方法:

.. code-block:: python
>>> # 方法1:使用 stop_instance
>>> o.exist_instance('<my_instance_id>')
>>> # 方法2:使用 instance 的 stop 方法
>>> instance = o.get_instance('<my_instance_id>')
>>> instance.stop()
.. _logview:

Expand All @@ -37,7 +46,7 @@ Task如SQLTask是ODPS的基本计算单元,当一个Task在执行时会被实
>>> instance = o.get_instance('2016042605520945g9k5pvyi2')
>>> print(instance.get_logview_address())
对于 XFlow 任务,需要枚举其子任务,再获取子任务的 LogView
对于 XFlow 任务,需要枚举其子任务,再获取子任务的 LogView。更多细节可以参考 :ref:`XFlow 和模型 <models>` 。

.. code-block:: python
Expand Down Expand Up @@ -65,7 +74,7 @@ Task如SQLTask是ODPS的基本计算单元,当一个Task在执行时会被实
'Terminated'
调用 ``wait_for_completion`` 方法会阻塞直到instance执行完成``wait_for_success`` 方法同样会阻塞,不同的是,
调用 ``wait_for_completion`` 方法会阻塞直到instance执行完成``wait_for_success`` 方法同样会阻塞,不同的是,
如果最终任务执行失败,则会抛出相关异常。

子任务操作
Expand Down
33 changes: 28 additions & 5 deletions docs/source/base-schemas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ Schema

Schema 基本操作
----------------
你可以使用 ``create_schema`` 创建一个 Schema 对象:
你可以使用 ``exist_schema`` 判断 Schema 对象是否存在:

.. code-block:: python
print(o.exist_schema("test_schema"))
使用 ``create_schema`` 创建一个 Schema 对象:

.. code-block:: python
Expand All @@ -26,12 +32,19 @@ Schema 基本操作
schema = o.delete_schema("test_schema")
使用 ``list_schema`` 列举所有 Schema 对象:
使用 ``get_schema`` 获得一个 Schema 对象并打印 Schema Owner:

.. code-block:: python
schema = o.get_schema("test_schema")
print(schema.owner)
使用 ``list_schema`` 列举所有 Schema 对象并打印名称:

.. code-block:: python
for schema in o.list_schema():
print(schema)
print(schema.name)
操作 Schema 中的对象
-------------------
Expand All @@ -40,8 +53,18 @@ Schema 基本操作

.. code-block:: python
o = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
endpoint='**your-end-point**', schema='**your-schema-name**')
import os
from odps import ODPS
# 保证 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='**your-project**',
endpoint='**your-endpoint**',
schema='**your-schema-name**',
)
也可以为不同对象的操作方法指定 ``schema`` 参数。例如,下面的方法列举了 ``test_schema``
下所有的表:
Expand Down
61 changes: 50 additions & 11 deletions docs/source/base-sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@ PyODPS支持ODPS SQL的查询,并可以读取执行的结果。 ``execute_sql`
执行 SQL
--------

你可以使用 ``execute_sql`` 方法以同步方式执行 SQL。调用时,该方法会阻塞直至 SQL 执行完成。
你可以使用 ``execute_sql`` 方法以同步方式执行 SQL。调用时,该方法会阻塞直至 SQL 执行完成,并返回一个
Instance 实例。如果 SQL 执行报错,该方法会抛出以 ``odps.errors.ODPSError`` 为基类的错误。

.. code-block:: python
>>> o.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
你也可以使用非阻塞方式异步执行 SQL。调用时,该方法在将 SQL 提交到 MaxCompute 后即返回 Instance
对象。你需要使用 ``wait_for_success`` 方法等待该 SQL 执行完成。
实例。你需要使用 ``wait_for_success`` 方法等待该 SQL 执行完成。同样地,如果 instance 出现错误,
``wait_for_success`` 会抛出以 ``odps.errors.ODPSError`` 为基类的错误。

.. code-block:: python
>>> instance = o.run_sql('select * from dual') # 异步的方式执行
>>> print(instance.get_logview_address()) # 获取logview地址
>>> instance.wait_for_success() # 阻塞直到完成
关于如何操作 run_sql / execute_sql 返回的 Instance 实例,可以参考 :ref:`运行实例 <instances>` 。

使用 MCQA 执行 SQL
-------------------
Expand Down Expand Up @@ -73,6 +76,42 @@ MCQA Instance,你需要自行等待 Instance 完成。需要注意的是,该
.. _sql_hints:

设置时区
---------
有时我们希望对于查询出来的时间数据显示为特定时区下的时间,可以通过 ``options.local_timezone`` 设置客户端的时区。

``options.local_timezone`` 可设置为以下三种类型:

* ``False``:使用 UTC 时间(默认设置)。
* ``True``:使用本地时区。
* 时区字符串:使用指定的时区,例如 ``Asia/Shanghai``。

例如,使用 UTC 时间:

.. code-block:: python
>>> from odps import options
>>> options.local_timezone = False
使用本地时区:

.. code-block:: python
>>> from odps import options
>>> options.local_timezone = True
使用 ``Asia/Shanghai``:

.. code-block:: python
>>> from odps import options
>>> options.local_timezone = "Asia/Shanghai"
.. note::

设置 ``options.local_timezone`` 后,PyODPS 会根据它的值自动设置 ``odps.sql.timezone``。
两者的值不同可能导致服务端和客户端时间不一致,因此不应再手动设置 ``odps.sql.timezone``。

设置运行参数
------------

Expand Down Expand Up @@ -127,10 +166,15 @@ MCQA Instance,你需要自行等待 Instance 完成。需要注意的是,该
>>> for record in reader:
>>> # 处理每一个record
PyODPS 默认不限制能够从 Instance 读取的数据规模。对于受保护的 Project,通过 Tunnel 下载数据受限。此时,
如果 `options.tunnel.limit_instance_tunnel` 未设置,会自动打开数据量限制。此时,可下载的数据条数受到 Project 配置限制,
通常该限制为 10000 条。如果你想要手动限制下载数据的规模,可以为 open_reader 方法增加 `limit` 选项,
或者设置 `options.tunnel.limit_instance_tunnel = True` 。
PyODPS 默认不限制能够从 Instance 读取的数据规模,但 Project Owner 可能在 MaxCompute Project 上增加保护设置以限制对
Instance 结果的读取,此时只能使用受限读取模式读取数据,在此模式下可读取的行数受到 Project 配置限制,通常为 10000 行。如果
PyODPS 检测到读取 Instance 数据被限制,且 `options.tunnel.limit_instance_tunnel` 未设置,会自动启用受限读取模式。
如果你的 Project 被保护,想要手动启用受限读取模式,可以为 `open_reader` 方法增加 `limit=True` 选项,或者设置
`options.tunnel.limit_instance_tunnel = True` 。

在部分环境中,例如 DataWorks,`options.tunnel.limit_instance_tunnel` 可能默认被置为 True。此时,如果需要读取所有数据,需要为
`open_reader` 增加参数 `tunnel=True, limit=False` 。需要注意的是,如果 Project 本身被保护,这两个参数 **不能**
解除保护,此时应联系 Project Owner 开放相应的读权限。

如果你所使用的 MaxCompute 只能支持旧 Result 接口,同时你需要读取所有数据,可将 SQL 结果写入另一张表后用读表接口读取
(可能受到 Project 安全设置的限制)。
Expand All @@ -148,11 +192,6 @@ PyODPS 默认不限制能够从 Instance 读取的数据规模。对于受保护

如果需要使用多核加速读取速度,可以通过 `n_process` 指定使用进程数:

.. note::

目前多进程加速在 Windows 下无法使用。


.. code-block:: python
>>> import multiprocessing
Expand Down
53 changes: 44 additions & 9 deletions docs/source/base-sqlalchemy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,43 @@ PyODPS 支持集成 SQLAlchemy,可以使用 SQLAlchemy 查询 MaxCompute 数

.. code-block:: python
from sqlalchemy import create_engine
engine = create_engine('odps://<access_id>:<access_key>@<project>')
import os
from sqlalchemy import create_engine
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
conn_string = 'odps://%s:%s@<project>' % (
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
要在连接串中指定 ``endpoint``,可以按如下方式:

.. code-block:: python
from sqlalchemy import create_engine
engine = create_engine('odps://<access_id>:<access_key>@<project>/?endpoint=<endpoint>')
import os
from sqlalchemy import create_engine
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
这里把 ``<access_id>`` 等替换成相应的账号。

对于已有的 ODPS 对象 ``o`` ,调用 ``o.to_global()`` 设为全局账号后,在连接串中就不需要指定了。

.. code-block:: python
from sqlalchemy import create_engine
o.to_global() # set ODPS object as global one
engine = create_engine('odps://')
from sqlalchemy import create_engine
o.to_global() # set ODPS object as global one
engine = create_engine('odps://')
接着创建连接。

Expand All @@ -44,18 +62,35 @@ PyODPS 支持集成 SQLAlchemy,可以使用 SQLAlchemy 查询 MaxCompute 数

.. code-block:: python
import os
from odps import options
from sqlalchemy import create_engine
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
options.sql.settings = {'odps.sql.hive.compatible': 'true'}
engine = create_engine('odps://<access_id>:<access_key>@<project>/?endpoint=<endpoint>')
engine = create_engine(conn_string)
也可以直接配置在连接字符串中:

.. code-block:: python
import os
from sqlalchemy import create_engine
engine = create_engine('odps://<access_id>:<access_key>@<project>/?endpoint=<endpoint>&odps.sql.hive.compatible=true')
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>&odps.sql.hive.compatible=true' % (
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
)
engine = create_engine(conn_string)
使用上述方式时,每个 engine 对象都会拥有不同的选项。

Expand Down
Loading

0 comments on commit 6a13d98

Please sign in to comment.