Skip to content

Commit

Permalink
refacto: move code to improve logic mutualization (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guts authored Jan 8, 2024
2 parents 0853874 + 02d5ecc commit 6591c7f
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 323 deletions.
204 changes: 204 additions & 0 deletions geotribu_cli/cli_results_rich_formatters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#! python3 # noqa: E265

# ############################################################################
# ########## IMPORTS #############
# ################################

# standard library
import logging
from typing import Optional, Union

# 3rd party
from rich.table import Table

# package
from geotribu_cli.__about__ import __title__, __version__
from geotribu_cli.comments.mdl_comment import Comment
from geotribu_cli.constants import GeotribuDefaults
from geotribu_cli.utils.formatters import url_add_utm

# ############################################################################
# ########## GLOBALS #############
# ################################

logger = logging.getLogger(__name__)
defaults_settings = GeotribuDefaults()


# ############################################################################
# ########## FUNCTIONS ###########
# ################################


def format_output_result_comments(
results: list[Comment], format_type: Optional[str] = None, count: int = 5
) -> Union[Table, list[Comment]]:
"""Format list of comments according to output option.
Args:
results: result to format
format_type: format output option. Defaults to None.
count: default number of results to display. Defaults to 5.
Returns:
formatted result ready to print, table or inital list
"""
if format_type == "table":
table = Table(
title=f"Derniers commentaires publiés - {count}/{len(results)} résultats "
f"\n(ctrl+clic sur le numéro pour ouvrir dans le navigateur)",
show_lines=True,
highlight=True,
caption=f"{__title__} {__version__}",
)

# columns
table.add_column(header="#", justify="center")
table.add_column(header="Date", justify="center")
table.add_column(header="Auteur/e", justify="left", style="default")
table.add_column(header="Contenu", justify="center", style="magenta")
table.add_column(header="Réponse à", justify="center")
table.add_column(header="Commentaire", justify="center")

# iterate over results

for r in results[:count]:
# add row
table.add_row(
f"[link={r.url_to_comment}]{r.id}[/link]",
f"{r.created_as_datetime:%d %B %Y \nà %H:%m}",
r.author,
f"[link={r.url_to_comment}]{r.uri}[/link]",
str(r.parent),
r.markdownified_text,
)

return table
else:
return results


def format_output_result_search_content(
result: list[dict],
search_term: Optional[str] = None,
format_type: Optional[str] = None,
count: int = 5,
search_filter_dates: Optional[tuple] = None,
search_filter_type: Optional[str] = None,
) -> Union[list[dict], Table]:
"""Format result according to output option.
Args:
result (list[dict]): result to format
search_term (str, optional): term used for search. Defaults to None.
format_type (str, optional): format output option. Defaults to None.
count (int, optional): default number of results to display. Defaults to 5.
search_filter_dates: dates used to filter search. Defaults to None.
search_filter_type: type used to filter search. Defaults to None.
Returns:
str: formatted result ready to print
"""
if format_type == "table":
# formatte le titre - plus lisible qu'une grosse f-string des familles
titre = f"Recherche de contenus - {count}/{len(result)} résultats "
if search_term:
titre += f"avec le terme : {search_term}"
if any([search_filter_dates[0], search_filter_dates[1], search_filter_type]):
titre += "\nFiltres : "
if search_filter_type:
titre += f"de type {search_filter_type}, "
if search_filter_dates[0]:
titre += f"plus récents que {search_filter_dates[0]:%d %B %Y}, "
if search_filter_dates[1]:
titre += f"plus anciens que {search_filter_dates[1]:%d %B %Y}"

table = Table(
title=titre,
show_lines=True,
highlight=True,
caption=f"{__title__} {__version__}",
)

# columns
table.add_column(header="#", justify="center")
table.add_column(header="Titre", justify="left", style="default")
table.add_column(header="Type", justify="center", style="bright_black")
table.add_column(
header="Date de publication", justify="center", style="bright_black"
)
table.add_column(header="Score", style="magenta")
table.add_column(header="Mots-clés", justify="right", style="blue")

# iterate over results
for r in result[:count]:
table.add_row(
f"{result.index(r)}",
f"[link={url_add_utm(r.get('url'))}]{r.get('titre')}[/link]",
r.get("type"),
f"{r.get('date'):%d %B %Y}",
r.get("score"),
",".join(r.get("tags")),
)

return table
else:
return result[:count]


def format_output_result_search_image(
result: list[dict],
search_term: Optional[str] = None,
format_type: Optional[str] = None,
count: int = 5,
) -> Union[list[dict], Table]:
"""Format result according to output option.
Args:
result (list[dict]): result to format
search_term (str, optional): term used for search. Defaults to None.
format_type (str, optional): format output option. Defaults to None.
count (int, optional): default number of results to display. Defaults to 5.
Returns:
str: formatted result ready to print
"""
if format_type == "table":
table = Table(
title=f"Recherche d'images - {len(result)} résultats "
f"avec le terme : {search_term}\n(ctrl+clic sur le nom pour ouvrir l'image)",
show_lines=True,
highlight=True,
caption=f"{__title__} {__version__}",
)

# columns
table.add_column(header="#", justify="center")
table.add_column(header="Nom", justify="left", style="default")
table.add_column(header="Dimensions", justify="center", style="bright_black")
table.add_column(header="Score", justify="center", style="magenta")
table.add_column(header="Chemin CDN", justify="left")
# table.add_column(header="Syntaxe intégration", justify="right", style="blue")

# iterate over results

for r in result[:count]:
# # syntaxe depending on image type
# if "logos-icones" in r.get("url"):
# syntax = rf"!\[logo {Path(r.get('nom')).stem}]({r.get('url')}){{: .img-rdp-news-thumb }}"
# else:
# syntax = rf"!\[{Path(r.get('nom')).stem}]({r.get('url')})"

# add row
table.add_row(
f"{result.index(r)}",
f"[link={url_add_utm(r.get('url'))}]{r.get('nom')}[/link]",
r.get("dimensions"),
r.get("score"),
f"[link={defaults_settings.cdn_base_url}/tinyfilemanager.php?p={r.get('cdn_path')}]{r.get('cdn_path')}[/link]",
# syntax,
)

return table
else:
return result
6 changes: 1 addition & 5 deletions geotribu_cli/comments/comments_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from rich import print

# package
from geotribu_cli.comments.comments_latest import get_latest_comments
from geotribu_cli.comments.comments_toolbelt import get_latest_comments
from geotribu_cli.constants import GeotribuDefaults
from geotribu_cli.social.mastodon_client import broadcast_to_mastodon
from geotribu_cli.utils.start_uri import open_uri
Expand All @@ -36,10 +36,6 @@
\n#Geotribot #commentaire comment-{id}"""

# ############################################################################
# ########## FUNCTIONS ###########
# ################################


# ############################################################################
# ########## CLI #################
Expand Down
142 changes: 3 additions & 139 deletions geotribu_cli/comments/comments_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@

# standard library
import argparse
import json
import logging
import sys
from os import getenv
from typing import Literal

# 3rd party
from rich import print
from rich.table import Table

# package
from geotribu_cli.__about__ import __title__, __version__
from geotribu_cli.comments.mdl_comment import Comment
from geotribu_cli.cli_results_rich_formatters import format_output_result_comments
from geotribu_cli.comments.comments_toolbelt import get_latest_comments
from geotribu_cli.constants import GeotribuDefaults
from geotribu_cli.utils.file_downloader import download_remote_file_to_local

# ############################################################################
# ########## GLOBALS #############
Expand All @@ -29,138 +25,6 @@
logger = logging.getLogger(__name__)
defaults_settings = GeotribuDefaults()

# ############################################################################
# ########## FUNCTIONS ###########
# ################################


def format_output_result(
results: list[Comment], format_type: str = None, count: int = 5
) -> str:
"""Format result according to output option.
Args:
results: result to format
format_type: format output option. Defaults to None.
count: default number of results to display. Defaults to 5.
Returns:
formatted result ready to print
"""

if format_type == "table":
table = Table(
title=f"Derniers commentaires publiés - {count}/{len(results)} résultats "
f"\n(ctrl+clic sur le numéro pour ouvrir dans le navigateur)",
show_lines=True,
highlight=True,
caption=f"{__title__} {__version__}",
)

# columns
table.add_column(header="#", justify="center")
table.add_column(header="Date", justify="center")
table.add_column(header="Auteur/e", justify="left", style="default")
table.add_column(header="Contenu", justify="center", style="magenta")
table.add_column(header="Réponse à", justify="center")
table.add_column(header="Commentaire", justify="center")

# iterate over results

for r in results[:count]:
# add row
table.add_row(
f"[link={r.url_to_comment}]{r.id}[/link]",
f"{r.created_as_datetime:%d %B %Y \nà %H:%m}",
r.author,
f"[link={r.url_to_comment}]{r.uri}[/link]",
str(r.parent),
r.markdownified_text,
)

return table
else:
return results


def get_latest_comments(
number: int = 5,
sort_by: Literal[
"author_asc", "author_desc", "created_asc", "created_desc"
] = "created_asc",
expiration_rotating_hours: int = 1,
attempt: int = 1,
) -> list[Comment]:
"""Download and parse latest comments published.
Args:
number: count of comments to download. Must be > 1. Defaults to 5.
sort_by: comments sorting criteria. Defaults to "created_asc".
expiration_rotating_hours (int, optional): number in hours to consider the \
local file outdated. Defaults to 1.
Returns:
list of comments objects
"""
# check if count is acceptable
if number < 1:
number = 1

# download remote latest comments
comments_file = download_remote_file_to_local(
remote_url_to_download=f"{defaults_settings.comments_base_url}latest?limit={number}",
local_file_path=defaults_settings.geotribu_working_folder.joinpath(
"comments/latest.json"
),
expiration_rotating_hours=expiration_rotating_hours,
content_type="application/json",
)

try:
with comments_file.open(mode="r", encoding="UTF-8") as f:
comments = json.loads(f.read())
except json.decoder.JSONDecodeError as err:
logger.error(f"Impossible de lire le fichier des commentaires. Trace {err}")
if attempt < 2:
logger.info("Deuxième essai en forçant le téléchargement du fichier.")
return get_latest_comments(
number=number, sort_by=sort_by, expiration_rotating_hours=0, attempt=2
)
raise err

li_comments = [Comment(**c) for c in comments]

# only one comment or less? no need to sort
if len(li_comments) < 2:
return li_comments

# sort
if sort_by == "author_asc":
return sorted(
li_comments,
key=lambda x: x.author,
)
elif sort_by == "author_desc":
return sorted(
li_comments,
key=lambda x: x.author,
reverse=True,
)
elif sort_by == "created_asc":
return sorted(
li_comments,
key=lambda x: x.created,
)
elif sort_by == "created_desc":
return sorted(
li_comments,
key=lambda x: x.created,
reverse=True,
)
else:
return li_comments


# ############################################################################
# ########## CLI #################
# ################################
Expand Down Expand Up @@ -245,7 +109,7 @@ def run(args: argparse.Namespace):
# formatage de la sortie
if len(latest_comments):
print(
format_output_result(
format_output_result_comments(
results=latest_comments,
format_type=args.format_output,
count=args.results_number,
Expand Down
Loading

0 comments on commit 6591c7f

Please sign in to comment.