Skip to content

Commit

Permalink
Merge pull request #15934 from ggovi/o2o-fixes-0-81X
Browse files Browse the repository at this point in the history
Various improvements for O2O jobs management
  • Loading branch information
cmsbuild authored Sep 22, 2016
2 parents ec72fb0 + 53405d2 commit f0a1f43
Show file tree
Hide file tree
Showing 35 changed files with 662 additions and 647 deletions.
2 changes: 1 addition & 1 deletion CondCore/CondDB/src/IOVProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ namespace cond {
std::string dummy;
if(!m_session->iovSchema().tagTable().select( tag, m_data->timeType, m_data->payloadType, m_data->synchronizationType,
m_data->endOfValidity, dummy, m_data->lastValidatedTime ) ){
throwException( "Tag \""+tag+"\" has not been found in the database.","IOVProxy::load");
throwException( "Tag \""+tag+"\" has not been found in the database "+m_session->connectionString,"IOVProxy::load");
}
m_data->tag = tag;

Expand Down
14 changes: 14 additions & 0 deletions CondCore/Utilities/python/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import netrc
import os

def get_credentials_from_file( service, authFile=None ):
    """Look up the netrc entry for ``service``.

    ``authFile`` is handed straight to the standard ``netrc`` parser; when it
    is None the parser falls back to its own default file (see the ``netrc``
    module documentation).

    Returns whatever ``netrc.authenticators`` yields for ``service``: a
    ``(login, account, password)`` tuple, or None when the file holds no
    matching (or default) entry.
    """
    netrc_data = netrc.netrc( authFile )
    return netrc_data.authenticators( service )

def get_credentials( authPathEnvVar, service, authFile=None ):
    """Return the netrc credentials for ``service``.

    An explicit ``authFile`` is used as given. When it is None and the
    environment variable named by ``authPathEnvVar`` is set, the value of
    that variable is joined with ``'.netrc'`` to form the file path. When the
    variable is not set either, ``authFile`` stays None and is passed on
    unchanged.

    Returns the result of ``get_credentials_from_file( service, authFile )``.
    """
    if authFile is None:
        # Single lookup: None means the variable is absent from the environment.
        authPath = os.environ.get( authPathEnvVar )
        if authPath is not None:
            authFile = os.path.join( authPath, '.netrc' )
    return get_credentials_from_file( service, authFile )

42 changes: 27 additions & 15 deletions CondCore/Utilities/python/o2o.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
from datetime import datetime
import os
import sys
import netrc
import logging
import string

import CondCore.Utilities.credentials as auth

prod_db_service = ['cms_orcon_prod','o2o_prod']
dev_db_service = ['cms_orcoff_prep','o2o_dev']
schema_name = 'CMS_CONDITIONS'
oracle_tpl = 'oracle://%s:%s@%s'
sqlalchemy_tpl = 'oracle://%s:%s@%s'
coral_tpl = 'oracle://%s/%s'
private_db = 'sqlite:///o2o_jobs.db'
startStatus = -1
authPathEnvVar = 'O2O_AUTH_PATH'
authPathEnvVar = 'COND_AUTH_PATH'
messageLevelEnvVar = 'O2O_LOG_LEVEL'
logFolderEnvVar = 'O2O_LOG_FOLDER'

Expand Down Expand Up @@ -47,13 +50,7 @@ class O2ORun(_Base):
log = sqlalchemy.Column(sqlalchemy.CLOB, nullable=True)

def get_db_credentials( db_service, authFile ):
pwd = None
if authFile is None:
if authPathEnvVar in os.environ:
authPath = os.environ[authPathEnvVar]
authFile = os.path.join(authPath,'.netrc')
logging.debug('Retrieving credentials from file %s' %authFile )
(username, account, pwd) = netrc.netrc( authFile ).authenticators(db_service[1])
(username, account, pwd) = auth.get_credentials( authPathEnvVar, db_service[1], authFile )
return username,pwd


Expand Down Expand Up @@ -82,9 +79,9 @@ def getSession( self, db_service, auth ):
username = None
pwd = None
if username is None:
logging.error('Credentials for service %s are not available',db_service[0])
logging.error('Credentials for service %s (machine=%s) are not available' %(db_service[0],db_service[1]))
return None
url = oracle_tpl %(username,pwd,db_service[0])
url = sqlalchemy_tpl %(username,pwd,db_service[0])
session = None
try:
self.eng = sqlalchemy.create_engine( url )
Expand Down Expand Up @@ -137,6 +134,8 @@ def __init__( self ):
self.job_name = None
self.start = None
self.end = None
self.tag_name = None
self.db_connection = None

def log( self, level, message ):
consoleLog = getattr(O2OMgr.logger( self ),level)
Expand All @@ -151,17 +150,19 @@ def connect( self, service, args ):
if self.session is None:
return False
else:
self.db_connection = coral_tpl %(service[0],schema_name)
return True

def startJob( self, job_name ):
O2OMgr.logger( self ).info('Checking job %s', job_name)
exists = None
enabled = None
try:
res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
res = self.session.query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
for r in res:
exists = True
enabled = int(r[0])
self.tag_name = str(r[1])
if exists is None:
exists = False
enabled = False
Expand All @@ -187,7 +188,9 @@ def endJob( self, status, log ):
except sqlalchemy.exc.SQLAlchemyError as dberror:
O2OMgr.logger( self ).error( str(dberror) )

def executeJob( self, job_name, command ):
def executeJob( self, args ):
job_name = args.name
command = args.executable
logFolder = os.getcwd()
if logFolderEnvVar in os.environ:
logFolder = os.environ[logFolderEnvVar]
Expand All @@ -197,13 +200,22 @@ def executeJob( self, job_name, command ):
exists, enabled = self.startJob( job_name )
if exists is None:
return 3
if enabled is None:
if not exists:
O2OMgr.logger( self).error( 'The job %s is unknown.', job_name )
return 2
else:
if enabled == 0:
O2OMgr.logger( self).info( 'The job %s has been disabled.', job_name )
return 5
if args.inputFromDb:
try:
O2OMgr.logger( self ).info('Setting db input parameters...')
input_params = {'db':self.db_connection,'tag':self.tag_name }
commandTpl = string.Template( command )
command = commandTpl.substitute( input_params )
except KeyError as exc:
O2OMgr.logger( self).error( str(exc)+': Unknown template key in the command.' )
O2OMgr.logger( self ).info('O2O Command: "%s"', command )
try:
O2OMgr.logger( self ).info('Executing job %s', job_name )
pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
Expand Down
171 changes: 71 additions & 100 deletions CondCore/Utilities/python/popcon2dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
dateformatForFolder = "%y-%m-%d-%H-%M-%S"
dateformatForLabel = "%y-%m-%d %H:%M:%S"

"""
import upload_popcon
class CondMetaData(object):
Expand All @@ -29,13 +30,13 @@ def __init__( self, fileName ):
def authPath( self ):
apath = ''
if 'authenticationPath' in self.md:
if self.md.has_key('authenticationPath'):
apath = self.md.get('authenticationPath')
return apath
def authSys( self ):
asys = 1
if 'authenticationSys' in self.md:
if self.md.has_key('authenticationSys'):
asys = self.md.get('authenticationSystem')
return asys
Expand All @@ -62,66 +63,26 @@ def dumpMetadataForUpload( self, inputtag, desttag, comment ):
with open( '%s.txt' %fileNameForDropBox, 'wb') as jf:
jf.write( json.dumps( uploadMd, sort_keys=True, indent = 2 ) )
jf.write('\n')
def dumpMetadataForUpload( destDb, destTag, comment ):
uploadMd = {}
uploadMd['destinationDatabase'] = destDb
tags = {}
tagInfo = {}
tags[ destTag ] = tagInfo
uploadMd['destinationTags'] = tags
uploadMd['inputTag'] = destTag
uploadMd['since'] = None
datef = datetime.now()
#datelabel = datef.strftime(dateformatForLabel)
uploadMd['userText'] = '%s : %s' %(datelabel,comment)
with open( '%s.txt' %fileNameForDropBox, 'wb') as jf:
jf.write( json.dumps( uploadMd, sort_keys=True, indent = 2 ) )
jf.write('\n')
"""

def runO2O( cmsswdir, releasepath, release, arch, jobfilename, logfilename, *p ):
# first remove any existing metadata file...
if os.path.exists( '%s.db' %fileNameForDropBox ):
print "Removing files with name %s" %fileNameForDropBox
os.remove( '%s.db' %fileNameForDropBox )
if os.path.exists( '%s.txt' %fileNameForDropBox ):
os.remove( '%s.txt' %fileNameForDropBox )
command = 'export SCRAM_ARCH=%s;' %arch
command += 'CMSSWDIR=%s;' %cmsswdir
command += 'source ${CMSSWDIR}/cmsset_default.sh;'
command += 'cd %s/%s/src;' %(releasepath,release)
command += 'eval `scramv1 runtime -sh`;'
command += 'cd -;'
command += 'pwd;'
command += 'cmsRun %s ' %jobfilename
command += ' '.join(p)
command += ' 2>&1'
pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
stdout_val = pipe.communicate()[0]
return stdout_val

def upload_to_dropbox( backend ):

md = CondMetaData(confFileName)
# check if the expected input file is there...
if not os.path.exists( dbFileForDropBox ):
print 'The input sqlite file has not been produced.'
return False
# first remove any existing metadata file...
if os.path.exists( '%s.txt' %fileNameForDropBox ):
os.remove( '%s.txt' %fileNameForDropBox )
try:
dropBox = upload_popcon.ConditionsUploader(upload_popcon.defaultHostname, upload_popcon.defaultUrlTemplate)
# Try to find the netrc entry
try:
(username, account, password) = netrc.netrc().authenticators(upload_popcon.defaultNetrcHost)
except Exception:
print 'Netrc entry "%s" not found.' %upload_popcon.defaultNetrcHost
return False
dropBox.signIn(username, password)
ret = True
for k,v in md.records().items():
destTag = v.get("destinationTag")
inputTag = v.get("sqliteTag")
if inputTag == None:
inputTag = destTag
comment = v.get("comment")
metadata = md.dumpMetadataForUpload( inputTag, destTag, comment )
ret &= dropBox.uploadFile(dbFileForDropBox, backend, upload_popcon.defaultTemporaryFile)
dropBox.signOut()
if ret:
print 'File %s successfully uploaded.' %dbFileForDropBox
return ret
except upload_popcon.HTTPError as e:
print e
return False

def upload( cfileName, authPath ):
md = CondMetaData(cfileName)
def upload( destDb, destTag, comment, authPath ):
#md = CondMetaData(cfileName)
datef = datetime.now()

# check if the expected input file is there...
if not os.path.exists( dbFileForDropBox ):
Expand All @@ -148,52 +109,62 @@ def upload( cfileName, authPath ):
if os.path.exists( '%s.txt' %fileNameForDropBox ):
os.remove( '%s.txt' %fileNameForDropBox )

# loop over the records to process...
ret = 0
for k,v in md.records().items():
destTag = v.get("destinationTag")
inputTag = v.get("sqliteTag")
if inputTag == None:
inputTag = destTag
comment = v.get("comment")
metadata = md.dumpMetadataForUpload( inputTag, destTag, comment )
uploadCommand = 'uploadConditions.py %s' %fileNameForDropBox
if not authPath is None:
uploadCommand += ' -a %s' %authPath
try:
pipe = subprocess.Popen( uploadCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
stdout = pipe.communicate()[0]
print stdout
retCode = pipe.returncode
if retCode != 0:
# save a copy of the files in case of upload failure...
leafFolderName = md.datef.strftime(dateformatForFolder)
fileFolder = os.path.join( errorInUploadFileFolder, leafFolderName)
if not os.path.exists(fileFolder):
os.makedirs(fileFolder)
df= '%s.db' %fileNameForDropBox
mf= '%s.txt' %fileNameForDropBox
dataDestFile = os.path.join( fileFolder, df)
if not os.path.exists(dataDestFile):
shutil.copy2(df, dataDestFile)
shutil.copy2(mf,os.path.join(fileFolder,mf))
print "Upload failed. Data file and metadata saved in folder '%s'" %os.path.abspath(fileFolder)
ret |= retCode
except Exception as e:
ret |= 1
print e
# dump Metadata for the Upload
uploadMd = {}
uploadMd['destinationDatabase'] = destDb
tags = {}
tagInfo = {}
tags[ destTag ] = tagInfo
uploadMd['destinationTags'] = tags
uploadMd['inputTag'] = destTag
uploadMd['since'] = None
datelabel = datef.strftime(dateformatForLabel)
commentStr = ''
if not comment is None:
commentStr = comment
uploadMd['userText'] = '%s : %s' %(datelabel,commentStr)
with open( '%s.txt' %fileNameForDropBox, 'wb') as jf:
jf.write( json.dumps( uploadMd, sort_keys=True, indent = 2 ) )
jf.write('\n')

# run the upload
uploadCommand = 'uploadConditions.py %s' %fileNameForDropBox
if not authPath is None:
uploadCommand += ' -a %s' %authPath
try:
pipe = subprocess.Popen( uploadCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
stdout = pipe.communicate()[0]
print stdout
retCode = pipe.returncode
if retCode != 0:
# save a copy of the files in case of upload failure...
leafFolderName = datef.strftime(dateformatForFolder)
fileFolder = os.path.join( errorInUploadFileFolder, leafFolderName)
if not os.path.exists(fileFolder):
os.makedirs(fileFolder)
df= '%s.db' %fileNameForDropBox
mf= '%s.txt' %fileNameForDropBox
dataDestFile = os.path.join( fileFolder, df)
if not os.path.exists(dataDestFile):
shutil.copy2(df, dataDestFile)
shutil.copy2(mf,os.path.join(fileFolder,mf))
print "Upload failed. Data file and metadata saved in folder '%s'" %os.path.abspath(fileFolder)
ret |= retCode
except Exception as e:
ret |= 1
print e
return ret

def run( jobfilename, authPath ):
fns = os.path.splitext( jobfilename )
confFile = '%s.json' %fns[0]
def run( args ):
if os.path.exists( '%s.db' %fileNameForDropBox ):
print "Removing files with name %s" %fileNameForDropBox
os.remove( '%s.db' %fileNameForDropBox )
if os.path.exists( '%s.txt' %fileNameForDropBox ):
os.remove( '%s.txt' %fileNameForDropBox )
command = 'cmsRun %s ' %jobfilename
command += ' popconConfigFileName=%s' %confFile
command = 'cmsRun %s ' %args.job_file
command += ' destinationDatabase=%s' %args.destDb
command += ' destinationTag=%s' %args.destTag
command += ' 2>&1'
pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
stdout = pipe.communicate()[0]
Expand All @@ -203,4 +174,4 @@ def run( jobfilename, authPath ):
if retCode!=0:
print 'O2O job failed. Skipping upload.'
return retCode
return upload( confFile, authPath )
return upload( args.destDb, args.destTag, args.comment, args.auth )
Loading

0 comments on commit f0a1f43

Please sign in to comment.