Skip to content

Commit

Permalink
Fix/userhub testconnexion (#82)
Browse files Browse the repository at this point in the history
* (fix required for PnX-SI/UsersHub#191) + drop deprecated import of escape + add header for test connexion to UsersHub
  • Loading branch information
jacquesfize authored Nov 28, 2023
1 parent 29dcb8b commit d4979fa
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 80 deletions.
24 changes: 17 additions & 7 deletions src/pypnusershub/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import logging

import datetime

from markupsafe import escape
from flask import (
Blueprint,
escape,
request,
Response,
current_app,
Expand Down Expand Up @@ -69,7 +68,9 @@ def register(self, app, *args, **kwargs):
# set cookie autorenew
app.config["PASS_METHOD"] = app.config.get("PASS_METHOD", "hash")

app.config["REMEMBER_COOKIE_NAME"] = app.config.get("REMEMBER_COOKIE_NAME", "token")
app.config["REMEMBER_COOKIE_NAME"] = app.config.get(
"REMEMBER_COOKIE_NAME", "token"
)

parent = super(ConfigurableBlueprint, self)
parent.register(app, *args, **kwargs)
Expand Down Expand Up @@ -100,7 +101,11 @@ def login():
app = models.Application.query.get(id_app)
if not app:
raise BadRequest(f"No app for id {id_app}")
user = models.User.query.filter(models.User.identifiant == login).filter_by_app().one()
user = (
models.User.query.filter(models.User.identifiant == login)
.filter_by_app()
.one()
)
user_dict = UserSchema(exclude=["remarques"]).dump(user)
except exc.NoResultFound as e:
msg = json.dumps(
Expand All @@ -126,7 +131,9 @@ def login():
token = encode_token(user_dict)
token_exp = datetime.datetime.now(datetime.timezone.utc)
token_exp += datetime.timedelta(seconds=current_app.config["COOKIE_EXPIRATION"])
return jsonify({"user": user_dict, "expires": token_exp.isoformat(), "token": token.decode()})
return jsonify(
{"user": user_dict, "expires": token_exp.isoformat(), "token": token.decode()}
)


@routes.route("/public_login", methods=["POST"])
Expand All @@ -136,7 +143,8 @@ def public_login():

user = (
models.AppUser.query.filter(
models.AppUser.identifiant == current_app.config.get("PUBLIC_ACCESS_USERNAME")
models.AppUser.identifiant
== current_app.config.get("PUBLIC_ACCESS_USERNAME")
)
.filter(models.AppUser.id_application == get_current_app_id())
.one()
Expand All @@ -148,7 +156,9 @@ def public_login():
token_exp = datetime.datetime.now(datetime.timezone.utc)
token_exp += datetime.timedelta(seconds=current_app.config["COOKIE_EXPIRATION"])

return jsonify({"user": user_dict, "expires": token_exp.isoformat(), "token": token.decode()})
return jsonify(
{"user": user_dict, "expires": token_exp.isoformat(), "token": token.decode()}
)


@routes.route("/logout", methods=["GET", "POST"])
Expand Down
168 changes: 95 additions & 73 deletions src/pypnusershub/routes_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@

s = requests.Session()

bp = Blueprint('register', __name__)
bp = Blueprint("register", __name__)


def get_json_request(r):
'''
r : retour de la requete requests
"""
r : retour de la requete requests
fonction pour recuperer la reponse json sans lever d'erreur
'''
fonction pour recuperer la reponse json sans lever d'erreur
"""
try:
r_json = r.json()
except Exception:
Expand All @@ -57,45 +57,45 @@ def get_json_request(r):


def req_json_or_text(r, msg_pypn=""):
'''
r : retour de la requete requests
msg_pypn : message supplementaire rajouté a la reponse
"""
r : retour de la requete requests
msg_pypn : message supplementaire rajouté a la reponse
revoie un tuple avec la réponse de la requete en json r.json si possible
{'msg': r.text} sinon
et status_code
revoie un tuple avec la réponse de la requete en json r.json si possible
{'msg': r.text} sinon
et status_code
'''
"""
r_json = get_json_request(r)

if not r_json and r.text:

r_json = {"msg": r.text}

if not r_json and not r.text:

r_json = {"msg": "empty message"}

if msg_pypn:

r_json['msg_pypn'] = msg_pypn
r_json["msg_pypn"] = msg_pypn

return json.dumps(r_json), r.status_code


def connect_admin():
'''
decorateur pour la connexion de l'admin a une appli
ici url config['URL_USERSHUB'] sans / à la fin
'''
"""
decorateur pour la connexion de l'admin a une appli
ici url config['URL_USERSHUB'] sans / à la fin
"""

def _connect_admin(f):
@wraps(f)
def __connect_admin(*args, **kwargs):
# connexion à usershub
id_app_usershub = db.session.query(
Application.id_application
).filter(Application.code_application == 'UH').first()
id_app_usershub = (
db.session.query(Application.id_application)
.filter(Application.code_application == "UH")
.first()
)

if not id_app_usershub:
return json.dumps({"msg": "Pas d'id app USERSHUB"}), 500
Expand All @@ -104,22 +104,31 @@ def __connect_admin(*args, **kwargs):

# test si on est déjà connecté
try:
r = s.post(current_app.config['URL_USERSHUB'] +
"/api_register/test_connexion")
b_connexion = (r.status_code == 200)
r = s.post(
current_app.config["URL_USERSHUB"] + "/api_register/test_connexion",
headers={"Content-Type": "application/json"},
)
b_connexion = r.status_code == 200
except requests.ConnectionError:
return json.dumps({"msg": "Erreur de connexion a l'application USERSHUB (causes possibles : url erronee, application USERSHUB ne fonctionne pas, ..;)"}), 500
return (
json.dumps(
{
"msg": "Erreur de connexion a l'application USERSHUB (causes possibles : url erronee, application USERSHUB ne fonctionne pas, ..;)"
}
),
500,
)

# si on est pas connecté on se connecte
if not b_connexion:
# connexion à usershub
r = s.post(
current_app.config['URL_USERSHUB'] + "/" + "pypn/auth/login",
current_app.config["URL_USERSHUB"] + "/" + "pypn/auth/login",
json={
'login': current_app.config['ADMIN_APPLICATION_LOGIN'],
'password': current_app.config['ADMIN_APPLICATION_PASSWORD'],
'id_application': id_app_usershub
}
"login": current_app.config["ADMIN_APPLICATION_LOGIN"],
"password": current_app.config["ADMIN_APPLICATION_PASSWORD"],
"id_application": id_app_usershub,
},
)

# si echec de connexion
Expand All @@ -133,84 +142,97 @@ def __connect_admin(*args, **kwargs):
return _connect_admin


@bp.route('test_uh', methods=['GET'])
@bp.route("test_uh", methods=["GET"])
@connect_admin()
def test():
'''
route pour tester le décorateur connect_admin
ainsi que les paramètres de connexion à USERSHUB:
- config['ADMIN_APPLICATION_LOGIN']
- config['ADMIN_APPLICATION_PASSWORD']
'''
"""
route pour tester le décorateur connect_admin
ainsi que les paramètres de connexion à USERSHUB:
- config['ADMIN_APPLICATION_LOGIN']
- config['ADMIN_APPLICATION_PASSWORD']
"""

r = s.post(current_app.config['URL_USERSHUB'] + "/api_register/test_connexion")
r = s.post(current_app.config["URL_USERSHUB"] + "/api_register/test_connexion")

return req_json_or_text(r, "Test pypn")


@bp.route("post_usershub/<string:type_action>", methods=['POST'])
@bp.route("post_usershub/<string:type_action>", methods=["POST"])
@connect_admin()
def post_usershub(type_action):
'''
route generique pour appeler les routes UsersHub en tant qu'administrateur de l'appli en cours
ex : post_usershub/test_connexion appelle la route URL_USERSHUB/api_register/test_connexion
'''
"""
route generique pour appeler les routes UsersHub en tant qu'administrateur de l'appli en cours
ex : post_usershub/test_connexion appelle la route URL_USERSHUB/api_register/test_connexion
"""
# attribution des droits pour les actions
dict_type_action_droit = {
'test_connexion': 0,
'valid_temp_user': 0,
'create_temp_user': 0,
'change_password': 0,
'create_cor_role_token': 0,
'add_application_right_to_role': 0,
'login_recovery': 0,
'password_recovery': 0,
'update_user': 1,
'change_application_right': 4,
"test_connexion": 0,
"valid_temp_user": 0,
"create_temp_user": 0,
"change_password": 0,
"create_cor_role_token": 0,
"add_application_right_to_role": 0,
"login_recovery": 0,
"password_recovery": 0,
"update_user": 1,
"change_application_right": 4,
}
params = request.args
id_droit = 0

if session.get('current_user', None):

id_role = session['current_user']['id_role']
if session.get("current_user", None):
id_role = session["current_user"]["id_role"]

q = (db.session.query(AppUser.id_droit_max)
.filter(AppUser.id_role == id_role)
.filter(AppUser.id_application == get_current_app_id()))
q = (
db.session.query(AppUser.id_droit_max)
.filter(AppUser.id_role == id_role)
.filter(AppUser.id_application == get_current_app_id())
)
id_droit = q.one()[0]

# si pas de droit definis pour cet action, alors les droits requis sont à 7 => action impossible
if id_droit < dict_type_action_droit.get(type_action, 7):

return json.dumps({"msg": "Droits insuffisant pour la requête UsersHub : " + type_action}), 403
return (
json.dumps(
{"msg": "Droits insuffisant pour la requête UsersHub : " + type_action}
),
403,
)

# les test de paramètres seront faits dans UsersHub
data = request.get_json()
url = current_app.config['URL_USERSHUB'] + "/" + "api_register/" + type_action
url = current_app.config["URL_USERSHUB"] + "/" + "api_register/" + type_action
r_usershub = s.post(url, json=data)

# after request definir route dans app
# par ex. pour l'envoi de mails
# lancer uniquement si enable_post_action n'est pas = False dans le body de la requête
if r_usershub.status_code == 200 and data.get('enable_post_action', True):
if r_usershub.status_code == 200 and data.get("enable_post_action", True):
out_after = after_request(type_action, get_json_request(r_usershub))

# 0 = pas d'action definie dans current_app.config['after_USERSHUB_request'][type_action]
if out_after != 0:

if out_after['msg'] != "ok":

return json.dumps({'msg': 'Problème after request pour post_usershub ' + type_action + ':' + out_after['msg']}), 500
if out_after["msg"] != "ok":
return (
json.dumps(
{
"msg": "Problème after request pour post_usershub "
+ type_action
+ ":"
+ out_after["msg"]
}
),
500,
)

return req_json_or_text(r_usershub)


def after_request(type_action, data, *args, **kwargs):
'''
lorsqu'une fonction est definie dans REGISTER_POST_ACTION_FCT[type_action]
elle est executée avec les données fournies en retour de la requete USERSHUB
'''
"""
lorsqu'une fonction est definie dans REGISTER_POST_ACTION_FCT[type_action]
elle est executée avec les données fournies en retour de la requete USERSHUB
"""
if not REGISTER_POST_ACTION_FCT:
return 0
f = REGISTER_POST_ACTION_FCT.get(type_action, None)
Expand Down

0 comments on commit d4979fa

Please sign in to comment.