Skip to content

Commit

Permalink
Add support for Oauth to enable GCS access
Browse files Browse the repository at this point in the history
  • Loading branch information
dcfocus authored and caithagoras0 committed Jul 1, 2020
1 parent 54e7c03 commit b87744e
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ python:
- "2.7"
- "3.5"
- "3.6"
- "pypy-5.4"
- "3.7"
- "pypy3.6-7.0.0"
env:
- PRESTO_VERSION=0.202
services:
Expand Down
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Introduction

This package provides a client interface to query [Presto](https://prestodb.io/),
a distributed SQL engine. It supports Python 2.7, 3.5, 3.6, and pypy.
a distributed SQL engine. It supports Python 2.7, 3.5, 3.6, 3.7, and pypy.

# Installation

Expand Down Expand Up @@ -56,6 +56,36 @@ cur.execute('SELECT * FROM system.runtime.nodes')
rows = cur.fetchall()
```

# OAuth Authentication
To enable GCS access, OAuth authentication is supported: pass the path to a service account's `shadow.json` file as the `service_account_file` argument.
The following example shows a use case where both Kerberos and OAuth authentication are enabled.

```python
import getpass
import prestodb
from prestodb.client import PrestoRequest, PrestoQuery
from requests_kerberos import DISABLED

kerberos_auth = prestodb.auth.KerberosAuthentication(
mutual_authentication=DISABLED,
service_name='kerberos service name',
force_preemptive=True,
hostname_override='example.com'
)

req = PrestoRequest(
host='GCP coordinator url',
port=443,
user=getpass.getuser(),
service_account_file='Service account json file path',
http_scheme='https',
auth=kerberos_auth
)

query = PrestoQuery(req, "SELECT * FROM system.runtime.nodes")
rows = list(query.execute())
```

# Transactions
The client runs by default in *autocommit* mode. To enable transactions, set
*isolation_level* to a value different than `IsolationLevel.AUTOCOMMIT`:
Expand Down
31 changes: 30 additions & 1 deletion prestodb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def __init__(
max_attempts=MAX_ATTEMPTS, # type: int
request_timeout=constants.DEFAULT_REQUEST_TIMEOUT, # type: Union[float, Tuple[float, float]]
handle_retry=exceptions.RetryWithExponentialBackoff(),
service_account_file=None,
):
# type: (...) -> None
self._client_session = ClientSession(
Expand All @@ -230,6 +231,19 @@ def __init__(
else:
# mypy cannot follow module import
self._http_session = self.http.Session() # type: ignore

self.credentials = None
self.auth_req = None
if service_account_file is not None:
import google.auth.transport.requests
from google.oauth2 import service_account

self.auth_req = google.auth.transport.requests.Request()
self.credentials = service_account.Credentials.from_service_account_file(
service_account_file, scopes=[constants.GCS_READ_ONLY]
)
self._http_session.headers.update(self.get_oauth_token())

self._http_session.headers.update(self.http_headers)
self._exceptions = self.HTTP_EXCEPTIONS
self._auth = auth
Expand Down Expand Up @@ -422,6 +436,17 @@ def process(self, http_response):
columns=response.get("columns"),
)

# Read-only accessor for the HTTP session stored in ``self._http_session``.
# The query's ``execute`` path goes through this property to update the
# session headers with a refreshed OAuth token.
@property
def http_session(self):
return self._http_session

def get_oauth_token(self):
    """Refresh the credentials and build the extra-credential header.

    Calls ``self.credentials.refresh(self.auth_req)`` and then reads
    ``self.credentials.token``. Both attributes are only populated when
    the request was created with a ``service_account_file``; otherwise
    they are ``None`` and this method must not be called.

    Returns:
        A one-item dict mapping ``constants.PRESTO_EXTRA_CREDENTIAL`` to
        the string ``"<GCS_CREDENTIALS_OAUTH_TOKEN_KEY> = <token>"``,
        suitable for ``session.headers.update(...)``.
    """
    self.credentials.refresh(self.auth_req)
    header_value = "%s = %s" % (
        constants.GCS_CREDENTIALS_OAUTH_TOKEN_KEY,
        self.credentials.token,
    )
    return {constants.PRESTO_EXTRA_CREDENTIAL: header_value}


class PrestoResult(object):
"""
Expand Down Expand Up @@ -466,12 +491,13 @@ def __init__(
sql, # type: Text
):
# type: (...) -> None
self.auth_req = request.auth_req # type: Optional[Request]
self.credentials = request.credentials # type: Optional[Credentials]
self.query_id = None # type: Optional[Text]

self._stats = {} # type: Dict[Any, Any]
self._warnings = [] # type: List[Dict[Any, Any]]
self._columns = None # type: Optional[List[Text]]

self._finished = False
self._cancelled = False
self._request = request
Expand Down Expand Up @@ -506,6 +532,9 @@ def execute(self):
if self._cancelled:
raise exceptions.PrestoUserError("Query has been cancelled", self.query_id)

if self.credentials is not None and not self.credentials.valid:
self._request.http_session.headers.update(self._request.get_oauth_token())

response = self._request.post(self._sql)
status = self._request.process(response)
self.query_id = status.id
Expand Down
5 changes: 5 additions & 0 deletions prestodb/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@

HEADER_STARTED_TRANSACTION = HEADER_PREFIX + "Started-Transaction-Id"
HEADER_TRANSACTION = HEADER_PREFIX + "Transaction-Id"

PRESTO_EXTRA_CREDENTIAL = "X-Presto-Extra-Credential"
GCS_CREDENTIALS_OAUTH_TOKEN_KEY = "hive.gcs.oauth"

GCS_READ_ONLY = "https://www.googleapis.com/auth/devstorage.read_only"
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

kerberos_require = ["requests_kerberos"]

all_require = [kerberos_require]
google_auth_require = ["google_auth"]

all_require = [kerberos_require, google_auth_require]

tests_require = all_require + ["httpretty", "pytest", "pytest-runner"]

Expand Down Expand Up @@ -70,6 +72,7 @@
extras_require={
"all": all_require,
"kerberos": kerberos_require,
"google_auth": google_auth_require,
"tests": tests_require,
':python_version=="2.7"': py27_require,
},
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py27,py35,py36,pypy2
envlist = py27,py35,py36,py37,pypy2

[testenv]
deps = pytest
Expand Down

0 comments on commit b87744e

Please sign in to comment.