Skip to content

Commit

Permalink
fixup! ✨(project) add qualicharge CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaupetit committed May 30, 2024
1 parent ecc6d8d commit 8fb7c5a
Show file tree
Hide file tree
Showing 2 changed files with 306 additions and 31 deletions.
115 changes: 86 additions & 29 deletions src/api/qualicharge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def list_groups(ctx: typer.Context):
for group in groups:
table.add_row(
group.name,
",".join(user.username for user in group.users),
",".join(sorted(user.username for user in group.users)),
",".join(ou.code for ou in group.operational_units),
)
console.print(table)
Expand All @@ -53,10 +53,13 @@ def create_group(
session: SMSession = ctx.obj

selected_operational_units = []
if operational_units is None and not force:
selected_operational_units = questionary.checkbox(
"Select group operational unit(s)", choices=prefixes
).ask()
if operational_units is None:
if not force:
selected_operational_units = questionary.checkbox(
"Select group operational unit(s)", choices=prefixes
).ask()
else:
selected_operational_units = operational_units

# Get database operational_units
db_operational_units: Sequence[OperationalUnit] = []
Expand All @@ -72,12 +75,7 @@ def create_group(
if not force:
print(group)
print(db_operational_units)
create = questionary.confirm(
"Create above group with selected operational units?"
).ask()
if not create:
print("[bold red]Abort![/bold red]")
raise typer.Exit()
typer.confirm("Create above group with selected operational units?", abort=True)

session.add(group)
session.commit()
Expand All @@ -86,16 +84,67 @@ def create_group(


@app.command()
def update_group(ctx: typer.Context, name: str, force: bool = False):
def update_group(
ctx: typer.Context,
group_name: str,
name: Optional[str] = None,
operational_units: Optional[list[str]] = None,
force: bool = False,
):
"""Update an API group."""
session: SMSession = ctx.obj

# Check group exists in database
db_group = session.exec(select(Group).where(Group.name == group_name)).one_or_none()
if db_group is None:
print(f"[bold red]Group {name} does not exist![/bold red]")
raise typer.Exit()

# Copy original group to update
# (for diff generation while asking for changes confirmation)
old_group = db_group.model_copy(deep=True)
old_operational_units = [ou.code for ou in db_group.operational_units]

# Get updates
if name:
db_group.name = name

if operational_units:
db_group.operational_units = session.exec(
select(OperationalUnit).filter(
cast(SAColumn, OperationalUnit.code).in_(operational_units)
)
).all()

if not force:
table = Table("Status", "ID", "Name", "Operational units")
table.add_row(
"old",
str(old_group.id),
old_group.name,
",".join(old_operational_units),
style="red",
)
table.add_row(
"new",
str(db_group.id),
db_group.name,
",".join(ou.code for ou in db_group.operational_units),
style="green",
)
console.print(table)
typer.confirm("Update above group with selected operational units?", abort=True)

session.add(db_group)
session.commit()

print(f"[bold green]Group {db_group.name} updated.[/bold green]")


@app.command()
def delete_group(ctx: typer.Context, name: str, force: bool = False):
"""Delete an API group."""
session: SMSession = ctx.obj
print(f"{session.__dict__=}")

# Check group exists in database
db_group = session.exec(select(Group).where(Group.name == name)).one_or_none()
Expand Down Expand Up @@ -160,7 +209,8 @@ def create_user( # noqa: PLR0913
first_name: Optional[str] = None,
last_name: Optional[str] = None,
password: Optional[str] = None,
scopes: Optional[str] = None,
scopes: Optional[list[str]] = None,
groups: Optional[list[str]] = None,
is_active: bool = True,
is_superuser: bool = False,
is_staff: bool = False,
Expand Down Expand Up @@ -188,16 +238,19 @@ def create_user( # noqa: PLR0913
"Select user scope(s)", choices=list(ScopesEnum)
).ask()
else:
user_scopes = scopes.split(",")
groups: Sequence[Group] = []
if not force:
group_names = session.exec(select(Group.name)).all()
selected_group_names = questionary.checkbox(
"Select user group(s)", choices=group_names
).ask()
groups = session.exec(
select(Group).filter(cast(SAColumn, Group.name).in_(selected_group_names))
).all()
user_scopes = scopes
selected_group_names = []
if not groups:
if not force:
group_names = session.exec(select(Group.name)).all()
selected_group_names = questionary.checkbox(
"Select user group(s)", choices=group_names
).ask()
else:
selected_group_names = groups
user_groups = session.exec(
select(Group).filter(cast(SAColumn, Group.name).in_(selected_group_names))
).all()

user = UserCreate(
username=username,
Expand All @@ -209,16 +262,14 @@ def create_user( # noqa: PLR0913
is_superuser=is_superuser,
is_active=is_active,
scopes=user_scopes,
groups=[group.name for group in user_groups],
)

if not force:
print(user)
create = questionary.confirm("Create above user?").ask()
if not create:
print("[bold red]Abort![/bold red]")
raise typer.Exit()
typer.confirm("Create above user?", abort=True)

session.add(User(**user.model_dump(exclude={"groups"}), groups=groups))
session.add(User(**user.model_dump(exclude={"groups"}), groups=user_groups))
session.commit()

print(f"[bold green]User {username} created.[/bold green]")
Expand All @@ -229,6 +280,12 @@ def update_user(ctx: typer.Context, username: str, force: bool = False):
"""Update an API user."""
session: SMSession = ctx.obj

# Check user exists in database
db_user = session.exec(select(User).where(User.username == username)).one_or_none()
if db_user is None:
print(f"[bold red]User {username} does not exist![/bold red]")
raise typer.Exit()


@app.command()
def delete_user(ctx: typer.Context, username: str, force: bool = False):
Expand Down
Loading

0 comments on commit 8fb7c5a

Please sign in to comment.