# virtualbox_snapshotter.py
"""
MIT License.
Copyright (c) 2020 Mattia Baldari
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from datetime import datetime
import argparse
import logging
import sys
import virtualbox
def delete_oldest_snapshots(virtual_machine: virtualbox.lib.IMachine,
                            session: virtualbox.lib.ISession,
                            snapshot_details: list,
                            number_to_retain: int) -> None:
    """
    Deletes the oldest snapshots of the specified machine, keeping the newest ones.

    Returns without deleting anything when:
    1. ``snapshot_details`` is empty
    2. ``number_to_retain`` is greater than or equal to the number of snapshots
       left after the ignored ones (see below) have been filtered out

    When ``args.ignore`` is set, the UUIDs read from that file are excluded from
    the deletion candidates before ``number_to_retain`` is applied.

    On ``VBoxError`` the error is logged and the application exits with the
    error value as exit status.

    NOTE: the session is still locked when this function returns after deleting;
    create_snapshot() in this file unlocks a locked session before using it.

    :param virtualbox.lib.IMachine virtual_machine: virtual machine class
    :param virtualbox.lib.ISession session: virtual machine session
    :param list snapshot_details: snapshots as [id, name, description] entries, oldest first
    :param int number_to_retain: number of (non-ignored) newest snapshots to retain
    :return: None
    """
    if not snapshot_details:
        logger.info("Snapshot deletion skipped. Reason: No snapshots found")
        return

    logger.info("Overall list of snapshot:")
    for snapshot in snapshot_details:
        logger.info("Snapshot ID: %s Name: %s", snapshot[0], snapshot[1])

    if args.ignore is not None:
        # A set gives constant-time membership tests, so filtering stays linear
        # even with a large ignore file or a long snapshot chain.
        uuids_to_retain = frozenset(parse_snapshot_ignore_file(args.ignore))
        # Rebinding (not mutating) keeps the caller's list untouched.
        snapshot_details = [snapshot for snapshot in snapshot_details
                            if snapshot[0] not in uuids_to_retain]

    if number_to_retain >= len(snapshot_details):
        logger.info("Snapshot deletion skipped. Reason: "
                    "Number of snapshots to be retained is equal or bigger then number of available snapshots")
        return

    # The list is ordered oldest first, so dropping the tail keeps the newest
    # `number_to_retain` snapshots out of the deletion list.
    snapshot_details = snapshot_details[:len(snapshot_details) - number_to_retain]

    logger.info("List of snapshots to be deleted:")
    for snapshot in snapshot_details:
        logger.info("Snapshot ID: %s, Name: %s", snapshot[0], snapshot[1])

    try:
        # Locking VM (LockType(1) = Shared)
        virtual_machine.lock_machine(session, virtualbox.library.LockType(1))

        for snapshot in snapshot_details:
            # Logged before the call so a failure can be attributed to this snapshot
            logger.info("Deleting snapshot: '%s'...", snapshot[1])
            # Deleting snapshot by using Snapshot ID
            process = session.machine.delete_snapshot(snapshot[0])
            process.wait_for_completion(timeout=-1)
            logger.info("Deleted snapshot: '%s'", snapshot[1])
    except virtualbox.lib.VBoxError as ex:
        # Exiting application on exception as it may be due to user error (i.e. typo in machine name)
        # Snapshot deletion must not happen in such a case.
        logger.error("Application is going to terminate. Reason: "
                     "Snapshot deletion aborted prematurely due to VBoxError: 0x%x (%s)",
                     ex.value, ex.msg)
        sys.exit(ex.value)
def create_snapshot(virtual_machine: virtualbox.lib.IMachine,
                    session: virtualbox.lib.ISession) -> bool:
    """
    Attempts to create a snapshot for a specified machine.

    A machine that is "Powered Off" is started headless through ``session``
    before the snapshot is taken and is left running on return (the caller is
    expected to power it down). A machine in any other state gets a shared lock
    on ``session`` for the duration of the snapshot; that lock is released
    before returning.

    Snapshot name and description are built from ``args.name`` and
    ``args.description`` followed by the current date (``dd-mm-YYYY``).

    :param virtualbox.lib.IMachine virtual_machine: virtual machine class
    :param virtualbox.lib.ISession session: virtual machine session
    :return: False if the machine was "Powered Off" when this function was
             called, True if it was in any other state
    :rtype: bool
    """
    # SessionState(2) = Locked
    locked_state = virtualbox.library.SessionState(2)

    # MachineState(1) = PowerOff
    powered_off = virtual_machine.state == virtualbox.library.MachineState(1)
    # Any state other than "Powered Off" counts as "running initially"
    vm_running_initially = not powered_off

    if powered_off:
        if session.state == locked_state:
            # A lock left over from an earlier step must be released before launching
            session.unlock_machine()

        # Starting the machine; the session stays attached to the launched machine
        proc = virtual_machine.launch_vm_process(session, "headless", [])
        proc.wait_for_completion(timeout=-1)

    # The date is formatted once so that name and description can never disagree
    # (two separate now() calls could straddle midnight).
    date_stamp = datetime.now().strftime('%d-%m-%Y')
    snap_name = f"{args.name} {date_stamp}"
    description = f"{args.description} {date_stamp} via virtualbox-snapshotter"

    if vm_running_initially:
        if virtual_machine.session_state == locked_state:
            # Machine reports a locked session
            if session.state == locked_state:
                # Our own session holds a lock: release it before re-locking
                session.unlock_machine()

        # Locking machine to allow making changes (LockType(1) = Shared)
        shared_lock_type = virtualbox.library.LockType(1)
        virtual_machine.lock_machine(session, shared_lock_type)

    logger.info("Creating snapshot: '%s'...", snap_name)

    # Taking snapshot
    process, _ = session.machine.take_snapshot(snap_name, description, False)
    process.wait_for_completion(timeout=-1)

    logger.info("Created snapshot: '%s'", snap_name)

    # Only the lock taken above is released here; a machine started by this
    # function keeps its session so the caller can still reach its console.
    if vm_running_initially and session.state == locked_state:
        session.unlock_machine()

    return vm_running_initially
def parse_snapshot_ignore_file(filename: str) -> list:
    """
    Reads a list of VirtualBox snapshot UUIDs from a file.

    Each line may hold one UUID. Everything from the first ``#`` to the end of
    the line is treated as a comment, surrounding whitespace is removed, and
    lines that end up empty (blank or comment-only) are skipped.

    On OSError, logs the error and exits the application.

    :param str filename: filename to read a list of VirtualBox snapshot UUIDs from
    :return: read list of VirtualBox snapshot UUIDs, in file order
    :rtype: list
    """
    uuids = []

    try:
        with open(file=filename, mode="rt", encoding="utf8") as file_stream:
            for dirty_line in file_stream:
                # partition() returns the whole line when there is no "#";
                # slicing with find()'s -1 would instead cut off the last
                # character (a UUID digit on a final line without newline).
                clean_line = dirty_line.partition("#")[0].strip()

                if clean_line:
                    uuids.append(clean_line)
    except OSError as ex:
        logger.error("Application is going to terminate. Reason: "
                     "Reading snapshot UUID ignore file failed due to OSError: %x (%s)",
                     ex.errno, ex.strerror)
        # Exiting application on exception as it may be due to user error (i.e. typo in ignore filename)
        # Snapshot deletion must not happen in such a case.
        sys.exit(ex.strerror)

    return uuids
def load_virtual_machine(machine_name: str) -> virtualbox.lib.IMachine:
    """
    Looks up a registered virtual machine by name.

    If the lookup raises ``VBoxError``, the error is logged and the application
    exits with the error value as exit status.

    :param str machine_name: name of a machine to acquire
    :rtype: virtualbox.lib.IMachine
    :return: virtual machine class
    """
    try:
        found_machine = virtualbox.VirtualBox().find_machine(machine_name)
    except virtualbox.lib.VBoxError as ex:
        # No registered machine could be resolved for `machine_name`
        logger.error("Application is going to terminate. Reason: "
                     "Cannot find virtual machine '%s' due to VBoxError: 0x%x (%s)",
                     machine_name, ex.value, ex.msg)
        sys.exit(ex.value)
    else:
        return found_machine
def load_snapshot_details(virtual_machine: virtualbox.lib.IMachine) -> list:
    """
    Collects snapshot details of a virtual machine.

    Starts at the snapshot returned by ``find_snapshot("")`` and follows the
    first child of every snapshot until one without children is reached.
    Only the first child is followed; sibling branches are not visited.

    :param virtualbox.lib.IMachine virtual_machine: virtual machine to find snapshots for
    :rtype: list
    :return: list of [id, name, description] entries in traversal order
             (starting snapshot first); empty list if the lookup fails
    """
    try:
        current = virtual_machine.find_snapshot("")
    except virtualbox.lib.VBoxError as ex:
        # Lookup failed: treated as "machine has no snapshots"
        logger.warning("No snapshots found. Reason: %s", ex.msg)
        return []

    collected = []

    while True:
        # Entry layout: id [0], name [1], description [2]
        collected.append([current.id_p, current.name, current.description])

        if current.children_count == 0:
            break

        # TODO: Implement multi children scan
        current = current.children[0]

    return collected
def list_snapshots(machine_name: str, snapshot_details: list) -> None:
    """
    Prints the details of every known snapshot, or a notice if there are none.

    :param str machine_name: machine name shown in the printed header
    :param list snapshot_details: snapshots as [id, name, description] entries
    :return: None
    """
    if not snapshot_details:
        print(f"No snapshots found for '{machine_name}'")
        return

    # Plain print() instead of logger: this output is the result the user asked for
    print(f"Available snapshots for '{machine_name}': ")

    for entry in snapshot_details:
        snapshot_uuid, snapshot_name = entry[0], entry[1]
        print(f"Name: '{snapshot_name}' UUID: {snapshot_uuid}")
        print(f"\tDescription: {entry[2]}")
def main():
    """
    Entry point of virtualbox_snapshotter.

    With ``--list``: prints the machine's snapshots and exits.
    Otherwise:
    1. Tries to delete old snapshots
    2. Tries to create a new snapshot
    3. Powers the machine down again if it was powered off before the snapshot

    Always terminates the process via ``sys.exit``.
    """
    logger.info("Starting autosnapshotter script ...")

    machine = load_virtual_machine(args.machine_name)
    known_snapshots = load_snapshot_details(machine)

    # One session is shared by every operation performed on the machine
    session = virtualbox.Session()

    if args.list:
        # `-l`/`--list` only reports; nothing below is executed
        list_snapshots(args.machine_name, known_snapshots)
        sys.exit(0)

    delete_oldest_snapshots(machine, session, known_snapshots, args.retain)
    was_running = create_snapshot(machine, session)

    if not was_running:
        # Machine was started only for the snapshot: return it to powered-off
        try:
            session.console.power_down()
        except virtualbox.lib.VBoxError as ex:
            # Virtual machine must be Running, Paused or Stuck to be powered down.
            logger.error("Cannot power down virtual machine '%s' due to VBoxError: 0x%x (%s). "
                         "Application execution will terminate", args.machine_name, ex.value, ex.msg)
            sys.exit(ex.value)

    sys.exit(0)
if __name__ == "__main__":
    # `logger` and `args` are created here as module globals; the functions
    # above read them directly, so they exist only when run as a script.
    logger = logging.getLogger(__name__)

    # Setting up default logging string format
    logging.basicConfig(format="%(filename)s:%(levelname)s:%(asctime)s:%(funcName)s: %(message)s",
                        datefmt="%d/%m/%Y %H:%M:%S")

    # Default log level is WARNING
    logger.setLevel(logging.WARNING)

    # Adding argparse to application
    parser = argparse.ArgumentParser(prog="VirtualBox Snapshotter",
                                     description="Takes new snapshots and deletes old ones "
                                                 "for specified Virtual Machine.",
                                     epilog="Currently, multi children are not supported. "
                                            "Nested children are supported.")

    # Adding arguments to argparse
    parser.add_argument("machine_name",
                        action="store",
                        help="(Required) Virtual Machine (VM) name enclosed in double quotes (\"). "
                             "Not using double quotes may lead to abnormal behaviour if name contains whitespaces.",
                        metavar="\"VIRTUAL_MACHINE_NAME\"",
                        type=str)

    parser.add_argument("-r", "--retain",
                        action="store",
                        # range() excludes its end, so 1001 is needed for the
                        # advertised inclusive 0-1000 interval.
                        choices=range(0, 1001),
                        default=3,
                        help="Number of latest snapshots to retain. "
                             "When used with '-i'/'--ignore', counts only those snapshots, that are NOT ignored. "
                             "I.e. 2 snapshot are ignored via '-i'/'--ignore', 3 snapshots specified for "
                             "'-r'/'--retain', resulting number of preserved snapshots will be 5. "
                             "If 0 is provided - deletes all snapshots leaving just the latest one. "
                             "If argument is not provided, defaults to 3.",
                        metavar="(0-1000)",
                        type=int,
                        required=False)

    parser.add_argument("-v", "--verbose",
                        action="store_true",
                        help="Adds verbosity",
                        required=False)

    parser.add_argument("-n", "--name",
                        action="store",
                        help="Custom name for a snapshot. "
                             "If argument is not provided, defaults to 'Regular Snapshot CURRENT_DATE'",
                        metavar="\"CUSTOM_SNAPSHOT_NAME\"",
                        type=str,
                        default="Regular Snapshot",
                        required=False)

    parser.add_argument("-d", "--description",
                        action="store",
                        help="Custom description for a snapshot. "
                             "If argument is not provided, defaults to "
                             "'Regular Snapshot taken on CURRENT_DATE via virtualbox-snapshotter'",
                        metavar="\"CUSTOM_SNAPSHOT_DESCRIPTION\"",
                        type=str,
                        default="Regular Snapshot taken on",
                        required=False)

    parser.add_argument("-i", "--ignore",
                        action="store",
                        help="Path to a file, containing snapshot IDs to be ignored from deletion. "
                             "Snapshot UUIDs specified within a file will never be deleted.",
                        metavar="\"SNAPSHOT_IGNORE_FILENAME\"",
                        type=str,
                        required=False
                        )

    parser.add_argument("-l", "--list",
                        action="store_true",
                        # The listing prints name, UUID and description (no date)
                        help="Lists all snapshots and their details (name, UUID, description) "
                             "for selected virtual machine. "
                             "When '-l'/'--list' is specified, no actions (i.e. snapshot delete, machine lock etc) "
                             "are performed on a virtual machine apart from reading virtual machine's details. "
                             "When '-l'/'--list' argument is used, any other optional argument has no effect",
                        required=False
                        )

    # Parsing arguments
    args = parser.parse_args()

    if args.verbose:
        # When verbosity flag is set, log level is changed to INFO
        logger.setLevel(logging.INFO)

    main()