Cherry-pick Enhancement: Add gpshrink to support elastic scaling. With some editions by me. (#32)

In order to support gpshrink, similar to gpexpand, we first
support "alter table <tablename> shrink table to <segnum>"
to redistribute data onto a specific number of segments.

The gpshrink implementation is divided into two stages,
similar to gpexpand:
1. Collect the tables that need to be shrunk and write them
into gpshrink.status_detail.
2. Redistribute data for the tables that need to be shrunk,
and delete the corresponding segments from gp_segment_configuration.
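
A minimal sketch of that two-stage flow, for illustration only: psycopg2, the connection string, the catalog query used to pick user tables, and the shrink_to helper name are assumptions; only the "alter table ... shrink table to <segnum>" statement and the gpshrink.status_detail bookkeeping come from this change, and the real gpshrink utility drives the process through gppylib rather than raw SQL.

    # Hypothetical sketch; not part of this commit.
    import psycopg2

    def shrink_to(segnum, conninfo="dbname=postgres"):
        conn = psycopg2.connect(conninfo)
        conn.autocommit = True
        cur = conn.cursor()

        # Stage 1: collect the user tables that need redistribution
        # (gpshrink records these in gpshrink.status_detail).
        cur.execute("""
            SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname)
            FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind = 'r'
              AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'gp_toolkit')
        """)
        tables = [row[0] for row in cur.fetchall()]

        # Stage 2: redistribute each table onto the remaining segments; gpshrink
        # then removes the trailing segments from gp_segment_configuration.
        for t in tables:
            cur.execute("ALTER TABLE %s SHRINK TABLE TO %d" % (t, segnum))

        conn.close()
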
reshke authored and diPhantxm committed Nov 18, 2024
1 parent 4b05cbf commit bb6c9d0
Showing 19 changed files with 3,728 additions and 30 deletions.
4 changes: 2 additions & 2 deletions gpMgmt/bin/Makefile
@@ -13,7 +13,7 @@ SUBDIRS += ifaddrs
$(recurse)

PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \
gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \
gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \
gpstate gpstop gpsys1 minirepro gpmemwatcher gpmemreport
@@ -233,7 +233,7 @@ clean distclean:
rm -rf *.pyc
rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \
gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \
gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \
gpstartc gpstatec gpstopc gpsys1c minireproc
rm -f gpconfig_modules/gucs_disallowed_in_file.txt
30 changes: 30 additions & 0 deletions gpMgmt/bin/gpexpand
@@ -1135,6 +1135,8 @@ class gpexpand:
tblspc_info = {}

for oid in tblspc_oids:
if oid not in tblspc_oid_names:
continue
location = os.path.dirname(os.readlink(os.path.join(master_tblspc_dir,
oid)))
tblspc_info[oid] = {"location": location,
@@ -1254,6 +1256,15 @@ class gpexpand:
master_tblspc_dir = self.gparray.master.getSegmentTableSpaceDirectory()
if not os.listdir(master_tblspc_dir):
return None

tblspc_oids = os.listdir(master_tblspc_dir)
tblspc_oid_names = self.get_tablespace_oid_names()
flag = False
for oid in tblspc_oids:
if oid in tblspc_oid_names:
flag = True
if not flag:
return None

if not self.options.filename:
raise ExpansionError('Missing tablespace input file')
@@ -1433,6 +1444,25 @@ class gpexpand:
self.pool.join()
self.pool.check_results()


# Poll up to 11 times (about 110 seconds) until every new primary segment answers pg_isready.
for i in range(1, 12):
    flag = True
    for segment in newSegments:
        if segment.isSegmentMirror():
            continue

        cmd = Command('pg_isready for segment',
                      "pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(), segment.getSegmentPort(), segment.getSegmentDataDirectory()))
        cmd.run()
        rc = cmd.get_return_code()
        if rc != 0:
            flag = False
    if flag:
        break
    time.sleep(10)
    self.logger.info("Waited %s seconds for new segments to become ready" % (i * 10))


"""
Build the list of DELETE and TRUNCATE statements based on the subset of
MASTER_ONLY_TABLES defined in gpcatalog.py. Mapped tables cannot
138 changes: 133 additions & 5 deletions gpMgmt/bin/gppylib/gparray.py
@@ -140,6 +140,23 @@ def __cmp__(self,other):
elif left > right: return 1
else: return 0

def __equal(self, other, ignoreAttr=[]):
if not isinstance(other, Segment):
return NotImplemented
for key in list(vars(other)):
if key in ignoreAttr:
continue
if vars(other)[key] != vars(self)[key]:
return False
return True

def __eq__(self, other):
return self.__equal(other, ['mode'])


def __hash__(self):
return hash(self.dbid)

#
# Moved here from system/configurationImplGpdb.py
#
@@ -429,6 +446,9 @@ def __str__(self):
return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB),
str(self.mirrorDB))

def __eq__(self, other):
return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB

# --------------------------------------------------------------------
def addPrimary(self,segDB):
self.primaryDB=segDB
@@ -799,6 +819,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None):
self.standbyMaster = None
self.segmentPairs = []
self.expansionSegmentPairs=[]
self.shrinkSegmentPairs=[]
self.numPrimarySegments = 0

self.recoveredSegmentDbids = []
@@ -1043,7 +1064,7 @@ def dumpToFile(self, filename):
fp.close()

# --------------------------------------------------------------------
def getDbList(self, includeExpansionSegs=False):
def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""
Return a list of all Segment objects that make up the array
"""
@@ -1052,8 +1073,8 @@ def getDbList(self, includeExpansionSegs=False):
dbs.append(self.master)
if self.standbyMaster:
dbs.append(self.standbyMaster)
if includeExpansionSegs:
dbs.extend(self.getSegDbList(True))
if includeExpansionSegs or removeShrinkSegs:
dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs))
else:
dbs.extend(self.getSegDbList())
return dbs
@@ -1103,23 +1124,29 @@ def getDbIdToPeerMap(self):


# --------------------------------------------------------------------
def getSegDbList(self, includeExpansionSegs=False):
def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""Return a list of all Segment objects for all segments in the array"""
dbs=[]
for segPair in self.segmentPairs:
dbs.extend(segPair.get_dbs())
if includeExpansionSegs:
for segPair in self.expansionSegmentPairs:
dbs.extend(segPair.get_dbs())
if removeShrinkSegs:
for segPair in self.shrinkSegmentPairs:
dbs = list(filter(lambda x: segPair.primaryDB != x and segPair.mirrorDB != x, dbs))
return dbs

# --------------------------------------------------------------------
def getSegmentList(self, includeExpansionSegs=False):
def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""Return a list of SegmentPair objects for all segments in the array"""
dbs=[]
dbs.extend(self.segmentPairs)
if includeExpansionSegs:
dbs.extend(self.expansionSegmentPairs)
if removeShrinkSegs:
for segPair in self.shrinkSegmentPairs:
dbs.remove(segPair)
return dbs

# --------------------------------------------------------------------
@@ -1146,6 +1173,21 @@ def getExpansionSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.expansionSegmentPairs

# --------------------------------------------------------------------
def getShrinkSegDbList(self):
"""Returns a list of all Segment objects that make up the new segments
of an expansion"""
dbs=[]
for segPair in self.shrinkSegmentPairs:
dbs.extend(segPair.get_dbs())
return dbs

# --------------------------------------------------------------------
def getShrinkSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.shrinkSegmentPairs

# --------------------------------------------------------------------
def getSegmentContainingDb(self, db):
@@ -1162,6 +1204,15 @@ def getExpansionSegmentContainingDb(self, db):
if db.getSegmentDbId() == segDb.getSegmentDbId():
return segPair
return None

# --------------------------------------------------------------------
def getShrinkSegmentContainingDb(self, db):
for segPair in self.shrinkSegmentPairs:
for segDb in segPair.get_dbs():
if db.getSegmentDbId() == segDb.getSegmentDbId():
return segPair
return None

# --------------------------------------------------------------------
def get_invalid_segdbs(self):
dbs=[]
@@ -1500,6 +1551,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role,
else:
seg.addMirror(segdb)

# --------------------------------------------------------------------
def addShrinkSeg(self, content, preferred_role, dbid, role,
hostname, address, port, datadir):
"""
Add a segment to the gparray as a shrink segment.
Note: may work better to construct the new Segment in gpshrink and
simply pass it in.
"""

segdb = Segment(content = content,
preferred_role = preferred_role,
dbid = dbid,
role = role,
mode = MODE_SYNCHRONIZED,
status = STATUS_UP,
hostname = hostname,
address = address,
port = port,
datadir = datadir)

if preferred_role == ROLE_PRIMARY:
self.shrinkSegmentPairs.append(SegmentPair())
seg = self.shrinkSegmentPairs[-1]
if seg.primaryDB:
raise Exception('Duplicate content id for primary segment')
seg.addPrimary(segdb)
else:
seg = self.shrinkSegmentPairs[-1]
seg.addMirror(segdb)

# --------------------------------------------------------------------
def reOrderExpansionSegs(self):
"""
@@ -1607,6 +1689,52 @@ def validateExpansionSegs(self):
else:
used_ports[hostname] = []
used_ports[hostname].append(db.port)

# --------------------------------------------------------------------
def validateShrinkSegs(self):
""" Checks the segments added for various inconsistencies and errors.
"""

# make sure we have added at least one segment
if len(self.shrinkSegmentPairs) == 0:
raise Exception('No shrink segments defined')

totalsize = len(self.segmentPairs)
removesize = len(self.shrinkSegmentPairs)

if removesize >= totalsize:
    self.logger.error('number of segments to remove (%d) must be less than the total number of segments (%d)', removesize, totalsize)
    exit(1)
elif removesize < 1:
    self.logger.error('number of segments to remove (%d) must be at least 1', removesize)
    exit(1)

for segPair in self.shrinkSegmentPairs:
if self.hasMirrors:
if segPair.mirrorDB is None:
raise Exception('primaryDB and mirrorDB should be removed simultaneously')

if segPair.primaryDB.content != segPair.mirrorDB.content:
raise Exception('primaryDB content does not equal mirrorDB content')

# If a segment pair marked for shrink is not in segmentPairs, raise an exception
flag = False
for segPair_ in self.segmentPairs:
    if segPair_ == segPair:
        flag = True

if not flag:
    raise Exception('Shrink segments not in the gp_segment_configuration table')

# The segments to remove must be the last n segments by content id.
self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content)

if self.shrinkSegmentPairs[-1].primaryDB.content != self.get_max_contentid():
    raise Exception('segments must be removed starting from the maximum content id')

if self.shrinkSegmentPairs[0].primaryDB.content != self.get_max_contentid()-len(self.shrinkSegmentPairs)+1:
    raise Exception('segments to remove must have contiguous content ids ending at the maximum content id')


# --------------------------------------------------------------------
def addExpansionHosts(self, hosts, mirror_type):
2 changes: 1 addition & 1 deletion gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py
@@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary):
self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()])

# 'goalsegmap' reflects the desired state of the catalog
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)])
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)])

# find mirrors and primaries to remove
self.mirror_to_remove = [
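
For completeness, a hedged sketch of how the new GpArray shrink helpers added in gppylib/gparray.py (addShrinkSeg, validateShrinkSegs, and the removeShrinkSegs flag on getDbList) might be driven. Loading the array via GpArray.initFromCatalog and choosing the highest-content pair are illustrative assumptions, not part of this diff; the accessor calls follow the signatures shown above.

    # Illustrative only: mark the highest-content segment pair for removal and
    # validate it with the new gparray helpers.
    from gppylib.db import dbconn
    from gppylib.gparray import GpArray

    gparray = GpArray.initFromCatalog(dbconn.DbURL(), utility=True)

    # Pick the segment pair with the largest content id (validateShrinkSegs
    # only accepts a contiguous tail of content ids).
    last_pair = max(gparray.getSegmentList(),
                    key=lambda pair: pair.primaryDB.getSegmentContentId())

    # Add the primary first so its mirror joins the same shrink pair.
    for seg in filter(None, [last_pair.primaryDB, last_pair.mirrorDB]):
        gparray.addShrinkSeg(content=seg.getSegmentContentId(),
                             preferred_role=seg.getSegmentPreferredRole(),
                             dbid=seg.getSegmentDbId(),
                             role=seg.getSegmentRole(),
                             hostname=seg.getSegmentHostName(),
                             address=seg.getSegmentAddress(),
                             port=seg.getSegmentPort(),
                             datadir=seg.getSegmentDataDirectory())

    gparray.validateShrinkSegs()

    # Downstream code (e.g. ComputeCatalogUpdate) now sees the array without
    # the segments being removed.
    remaining = gparray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)
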