Skip to content

Commit

Permalink
Releases v0.11.6.5 (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Aug 26, 2024
1 parent 5eb1025 commit 5e1c7f4
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 177 deletions.
2 changes: 1 addition & 1 deletion odps/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version_info = (0, 11, 6, 4)
version_info = (0, 11, 6, 5)
_num_index = max(idx if isinstance(v, int) else 0
for idx, v in enumerate(version_info))
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \
Expand Down
212 changes: 142 additions & 70 deletions odps/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A couple of authentication types in ODPS.
"""
"""A couple of authentication types in ODPS."""

import base64
import hmac
import calendar
import hashlib
import hmac
import json
import logging
import os
import threading
Expand All @@ -27,13 +28,12 @@

import requests

from .compat import six, cgi, urlparse, unquote, parse_qsl
from . import options, utils

from .compat import cgi, parse_qsl, six, unquote, urlparse

logger = logging.getLogger(__name__)

DEFAULT_BEARER_TOKEN_HOURS = 5
DEFAULT_TEMP_ACCOUNT_HOURS = 5


class BaseAccount(object):
Expand Down Expand Up @@ -138,20 +138,6 @@ def sign_request(self, req, endpoint, region_name=None):
logger.debug('headers after signing: %r', req.headers)


class StsAccount(AliyunAccount):
"""
Account of sts
"""
def __init__(self, access_id, secret_access_key, sts_token):
super(StsAccount, self).__init__(access_id, secret_access_key)
self.sts_token = sts_token

def sign_request(self, req, endpoint, region_name=None):
super(StsAccount, self).sign_request(req, endpoint, region_name=region_name)
if self.sts_token:
req.headers['authorization-sts-token'] = self.sts_token


class AppAccount(BaseAccount):
"""
Account for applications.
Expand Down Expand Up @@ -352,77 +338,152 @@ def sign_request(self, req, endpoint, region_name=None):
)


class BearerTokenAccount(BaseAccount):
class TempAccountMixin(object):
def __init__(self, expired_hours=DEFAULT_TEMP_ACCOUNT_HOURS):
self._last_modified_time = datetime.now()
if expired_hours is not None:
self._expired_time = timedelta(hours=expired_hours)
else:
self._expired_time = None
self.reload()

def _is_account_valid(self):
raise NotImplementedError

def _reload_account(self):
raise NotImplementedError

def reload(self, force=False):
t = datetime.now()
if (
force
or not self._is_account_valid()
or (
self._last_modified_time is not None
and self._expired_time is not None
and (t - self._last_modified_time) > self._expired_time
)
):
self._last_modified_time = self._reload_account() or datetime.now()


class StsAccount(TempAccountMixin, AliyunAccount):
"""
Account of sts
"""

def __init__(
self, token=None, expired_hours=DEFAULT_BEARER_TOKEN_HOURS, get_bearer_token_fun=None
self,
access_id,
secret_access_key,
sts_token,
expired_hours=DEFAULT_TEMP_ACCOUNT_HOURS,
):
self._get_bearer_token = get_bearer_token_fun or self.get_default_bearer_token
self._token = token or self._get_bearer_token()
self._reload_bearer_token_time()
self.sts_token = sts_token
AliyunAccount.__init__(self, access_id, secret_access_key)
TempAccountMixin.__init__(self, expired_hours=expired_hours)

self._expired_time = timedelta(hours=expired_hours)
@classmethod
def from_environments(cls):
expired_hours = int(
os.getenv("ODPS_STS_TOKEN_HOURS", str(DEFAULT_TEMP_ACCOUNT_HOURS))
)
if "ODPS_STS_ACCOUNT_FILE" in os.environ or "ODPS_STS_TOKEN" in os.environ:
if "ODPS_STS_ACCOUNT_FILE" not in os.environ:
expired_hours = None
return cls(None, None, None, expired_hours=expired_hours)
return None

def sign_request(self, req, endpoint, region_name=None):
self.reload()
super(StsAccount, self).sign_request(req, endpoint, region_name=region_name)
if self.sts_token:
req.headers["authorization-sts-token"] = self.sts_token

def _is_account_valid(self):
return self.sts_token is not None

def _resolve_expiration(self, exp_data):
if exp_data is None or self._expired_time is None:
return None
try:
ts = calendar.timegm(time.strptime(exp_data, "%Y-%m-%dT%H:%M:%SZ"))
return ts - self._expired_time.total_seconds()
except:
return None

def _reload_account(self):
ts = None
if "ODPS_STS_ACCOUNT_FILE" in os.environ:
token_file_name = os.getenv("ODPS_STS_ACCOUNT_FILE")
if token_file_name and os.path.exists(token_file_name):
with open(token_file_name, "r") as token_file:
token_json = json.load(token_file)
self.access_id = token_json["accessKeyId"]
self.secret_access_key = token_json["accessKeySecret"]
self.sts_token = token_json["securityToken"]
ts = self._resolve_expiration(token_json.get("expiration"))
elif "ODPS_STS_ACCESS_KEY_ID" in os.environ:
self.access_id = os.getenv("ODPS_STS_ACCESS_KEY_ID")
self.secret_access_key = os.getenv("ODPS_STS_ACCESS_KEY_SECRET")
self.sts_token = os.getenv("ODPS_STS_TOKEN")

return datetime.fromtimestamp(ts) if ts is not None else None


class BearerTokenAccount(TempAccountMixin, BaseAccount):
def __init__(
self, token=None, expired_hours=DEFAULT_TEMP_ACCOUNT_HOURS, get_bearer_token_fun=None
):
self.token = token
self._custom_bearer_token_func = get_bearer_token_fun
TempAccountMixin.__init__(self, expired_hours=expired_hours)

@classmethod
def from_environments(cls):
expired_hours = int(os.getenv('ODPS_BEARER_TOKEN_HOURS', str(DEFAULT_BEARER_TOKEN_HOURS)))
expired_hours = int(os.getenv('ODPS_BEARER_TOKEN_HOURS', str(DEFAULT_TEMP_ACCOUNT_HOURS)))
kwargs = {"expired_hours": expired_hours}
if 'ODPS_BEARER_TOKEN' in os.environ:
return cls(os.environ['ODPS_BEARER_TOKEN'], **kwargs)
elif 'ODPS_BEARER_TOKEN_FILE' in os.environ:
if "ODPS_BEARER_TOKEN_FILE" in os.environ:
return cls(**kwargs)
elif "ODPS_BEARER_TOKEN" in os.environ:
kwargs["expired_hours"] = None
return cls(os.environ["ODPS_BEARER_TOKEN"], **kwargs)
return None

@staticmethod
def get_default_bearer_token():
def _get_bearer_token(self):
if self._custom_bearer_token_func is not None:
return self._custom_bearer_token_func()

token_file_name = os.getenv("ODPS_BEARER_TOKEN_FILE")
if token_file_name and os.path.exists(token_file_name):
with open(token_file_name, "r") as token_file:
return token_file.read().strip()
else: # pragma: no cover
from cupid.runtime import context, RuntimeContext

from cupid.runtime import context, RuntimeContext

if not RuntimeContext.is_context_ready():
return
cupid_context = context()
return cupid_context.get_bearer_token()

def get_bearer_token_and_timestamp(self):
self._check_bearer_token()
return self._token, self._last_modified_time.timestamp()

def _reload_bearer_token_time(self):
if "ODPS_BEARER_TOKEN_TIMESTAMP_FILE" in os.environ:
with open(os.getenv("ODPS_BEARER_TOKEN_TIMESTAMP_FILE"), "r") as ts_file:
self._last_modified_time = datetime.fromtimestamp(float(ts_file.read()))
else:
self._last_modified_time = datetime.now()

def _check_bearer_token(self):
t = datetime.now()
if self._last_modified_time is None:
token = self._get_bearer_token()
if token is None:
return
if token != self._token:
self._token = token
self._reload_bearer_token_time()
elif (t - self._last_modified_time) > self._expired_time:
token = self._get_bearer_token()
if token is None:
if not RuntimeContext.is_context_ready():
return
self._token = token
self._reload_bearer_token_time()
cupid_context = context()
return cupid_context.get_bearer_token()

@property
def token(self):
return self._token
def _is_account_valid(self):
return self.token is not None

def _reload_account(self):
token = self._get_bearer_token()
self.token = token
try:
resolved_token_parts = base64.b64decode(token).decode().split(",")
return datetime.fromtimestamp(int(resolved_token_parts[2]))
except:
return None

def sign_request(self, req, endpoint, region_name=None):
self._check_bearer_token()
self.reload()
url = req.url[len(endpoint):]
url_components = urlparse(unquote(url), allow_fragments=False)
self._build_canonical_str(url_components, req)
req.headers['x-odps-bearer-token'] = self._token
req.headers['x-odps-bearer-token'] = self.token
logger.debug('headers after signing: %r', req.headers)


Expand All @@ -432,11 +493,22 @@ def __init__(self, credential_provider):
super(CredentialProviderAccount, self).__init__(None, None, None)

def sign_request(self, req, endpoint, region_name=None):
credential = self.provider.get_credentials()
get_cred_method = getattr(self.provider, "get_credential", None) or getattr(
self.provider, "get_credentials"
)
credential = get_cred_method()

self.access_id = credential.get_access_key_id()
self.secret_access_key = credential.get_access_key_secret()
self.sts_token = credential.get_security_token()
return super(CredentialProviderAccount, self).sign_request(
req, endpoint, region_name=region_name
)


def from_environments():
for account_cls in (StsAccount, BearerTokenAccount):
account = account_cls.from_environments()
if account is not None:
break
return account
8 changes: 0 additions & 8 deletions odps/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ def odps_with_schema():
pytest.skip("ODPS project with schema not defined")


@pytest.fixture(scope="session")
def odps_with_schema_tenant():
try:
return get_config().odps_with_schema_tenant
except AttributeError:
pytest.skip("ODPS project with schema configured on tenants not defined")


@pytest.fixture(scope="session")
def odps_with_tunnel_quota():
try:
Expand Down
27 changes: 16 additions & 11 deletions odps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,14 @@ def _init(
if account is None:
if access_id is not None:
self.account = self._build_account(access_id, secret_access_key)
elif accounts.BearerTokenAccount.from_environments():
self.account = accounts.BearerTokenAccount.from_environments()
elif options.account is not None:
self.account = options.account
else:
raise TypeError('`access_id` and `secret_access_key` should be provided.')
self.account = accounts.from_environments()
if self.account is None:
raise TypeError(
"`access_id` and `secret_access_key` should be provided."
)
else:
self.account = account
self.endpoint = (
Expand Down Expand Up @@ -2544,14 +2546,17 @@ def from_global(cls):
@classmethod
def from_environments(cls):
try:
account = accounts.BearerTokenAccount.from_environments()
if not account:
raise KeyError('ODPS_BEARER_TOKEN')
project = os.getenv('ODPS_PROJECT_NAME')
endpoint = os.environ['ODPS_ENDPOINT']
tunnel_endpoint = os.getenv('ODPS_TUNNEL_ENDPOINT')
return cls(None, None, account=account, project=project,
endpoint=endpoint, tunnel_endpoint=tunnel_endpoint)
project = os.getenv("ODPS_PROJECT_NAME")
endpoint = os.environ["ODPS_ENDPOINT"]
tunnel_endpoint = os.getenv("ODPS_TUNNEL_ENDPOINT")
return cls(
None,
None,
account=accounts.from_environments(),
project=project,
endpoint=endpoint,
tunnel_endpoint=tunnel_endpoint,
)
except KeyError:
return None

Expand Down
2 changes: 1 addition & 1 deletion odps/df/tests/test_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_async_execute(setup):
def make_filter(df, cnt):
def waiter(val, c):
import time
time.sleep(5 * c)
time.sleep(30 * c)
return val

f_df = df[df.value == cnt]
Expand Down
12 changes: 8 additions & 4 deletions odps/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ class NoSuchObject(ServerDefinedException):
pass


class NoSuchProject(NoSuchObject):
pass


class NoSuchPartition(NoSuchObject):
pass

Expand All @@ -241,6 +245,10 @@ class InvalidArgument(ServerDefinedException):
pass


class AuthenticationRequestExpired(ServerDefinedException):
pass


class AuthorizationRequired(ServerDefinedException):
pass

Expand Down Expand Up @@ -363,10 +371,6 @@ class SecurityQueryError(ODPSError):
pass


class NoSuchProject(ODPSError):
pass


class OSSSignUrlError(ODPSError):
def __init__(self, err):
if isinstance(err, six.string_types):
Expand Down
Loading

0 comments on commit 5e1c7f4

Please sign in to comment.