diff --git a/gpMgmt/bin/Makefile b/gpMgmt/bin/Makefile
index ced907b6949..e508fa99ab1 100644
--- a/gpMgmt/bin/Makefile
+++ b/gpMgmt/bin/Makefile
@@ -13,7 +13,7 @@ SUBDIRS += ifaddrs
 $(recurse)
 PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \
-	gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \
+	gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \
 	gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \
 	gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \
 	gpstate gpstop gpsys1 minirepro gpmemwatcher gpmemreport
@@ -233,7 +233,7 @@ clean distclean:
 	rm -rf *.pyc
 	rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \
 		gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \
-		gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \
+		gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \
 		gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \
 		gpstartc gpstatec gpstopc gpsys1c minireproc
 	rm -f gpconfig_modules/gucs_disallowed_in_file.txt
diff --git a/gpMgmt/bin/gpexpand b/gpMgmt/bin/gpexpand
index 405c357c44d..987eac184f5 100755
--- a/gpMgmt/bin/gpexpand
+++ b/gpMgmt/bin/gpexpand
@@ -1135,6 +1135,8 @@ class gpexpand:
         tblspc_info = {}
         for oid in tblspc_oids:
+            if oid not in tblspc_oid_names:
+                continue
             location = os.path.dirname(os.readlink(os.path.join(master_tblspc_dir, oid)))
             tblspc_info[oid] = {"location": location,
@@ -1254,6 +1256,15 @@ class gpexpand:
         master_tblspc_dir = self.gparray.master.getSegmentTableSpaceDirectory()
         if not os.listdir(master_tblspc_dir):
             return None
+
+        tblspc_oids = os.listdir(master_tblspc_dir)
+        tblspc_oid_names = self.get_tablespace_oid_names()
+        flag = False
+        for oid in tblspc_oids:
+            if oid in tblspc_oid_names:
+                flag = True
+        if not flag:
+            return None
 
         if not self.options.filename:
             raise ExpansionError('Missing tablespace input file')
@@ -1433,6 +1444,25 @@ class gpexpand:
         self.pool.join()
         self.pool.check_results()
+
+        # Wait for the new primary segments to accept connections before
+        # continuing; retry up to 11 times, 10 seconds apart.
+        for i in range(1, 12):
+            flag = True
+            for segment in newSegments:
+                if segment.isSegmentMirror():
+                    continue
+
+                cmd = Command('pg_isready for segment',
+                              "pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(), segment.getSegmentPort(), segment.getSegmentDataDirectory()))
+                cmd.run()
+                rc = cmd.get_return_code()
+                if rc != 0:
+                    flag = False
+            if flag:
+                break
+            time.sleep(10)
+            self.logger.info("Waited %s seconds for the new segments to become ready" % (i * 10))
+
+
         """ Build the list of DELETE and TRUNCATE statements based on the subset of MASTER_ONLY_TABLES defined in gpcatalog.py.
Mapped tables cannot diff --git a/gpMgmt/bin/gppylib/gparray.py b/gpMgmt/bin/gppylib/gparray.py index 4b50da8a5d8..038ecfc0668 100755 --- a/gpMgmt/bin/gppylib/gparray.py +++ b/gpMgmt/bin/gppylib/gparray.py @@ -140,6 +140,23 @@ def __cmp__(self,other): elif left > right: return 1 else: return 0 + def __equal(self, other, ignoreAttr=[]): + if not isinstance(other, Segment): + return NotImplemented + for key in list(vars(other)): + if key in ignoreAttr: + continue + if vars(other)[key] != vars(self)[key]: + return False + return True + + def __eq__(self, other): + return self.__equal(other, ['mode']) + + + def __hash__(self): + return hash(self.dbid) + # # Moved here from system/configurationImplGpdb.py # @@ -429,6 +446,9 @@ def __str__(self): return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB), str(self.mirrorDB)) + def __eq__(self, other): + return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB + # -------------------------------------------------------------------- def addPrimary(self,segDB): self.primaryDB=segDB @@ -799,6 +819,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None): self.standbyMaster = None self.segmentPairs = [] self.expansionSegmentPairs=[] + self.shrinkSegmentPairs=[] self.numPrimarySegments = 0 self.recoveredSegmentDbids = [] @@ -1043,7 +1064,7 @@ def dumpToFile(self, filename): fp.close() # -------------------------------------------------------------------- - def getDbList(self, includeExpansionSegs=False): + def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False): """ Return a list of all Segment objects that make up the array """ @@ -1052,8 +1073,8 @@ def getDbList(self, includeExpansionSegs=False): dbs.append(self.master) if self.standbyMaster: dbs.append(self.standbyMaster) - if includeExpansionSegs: - dbs.extend(self.getSegDbList(True)) + if includeExpansionSegs or removeShrinkSegs: + dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs)) else: dbs.extend(self.getSegDbList()) return dbs @@ -1103,7 +1124,7 @@ def getDbIdToPeerMap(self): # -------------------------------------------------------------------- - def getSegDbList(self, includeExpansionSegs=False): + def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False): """Return a list of all Segment objects for all segments in the array""" dbs=[] for segPair in self.segmentPairs: @@ -1111,15 +1132,21 @@ def getSegDbList(self, includeExpansionSegs=False): if includeExpansionSegs: for segPair in self.expansionSegmentPairs: dbs.extend(segPair.get_dbs()) + if removeShrinkSegs: + for segPair in self.shrinkSegmentPairs: + dbs = list(filter(lambda x: segPair.primaryDB != x and segPair.mirrorDB != x, dbs)) return dbs # -------------------------------------------------------------------- - def getSegmentList(self, includeExpansionSegs=False): + def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False): """Return a list of SegmentPair objects for all segments in the array""" dbs=[] dbs.extend(self.segmentPairs) if includeExpansionSegs: dbs.extend(self.expansionSegmentPairs) + if removeShrinkSegs: + for segPair in self.shrinkSegmentPairs: + dbs.remove(segPair) return dbs # -------------------------------------------------------------------- @@ -1146,6 +1173,21 @@ def getExpansionSegPairList(self): """Returns a list of all SegmentPair objects that make up the new segments of an expansion""" return self.expansionSegmentPairs + + # -------------------------------------------------------------------- + def 
getShrinkSegDbList(self):
+        """Returns a list of all Segment objects for the segments that a
+           shrink will remove"""
+        dbs=[]
+        for segPair in self.shrinkSegmentPairs:
+            dbs.extend(segPair.get_dbs())
+        return dbs
+
+    # --------------------------------------------------------------------
+    def getShrinkSegPairList(self):
+        """Returns a list of all SegmentPair objects for the segments that a
+           shrink will remove"""
+        return self.shrinkSegmentPairs
 
     # --------------------------------------------------------------------
     def getSegmentContainingDb(self, db):
@@ -1162,6 +1204,15 @@ def getExpansionSegmentContainingDb(self, db):
             if db.getSegmentDbId() == segDb.getSegmentDbId():
                 return segPair
         return None
+
+    # --------------------------------------------------------------------
+    def getShrinkSegmentContainingDb(self, db):
+        for segPair in self.shrinkSegmentPairs:
+            for segDb in segPair.get_dbs():
+                if db.getSegmentDbId() == segDb.getSegmentDbId():
+                    return segPair
+        return None
+
     # --------------------------------------------------------------------
     def get_invalid_segdbs(self):
         dbs=[]
@@ -1500,6 +1551,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role,
         else:
             seg.addMirror(segdb)
 
+    # --------------------------------------------------------------------
+    def addShrinkSeg(self, content, preferred_role, dbid, role,
+                     hostname, address, port, datadir):
+        """
+        Add a segment to the gparray as a segment to be removed by a shrink.
+
+        Note: it may work better to construct the Segment in gpshrink and
+        simply pass it in.
+        """
+
+        segdb = Segment(content = content,
+                        preferred_role = preferred_role,
+                        dbid = dbid,
+                        role = role,
+                        mode = MODE_SYNCHRONIZED,
+                        status = STATUS_UP,
+                        hostname = hostname,
+                        address = address,
+                        port = port,
+                        datadir = datadir)
+
+        if preferred_role == ROLE_PRIMARY:
+            self.shrinkSegmentPairs.append(SegmentPair())
+            seg = self.shrinkSegmentPairs[-1]
+            if seg.primaryDB:
+                raise Exception('Duplicate content id for primary segment')
+            seg.addPrimary(segdb)
+        else:
+            # a mirror is attached to the most recently added pair, so the
+            # input must list each primary before its mirror
+            seg = self.shrinkSegmentPairs[-1]
+            seg.addMirror(segdb)
+
     # --------------------------------------------------------------------
     def reOrderExpansionSegs(self):
         """
@@ -1607,6 +1689,52 @@ def validateExpansionSegs(self):
             else:
                 used_ports[hostname] = []
             used_ports[hostname].append(db.port)
+
+    # --------------------------------------------------------------------
+    def validateShrinkSegs(self):
+        """ Checks the segment pairs marked for removal for inconsistencies and errors.
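+
+            The pairs to remove must already exist in gp_segment_configuration,
+            each pair must include both the primary and its mirror when
+            mirroring is enabled, and the removed content ids must form a
+            contiguous range ending at the maximum content id.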
+ """ + + # make sure we have added at least one segment + if len(self.shrinkSegmentPairs) == 0: + raise Exception('No shrink segments defined') + + totalsize = len(self.segmentPairs) + removesize = len(self.shrinkSegmentPairs) + + if removesize >= totalsize: + self.logger.error('removed segment num %d more than or equal to total segment num %d', removesize, totalsize) + exit(1) + elif removesize < 1: + self.logger.error('removed segment num %d less than 1', removesize) + exit(1) + + for segPair in self.shrinkSegmentPairs: + if self.hasMirrors: + if segPair.mirrorDB is None: + raise Exception('primaryDB and mirrorDB should be removed simultaneously') + + if segPair.primaryDB.content != segPair.mirrorDB.content: + raise Exception('primaryDB content is not equal mirrorDB content') + + # If shrinkSegmentPairs not in the segmentPairs raise exception + flag = False + for segPair_ in self.segmentPairs : + if segPair_ == segPair : + flag = True + + if flag == False: + raise Exception('Shrink segments not in the gp_segment_configuration table') + + # If shrinkSegmentPairs is not the last n segment. + self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content) + + if self.shrinkSegmentPairs[-1].primaryDB.content != self.get_max_contentid(): + raise Exception('please remove segment from max contentid') + + if self.shrinkSegmentPairs[0].primaryDB.content != self.get_max_contentid()-len(self.shrinkSegmentPairs)+1: + raise Exception('please remove segment in continuous contentid') + # -------------------------------------------------------------------- def addExpansionHosts(self, hosts, mirror_type): diff --git a/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py b/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py index fda2e70b6c7..b9fb34335ea 100755 --- a/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py +++ b/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py @@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary): self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()]) # 'goalsegmap' reflects the desired state of the catalog - self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)]) + self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)]) # find mirrors and primaries to remove self.mirror_to_remove = [ diff --git a/gpMgmt/bin/gpshrink b/gpMgmt/bin/gpshrink new file mode 100755 index 00000000000..12190ddeb58 --- /dev/null +++ b/gpMgmt/bin/gpshrink @@ -0,0 +1,1604 @@ +#!/usr/bin/env python +# Line too long - pylint: disable=C0301 +# Invalid name - pylint: disable=C0103 +# +# Copyright (c) Greenplum Inc 2008. All Rights Reserved. 
+#
+from gppylib.mainUtils import getProgramName
+
+import copy
+import datetime
+import os
+import random
+import sys
+import json
+import shutil
+import signal
+import traceback
+from collections import defaultdict
+from time import strftime, sleep
+
+try:
+    from gppylib.commands.unix import *
+    from gppylib.commands.gp import *
+    from gppylib.gparray import GpArray, MODE_NOT_SYNC, STATUS_DOWN
+    from gppylib.gpparseopts import OptParser, OptChecker
+    from gppylib.gplog import *
+    from gppylib.db import catalog
+    from gppylib.db import dbconn
+    from gppylib.userinput import *
+    from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
+    from gppylib.system import configurationInterface, configurationImplGpdb
+    from gppylib.system.environment import GpMasterEnvironment
+    from pygresql.pgdb import DatabaseError
+    from pygresql import pg
+    from gppylib.operations.package import SyncPackages
+    from gppylib.operations.utils import ParallelOperation
+    from gppylib.parseutils import line_reader, check_values, canonicalize_address
+    from gppylib.heapchecksum import HeapChecksum
+    from gppylib.commands.pg import PgBaseBackup
+    from gppylib.mainUtils import ExceptionNoStackTraceNeeded
+    from gppylib.operations.update_pg_hba_on_segments import update_pg_hba_on_segments
+
+except ImportError as e:
+    sys.exit('ERROR: Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))
+
+# constants
+MAX_PARALLEL_SHRINKS = 96
+MAX_BATCH_SIZE = 128
+
+FIRST_NORMAL_OID = 16384
+RELSTORAGE_EXTERNAL = 'x'
+
+SEGMENT_CONFIGURATION_BACKUP_FILE = "gpshrink.gp_segment_configuration"
+
+DBNAME = 'postgres'
+
+# global var
+_gp_shrink = None
+
+description = ("""
+Removes segments from a pre-existing CBDB Array.
+""")
+
+_help = ["""
+The input file should be a plain text file with a line for each segment
+to remove, in the format:
+
+  <hostname>|<address>|<port>|<datadir>|<dbid>|<contentId>|<preferred_role>
+
+List each primary before its mirror.
+
+""",
+         ]
+
+_TODO = ["""
+
+Remaining TODO items:
+====================
+""",
+
+         """* smarter heuristics on deciding which tables to reorder first. """,
+
+         """* make sure system isn't in "readonly mode" during setup. """,
+
+         """* need a startup validation where we check the status detail
+     with gp_distribution_policy and make sure that our book
+     keeping matches reality. we don't have a perfect transactional
+     model since the tables can be in a different database from
+     where the gpshrink schema is kept. """,
+
+         """* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of
+     the system. should get rid of this requirement. """
+         ]
+
+_usage = """[-f hosts_file]
+
+gpshrink -i input_file [-B batch_size] [-t segment_tar_dir] [-S]
+
+gpshrink [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']]
+         [-a] [-n parallel_processes]
+
+gpshrink -r
+
+gpshrink -c
+
+gpshrink -? | -h | --help | --verbose | -v"""
+
+EXECNAME = os.path.split(__file__)[-1]
+
+
+# ----------------------- Command line option parser ----------------------
+
+def parseargs():
+    parser = OptParser(option_class=OptChecker,
+                       description=' '.join(description.split()),
+                       version='%prog version $Revision$')
+    parser.setHelp(_help)
+    parser.set_usage('%prog ' + _usage)
+    parser.remove_option('-h')
+
+    parser.add_option('-c', '--clean', action='store_true',
+                      help='remove the shrink schema.')
+    parser.add_option('-r', '--rollback', action='store_true',
+                      help='rollback failed shrink setup.')
+    parser.add_option('-a', '--analyze', action='store_true',
+                      help='Analyze the shrunken tables after redistribution.')
+    parser.add_option('-d', '--duration', type='duration', metavar='[h][:m[:s]]',
+                      help='duration from beginning to end.')
+    parser.add_option('-e', '--end', type='datetime', metavar='datetime',
+                      help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.")
+    parser.add_option('-i', '--input', dest="filename",
+                      help="input shrink configuration file.", metavar="FILE")
+    parser.add_option('-f', '--hosts-file', metavar='<hosts_file>',
+                      help='file containing new host names used to generate input file')
+    parser.add_option('-B', '--batch-size', type='int', default=16, metavar="<batch_size>",
+                      help='shrink configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
+    parser.add_option('-n', '--parallel', type="int", default=1, metavar="<parallel_processes>",
+                      help='number of tables to shrink at a time. Valid values are 1-%d.'
% MAX_PARALLEL_SHRINKS) + parser.add_option('-v', '--verbose', action='store_true', + help='debug output.') + parser.add_option('-S', '--simple-progress', action='store_true', + help='show simple progress.') + parser.add_option('-t', '--tardir', default='.', metavar="FILE", + help='Tar file directory.') + parser.add_option('-h', '-?', '--help', action='help', + help='show this help message and exit.') + parser.add_option('-s', '--silent', action='store_true', + help='Do not prompt for confirmation to proceed on warnings') + parser.add_option('', '--hba-hostnames', action='store_true', default=False, + help='use hostnames instead of CIDR in pg_hba.conf') + parser.add_option('--usage', action="briefhelp") + + parser.set_defaults(verbose=False, filters=[], slice=(None, None)) + + # Parse the command line arguments + (options, args) = parser.parse_args() + return options, args, parser + +def validate_options(options, args, parser): + if len(args) > 0: + logger.error('Unknown argument %s' % args[0]) + parser.exit() + + # -n sanity check + if options.parallel > MAX_PARALLEL_SHRINKS or options.parallel < 1: + logger.error('Invalid argument. parallel value must be >= 1 and <= %d' % MAX_PARALLEL_SHRINKS) + parser.print_help() + parser.exit() + + proccount = os.environ.get('GP_MGMT_PROCESS_COUNT') + if options.batch_size == 16 and proccount is not None: + options.batch_size = int(proccount) + + if options.batch_size < 1 or options.batch_size > 128: + logger.error('Invalid argument. -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE) + parser.print_help() + parser.exit() + + # OptParse can return date instead of datetime so we might need to convert + if options.end and not isinstance(options.end, datetime.datetime): + options.end = datetime.datetime.combine(options.end, datetime.time(0)) + + if options.end and options.end < datetime.datetime.now(): + logger.error('End time occurs in the past') + parser.print_help() + parser.exit() + + if options.end and options.duration: + logger.warn('Both end and duration options were given.') + # Both a duration and an end time were given. + if options.end > datetime.datetime.now() + options.duration: + logger.warn('The duration argument will be used for the shrink end time.') + options.end = datetime.datetime.now() + options.duration + else: + logger.warn('The end argument will be used for the shrink end time.') + elif options.duration: + options.end = datetime.datetime.now() + options.duration + + # -c and -r options are mutually exclusive + if options.rollback and options.clean: + rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r" + cleanOpt = "--clean" if "--clean" in sys.argv else "-c" + logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt)) + parser.exit() + + try: + options.master_data_directory = get_masterdatadir() + options.gphome = get_gphome() + except GpError as msg: + logger.error(msg) + parser.exit() + + if not os.path.exists(options.master_data_directory): + logger.error('master data directory %s does not exist.' 
% options.master_data_directory) + parser.exit() + + return options, args + +# ------------------------------------------------------------------------- +# process information functions +def create_pid_file(master_data_directory): + """Creates gpshrink pid file""" + try: + fp = open(master_data_directory + '/gpshrink.pid', 'w') + fp.write(str(os.getpid())) + except IOError: + raise + finally: + if fp: fp.close() + + +def remove_pid_file(master_data_directory): + """Removes gpshrink pid file""" + try: + os.unlink(master_data_directory + '/gpshrink.pid') + except: + pass + + +def is_gpshrink_running(master_data_directory): + """Checks if there is another instance of gpshrink running""" + is_running = False + try: + fp = open(master_data_directory + '/gpshrink.pid', 'r') + pid = int(fp.readline().strip()) + fp.close() + is_running = check_pid(pid) + except IOError: + pass + except Exception: + raise + + return is_running + + +def gpshrink_status_file_exists(master_data_directory): + """Checks if gpshrink.pid exists""" + return os.path.exists(master_data_directory + '/gpshrink.status') + + +# ------------------------------------------------------------------------- +# shrink schema + +undone_status = "NOT STARTED" +start_status = "IN PROGRESS" +done_status = "COMPLETED" +does_not_exist_status = 'NO LONGER EXISTS' + +create_schema_sql = "CREATE SCHEMA gpshrink" +drop_schema_sql = "DROP SCHEMA IF EXISTS gpshrink CASCADE" + +status_table_sql = """CREATE TABLE gpshrink.status + ( status text, + updated timestamp ) """ + +status_detail_table_sql = """CREATE TABLE gpshrink.status_detail + ( table_oid oid, + dbname text, + fq_name text, + root_partition_oid oid, + rank int, + external_writable bool, + status text, + shrink_started timestamp, + shrink_finished timestamp, + source_bytes numeric, rel_storage text) distributed by (table_oid)""" +# gpshrink views +progress_view_simple_sql = """CREATE VIEW gpshrink.shrink_progress AS +SELECT + CASE status + WHEN '%s' THEN 'Tables Shrinked' + WHEN '%s' THEN 'Tables Left' + END AS Name, + count(*)::text AS Value +FROM gpshrink.status_detail GROUP BY status""" % (done_status, undone_status) + +progress_view_sql = """CREATE VIEW gpshrink.shrink_progress AS +SELECT + CASE status + WHEN '%s' THEN 'Tables Shrinked' + WHEN '%s' THEN 'Tables Left' + WHEN '%s' THEN 'Tables In Progress' + END AS Name, + count(*)::text AS Value +FROM gpshrink.status_detail GROUP BY status + +UNION + +SELECT + CASE status + WHEN '%s' THEN 'Bytes Done' + WHEN '%s' THEN 'Bytes Left' + WHEN '%s' THEN 'Bytes In Progress' + END AS Name, + SUM(source_bytes)::text AS Value +FROM gpshrink.status_detail GROUP BY status + +UNION + +SELECT + 'Estimated shrink Rate' AS Name, + (SUM(source_bytes) / (1 + extract(epoch FROM (max(shrink_finished) - min(shrink_started)))) / 1024 / 1024)::text || ' MB/s' AS Value +FROM gpshrink.status_detail +WHERE status = '%s' +AND +shrink_started > (SELECT updated FROM gpshrink.status WHERE status = '%s' ORDER BY updated DESC LIMIT 1) + +UNION + +SELECT +'Estimated Time to Completion' AS Name, +CAST((SUM(source_bytes) / ( +SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(shrink_finished) - min(shrink_started))))) +FROM gpshrink.status_detail +WHERE status = '%s' +AND +shrink_started > (SELECT updated FROM gpshrink.status WHERE status = '%s' ORDER BY +updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value +FROM gpshrink.status_detail +WHERE status = '%s' + OR status = '%s'""" % (done_status, undone_status, start_status, + done_status, 
undone_status, start_status,
+                       done_status,
+                       'SHRINK STARTED',
+                       done_status,
+                       'SHRINK STARTED',
+                       start_status, undone_status)
+
+# -------------------------------------------------------------------------
+class InvalidStatusError(Exception): pass
+
+
+class ValidationError(Exception): pass
+
+
+class ExpansionError(Exception): pass
+
+
+# -------------------------------------------------------------------------
+class gpshrinkStatus():
+    """Class that manages the gpshrink status file.
+
+    The status file is placed in the master data directory on both the master and
+    the standby master. It is used to keep track of where we are in the progression.
+    """
+
+    def __init__(self, logger, master_data_directory, master_mirror=None):
+        self.logger = logger
+
+        self._status_values = {'UNINITIALIZED': 1,
+                               'SHRINK_PREPARE_STARTED': 2,
+                               'UPDATE_CATALOG_STARTED': 3,
+                               'UPDATE_CATALOG_DONE': 4,
+                               'SETUP_SHRINK_SCHEMA_STARTED': 5,
+                               'SETUP_SHRINK_SCHEMA_DONE': 6,
+                               'PREPARE_SHRINK_SCHEMA_STARTED': 7,
+                               'PREPARE_SHRINK_SCHEMA_DONE': 8,
+                               'SHRINK_PREPARE_DONE': 9,
+                               'SHRINK_PERFORM_STARTED': 10,
+                               'SHRINK_TABLE_STARTED': 11,
+                               'SHRINK_TABLE_DONE': 12,
+                               'SHRINK_CATALOG_STARTED': 13,
+                               'SHRINK_CATALOG_DONE': 14,
+                               'SHRINK_PERFORM_DONE': 15,
+                               }
+        self._status = []
+        self._status_info = []
+        self._master_data_directory = master_data_directory
+        self._master_mirror = master_mirror
+        self._status_filename = master_data_directory + '/gpshrink.status'
+        if master_mirror:
+            self._status_standby_filename = master_mirror.getSegmentDataDirectory() \
+                                            + '/gpshrink.status'
+            self._segment_configuration_standby_filename = master_mirror.getSegmentDataDirectory() \
+                                                           + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE
+        self._fp = None
+        self._temp_dir = None
+        self._input_filename = None
+        self._gp_segment_configuration_backup = None
+
+        if os.path.exists(self._status_filename):
+            self._read_status_file()
+
+    def _read_status_file(self):
+        """Reads in an existing gpshrink status file"""
+        self.logger.debug("Trying to read in a pre-existing gpshrink status file")
+        try:
+            self._fp = open(self._status_filename, 'a+')
+            self._fp.seek(0)
+
+            for line in self._fp:
+                (status, status_info) = line.rstrip().split(':')
+                if status == 'SHRINK_PREPARE_STARTED':
+                    self._input_filename = status_info
+                elif status == 'UPDATE_CATALOG_STARTED':
+                    self._gp_segment_configuration_backup = status_info
+
+                self._status.append(status)
+                self._status_info.append(status_info)
+        except IOError:
+            raise
+
+        if self._status[-1] not in self._status_values:
+            raise InvalidStatusError('Invalid status file. Unknown status %s' % self._status)
+
+    def create_status_file(self):
+        """Creates a new gpshrink status file"""
+        try:
+            self._fp = open(self._status_filename, 'w')
+            self._fp.write('UNINITIALIZED:None\n')
+            self._fp.flush()
+            os.fsync(self._fp)
+            self._status.append('UNINITIALIZED')
+            self._status_info.append('None')
+        except IOError:
+            raise
+
+        if self._master_mirror:
+            self._sync_status_file()
+
+    def _sync_status_file(self):
+        """Syncs the gpshrink status file with the master mirror"""
+        cpCmd = Scp('gpshrink copying status file to master mirror',
+                    srcFile=self._status_filename,
+                    dstFile=self._status_standby_filename,
+                    dstHost=self._master_mirror.getSegmentHostName())
+        cpCmd.run(validateAfter=True)
+
+    def set_status(self, status, status_info=None, force=False):
+        """Sets the current status. gpshrink statuses must be set in the
+           proper order; setting a status out of order results in an
+           InvalidStatusError exception, unless force is True."""
+        if len(self._status) == 0 or not os.path.exists(self._status_filename):
+            raise InvalidStatusError('not in shrink status or no status file')
+
+        self.logger.debug("Transitioning from %s to %s" % (self._status[-1], status))
+
+        if not self._fp:
+            raise InvalidStatusError('The status file is invalid and cannot be written to')
+        if status not in self._status_values:
+            raise InvalidStatusError('%s is an invalid gpshrink status' % status)
+        self._fp.write('%s:%s\n' % (status, status_info))
+        self._fp.flush()
+        os.fsync(self._fp)
+        self._status.append(status)
+        self._status_info.append(status_info)
+        if self._master_mirror:
+            self._sync_status_file()
+
+    def get_current_status(self):
+        """Gets the current status that has been written to the gpshrink
+           status file"""
+        if (len(self._status) > 0 and len(self._status_info) > 0):
+            return (self._status[-1], self._status_info[-1])
+        else:
+            return (None, None)
+
+    def get_status_history(self):
+        """Gets the full status history"""
+        return list(zip(self._status, self._status_info))
+
+    def remove_status_file(self):
+        """Closes and removes the gpshrink status file"""
+        if self._fp:
+            self._fp.close()
+            self._fp = None
+        if os.path.exists(self._status_filename):
+            os.unlink(self._status_filename)
+        if self._master_mirror:
+            RemoveFile.remote('gpshrink master mirror status file cleanup',
+                              self._master_mirror.getSegmentHostName(),
+                              self._status_standby_filename)
+
+    def remove_segment_configuration_backup_file(self):
+        """ Remove the segment configuration backup file """
+        self.logger.debug("Removing segment configuration backup file")
+        if self._gp_segment_configuration_backup is not None and os.path.exists(
+                self._gp_segment_configuration_backup):
+            os.unlink(self._gp_segment_configuration_backup)
+        if self._master_mirror:
+            RemoveFile.remote('gpshrink master mirror segment configuration backup file cleanup',
+                              self._master_mirror.getSegmentHostName(),
+                              self._segment_configuration_standby_filename)
+
+    def sync_segment_configuration_backup_file(self):
+        """ Sync the segment configuration backup file to the standby """
+        if self._master_mirror:
+            self.logger.debug("Sync segment configuration backup file")
+            cpCmd = Scp('gpshrink copying segment configuration backup file to master mirror',
+                        srcFile=self._gp_segment_configuration_backup,
+                        dstFile=self._segment_configuration_standby_filename,
+                        dstHost=self._master_mirror.getSegmentHostName())
+            cpCmd.run(validateAfter=True)
+
+    def get_input_filename(self):
+        """Gets the input file that was used by shrink setup"""
+        return self._input_filename
+
+    def get_gp_segment_configuration_backup(self):
+        """Gets the filename of the gp_segment_configuration backup file
+           created during shrink setup"""
+        return self._gp_segment_configuration_backup
+
+    def set_gp_segment_configuration_backup(self, filename):
+        """Sets the filename of the gp_segment_configuration backup file"""
+        self._gp_segment_configuration_backup = filename
+
+
+# -------------------------------------------------------------------------
+
+class ShrinkError(Exception): pass
+
+
+# ------------------------------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------------------------------
+class NewSegmentInput:
+    def __init__(self, hostname, address, port, datadir, dbid, contentId, role):
+        self.hostname = hostname
+        self.address = address
+        self.port = port
+        self.datadir = 
datadir
+        self.dbid = dbid
+        self.contentId = contentId
+        self.role = role
+
+
+# ------------------------------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------------------------------
+class gpshrink:
+    def __init__(self, logger, gparray, dburl, options, parallel=1, size=0):
+        self.pastThePointOfNoReturn = False
+        self.logger = logger
+        self.dburl = dburl
+        self.options = options
+        self.numworkers = parallel
+        self.gparray = gparray
+        self.size = size
+        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods=True)
+        self.old_segments = self.gparray.getSegDbList()
+
+        datadir = self.gparray.master.getSegmentDataDirectory()
+        self.statusLogger = gpshrinkStatus(logger=logger,
+                                           master_data_directory=datadir,
+                                           master_mirror=self.gparray.standbyMaster)
+
+        # Adjust batch size if it's too high given the number of segments
+        seg_count = len(self.old_segments)
+        if self.options.batch_size > seg_count:
+            self.options.batch_size = seg_count
+        self.pool = WorkerPool(numWorkers=self.options.batch_size)
+
+        self.queue = None
+
+    @staticmethod
+    def prepare_gpdb_state(logger, dburl, options):
+        """ Gets GPDB in the appropriate state for a shrink.
+        This state will depend on whether this is a new shrink setup,
+        a continuation of a previous shrink, or a rollback """
+        # Get the database in the expected state for the shrink/rollback.
+        # If a gpshrink status file exists, the last run of gpshrink didn't finish properly.
+
+        gpshrink_db_status = gpshrink.get_status_from_db(dburl, options)
+
+        return gpshrink_db_status
+
+    @staticmethod
+    def get_status_from_db(dburl, options):
+        """Gets the gpshrink status from the gpshrink schema"""
+        status_conn = None
+        gpshrink_db_status = None
+        if get_local_db_mode(options.master_data_directory) == 'NORMAL':
+            try:
+                status_conn = dbconn.connect(dburl, encoding='UTF8')
+                # Get the last status entry
+                cursor = dbconn.execSQL(status_conn, 'SELECT status FROM gpshrink.status ORDER BY updated DESC LIMIT 1')
+                if cursor.rowcount == 1:
+                    gpshrink_db_status = cursor.fetchone()[0]
+                status_conn.commit()
+
+            except Exception:
+                # The shrink schema doesn't exist or there was a connection failure.
+                pass
+            finally:
+                if status_conn: status_conn.close()
+
+        # make sure the gpshrink schema doesn't exist, since it wasn't in the DB provided
+        if not gpshrink_db_status:
+            """
+            MPP-14145 - If there's no discernible status, the schema must not exist.
+
+            The checks in get_status_from_db claim to look for existence of the 'gpshrink' schema, but more accurately they're
+            checking for non-emptiness of the gpshrink.status table. If the table were empty, but the schema did exist, gpshrink would presume
+            a new shrink was taking place and it would try to CREATE SCHEMA later, which would fail. So, here, if this is the case, we error out.
+
+            Note: -c/--clean will not necessarily work either, as it too has assumptions about the non-emptiness of the gpshrink schema.
+            """
+            conn = dbconn.connect(dburl, encoding='UTF8', utility=True)
+            try:
+                cursor = dbconn.execSQL(conn,
+                                        "SELECT count(n.nspname) FROM pg_catalog.pg_namespace n WHERE n.nspname = 'gpshrink'")
+                if cursor.rowcount == 1:
+                    count = cursor.fetchone()[0]
+                else:
+                    raise ShrinkError("unexpected number of rows in gpshrink")
+                if count > 0:
+                    raise ShrinkError(
+                        "Existing shrink state could not be determined, but a gpshrink schema already exists. Cannot proceed.")
+            finally:
+                conn.commit()
+                conn.close()
+
+        return gpshrink_db_status
+
+    def validate_max_connections(self):
+        try:
+            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
+            max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
+        except DatabaseError as ex:
+            if self.options.verbose:
+                logger.exception(ex)
+            logger.error('Failed to check the max_connections GUC')
+            raise ex
+        finally:
+            conn.close()
+
+        if max_connections < self.options.parallel * 2 + 1:
+            self.logger.error('max_connections is too small to shrink %d tables at' % self.options.parallel)
+            self.logger.error('a time. This will lead to connection errors. Either')
+            self.logger.error('reduce the value for -n passed to gpshrink or raise')
+            self.logger.error('max_connections in postgresql.conf')
+            return False
+
+        return True
+
+    def cleanup_file(self):
+        """Remove the status file and the segment configuration backup file"""
+        self.statusLogger.remove_status_file()
+        self.statusLogger.remove_segment_configuration_backup_file()
+
+    def get_state(self):
+        """Returns the shrink state from the status logger"""
+        return self.statusLogger.get_current_status()[0]
+
+    def generate_inputfile(self):
+        """Writes a gpshrink input file based on the shrink segments
+           recorded in the gparray"""
+        outputfile = 'gpshrink_inputfile_' + strftime("%Y%m%d_%H%M%S")
+        outfile = open(outputfile, 'w')
+
+        logger.info("Generating input file...")
+
+        for db in self.gparray.getShrinkSegDbList():
+            tempStr = "%s|%s|%d|%s|%d|%d|%s" % (canonicalize_address(db.getSegmentHostName())
+                                                , canonicalize_address(db.getSegmentAddress())
+                                                , db.getSegmentPort()
+                                                , db.getSegmentDataDirectory()
+                                                , db.getSegmentDbId()
+                                                , db.getSegmentContentId()
+                                                , db.getSegmentPreferredRole()
+                                                )
+            outfile.write(tempStr + "\n")
+
+        outfile.close()
+
+        return outputfile
+
+    def add_remove_segments(self, inputFileEntryList):
+        for seg in inputFileEntryList:
+            self.gparray.addShrinkSeg(content=int(seg.contentId)
+                                      , preferred_role=seg.role
+                                      , dbid=int(seg.dbid)
+                                      , role=seg.role
+                                      , hostname=seg.hostname.strip()
+                                      , address=seg.address.strip()
+                                      , port=int(seg.port)
+                                      , datadir=os.path.abspath(seg.datadir.strip())
+                                      )
+        try:
+            self.gparray.validateShrinkSegs()
+        except Exception as e:
+            raise ShrinkError('Invalid input file: %s' % e)
+
+    def _getParsedRow(self, lineno, line):
+        parts = line.split('|')
+        if len(parts) != 7:
+            raise ExceptionNoStackTraceNeeded("expected 7 parts, obtained %d" % len(parts))
+        hostname, address, port, datadir, dbid, contentId, role = parts
+        check_values(lineno, address=address, port=port, datadir=datadir, content=contentId,
+                     hostname=hostname, dbid=dbid, role=role)
+        return NewSegmentInput(hostname=hostname
+                               , port=port
+                               , address=address
+                               , datadir=datadir
+                               , dbid=dbid
+                               , contentId=contentId
+                               , role=role
+                               )
+
+    def read_input_files(self, inputFilename=None):
+        """Reads and validates the line format of the input file passed
+           in on the command line via the -i arg"""
+
+        retValue = []
+
+        if not self.options.filename and not inputFilename:
+            raise ShrinkError('Missing input file')
+
+        if self.options.filename:
+            inputFilename = self.options.filename
+        f = None
+
+        try:
+            f = open(inputFilename, 'r')
+            for lineno, line in line_reader(f):
+                try:
+                    retValue.append(self._getParsedRow(lineno, line))
+                except ValueError:
+                    raise ShrinkError('Missing or invalid value on line %d of file %s.'
% (lineno, inputFilename))
+                except Exception as e:
+                    raise ShrinkError('Invalid input file on line %d of file %s: %s' % (lineno, inputFilename, str(e)))
+        except IOError:
+            raise ShrinkError('Input file %s not found' % inputFilename)
+        finally:
+            if f is not None:
+                f.close()
+
+        return retValue
+
+    def lock_catalog(self):
+        self.conn_catalog_lock = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
+        self.logger.info('Locking catalog')
+        dbconn.execSQL(self.conn_catalog_lock, "BEGIN")
+        # FIXME: is CHECKPOINT inside BEGIN what we want here?
+        dbconn.execSQL(self.conn_catalog_lock, "select gp_expand_lock_catalog()")
+        dbconn.execSQL(self.conn_catalog_lock, "CHECKPOINT")
+        self.logger.info('Locked catalog')
+
+    def unlock_catalog(self):
+        self.logger.info('Unlocking catalog')
+        dbconn.execSQL(self.conn_catalog_lock, "COMMIT")
+        self.conn_catalog_lock.close()
+        self.conn_catalog_lock = None
+        self.logger.info('Unlocked catalog')
+
+    def update_original_segments(self):
+        """Updates the gp_id catalog table of existing hosts"""
+
+        # Compute the primary count of the shrunken cluster: subtract one per
+        # removed primary, then add the current total primary count.
+        self.newPrimaryCount = 0
+        for seg in self.gparray.getShrinkSegDbList():
+            if seg.isSegmentPrimary(False):
+                self.newPrimaryCount -= 1
+
+        self.newPrimaryCount += self.gparray.get_primary_count()
+
+        # FIXME: update postmaster.opts
+
+    def update_catalog_swap_segment(self):
+        """
+        Backup gp_segment_configuration.
+        Fixme: we should swap the removed segments to the end and save the
+        newly removed segments in the file, to be removed in the next phase.
+        """
+        self.statusLogger.set_gp_segment_configuration_backup(
+            self.options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
+        self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
+        self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())
+        self.statusLogger.sync_segment_configuration_backup_file()
+
+        self.statusLogger.set_status('UPDATE_CATALOG_DONE')
+
+    def update_catalog_remove_segments(self):
+        """
+        Removes the shrink segments from the catalog: updates
+        gp_segment_configuration via updateSystemConfig() and bumps the
+        expand version.
+        """
+
+        self.statusLogger.set_status('SHRINK_CATALOG_STARTED')
+
+        # Mark the shrink segment primaries not in sync
+        for seg in self.gparray.getShrinkSegDbList():
+            if seg.isSegmentMirror():
+                continue
+            if not self.gparray.get_mirroring_enabled():
+                seg.setSegmentMode(MODE_NOT_SYNC)
+
+        # Update the catalog
+        configurationInterface.getConfigurationProvider().updateSystemConfig(
+            self.gparray,
+            "%s: segment config for resync" % getProgramName(),
+            dbIdToForceMirrorRemoveAdd={},
+            useUtilityMode=True,
+            allowPrimary=True
+        )
+
+        # Issue a checkpoint due to the forced shutdown below
+        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
+        dbconn.execSQL(self.conn, "CHECKPOINT")
+        self.conn.close()
+
+        # increase the expand version
+        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
+        dbconn.execSQL(self.conn, "select gp_expand_bump_version()")
+        self.conn.commit()
+        self.conn.close()
+
+        self.statusLogger.set_status('SHRINK_CATALOG_DONE')
+        self.statusLogger.set_status('SHRINK_PERFORM_DONE')
+
+    def stop_remove_segments(self):
+        """
+        Stop the removed segments and join the worker pool
+        """
+        newSegments = self.gparray.getShrinkSegDbList()
+        for seg in newSegments:
+            segStopCmd = SegmentStop(
+                name="Stopping removed segment dbid %s on host %s."
+                     % (seg.getSegmentDbId(), seg.getSegmentHostName())
+                , dataDir=seg.getSegmentDataDirectory()
+                , mode='fast'
+                , nowait=False
+                , ctxt=REMOTE
+                , remoteHost=seg.getSegmentHostName()
+            )
+            self.pool.addCommand(segStopCmd)
+        self.pool.join()
+        self.pool.check_results()
+        self.pool.haltWork()
+        self.pool.joinWorkers()
+
+    def start_prepare(self):
+        """Inserts into gpshrink.status that shrink preparation has started."""
+        if self.options.filename:
+            self.statusLogger.create_status_file()
+            self.statusLogger.set_status('SHRINK_PREPARE_STARTED', os.path.abspath(self.options.filename))
+
+    def setup_schema(self):
+        """Sets up the gpshrink schema"""
+        self.statusLogger.set_status('SETUP_SHRINK_SCHEMA_STARTED')
+        self.logger.info('Creating shrink schema')
+        self.conn = dbconn.connect(self.dburl, encoding='UTF8')
+        dbconn.execSQL(self.conn, create_schema_sql)
+        dbconn.execSQL(self.conn, status_table_sql)
+        dbconn.execSQL(self.conn, status_detail_table_sql)
+
+        # views
+        if not self.options.simple_progress:
+            dbconn.execSQL(self.conn, progress_view_sql)
+        else:
+            dbconn.execSQL(self.conn, progress_view_simple_sql)
+
+        self.conn.commit()
+        self.statusLogger.set_status('SETUP_SHRINK_SCHEMA_DONE')
+
+    def prepare_schema(self):
+        """Prepares the gpshrink schema"""
+        self.statusLogger.set_status('PREPARE_SHRINK_SCHEMA_STARTED')
+
+        if not self.conn:
+            self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods=True)
+            self.gparray = GpArray.initFromCatalog(self.dburl)
+
+        nowStr = datetime.datetime.now()
+        statusSQL = "INSERT INTO gpshrink.status VALUES ( 'SETUP', '%s' ) " % (nowStr)
+
+        dbconn.execSQL(self.conn, statusSQL)
+        self.conn.commit()
+
+        db_list = catalog.getDatabaseList(self.conn)
+
+        for db in db_list:
+            dbname = db[0]
+            if dbname == 'template0':
+                continue
+            self.logger.info('Populating gpshrink.status_detail with data from database %s' % (
+                dbname))
+            self._populate_regular_tables(dbname)
+
+        nowStr = datetime.datetime.now()
+        statusSQL = "INSERT INTO gpshrink.status VALUES ( 'SETUP DONE', '%s' ) " % (nowStr)
+        dbconn.execSQL(self.conn, statusSQL)
+        self.conn.commit()
+
+        self.conn.close()
+
+        self.statusLogger.set_status('PREPARE_SHRINK_SCHEMA_DONE')
+        self.statusLogger.set_status('SHRINK_PREPARE_DONE')
+
+    def _populate_regular_tables(self, dbname):
+        # FIXME: we process partition tables as regular tables, because
+        # processing each leaf table separately (as gpexpand does) could leave
+        # the shrink in an unsafe intermediate state that cannot be rolled back.
+        sql = """SELECT
+                     c.oid as table_oid,
+                     current_database() dbname,
+                     quote_ident(n.nspname) || '.'
|| quote_ident(c.relname) as fq_name,
+                     NULL as root_partition_oid,
+                     2 as rank,
+                     pe.writable is not null as external_writable,
+                     '%s' as undone_status,
+                     NULL as shrink_started,
+                     NULL as shrink_finished,
+                     0 as source_bytes,
+                     c.relstorage as rel_storage
+                 FROM
+                     pg_class c
+                     JOIN pg_namespace n ON (c.relnamespace=n.oid)
+                     JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
+                     LEFT JOIN pg_partition pp on (c.oid=pp.parrelid)
+                     LEFT JOIN pg_partition_rule pr on (c.oid=pr.parchildrelid)
+                     LEFT JOIN pg_exttable pe on (c.oid=pe.reloid and pe.writable)
+                 WHERE
+                     pr.parchildrelid is NULL
+                     AND n.nspname != 'pg_bitmapindex'
+                     AND c.relpersistence != 't'
+                     AND c.relpersistence != 'u'
+              """ % undone_status
+
+        self.logger.debug(sql)
+        table_conn = self.connect_database(dbname)
+
+        try:
+            data_file = os.path.abspath('./status_detail.dat')
+            self.logger.debug('status_detail data file: %s' % data_file)
+            copySQL = """COPY (%s) TO '%s'""" % (sql, data_file)
+
+            self.logger.debug(copySQL)
+            dbconn.execSQL(table_conn, copySQL)
+            table_conn.commit()
+            table_conn.close()
+        except Exception as e:
+            raise ShrinkError(e)
+
+        try:
+            copySQL = """COPY gpshrink.status_detail FROM '%s'""" % (data_file)
+
+            self.logger.debug(copySQL)
+            dbconn.execSQL(self.conn, copySQL)
+            self.conn.commit()
+        except Exception as e:
+            self.logger.debug(str(e))
+            raise ShrinkError(e)
+        finally:
+            os.unlink(data_file)
+
+    def generate_sql_to_populate_table_size(self, sql):
+        # This function generates SQL that computes table sizes while taking
+        # advantage of MPP. The previous code put pg_relation_size in the
+        # target list of a query involving only the catalog, so every tuple
+        # led to a dispatch just to compute pg_relation_size, which performed
+        # badly. Instead, we let pg_relation_size be computed on each segment
+        # and then group by the table oid and sum the results together to get
+        # each table's size. pg_relation_size is a volatile function, so
+        # subquery x in the following CTE will never be pulled up, which means
+        # pg_relation_size is always evaluated before the later motion; this
+        # guarantees correctness whether the aggregate is single stage or
+        # multi stage.
+        #
+        # Also note, we filter out relations whose relkind is
+        # RELKIND_FOREIGN_TABLE, because we only expand those that are
+        # external writable, and even for external writable ones we simply
+        # modify numsegments and do not move data.
+        # Refer to C code: ATExecExpandTable and ATExecExpandPartitionTablePrepare.
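+        #
+        # In the CTE below, gp_dist_random('pg_class') evaluates
+        # pg_relation_size(oid) on every segment, so a table's total size is
+        # the sum of its per-segment sizes, grouped by table_oid.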
+ cte_sql = """with table_size_cte(table_oid, size) as + ( + select table_oid, sum(size) + from ( + select oid as table_oid, + pg_relation_size(oid) as size + from gp_dist_random('pg_class') + where oid >= %d and relstorage <> '%s' + ) x(table_oid, size) + group by table_oid + ) + """ % (FIRST_NORMAL_OID, RELSTORAGE_EXTERNAL) + + final_sql = """{cte_sql} + select + s1.table_oid, + s1.dbname, + s1.fq_name, + s1.root_partition_oid, + s1.rank, + s1.external_writable, + s1.undone_status, + s1.expansion_started, + s1.expansion_finished, + coalesce(table_size_cte.size, 0) as source_bytes, + s1.rel_storage as rel_storage + from ({orig_sql})s1 left join table_size_cte + on s1.table_oid = table_size_cte.table_oid + """ + return final_sql.format(cte_sql=cte_sql, + orig_sql=sql) + + def compute_shrink_size(self): + """Compute we need to shrink the cluster to actual size """ + totalsize = len(self.gparray.segmentPairs) + removesize = len(self.gparray.shrinkSegmentPairs) + + if removesize >= totalsize: + self.logger.error('remove segment num %d more than segment num %d', removesize, totalsize) + exit(1) + elif removesize < 1: + self.logger.error('remove segment num %d less than 1', removesize) + exit(1) + self.size = totalsize - removesize + + def perform_shrink(self): + """Performs the actual table re-organizations""" + self.statusLogger.set_status('SHRINK_PERFORM_STARTED') + self.statusLogger.set_status('SHRINK_TABLE_STARTED') + + shrinkStart = datetime.datetime.now() + + # setup a threadpool + self.queue = WorkerPool(numWorkers=self.numworkers) + + # go through and reset any "IN PROGRESS" tables + self.conn = dbconn.connect(self.dburl, encoding='UTF8') + sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STARTED', '%s' ) " % ( + shrinkStart) + dbconn.execSQL(self.conn, sql) + + sql = """UPDATE gpshrink.status_detail set status = '%s' WHERE status = '%s' """ % (undone_status, start_status) + dbconn.execSQL(self.conn, sql) + self.conn.commit() + + # read schema and queue up commands + sql = "SELECT * FROM gpshrink.status_detail WHERE status = 'NOT STARTED' ORDER BY rank" + cursor = dbconn.execSQL(self.conn, sql) + self.conn.commit() + + for row in cursor: + self.logger.debug(row) + name = "name" + tbl = ShrinkTable(options=self.options, row=row, size=self.size) + cmd = ShrinkCommand(name=name, status_url=self.dburl, table=tbl, options=self.options) + self.queue.addCommand(cmd) + + table_shrink_error = False + + stopTime = None + stoppedEarly = False + if self.options.end: + stopTime = self.options.end + + # wait till done. + while not self.queue.isDone(): + logger.debug( + "woke up. queue: %d finished %d " % (self.queue.assigned, self.queue.completed_queue.qsize())) + if stopTime and datetime.datetime.now() >= stopTime: + stoppedEarly = True + break + time.sleep(5) + + shrinkStopped = datetime.datetime.now() + + self.queue.haltWork() + self.queue.joinWorkers() + + + # Doing this after the halt and join workers guarantees that no new completed items can be added + # while we're doing a check + for shrinkCommand in self.queue.getCompletedItems(): + if shrinkCommand.table_shrink_error: + table_shrink_error = True + break + + if stoppedEarly: + logger.info('End time reached. 
Stopping shrink.')
+            sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
+                shrinkStopped)
+            dbconn.execSQL(self.conn, sql)
+            self.conn.commit()
+            logger.info('You can resume the shrink by running gpshrink again')
+        elif table_shrink_error:
+            logger.warn('**************************************************')
+            logger.warn('One or more tables failed to shrink successfully.')
+            logger.warn('Please check the log file, correct the problem and')
+            logger.warn('run gpshrink again to finish the shrink process')
+            logger.warn('**************************************************')
+            # We'll try to update the status, but if the errors were caused by
+            # going into read only mode, this will fail. That's ok though as
+            # gpshrink will resume next run
+            try:
+                sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
+                    shrinkStopped)
+                dbconn.execSQL(self.conn, sql)
+                self.conn.commit()
+            except:
+                pass
+        else:
+            sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK COMPLETE', '%s' ) " % (
+                shrinkStopped)
+            dbconn.execSQL(self.conn, sql)
+            self.conn.commit()
+            logger.info("SHRINK COMPLETED SUCCESSFULLY")
+
+        self.conn.close()
+        self.statusLogger.set_status('SHRINK_TABLE_DONE')
+
+    def shutdown(self):
+        """Used if the script is closed abruptly"""
+        logger.info('Shutting down gpshrink...')
+        if self.pool:
+            self.pool.haltWork()
+            self.pool.joinWorkers()
+
+        if self.queue:
+            self.queue.haltWork()
+            self.queue.joinWorkers()
+
+        try:
+            shrinkStopped = datetime.datetime.now()
+            sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
+                shrinkStopped)
+            dbconn.execSQL(self.conn, sql)
+            self.conn.commit()
+            self.conn.close()
+        except pg.OperationalError:
+            pass
+        except Exception:
+            # schema doesn't exist. Cancel or error during setup
+            pass
+
+    def halt_work(self):
+        if self.pool:
+            self.pool.haltWork()
+            self.pool.joinWorkers()
+
+        if self.queue:
+            self.queue.haltWork()
+            self.queue.joinWorkers()
+
+    def cleanup_schema(self):
+        """Removes the gpshrink schema"""
+        c = dbconn.connect(self.dburl, encoding='UTF8')
+
+        self.logger.info("Removing gpshrink schema")
+        dbconn.execSQL(c, drop_schema_sql)
+        c.commit()
+        c.close()
+
+    def connect_database(self, dbname):
+        test_url = copy.deepcopy(self.dburl)
+        test_url.pgdb = dbname
+        c = dbconn.connect(test_url, encoding='UTF8', allowSystemTableMods=True)
+        return c
+
+    def validate_heap_checksums(self):
+        num_workers = min(len(self.gparray.get_hostlist()), MAX_PARALLEL_SHRINKS)
+        heap_checksum_util = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=self.logger)
+        successes, failures = heap_checksum_util.get_segments_checksum_settings()
+        if len(successes) == 0:
+            logger.fatal("No segments responded to ssh query for heap checksum. 
Not shrinking the cluster.")
+            return 1
+
+        consistent, inconsistent, master_heap_checksum = heap_checksum_util.check_segment_consistency(successes)
+
+        inconsistent_segment_msgs = []
+        for segment in inconsistent:
+            inconsistent_segment_msgs.append("dbid: %s "
+                                             "checksum set to %s differs from master checksum set to %s" %
+                                             (segment.getSegmentDbId(), segment.heap_checksum,
+                                              master_heap_checksum))
+
+        if not heap_checksum_util.are_segments_consistent(consistent, inconsistent):
+            self.logger.fatal("Cluster heap checksum setting differences reported")
+            self.logger.fatal("Heap checksum settings on %d of %d segment instances do not match master <<<<<<<<"
+                              % (len(inconsistent_segment_msgs), len(self.gparray.segmentPairs)))
+            self.logger.fatal("Review %s for details" % get_logfile())
+            log_to_file_only("Failed checksum consistency validation:", logging.WARN)
+            self.logger.fatal("gpshrink error: Cluster will not be modified as checksum settings are not consistent "
+                              "across the cluster.")
+
+            for msg in inconsistent_segment_msgs:
+                log_to_file_only(msg, logging.WARN)
+            raise Exception("Segments have heap_checksum set inconsistently to master")
+        else:
+            self.logger.info("Heap checksum setting consistent across cluster")
+
+
+# -----------------------------------------------
+class ShrinkTable():
+    def __init__(self, options, row=None, size=0):
+        self.options = options
+        if row is not None:
+            (self.table_oid, self.dbname, self.fq_name,
+             self.root_partition_oid, self.rank, self.external_writable,
+             self.status, self.shrink_started, self.shrink_finished,
+             self.source_bytes, self.rel_storage) = row
+        self.size = size
+
+    def add_table(self, conn):
+        # column order must match gpshrink.status_detail:
+        # (table_oid, dbname, fq_name, root_partition_oid, rank,
+        #  external_writable, status, shrink_started, shrink_finished,
+        #  source_bytes, rel_storage)
+        insertSQL = """INSERT INTO gpshrink.status_detail
+                       VALUES (%s,'%s','%s',
+                               '%d',%d,'%s','%s','%s','%s',%d,'%s')
+                    """ % (self.table_oid, self.dbname.replace("'", "''"), self.fq_name.replace("'", "''"),
+                           self.root_partition_oid,
+                           self.rank, self.external_writable, self.status,
+                           self.shrink_started, self.shrink_finished,
+                           self.source_bytes, self.rel_storage)
+        logger.info('Added table %s.%s' % (self.dbname, self.fq_name))
+        logger.debug(insertSQL)
+        dbconn.execSQL(conn, insertSQL)
+        conn.commit()
+
+    def mark_started(self, status_conn, table_conn, start_time, cancel_flag):
+        if cancel_flag:
+            return
+        sql = "SELECT pg_relation_size(%s)" % (self.table_oid)
+        row = dbconn.execSQL(table_conn, sql)
+        src_bytes = int(row.fetchone()[0])
+        table_conn.commit()
+        logger.debug(" Table: %s has %d bytes" % (self.fq_name, src_bytes))
+
+        sql = """UPDATE gpshrink.status_detail
+                 SET status = '%s', shrink_started='%s',
+                     source_bytes = %d
+                 WHERE dbname = '%s'
+                 AND table_oid = %s """ % (start_status, start_time,
+                                           src_bytes, self.dbname.replace("'", "''"),
+                                           self.table_oid)
+
+        logger.debug("Mark Started: " + sql)
+        dbconn.execSQL(status_conn, sql)
+        status_conn.commit()
+
+    def reset_started(self, status_conn):
+        sql = """UPDATE gpshrink.status_detail
+                 SET status = '%s', shrink_started=NULL, shrink_finished=NULL
+                 WHERE dbname = '%s'
+                 AND table_oid = %s """ % (undone_status,
+                                           self.dbname.replace("'", "''"), self.table_oid)
+
+        logger.debug('Resetting detailed_status: %s' % sql)
+        dbconn.execSQL(status_conn, sql)
+        status_conn.commit()
+
+    def shrink(self, table_conn, cancel_flag):
+        # leaf partitions are not shrunk separately; partitioned tables are
+        # handled through their root (see _populate_regular_tables)
+        # FIXME: ALTER TABLE on an external table does not throw a warning,
+        # but it will throw an error in 6X.
+        # Do we still need to use ALTER EXTERNAL TABLE?
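+        # Note: _populate_regular_tables always records NULL for
+        # root_partition_oid, so the early return below is not expected to be
+        # taken; it only guards rows that carry a root partition oid.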
+ if self.root_partition_oid is not None: + return True + else: + # FIXME: Can "ONLY" be allowed in "EXPAND TABLE"? + sql = 'ALTER TABLE %s SHRINK TABLE to %d' % (self.fq_name, self.size) + + logger.info('Shrinking %s.%s' % (self.dbname, self.fq_name)) + logger.debug("Shrink SQL: %s" % sql) + + # check is atomic in python + if not cancel_flag: + dbconn.execSQL(table_conn, sql) + # the ALTER TABLE command requires a commit to execute + table_conn.commit() + if self.options.analyze: + sql = 'ANALYZE %s' % (self.fq_name) + logger.info('Analyzing %s' % (self.fq_name)) + dbconn.execSQL(table_conn, sql) + table_conn.commit() + + return True + + # I can only get here if the cancel flag is True + return False + + def mark_finished(self, status_conn, start_time, finish_time): + sql = """UPDATE gpshrink.status_detail + SET status = '%s', shrink_started='%s', shrink_finished='%s' + WHERE dbname = '%s' + AND table_oid = %s """ % (done_status, start_time, finish_time, + self.dbname.replace("'", "''"), self.table_oid) + logger.debug(sql) + dbconn.execSQL(status_conn, sql) + status_conn.commit() + + def mark_does_not_exist(self, status_conn, finish_time): + sql = """UPDATE gpshrink.status_detail + SET status = '%s', shrink_finished='%s' + WHERE dbname = '%s' + AND table_oid = %s """ % (does_not_exist_status, finish_time, + self.dbname.replace("'", "''"), self.table_oid) + logger.debug(sql) + dbconn.execSQL(status_conn, sql) + status_conn.commit() + +def sig_handler(sig, arg): + if _gp_shrink is not None: + _gp_shrink.shutdown() + + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) + + # raise sig + os.kill(os.getpid(), sig) + + +# ----------------------------------------------- +class ShrinkCommand(SQLCommand): + def __init__(self, name, status_url, table, options): + self.status_url = status_url + self.table = table + self.options = options + self.cmdStr = "Shrink %s.%s" % (table.dbname, table.fq_name) + self.table_url = copy.deepcopy(status_url) + self.table_url.pgdb = table.dbname + self.table_shrink_error = False + + SQLCommand.__init__(self, name) + + def run(self, validateAfter=False): + # connect. 
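+        # Two connections are used: status_conn points at the database that
+        # holds the gpshrink schema, while table_conn points at the database
+        # that owns the table being shrunk.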
+ status_conn = None + table_conn = None + table_exp_success = False + + try: + status_conn = dbconn.connect(self.status_url, encoding='UTF8') + table_conn = dbconn.connect(self.table_url, encoding='UTF8') + except DatabaseError as ex: + if self.options.verbose: + logger.exception(ex) + logger.error(ex.__str__().strip()) + if status_conn: status_conn.close() + if table_conn: table_conn.close() + self.table_shrink_error = True + return + + # validate table hasn't been dropped + start_time = None + try: + sql = """select * from pg_class c where c.oid = %d """ % (self.table.table_oid) + + cursor = dbconn.execSQL(table_conn, sql) + table_conn.commit() + + if cursor.rowcount == 0: + logger.info('%s no longer exists in database %s' % (self.table.fq_name, + self.table.dbname)) + + self.table.mark_does_not_exist(status_conn, datetime.datetime.now()) + status_conn.close() + table_conn.close() + return + else: + # Set conn for cancel + self.cancel_conn = table_conn + start_time = datetime.datetime.now() + if not self.options.simple_progress: + self.table.mark_started(status_conn, table_conn, start_time, self.cancel_flag) + + table_exp_success = self.table.shrink(table_conn, self.cancel_flag) + + except Exception as ex: + if ex.__str__().find('canceling statement due to user request') == -1 and not self.cancel_flag: + self.table_shrink_error = True + if self.options.verbose: + logger.exception(ex) + logger.error('Table %s.%s failed to shrink: %s' % (self.table.dbname, + self.table.fq_name, + ex.__str__().strip())) + else: + logger.info('ALTER TABLE of %s.%s canceled' % ( + self.table.dbname, self.table.fq_name)) + + if table_exp_success: + end_time = datetime.datetime.now() + # update metadata + logger.info( + "Finished shrinking %s.%s" % (self.table.dbname, self.table.fq_name)) + self.table.mark_finished(status_conn, start_time, end_time) + elif not self.options.simple_progress: + logger.info("Resetting status_detail for %s.%s" % ( + self.table.dbname, self.table.fq_name)) + self.table.reset_started(status_conn) + + # disconnect + status_conn.close() + table_conn.close() + + def set_results(self, results): + raise ExecutionError("TODO: must implement", None) + + def get_results(self): + raise ExecutionError("TODO: must implement", None) + + def was_successful(self): + raise ExecutionError("TODO: must implement", None) + + def validate(self, expected_rc=0): + raise ExecutionError("TODO: must implement", None) + + +# ------------------------------- UI Help -------------------------------- +def read_hosts_file(hosts_file): + new_hosts = [] + try: + f = open(hosts_file, 'r') + try: + for l in f: + if l.strip().startswith('#') or l.strip() == '': + continue + + new_hosts.append(l.strip()) + + finally: + f.close() + except IOError: + raise ShrinkError('Hosts file %s not found' % hosts_file) + + return new_hosts + + +# -------------------------------------------------------------------------- +# Main +# -------------------------------------------------------------------------- +def main(options, args, parser): + global _gp_shrink + + remove_pid = True + gpshrink_db_status = None + + try: + # setup signal handlers so we can clean up correctly + signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGHUP, sig_handler) + + logger = get_default_logger() + setup_tool_logging(EXECNAME, getLocalHostname(), getUserName()) + + options, args = validate_options(options, args, parser) + + if options.verbose: + enable_verbose_logging() + + if is_gpshrink_running(options.master_data_directory): + 
logger.error('gpshrink is already running. Only one instance') + logger.error('of gpshrink is allowed at a time.') + remove_pid = False + sys.exit(1) + else: + create_pid_file(options.master_data_directory) + + # prepare provider for updateSystemConfig + gpEnv = GpMasterEnvironment(options.master_data_directory, True) + configurationInterface.registerConfigurationProvider( + configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog()) + configurationInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort()) + + dburl = dbconn.DbURL(dbname=DBNAME, port=gpEnv.getMasterPort()) + + gpshrink_db_status = gpshrink.prepare_gpdb_state(logger, dburl, options) + + # Get array configuration + try: + gparray = GpArray.initFromCatalog(dburl, utility=True) + except DatabaseError as ex: + logger.error('Failed to connect to database. Make sure the') + logger.error('CloudberryDB instance you wish to shrink is running') + logger.error('and that your environment is correct, then rerun') + logger.error('gpshrink ' + ' '.join(sys.argv[1:])) + sys.exit(1) + + _gp_shrink = gpshrink(logger, gparray, dburl, options, parallel=options.parallel) + + if options.clean: + _gp_shrink.cleanup_schema() + _gp_shrink.cleanup_file() + logger.info('Cleanup finished. Exiting...') + sys.exit(0) + + if options.rollback: + try: + logger.info('Rollback is not supported in gpshrink.') + sys.exit(0) + except ShrinkError as e: + logger.error(e) + sys.exit(1) + + if options.filename is None: + logger.error('gpshrink must be run with an input file')
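+ + # The branches below form gpshrink's phase state machine, keyed on the + # status returned by prepare_gpdb_state() above: None means no shrink is + # in progress, so this run performs phase 1 (catalog and schema setup); + # 'SETUP DONE', 'SHRINK STARTED' and 'SHRINK STOPPED' mean setup finished, + # so this run performs phase 2 (data redistribution and segment removal); + # 'SHRINK COMPLETE' and anything else only report the current state.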
+ if gpshrink_db_status is None and options.filename: + _gp_shrink.validate_heap_checksums() + removeSegList = _gp_shrink.read_input_files() + _gp_shrink.add_remove_segments(removeSegList) + _gp_shrink.start_prepare() + _gp_shrink.lock_catalog() + _gp_shrink.update_original_segments() + _gp_shrink.update_catalog_swap_segment() + _gp_shrink.unlock_catalog() + _gp_shrink.setup_schema() + _gp_shrink.prepare_schema() + logger.info('************************************************') + logger.info('Initialization of the system shrink complete.') + logger.info('To begin shrinking tables onto the remaining segments,') + logger.info('rerun gpshrink') + logger.info('************************************************') + elif gpshrink_db_status == 'SETUP DONE' or gpshrink_db_status == 'SHRINK STARTED' or gpshrink_db_status == 'SHRINK STOPPED': + if not _gp_shrink.validate_max_connections(): + raise ValidationError() + removeSegList = _gp_shrink.read_input_files() + _gp_shrink.add_remove_segments(removeSegList) + _gp_shrink.compute_shrink_size() + _gp_shrink.perform_shrink() + _gp_shrink.lock_catalog() + _gp_shrink.update_original_segments() + _gp_shrink.update_catalog_remove_segments() + _gp_shrink.unlock_catalog() + _gp_shrink.stop_remove_segments() + elif gpshrink_db_status == 'SHRINK COMPLETE': + logger.info('shrink has already completed.') + logger.info('If you want to shrink again, run gpshrink -c to remove') + logger.info('the gpshrink schema and begin a new shrink') + else: + logger.error('gpshrink_db_status is %s', gpshrink_db_status) + logger.error('The last gpshrink setup did not complete successfully.') + logger.error('Please run gpshrink -c to clean up and restore the original state.') + + logger.info("Exiting...") + sys.exit(0) + + except ValidationError as e: + logger.info('Input validation failed: %s', e) + if _gp_shrink is not None: + _gp_shrink.shutdown() + sys.exit() + except Exception as e: + logger.error('Exception occurred: %s', e) + if _gp_shrink is not None: + _gp_shrink.shutdown() + if gpshrink_db_status is not None: + logger.error('The database may have been left in an incomplete state.') + logger.error('Any remaining issues must be addressed outside of gpshrink.') + logger.error('You can manually shrink the remaining tables listed in gpshrink.status_detail,') + logger.error('then run \'gpshrink -c\' to clean up the input file and schema.') + sys.exit(3) + except KeyboardInterrupt: + # Disable SIGINT while we shut down. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + if _gp_shrink is not None: + _gp_shrink.shutdown() + + # Re-enable SIGINT + signal.signal(signal.SIGINT, signal.default_int_handler) + + sys.exit('\nUser Interrupted') + + + finally: + try: + if remove_pid and options: + remove_pid_file(options.master_data_directory) + except NameError: + pass + + if _gp_shrink is not None: + _gp_shrink.halt_work() + + +if __name__ == '__main__': + options, args, parser = parseargs() + main(options, args, parser) \ No newline at end of file diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 423b2a60478..f99bdc2c9e4 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -293,7 +293,7 @@ struct DropRelationCallbackState #define ATT_COMPOSITE_TYPE 0x0010 #define ATT_FOREIGN_TABLE 0x0020 -static void ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd); +static void ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd, int numsegments); static void truncate_check_rel(Relation rel); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel, @@ -459,8 +459,8 @@ static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, static void RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid, void *arg); static void RemoveInheritance(Relation child_rel, Relation parent_rel, bool is_partition); -static void ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd); -static void ATExecExpandPartitionTablePrepare(Relation rel); +static void ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd, int numsegments); +static void ATExecExpandPartitionTablePrepare(Relation rel, int numsegments); static void ATExecSetDistributedBy(Relation rel, Node *node, AlterTableCmd *cmd); @@ -4118,6 +4118,7 @@ AlterTableGetLockLevel(List *cmds) /* GPDB additions */ case AT_ExpandTable: case AT_ExpandPartitionTablePrepare: + case AT_ShrinkTable: case AT_SetDistributedBy: case AT_PartAdd: case AT_PartAddForSplit: @@ -4843,6 +4844,54 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_MISC; break; + case AT_ShrinkTable: + ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE | ATT_MATVIEW); + + /* ATTACH and DETACH will process in ATExecAttachPartition function */ + if (!recursing) + { + Assert(IsA(cmd->def, ExpandStmtSpec) || IsA(cmd->def, Integer)); + Oid relid = RelationGetRelid(rel); + PartStatus ps = rel_part_status(relid); + int numseg; + + if (IsA(cmd->def, ExpandStmtSpec)) { + numseg = ((ExpandStmtSpec*)(cmd->def))->numseg; + } else { + numseg = intVal(cmd->def); + } + + if (Gp_role == GP_ROLE_DISPATCH && + rel->rd_cdbpolicy->numsegments <= numseg) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot shrink table \"%s\"", + RelationGetRelationName(rel)), + errdetail("table numsegments \"%d\", shrink size \"%d\"", rel->rd_cdbpolicy->numsegments, numseg))); + + switch (ps) + { + case PART_STATUS_NONE: + case PART_STATUS_ROOT: + break; + + case PART_STATUS_INTERIOR:
+ case PART_STATUS_LEAF: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot shrink leaf or interior partition \"%s\"", + RelationGetRelationName(rel)), + errdetail("root/leaf/interior partitions need to have same numsegments"), + errhint("use \"ALTER TABLE %s SHRINK TABLE\" instead", + get_rel_name(rel_partition_get_master(relid))))); + break; + } + } + + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode); + pass = AT_PASS_MISC; + break; + case AT_AddInherit: /* INHERIT */ ATSimplePermissions(rel, ATT_TABLE); ATPartitionCheck(cmd->subtype, rel, true, recursing); @@ -5526,6 +5575,9 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode) /* ATExecExpandTable() may close the relation temporarily */ else if (atc->subtype == AT_ExpandTable) rel = relation_open(tab->relid, NoLock); + /* ATExecExpandTable() may close the relation temporarily */ + else if (atc->subtype == AT_ShrinkTable) + rel = relation_open(tab->relid, NoLock); } /* @@ -5803,11 +5855,21 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation *rel_p, checkATSetDistributedByStandalone(tab, rel); ATExecSetDistributedBy(rel, (Node *) cmd->def, cmd); break; - case AT_ExpandTable: /* SET DISTRIBUTED BY */ - ATExecExpandTable(wqueue, rel, cmd); + case AT_ExpandTable: /* EXPAND TABLE */ + ATExecExpandTable(wqueue, rel, cmd, getgpsegmentCount()); break; case AT_ExpandPartitionTablePrepare: /* EXPAND PARTITION PREPARE */ - ATExecExpandPartitionTablePrepare(rel); + ATExecExpandPartitionTablePrepare(rel, getgpsegmentCount()); + break; + case AT_ShrinkTable: /* SHRINK TABLE */ + switch (cmd->def->type) { + case T_ExpandStmtSpec: + ATExecExpandTable(wqueue, rel, cmd, ((ExpandStmtSpec*)(cmd->def))->numseg); + break; + default: + ATExecExpandTable(wqueue, rel, cmd, intVal(cmd->def)); + break; + } break; /* CDB: Partitioned Table commands */ case AT_PartAdd: /* Add */ @@ -15277,9 +15339,13 @@ static void checkUniqueIndexCompatible(Relation rel, GpPolicy *pol) * the data to the new relation file, and swap it in place of the old one. * This is called the "CTAS method", because it uses a CREATE TABLE AS * command internally to create the new physical relation. + * + * To support shrink, we add a numsegments parameter so that the table policy + * can be set to an arbitrary size. For expand, numsegments is + * getgpsegmentCount(); for shrink, it is supplied by the user.
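+ * + * A sketch of the user-visible behavior on a 3-segment demo cluster: + * + * ALTER TABLE t1 SHRINK TABLE TO 2; -- moves t1's data off segment 2; numsegments becomes 2 + * ALTER TABLE t1 SHRINK TABLE TO 3; -- rejected, since the target must be smaller than the current numsegments + */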
static void -ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) +ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd, int numsegments) { AlteredTableInfo *tab; AlterTableCmd *rootCmd; @@ -15314,8 +15380,16 @@ ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) if (Gp_role == GP_ROLE_DISPATCH) { /* only rootCmd is dispatched to the QEs, so store the spec there */ - if (rootCmd == cmd) - rootCmd->def = (Node*)makeNode(ExpandStmtSpec); + if (rootCmd == cmd) { + ExpandStmtSpec *spec; + + spec = makeNode(ExpandStmtSpec); + + if (rootCmd->def != NULL && IsA(rootCmd->def, Integer)) { + spec->numseg = intVal(rootCmd->def); + } + rootCmd->def = (Node*) spec; + } } if (RelationIsExternal(rel)) @@ -15333,11 +15407,11 @@ ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) } else { - ATExecExpandTableCTAS(rootCmd, rel, cmd); + ATExecExpandTableCTAS(rootCmd, rel, cmd, numsegments); } /* Update numsegments to the target size */ - newPolicy->numsegments = getgpsegmentCount(); + newPolicy->numsegments = numsegments; GpPolicyReplace(relid, newPolicy); } @@ -15362,12 +15436,12 @@ ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) * and new policy type of leaf partitions are randomly on 3 segments * * @param rel the parent or child of partition table + * + * @param numsegments the target number of segments; see ATExecExpandTable for details. */ static void -ATExecExpandPartitionTablePrepare(Relation rel) +ATExecExpandPartitionTablePrepare(Relation rel, int numsegments) { - /* get current distribution policy and partition rule (root/interior/leaf) */ - int new_numsegments = getgpsegmentCount(); Oid relid = RelationGetRelid(rel); if (GpPolicyIsRandomPartitioned(rel->rd_cdbpolicy) || has_subclass(rel->rd_id)) @@ -15381,7 +15455,7 @@ */ oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); new_policy = GpPolicyCopy(rel->rd_cdbpolicy); - new_policy->numsegments = new_numsegments; + new_policy->numsegments = numsegments; MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -15407,7 +15481,7 @@ /* Just modify the numsegments for external writable leaves */ oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); new_policy = GpPolicyCopy(rel->rd_cdbpolicy); - new_policy->numsegments = new_numsegments; + new_policy->numsegments = numsegments; MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -15422,7 +15496,7 @@ MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); - new_policy = createRandomPartitionedPolicy(new_numsegments); + new_policy = createRandomPartitionedPolicy(numsegments); MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -15433,7 +15507,7 @@ } static void -ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd) +ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd, int numsegments) { RangeVar *tmprv; Oid tmprelid; @@ -15476,7 +15550,7 @@ ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd) /* Step (b) - build CTAS */ distby = make_distributedby_for_rel(rel); - distby->numsegments = getgpsegmentCount(); + distby->numsegments = numsegments; queryDesc = build_ctas_with_dist(rel, distby, untransformRelOptions(get_rel_opts(rel)),
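One easy-to-miss detail in the node-support hunks that follow: the new ExpandStmtSpec.numseg field has to round-trip through all three files, COPY_SCALAR_FIELD(numseg) in copyfuncs.c, WRITE_INT_FIELD(numseg) in outfuncs.c and READ_INT_FIELD(numseg) in readfast.c, because the spec stored in rootCmd->def is serialized when the root command is dispatched from the QD to the QEs; omitting any one of the three would drop or corrupt the target segment count on the QE side.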
@@ -20535,6 +20609,10 @@ char *alterTableCmdString(AlterTableType subtype) cmdstring = pstrdup("expand table data on"); break; + + case AT_ShrinkTable: + cmdstring = pstrdup("shrink table data on"); + break; + case AT_PartAdd: /* Add */ case AT_PartAddForSplit: /* Add */ case AT_PartAlter: /* Alter */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index db3f53a5d7b..c38fccfad41 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3622,6 +3622,7 @@ _copyExpandStmtSpec(const ExpandStmtSpec *from) { ExpandStmtSpec *newnode = makeNode(ExpandStmtSpec); + COPY_SCALAR_FIELD(numseg); COPY_SCALAR_FIELD(backendId); return newnode; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 0dfe3360ab6..7c0ee2eed62 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -3133,6 +3133,7 @@ static void _outExpandStmtSpec(StringInfo str, const ExpandStmtSpec *node) { WRITE_NODE_TYPE("EXPANDSTMTSPEC"); + WRITE_INT_FIELD(numseg); WRITE_OID_FIELD(backendId); } diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index 5ab45651c05..8eba0a9ac4c 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -1231,6 +1231,7 @@ _readExpandStmtSpec(void) { READ_LOCALS(ExpandStmtSpec); + READ_INT_FIELD(numseg); READ_OID_FIELD(backendId); READ_DONE(); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 1fcd69d670b..8bb7ab9c0ee 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -743,7 +743,7 @@ static Node *makeIsNotDistinctFromNode(Node *expr, int position); RANDOMLY READABLE READS REJECT_P REPLICATED RESOURCE ROLLUP ROOTPARTITION - SCATTER SEGMENT SEGMENTS SETS SPLIT SQL SUBPARTITION + SCATTER SEGMENT SEGMENTS SETS SHRINK SPLIT SQL SUBPARTITION THRESHOLD TIES @@ -3043,6 +3043,14 @@ alter_table_cmd: n->subtype = AT_ExpandPartitionTablePrepare; $$ = (Node *)n; } + /* ALTER TABLE SHRINK TABLE TO */ + | SHRINK TABLE TO SignedIconst + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_ShrinkTable; + n->def = (Node *) makeInteger($4); + $$ = (Node *)n; + } /* ALTER TABLE OF */ | OF any_name { @@ -15980,6 +15988,7 @@ unreserved_keyword: | SET | SHARE | SHOW + | SHRINK | SIMPLE | SNAPSHOT | SPLIT diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3ed085c4efd..57f1faf42ab 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1514,7 +1514,11 @@ typedef enum AlterTableType AT_PartSplit, /* Split */ AT_PartTruncate, /* Truncate */ AT_PartAddInternal, /* CREATE TABLE time partition addition */ - AT_PartAttachIndex /* ALTER INDEX ATTACH PARTITION (not exposed to user) */ + AT_PartAttachIndex, /* ALTER INDEX ATTACH PARTITION (not exposed to user) */ + + + /* GPDB additions */ + AT_ShrinkTable /* SHRINK TABLE */ } AlterTableType; typedef struct ReplicaIdentityStmt @@ -2124,6 +2128,7 @@ typedef struct ExpandStmtSpec { NodeTag type; /* for ctas method */ + int numseg; Oid backendId; } ExpandStmtSpec; diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index b6705abe3df..cd3e2cedd74 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -403,6 +403,7 @@ PG_KEYWORD("setof", SETOF, COL_NAME_KEYWORD) PG_KEYWORD("sets", SETS, COL_NAME_KEYWORD) PG_KEYWORD("share", SHARE, UNRESERVED_KEYWORD) PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD) +PG_KEYWORD("shrink", SHRINK, UNRESERVED_KEYWORD) /* GPDB */ PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD) PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD)
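Worth noting: SHRINK enters the grammar only as an unreserved keyword (see the unreserved_keyword production and the kwlist.h entry above), so pre-existing identifiers named "shrink" should keep parsing unchanged. A quick sanity sketch, not part of the regression suite, assuming a cluster with more than two segments:

    create table shrink (shrink int);      -- "shrink" still works as a table and column name
    alter table shrink shrink table to 2;  -- and as the new command
    drop table shrink;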
diff --git a/src/test/isolation2/Makefile b/src/test/isolation2/Makefile index b192a86db3b..2141bd733ab 100644 --- a/src/test/isolation2/Makefile +++ b/src/test/isolation2/Makefile @@ -75,6 +75,9 @@ installcheck: install installcheck-parallel-retrieve-cursor installcheck-ic-tcp installcheck-resgroup: install ./pg_isolation2_regress $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_resgroup --psqldir='$(PSQLDIR)' --inputdir=$(srcdir) --dbname=isolation2resgrouptest --schedule=$(srcdir)/isolation2_resgroup_schedule +installcheck-expandshrink: install + $(pg_isolation2_regress_installcheck) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_isolation2 --dbname=isolation2expandshrinktest --schedule=$(srcdir)/isolation2_expandshrink_schedule + installcheck-parallel-retrieve-cursor: install ./pg_isolation2_regress $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_parallel_retrieve_cursor --psqldir='$(PSQLDIR)' --inputdir=$(srcdir) --dbname=isolation2parallelretrcursor --load-extension=gp_inject_fault --schedule=$(srcdir)/parallel_retrieve_cursor_schedule diff --git a/src/test/isolation2/expected/gpexpand_gpshrink.out b/src/test/isolation2/expected/gpexpand_gpshrink.out new file mode 100644 index 00000000000..c571c299bbd --- /dev/null +++ b/src/test/isolation2/expected/gpexpand_gpshrink.out @@ -0,0 +1,397 @@ +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 3) + +!\retcode gpshrink -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 1) + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test(a int); +CREATE + +insert into test select i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 30 + 3 | 26 + 0 | 28 + 2 | 16 +(4 rows) + +select count(*) from gp_segment_configuration; + count +------- + 10 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from
test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) + +drop table test; +DROP + + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test_partitioned (a int, b int) partition by range (b) (start(1) end(101) every(20),default partition def); +CREATE + +insert into test_partitioned select i,i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 30 + 2 | 16 + 0 | 28 + 3 | 26 +(4 rows) + +select count(*) from gp_segment_configuration; + count +------- + 10 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 2 | 25 + 1 | 37 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +select 
policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +drop table test_partitioned; +DROP + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) + + +-- expand two segments and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m localhost|localhost|7010|/tmp/datadirs/dbfast5/demoDataDir4|11|4|p localhost|localhost|7011|/tmp/datadirs/dbfast_mirror5/demoDataDir4|12|4|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test(a int); +CREATE + +insert into test select i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 3 | 22 + 2 | 13 + 1 | 20 + 0 | 24 + 4 | 21 +(5 rows) + +select count(*) from gp_segment_configuration; + count +------- + 12 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 5 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +drop table test; +DROP + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) diff --git a/src/test/isolation2/isolation2_expandshrink_schedule b/src/test/isolation2/isolation2_expandshrink_schedule new file mode 100644 index 00000000000..3fd75a5eff0 --- /dev/null +++ b/src/test/isolation2/isolation2_expandshrink_schedule @@ -0,0 +1,3 @@ +# Tests for gpexpand and gpshrink +# Keep in a single schedule because it redistributes all tables in every database +test: gpexpand_gpshrink \ No newline at
end of file diff --git a/src/test/isolation2/sql/gpexpand_gpshrink.sql b/src/test/isolation2/sql/gpexpand_gpshrink.sql new file mode 100644 index 00000000000..6a73fd5a254 --- /dev/null +++ b/src/test/isolation2/sql/gpexpand_gpshrink.sql @@ -0,0 +1,141 @@ +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; + +create table test(a int); + +insert into test select i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + +drop table test; + + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; + +create table test_partitioned (a int, b int) partition by range (b) (start(1) end(101) every(20),default partition def); + +insert into test_partitioned select i,i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +drop table test_partitioned; 
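A note on the pattern in these tests: gpexpand and gpshrink are each invoked twice on purpose. With no state recorded, the first run only performs setup (reading the input file, updating the catalog and creating the status schema), and the second run performs the actual data redistribution; this mirrors the status state machine in gpshrink's main(). The gp_segment_configuration counts asserted throughout also follow from the demo topology; a quick sketch of the arithmetic, assuming the standard 3-segment demo cluster with mirrors, a coordinator and a standby:

    -- 3 primaries + 3 mirrors + coordinator + standby = 8 rows
    select count(*) from gp_segment_configuration;
    -- adding one primary/mirror pair gives 8 + 2 = 10; two pairs give 8 + 4 = 12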
+ +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + + +-- expand two segments and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m +localhost|localhost|7010|/tmp/datadirs/dbfast5/demoDataDir4|11|4|p +localhost|localhost|7011|/tmp/datadirs/dbfast_mirror5/demoDataDir4|12|4|m" > /tmp/testexpand; + +create table test(a int); + +insert into test select i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +drop table test; + +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; \ No newline at end of file diff --git a/src/test/regress/expected/shrink_table.out b/src/test/regress/expected/shrink_table.out new file mode 100644 index 00000000000..74ed3b6b80b --- /dev/null +++ b/src/test/regress/expected/shrink_table.out @@ -0,0 +1,856 @@ +drop schema if exists test_shrink_table cascade; +NOTICE: schema "test_shrink_table" does not exist, skipping +create schema test_shrink_table; +set search_path=test_shrink_table,public; +set default_table_access_method='heap'; +set allow_system_table_mods=true; +-- Hash distributed tables +Create table t1(a int, b int, c int) distributed by (a); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 25 + 0 | 38 + 1 | 37 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 2 | 25 + 1 | 37 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +Create table t1(a int, b int, c int) distributed by (a,b); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 37 + 0 | 33 + 1 | 30 +(3 rows) + +begin; +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 |
100 +(1 row) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 33 + 1 | 30 + 2 | 37 +(3 rows) + +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 100 +(1 row) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 1 +(1 row) + +drop table t1; +-- Test NULLs. +Create table t1(a int, b int, c int) distributed by (a,b,c); +insert into t1 values + (1, 1, 1 ), + (null, 2, 2 ), + (3, null, 3 ), + (4, 4, null), + (null, null, 5 ), + (null, 6, null), + (7, null, null), + (null, null, null); +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 3 + 1 | 1 + 2 | 4 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 4 + 1 | 4 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 3 + 1 | 1 + 2 | 4 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 4 + 1 | 4 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +Create table t1(a int, b int, c int) distributed by (a) partition by list(b) (partition t1_1 values(1), partition t1_2 values(2), default partition other); +insert into t1 select i,i,0 from generate_series(1,100) I; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +NOTICE: One or more columns in the following table(s) do not have statistics: t1 +HINT: For non-partitioned tables, run analyze (). For partitioned tables, run analyze rootpartition (). See log for columns missing statistics. + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +NOTICE: One or more columns in the following table(s) do not have statistics: t1 +HINT: For non-partitioned tables, run analyze (). For partitioned tables, run analyze rootpartition (). See log for columns missing statistics. + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +-- Random distributed tables +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? 
+---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +Create table r1(a int, b int, c int) distributed randomly partition by list(b) (partition r1_1 values(1), partition r1_2 values(2), default partition other); +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? 
+---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- Replicated tables +Create table r1(a int, b int, c int) distributed replicated; +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +abort; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- +Create table r1(a int, b int, c int) distributed replicated; +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +abort; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- table with update triggers on distributed key column +CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TABLE table_with_update_trigger(a int, b int, c int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +insert into table_with_update_trigger select i,i,0 from generate_series(1,100) I; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +CREATE TRIGGER foo_br_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_ar_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_bs_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); +ERROR: Triggers for statements are not yet supported +CREATE TRIGGER foo_as_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); +ERROR: Triggers for statements are not yet supported +-- update should fail +update table_with_update_trigger set a = a + 1; +ERROR: UPDATE on distributed key column not allowed on relation with update triggers +-- data movement for shrink should succeed without hitting any triggers.
+Alter table table_with_update_trigger shrink table to 2; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +drop table table_with_update_trigger; +-- +-- Test shrinking an inheritance parent table whose numsegments differs +-- from its child tables. +-- +create table mix_base_tbl (a int4, b int4) DISTRIBUTED RANDOMLY; +insert into mix_base_tbl select g, g from generate_series(1, 3) g; +create table mix_child_a (a int4, b int4) inherits (mix_base_tbl) distributed by (a); +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into mix_child_a select g, g from generate_series(11, 13) g; +create table mix_child_b (a int4, b int4) inherits (mix_base_tbl) distributed by (b); +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into mix_child_b select g, g from generate_series(21, 23) g; +-- shrink the child table; this does not affect the parent table +Alter table mix_child_a shrink table to 2; +select numsegments from gp_distribution_policy where localoid='mix_base_tbl'::regclass; + numsegments +------------- + 3 +(1 row) + +-- shrink the parent table; both the parent and the child tables will be +-- rebalanced onto the remaining segments +select count(*) from mix_child_a where gp_segment_id = 2; + count +------- + 0 +(1 row) + +select count(*) from mix_child_b where gp_segment_id = 2; + count +------- + 1 +(1 row) + +Alter table mix_base_tbl shrink table to 2; +select count(*) from mix_child_a where gp_segment_id = 2; + count +------- + 0 +(1 row) + +select count(*) from mix_child_b where gp_segment_id = 2; + count +------- + 0 +(1 row) + +drop table mix_base_tbl cascade; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table mix_child_a +drop cascades to table mix_child_b +-- multi-level partition tables +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +begin; +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 25 + 0 | 38 + 1 | 37 +(3 rows) + +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table part_t1; +-- +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED RANDOMLY +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE (
+ START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table part_t1 shrink table to 2; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table part_t1 shrink table to 2; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table part_t1; +-- shrinking only a leaf partition is not allowed for now +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + gp_segment_id | a | b | c | d | e +---------------+----+---+---+---+--- + 0 | 29 | 2 | 1 | 4 | 0 + 0 | 45 | 0 | 1 | 0 | 0 + 0 | 65 | 2 | 1 | 0 | 0 + 2 | 5 | 2 | 1 | 0 | 2 + 2 | 9 | 0 | 1 | 4 | 2 + 1 | 69 | 0 | 1 | 4 | 1 + 1 | 89 | 2 | 1 | 4 | 1 +(7 rows) + +alter table part_t1_1_prt_other_b_2_prt_2_3_prt_others_d shrink table to 2; +ERROR: cannot shrink leaf or interior partition "part_t1_1_prt_other_b_2_prt_2_3_prt_others_d" +DETAIL: root/leaf/interior partitions need to have same numsegments +HINT: use "ALTER TABLE part_t1 SHRINK TABLE" instead +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + gp_segment_id | a | b | c | d | e +---------------+----+---+---+---+--- + 0 | 29 | 2 | 1 | 4 | 0 + 0 | 45 | 0 | 1 | 0 | 0 + 0 | 65 | 2 | 1 | 0 | 0 + 2 | 5 | 2 | 1 | 0 | 2 + 2 | 9 | 0 | 1 | 4 | 2 + 1 | 69 | 0 | 1 | 4 | 1 + 1 | 89 | 2 | 1 | 4 | 1 +(7 rows) + +-- try to shrink the root partition, which should succeed +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +drop table part_t1; +-- inheritance tables +CREATE TABLE inherit_t1_p1(a int, b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+CREATE TABLE inherit_t1_p2(a int, b int) INHERITS (inherit_t1_p1); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p3(a int, b int) INHERITS (inherit_t1_p1); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p4(a int, b int) INHERITS (inherit_t1_p2); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p5(a int, b int) INHERITS (inherit_t1_p3); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into inherit_t1_p1 select i,i from generate_series(1,10) i; +insert into inherit_t1_p2 select i,i from generate_series(1,10) i; +insert into inherit_t1_p3 select i,i from generate_series(1,10) i; +insert into inherit_t1_p4 select i,i from generate_series(1,10) i; +insert into inherit_t1_p5 select i,i from generate_series(1,10) i; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + t +(1 row) + +begin; +alter table inherit_t1_p1 shrink table to 2; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + f +(1 row) + +abort; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + t +(1 row) + +alter table inherit_t1_p1 shrink table to 2; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + f +(1 row) + +DROP TABLE inherit_t1_p1 CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table inherit_t1_p2 +drop cascades to table inherit_t1_p4 +drop cascades to table inherit_t1_p3 +drop cascades to table inherit_t1_p5 +-- +-- Cannot shrink a native view and transformed view +-- +CREATE TABLE shrink_table1(a int) distributed by (a); +CREATE TABLE shrink_table2(a int) distributed by (a); +CREATE VIEW shrink_view AS select * from shrink_table1; +CREATE rule "_RETURN" AS ON SELECT TO shrink_table2 + DO INSTEAD SELECT * FROM shrink_table1; +ALTER TABLE shrink_table2 shrink TABLE to 2; +ERROR: "shrink_table2" is not a table, materialized view, or foreign table +ALTER TABLE shrink_view shrink TABLE to 2; +ERROR: "shrink_view" is not a table, materialized view, or foreign table +ALTER TABLE shrink_table1 shrink TABLE to 2; +drop table shrink_table1 cascade; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to view shrink_view +drop cascades to view shrink_table2 +-- +-- Test shrinking a table with a domain type as distribution key. 
+-- +create domain myintdomain as int4; +create table shrink_domain_tab(d myintdomain, oldseg int4) distributed by(d); +insert into shrink_domain_tab select generate_series(1,10); +update shrink_domain_tab set oldseg = gp_segment_id; +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 1 + 2 | 4 + 0 | 5 +(3 rows) + +alter table shrink_domain_tab shrink table to 2; +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 8 + 1 | 2 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='shrink_domain_tab'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table shrink_domain_tab; +-- start_ignore +-- A cluster expansion runs later in the schedule and checks for partially +-- distributed tables, so drop this test's schema here to let that expansion +-- run correctly. +reset search_path; +drop schema test_shrink_table cascade; +-- end_ignore diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index df00dd7b5b5..870b42f4ab6 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -159,7 +159,7 @@ test: resource_group_gucs test: wrkloadadmin # expand_table tests may affect the result of 'gp_explain', keep them below that -test: gp_toolkit_ao_funcs trig auth_constraint role portals_updatable plpgsql_cache timeseries pg_stat_last_operation pg_stat_last_shoperation gp_numeric_agg partindex_test partition_pruning runtime_stats expand_table expand_table_ao expand_table_aoco expand_table_regression +test: gp_toolkit_ao_funcs trig auth_constraint role portals_updatable plpgsql_cache timeseries pg_stat_last_operation pg_stat_last_shoperation gp_numeric_agg partindex_test partition_pruning runtime_stats expand_table expand_table_ao expand_table_aoco expand_table_regression shrink_table test: rle rle_delta dsp not_out_of_shmem_exit_slots # direct dispatch tests diff --git a/src/test/regress/sql/shrink_table.sql b/src/test/regress/sql/shrink_table.sql new file mode 100644 index 00000000000..63fc5169d8c --- /dev/null +++ b/src/test/regress/sql/shrink_table.sql @@ -0,0 +1,440 @@ +drop schema if exists test_shrink_table cascade; +create schema test_shrink_table; +set search_path=test_shrink_table,public; +set default_table_access_method='heap'; +set allow_system_table_mods=true; + +-- Hash distributed tables +Create table t1(a int, b int, c int) distributed by (a); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table t1 shrink table to 2; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; +drop table t1; + + +Create table t1(a int, b int, c int) distributed by (a,b); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table
diff --git a/src/test/regress/sql/shrink_table.sql b/src/test/regress/sql/shrink_table.sql
new file mode 100644
index 00000000000..63fc5169d8c
--- /dev/null
+++ b/src/test/regress/sql/shrink_table.sql
@@ -0,0 +1,440 @@
+drop schema if exists test_shrink_table cascade;
+create schema test_shrink_table;
+set search_path=test_shrink_table,public;
+set default_table_access_method='heap';
+set allow_system_table_mods=true;
+
+-- Hash distributed tables
+Create table t1(a int, b int, c int) distributed by (a);
+insert into t1 select i,i,0 from generate_series(1,100) I;
+Update t1 set c = gp_segment_id;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+begin;
+Alter table t1 shrink table to 2;
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+abort;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+Alter table t1 shrink table to 2;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+select numsegments from gp_distribution_policy where localoid='t1'::regclass;
+drop table t1;
+
+
+Create table t1(a int, b int, c int) distributed by (a,b);
+insert into t1 select i,i,0 from generate_series(1,100) I;
+Update t1 set c = gp_segment_id;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+begin;
+Alter table t1 shrink table to 1;
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+abort;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+Alter table t1 shrink table to 1;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+select numsegments from gp_distribution_policy where localoid='t1'::regclass;
+drop table t1;
+
+-- Test NULLs.
+Create table t1(a int, b int, c int) distributed by (a,b,c);
+insert into t1 values
+  (1,    1,    1   ),
+  (null, 2,    2   ),
+  (3,    null, 3   ),
+  (4,    4,    null),
+  (null, null, 5   ),
+  (null, 6,    null),
+  (7,    null, null),
+  (null, null, null);
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+begin;
+Alter table t1 shrink table to 2;
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+abort;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+Alter table t1 shrink table to 2;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+select numsegments from gp_distribution_policy where localoid='t1'::regclass;
+drop table t1;
+
+Create table t1(a int, b int, c int) distributed by (a) partition by list(b) (partition t1_1 values(1), partition t1_2 values(2), default partition other);
+insert into t1 select i,i,0 from generate_series(1,100) I;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+begin;
+Alter table t1 shrink table to 2;
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+abort;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+Alter table t1 shrink table to 2;
+
+Select gp_segment_id, count(*) from t1 group by gp_segment_id;
+
+select numsegments from gp_distribution_policy where localoid='t1'::regclass;
+drop table t1;
+
+-- Random distributed tables
+Create table r1(a int, b int, c int) distributed randomly;
+insert into r1 select i,i,0 from generate_series(1,100) I;
+Update r1 set c = gp_segment_id;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+begin;
+Alter table r1 shrink table to 2;
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+abort;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+Alter table r1 shrink table to 2;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+select numsegments from gp_distribution_policy where localoid='r1'::regclass;
+drop table r1;
+
+Create table r1(a int, b int, c int) distributed randomly;
+insert into r1 select i,i,0 from generate_series(1,100) I;
+Update r1 set c = gp_segment_id;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+begin;
+Alter table r1 shrink table to 2;
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+abort;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+Alter table r1 shrink table to 2;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+select numsegments from gp_distribution_policy where localoid='r1'::regclass;
+drop table r1;
+
+Create table r1(a int, b int, c int) distributed randomly partition by list(b) (partition r1_1 values(1), partition r1_2 values(2), default partition other);
+insert into r1 select i,i,0 from generate_series(1,100) I;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+begin;
+Alter table r1 shrink table to 2;
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+abort;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+Alter table r1 shrink table to 2;
+
+Select count(*) from r1;
+Select count(*) > 0 from r1 where gp_segment_id=2;
+
+select numsegments from gp_distribution_policy where localoid='r1'::regclass;
+drop table r1;
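+-- NOTE (illustrative commentary, not part of the original test): row
+-- placement in randomly distributed tables is nondeterministic, so the
+-- checks above only assert that the removed segment ends up empty, e.g.
+--   select count(*) > 0 from r1 where gp_segment_id = 2;   -- false after shrink
+-- rather than comparing exact per-segment counts.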
+
+-- Replicated tables
+Create table r1(a int, b int, c int) distributed replicated;
+
+insert into r1 select i,i,0 from generate_series(1,100) I;
+
+Select count(*) from gp_dist_random('r1');
+
+begin;
+Alter table r1 shrink table to 2;
+Select count(*) from gp_dist_random('r1');
+abort;
+
+Select count(*) from gp_dist_random('r1');
+
+Alter table r1 shrink table to 2;
+
+Select count(*) from gp_dist_random('r1');
+
+select numsegments from gp_distribution_policy where localoid='r1'::regclass;
+drop table r1;
+
+--
+Create table r1(a int, b int, c int) distributed replicated;
+
+insert into r1 select i,i,0 from generate_series(1,100) I;
+
+Select count(*) from gp_dist_random('r1');
+
+begin;
+Alter table r1 shrink table to 2;
+Select count(*) from gp_dist_random('r1');
+abort;
+
+Select count(*) from gp_dist_random('r1');
+
+Alter table r1 shrink table to 2;
+
+Select count(*) from gp_dist_random('r1');
+
+select numsegments from gp_distribution_policy where localoid='r1'::regclass;
+drop table r1;
+
+-- table with update triggers on distributed key column
+CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$
+BEGIN
+    RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %',
+        TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL;
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TABLE table_with_update_trigger(a int, b int, c int);
+insert into table_with_update_trigger select i,i,0 from generate_series(1,100) I;
+select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1;
+
+CREATE TRIGGER foo_br_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger
+FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt');
+CREATE TRIGGER foo_ar_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger
+FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt');
+
+CREATE TRIGGER foo_bs_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger
+FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt');
+CREATE TRIGGER foo_as_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger
+FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt');
+
+-- the update should fail: updating the distribution key is not allowed on a
+-- relation with update triggers
+update table_with_update_trigger set a = a + 1;
+-- the shrink itself should succeed without firing any of the triggers
+Alter table table_with_update_trigger shrink table to 2;
+select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1;
+drop table table_with_update_trigger;
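+-- NOTE (illustrative commentary, not part of the original test): in the
+-- replicated-table cases above, gp_dist_random('r1') reads every segment's
+-- full copy of the table, so assuming the usual three-segment demo cluster
+-- the count is 100 rows x 3 replicas = 300 before the shrink and 200 after:
+--   select count(*) from gp_dist_random('r1');   -- 300 -> 200
+-- The trigger test works because the shrink moves rows through internal data
+-- movement rather than DML, so no row or statement trigger fires.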
+
+--
+-- Test shrinking an inheritance parent table whose numsegments differs
+-- from that of its child tables.
+--
+create table mix_base_tbl (a int4, b int4) DISTRIBUTED RANDOMLY;
+insert into mix_base_tbl select g, g from generate_series(1, 3) g;
+create table mix_child_a (a int4, b int4) inherits (mix_base_tbl) distributed by (a);
+insert into mix_child_a select g, g from generate_series(11, 13) g;
+create table mix_child_b (a int4, b int4) inherits (mix_base_tbl) distributed by (b);
+insert into mix_child_b select g, g from generate_series(21, 23) g;
+-- shrinking the child table does not affect the parent table
+Alter table mix_child_a shrink table to 2;
+select numsegments from gp_distribution_policy where localoid='mix_base_tbl'::regclass;
+-- shrinking the parent table moves both the parent and the child tables off
+-- the removed segment
+select count(*) from mix_child_a where gp_segment_id = 2;
+select count(*) from mix_child_b where gp_segment_id = 2;
+Alter table mix_base_tbl shrink table to 2;
+select count(*) from mix_child_a where gp_segment_id = 2;
+select count(*) from mix_child_b where gp_segment_id = 2;
+drop table mix_base_tbl cascade;
+
+-- multi-level partition tables
+CREATE TABLE part_t1(a int, b int, c int, d int, e int)
+DISTRIBUTED BY(a)
+PARTITION BY RANGE (b)
+    SUBPARTITION BY RANGE (c)
+        SUBPARTITION TEMPLATE (
+            START(1) END (3) EVERY(1),
+            DEFAULT SUBPARTITION others_c)
+
+    SUBPARTITION BY LIST (d)
+        SUBPARTITION TEMPLATE (
+            SUBPARTITION one VALUES (1),
+            SUBPARTITION two VALUES (2),
+            SUBPARTITION three VALUES (3),
+            DEFAULT SUBPARTITION others_d)
+
+( START (1) END (2) EVERY (1),
+    DEFAULT PARTITION other_b);
+
+insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I;
+Update part_t1 set e = gp_segment_id;
+
+Select gp_segment_id, count(*) from part_t1 group by gp_segment_id;
+
+begin;
+Alter table part_t1 shrink table to 2;
+Select gp_segment_id, count(*) from part_t1 group by gp_segment_id;
+abort;
+
+Select gp_segment_id, count(*) from part_t1 group by gp_segment_id;
+
+Alter table part_t1 shrink table to 2;
+
+Select gp_segment_id, count(*) from part_t1 group by gp_segment_id;
+
+select numsegments from gp_distribution_policy where localoid='part_t1'::regclass;
+drop table part_t1;
+
+--
+CREATE TABLE part_t1(a int, b int, c int, d int, e int)
+DISTRIBUTED RANDOMLY
+PARTITION BY RANGE (b)
+    SUBPARTITION BY RANGE (c)
+        SUBPARTITION TEMPLATE (
+            START(1) END (3) EVERY(1),
+            DEFAULT SUBPARTITION others_c)
+
+    SUBPARTITION BY LIST (d)
+        SUBPARTITION TEMPLATE (
+            SUBPARTITION one VALUES (1),
+            SUBPARTITION two VALUES (2),
+            SUBPARTITION three VALUES (3),
+            DEFAULT SUBPARTITION others_d)
+
+( START (1) END (2) EVERY (1),
+    DEFAULT PARTITION other_b);
+
+insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I;
+Update part_t1 set e = gp_segment_id;
+
+Select count(*) from part_t1;
+Select count(*) > 0 from part_t1 where gp_segment_id=2;
+
+begin;
+Alter table part_t1 shrink table to 2;
+Select count(*) from part_t1;
+Select count(*) > 0 from part_t1 where gp_segment_id=2;
+abort;
+
+Select count(*) from part_t1;
+Select count(*) > 0 from part_t1 where gp_segment_id=2;
+
+Alter table part_t1 shrink table to 2;
+
+Select count(*) from part_t1;
+Select count(*) > 0 from part_t1 where gp_segment_id=2;
+
+select numsegments from gp_distribution_policy where localoid='part_t1'::regclass;
+drop table part_t1;
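+-- NOTE (illustrative commentary, not part of the original test): shrinking
+-- the root of a partitioned table moves the whole hierarchy together; a
+-- hypothetical way to confirm that every part agrees afterwards:
+--   select localoid::regclass, numsegments
+--     from gp_distribution_policy
+--    where localoid::regclass::text like 'part_t1%';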
+
+-- shrinking only a leaf partition is not allowed for now
+CREATE TABLE part_t1(a int, b int, c int, d int, e int)
+DISTRIBUTED BY(a)
+PARTITION BY RANGE (b)
+    SUBPARTITION BY RANGE (c)
+        SUBPARTITION TEMPLATE (
+            START(1) END (3) EVERY(1),
+            DEFAULT SUBPARTITION others_c)
+
+    SUBPARTITION BY LIST (d)
+        SUBPARTITION TEMPLATE (
+            SUBPARTITION one VALUES (1),
+            SUBPARTITION two VALUES (2),
+            SUBPARTITION three VALUES (3),
+            DEFAULT SUBPARTITION others_d)
+
+( START (1) END (2) EVERY (1),
+    DEFAULT PARTITION other_b);
+
+insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I;
+Update part_t1 set e = gp_segment_id;
+
+select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d;
+
+alter table part_t1_1_prt_other_b_2_prt_2_3_prt_others_d shrink table to 2;
+select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d;
+
+-- shrinking the root partition, however, should succeed
+Alter table part_t1 shrink table to 2;
+Select gp_segment_id, count(*) from part_t1 group by gp_segment_id;
+
+drop table part_t1;
+
+
+-- inheritance tables
+CREATE TABLE inherit_t1_p1(a int, b int);
+CREATE TABLE inherit_t1_p2(a int, b int) INHERITS (inherit_t1_p1);
+CREATE TABLE inherit_t1_p3(a int, b int) INHERITS (inherit_t1_p1);
+CREATE TABLE inherit_t1_p4(a int, b int) INHERITS (inherit_t1_p2);
+CREATE TABLE inherit_t1_p5(a int, b int) INHERITS (inherit_t1_p3);
+
+insert into inherit_t1_p1 select i,i from generate_series(1,10) i;
+insert into inherit_t1_p2 select i,i from generate_series(1,10) i;
+insert into inherit_t1_p3 select i,i from generate_series(1,10) i;
+insert into inherit_t1_p4 select i,i from generate_series(1,10) i;
+insert into inherit_t1_p5 select i,i from generate_series(1,10) i;
+
+select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2;
+
+begin;
+alter table inherit_t1_p1 shrink table to 2;
+select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2;
+abort;
+
+select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2;
+
+alter table inherit_t1_p1 shrink table to 2;
+
+select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2;
+
+DROP TABLE inherit_t1_p1 CASCADE;
+
+
+--
+-- Cannot shrink a native view or a transformed view
+--
+CREATE TABLE shrink_table1(a int) distributed by (a);
+CREATE TABLE shrink_table2(a int) distributed by (a);
+CREATE VIEW shrink_view AS select * from shrink_table1;
+CREATE rule "_RETURN" AS ON SELECT TO shrink_table2
+    DO INSTEAD SELECT * FROM shrink_table1;
+ALTER TABLE shrink_table2 shrink TABLE to 2;
+ALTER TABLE shrink_view shrink TABLE to 2;
+ALTER TABLE shrink_table1 shrink TABLE to 2;
+drop table shrink_table1 cascade;
+
+--
+-- Test shrinking a table with a domain type as distribution key.
+--
+create domain myintdomain as int4;
+
+create table shrink_domain_tab(d myintdomain, oldseg int4) distributed by(d);
+insert into shrink_domain_tab select generate_series(1,10);
+
+update shrink_domain_tab set oldseg = gp_segment_id;
+
+select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id;
+
+alter table shrink_domain_tab shrink table to 2;
+select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id;
+select numsegments from gp_distribution_policy where localoid='shrink_domain_tab'::regclass;
+drop table shrink_domain_tab;
+
+-- start_ignore
+-- The regression suite later performs a cluster expansion, which checks for
+-- partially distributed tables; drop them here so that the expansion runs
+-- correctly.
+reset search_path;
+drop schema test_shrink_table cascade;
+-- end_ignore
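+
+-- NOTE (illustrative commentary, not part of the original test): the
+-- "_RETURN" rule in the view tests above turns shrink_table2 into a view,
+-- which is why it is rejected just like shrink_view; a hypothetical way to
+-- observe this:
+--   select relname, relkind from pg_class
+--    where relname in ('shrink_table2', 'shrink_view');   -- both report 'v'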