-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: Add respx dependency and update unit tests to use mocks
- Loading branch information
Showing
19 changed files
with
551 additions
and
289 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,10 @@ | ||
import os | ||
|
||
from cozepy import COZE_CN_BASE_URL, JWTAuth, JWTOAuthApp, TokenAuth | ||
|
||
COZE_JWT_AUTH_CLIENT_ID = os.getenv("COZE_JWT_AUTH_CLIENT_ID").strip() | ||
COZE_JWT_AUTH_PRIVATE_KEY = os.getenv("COZE_JWT_AUTH_PRIVATE_KEY").strip() | ||
COZE_JWT_AUTH_KEY_ID = os.getenv("COZE_JWT_AUTH_KEY_ID").strip() | ||
if COZE_JWT_AUTH_PRIVATE_KEY == "" and os.getenv("COZE_JWT_AUTH_PRIVATE_KEY_FILE").strip() != "": | ||
try: | ||
with open(os.getenv("COZE_JWT_AUTH_PRIVATE_KEY_FILE").strip(), "r") as f: | ||
COZE_JWT_AUTH_PRIVATE_KEY = f.read() | ||
except: # noqa: E722 | ||
pass | ||
def read_file(path: str): | ||
current_dir = os.path.dirname(os.path.abspath(__file__)) | ||
file_path = os.path.join(current_dir, path) | ||
|
||
COZE_TOKEN = os.getenv("COZE_TOKEN").strip() | ||
|
||
jwt_oauth_app = JWTOAuthApp( | ||
COZE_JWT_AUTH_CLIENT_ID, | ||
COZE_JWT_AUTH_PRIVATE_KEY, | ||
COZE_JWT_AUTH_KEY_ID, | ||
base_url=COZE_CN_BASE_URL, | ||
) | ||
|
||
fixed_token_auth = TokenAuth(COZE_TOKEN) | ||
|
||
jwt_auth = JWTAuth( | ||
COZE_JWT_AUTH_CLIENT_ID, | ||
COZE_JWT_AUTH_PRIVATE_KEY, | ||
COZE_JWT_AUTH_KEY_ID, | ||
30, | ||
base_url=COZE_CN_BASE_URL, | ||
) | ||
with open(file_path, "r") as file: | ||
content = file.read() | ||
return content |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,154 @@ | ||
import time | ||
|
||
from tests.config import ( | ||
jwt_auth, | ||
jwt_oauth_app, | ||
) | ||
import httpx | ||
import pytest | ||
|
||
from cozepy import COZE_COM_BASE_URL, DeviceAuthCode, DeviceOAuthApp, JWTOAuthApp, OAuthToken, PKCEOAuthApp, WebOAuthApp | ||
from cozepy.util import random_hex | ||
from tests.config import read_file | ||
|
||
def test_jwt_oauth_app(): | ||
token = jwt_oauth_app.get_access_token(30) | ||
assert token.access_token != "" | ||
assert token.token_type == "Bearer" | ||
assert token.expires_in - int(time.time()) <= 31 | ||
assert token.refresh_token == "" | ||
|
||
@pytest.mark.respx(base_url="https://api.coze.com") | ||
class TestWebOAuthApp: | ||
def test_get_oauth_url(self, respx_mock): | ||
app = WebOAuthApp("client id", "client secret") | ||
|
||
def test_jwt_auth(): | ||
assert jwt_auth.token != "" | ||
url = app.get_oauth_url("https://example.com", "state") | ||
assert ( | ||
"https://www.coze.com/api/permission/oauth2/authorize" | ||
"?response_type=code&" | ||
"client_id=client+id&" | ||
"redirect_uri=https%3A%2F%2Fexample.com&" | ||
"state=state" | ||
) == url | ||
|
||
def test_get_access_token(self, respx_mock): | ||
app = WebOAuthApp("client id", "client secret") | ||
mock_token = random_hex(20) | ||
|
||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.get_access_token("https://example.com", "code") | ||
|
||
assert token.access_token == mock_token | ||
|
||
def test_refresh_token(self, respx_mock): | ||
app = WebOAuthApp("client id", "client secret") | ||
mock_token = random_hex(20) | ||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.refresh_access_token("refresh token") | ||
assert token.access_token == mock_token | ||
|
||
|
||
@pytest.mark.respx(base_url="https://api.coze.com") | ||
class TestJWTOAuthApp: | ||
def test_get_access_token(self, respx_mock): | ||
private_key = read_file("testdata/private_key.pem") | ||
app = JWTOAuthApp("client id", private_key, "public key id") | ||
mock_token = random_hex(20) | ||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.get_access_token(100) | ||
assert token.access_token == mock_token | ||
|
||
|
||
@pytest.mark.respx(base_url="https://api.coze.com") | ||
class TestPKCEOAuthApp: | ||
def test_get_oauth_url(self, respx_mock): | ||
app = PKCEOAuthApp("client id") | ||
|
||
url = app.get_oauth_url("https://example.com", "state", "code_verifier", "S256") | ||
assert ( | ||
"https://www.coze.com/api/permission/oauth2/authorize?" | ||
"response_type=code&client_id=client+id&" | ||
"redirect_uri=https%3A%2F%2Fexample.com&state=state&" | ||
"code_challenge=73oehA2tBul5grZPhXUGQwNAjxh69zNES8bu2bVD0EM&code_challenge_method=S256" | ||
) == url | ||
|
||
def test_get_access_token(self, respx_mock): | ||
app = PKCEOAuthApp("client id") | ||
mock_token = random_hex(20) | ||
|
||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.get_access_token("https://example.com", "code", "code_verifier") | ||
|
||
assert token.access_token == mock_token | ||
|
||
def test_refresh_token(self, respx_mock): | ||
app = PKCEOAuthApp("client id") | ||
mock_token = random_hex(20) | ||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.refresh_access_token("refresh token") | ||
assert token.access_token == mock_token | ||
|
||
|
||
@pytest.mark.respx(base_url="https://api.coze.com") | ||
class TestDeviceOAuthApp: | ||
def test_get_device_code(self, respx_mock): | ||
app = DeviceOAuthApp("client id") | ||
mock_code = random_hex(20) | ||
|
||
respx_mock.post("/api/permission/oauth2/device/code").mock( | ||
httpx.Response( | ||
200, | ||
content=DeviceAuthCode( | ||
device_code=mock_code, | ||
user_code="user_code", | ||
verification_uri=COZE_COM_BASE_URL, | ||
interval=5, | ||
expires_in=int(time.time()), | ||
).model_dump_json(), | ||
) | ||
) | ||
|
||
device_code = app.get_device_code() | ||
assert device_code.device_code == mock_code | ||
|
||
def test_get_access_token(self, respx_mock): | ||
app = DeviceOAuthApp("client id") | ||
mock_token = random_hex(20) | ||
|
||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.get_access_token("https://example.com", False) | ||
|
||
assert token.access_token == mock_token | ||
|
||
def test_refresh_token(self, respx_mock): | ||
app = DeviceOAuthApp("client id") | ||
mock_token = random_hex(20) | ||
respx_mock.post("/api/permission/oauth2/token").mock( | ||
httpx.Response( | ||
200, content=OAuthToken(access_token=mock_token, expires_in=int(time.time())).model_dump_json() | ||
) | ||
) | ||
|
||
token = app.refresh_access_token("refresh token") | ||
assert token.access_token == mock_token |
Oops, something went wrong.