-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding demo CLI for fetching tokens using the device auth flow
- Loading branch information
1 parent
f959091
commit aaf8386
Showing
2 changed files
with
110 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import os | ||
import requests | ||
import jwt | ||
import time | ||
import json | ||
import webbrowser | ||
import http.server | ||
from urllib.parse import urlencode, parse_qs | ||
|
||
def get_discovery_document(url): | ||
response = requests.get(url) | ||
return response.json() | ||
|
||
|
||
def request_device_code(discovery_doc, client_credentials): | ||
data={} | ||
data.update(client_credentials) | ||
data.update({'scope': 'offline_access' }) | ||
response = requests.post( | ||
discovery_doc["device_authorization_endpoint"], data=data | ||
) | ||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
print(f"Failed to obtain device code. Status code: {response.status_code}") | ||
print(f"Response content: {response.content}") | ||
return None | ||
|
||
|
||
|
||
def poll_for_access_token( | ||
discovery_doc, client_credentials, device_code, interval, timeout | ||
): | ||
payload = { | ||
"grant_type": "urn:ietf:params:oauth:grant-type:device_code", | ||
"device_code": device_code, | ||
"client_id": client_credentials["client_id"] | ||
} | ||
start_time = time.time() | ||
print("\nWaiting for device to be authorized ...") | ||
while time.time() - start_time < timeout: | ||
response = requests.post(discovery_doc["token_endpoint"], data=payload) | ||
if response.status_code == 200: | ||
return response.json() | ||
elif response.status_code == 400: | ||
error = response.json().get("error", "") | ||
if error == "authorization_pending": | ||
print("Waiting for device to be authorized ...") | ||
time.sleep(interval) | ||
else: | ||
return None | ||
else: | ||
return None | ||
|
||
|
||
def device_login(discovery_doc, client_credentials): | ||
device_code_response = request_device_code(discovery_doc, client_credentials) | ||
if device_code_response: | ||
verification_uri_complete = device_code_response.get("verification_uri_complete", None) | ||
webbrowser.open(verification_uri_complete) | ||
if verification_uri_complete: | ||
print( | ||
f"The following URL has been opened in your browser: {verification_uri_complete}" | ||
) | ||
else: | ||
print(f"Something went wrong") | ||
|
||
|
||
interval = device_code_response["interval"] | ||
timeout = device_code_response["expires_in"] | ||
device_login = poll_for_access_token( | ||
discovery_doc, | ||
client_credentials, | ||
device_code_response["device_code"], | ||
interval, | ||
timeout, | ||
) | ||
if device_login: | ||
print("Device login successfull") | ||
return device_login | ||
else: | ||
print("Failed to obtain an device login") | ||
return None | ||
else: | ||
print("Failed to obtain device code") | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from pprint import pprint | ||
|
||
from helpers import ( | ||
get_discovery_document, | ||
device_login, | ||
) | ||
|
||
discovery_doc = get_discovery_document("https://auth.apex.esa.int/realms/apex/.well-known/openid-configuration") | ||
|
||
client_credentials ={ | ||
"client_id": "project-a-catalogue-dev-browser" | ||
} | ||
|
||
response = device_login(discovery_doc, client_credentials) | ||
|
||
print("\n\nScopes: %s" % response["scope"]) | ||
|
||
print("\naccess token ( Expires after %s seconds ):\n-------------------------------------------------------------------------------------------------------------------" % response["expires_in"]) | ||
print(response["access_token"]) | ||
print("-------------------------------------------------------------------------------------------------------------------") | ||
|
||
print("\nRefresh token ( Expires after %s seconds ):\n-------------------------------------------------------------------------------------------------------------------" % response["refresh_expires_in"]) | ||
print(response["refresh_token"]) | ||
print("-------------------------------------------------------------------------------------------------------------------\n") |