Skip to content

Commit

Permalink
Merge branch 'red-hat-storage:main' into earmark_feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Harishacharya-redhat committed Jan 9, 2025
2 parents 1356f24 + fc6d48e commit 2e9f216
Show file tree
Hide file tree
Showing 104 changed files with 5,225 additions and 1,273 deletions.
29 changes: 17 additions & 12 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
# These owners will be the default owners for everything in this repo.
# all changes.
* @red-hat-storage/cephci-top-reviewers
* @red-hat-storage/ceph-qe-ci @red-hat-storage/ceph-qe-admin

# for RGW
rgw/ @red-hat-storage/teams/cephci-rgw
rgw/ @mkasturi18 @ckulal @TejasC88

# for RBD team
rbd/ @red-hat-storage/teams/cephci-block
rbd_mirror/ @red-hat-storage/teams/cephci-block
rbd/ @Manohar-Murthy
rbd_mirror/ @Manohar-Murthy

# for NVMe-oF
nvmeof/ @Manohar-Murthy @HaruChebrolu

# for RADOS
rados/ @red-hat-storage/teams/cephci-rados
rados/ @neha-gangadhar @pdhiran

# for FS
cephfs/ @red-hat-storage/teams/cephci-fs
cephfs/ @neha-gangadhar @AmarnatReddy @Manimaran-MM

# smb
smb/ @vdas-redhat @pranavprakash20

# for DMFG
ansible/ @red-hat-storage/teams/cephci-dm
upgrades/ @red-hat-storage/teams/cephci-dm
dashboard/ @red-hat-storage/teams/cephci-dm
cephadm/ @red-hat-storage/teams/cephci-dm
ceph_ansible/ @red-hat-storage/teams/cephci-dm
ceph_installer/ @red-hat-storage/teams/cephci-dm
ceph_volume/ @vdas-redhat @pranavprakash20
cephadm/ @vdas-redhat @pranavprakash20
mgr/ @vdas-redhat @pranavprakash20
nfs/ @vdas-redhat @pranavprakash20
upgrades/ @vdas-redhat @pranavprakash20
8 changes: 4 additions & 4 deletions .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ pull_request_rules:
conditions:
- label!=DNM
- label!=work-in-progress
- base=master
- "approved-reviews-by=@red-hat-storage/cephci-top-level-reviewers"
- base=main
- branch-protection-review-decision=approved
- "check-success=tox (3.9.18)"
- check-success=WIP
actions:
merge:
method: merge
queue:
merge_method: merge
- name: ask to resolve conflict
conditions:
- conflict
Expand Down
221 changes: 111 additions & 110 deletions ceph/parallel.py
Original file line number Diff line number Diff line change
@@ -1,121 +1,122 @@
import sys

import gevent.pool
import gevent.queue

from utility.log import Log

log = Log(__name__)


class ExceptionHolder(object):
    """Container for a captured ``sys.exc_info()`` triple.

    Allows an exception raised inside a spawned worker to be carried
    back to the caller and re-raised later with its original traceback
    (see ``capture_traceback`` / ``resurrect_traceback``).
    """

    def __init__(self, exc_info):
        # exc_info is the (type, value, traceback) tuple from sys.exc_info().
        self.exc_info = exc_info


def capture_traceback(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, trapping any exception it raises.

    Returns the function's result on success.  On failure, returns an
    ``ExceptionHolder`` wrapping ``sys.exc_info()`` so the caller can
    re-raise the exception later with its original traceback.
    """
    try:
        result = func(*args, **kwargs)
    except Exception:
        return ExceptionHolder(sys.exc_info())
    return result


def resurrect_traceback(exc):
    """Re-raise an exception previously captured by ``capture_traceback``.

    Accepts either an ``ExceptionHolder`` (re-raised with its original
    traceback) or a plain exception instance (re-raised without one).
    Any other value is ignored and the function returns None.

    NOTE(review): the re-raise instantiates ``exc_type(exc_value)``,
    passing the old exception instance as the single constructor
    argument; exception classes whose ``__init__`` requires several
    arguments would fail here with a TypeError — confirm intended.
    """
    if isinstance(exc, ExceptionHolder):
        exc_type, exc_value, exc_tb = exc.exc_info
    elif isinstance(exc, BaseException):
        exc_type, exc_value, exc_tb = type(exc), exc, None
    else:
        return

    raise exc_type(exc_value).with_traceback(exc_tb)


class parallel(object):
"""
This class is a context manager for running functions in parallel.
You add functions to be run with the spawn method::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
You can iterate over the results (which are in arbitrary order)::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
for result in p:
print result
If one of the spawned functions throws an exception, it will be thrown
when iterating over the results, or when the with block ends.
At the end of the with block, the main thread waits until all
spawned functions have completed, or, if one exited with an exception,
kills the rest and raises the exception.
"""

def __init__(self):
self.group = gevent.pool.Group()
self.results = gevent.queue.Queue()
self.count = 0
self.any_spawned = False
self.iteration_stopped = False

def spawn(self, func, *args, **kwargs):
self.count += 1
self.any_spawned = True
greenlet = self.group.spawn(capture_traceback, func, *args, **kwargs)
greenlet.link(self._finish)
# -*- code: utf-8 -*-
"""
This module provides a context manager for running methods concurrently.
For backward compatibility, the spawn method is retained; however, one can also
choose to use submit. Likewise, the thread pool executor is the default executor.
Timeout is an inherited feature provided by concurrent futures. Additionally,
one can wait for all the threads/processes to complete even when one thread or process
encounters an exception. This is useful when multiple test modules are
executing different test scenarios.
When a test module controls the threads then it can forcefully terminate all
threads when an exception is encountered.
Changelog:
Version 1.0 used gevent module for parallel method execution.
Version 2.0 uses concurrent.futures module instead of gevent.
You add functions to be run with the spawn method::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
You can iterate over the results (which are in arbitrary order)::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
for result in p:
print result
In version 2, you can choose whether to use threads or processes by
with parallel(thread_pool=False, timeout=10) as p:
_r = [p.spawn(quux, x) for name in names]
If one of the spawned functions throws an exception, it will be thrown
when iterating over the results, or when the with block ends.
At the end of the with block, the main thread waits until all
spawned functions have completed, or, if one exited with an exception,
kills the rest and raises the exception.
"""
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


class parallel:
"""This class is a context manager for concurrent method execution."""

def __init__(
    self,
    thread_pool=True,
    timeout=None,
    shutdown_wait=True,
    shutdown_cancel_pending=False,
):
    """Initialize the executor wrapper for concurrent method execution.

    Args:
        thread_pool (bool): use a ThreadPoolExecutor when True, else a
            ProcessPoolExecutor.
        timeout (int | float): maximum time, in seconds, to wait for all
            submitted tasks when the context manager exits.
        shutdown_wait (bool): when False and a task has failed, the
            executor is shut down without waiting for running
            threads/processes to complete.
        shutdown_cancel_pending (bool): when True, tasks that have not
            yet started are also cancelled during that early shutdown.
    """
    self._executor = ThreadPoolExecutor() if thread_pool else ProcessPoolExecutor()
    self._timeout = timeout
    self._shutdown_wait = shutdown_wait
    self._cancel_pending = shutdown_cancel_pending
    self._futures = list()
    self._results = list()

def spawn(self, fun, *args, **kwargs):
    """Submit ``fun`` to the underlying executor for concurrent execution.

    Args:
        fun: Callable to be executed.
        args: Positional arguments passed through to ``fun``.
        kwargs: Keyword arguments passed through to ``fun``.

    Returns:
        None — the resulting future is tracked internally; its result is
        collected when the context manager exits.
    """
    _future = self._executor.submit(fun, *args, **kwargs)
    self._futures.append(_future)

def __enter__(self):
return self

def __exit__(self, type_, value, traceback):
self.group.join()
def __exit__(self, exc_type, exc_value, trackback):
_exceptions = []
exception_count = 0

for _f in as_completed(self._futures, timeout=self._timeout):
try:
self._results.append(_f.result())
except Exception as e:
logger.exception(e)
_exceptions.append(e)
exception_count += 1

if exception_count > 0 and not self._shutdown_wait:
# At this point we are ignoring results
self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)
raise _exceptions[0]

if value is not None:
return False
if len(_exceptions) > 0:
raise _exceptions[0]

try:
# raises if any greenlets exited with an exception
for result in self:
log.debug("result is %s", repr(result))
pass
except Exception:
# Emit message here because traceback gets stomped when we re-raise
log.exception("Exception in parallel execution")
raise
return True
return False if exception_count == 0 else True

def __iter__(self):
return self

def __next__(self):
if not self.any_spawned or self.iteration_stopped:
raise StopIteration()
result = self.results.get()

try:
resurrect_traceback(result)
except StopIteration:
self.iteration_stopped = True
raise

return result

def _finish(self, greenlet):
if greenlet.successful():
self.results.put(greenlet.value)
else:
self.results.put(greenlet.exception)

self.count -= 1
if self.count <= 0:
self.results.put(StopIteration())
for r in self._results:
yield r
13 changes: 13 additions & 0 deletions ceph/rados/monitor_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,19 @@ def get_mon_quorum_hosts(self) -> list:
quorum = self.rados_obj.run_ceph_command(cmd)
return [entry["name"] for entry in quorum["quorum"]]

def get_mon_quorum(self) -> dict:
    """Fetch monitor details via ``ceph mon stat``.

    Returns:
        dict mapping each quorum monitor's name to its rank,
        e.g. {'magna045': 0, 'magna046': 1, 'magna047': 2}
    """
    quorum_stat = self.rados_obj.run_ceph_command("ceph mon stat", client_exec=True)
    mon_members = {}
    for member in quorum_stat["quorum"]:
        # First occurrence wins, mirroring dict.setdefault semantics.
        if member["name"] not in mon_members:
            mon_members[member["name"]] = member["rank"]
    return mon_members

def set_tiebreaker_mon(self, host) -> bool:
"""
Sets the passed host mon as the new tiebreaker mon daemon in stretch mode
Expand Down
4 changes: 3 additions & 1 deletion ceph/rbd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def get_md5sum_rbd_image(**kw):
log.error(f"Export failed for image {kw.get('image_spec')}")
return None
return exec_cmd(
output=True, cmd="md5sum {}".format(kw.get("file_path")), node=kw.get("client")
output=True,
cmd=f"md5sum {kw['file_path']} && rm -f {kw['file_path']}",
node=kw.get("client"),
).split()[0]


Expand Down
2 changes: 2 additions & 0 deletions ceph/rbd/workflows/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ def device_cleanup(rbd, client, **kw):
"image-snap-or-device-spec": kw["device_name"],
"device-type": kw.get("device_type", "nbd"),
}
if kw.get("options"):
map_config.update({"options": kw.get("options")})
_, err = rbd.device.unmap(**map_config)

else:
Expand Down
2 changes: 2 additions & 0 deletions ceph/rbd/workflows/krbd_io_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ def krbd_io_handler(**kw):
cleanup_config.update(
{"device-type": config.get("device_type", "nbd")}
)
if config.get("encryption_config"):
cleanup_config.update({"options": options})
return_flag = device_cleanup(**cleanup_config)
kw["config"]["device_names"] = device_names
return return_flag, ""
3 changes: 1 addition & 2 deletions ceph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import time
import traceback
from json import loads
from time import mktime
from time import mktime, sleep

import requests
import yaml
from gevent import sleep
from htmllistparse import fetch_listing
from libcloud.common.exceptions import BaseHTTPError
from libcloud.compute.providers import get_driver
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rhcs_delete.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ node("rhel-9-medium || ceph-qe-ci") {
checkout(
scm: [
$class: 'GitSCM',
branches: [[name: 'origin/master']],
branches: [[name: 'origin/main']],
extensions: [[
$class: 'CleanBeforeCheckout',
deleteUntrackedNestedRepositories: true
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rhcs_deploy.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ node ("rhel-9-medium || ceph-qe-ci") {
checkout(
scm: [
$class: 'GitSCM',
branches: [[name: 'origin/master']],
branches: [[name: 'origin/main']],
extensions: [[
$class: 'CleanBeforeCheckout',
deleteUntrackedNestedRepositories: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ node("rhel-8-medium") {
checkout(
scm: [
$class: 'GitSCM',
branches: [[name: "origin/master"]],
branches: [[name: "origin/main"]],
extensions: [[
$class: 'CleanBeforeCheckout',
deleteUntrackedNestedRepositories: true
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rhos_scripts/Jenkinsfile-cleanup-env.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ node("rhel-9-medium") {
checkout(
scm: [
$class: 'GitSCM',
branches: [[name: "origin/master"]],
branches: [[name: "origin/main"]],
extensions: [[
$class: 'CleanBeforeCheckout',
deleteUntrackedNestedRepositories: true
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rhos_scripts/Jenkinsfile-quota.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ node("rhel-9-medium") {
checkout(
scm: [
$class: 'GitSCM',
branches: [[name: "origin/master"]],
branches: [[name: "origin/main"]],
extensions: [[
$class: 'CleanBeforeCheckout',
deleteUntrackedNestedRepositories: true
Expand Down
Loading

0 comments on commit 2e9f216

Please sign in to comment.