Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RING-44425 - Comments for SPARK scripts #48

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
18 changes: 18 additions & 0 deletions scripts/S3_FSCK/s3_fsck_p0.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
COS = cfg["cos_protection"]
PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"])

# The arcindex is a map between the ARC Schema and the hex value found in the ringkey in the 24 bits preceding the last 8 bits of the key
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}

os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
Expand Down Expand Up @@ -118,6 +119,7 @@ def sparse(f):


def check_split(key):
"""Check if the key is split or not. Return True if split, False if not split, None if error (404, 50X, etc.)"""
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
r = requests.head(url)
if r.status_code == 200:
Expand All @@ -126,9 +128,13 @@ def check_split(key):


def blob(row):
"""Return a list of dict with the key, subkey and digkey"""
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
# set key from row._c2 (column 3) which contains an sproxyd input key
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
key = row._c2
# use the sproxyd input key to find out if the key is split or not
split = check_split(key)
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
if not split['result']:
# If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}]
if split['is_split']:
try:
Expand All @@ -146,25 +152,37 @@ def blob(row):
chunks = chunk + chunk
chunkshex = chunks.encode("hex")
rtlst = []
# the k value is the subkey, a subkey is the sproxys input key for each stripe of the split
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
for k in list(set(sparse(chunkshex))):
# "key": key == primary sproxyd input key of a split object
# "subkey": k == subkey sproxyd input key of an individual stripe of a split object
# "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey, which is the unique part of a
# main chunk before service id, arc schema, and class are appended
rtlst.append(
{"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
)
# If the key is split and request is OK, return a list of dict with the key, subkey and digkey set to the md5 of the subkey
return rtlst
# If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
return [{"key": key, "subkey": "NOK", "digkey": "NOK"}]
except requests.exceptions.ConnectionError as e:
# If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}]
if not split['is_split']:
# If the key is not split, return a dict with the key, subkey set to SINGLE and digkey set to the md5 of the key
return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]

new_path = os.path.join(PATH, RING, "s3-bucketd")
files = "%s://%s" % (PROTOCOL, new_path)

# reading without a header, the _c0, _c1, _c2 are the default column names of column 1, 2, 3 for the csv
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)

# repartition the dataframe to have the same number of partitions as the number of executors * cores
df = df.repartition(PARTITIONS)
rdd = df.rdd.map(lambda x : blob(x))
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfnew = rdd.flatMap(lambda x: x).toDF()
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved

single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
# write the dataframe to a csv file with a header
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want correct headers, this write operation w/ header="true" is where we get our first instance of generic _c0, _c1 column names. We could perform:

Suggested change
dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)
dfnew = dnew.withColumnRenamed("_c0", "digkey).withColumnRenamed("_c1", "input_key").withColumnRenamed("_c2", "subkey")
dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)

Requires updating the p2 script to read the new column names instead of the generic ones.

25 changes: 23 additions & 2 deletions scripts/S3_FSCK/s3_fsck_p1.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,58 @@


files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
Copy link
Member Author

@TrevorBenson TrevorBenson Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another spot we can inject valid headers prior to later commands, making them a bit simpler to comprehend:

Suggested change
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)\
df = df.withColumnRenamed("_c0", "ringkey").withColumnRenamed("_c1", "mainchunk").withColumnRenamed("_c2", "disk").withColumnRenamed("_c3", "flag")

in this example I name the _c0 (ring chunk keys) as ringkey, instead of the naming _c1, the main chunk, as ringkey. I think this could potentially reduce confusion if we decide to be very specific and use explicit terms for each data type

  • ringkey (or ring_key) # The 30-33 chunk keys and 70-7B chunk keys
  • mainchunk (or main_chunk) # The 30 or 70 main chunk (aka zero keys)
  • disk
  • flag
  • inputkey (or input_key) # The sproxyd input key
  • digkey (or dig_key) # The md5sum digged from the main chunk

I like the underscore versions for better readability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whatever's easier to read is fine by me


#list the ARC SPLIT main chunks
# list the ARC SPLIT main chunks with service ID 50 from column 2
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")

# Match keys which end in 70 from column 2
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
# Filter out when less than 3 stripe chunks (RING orphans)
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")

# dfARCsingle _c1 (column 2) is now the ringkey ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])

# filter _c1 (column 2) for specific COS protection
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))

# count the number of chunks in _c1 (column 2) found for each key
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOSsingle = dfCOSsingle.groupBy("_c1").count()
# dfCOSsingle _c1 (column 2) is now the ringkey ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"])
# ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operation likely explains why the withColumn() is used to duplicate _c1 into ringkey instead of a withColumnRenamd() for dfCOSsingle.


# union the ARC and COS single chunks
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCsingle = dfARCsingle.union(dfCOSsingle)

#list the ARC KEYS
# list the ARC KEYS with service ID 51
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1")

# Match keys which end in 70 from column 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Match keys which end in 70 from column 2
# Match keys which end in 70 from single column named "_c1"

Copy link
Member Author

@TrevorBenson TrevorBenson Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this reads to me makes me wonder about a column named "single", which we do not have. This comment is before df_sync and dfARCSYNC operations.

Does "from single column" intend to suggest the match is not on more than one column? If so for clarity I suggest:

Suggested change
# Match keys which end in 70 from column 2
# Match keys which end in 70 from a single column named "_c1"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for rephrasing 😄
not sure if saying "match keys ending in "70", in vector named _c1" would confuse the future reader of the code.

dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$"))
# Filter out when less than 3 stripe chunks (RING orphans)
dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3")
# dfARCSYNC _c1 (column 2) is now the ringkey ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"])
# filter _c1 (column 2) for specific COS protection
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))

# filter _c1 (column 2) for specific COS protection
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
# count the number of chunks in _c1 (column 2) found for each key
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count()
# dfCOCSYNC _c1 (column 2) is now the ringkey ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"])
# ???
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))

# union the ARC and COS SYNC chunks
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dfARCSYNC = dfARCSYNC.union(dfCOCSYNC)

# union the ARC and COS SYNC and single chunks to get the total list of keys
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
dftotal = dfARCSYNC.union(dfARCsingle)
total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
dftotal.write.format("csv").mode("overwrite").options(header="true").save(total)
12 changes: 12 additions & 0 deletions scripts/S3_FSCK/s3_fsck_p2.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,28 @@
.getOrCreate()


# s3keys are read from the verifySproxydKeys.js scripts output
s3keys = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
# ringkeys are read from the listkeys.py (or ringsh dump) scripts output
ringkeys = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)

# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys)
# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)

# rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey")

# inner join the s3keys and ringkeys on the digkey column
# the result will be a dataframe with the columns ringkey, digkey
# the leftanti option will remove the rows that are present in both dataframes.
inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey")

# Create the final dataframe with only the ringkey column
df_final = inner_join_false.select("ringkey")

# write the final dataframe to a csv file
allmissing = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
df_final.write.format("csv").mode("overwrite").options(header="false").save(allmissing)

24 changes: 22 additions & 2 deletions scripts/S3_FSCK/s3_fsck_p3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,54 @@
.config("spark.local.dir", PATH) \
.getOrCreate()


# Use of the arcindex limits the inspection to a specific ARC protection scheme.
# If there were more than one cluster with different ARC protection schemes then this would limit the check to a specific scheme.
# FOOD FOR THOUGHT: limits finding keys which may have been written after a schema change or any bug did not honor the schema.
# The arcindex is a dictionary that contains the ARC protection scheme and the hex value found in the ringkey
arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}

# The arcdatakeypattern is a regular expression that matches the ARC data keys
arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70')


def statkey(row):
""" statkey takes a row from the dataframe and returns a tuple with the key, status_code, size"""
key = row._c0
try:
url = "%s/%s" % (SREBUILDD_URL, str(key.zfill(40)))
r = requests.head(url)
if r.status_code == 200:
if re.search(arcdatakeypattern, key):
if re.search(arcdatakeypattern, key): # Should consider changing this to match any entry in the arcindex
# The size of the ARC data key is 12 times the size of the ARC index key.
# At this point there is no longer access to the qty of keys found, so
# it simply computes based on the presumed schema of 12 chunks per key.
size = int(r.headers.get("X-Scal-Size", False))*12
else:
# The size of the ARC index key is the size of the ARC index key plus the size of the ARC data key times the COS protection.
# At this point there is no longer access to the qty of keys found, so
# it simply computes based on the presumed schema of int(COS) chunks per key.
# If there are orphans which are not matching the arcdatakeypattern they will
# be computed as if they were COS.
size = int(r.headers.get("X-Scal-Size",False)) + int(r.headers.get("X-Scal-Size",False))*int(COS)
return ( key, r.status_code, size)
else:
# If the key is not found (HTTP code != 200) then return the key, the status code, and 0 for the size
return ( key, r.status_code, 0)
except requests.exceptions.ConnectionError as e:
# If there is a connection error then return the key, the status code, and 0 for the size
return ( key, "HTTP_ERROR", 0)


files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
# Create a dataframe from the csv file not using the header, the columns will be _c0, _c1, _c2
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
# Create a resilient distributed dataset (RDD) from the dataframe (logical partitions of data)
# The rdd is a collection of tuples returned from statkey (key, status_code, size)
rdd = df.rdd.map(statkey)

#rdd1 = rdd.toDF()

# The size_computed is the sum of the size column in the rdd
size_computed= rdd.map(lambda x: (2,int(x[2]))).reduceByKey(lambda x,y: x + y).collect()[0][1]
string = "The total computed size of the not indexed keys is: %d bytes" % size_computed
banner = '\n' + '-' * len(string) + '\n'
Expand Down
9 changes: 9 additions & 0 deletions scripts/S3_FSCK/s3_fsck_p4.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,18 @@ def deletekey(row):


files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)

# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
# rename the column _c0 (column 1) to ringkey
df = df.withColumnRenamed("_c0","ringkey")

# repartition the dataframe to the number of partitions (executors * cores)
df = df.repartition(PARTITIONS)

# map the deletekey function to the dataframe
TrevorBenson marked this conversation as resolved.
Show resolved Hide resolved
rdd = df.rdd.map(deletekey).toDF()

deletedorphans = "%s://%s/%s/s3fsck/deleted-s3-orphans.csv" % (PROTOCOL, PATH, RING)
# write the dataframe to a csv file with the results of the deletekey function
rdd.write.format("csv").mode("overwrite").options(header="false").save(deletedorphans)