Skip to content

Commit

Permalink
Testing: Extend RSESettingsDict to support protocols that access protocol-specific data from rse_settings
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimaio committed Jan 16, 2025
1 parent c96ea61 commit 139fabc
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 30 deletions.
15 changes: 8 additions & 7 deletions lib/rucio/client/uploadclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def _pick_random_rse(rse_expression: str) -> dict[str, Any]:
# if register_after_upload, file should be overwritten if it is not registered
# otherwise if file already exists on RSE we're done
if register_after_upload:
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): # type: ignore (pfn is str)
try:
self.client.get_did(file['did_scope'], file['did_name'])
logger(logging.INFO, 'File already registered. Skipping upload.')
Expand All @@ -245,7 +245,7 @@ def _pick_random_rse(rse_expression: str) -> dict[str, Any]:
logger(logging.INFO, 'File already exists on RSE. Previous left overs will be overwritten.')
delete_existing = True
elif not is_deterministic and not no_register:
if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
if rsemgr.exists(rse_settings, pfn, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): # type: ignore (pfn is str)
logger(logging.INFO, 'File already exists on RSE with given pfn. Skipping upload. Existing replica has to be removed first.')
trace['stateReason'] = 'File already exists'
continue
Expand All @@ -254,7 +254,7 @@ def _pick_random_rse(rse_expression: str) -> dict[str, Any]:
trace['stateReason'] = 'File already exists'
continue
else:
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger):
if rsemgr.exists(rse_settings, pfn if pfn else file_did, domain=domain, scheme=force_scheme, impl=impl, auth_token=self.auth_token, vo=self.client.vo, logger=logger): # type: ignore (pfn is str)
logger(logging.INFO, 'File already exists on RSE. Skipping upload')
trace['stateReason'] = 'File already exists'
continue
Expand All @@ -269,10 +269,11 @@ def _pick_random_rse(rse_expression: str) -> dict[str, Any]:
protocol = protocols.pop()
cur_scheme = protocol['scheme']
logger(logging.INFO, 'Trying upload with %s to %s' % (cur_scheme, rse))
lfn = {}
lfn: "LFNDict" = {
'name': file['did_name'],
'scope': file['did_scope']
}
lfn['filename'] = basename
lfn['scope'] = file['did_scope']
lfn['name'] = file['did_name']

for checksum_name in GLOBALLY_SUPPORTED_CHECKSUMS:
if checksum_name in file:
Expand Down Expand Up @@ -753,7 +754,7 @@ def _upload_item(
try:
if protocol_write.renaming:
logger(logging.DEBUG, 'Renaming file %s to %s' % (pfn_tmp, pfn))
protocol_write.rename(pfn_tmp, pfn)
protocol_write.rename(pfn_tmp, pfn) # type: ignore (pfn might be None)
except Exception:
raise RucioException('Unable to rename the tmp file %s.' % pfn_tmp)

Expand Down
7 changes: 7 additions & 0 deletions lib/rucio/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ class RSESettingsDict(TypedDict):
domain: list[str]
protocols: list[RSEProtocolDict]

# Compatibility with protocols that access protocol-specific data from rse_settings (e.g. RFIO)
protocol: RSEProtocolDict
prefix: str
scheme: str
hostname: str


class RSEAccountCounterDict(TypedDict):
account: InternalAccount
Expand Down Expand Up @@ -262,6 +268,7 @@ class DIDStringDict(TypedDict):
class LFNDict(TypedDict):
name: str
scope: str
path: NotRequired[str]
filesize: NotRequired[int]
adler32: NotRequired[str]
md5: NotRequired[str]
Expand Down
13 changes: 8 additions & 5 deletions lib/rucio/core/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from rucio.common.cache import MemcacheRegion
from rucio.common.config import config_get, config_get_bool
from rucio.common.constants import RseAttr, SuspiciousAvailability
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.types import InternalAccount, InternalScope, LFNDict
from rucio.common.utils import add_url_query, chunks, clean_pfns, str_to_date
from rucio.core.credential import get_signed_url
from rucio.core.message import add_messages
Expand Down Expand Up @@ -953,9 +953,12 @@ def _build_list_replicas_pfn(
If needed, sign the PFN url
If relevant, add the server-side root proxy to the pfn url
"""
pfn: str = list(protocol.lfns2pfns(lfns={'scope': scope.external,
'name': name,
'path': path}).values())[0]
lfn: LFNDict = {
'scope': scope.external, # type: ignore (scope.external might be None)
'name': name,
'path': path
}
pfn: str = list(protocol.lfns2pfns(lfns=lfn).values())[0]

# do we need to sign the URLs?
if sign_urls and protocol.attributes['scheme'] == 'https':
Expand Down Expand Up @@ -1109,7 +1112,7 @@ def _list_replicas(
try:
path = pfns_cache['%s:%s:%s' % (protocol.attributes['determinism_type'], t_scope.internal, t_name)]
except KeyError: # No cache entry scope:name found for this protocol
path = protocol._get_path(t_scope, t_name)
path = protocol._get_path(t_scope, t_name) # type: ignore (t_scope is InternalScope instead of str)
pfns_cache['%s:%s:%s' % (protocol.attributes['determinism_type'], t_scope.internal, t_name)] = path

try:
Expand Down
22 changes: 18 additions & 4 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

from sqlalchemy.orm import Session

from rucio.common.types import InternalAccount
from rucio.common.types import InternalAccount, LFNDict
from rucio.core.rse import RseData
from rucio.core.topology import Topology
from rucio.rse.protocols.protocol import RSEProtocol
Expand Down Expand Up @@ -210,7 +210,12 @@ def _generate_source_url(cls, src: RequestSource, dst: TransferDestination, rws:
# Compute the source URL
source_sign_url = src.rse.attributes.get(RseAttr.SIGN_URL, None)
dest_sign_url = dst.rse.attributes.get(RseAttr.SIGN_URL, None)
source_url = list(protocol.lfns2pfns(lfns={'scope': rws.scope.external, 'name': rws.name, 'path': src.file_path}).values())[0]
lfn: "LFNDict" = {
'scope': rws.scope.external, # type: ignore (scope.external might be None)
'name': rws.name,
'path': src.file_path
}
source_url = list(protocol.lfns2pfns(lfns=lfn).values())[0]
source_url = cls.__rewrite_source_url(source_url, source_sign_url=source_sign_url, dest_sign_url=dest_sign_url, source_scheme=src.scheme)
return source_url

Expand All @@ -223,7 +228,11 @@ def _generate_dest_url(cls, dst: TransferDestination, rws: RequestWithSources, p
protocol = protocol_factory.protocol(dst.rse, dst.scheme, operation)

if dst.rse.info['deterministic']:
dest_url = list(protocol.lfns2pfns(lfns={'scope': rws.scope.external, 'name': rws.name}).values())[0]
lfn: "LFNDict" = {
'scope': rws.scope.external, # type: ignore (scope.external might be None)
'name': rws.name
}
dest_url = list(protocol.lfns2pfns(lfns=lfn).values())[0]
else:
# compute dest url in case of non deterministic
# naming convention, etc.
Expand All @@ -236,7 +245,12 @@ def _generate_dest_url(cls, dst: TransferDestination, rws: RequestWithSources, p
if rws.retry_count or rws.activity == 'Recovery':
dest_path = '%s_%i' % (dest_path, int(time.time()))

dest_url = list(protocol.lfns2pfns(lfns={'scope': rws.scope.external, 'name': rws.name, 'path': dest_path}).values())[0]
lfn: "LFNDict" = {
'scope': rws.scope.external, # type: ignore (scope.external might be None)
'name': rws.name,
'path': dest_path
}
dest_url = list(protocol.lfns2pfns(lfns=lfn).values())[0]

dest_sign_url = dst.rse.attributes.get(RseAttr.SIGN_URL, None)
dest_url = cls.__rewrite_dest_url(dest_url, dest_sign_url=dest_sign_url)
Expand Down
10 changes: 7 additions & 3 deletions lib/rucio/daemons/reaper/dark_reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from types import FrameType
from typing import Optional

from rucio.common.types import LFNDict
from rucio.daemons.common import HeartbeatHandler

logging.getLogger("requests").setLevel(logging.CRITICAL)
Expand Down Expand Up @@ -120,10 +121,13 @@ def run_once(
if replica['scope']:
scope = replica['scope'].external
try:
lfn: "LFNDict" = {
'scope': scope,
'name': replica['name'],
'path': replica['path']
}
pfn = str(list(rsemgr.lfns2pfns(rse_settings=rse_info,
lfns=[{'scope': scope,
'name': replica['name'],
'path': replica['path']}],
lfns=[lfn],
operation='delete',
scheme=scheme).values())[0])
logger(logging.INFO, 'Deletion ATTEMPT of %s:%s as %s on %s', scope, replica['name'], pfn, rse)
Expand Down
9 changes: 7 additions & 2 deletions lib/rucio/daemons/reaper/reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from collections.abc import Iterable, Sequence
from types import FrameType

from rucio.common.types import LoggerFunction
from rucio.common.types import LFNDict, LoggerFunction
from rucio.daemons.common import HeartbeatHandler

GRACEFUL_STOP = threading.Event()
Expand Down Expand Up @@ -632,8 +632,13 @@ def _run_once(
del_start_time = time.time()
for replica in file_replicas:
try:
lfn: "LFNDict" = {
'scope': replica['scope'].external,
'name': replica['name'],
'path': replica['path']
}
replica['pfn'] = str(list(rsemgr.lfns2pfns(rse_settings=rse.info,
lfns=[{'scope': replica['scope'].external, 'name': replica['name'], 'path': replica['path']}],
lfns=[lfn],
operation='delete', scheme=scheme).values())[0])
except (ReplicaUnAvailable, ReplicaNotFound) as error:
logger(logging.WARNING, 'Failed get pfn UNAVAILABLE replica %s:%s on %s with error %s', replica['scope'], replica['name'], rse.name, str(error))
Expand Down
10 changes: 6 additions & 4 deletions lib/rucio/daemons/storage/consistency/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from rucio.common import exception
from rucio.common.logging import formatted_logger, setup_logging
from rucio.common.types import InternalAccount, InternalScope
from rucio.common.types import InternalAccount, InternalScope, LFNDict
from rucio.common.utils import daemon_sleep
from rucio.core.heartbeat import die, live, sanity_check
from rucio.core.monitor import MetricManager
Expand Down Expand Up @@ -444,10 +444,12 @@ def process_dark_files(
% (rse, scope, name))
rse_id = get_rse_id(rse=rse)
Intscope = InternalScope(scope=scope, vo=issuer.vo)
lfns = [{'scope': scope, 'name': name}]

lfn: "LFNDict" = {
'scope': scope,
'name': name,
}
attributes = get_rse_info(rse=rse)
pfns = lfns2pfns(rse_settings=attributes, lfns=lfns, operation='delete')
pfns = lfns2pfns(rse_settings=attributes, lfns=[lfn], operation='delete')
pfn_key = scope + ':' + name
url = pfns[pfn_key]
urls = [url]
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/rse/protocols/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def parse_pfns(self, pfns):

return ret

def exists(self, path):
def exists(self, path: str) -> bool:
"""
Checks if the requested file is known by the referred RSE.
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/rse/protocols/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, protocol_attr, rse_settings, logger=None):
:param props: Properties derived from the RSE Repository
"""
super(Default, self).__init__(protocol_attr, rse_settings, logger=logger)
super(Default, self).__init__(protocol_attr, rse_settings, logger=logger) # type: ignore (logger might be None)
self.attributes.pop('determinism_type', None)
self.files = []

Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/rse/rsemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from rucio.rse.protocols.protocol import RSEProtocol



def get_scope_protocol(vo: str = 'def') -> 'Callable':
"""
Returns the callable protocol to translate the pfn to a name/scope pair
Expand Down Expand Up @@ -349,6 +348,7 @@ def exists(
exists = protocol.exists(f)
ret[f] = exists
elif 'scope' in f: # a LFN is provided
f = cast(types.LFNDict, f)
pfn = list(protocol.lfns2pfns(f).values())[0]
if isinstance(pfn, exception.RucioException):
raise pfn
Expand Down
8 changes: 6 additions & 2 deletions lib/rucio/web/rest/flaskapi/v1/rses.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from json import dumps
from typing import TYPE_CHECKING

from flask import Flask, Response, jsonify, request

Expand Down Expand Up @@ -66,6 +67,9 @@
from rucio.web.rest.flaskapi.authenticated_bp import AuthenticatedBlueprint
from rucio.web.rest.flaskapi.v1.common import ErrorHandlingMethodView, check_accept_header_wrapper_flask, generate_http_error_flask, json_parameters, param_get, response_headers, try_stream

if TYPE_CHECKING:
from rucio.common.types import LFNDict


class RSEs(ErrorHandlingMethodView):
""" List all RSEs in the database. """
Expand Down Expand Up @@ -899,7 +903,7 @@ def get(self, rse):
if any(filter(lambda info: len(info) != 2, lfns)):
invalid_lfns = ', '.join(filter(lambda info: len(info) != 2, lfns))
return generate_http_error_flask(400, InvalidPath.__name__, 'LFN(s) in invalid format: ' + invalid_lfns)
lfns = list(map(lambda info: {'scope': info[0], 'name': info[1]}, lfns))
lfns_list: list["LFNDict"] = list(map(lambda info: {'scope': info[0], 'name': info[1]}, lfns))
scheme = request.args.get('scheme', default=None)
domain = request.args.get('domain', default='wan')
operation = request.args.get('operation', default='write')
Expand All @@ -909,7 +913,7 @@ def get(self, rse):
except (RSENotFound, RSEProtocolNotSupported, RSEProtocolDomainNotSupported) as error:
return generate_http_error_flask(404, error)

pfns = rsemanager.lfns2pfns(rse_settings, lfns, operation=operation, scheme=scheme, domain=domain)
pfns = rsemanager.lfns2pfns(rse_settings, lfns_list, operation=operation, scheme=scheme, domain=domain)
if not pfns:
return generate_http_error_flask(404, ReplicaNotFound.__name__, 'No replicas found')

Expand Down

0 comments on commit 139fabc

Please sign in to comment.