From fe6c612675ae54dd3a8508dc0d49c841461b51c0 Mon Sep 17 00:00:00 2001 From: Walter Doekes Date: Wed, 21 Sep 2016 17:17:22 +0200 Subject: [PATCH] Refactor. Split up clone into copy and friends. --- README.rst | 4 +- proxmove | 359 +++++++++++++++++++++++++++-------------------------- 2 files changed, 186 insertions(+), 177 deletions(-) diff --git a/README.rst b/README.rst index f5194f8..d60b4f1 100644 --- a/README.rst +++ b/README.rst @@ -52,7 +52,7 @@ When configured, you can do something like this: .. code-block:: console $ proxmove banana-cluster the-new-cluster node2 node2-ssd the-vm-to-move - 12:12:27: Requested move banana-cluster => the-new-cluster<6669ad2c> (node 'node2'): the-vm-to-move + 12:12:27: Attempt moving banana-cluster => the-new-cluster<6669ad2c> (node 'node2'): the-vm-to-move 12:12:27: - source VM the-vm-to-move@node1 12:12:27: - storage 'ide2': None,media=cdrom (blobsize=None) 12:12:27: - storage 'virtio0': sharedsan:565/vm-565-disk-1.qcow2,format=qcow2,iops_rd=4000,iops_wr=500,size=50G (blobsize=53705113600) @@ -74,7 +74,7 @@ When configured, you can do something like this: 12:24:25: Removing temp '/node2-ssd/temp/temp-proxmove/vm-126-virtio0' (on local-ssd) 12:24:26: Starting VM the-vm-to-move@node2 12:24:27: - started VM the-vm-to-move@node2 - 12:24:27: Completed move banana-cluster => the-new-cluster<6669ad2c> (node 'node2'): the-vm-to-move + 12:24:27: Completed moving banana-cluster => the-new-cluster<6669ad2c> (node 'node2'): the-vm-to-move Before, ``the-vm-to-move`` was running on ``banana-cluster`` on ``node1``. diff --git a/proxmove b/proxmove index 6aec4d9..5439a9a 100755 --- a/proxmove +++ b/proxmove @@ -456,6 +456,31 @@ class ProxmoxStorage(object): 'temp dir {!r} of storage {!r} does not exist; ' 'please create it!'.format(self.temp, self.name)) + def copy(self, data_size, disk_size, src_location, src_format, + dst_storage, dst_id, dst_name): + dst_temp = self.copy_to_temp(src_location, dst_storage, dst_name) + + if dst_temp: + log.info('Temp data {!r} on {}'.format(dst_temp, dst_storage)) + new_path, new_format = dst_storage.copy_from_temp( + disk_size, dst_temp, src_format, dst_id, dst_name) + else: + assert not src_format, src_format + new_path, new_format = self.copy_direct( + data_size, src_location, dst_storage, dst_name) + + return new_path, new_format + + def copy_to_temp(self, src_location, dst_storage, dst_name): + raise NotImplementedError('subclasses need to implemented this') + + def copy_from_temp(self, disk_size, src_temp, src_format, dst_id, + dst_name): + raise NotImplementedError('subclasses need to implemented this') + + def copy_direct(self, data_size, src_location, dst_storage, dst_name): + raise NotImplementedError('subclasses need to implemented this') + def run_command(self, command, hide_stderr=False, tty=False): kwargs = {} if tty: @@ -499,11 +524,8 @@ class ProxmoxStorage(object): ['ssh', '-A', # '-o', 'StrictHostKeyChecking=no', self.ssh] + extra + command, hide_stderr=hide_stderr, tty=tty) - def get_bandwidth_limit(self): - return self.bwlimit - def set_bandwidth_limit(self, bwlimit): - self.bwlimit = bwlimit + self.bwlimit_mbps = bwlimit def get_volume(self, location, properties): return ProxmoxVolume(location, properties, storage=self) @@ -578,12 +600,55 @@ class ProxmoxStoragePlain(ProxmoxStorage): 'storage {!r} lacks ssh(1) or scp(1)'.format( self.name), file=sys.stderr) + def copy_to_temp(self, src_location, dst_storage, dst_name): + src_path = os.path.join(self.path, src_location) + dst_temp = os.path.join(dst_storage.temp, 'temp-proxmove', dst_name) + scp_dest = '{}:{}'.format(dst_storage.ssh, dst_temp) + log.info('scp(1) copy from {!r} (on {}) to {!r}'.format( + src_path, self, scp_dest)) + + # mkdir + dst_storage.ssh_command(['mkdir', '-p', os.path.dirname(dst_temp)]) + # scp, using ssh+scp instead of local-scp, so we can add our + # beloved options. + scp_command = ['scp', '-o', 'StrictHostKeyChecking=no'] + # Add bandwidth limits in kbit/s. + if self.bwlimit_mbps: + scp_command.extend(['-l', str(self.bwlimit_mbps * 1024)]) + # Source/destination. + scp_command.extend([src_path, scp_dest]) + # Exec. + self.ssh_command(scp_command, tty=True) + + return dst_temp + + def copy_from_temp(self, disk_size, src_temp, src_format, dst_id, + dst_name): + if src_format != 'qcow2': + raise NotImplementedError( + 'format conversion from {!r} not implemented'.format( + src_format)) + dst_format = 'qcow2' + + rel_path = os.path.join( + str(dst_id), '{}.{}'.format(dst_name, dst_format)) + dst_path = os.path.join(self.path, rel_path) + + log.info('Moving data from {!r} to {!r}'.format(src_temp, dst_path)) + self.ssh_command(['mkdir', '-p', os.path.dirname(dst_path)]) + self.ssh_command(['mv', src_temp, dst_path]) + + # In case old_format != new_format, we would need to update + # properties. So the following must be true for now. + assert src_format == dst_format, (src_format, dst_format) + return rel_path, dst_format + def set_path(self, value): if not value.startswith('/'): raise ValueError( 'path should start with / in {!r} storage section: {}'.format( self.name, value)) - return super().set_path(value[4:]) + return super().set_path(value) class ProxmoxStorageZfs(ProxmoxStorage): @@ -617,6 +682,91 @@ class ProxmoxStorageZfs(ProxmoxStorage): # see the transfer go over 100%. return int(human_size_scan(human_size) * 1.02) + def copy_to_temp(self, src_location, dst_storage, dst_name): + if isinstance(dst_storage, ProxmoxStorageZfs): + # ZFS->ZFS copying requires no temp files. + return None + else: + # TODO: Simply cat /dev/zvol/.../... to the destfile? + raise NotImplementedError( + 'ZFS->other copying is not implemented yet') + + def copy_from_temp(self, disk_size, src_temp, src_format, dst_id, + dst_name): + dst_zfs = '{}/{}'.format(self.path, dst_name) + self.ssh_command(['zfs', 'create', '-V', disk_size, dst_zfs]) + + dst_path = os.path.join('/dev/zvol', self.path, dst_name) + log.info('Writing data from temp {!r} to {!r} (on {})'.format( + src_temp, dst_path, self)) + + if src_format is None: + src_format = 'raw' + if src_format not in ('qcow2', 'raw'): + raise NotImplementedError( + 'format conversion from {!r} not implemented'.format( + src_format)) + + self.ssh_command( + # -n = no create volume + # -p = progress + ['qemu-img', 'convert', '-n', '-p', '-f', src_format, + '-O', 'raw', src_temp, dst_path], tty=True) + log.info('Removing temp {!r} (on {})'.format(dst_path, self)) + self.ssh_command(['rm', src_temp]) + + # Return name and format (dst_name is the filesystem name on the + # ZFS pool known to belong to dst_storage). + return dst_name, None + + def copy_direct(self, data_size, src_location, dst_storage, dst_name): + src_zfs = '{}/{}@proxmove-{}'.format( + self.path, src_location, datetime.now().strftime('%y%m%d-%H%M%S')) + dst_zfs = '{}/{}'.format(dst_storage.path, dst_name) + log.info('zfs(1) send/recv {} data from {!r} to {!r} (on {})'.format( + human_size_fmt(data_size), src_zfs, dst_zfs, dst_storage)) + + self.ssh_command(['zfs', 'snapshot', src_zfs]) + + # mbuffer takes k-, M- or G-bytes + mbuffer_write_limit = '' + if self.bwlimit_mbps: + mbuffer_write_limit = '-R {}k'.format(self.bwlimit_mbps * 128) + + # pv shows a nice progress bar. It's optional. + optional_pv_pipe = '' + if dst_storage.has_commands(['pv']) and data_size: + log.debug( + "Using pv(1) for progress bar because it's available") + optional_pv_pipe = ( + # --fineta does not exist on all versions.. + # --force is required to make it display anything.. + 'pv --force --eta --progress -s {} | '.format(data_size)) + + # Older mbuffer(1) [v20140310-3] on the receiving end + # may say: "mbuffer: warning: No controlling terminal + # and no autoloader command specified." This is fixed + # in newer versions [v20150412-3]. + self.ssh_command( + ["zfs send -R {src_zfs} | " + "mbuffer -q -s 128k -m 1G {src_bwlim} | " + "ssh -o StrictHostKeyChecking=no {dst_ssh} " + "'mbuffer {optional_quiet_mbuffer} -s 128k -m 1G | " + " {dst_pv}" + " zfs recv {dst_zfs}'".format( + src_zfs=src_zfs, + src_bwlim=mbuffer_write_limit, + dst_ssh=dst_storage.ssh, + optional_quiet_mbuffer=( + '-q' if optional_pv_pipe else ''), + dst_pv=optional_pv_pipe, + dst_zfs=dst_zfs)], + tty=True) + + # Return name and format (dst_name is the filesystem name on the + # ZFS pool known to belong to dst_storage). + return dst_name, None + def set_path(self, value): assert value.startswith('zfs:'), value pool_name = value[4:] @@ -806,7 +956,7 @@ class ProxmoxVolume(object): file extension as fallback. """ ret = self.get_property('format') - if not ret: + if not ret and isinstance(self.storage, ProxmoxStoragePlain): # hacks ret = self.location.rsplit('.', 1)[-1] return ret @@ -818,13 +968,23 @@ class ProxmoxVolume(object): return part[len(search):] return None - def get_properties_without(self, *without_list): - search_list = tuple('{}='.format(i) for i in without_list) + def get_properties(self, **updates): + """ + Get properties list (abc=def,ghi=jkl) but update properties from + the updates argument or leave them out if the value is None. + """ + search_list = tuple('{}='.format(i) for i in updates.keys()) ret = [] + parts = self.properties.split(',') for part in parts: if not part.startswith(search_list): ret.append(part) + + for key, value in updates.items(): + if value: + parts.append('{}={}'.format(key, value)) + return ','.join(ret) def get_size(self): @@ -845,172 +1005,21 @@ class ProxmoxVolume(object): """ new_key could be "virtio0" or "ide2" or ... """ + # Create new name and let the storage backend do the copying. new_name = 'vm-{}-{}'.format(new_vmid, new_key) - old_format = self.get_format() - bw_limit_mbps = self.storage.get_bandwidth_limit() - - # Copy data from old storage. - if isinstance(self.storage, ProxmoxStoragePlain): - source_path = os.path.join(self.storage.path, self.location) - temp_dest_path = os.path.join( - new_storage.temp, 'temp-proxmove', new_name) - scp_dest = '{}:{}'.format(new_storage.ssh, temp_dest_path) - log.info('SCP copy from {!r} (on {}) to {!r}'.format( - source_path, self.storage, scp_dest)) - - # mkdir - new_storage.ssh_command( - ['mkdir', '-p', os.path.dirname(temp_dest_path)]) - # scp, using ssh+scp instead of local-scp, so we can add our - # beloved options. - scp_command = ['scp', '-o', 'StrictHostKeyChecking=no'] - # Add bandwidth limits in kbit/s. - if bw_limit_mbps: - scp_command.extend(['-l', str(bw_limit_mbps * 1024)]) - # Source/destination. - scp_command.extend([source_path, scp_dest]) - # Exec. - self.storage.ssh_command(scp_command, tty=True) - elif isinstance(self.storage, ProxmoxStorageZfs): - # *from* ZFS - if isinstance(new_storage, ProxmoxStorageZfs): - # We'll use ZFS send/recv. - temp_dest_path = None - else: - # TODO: Simply cat /dev/zvol/.../... to the destfile? - raise NotImplementedError( - 'volume conversion from zfs to {} ' - 'not implemented'.format(new_storage.__class__)) - else: - raise NotImplementedError( - 'volume conversion from {} not implemented'.format( - self.storage.__class__)) - - # We now have temp data at: temp_dest_path - if temp_dest_path: - log.info('Temp data {!r} on {}'.format( - temp_dest_path, new_storage)) - - # Create room for new storage. - if isinstance(new_storage, ProxmoxStoragePlain): - if old_format != 'qcow2': - raise NotImplementedError( - 'format conversion from {!r} not implemented'.format( - old_format)) - new_format = 'qcow2' - - rel_path = os.path.join( - str(new_vmid), '{}.{}'.format(new_name, new_format)) - dest_path = os.path.join(new_storage.path, rel_path) - - log.info('Moving data from {!r} to {!r}'.format( - temp_dest_path, dest_path)) - - # mkdir - new_storage.ssh_command( - ['mkdir', '-p', os.path.dirname(dest_path)]) - # mv - new_storage.ssh_command( - ['mv', temp_dest_path, dest_path]) - - # TODO: In case old_format != new_format, we need to update - # properties! - assert old_format == new_format - cloned_volume = ProxmoxVolume( - rel_path, self.properties, storage=new_storage) - elif isinstance(new_storage, ProxmoxStorageZfs): - # *to* ZFS - zfs_name = '{}/{}'.format(new_storage.path, new_name) - - if isinstance(self.storage, ProxmoxStorageZfs): - # *from* ZFS *to* ZFS - src_zfs_name = '{}/{}@proxmove-{}'.format( - self.storage.path, self.location, - datetime.now().strftime('%y%m%d-%H%M%S')) - log.info('Copying {} data from {!r} to {!r} (on {})'.format( - self.get_human_size(), src_zfs_name, zfs_name, - new_storage)) - - self.storage.ssh_command( - ['zfs', 'snapshot', src_zfs_name]) - - # mbuffer takes k-, M- or G-bytes - if bw_limit_mbps: - mbuffer_write_limit = '-R {}k'.format(bw_limit_mbps * 128) - else: - mbuffer_write_limit = '' - - if new_storage.has_commands(['pv']) and self.get_size(): - log.debug( - "Using pv(1) for progress bar because it's available") - optional_pv_pipe = ( - # --fineta does not exist on all versions.. - # --force is required to make it display anything.. - 'pv --force --eta --progress -s {} | '.format( - self.get_size())) - else: - optional_pv_pipe = '' - - # Older mbuffer(1) [v20140310-3] on the receiving end - # may say: "mbuffer: warning: No controlling terminal - # and no autoloader command specified." This is fixed - # in newer versions [v20150412-3]. - self.storage.ssh_command( - ["zfs send -R {src_zfs} | " - "mbuffer -q -s 128k -m 1G {src_bwlim} | " - "ssh -o StrictHostKeyChecking=no {dst_ssh} " - "'mbuffer {optional_quiet_mbuffer} -s 128k -m 1G | " - " {dst_pv}" - " zfs recv {dst_zfs}'".format( - src_zfs=src_zfs_name, - src_bwlim=mbuffer_write_limit, - dst_ssh=new_storage.ssh, - optional_quiet_mbuffer=( - '-q' if optional_pv_pipe else ''), - dst_pv=optional_pv_pipe, - dst_zfs=zfs_name)], - tty=True) - elif temp_dest_path: - # *from* plain (raw? qcow2?) *to* ZFS - new_storage.ssh_command( - ['zfs', 'create', '-V', self.get_property('size'), - zfs_name]) - dest_path = os.path.join( - '/dev/zvol', new_storage.path, new_name) - log.info('Writing data from temp {!r} to {!r} (on {})'.format( - temp_dest_path, dest_path, new_storage)) - - if old_format is None: - old_format = 'raw' - if old_format in ('qcow2', 'raw'): - new_storage.ssh_command( - # -n = no create volume - # -p = progress - ['qemu-img', 'convert', '-n', '-p', '-f', old_format, - '-O', 'raw', temp_dest_path, dest_path], tty=True) - log.info('Removing temp {!r} (on {})'.format( - temp_dest_path, new_storage)) - new_storage.ssh_command(['rm', temp_dest_path]) - else: - raise NotImplementedError( - 'format conversion from {!r} not implemented'.format( - old_format)) - else: - raise NotImplementedError( - 'volume conversion from {} not implemented'.format( - self.storage.__class__)) - - # Log message so we know that the percentage shown by PV may - # not be accurate. - log.info('All data transferred sucessfully') - - cloned_volume = ProxmoxVolume( - new_name, self.get_properties_without('format'), - storage=new_storage) - else: - raise NotImplementedError(new_storage.__class__) - - return cloned_volume + new_path, new_format = self.storage.copy( + self.get_size(), self.get_property('size'), + self.location, self.get_format(), + new_storage, new_vmid, new_name) + + # Log message so we know that any percentages shown arealy are + # irrelevant. (pv(1) gets a guesstimate which may be off by a + # few percent. We don't want the user to think the transfer + # stopped at 98%.) + log.info('Volume transferring/conversion 100% complete!') + return ProxmoxVolume( + new_path, self.get_properties(format=new_format), + storage=new_storage) def as_properties(self): """ @@ -1168,7 +1177,7 @@ class VmMover(object): # - stop old host, rename to "CLONING" # - move all disks, one by one # - rename dest to real name, rename source to "MIGRATED", add comment - log.info('Request move {} => {} (node {!r}): {}'.format( + log.info('Attempt moving {} => {} (node {!r}): {}'.format( self.src_pve, self.dst_pve, self.dst_node, src_vm.name)) log.info('- source VM {}'.format(src_vm)) @@ -1211,7 +1220,7 @@ class VmMover(object): else: dst_vm.ensure_started() - log.info('Completed move {} => {} (node {!r}): {}'.format( + log.info('Completed moving {} => {} (node {!r}): {}'.format( self.src_pve, self.dst_pve, self.dst_node, src_vm.name))