Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce calls to _all_dbs when validating the PSet for a new workflow #11351

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions bin/adhoc-scripts/usePycurl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import time
import types
import sys
from pprint import pformat
from urllib.parse import urlencode
from Utils.Utilities import decodeBytesToUnicode

from WMCore.Services.pycurl_manager import RequestHandler
from Utils.CertTools import getKeyCertFromEnv, getCAPathFromEnv

def main():
#uri = "https://cmsweb.cern.ch/couchdb/_all_dbs"
#uri = "https://cmsweb.cern.ch/couchdb/reqmgr_config_cache/6c075881a8454070ce3c1e8921cdb45e"
uri = "https://cmsweb.cern.ch/couchdb/reqmgr_config_cache/6c075881a8454070ce3c1e8921cdb45e/configFile"

### deal with headers
data = {}
verb = "GET"
incoming_headers = {}
encoder = True
contentType = None
data, headers = encodeParams(data, verb, incoming_headers, encoder, contentType)

headers["Accept-Encoding"] = "gzip,deflate,identity"
ckey, cert = getKeyCertFromEnv()
capath = getCAPathFromEnv()
reqHandler = RequestHandler()
timeStart = time.time()
response, result = reqHandler.request(uri, data, headers, verb=verb, ckey=ckey, cert=cert, capath=capath)
timeEnd = time.time()
print(f"Uri: {uri}")
print(f"Request headers: {headers}")
print(f"Time: {timeEnd - timeStart}")
print(f"Response headers: {pformat(response.header)}")

### deal with the response object
decoder = True
result = decodeResult(result, decoder)
#print(f"Response: {result}")


def encodeParams(data, verb, incomingHeaders, encoder, contentType):
headers = {"Content-type": contentType if contentType else "application/json",
"User-Agent": "WMCore/usePycurl",
"Accept": "application/json"}

incomingHeaders["Accept-Encoding"] = "gzip,identity"
headers.update(incomingHeaders)

encoded_data = ''
if verb != 'GET' and data:
if isinstance(encoder, (types.MethodType, types.FunctionType)):
encoded_data = encoder(data)
elif encoder is False:
encoded_data = data
else:
encoded_data = self.encode(data)
headers["Content-Length"] = len(encoded_data)
elif verb != 'GET':
headers["Content-Length"] = 0
elif verb == 'GET' and data:
# encode the data as a get string
encoded_data = urlencode(data, doseq=True)
return encoded_data, headers

def decodeResult(result, decoder):
if isinstance(decoder, (types.MethodType, types.FunctionType)):
result = decoder(result)
elif decoder is not False:
result = decodeBytesToUnicode(result)
return result

if __name__ == '__main__':
sys.exit(main())
18 changes: 14 additions & 4 deletions src/python/WMCore/Cache/WMConfigCache.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from builtins import str
from future.utils import with_metaclass
from future.utils import viewkeys, viewvalues
import cherrypy

import hashlib
import logging
Expand Down Expand Up @@ -98,10 +99,10 @@ def __init__(self, dbURL, couchDBName=None, id=None, rev=None, usePYCurl=True,
self.dburl = dbURL
self.detail = detail
try:
cherrypy.log("AMR connecting to CouchServer with usePYCurl: %s" % usePYCurl)
self.couchdb = CouchServer(self.dburl, usePYCurl=usePYCurl, ckey=ckey, cert=cert, capath=capath)
if self.dbname not in self.couchdb.listDatabases():
self.createDatabase()

cherrypy.log("AMR connecting to database %s" % self.dbname)
self.database = self.couchdb.connectDatabase(self.dbname)
except Exception as ex:
msg = "Error connecting to couch: %s\n" % str(ex)
Expand Down Expand Up @@ -145,9 +146,13 @@ def connectUserGroup(self, groupname, username):
_connectUserGroup_

"""
cherrypy.log("AMR setting group name to %s" % groupname)
self.group = Group(name=groupname)
cherrypy.log("AMR setting couchdb for dburl %s and dbname %s" % (self.dburl, self.dbname))
self.group.setCouch(self.dburl, self.dbname)
cherrypy.log("AMR connecting to group backend")
self.group.connect()
cherrypy.log("AMR making user for username %s" % username)
self.owner = makeUser(groupname, username,
couchUrl=self.dburl,
couchDatabase=self.dbname)
Expand Down Expand Up @@ -272,13 +277,17 @@ def loadByID(self, configID):
Load a document from the server given its couchID
"""
try:
cherrypy.log("AMR getting document id %s" % configID)
self.document = self.database.document(id=configID)
if 'owner' in self.document:
cherrypy.log("AMR connectUserGroup")
self.connectUserGroup(groupname=self.document['owner'].get('group', None),
username=self.document['owner'].get('user', None))
if '_attachments' in self.document:
cherrypy.log("AMR loadAttachment")
# Then we need to load the attachments
for key in viewkeys(self.document['_attachments']):
cherrypy.log("AMR loading attachment for key %s" % key)
self.loadAttachment(name=key)
except CouchNotFoundError as ex:
msg = "Document with id %s not found in couch\n" % (configID)
Expand All @@ -301,7 +310,7 @@ def loadAttachment(self, name, overwrite=True):

Load an attachment from the database and put it somewhere useful
"""

logging.info("AMR loading attachment for docID: %s and name: %s", self.document["_id"], name)
attach = self.database.getAttachment(self.document["_id"], name)

if not overwrite:
Expand Down Expand Up @@ -538,11 +547,12 @@ def __str__(self):
return self.document.__str__()

def validate(self, configID):

try:
# TODO: need to change to DataCache
# self.loadDocument(configID = configID)
cherrypy.log("AMR loading ConfigCache for ID: %s" % configID)
self.loadByID(configID=configID)
cherrypy.log("AMR ConfigCache loaded")
except Exception as ex:
raise ConfigCacheException("Failure to load ConfigCache while validating workload: %s" % str(ex))

Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ def deleteDatabase(self, dbname):
raise RuntimeError(msg)
return self.delete("/%s" % dbname)

def connectDatabase(self, dbname='database', create=True, size=1000):
def connectDatabase(self, dbname='database', create=False, size=1000):
"""
Return a Database instance, pointing to a database in the server. If the
database doesn't exist create it if create is True.
Expand Down
21 changes: 0 additions & 21 deletions src/python/WMCore/JobStateMachine/ConfigureState.py

This file was deleted.

7 changes: 5 additions & 2 deletions src/python/WMCore/Services/pycurl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from past.builtins import basestring
from future.utils import viewitems


import cherrypy
# system modules
import copy
import json
Expand Down Expand Up @@ -257,7 +257,7 @@ def set_opts(self, curl, url, params, headers,
# we need to enable this header here, in case it has not been provided by upstream.
thisHeaders.setdefault("Accept-Encoding", "gzip")
curl.setopt(pycurl.HTTPHEADER, [encodeUnicodeToBytes("%s: %s" % (k, v)) for k, v in viewitems(thisHeaders)])

cherrypy.log("AMR url %s, thisHeaders %s" % (url, thisHeaders))
bbuf = BytesIO()
hbuf = BytesIO()
curl.setopt(pycurl.WRITEFUNCTION, bbuf.write)
Expand Down Expand Up @@ -311,13 +311,16 @@ def request(self, url, params, headers=None, verb='GET',
verbose=0, ckey=None, cert=None, capath=None,
doseq=True, encode=False, decode=False, cainfo=None, cookie=None):
"""Fetch data for given set of parameters"""
cherrypy.log("AMR url: %s, params: %s, headers: %s, verb: %s" % (url, params, headers, verb))
cherrypy.log(" AMR doseq: %s, encode: %s, decode: %s, cookie: %s" % (doseq, encode, decode, cookie))
curl = pycurl.Curl()
bbuf, hbuf = self.set_opts(curl, url, params, headers, ckey, cert, capath,
verbose, verb, doseq, encode, cainfo, cookie)
curl.perform()
if verbose:
print(verb, url, params, headers)
header = self.parse_header(hbuf.getvalue())
cherrypy.log("AMR response headers: %s" % (header.header))
data = bbuf.getvalue()
data = decompress(data, header.header)
if header.status < 300:
Expand Down
4 changes: 4 additions & 0 deletions src/python/WMCore/WMSpec/StdSpecs/DQMHarvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ def validateSchema(self, schema):

Standard DataProcessing schema validation.
"""
import cherrypy
cherrypy.log("AMR calling DQMHarvest.validateSchema")
DataProcessing.validateSchema(self, schema)

cherrypy.log("AMR calling DQMHarvest.validateConfigCacheExists")
self.validateConfigCacheExists(configID=schema["DQMConfigCacheID"],
configCacheUrl=schema['ConfigCacheUrl'],
couchDBName=schema["CouchDBName"],
getOutputModules=False)
cherrypy.log("AMR done with DQMHarvest.validateConfigCacheExists")

@staticmethod
def getWorkloadCreateArgs():
Expand Down
50 changes: 39 additions & 11 deletions src/python/WMCore/WMSpec/StdSpecs/StdBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from __future__ import division
from future.utils import viewitems
from builtins import range, object

import cherrypy
import logging
import json
from collections import deque

from Utils.PythonVersion import PY3
from Utils.Utilities import decodeBytesToUnicodeConditional
Expand Down Expand Up @@ -53,7 +54,8 @@ def __init__(self):

# Internal parameters
self.workloadName = None
self.config_cache = {}
# cache to be used for the workflow config IDs
self.config_cache = deque([], 10)

return

Expand All @@ -65,6 +67,7 @@ def __call__(self, workloadName, arguments):
method and pull out any that are setup by this base class.
"""
self.workloadName = workloadName
cherrypy.log("AMR getWorkloadCreateArgs")
argumentDefinition = self.getWorkloadCreateArgs()
for arg in argumentDefinition:
try:
Expand All @@ -81,6 +84,7 @@ def __call__(self, workloadName, arguments):
raise WMSpecFactoryException("parameter %s: %s" % (arg, str(ex)))

# TODO: this replace can be removed in one year from now, thus March 2022
cherrypy.log("AMR updating dbs")
if hasattr(self, "dbsUrl"):
self.dbsUrl = self.dbsUrl.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
self.dbsUrl = self.dbsUrl.rstrip("/")
Expand All @@ -90,6 +94,18 @@ def __call__(self, workloadName, arguments):
# static copy of the skim mapping
skimMap = {}

def getCachedConfigID(self, absoluteConfigID):
"""
Given a ConfigCacheID (including the url and db name), return it if it's
available in the memory cache.
:param absoluteConfigID: string with the configID url
:return: the config cache document, or None if not found
"""
for item in self.config_cache:
if absoluteConfigID == item['name']:
return item['configDoc']
return

@staticmethod
def calcEvtsPerJobLumi(ePerJob, ePerLumi, tPerEvent, requestedEvents=None):
"""
Expand Down Expand Up @@ -193,15 +209,17 @@ def determineOutputModules(self, scenarioFunc=None, scenarioArgs=None,
scenarioArgs = scenarioArgs or {}

outputModules = {}
cherrypy.log("AMR determineOutputModules for %s and %s" % (configCacheUrl, couchDBName))
cacheKey = configCacheUrl + couchDBName + configDoc
if configDoc is not None and configDoc != "":
if (configCacheUrl, couchDBName) in self.config_cache:
configCache = self.config_cache[(configCacheUrl, couchDBName)]
else:
configCache = ConfigCache(configCacheUrl, couchDBName, True)
self.config_cache[(configCacheUrl, couchDBName)] = configCache
configCache = self.getCachedConfigID(cacheKey)
if not configCache:
configCacheDB = ConfigCache(configCacheUrl, couchDBName)
configCacheDB.loadByID(configDoc)
# FIXME: this does not cache the attachment
self.config_cache[cacheKey] = configCache
# TODO: need to change to DataCache
# configCache.loadDocument(configDoc)
configCache.loadByID(configDoc)
outputModules = configCache.getOutputModuleInfo()
else:
if 'outputs' in scenarioArgs and scenarioFunc in ["promptReco", "expressProcessing", "repack"]:
Expand Down Expand Up @@ -235,6 +253,7 @@ def determineOutputModules(self, scenarioFunc=None, scenarioArgs=None,
outputModules[moduleLabel] = {'dataTier': dataTier,
'primaryDataset': scenarioArgs.get('primaryDataset'),
'filterName': alcaSkim}
cherrypy.log("AMR determineOutputModules self.config_cache keys %s" % self.config_cache.keys())

return outputModules

Expand Down Expand Up @@ -902,11 +921,16 @@ def factoryWorkloadConstruction(self, workloadName, arguments):
if arguments.get('RequestType') == 'Resubmission':
self.validateSchema(schema=arguments)
else:
cherrypy.log("AMR running masterValidation")
self.masterValidation(schema=arguments)
cherrypy.log("AMR running validateSchema")
self.validateSchema(schema=arguments)

cherrypy.log("AMR running StdBase.__call__")
workload = self.__call__(workloadName=workloadName, arguments=arguments)
cherrypy.log("AMR running validateWorkload")
self.validateWorkload(workload)
self.config_cache.clear()

return workload

Expand Down Expand Up @@ -952,11 +976,15 @@ def validateConfigCacheExists(self, configID, configCacheUrl, couchDBName,
if configID == '' or configID == ' ':
self.raiseValidationException(msg="ConfigCacheID is invalid and cannot be loaded")

if (configCacheUrl, couchDBName) in self.config_cache:
configCache = self.config_cache[(configCacheUrl, couchDBName)]
cherrypy.log("AMR validateConfigCacheExists for %s and %s" % (configCacheUrl, couchDBName))
cacheKey = configCacheUrl + couchDBName + configID
if cacheKey in self.config_cache:
cherrypy.log("AMR fetching doc from cache: %s" % configID)
configCache = self.config_cache[cacheKey]
else:
configCache = ConfigCache(dbURL=configCacheUrl, couchDBName=couchDBName, detail=getOutputModules)
self.config_cache[(configCacheUrl, couchDBName)] = configCache
self.config_cache[cacheKey] = configCache
cherrypy.log("AMR validateConfigCacheExists self.config_cache keys %s" % self.config_cache.keys())

try:
# if detail option is set return outputModules
Expand Down
5 changes: 3 additions & 2 deletions src/python/WMCore/WMSpec/StdSpecs/TaskChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
},
"""
from __future__ import division

import cherrypy
import json
from builtins import range, object
from future.utils import viewitems
Expand Down Expand Up @@ -219,7 +219,7 @@ def __call__(self, workloadName, arguments):
"""
_call_

Create a ReReco workload with the given parameters.
Create a TaskChain workload with the given parameters.
"""
StdBase.__call__(self, workloadName, arguments)
self.workload = self.createWorkload()
Expand Down Expand Up @@ -719,6 +719,7 @@ def validateSchema(self, schema):

# Validate the existence of the configCache
if task["ConfigCacheID"]:
cherrypy.log("AMR validating configCacheExists in TaskChain for id: %s" % task["ConfigCacheID"])
self.validateConfigCacheExists(configID=task['ConfigCacheID'],
configCacheUrl=schema["ConfigCacheUrl"],
couchDBName=schema["CouchDBName"],
Expand Down