diff --git a/.gitignore b/.gitignore index 3f8a4ee0..67ea9b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,4 @@ .coverage *.swp *.bin +*.mafile \ No newline at end of file diff --git a/steam/client/__init__.py b/steam/client/__init__.py index e9aeb4cd..46f04f4f 100644 --- a/steam/client/__init__.py +++ b/steam/client/__init__.py @@ -34,6 +34,8 @@ from steam.utils import ip4_from_int, ip4_to_int from steam.utils.proto import proto_fill_from_dict + +# TODO: remove py2 support. if six.PY2: _cli_input = raw_input else: @@ -42,14 +44,13 @@ class SteamClient(CMClient, BuiltinBase): EVENT_LOGGED_ON = 'logged_on' - """After successful login - """ + """After successful login""" + EVENT_AUTH_CODE_REQUIRED = 'auth_code_required' - """When either email or 2FA code is needed for login - """ + """When either email or 2FA code is needed for login""" + EVENT_NEW_LOGIN_KEY = 'new_login_key' - """After a new login key is accepted - """ + """After a new login key is accepted""" _LOG = logging.getLogger("SteamClient") _reconnect_backoff_c = 0 diff --git a/steam/client/builtins/web.py b/steam/client/builtins/web.py index 861975ce..b30cb440 100644 --- a/steam/client/builtins/web.py +++ b/steam/client/builtins/web.py @@ -19,6 +19,8 @@ def __init__(self, *args, **kwargs): def __handle_disconnect(self): self._web_session = None + # TODO: DEPRECATED. This function not work anymore. + #This function must be rewritten to use WebAuth def get_web_session_cookies(self): """Get web authentication cookies via WebAPI's ``AuthenticateUser`` diff --git a/steam/webauth.py b/steam/webauth.py index aeabbab5..58f65d63 100644 --- a/steam/webauth.py +++ b/steam/webauth.py @@ -61,10 +61,15 @@ import six import requests +from steam.enums.proto import EAuthSessionGuardType from steam.steamid import SteamID -from steam.utils.web import make_requests_session, generate_session_id +from steam.utils.web import generate_session_id from steam.core.crypto import rsa_publickey, pkcs1v15_encrypt + +# TODO: Remove python2 support. +# TODO: Encrease min python version to 3.5 + if six.PY2: intBase = long _cli_input = raw_input @@ -72,228 +77,328 @@ intBase = int _cli_input = input +API_HEADERS = { + 'origin': 'https://steamcommunity.com', + 'referer': 'https://steamcommunity.com/', + 'accept': 'application/json, text/plain, */*' +} + +API_URL = 'https://api.steampowered.com/{}Service/{}/v{}' + class WebAuth(object): - key = None - logged_on = False #: whether authentication has been completed successfully - session = None #: :class:`requests.Session` (with auth cookies after auth is completed) - session_id = None #: :class:`str`, session id string - captcha_gid = -1 - captcha_code = '' - steam_id = None #: :class:`.SteamID` (after auth is completed) - - def __init__(self, username, password=''): - self.__dict__.update(locals()) - self.session = make_requests_session() - self._session_setup() - - def _session_setup(self): - pass - - @property - def captcha_url(self): - """If a captch is required this property will return url to the image, or ``None``""" - if self.captcha_gid == -1: - return None - else: - return "https://steamcommunity.com/login/rendercaptcha/?gid=%s" % self.captcha_gid + """New WEB Auth class. - def get_rsa_key(self, username): - """Get rsa key for a given username + This class works with Steam API: + https://steamapi.xpaw.me/#IAuthenticationService - :param username: username - :type username: :class:`str` - :return: json response - :rtype: :class:`dict` - :raises HTTPError: any problem with http request, timeouts, 5xx, 4xx etc - """ - try: - resp = self.session.post('https://steamcommunity.com/login/getrsakey/', - timeout=15, - data={ - 'username': username, - 'donotcache': int(time() * 1000), - }, - ).json() - except requests.exceptions.RequestException as e: - raise HTTPError(str(e)) + Currently, supports bsaic login/password auth with no 2FA, 2FA via + steam guard code and 2FA via EMAIL confirmation. - return resp + TODO: Add QR code support. - def _load_key(self): - if not self.key: - resp = self.get_rsa_key(self.username) + TODO: Fully rework api handling. PUT api into separate class, + in order to make this class responsible only for actual auth. - self.key = rsa_publickey(intBase(resp['publickey_mod'], 16), - intBase(resp['publickey_exp'], 16), - ) - self.timestamp = resp['timestamp'] + IMPORTANT: + Actually, at real login page + steam handles function little bit different. + e.g. https://api.steampowered.com/IAuthenticationService/BeginAuthSessionViaCredentials/v1 + can handle multipart/form-data; with something like. - def _send_login(self, password='', captcha='', email_code='', twofactor_code=''): - data = { - 'username': self.username, - "password": b64encode(pkcs1v15_encrypt(self.key, password.encode('ascii'))), - "emailauth": email_code, - "emailsteamid": str(self.steam_id) if email_code else '', - "twofactorcode": twofactor_code, - "captchagid": self.captcha_gid, - "captcha_text": captcha, - "loginfriendlyname": "python-steam webauth", - "rsatimestamp": self.timestamp, - "remember_login": 'true', - "donotcache": int(time() * 100000), + { + input_protobuf_encoded: EgxhbmFjb25kYXJ0dXIa2AJodHgzZzlJaWY4dGE4RGxqR2VWbncwZWpxdi9uNDByRkZxaFduVk12VFFhRm1ZU1F4MHlrYUlJNmFURlVJWEQ5Z2VXMTlObWJDc3pydmNQZ1RTVllyOWl0SW5EWjgzMVh0YWtOaHJaUk9JN1lvMzhpb2xHRmdHdVBZT3NsekErTHZNZlJoQ3YzL1JFaEpNQlhjaXhzNklRRTZjbnM4d1JWbGI0TVA1Nzd0MHpGajRpcWF4U21KbnVjRDh5YzVIVkYvMERlMnFKd3dGTG0vR3B4SEdreFFlQURzZi9OTXJMTUszcWxnR3NLZm4ycGxNOGhkMzF3YnErSUlCZkNJb3dFZWExaUpJcmVjYkdLT0EvRlJ5VFpSQlVoVitLQmt6TGk3THY1UjVNYVRJSzNPTCtCMUZnZ2xWSG94c0ErTm5BMHVqSVZWZ0ZRdGpDL2tMTjd0SmhYamc9PSCA+aCT0ggoATgBQgVTdG9yZUqBAQp9TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2IE9QUi8xMDIuMC4wLjAQAlgI } + it's protobuf encoded value. You can decode it here: + https://protobuf-decoder.netlify.app + + some fields I can understand: + 2) string - steamlogin + 3) string - encrypted password + 4) timestamp to map to a key - STime + 5) some INT (probably it's always 1) it's DEPRECATED + 7) whether we are requesting a persistent or an ephemeral session + 8) (EMachineAuthWebDomain) identifier of client requesting auth. + e.g. "Store" + 9) Protobuf of device type (see CAuthentication_DeviceDetails): + 9.1) string - User-Agent + 9.2) Int - platform identifier e.g. 2 (means Web Browser) + (See EAuthTokenPlatformType protobuf). + 9.3) os_type (MOSTLY NOT PRESENTED IN REAL REQUESTS) + 9.4) gaming_device_type (MOSTLY NOT PRESENTED IN REAL REQUESTS) + 11) UNDOCUMENTED AT ALL: Some number (like 8) + + FIELD NUMBERS I SKIPPED MEANS THEY ARE NOT PRESENTED IN REAL REQUEST + We currently uses basic multipart/form-data and "key-value" + data presentation. + But I Think, it's important to know, that real steam works differently, + and maybe we can once upon a time simulate it's REAL behavior. + + """ + + # Pretend to be chrome on windows, made this act as most like a + # browser as possible to (hopefully) avoid breakage in the future from valve + def __init__(self, username='', password='', + userAgent='Mozilla/5.0 (Windows NT 10.0; Win64; x64)' + ' AppleWebKit/537.36 (KHTML, like Gecko)' + ' Chrome/118.0.0.0 Safari/537.36'): + + # ALL FUNCTIONS RENAMED TO PEP8 NOTATION. + self.session = requests.session() + self.user_agent = userAgent + self.username = username + self.password = password + self.session.headers['User-Agent'] = self.user_agent + self.steam_id = None + self.client_id = None + self.request_id = None + self.refresh_token = None + self.access_token = None + self.session_id = None + self.email_auth_waits = False # Not used yet. + self.logged_on = False + + @staticmethod + def send_api_request(data, steam_api_interface, steam_api_method, + steam_api_version): + """Send request to Steam API via requests""" + steam_url = API_URL.format(steam_api_interface, steam_api_method, + steam_api_version) + + if steam_api_method == "GetPasswordRSAPublicKey": # It's GET method + res = requests.get(steam_url, timeout=10, headers=API_HEADERS, + params=data) + else: # Every other API endpoints are POST. + res = requests.post(steam_url, timeout=10, headers=API_HEADERS, + data=data) + + res.raise_for_status() + return res.json() + + def _get_rsa_key(self): + """Get rsa key to crypt password.""" + return self.send_api_request({'account_name': self.username}, + "IAuthentication", 'GetPasswordRSAPublicKey', 1) + + def _encrypt_password(self): + """Encrypt password via RSA key + + Steam handles every password only in encoded way. + """ + r = self._get_rsa_key() - try: - return self.session.post('https://steamcommunity.com/login/dologin/', data=data, timeout=15).json() - except requests.exceptions.RequestException as e: - raise HTTPError(str(e)) + mod = intBase(r['response']['publickey_mod'], 16) + exp = intBase(r['response']['publickey_exp'], 16) - def _finalize_login(self, login_response): - self.steam_id = SteamID(login_response['transfer_parameters']['steamid']) - - def login(self, password='', captcha='', email_code='', twofactor_code='', language='english'): - """Attempts web login and returns on a session with cookies set - - :param password: password, if it wasn't provided on instance init - :type password: :class:`str` - :param captcha: text reponse for captcha challenge - :type captcha: :class:`str` - :param email_code: email code for steam guard - :type email_code: :class:`str` - :param twofactor_code: 2FA code for steam guard - :type twofactor_code: :class:`str` - :param language: select language for steam web pages (sets language cookie) - :type language: :class:`str` - :return: a session on success and :class:`None` otherwise - :rtype: :class:`requests.Session`, :class:`None` - :raises HTTPError: any problem with http request, timeouts, 5xx, 4xx etc - :raises LoginIncorrect: wrong username or password - :raises CaptchaRequired: when captcha is needed - :raises CaptchaRequiredLoginIncorrect: when captcha is needed and login is incorrect - :raises EmailCodeRequired: when email is needed - :raises TwoFactorCodeRequired: when 2FA is needed - """ - if self.logged_on: - return self.session + pub_key = rsa_publickey(mod, exp) + encrypted = pkcs1v15_encrypt(pub_key, self.password.encode('ascii')) + b64 = b64encode(encrypted) + + return tuple((b64.decode('ascii'), r['response']['timestamp'])) + + def _startSessionWithCredentials(self, account_encrypted_password: str, + time_stamp: int): + """Start login session via BeginAuthSessionViaCredentials - if password: - self.password = password - elif self.password: - password = self.password - else: - raise LoginIncorrect("password is not specified") - if not captcha and self.captcha_code: - captcha = self.captcha_code + """ + resp = self.send_api_request( + {'device_friendly_name': self.user_agent, + 'account_name': self.username, + 'encrypted_password': account_encrypted_password, + 'encryption_timestamp': time_stamp, + 'remember_login': '1', + 'platform_type': '2', + 'persistence': '1', + 'website_id': 'Community', + }, + 'IAuthentication', + 'BeginAuthSessionViaCredentials', + 1 + ) + self.client_id = resp['response']['client_id'] + self.request_id = resp['response']['request_id'] + self.steam_id = SteamID(resp['response']['steamid']) + + def _startLoginSession(self): + """Starts login session via credentials.""" + encrypted_password = self._encrypt_password() + self._startSessionWithCredentials(encrypted_password[0], + encrypted_password[1]) + + def _pollLoginStatus(self): + """Get status of current Login Session + + This function asks server about login session status. + If we logged in, this returns access_token that we needed. + + TODO: add check of interval, returned from _startSessionWithCredentials + actually it has no need now, but + """ + resp = self.send_api_request({ + 'client_id': str(self.client_id), + 'request_id': str(self.request_id) + }, 'IAuthentication', 'PollAuthSessionStatus', 1) + try: + self.refresh_token = resp['response']['refresh_token'] + self.access_token = resp['response']['access_token'] + except KeyError: + raise WebAuthException('Authentication requires 2fa token, which is not provided or invalid') + + def _finalizeLogin(self): + self.sessionID = generate_session_id() + self.logged_on = True + for domain in ['store.steampowered.com', 'help.steampowered.com', + 'steamcommunity.com']: + self.session.cookies.set('sessionid', self.sessionID, domain=domain) + self.session.cookies.set('steamLoginSecure', + str(self.steam_id.as_64) + "||" + str( + self.access_token), domain=domain) + + def _update_login_token( + self, + code: str, + code_type: EAuthSessionGuardType = EAuthSessionGuardType.DeviceCode + ): + """Send 2FA token to steam + + Please note, that very rare, login can be unsuccessful, + if you use code, that guard provided you BEFORE log in session + was started. To fix it, just rerun login. - self._load_key() - resp = self._send_login(password=password, captcha=captcha, email_code=email_code, twofactor_code=twofactor_code) + """ + data = { + 'client_id': self.client_id, + 'steamid': self.steam_id, + 'code': code, + 'code_type': code_type + } + res = self.send_api_request(data, 'IAuthentication', + 'UpdateAuthSessionWithSteamGuardCode', 1) + return res - if resp['success'] and resp['login_complete']: - self.logged_on = True - self.password = self.captcha_code = '' - self.captcha_gid = -1 - for cookie in list(self.session.cookies): - for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']: - self.session.cookies.set(cookie.name, cookie.value, domain=domain, secure=cookie.secure) + def login(self, username: str = '', password: str = '', code: str = None, + email_required=False): + """Log in user by new Steam API - self.session_id = generate_session_id() + If user has no need 2FA, this function will just log in user. + If 2FA SteamGuard code needed, when user can provide it just + with guard.SteamAuthenticator.get_code like it always was. - for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']: - self.session.cookies.set('Steam_Language', language, domain=domain) - self.session.cookies.set('birthtime', '-3333', domain=domain) - self.session.cookies.set('sessionid', self.session_id, domain=domain) + If Email code is required, when user can provide email_required. + If email_required was provided, when this function only setup auth + and return new function. - self._finalize_login(resp) + this function will receive email code. Once email code will be provided + authentication process will be complete. + If wrong code provided in this new function, when error will be raised. + And new code will be waited. + """ + if self.logged_on: return self.session - else: - if resp.get('captcha_needed', False): - self.captcha_gid = resp['captcha_gid'] - self.captcha_code = '' - - if resp.get('clear_password_field', False): - self.password = '' - raise CaptchaRequiredLoginIncorrect(resp['message']) - else: - raise CaptchaRequired(resp['message']) - elif resp.get('emailauth_needed', False): - self.steam_id = SteamID(resp['emailsteamid']) - raise EmailCodeRequired(resp['message']) - elif resp.get('requires_twofactor', False): - raise TwoFactorCodeRequired(resp['message']) - elif 'too many login failures' in resp.get('message', ''): - raise TooManyLoginFailures(resp['message']) - else: - self.password = '' - raise LoginIncorrect(resp['message']) - def cli_login(self, password='', captcha='', email_code='', twofactor_code='', language='english'): - """Generates CLI prompts to perform the entire login process + if username == '' or password == '': + if self.username == '' and self.password == '': + raise ValueError("Username or password is provided empty!") + else: + self.username = username + self.password = password - :param password: password, if it wasn't provided on instance init - :type password: :class:`str` - :param captcha: text reponse for captcha challenge - :type captcha: :class:`str` - :param email_code: email code for steam guard - :type email_code: :class:`str` - :param twofactor_code: 2FA code for steam guard - :type twofactor_code: :class:`str` - :param language: select language for steam web pages (sets language cookie) - :type language: :class:`str` - :return: a session on success and :class:`None` otherwise - :rtype: :class:`requests.Session`, :class:`None` + self._startLoginSession() + if code: + self._update_login_token(code) + if email_required: + # We do another request, which force steam to send email code + # (otherwise code just not sent). + + url = (f'https://login.steampowered.com/jwt/checkdevice/' + f'{self.steam_id}') + res = self.session.post(url, data={ + 'clientid': self.client_id, + 'steamid': self.steam_id + }).json() + if res.get('result') == 8: + # This usually mean code sent now. + def end_login(email_code: str): + self._update_login_token(email_code, + EAuthSessionGuardType.EmailCode) + self._pollLoginStatus() + self._finalizeLogin() + return self.session + return end_login + if res.get('result') == 29: + # This code 100% means some data not valid + # Actually this must will never be called, because + # Errors can be only like wrong cookies. (Theoretically) + raise WebAuthException("Something invalid went. Try again later.") + self._pollLoginStatus() + self._finalizeLogin() - .. code:: python + return self.session - In [3]: user.cli_login() - Enter password for 'steamuser': - Solve CAPTCHA at https://steamcommunity.com/login/rendercaptcha/?gid=1111111111111111111 - CAPTCHA code: 123456 - Invalid password for 'steamuser'. Enter password: - Solve CAPTCHA at https://steamcommunity.com/login/rendercaptcha/?gid=2222222222222222222 - CAPTCHA code: abcdef - Enter 2FA code: AB123 - Out[3]: + def logout_everywhere(self): + """Log out on every device. + This function works just like button at + https://store.steampowered.com/twofactor/manage + and allows user to logout on every device. + Can be VERY useful e.g. for users, who practice account rent. """ + session_id = self.session.cookies.get('sessionid', + domain='store.steampowered.com') - # loop until successful login - while True: - try: - return self.login(password, captcha, email_code, twofactor_code, language) - except (LoginIncorrect, CaptchaRequired) as exp: - email_code = twofactor_code = '' - - if isinstance(exp, LoginIncorrect): - prompt = ("Enter password for %s: " if not password else - "Invalid password for %s. Enter password: ") - password = getpass(prompt % repr(self.username)) - if isinstance(exp, CaptchaRequired): - prompt = "Solve CAPTCHA at %s\nCAPTCHA code: " % self.captcha_url - captcha = _cli_input(prompt) - else: - captcha = '' - except EmailCodeRequired: - prompt = ("Enter email code: " if not email_code else - "Incorrect code. Enter email code: ") - email_code, twofactor_code = _cli_input(prompt), '' - except TwoFactorCodeRequired: - prompt = ("Enter 2FA code: " if not twofactor_code else - "Incorrect code. Enter 2FA code: ") - email_code, twofactor_code = '', _cli_input(prompt) + # By the times I saw session can be both of keys, so select valid. + session_id = session_id or self.session.cookies.get( + 'sessionId', domain='store.steampowered.com') + data = { + "action": "deauthorize", + "sessionid": session_id + } + resp = self.session.post( + 'https://store.steampowered.com/twofactor/manage_action', + data=data + ) + + return resp.status_code == 200 + + def cli_login(self, username: str = '', password: str = '', code: str = '', + email_required: bool = False): + """Generates CLI prompts to perform the entire login process + If you use email confirm, provide email_required = True, + else just provide code. + """ + res = self.login( + username, + password, + code, + email_required + ) + if hasattr(res, '__call__'): + while True: + try: + twofactor_code = input('Enter your 2fa/email code: ') + resp = res(twofactor_code) + return resp + except WebAuthException: + pass + else: + return self.session + +#TODO: DEPRECATED, must be rewritten, like WebAuth class MobileWebAuth(WebAuth): """Identical to :class:`WebAuth`, except it authenticates as a mobile device.""" oauth_token = None #: holds oauth_token after successful login - def _send_login(self, password='', captcha='', email_code='', twofactor_code=''): + def _send_login(self, password='', captcha='', email_code='', + twofactor_code=''): data = { 'username': self.username, - "password": b64encode(pkcs1v15_encrypt(self.key, password.encode('ascii'))), + "password": b64encode( + pkcs1v15_encrypt(self.key, password.encode('ascii'))), "emailauth": email_code, "emailsteamid": str(self.steam_id) if email_code else '', "twofactorcode": twofactor_code, @@ -311,7 +416,9 @@ def _send_login(self, password='', captcha='', email_code='', twofactor_code='') self.session.cookies.set('mobileClient', 'android') try: - return self.session.post('https://steamcommunity.com/login/dologin/', data=data, timeout=15).json() + return self.session.post( + 'https://steamcommunity.com/login/dologin/', data=data, + timeout=15).json() except requests.exceptions.RequestException as e: raise HTTPError(str(e)) finally: @@ -357,7 +464,9 @@ def oauth_login(self, oauth_token='', steam_id='', language='english'): } try: - resp = self.session.post('https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001', data=data) + resp = self.session.post( + 'https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001', + data=data) except requests.exceptions.RequestException as e: raise HTTPError(str(e)) @@ -371,13 +480,20 @@ def oauth_login(self, oauth_token='', steam_id='', language='english'): self.session_id = generate_session_id() - for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']: + for domain in ['store.steampowered.com', 'help.steampowered.com', + 'steamcommunity.com']: self.session.cookies.set('birthtime', '-3333', domain=domain) - self.session.cookies.set('sessionid', self.session_id, domain=domain) - self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', domain=domain) + self.session.cookies.set('sessionid', self.session_id, + domain=domain) + self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', + domain=domain) self.session.cookies.set('mobileClient', 'android', domain=domain) - self.session.cookies.set('steamLogin', str(steam_id) + "%7C%7C" + resp_data['token'], domain=domain) - self.session.cookies.set('steamLoginSecure', str(steam_id) + "%7C%7C" + resp_data['token_secure'], + self.session.cookies.set('steamLogin', + str(steam_id) + "%7C%7C" + resp_data[ + 'token'], domain=domain) + self.session.cookies.set('steamLoginSecure', + str(steam_id) + "%7C%7C" + resp_data[ + 'token_secure'], domain=domain, secure=True) self.session.cookies.set('Steam_Language', language, domain=domain) @@ -389,23 +505,34 @@ def oauth_login(self, oauth_token='', steam_id='', language='english'): class WebAuthException(Exception): pass + +class TwoFactorAuthNotProvided(WebAuthException): + pass + + class HTTPError(WebAuthException): pass + class LoginIncorrect(WebAuthException): pass + class CaptchaRequired(WebAuthException): pass + class CaptchaRequiredLoginIncorrect(CaptchaRequired, LoginIncorrect): pass + class EmailCodeRequired(WebAuthException): pass + class TwoFactorCodeRequired(WebAuthException): pass + class TooManyLoginFailures(WebAuthException): pass