Skip to content

Commit

Permalink
yq http client restyling (ydb-platform#1982)
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik authored Feb 17, 2024
1 parent 71f7291 commit 63dfe8f
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 148 deletions.
174 changes: 92 additions & 82 deletions ydb/core/fq/libs/http_api_client/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@
ERROR_CODES = (500, 502, 504)


def requests_retry_session(session,
retries=MAX_RETRY_FOR_SESSION,
back_off_factor=BACK_OFF_FACTOR,
status_force_list=ERROR_CODES):
retry = Retry(total=retries, read=retries, connect=retries,
backoff_factor=back_off_factor,
status_forcelist=status_force_list,
allowed_methods=frozenset(['GET', 'POST']))
def requests_retry_session(
session, retries=MAX_RETRY_FOR_SESSION, back_off_factor=BACK_OFF_FACTOR, status_force_list=ERROR_CODES
):
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=back_off_factor,
status_forcelist=status_force_list,
allowed_methods=frozenset(["GET", "POST"]),
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session


class YQHttpClientConfig(object):
def __init__(self,
token: str | None = None,
project: str | None = None,
user_agent: str | None = "Python YQ HTTP SDK") -> None:

class YQHttpClientConfig:
def __init__(
self,
token: str | None = None,
project: str | None = None,
user_agent: str | None = "Python YQ HTTP SDK",
) -> None:
assert len(token) > 0, "empty token"
self.token = token
self.project = project
Expand All @@ -49,7 +53,7 @@ def __init__(self,


class YQHttpClientException(Exception):
def __init__(self, message: str, status: str, msg: str, details: Any) -> None:
def __init__(self, message: str, status: str = None, msg: str = None, details: Any = None) -> None:
super().__init__(message)
self.status = status
self.msg = msg
Expand All @@ -67,10 +71,11 @@ def __enter__(self):
def __exit__(self, *args):
self.session.close()

def close(self):
self.session.close()

def _build_headers(self, idempotency_key=None, request_id=None) -> dict[str, str]:
headers = {
"Authorization": f"{self.config.token_prefix}{self.config.token}"
}
headers = {"Authorization": f"{self.config.token_prefix}{self.config.token}"}
if idempotency_key is not None:
headers["Idempotency-Key"] = idempotency_key

Expand All @@ -96,47 +101,51 @@ def _compose_web_url(self, path: str) -> str:
return self.config.web_base_url + path

def _validate_http_error(self, response, expected_code=200) -> None:
logging.info(f"Response: {response.status_code}, {response.text}")
logging.debug("Response: %s, %s", response.status_code, response.text)
if response.status_code != expected_code:
if response.headers.get("Content-Type", "").startswith("application/json"):
body = response.json()
status = body.get("status")
msg = body.get("message")
details = body.get("details")
raise YQHttpClientException(f"Error occurred. http code={response.status_code}, status={status}, msg={msg}, details={details}",
status=status,
msg=msg,
details=details
)
raise YQHttpClientException(
f"Error occurred. http code={response.status_code}, status={status}, msg={msg}, details={details}",
status=status,
msg=msg,
details=details,
)

raise YQHttpClientException(f"Error occurred: {response.status_code}, {response.text}")

def create_query(self,
query_text=None,
type=None,
name=None,
description=None,
idempotency_key=None,
request_id=None,
expected_code=200):
body = dict()
def create_query(
self,
query_text=None,
query_type=None,
name=None,
description=None,
idempotency_key=None,
request_id=None,
expected_code=200,
):
body = {}
if query_text is not None:
body["text"] = query_text

if type is not None:
body["type"] = type
if query_type is not None:
body["type"] = query_type

if name is not None:
body["name"] = name

if description is not None:
body["description"] = description

response = self.session.post(self._compose_api_url("/api/fq/v1/queries"),
headers=self._build_headers(idempotency_key=idempotency_key,
request_id=request_id),
params=self._build_params(),
json=body)
response = self.session.post(
self._compose_api_url("/api/fq/v1/queries"),
headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id),
params=self._build_params(),
json=body,
)

self._validate_http_error(response, expected_code=expected_code)
return response.json()["id"]
Expand All @@ -145,7 +154,7 @@ def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
response = self.session.get(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"),
headers=self._build_headers(request_id=request_id),
params=self._build_params()
params=self._build_params(),
)

self._validate_http_error(response, expected_code=expected_code)
Expand All @@ -155,25 +164,25 @@ def get_query(self, query_id, request_id=None, expected_code=200) -> Any:
response = self.session.get(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}"),
headers=self._build_headers(request_id=request_id),
params=self._build_params()
params=self._build_params(),
)

self._validate_http_error(response, expected_code=expected_code)
return response.json()

def stop_query(self,
query_id: str,
idempotency_key: str | None = None,
request_id: str | None = None,
expected_code: int = 204) -> Any:

headers = self._build_headers(
idempotency_key=idempotency_key,
request_id=request_id
def stop_query(
self,
query_id: str,
idempotency_key: str | None = None,
request_id: str | None = None,
expected_code: int = 204,
) -> Any:
headers = self._build_headers(idempotency_key=idempotency_key, request_id=request_id)
response = self.session.post(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/stop"),
headers=headers,
params=self._build_params(),
)
response = self.session.post(self._compose_api_url(f"/api/fq/v1/queries/{query_id}/stop"),
headers=headers,
params=self._build_params())
self._validate_http_error(response, expected_code=expected_code)
return response

Expand Down Expand Up @@ -201,26 +210,26 @@ def wait_query_to_complete(self, query_id, execution_timeout=None, stop_on_timeo

def wait_query_to_succeed(self, query_id, execution_timeout=None, stop_on_timeout=False) -> int:
status = self.wait_query_to_complete(
query_id=query_id,
execution_timeout=execution_timeout,
stop_on_timeout=stop_on_timeout
query_id=query_id, execution_timeout=execution_timeout, stop_on_timeout=stop_on_timeout
)

query = self.get_query(query_id)
if status != "COMPLETED":
issues = query["issues"]
raise RuntimeError(f"Query {query_id} failed", issues=issues)
raise RuntimeError(f"Query {query_id} failed with issues={issues}")

return len(query["result_sets"])

def get_query_result_set_page(self,
query_id,
result_set_index,
offset=None,
limit=None,
raw_format=False,
request_id=None,
expected_code=200) -> Any:
def get_query_result_set_page(
self,
query_id,
result_set_index,
offset=None,
limit=None,
raw_format=False,
request_id=None,
expected_code=200,
) -> Any:
params = self._build_params()
if offset is not None:
params["offset"] = offset
Expand All @@ -231,7 +240,7 @@ def get_query_result_set_page(self,
response = self.session.get(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/results/{result_set_index}"),
headers=self._build_headers(request_id=request_id),
params=params
params=params,
)

self._validate_http_error(response, expected_code=expected_code)
Expand All @@ -244,11 +253,7 @@ def get_query_result_set(self, query_id: str, result_set_index: int, raw_format:
rows = []
while True:
part = self.get_query_result_set_page(
query_id,
result_set_index=result_set_index,
offset=offset,
limit=limit,
raw_format=raw_format
query_id, result_set_index=result_set_index, offset=offset, limit=limit, raw_format=raw_format
)

if columns is None:
Expand All @@ -264,17 +269,15 @@ def get_query_result_set(self, query_id: str, result_set_index: int, raw_format:
result = {"rows": rows, "columns": columns}
if raw_format:
return result

return YQResults(result).results

def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any:
result = list()
def get_query_all_result_sets(
self, query_id: str, result_set_count: int, raw_format: bool = False
) -> Any:
result = []
for i in range(0, result_set_count):
r = self.get_query_result_set(
query_id,
result_set_index=i,
raw_format=raw_format
)
r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format)

if result_set_count == 1:
return r
Expand All @@ -289,3 +292,10 @@ def get_openapi_spec(self) -> str:

def compose_query_web_link(self, query_id) -> str:
return self._compose_web_url(f"/folders/{self.config.project}/ide/queries/{query_id}")

@staticmethod
def result_set_to_dataframe(data):
import pandas as pd

column_names = [column["name"] for column in data["columns"]]
return pd.DataFrame(data["rows"], columns=column_names)
Loading

0 comments on commit 63dfe8f

Please sign in to comment.