Newer
Older
Andre Sailer
committed
"""
DD4hep simulation with some argument parsing
Based on M. Frank and F. Gaede runSim.py
@author A.Sailer
@version 0.1
"""
import argparse
import io
import logging
import sys
import traceback
from urllib.parse import urlparse
from DDSim.Helper.Meta import Meta
from DDSim.Helper.LCIO import LCIO
Andre Sailer
committed
from DDSim.Helper.HepMC3 import HepMC3
from DDSim.Helper.GuineaPig import GuineaPig
from DDSim.Helper.Physics import Physics
from DDSim.Helper.Filter import Filter
from DDSim.Helper.Geometry import Geometry
from DDSim.Helper.Random import Random
from DDSim.Helper.Action import Action
from DDSim.Helper.Output import Output, outputLevel, outputLevelType
Andre Sailer
committed
from DDSim.Helper.OutputConfig import OutputConfig, defaultOutputFile
from DDSim.Helper.InputConfig import InputConfig
Andre Sailer
committed
from DDSim.Helper.ConfigHelper import ConfigHelper
from DDSim.Helper.MagneticField import MagneticField
from DDSim.Helper.ParticleHandler import ParticleHandler
from DDSim.Helper.Gun import Gun
Andre Sailer
committed
from DDSim.Helper.UI import UI
Marko Petric
committed
Marko Petric
committed
HEPMC3_SUPPORTED_EXTENSIONS = [
".hepmc.gz", ".hepmc.xz", ".hepmc.bz2",
".hepmc3", ".hepmc3.gz", ".hepmc3.xz", ".hepmc3.bz2",
".hepmc3.tree.root",
EDM4HEP_INPUT_EXTENSIONS = [
".root",
".sio",
]
POSSIBLEINPUTFILES = [
".stdhep", ".slcio", ".HEPEvt", ".hepevt",
]
POSSIBLEINPUTFILES += HEPMC3_SUPPORTED_EXTENSIONS
POSSIBLEINPUTFILES += EDM4HEP_INPUT_EXTENSIONS
Andre Sailer
committed
class DD4hepSimulation(object):
"""Class to hold all the parameters and functions to run simulation"""
def __init__(self):
Andre Sailer
committed
self.inputFiles = []
self.outputFile = defaultOutputFile()
Andre Sailer
committed
self.runType = "batch"
Andre Sailer
committed
self.numberOfEvents = 0
self.skipNEvents = 0
Andre Sailer
committed
self.crossingAngleBoost = 0.0
self.macroFile = ''
self.enableG4GPS = False
self.enableG4Gun = False
self._g4gun = None
self._g4gps = None
Andre Sailer
committed
self.vertexSigma = [0.0, 0.0, 0.0, 0.0]
self.vertexOffset = [0.0, 0.0, 0.0, 0.0]
self.enableDetailedShowerMode = False
Andre Sailer
committed
self.disableSignalHandler = False
Andre Sailer
committed
self._errorMessages = []
self._dumpParameter = False
self._dumpSteeringFile = False
Andre Sailer
committed
self.gun = Gun()
self.part = ParticleHandler()
self.field = MagneticField()
self.action = Action()
self.outputConfig = OutputConfig()
self.inputConfig = InputConfig()
self.guineapig = GuineaPig()
Andre Sailer
committed
self.hepmc3 = HepMC3()
self.meta = Meta()
self.geometry = Geometry()
Andre Sailer
committed
self.filter = Filter()
self.physics = Physics()
Andre Sailer
committed
self.ui = UI()
Andre Sailer
committed
Remi Ete
committed
self._argv = None
Andre Sailer
committed
"""Reads a steering file and sets the parameters to that of the
DD4hepSimulation object present in the steering file.
"""
globs = {}
Andre Sailer
committed
return
exec(compile(io.open(self.steeringFile).read(), self.steeringFile, 'exec'), globs, locs)
Andre Sailer
committed
for _name, obj in locs.items():
if isinstance(obj, DD4hepSimulation):
self.__dict__ = obj.__dict__
self.steeringFile = os.path.abspath(sFileTemp)
Andre Sailer
committed
Remi Ete
committed
def parseOptions(self, argv=None):
Andre Sailer
committed
"""parse the command line options"""
Remi Ete
committed
if argv is None:
self._argv = list(sys.argv)
Andre Sailer
committed
parser = argparse.ArgumentParser("Running DD4hep Simulations:",
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("--steeringFile", "-S", action="store", default=self.steeringFile,
Andre Sailer
committed
help="Steering file to change default behaviour")
# first we parse just the steering file, but only if we don't want to see the help message
if not any(opt in self._argv for opt in ('-h', '--help')):
Andre Sailer
committed
parsed, _unknown = parser.parse_known_args()
self.steeringFile = parsed.steeringFile
self.readSteeringFile()
Andre Sailer
committed
# readSteeringFile will set self._argv to None if there is a steering file
if self._argv is None:
self._argv = list(argv) if argv else list(sys.argv)
Andre Sailer
committed
parser.add_argument("--compactFile", nargs='+', action="store",
default=ConfigHelper.makeList(self.compactFile), type=str,
help="The compact XML file, or multiple compact files, if the last one is the closer.")
Andre Sailer
committed
parser.add_argument("--runType", action="store", choices=("batch", "vis", "run", "shell", "qt"),
default=self.runType,
help="The type of action to do in this invocation" # Note: implicit string concatenation
Andre Sailer
committed
"\nbatch: just simulate some events, needs numberOfEvents, and input file or gun"
"\nvis: enable visualisation, run the macroFile if it is set"
"\nqt: enable visualisation in Qt shell, run the macroFile if it is set"
Andre Sailer
committed
"\nshell: enable interactive session")
parser.add_argument("--inputFiles", "-I", nargs='+', action="store", default=self.inputFiles,
help="InputFiles for simulation %s files are supported" % ", ".join(POSSIBLEINPUTFILES))
Andre Sailer
committed
parser.add_argument("--outputFile", "-O", action="store", default=self.outputFile,
help="Outputfile from the simulation: .slcio, edm4hep.root and .root"
" output files are supported")
Andre Sailer
committed
parser.add_argument("-v", "--printLevel", action="store", default=self.printLevel, dest="printLevel",
choices=(1, 2, 3, 4, 5, 6, 7, 'VERBOSE', 'DEBUG',
'INFO', 'WARNING', 'ERROR', 'FATAL', 'ALWAYS'),
type=outputLevelType,
Andre Sailer
committed
help="Verbosity use integers from 1(most) to 7(least) verbose"
"\nor strings: VERBOSE, DEBUG, INFO, WARNING, ERROR, FATAL, ALWAYS")
parser.add_argument("--numberOfEvents", "-N", action="store", dest="numberOfEvents", default=self.numberOfEvents,
type=int, help="number of events to simulate, used in batch mode")
parser.add_argument("--skipNEvents", action="store", dest="skipNEvents", default=self.skipNEvents, type=int,
help="Skip first N events when reading a file")
parser.add_argument("--physicsList", action="store", dest="physicsList", default=self.physicsList,
help="Physics list to use in simulation")
parser.add_argument("--crossingAngleBoost", action="store", dest="crossingAngleBoost",
default=self.crossingAngleBoost,
Andre Sailer
committed
type=float, help="Lorentz boost for the crossing angle, in radian!")
parser.add_argument("--vertexSigma", nargs=4, action="store", dest="vertexSigma",
default=self.vertexSigma, metavar=('X', 'Y', 'Z', 'T'),
Andre Sailer
committed
type=float, help="FourVector of the Sigma for the Smearing of the Vertex position: x y z t")
parser.add_argument("--vertexOffset", nargs=4, action="store", dest="vertexOffset",
default=self.vertexOffset, metavar=('X', 'Y', 'Z', 'T'),
Andre Sailer
committed
type=float, help="FourVector of translation for the Smearing of the Vertex position: x y z t")
parser.add_argument("--macroFile", "-M", action="store", dest="macroFile", default=self.macroFile,
help="Macro file to execute for runType 'run' or 'vis'")
parser.add_argument("--enableGun", "-G", action="store_true", dest="enableGun", default=self.enableGun,
Andre Sailer
committed
help="enable the DDG4 particle gun")
parser.add_argument("--enableG4GPS", action="store_true", dest="enableG4GPS", default=self.enableG4GPS,
help="enable the Geant4 GeneralParticleSource. Needs a macroFile (runType run)"
"or use it with the shell (runType shell)")
parser.add_argument("--enableG4Gun", action="store_true", dest="enableG4Gun", default=self.enableG4Gun,
help="enable the Geant4 particle gun. Needs a macroFile (runType run)"
" or use it with the shell (runType shell)")
parser.add_argument("--dumpParameter", "--dump", action="store_true", dest="dumpParameter",
default=self._dumpParameter, help="Print all configuration Parameters and exit")
Andre Sailer
committed
parser.add_argument("--enableDetailedShowerMode", action="store_true", dest="enableDetailedShowerMode",
default=self.enableDetailedShowerMode,
Andre Sailer
committed
help="use detailed shower mode")
Andre Sailer
committed
parser.add_argument("--disableSignalHandler", action="store_true", dest="disableSignalHandler",
default=self.disableSignalHandler,
help="disable the Signal Handler of DD4hep")
parser.add_argument("--dumpSteeringFile", action="store_true", dest="dumpSteeringFile",
default=self._dumpSteeringFile, help="print an example steering file to stdout")
# output, or do something smarter with fullHelp only for example
Andre Sailer
committed
ConfigHelper.addAllHelper(self, parser)
# now parse everything. The default values are now taken from the
# steeringFile if they were set so that the steering file parameters can be
# overwritten from the command line
if ARGCOMPLETEENABLED:
argcomplete.autocomplete(parser)
Andre Sailer
committed
parsed = parser.parse_args()
self._dumpParameter = parsed.dumpParameter
self._dumpSteeringFile = parsed.dumpSteeringFile
Andre Sailer
committed
self.compactFile = ConfigHelper.makeList(parsed.compactFile)
self.__checkFilesExist(self.compactFile, fileType='compact')
Andre Sailer
committed
self.inputFiles = parsed.inputFiles
self.inputFiles = self.__checkFileFormat(self.inputFiles, POSSIBLEINPUTFILES)
self.__checkFilesExist(self.inputFiles, fileType='input')
Andre Sailer
committed
self.outputFile = parsed.outputFile
self.__checkFileFormat(self.outputFile, ('.root', '.slcio'))
Andre Sailer
committed
self.runType = parsed.runType
self.printLevel = self.__checkOutputLevel(parsed.printLevel)
Andre Sailer
committed
self.numberOfEvents = parsed.numberOfEvents
self.skipNEvents = parsed.skipNEvents
self.physicsList = parsed.physicsList
self.crossingAngleBoost = parsed.crossingAngleBoost
self.macroFile = parsed.macroFile
self.enableGun = parsed.enableGun
self.enableG4Gun = parsed.enableG4Gun
self.enableG4GPS = parsed.enableG4GPS
self.enableDetailedShowerMode = parsed.enableDetailedShowerMode
Andre Sailer
committed
self.vertexOffset = parsed.vertexOffset
self.vertexSigma = parsed.vertexSigma
if self.printLevel <= 2: # VERBOSE or DEBUG
logger.setLevel(logging.DEBUG)
if self._errorMessages and not (self._dumpParameter or self._dumpSteeringFile):
parser.epilog = "\n".join(self._errorMessages)
parser.print_help()
exit(1)
if self._dumpParameter:
from pprint import pprint
pprint(vars(self))
exit(0)
if self._dumpSteeringFile:
exit(0)
Andre Sailer
committed
Andre Sailer
committed
def getDetectorLists(self, detectorDescription):
''' get lists of trackers and calorimeters that are defined in detectorDescription (the compact xml file)'''
trackers, calos, unknown = [], [], []
for i in detectorDescription.detectors():
Andre Sailer
committed
name = det.name()
Andre Sailer
committed
if sd.isValid():
detType = sd.type()
Andre Sailer
committed
logger.info('getDetectorLists - found active detector %s type: %s', name, detType)
if any(pat.lower() in detType.lower() for pat in self.action.trackerSDTypes):
Andre Sailer
committed
logger.info('getDetectorLists - Identified %s as a tracker', name)
elif any(pat.lower() in detType.lower() for pat in self.action.calorimeterSDTypes):
Andre Sailer
committed
logger.info('getDetectorLists - Identified %s as a calorimeter', name)
Andre Sailer
committed
else:
Andre Sailer
committed
logger.warning('getDetectorLists - Unknown sensitive detector type: %s', detType)
unknown.append(det.name())
Andre Sailer
committed
return trackers, calos, unknown
Andre Sailer
committed
# ==================================================================================
Andre Sailer
committed
def run(self):
"""setup the geometry and dd4hep and geant4 and do what was asked to be done"""
import ROOT
ROOT.PyConfig.IgnoreCommandLineOptions = True
self.printLevel = getOutputLevel(self.printLevel)
Andre Sailer
committed
kernel = DDG4.Kernel()
dd4hep.setPrintLevel(self.printLevel)
Andre Sailer
committed
for compactFile in self.compactFile:
kernel.loadGeometry(str("file:" + os.path.abspath(compactFile)))
detectorDescription = kernel.detectorDescription()
Andre Sailer
committed
Andre Sailer
committed
# ----------------------------------------------------------------------------------
Andre Sailer
committed
# simple = DDG4.Geant4( kernel, tracker='Geant4TrackerAction',calo='Geant4CalorimeterAction')
# geant4 = DDG4.Geant4( kernel, tracker='Geant4TrackerCombineAction',calo='Geant4ScintillatorCalorimeterAction')
geant4 = DDG4.Geant4(kernel, tracker=self.action.tracker, calo=self.action.calo)
Andre Sailer
committed
if not self.disableSignalHandler:
geant4.registerInterruptHandler()
Andre Sailer
committed
Andre Sailer
committed
uiaction = geant4.setupUI(typ="tcsh", vis=True, macro=self.macroFile)
uiaction = geant4.setupUI(typ="qt", vis=True, macro=self.macroFile)
uiaction = geant4.setupUI(typ="tcsh", vis=False, macro=self.macroFile, ui=False)
uiaction = geant4.setupUI(typ="tcsh", vis=False, macro=None, ui=True)
Andre Sailer
committed
elif self.runType == "batch":
uiaction = geant4.setupUI(typ="tcsh", vis=False, macro=None, ui=False)
Andre Sailer
committed
else:
return 1
Andre Sailer
committed
# User Configuration for the Geant4Phases
Andre Sailer
committed
uiaction.ConfigureCommands = self.ui._commandsConfigure
uiaction.InitializeCommands = self.ui._commandsInitialize
uiaction.PostRunCommands = self.ui._commandsPostRun
uiaction.PreRunCommands = self.ui._commandsPreRun
uiaction.TerminateCommands = self.ui._commandsTerminate
Andre Sailer
committed
# -----------------------------------------------------------------------------------
Andre Sailer
committed
# setup the magnetic field:
Andre Sailer
committed
# configure geometry creation
self.geometry.constructGeometry(kernel, geant4, self.output.geometry)
Andre Sailer
committed
# ----------------------------------------------------------------------------------
# Configure run, event, track, step, and stack actions, if present
for action_list, DDG4_Action, kernel_Action in \
[(self.action.run, DDG4.RunAction, kernel.runAction),
(self.action.event, DDG4.EventAction, kernel.eventAction),
(self.action.track, DDG4.TrackingAction, kernel.trackingAction),
(self.action.step, DDG4.SteppingAction, kernel.steppingAction),
(self.action.stack, DDG4.StackingAction, kernel.stackingAction)]:
for action_dict in action_list:
action = DDG4_Action(kernel, action_dict["name"])
for parameter, value in action_dict.get('parameter', {}).items():
setattr(action, parameter, value)
kernel_Action().add(action)
# ----------------------------------------------------------------------------------
Andre Sailer
committed
# Configure Run actions
run1 = DDG4.RunAction(kernel, 'Geant4TestRunAction/RunInit')
Andre Sailer
committed
kernel.registerGlobalAction(run1)
kernel.runAction().add(run1)
# Configure the random seed, do it before the I/O because we might change the seed!
Andre Sailer
committed
Andre Sailer
committed
# Configure the output file format and plugin
self.outputConfig.initialize(dd4hepsimulation=self, geant4=geant4)
Andre Sailer
committed
actionList = []
gun = DDG4.GeneratorAction(kernel, "Geant4ParticleGun/" + "Gun")
self.gun.setOptions(gun)
Andre Sailer
committed
gun.Standalone = False
gun.Mask = 1
actionList.append(gun)
self.__applyBoostOrSmear(kernel, actionList, 1)
Marko Petric
committed
logger.info("++++ Adding DD4hep Particle Gun ++++")
if self.enableG4Gun:
# GPS Create something
self._g4gun = DDG4.GeneratorAction(kernel, "Geant4GeneratorWrapper/Gun")
self._g4gun.Uses = 'G4ParticleGun'
self._g4gun.Mask = 2
Marko Petric
committed
logger.info("++++ Adding Geant4 Particle Gun ++++")
actionList.append(self._g4gun)
if self.enableG4GPS:
# GPS Create something
self._g4gps = DDG4.GeneratorAction(kernel, "Geant4GeneratorWrapper/GPS")
self._g4gps.Uses = 'G4GeneralParticleSource'
self._g4gps.Mask = 3
Marko Petric
committed
logger.info("++++ Adding Geant4 General Particle Source ++++")
actionList.append(self._g4gps)
start = 4
for index, plugin in enumerate(self.inputConfig.userInputPlugin, start=start):
gen = plugin(self)
gen.Mask = index
start = index + 1
actionList.append(gen)
self.__applyBoostOrSmear(kernel, actionList, index)
logger.info("++++ Adding User Plugin %s ++++", gen.Name)
for index, inputFile in enumerate(self.inputFiles, start=start):
Andre Sailer
committed
if inputFile.endswith(".slcio"):
gen = DDG4.GeneratorAction(kernel, "LCIOInputAction/LCIO%d" % index)
Andre Sailer
committed
elif inputFile.endswith(".stdhep"):
gen = DDG4.GeneratorAction(kernel, "LCIOInputAction/STDHEP%d" % index)
gen.Input = "LCIOStdHepReader|" + inputFile
Andre Sailer
committed
elif inputFile.endswith(".HEPEvt"):
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/HEPEvt%d" % index)
gen.Input = "Geant4EventReaderHepEvtShort|" + inputFile
Andre Sailer
committed
elif inputFile.endswith(".hepevt"):
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/hepevt%d" % index)
gen.Input = "Geant4EventReaderHepEvtLong|" + inputFile
elif inputFile.endswith(tuple([".hepmc"] + HEPMC3_SUPPORTED_EXTENSIONS)):
Andre Sailer
committed
if self.hepmc3.useHepMC3:
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/hepmc%d" % index)
gen.Parameters = self.hepmc3.getParameters()
gen.Input = "HEPMC3FileReader|" + inputFile
else:
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/hepmc%d" % index)
gen.Input = "Geant4EventReaderHepMC|" + inputFile
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/GuineaPig%d" % index)
gen.Input = "Geant4EventReaderGuineaPig|" + inputFile
gen.Parameters = self.guineapig.getParameters()
elif inputFile.endswith(tuple(EDM4HEP_INPUT_EXTENSIONS)):
# EDM4HEP must come after HEPMC3 because of .root also part of hepmc3 extensions
gen = DDG4.GeneratorAction(kernel, "Geant4InputAction/EDM4hep%d" % index)
gen.Input = "EDM4hepFileReader|" + inputFile
Andre Sailer
committed
else:
# this should never happen because we already check at the top, but in case of some LogicError...
raise RuntimeError("Unknown input file type: %s" % inputFile)
gen.AlternativeDecayStatuses = self.physics.alternativeDecayStatuses
Andre Sailer
committed
gen.Sync = self.skipNEvents
gen.Mask = index
actionList.append(gen)
self.__applyBoostOrSmear(kernel, actionList, index)
generationInit = None
Andre Sailer
committed
if actionList:
generationInit = self._buildInputStage(geant4, actionList, output_level=self.output.inputStage,
have_mctruth=self._enablePrimaryHandler())
Andre Sailer
committed
# ================================================================================================
Andre Sailer
committed
# And handle the simulation particles.
part = DDG4.GeneratorAction(kernel, "Geant4ParticleHandler/ParticleHandler")
Andre Sailer
committed
kernel.generatorAction().adopt(part)
part.MinimalKineticEnergy = self.part.minimalKineticEnergy
part.KeepAllParticles = self.part.keepAllParticles
part.PrintEndTracking = self.part.printEndTracking
part.PrintStartTracking = self.part.printStartTracking
part.MinDistToParentVertex = self.part.minDistToParentVertex
part.OutputLevel = self.output.part
Andre Sailer
committed
part.enableUI()
if self.part.enableDetailedHitsAndParticleInfo:
self.part.setupUserParticleHandler(part, kernel, DDG4)
Andre Sailer
committed
# =================================================================================
Andre Sailer
committed
# Setup global filters for use in sensitive detectors
except RuntimeError as e:
Marko Petric
committed
logger.error("%s", e)
return 1
Andre Sailer
committed
# =================================================================================
# get lists of trackers and calorimeters in detectorDescription
Andre Sailer
committed
trk, cal, unk = self.getDetectorLists(detectorDescription)
Andre Sailer
committed
for detectors, function, defFilter, defAction, abort in \
[(trk, geant4.setupTracker, self.filter.tracker, self.action.tracker, False),
(cal, geant4.setupCalorimeter, self.filter.calo, self.action.calo, False),
(unk, geant4.setupDetector, None, "No Default", True),
]:
Andre Sailer
committed
self.__setupSensitiveDetectors(detectors, function, defFilter, defAction, abort)
except Exception as e:
logger.error("Failed setting up sensitive detector %s", e)
raise
# =================================================================================
Andre Sailer
committed
# Now build the physics list:
_phys = self.physics.setupPhysics(kernel, name=self.physicsList)
_phys.verbosity = self.output.physics
Andre Sailer
committed
# add the G4StepLimiterPhysics to activate the max step limits in volumes
ph = DDG4.PhysicsList(kernel, 'Geant4PhysicsList/Myphysics')
ph.addPhysicsConstructor(str('G4StepLimiterPhysics'))
Andre Sailer
committed
dd4hep.setPrintLevel(self.printLevel)
Andre Sailer
committed
kernel.configure()
kernel.initialize()
if self._g4gun is not None:
self._g4gun.generator()
if self._g4gps is not None:
self._g4gps.generator()
startUpTime, _sysTime, _cuTime, _csTime, _elapsedTime = os.times()
exitCode = 0
if not kernel.run():
logger.error("Simulation failed!")
exitCode += 1
if not kernel.terminate():
exitCode += 1
logger.error("Termination failed!")
Andre Sailer
committed
totalTimeUser, totalTimeSys, _cuTime, _csTime, _elapsedTime = os.times()
processedEvents = self.numberOfEvents
if generationInit:
processedEvents = int(generationInit.numberOfEvents)
if self.numberOfEvents < 0:
processedEvents -= 1
logger.debug(f"Correcting number of events to: {processedEvents}")
if self.printLevel <= 3:
logger.info("Total Time: %3.2f s (User), %3.2f s (System)" %
if processedEvents != 0:
eventTime = totalTimeUser - startUpTime
perEventTime = eventTime / processedEvents
logger.info("StartUp Time: %3.2f s, Processing and Init: %3.2f s (~%3.2f s/Event) "
return exitCode
def __setMagneticFieldOptions(self, geant4):
""" create and configure the magnetic tracking setup """
field = geant4.addConfig('Geant4FieldTrackingSetupAction/MagFieldTrackingSetup')
field.stepper = self.field.stepper
field.equation = self.field.equation
field.eps_min = self.field.eps_min
field.eps_max = self.field.eps_max
field.min_chord_step = self.field.min_chord_step
field.delta_chord = self.field.delta_chord
field.delta_intersection = self.field.delta_intersection
field.delta_one_step = self.field.delta_one_step
field.largest_step = self.field.largest_step
Andre Sailer
committed
def __checkFilesExist(self, fileNames, fileType=''):
"""Make sure all files in the given list exist, add to errorMessage otherwise.
:param list fileNames: list of files to check for existence
:param str fileType: type if file, for nicer error message
"""
if isinstance(fileNames, str):
fileNames = [fileNames]
for fileName in fileNames:
if not os.path.exists(fileName) and not urlparse(fileName).scheme:
self._errorMessages.append(f"ERROR: The {fileType}file '{fileName}' does not exist")
Andre Sailer
committed
def __checkFileFormat(self, fileNames, extensions):
"""check if the fileName is allowed, note that the filenames are case
sensitive, and in case of hepevt we depend on this to identify short and long versions of the content
"""
if isinstance(fileNames, str):
Andre Sailer
committed
fileNames = [fileNames]
if not all(fileName.endswith(tuple(extensions)) for fileName in fileNames):
self._errorMessages.append(f"ERROR: Unknown fileformat for file(s): {','.join(fileNames)}")
is_hepmc3_extension = any(fileName.endswith(tuple(HEPMC3_SUPPORTED_EXTENSIONS)) for fileName in fileNames)
if not self.hepmc3.useHepMC3 and is_hepmc3_extension:
self._errorMessages.append("ERROR: HepMC3 files or compressed HepMC2 require the use of HepMC3 library")
return fileNames
Andre Sailer
committed
def __applyBoostOrSmear(self, kernel, actionList, mask):
Andre Sailer
committed
"""apply boost or smearing for given mask index"""
Andre Sailer
committed
if self.crossingAngleBoost:
lbo = DDG4.GeneratorAction(kernel, "Geant4InteractionVertexBoost")
lbo.Angle = self.crossingAngleBoost
lbo.Mask = mask
actionList.append(lbo)
if any(self.vertexSigma) or any(self.vertexOffset):
vSmear = DDG4.GeneratorAction(kernel, "Geant4InteractionVertexSmear")
vSmear.Offset = self.vertexOffset
vSmear.Sigma = self.vertexSigma
vSmear.Mask = mask
actionList.append(vSmear)
""" parse all the options for the helper """
parsedDict = vars(parsed)
for name, obj in vars(self).items():
Andre Sailer
committed
try:
Andre Sailer
committed
except RuntimeError as e:
if logger.level <= logging.DEBUG:
self._errorMessages.append(traceback.format_exc())
obj._checkProperties()
def __checkOutputLevel(self, level):
"""return outputlevel as int so we don't have to import anything for faster startup"""
try:
return outputLevel(level)
self._errorMessages.append("ERROR: printLevel is neither integer nor string")
return -1
self._errorMessages.append("ERROR: printLevel '%s' unknown" % level)
def __setupSensitiveDetectors(self, detectors, setupFunction, defaultFilter=None,
Andre Sailer
committed
defaultAction=None, abortForMissingAction=False,
):
"""Attach sensitive detector actions for all subdetectors.
Can be steered with the `Action` ConfigHelpers
Andre Sailer
committed
:param detectors: list of detectors
:param setupFunction: function used to register the sensitive detector
:param defaultFilter: default filter to apply for given types
:param abortForMissingAction: if true end program if there is no action found
Andre Sailer
committed
"""
for det in detectors:
Andre Sailer
committed
logger.info('Setting up SD for %s with %s', det, defaultAction)
Andre Sailer
committed
action = None
for pattern in self.action.mapActions:
if pattern.lower() in det.lower():
action = self.action.mapActions[pattern]
logger.info(' replace default action with : %s', action)
Andre Sailer
committed
break
if abortForMissingAction and action is None:
logger.error('Cannot find Action for detector %s. You have to extend "action.mapAction"', det)
raise RuntimeError("Cannot find Action")
if self.enableDetailedShowerMode:
if isinstance(act, list):
for a in act:
a.HitCreationMode = 2
else:
act.HitCreationMode = 2
"""print the parameters formated as a steering file"""
steeringFileBase = textwrap.dedent("""\
from DDSim.DD4hepSimulation import DD4hepSimulation
from g4units import mm, GeV, MeV
SIM = DD4hepSimulation()
""")
steeringFileBase += "\n"
optionDict = parser._option_string_actions
parameters = vars(self)
for parName, parameter in sorted(list(parameters.items()), key=sortParameters):
if parName.startswith("_"):
continue
steeringFileBase += "\n\n"
steeringFileBase += "################################################################################\n"
steeringFileBase += "## %s \n" % "\n## ".join(parameter.__doc__.splitlines())
steeringFileBase += "################################################################################\n"
options = parameter.getOptions()
for opt, optionsDict in sorted(options.items(), key=sortParameters):
if opt.startswith("_"):
continue
Andre Sailer
committed
parValue = optionsDict['default']
if isinstance(optionsDict.get('help'), str):
Andre Sailer
committed
steeringFileBase += "\n## %s\n" % "\n## ".join(optionsDict.get('help').splitlines())
if isinstance(parValue, str):
steeringFileBase += "SIM.%s.%s = \"%s\"\n" % (parName, opt, parValue)
steeringFileBase += "SIM.%s.%s = %s\n" % (parName, opt, parValue)
# get the docstring from the command line parameter
optionObj = optionDict.get("--" + parName, None)
if isinstance(optionObj, argparse._StoreAction):
steeringFileBase += "## %s\n" % "\n## ".join(optionObj.help.splitlines())
if isinstance(parameter, str):
steeringFileBase += "SIM.%s = \"%s\"" % (parName, str(parameter))
steeringFileBase += "SIM.%s = %s" % (parName, str(parameter))
steeringFileBase += "\n"
for line in steeringFileBase.splitlines():
print(line)
"""Check if the requested setup makes sense, or if there is something preventing it from working correctly
Appends error messages to self._errorMessages
:returns: None
"""
if not self.compactFile:
self._errorMessages.append("ERROR: No geometry compact file provided")
if self.runType == "batch":
if not self.numberOfEvents:
self._errorMessages.append("ERROR: Batch mode requested, but did not set number of events")
if not (self.inputFiles or self.enableGun or self.inputConfig.userInputPlugin):
self._errorMessages.append("ERROR: Batch mode requested, but did not set inputFile(s), gun, or userInputPlugin")
if self.inputFiles and (self.enableG4Gun or self.enableG4GPS):
self._errorMessages.append("ERROR: Cannot use both inputFiles and Geant4Gun or GeneralParticleSource")
if self.enableGun and (self.enableG4Gun or self.enableG4GPS):
self._errorMessages.append("ERROR: Cannot use both DD4hepGun and Geant4 Gun or GeneralParticleSource")
if self.inputConfig.userInputPlugin and (self.enableG4Gun or self.enableG4GPS):
self._errorMessages.append("ERROR: Cannot use both userInputPlugin and Geant4 Gun or GeneralParticleSource")
if self.numberOfEvents < 0 and not self.inputFiles:
self._errorMessages.append("ERROR: Negative number of events only sensible for inputFiles")
""" the geant4 Gun or GeneralParticleSource cannot be used together with the PrimaryHandler.
Particles would be simulated multiple times
:returns: True or False
"""
enablePrimaryHandler = not (self.enableG4Gun or self.enableG4GPS)
if enablePrimaryHandler:
Marko Petric
committed
logger.info("Enabling the PrimaryHandler")
Marko Petric
committed
logger.info("Disabling the PrimaryHandler")
def _buildInputStage(self, geant4, generator_input_modules, output_level=None, have_mctruth=True):
"""
Generic build of the input stage with multiple input modules.
Actions executed are:
1) Register Generation initialization action
2) Append all modules to build the complete input record
These modules are readers/particle sources, boosters and/or smearing actions.
3) Merge all existing interaction records
4) Add the MC truth handler
"""
from DDG4 import GeneratorAction
# Register Generation initialization action
gen = GeneratorAction(geant4.kernel(), "Geant4GeneratorActionInit/GenerationInit")
generationInit = gen
if output_level is not None:
gen.OutputLevel = output_level
ga.adopt(gen)
# Now append all modules to build the complete input record
# These modules are readers/particle sources, boosters and/or smearing actions
for gen in generator_input_modules:
gen.enableUI()
if output_level is not None:
gen.OutputLevel = output_level
ga.adopt(gen)
# Merge all existing interaction records
gen = GeneratorAction(geant4.kernel(), "Geant4InteractionMerger/InteractionMerger")
gen.enableUI()
if output_level is not None:
gen.OutputLevel = output_level
ga.adopt(gen)
# Finally generate Geant4 primaries
if have_mctruth:
gen = GeneratorAction(geant4.kernel(), "Geant4PrimaryHandler/PrimaryHandler")
gen.RejectPDGs = ConfigHelper.makeString(self.physics.rejectPDGs)
Andre Sailer
committed
gen.ZeroTimePDGs = ConfigHelper.makeString(self.physics.zeroTimePDGs)
gen.enableUI()
if output_level is not None:
gen.OutputLevel = output_level
ga.adopt(gen)
# Puuuhh! All done.
return generationInit
################################################################################
################################################################################
def sortParameters(key):
from functools import cmp_to_key
def _sortParameters(parA, parB):
"""sort the parameters by name: first normal parameters, then set of
parameters based on ConfigHelper objects
"""
parTypeA = parA[1]
parTypeB = parB[1]
if isinstance(parTypeA, ConfigHelper) and isinstance(parTypeB, ConfigHelper):
return 1 if str(parA[0]) > str(parB[0]) else -1
elif isinstance(parTypeA, ConfigHelper):
return 1
elif isinstance(parTypeB, ConfigHelper):
return -1
else:
return 1 if str(parA[0]) > str(parB[0]) else -1
return cmp_to_key(_sortParameters)(key)
def getOutputLevel(level):
"""return output.LEVEL"""
from DDG4 import OutputLevel
levels = {1: OutputLevel.VERBOSE,
2: OutputLevel.DEBUG,
3: OutputLevel.INFO,
4: OutputLevel.WARNING,
5: OutputLevel.ERROR,
6: OutputLevel.FATAL,
7: OutputLevel.ALWAYS}