Skip to content

Commit

Permalink
Merge pull request #38 from Tql-ws1/tql-ws1/support-batch-delete-snap…
Browse files Browse the repository at this point in the history
…shots

feat: support batch delete snapshots
  • Loading branch information
hirak99 authored Dec 28, 2024
2 parents 16776cd + edad4be commit 0b54fd4
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 15 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@ E.g.
Or,
`yabsnap delete 20221006143047 # Deletes all snapshots with this timestamp.`

### `yabsnap batch-delete --start START_TIMESTAMP --end END_TIMESTAMP [--indicator S|I|U]`
Batch delete snapshots.

Assume there is a snapshot as shown in the above `yabsnap list` command:

\# Delete all snapshots including and after 2022-10-06_14:30.
\
\# Only root.conf's 20221006122312 will not be deleted.
\
E.g. `yabsnap batch-delete --start 20221006143047`
\
Or,
`yabsnap batch-delete --start 2022-10-06_14:30`

\# Delete snapshots before 2022-10-06_16:46 (excluding that snapshot) where the indicator is U.
\
\# Only root.conf's 20221006122312 will be deleted.
\
E.g. `yabsnap batch-delete --end 2022-10-06_16:46 --indicator U`

### `yabsnap rollback-gen PATH|TIMESTAMP`
Generates a script for rolling back.

Expand Down
10 changes: 8 additions & 2 deletions src/code/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ def is_compatible_volume(self) -> bool:

def iterate_configs(source: Optional[str]) -> Iterator[Config]:
config_iterator: Iterable[str]
# Try to add the user-specified configuration file to the `config_iterator`
# or try to add the configuration file from the `_CONFIG_PATH` to the `config_iterator`.
if USER_CONFIG_FILE is not None:
if not os.path.isfile(USER_CONFIG_FILE):
logging.warn(f"Could not find specified config file: {USER_CONFIG_FILE}")
logging.warning(f"Could not find specified config file: {USER_CONFIG_FILE}")
return
config_iterator = [USER_CONFIG_FILE]
logging.info(f"Using user-supplied config {USER_CONFIG_FILE}")
Expand All @@ -144,11 +146,14 @@ def iterate_configs(source: Optional[str]) -> Iterator[Config]:
)
return
config_iterator = (str(path) for path in _CONFIG_PATH.iterdir())

# Check whether the necessary fields in the configuration file are filled in
# and append the configurations with filled necessary fields to `config_iterator`
configs_found = False
for fname in config_iterator:
logging.info(f"Reading config {fname}")
config = Config.from_configfile(fname)
if not config.source or not config.dest_prefix:
if not (config.source and config.dest_prefix):
os_utils.eprint(
f"WARNING: Skipping invalid configuration {fname}"
" (please specify source and dest_prefix)"
Expand All @@ -157,6 +162,7 @@ def iterate_configs(source: Optional[str]) -> Iterator[Config]:
if not source or config.source == source:
configs_found = True
yield config

if source is not None and not configs_found:
logging.warning(f"No config file found with source={source}")

Expand Down
95 changes: 82 additions & 13 deletions src/code/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import datetime
import logging
import pathlib
from typing import Iterable

from . import colored_logs
Expand Down Expand Up @@ -51,17 +52,43 @@ def _parse_args() -> argparse.Namespace:
subparsers.add_parser(
"list-json", help="Machine readable list of all managed snaps."
)

# Creates an user snapshot.
create = subparsers.add_parser("create", help="Create new snapshots.")
create.add_argument("--comment", help="Comment attached to this snapshot.")

# Creates a new config by NAME.
create_config = subparsers.add_parser(
"create-config", help="Bootstrap a config for new filesystem to snapshot."
)
create_config.add_argument(
"config_name", help='Name to be given to config file, e.g. "home".'
)

# Delete a snapshot.
delete = subparsers.add_parser(
"delete", help="Delete a snapshot created by yabsnap."
)

# Batch delete snapshots.
batch_delete = subparsers.add_parser(
"batch-delete", help="Batch delete snapshots created by yabsnap."
)
batch_delete.add_argument(
"--indicator",
type=str,
choices=("S", "I", "U"),
default="",
help="Filter out snapshots that have a specific indicator identifier.",
)
batch_delete.add_argument(
"--start", type=str, default="", help="Where to start deleting snapshots."
)
batch_delete.add_argument(
"--end", type=str, default="", help="Where to stop deleting snapshots."
)

# Generates a script for rolling back.
rollback = subparsers.add_parser(
"rollback-gen", help="Generate script to rollback one or more snaps."
)
Expand Down Expand Up @@ -107,7 +134,43 @@ def _delete_snap(configs_iter: Iterable[configs.Config], path_suffix: str, sync:
os_utils.eprint(f"Target {path_suffix} not found in any config.")


def _config_operation(command: str, source: str, comment: str, sync: bool):
def _batch_delete_snaps(
configs_iter: Iterable[configs.Config],
scope: tuple[str, str],
indicator: str,
sync: bool,
):
configs_list = list(configs_iter)
targets = snap_operator.find_multi_targets(configs_list, scope, indicator)
if not targets:
os_utils.eprint("No snapshots matching the criteria were found.")
return

# TODO(thR CIrcU5): Sort the snapshot list by old date to new date?

to_sync: list[configs.Config] = []

confirm_deletion = snap_operator.confirm_deletion_snapshots(targets)
if confirm_deletion is True:
# Delete snapshots one by one based on the configuration file,
# and try to find the configuration file to add it to the `to_sync` list.
for config_name, datetime_and_snapshots in targets.items():
for snap in datetime_and_snapshots.values():
snap.delete()
# Try to find the configuration file and add it to the `to_sync` list.
for config in configs_list:
config_exist = pathlib.Path(config.config_file).stem == config_name
snap_type_is_btrfs = config.snap_type == snap_mechanisms.SnapType.BTRFS
if config_exist and snap_type_is_btrfs:
to_sync.append(config)

if sync:
_sync(to_sync)


def _config_operation(
command: str, source: str | None, comment: str | None, sync: bool
):
# Single timestamp for all operations.
now = datetime.datetime.now()

Expand Down Expand Up @@ -153,19 +216,18 @@ def main():

colored_logs.setup_logging(level=logging.INFO if args.verbose else logging.WARNING)

if configs.is_schedule_enabled():
if not os_utils.timer_enabled():
os_utils.eprint(
"\n".join(
[
"",
"*** NOTE - Backup schedule exists but yabsnap.timer is not active ***",
"To enable scheduled backups, please run -",
" sudo systemctl enable --now yabsnap.timer",
"",
]
)
if configs.is_schedule_enabled() and not os_utils.timer_enabled():
os_utils.eprint(
"\n".join(
[
"",
"*** NOTE - Backup schedule exists but yabsnap.timer is not active ***",
"To enable scheduled backups, please run -",
" sudo systemctl enable --now yabsnap.timer",
"",
]
)
)

if command == "create-config":
configs.create_config(args.config_name, args.source)
Expand All @@ -175,6 +237,13 @@ def main():
path_suffix=args.target_suffix,
sync=args.sync,
)
elif command == "batch-delete":
_batch_delete_snaps(
configs.iterate_configs(source=args.source),
scope=(args.start, args.end),
indicator=args.indicator,
sync=args.sync,
)
elif command == "rollback-gen":
rollbacker.rollback(
configs.iterate_configs(source=args.source), args.target_suffix
Expand Down
166 changes: 166 additions & 0 deletions src/code/snap_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import logging
import os
import re

from . import configs
from . import deletion_logic
Expand All @@ -25,6 +26,7 @@
from . import snap_holder

from typing import Any, Iterable, Iterator, Optional, TypeVar
from pathlib import Path


def _get_old_backups(config: configs.Config) -> Iterator[snap_holder.Snapshot]:
Expand Down Expand Up @@ -54,6 +56,170 @@ def find_target(config: configs.Config, suffix: str) -> Optional[snap_holder.Sna
return None


def find_multi_targets(
configs_iter: Iterable[configs.Config], scope: tuple[str, str], indicator: str = ""
) -> Optional[dict[str, dict[datetime.datetime, snap_holder.Snapshot]]]:
"""Filter out snapshots that meet the criteria."""
time_scope = tuple(map(_convert_to_timestamp, scope))
targets = _snapshots_of_configs(configs_iter)

if indicator:
targets = _indicator_filter(targets, indicator)

# The pattern matching has a lot of duplicate code, with only the filtering conditions being different!
match time_scope:
# Delete snapshots within a certain time range (inclusive of the left endpoint and exclusive of the right endpoint).
case (start, end) if start and end:
start_datetime = datetime.datetime.strptime(start, global_flags.TIME_FORMAT)
end_datetime = datetime.datetime.strptime(end, global_flags.TIME_FORMAT)
return {
config_name: {
datetime_: snapshot
for datetime_, snapshot in datetime_and_snapshots.items()
if start_datetime <= datetime_ < end_datetime
}
for config_name, datetime_and_snapshots in targets.items()
}
# Delete all snapshots after a certain point in time (including the specified point in time).
case (start, ""):
start_datetime = datetime.datetime.strptime(start, global_flags.TIME_FORMAT)
return {
config_name: {
datetime_: snapshot
for datetime_, snapshot in datetime_and_snapshots.items()
if datetime_ >= start_datetime
}
for config_name, datetime_and_snapshots in targets.items()
}
# Delete all snapshots before a certain point in time.
case ("", end):
end_datetime = datetime.datetime.strptime(end, global_flags.TIME_FORMAT)
return {
config_name: {
datetime_: snapshot
for datetime_, snapshot in datetime_and_snapshots.items()
if datetime_ < end_datetime
}
for config_name, datetime_and_snapshots in targets.items()
}
# No deletion scope provided.
case _:
os_utils.eprint("Use --start and --end to specify the deletion scope.")
return None


def confirm_deletion_snapshots(
snapshots_of_configs: dict[str, dict[datetime.datetime, snap_holder.Snapshot]],
) -> bool:
"""List the snapshots to be deleted and ask if they really want to delete them."""
now = datetime.datetime.now()

for config_name, datetime_and_snapshots in snapshots_of_configs.items():
print(f"Config: {config_name}.conf")
print("Snaps:")

for datetime_, snapshot in datetime_and_snapshots.items():
columns = []
snap_timestamp = datetime_.strftime(global_flags.TIME_FORMAT)
columns.append(f" {snap_timestamp}")

trigger_str = "".join(
c if snapshot.metadata.trigger == c else " " for c in "SIU"
)
columns.append(trigger_str)

elapsed = (now - snapshot.snaptime).total_seconds()
elapsed_str = f"({human_interval.humanize(elapsed)} ago)"
columns.append(f"{elapsed_str:<20}")
columns.append(snapshot.metadata.comment)

print(" ".join(columns))

print()

confirm = input("Are you sure you want to delete the above snapshots? [y/N]")
match confirm:
case "y" | "Y" | "yes" | "Yes" | "YES":
return True
case _:
return False


def _indicator_filter(
snapshots_of_configs: dict[str, dict[datetime.datetime, snap_holder.Snapshot]],
indicator: str,
) -> dict[str, dict[datetime.datetime, snap_holder.Snapshot]]:
return {
config_name: {
datetime_: snapshot
for datetime_, snapshot in datetime_and_snapshots.items()
if snapshot.metadata.trigger == indicator.upper()
}
for config_name, datetime_and_snapshots in snapshots_of_configs.items()
}


def _snapshots_of_configs(
configs_iter: Iterable[configs.Config],
) -> dict[str, dict[datetime.datetime, snap_holder.Snapshot]]:
"""Find the snapshots held by each configuration file."""
mapping: dict[str, dict[datetime.datetime, snap_holder.Snapshot]] = {}

# Create a mapping between configuration files and snapshots.
for config in configs_iter:
config_name = Path(config.config_file).stem
mapping[config_name] = {}
# Create a mapping between snapshot timestamps and snapshots
# to facilitate later filtering and sorting of the snapshots.
for snapshot in _get_old_backups(config):
mapping[config_name][snapshot.snaptime] = snapshot

return mapping


def _convert_to_timestamp(suffix: str) -> str:
"""Convert human-readable time format into a timestamp."""
if not _human_friendly_timestamp(suffix):
return suffix

try:
# "2024-11-19_15:30".split("_") -> ["2024-11-19", "15:30"]
date, time = suffix.split("_")
except ValueError: # unpack error
date = suffix
# If the user does not provide the hour and minute, it defaults to 23:59
hour_minute = "2359"
else:
# "15:30".split(":") -> ["15", "30"] -> "1530"
# "1530".split(":") -> ["1530"] -> "1530"
hour_minute = "".join(time.split(":"))
suffix_not_minutes = len(hour_minute) == 2
if suffix_not_minutes:
hour_minute += "00"

days_not_padded_zeros = r"^\d{4}-\d{2}-\d$"
pad_days_with_zeros = r"^\d{4}-\d{2}-0\d$"
year_month_day = re.sub(days_not_padded_zeros, pad_days_with_zeros, date)
return year_month_day + hour_minute


def _human_friendly_timestamp(suffix: str) -> bool:
"""Determine whether the suffix provided by the user is a human-readable timestamp."""

# Match the following format:
# - 2024-11-01_14:20
# - 2024-11-1_14:20
# - 2024-11-01
# - 2024-11-1
format_match = re.match(
r"^\d{4}-(0[1-9]|1[0-2]|[1-9])-(0[1-9]|[12]\d|3[01]|[1-9])(_\d{2}:\d{2})?$",
suffix,
)
if format_match:
return True
return False


_GenericT = TypeVar("_GenericT")


Expand Down

0 comments on commit 0b54fd4

Please sign in to comment.