#========================================================================== # AIDA Detector description implementation #-------------------------------------------------------------------------- # Copyright (C) Organisation europeenne pour la Recherche nucleaire (CERN) # All rights reserved. # # For the licensing terms see $DD4hepINSTALL/LICENSE. # For the list of contributors see $DD4hepINSTALL/doc/CREDITS. # #========================================================================== from __future__ import absolute_import, unicode_literals import logging from dd4hep_base import * import six logging.basicConfig(format='%(levelname)s: %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def loadDDG4(): ## import ROOT ## done in import * above from ROOT import gSystem ## Try to load libglapi to avoid issues with TLS Static ## Turn off all errors from ROOT about the library missing orgLevel = ROOT.gErrorIgnoreLevel ROOT.gErrorIgnoreLevel=6000 gSystem.Load("libglapi") ROOT.gErrorIgnoreLevel=orgLevel import platform import os if platform.system()=="Darwin": gSystem.SetDynamicPath(os.environ['DD4HEP_LIBRARY_PATH']) os.environ['DYLD_LIBRARY_PATH'] = os.pathsep.join([os.environ['DD4HEP_LIBRARY_PATH'], os.environ.get('DYLD_LIBRARY_PATH', '')]).strip(os.pathsep) result = gSystem.Load("libDDG4Plugins") if result < 0: raise Exception('DDG4.py: Failed to load the DDG4 library libDDG4Plugins: '+gSystem.GetErrorStr()) from ROOT import dd4hep as module return module # We are nearly there .... 
current = __import__(__name__)


def _import_class(ns, nam):
    """Publish attribute ``nam`` of the namespace ``current.<ns>`` at module level."""
    scope = getattr(current, ns)
    setattr(current, nam, getattr(scope, nam))

#---------------------------------------------------------------------------
#
try:
    dd4hep = loadDDG4()
except Exception as X:
    logger.error('+--%-100s--+', 100*'-')
    logger.error('| %-100s |', 'Failed to load DDG4 library:')
    logger.error('| %-100s |', str(X))
    logger.error('+--%-100s--+', 100*'-')
    # Same effect as exit(1), but does not rely on the ``site`` module builtin.
    raise SystemExit(1)

from ROOT import CLHEP as CLHEP
Core = dd4hep
Sim = dd4hep.sim
Simulation = dd4hep.sim
Kernel = Sim.KernelHandle
Interface = Sim.Geant4ActionCreation
Detector = Core.Detector
#
from dd4hep_base import std, std_vector, std_list, std_map, std_pair
import g4units as G4Units
#
#---------------------------------------------------------------------------


def _constant(self, name):
    """Return the detector constant ``name`` via ``constantAsString``."""
    return self.constantAsString(name)


Detector.globalVal = _constant
#---------------------------------------------------------------------------


def importConstants(description, namespace=None, debug=False):
    """Import the Detector constants into the DDG4 namespace.

    Every non-string constant of ``description`` is evaluated with the Geant4
    evaluator and stored as an attribute of the target namespace.  Constants
    may depend on each other, so evaluation is retried in passes (at most 100)
    until everything is resolved.

    Args:
        description: detector description providing ``constants()``.
        namespace:   optional name of a sub-namespace.  If this module has no
                     attribute of that name yet, a new module object
                     ``DDG4.<namespace>`` is created, attached and used as the
                     target.  Otherwise the constants go into this module.
        debug:       log every imported value when true.
    """
    import types

    ns = current
    if namespace is not None and not hasattr(current, namespace):
        # types.ModuleType replaces the deprecated imp.new_module (the ``imp``
        # module is gone in Python 3.12).  str() keeps the name a native
        # string on Python 2, where unicode_literals is active.
        m = types.ModuleType(str('DDG4.'+namespace))
        setattr(current, namespace, m)
        ns = m
    # NOTE(review): when ``namespace`` already exists on this module the
    # constants are written to the module itself, not to that namespace.
    # Kept as in the original - confirm whether this is intended.
    evaluator = dd4hep.g4Evaluator()
    cnt = 0
    num = 0
    todo = {}
    for c in description.constants():
        # String constants are not evaluated and not imported.
        if c.second.dataType == 'string':
            continue
        todo[c.first] = c.second.GetTitle().replace('(int)', '')

    while len(todo) and cnt < 100:
        cnt = cnt + 1
        if cnt == 100:
            logger.error('%s %d out of %d %s "%s": [%s]\n+++ %s',
                         '+++ FAILED to import',
                         len(todo), len(todo)+num,
                         'global values into namespace',
                         ns.__name__, 'Try to continue anyway', 100*'=')
            for k, v in todo.items():
                if not hasattr(ns, k):
                    logger.error('+++ FAILED to import: "'+k+'" = "'+str(v)+'"')
            logger.info('+++ %s', 100*'=')

        # Iterate over a snapshot: entries are deleted from ``todo`` inside
        # the loop, which raises RuntimeError on a live Python 3 dict view.
        for k, v in list(todo.items()):
            if not hasattr(ns, k):
                val = evaluator.evaluate(v)
                status = evaluator.status()
                if status == 0:
                    evaluator.setVariable(k, val)
                    setattr(ns, k, val)
                    if debug:
                        logger.info('Imported global value: "'+k+'" = "'+str(val)+'" into namespace'+ns.__name__)
                    del todo[k]
                    num = num + 1
    if cnt < 100:
        logger.info('+++ Imported %d global values to namespace:%s', num, ns.__name__,)

#---------------------------------------------------------------------------


def _registerGlobalAction(self, action):
    """Register ``action`` as global action of the wrapped kernel."""
    self.get().registerGlobalAction(Interface.toAction(action))


def _registerGlobalFilter(self, filter):
    """Register ``filter`` as global filter of the wrapped kernel."""
    self.get().registerGlobalFilter(Interface.toAction(filter))

#---------------------------------------------------------------------------


def _getKernelProperty(self, name):
    """``__getattr__`` hook of the kernel handle.

    Resolves ``name`` first as a kernel property, then as an attribute of the
    wrapped kernel object.

    Raises:
        KeyError: if ``name`` is neither a property nor an attribute.
    """
    ret = Interface.getPropertyKernel(self.get(), name)
    if ret.status > 0:
        return ret.data
    elif hasattr(self.get(), name):
        return getattr(self.get(), name)
    # No ``hasattr(self, name)`` fallback here: this hook only runs after the
    # regular lookup on ``self`` has failed, so that test could never succeed
    # and would re-enter this function until the recursion limit is hit.
    msg = 'Geant4Kernel::GetProperty [Unhandled]: Cannot access Kernel.'+name
    raise KeyError(msg)

#---------------------------------------------------------------------------


def _setKernelProperty(self, name, value):
    """``__setattr__`` hook of the kernel handle: set property ``name``.

    The value is passed on as ``str(value)``.

    Raises:
        KeyError: if the property could not be set.
    """
    if Interface.setPropertyKernel(self.get(), name, str(value)):
        return
    msg = 'Geant4Kernel::SetProperty [Unhandled]: Cannot set Kernel.'+name+' = '+str(value)
    raise KeyError(msg)

#---------------------------------------------------------------------------


def _kernel_phase(self, name):
    """Add a simple phase called ``name`` to the kernel and return it."""
    return self.addSimplePhase(str(name), False)

#---------------------------------------------------------------------------


def _kernel_worker(self):
    """Create a worker kernel and return it wrapped in a ``Kernel`` handle."""
    return Kernel(self.get().createWorker())

#---------------------------------------------------------------------------


def _kernel_terminate(self):
    """Terminate the wrapped kernel."""
    return self.get().terminate()


#---------------------------------------------------------------------------
Kernel.phase = _kernel_phase
Kernel.registerGlobalAction = _registerGlobalAction
Kernel.registerGlobalFilter = _registerGlobalFilter
Kernel.createWorker = _kernel_worker
Kernel.__getattr__ = _getKernelProperty
Kernel.__setattr__ = _setKernelProperty
Kernel.terminate = _kernel_terminate
#---------------------------------------------------------------------------
ActionHandle = Sim.ActionHandle
#---------------------------------------------------------------------------


def SensitiveAction(kernel, nam, det, shared=False):
    """Create a sensitive action ``nam`` for detector ``det``.

    Both names are converted with ``str()`` before being handed to
    ``Interface.createSensitive``.
    """
    action_name = str(nam)
    detector_name = str(det)
    return Interface.createSensitive(kernel, action_name, detector_name, shared)

#---------------------------------------------------------------------------


def Action(kernel, nam, shared=False):
    """Create a generic action through ``Interface.createAction``."""
    action_name = str(nam)
    return Interface.createAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def Filter(kernel, nam, shared=False):
    """Create a filter through ``Interface.createFilter``."""
    action_name = str(nam)
    return Interface.createFilter(kernel, action_name, shared)

#---------------------------------------------------------------------------


def PhaseAction(kernel, nam, shared=False):
    """Create a phase action through ``Interface.createPhaseAction``."""
    action_name = str(nam)
    return Interface.createPhaseAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def RunAction(kernel, nam, shared=False):
    """Create a run action through ``Interface.createRunAction``."""
    action_name = str(nam)
    return Interface.createRunAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def EventAction(kernel, nam, shared=False):
    """Create an event action through ``Interface.createEventAction``."""
    action_name = str(nam)
    return Interface.createEventAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def GeneratorAction(kernel, nam, shared=False):
    """Create a generator action through ``Interface.createGeneratorAction``."""
    action_name = str(nam)
    return Interface.createGeneratorAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def TrackingAction(kernel, nam, shared=False):
    """Create a tracking action through ``Interface.createTrackingAction``."""
    action_name = str(nam)
    return Interface.createTrackingAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def SteppingAction(kernel, nam, shared=False):
    """Create a stepping action through ``Interface.createSteppingAction``."""
    action_name = str(nam)
    return Interface.createSteppingAction(kernel, action_name, shared)

#---------------------------------------------------------------------------


def StackingAction(kernel, nam, shared=False):
    """Create a stacking action through ``Interface.createStackingAction``."""
    action_name = str(nam)
    return Interface.createStackingAction(kernel, action_name, shared)
#---------------------------------------------------------------------------
def DetectorConstruction(kernel, nam):
    """Create a detector construction action via ``Interface.createDetectorConstruction``."""
    action_name = str(nam)
    return Interface.createDetectorConstruction(kernel, action_name)

#---------------------------------------------------------------------------


def PhysicsList(kernel, nam):
    """Create a physics list action via ``Interface.createPhysicsList``."""
    action_name = str(nam)
    return Interface.createPhysicsList(kernel, action_name)

#---------------------------------------------------------------------------


def UserInitialization(kernel, nam):
    """Create a user initialization action via ``Interface.createUserInitialization``."""
    action_name = str(nam)
    return Interface.createUserInitialization(kernel, action_name)

#---------------------------------------------------------------------------


def SensitiveSequence(kernel, nam):
    """Create a sensitive detector sequence via ``Interface.createSensDetSequence``."""
    sequence_name = str(nam)
    return Interface.createSensDetSequence(kernel, sequence_name)

#---------------------------------------------------------------------------


def _setup(obj):
    """Import class ``obj`` from ``Sim`` and make ``adopt``/``add`` accept handles.

    The class' own ``adopt`` is kept as ``__adopt``; the replacement unwraps
    the handle with ``action.get()`` before forwarding to it.
    """
    def _adopt(self, action):
        self.__adopt(action.get())

    _import_class('Sim', obj)
    klass = getattr(current, obj)
    native_adopt = getattr(klass, 'adopt')
    setattr(klass, '__adopt', native_adopt)
    setattr(klass, 'adopt', _adopt)
    setattr(klass, 'add', _adopt)


def _setup_callback(obj):
    """Import class ``obj`` from ``Sim`` and make ``add`` accept handles with callbacks.

    The class' own ``add`` is kept as ``__adopt``; the replacement forwards
    ``action.get()`` together with ``action.callback()``.
    """
    def _adopt(self, action):
        self.__adopt(action.get(), action.callback())

    _import_class('Sim', obj)
    klass = getattr(current, obj)
    native_add = getattr(klass, 'add')
    setattr(klass, '__adopt', native_add)
    setattr(klass, 'add', _adopt)


#---------------------------------------------------------------------------
_setup_callback('Geant4ActionPhase')
_setup('Geant4RunActionSequence')
_setup('Geant4EventActionSequence')
_setup('Geant4GeneratorActionSequence')
_setup('Geant4TrackingActionSequence')
_setup('Geant4SteppingActionSequence')
_setup('Geant4StackingActionSequence')
_setup('Geant4PhysicsListActionSequence')
_setup('Geant4SensDetActionSequence')
_setup('Geant4DetectorConstructionSequence')
_setup('Geant4UserInitializationSequence')
_setup('Geant4Sensitive')
_setup('Geant4ParticleHandler')
_import_class('Sim', 'Geant4Vertex')
_import_class('Sim', 'Geant4Particle')
_import_class('Sim', 'Geant4VertexVector')
_import_class('Sim', 'Geant4ParticleVector')
_import_class('Sim', 'Geant4Action')
_import_class('Sim', 'Geant4Filter')
_import_class('Sim', 'Geant4RunAction')
_import_class('Sim', 'Geant4TrackingAction')
_import_class('Sim', 'Geant4StackingAction')
_import_class('Sim', 'Geant4PhaseAction')
_import_class('Sim', 'Geant4UserParticleHandler')
_import_class('Sim', 'Geant4UserInitialization')
_import_class('Sim', 'Geant4DetectorConstruction')
_import_class('Sim', 'Geant4GeneratorWrapper')
_import_class('Sim', 'Geant4Random')
_import_class('CLHEP', 'HepRandom')
_import_class('CLHEP', 'HepRandomEngine')

#---------------------------------------------------------------------------


def _get(self, name):
    """``__getattr__`` hook for action handles.

    Resolves ``name`` first as a property of the underlying action, then as
    an attribute of ``self.action``, then as an attribute of the action
    object returned by ``Interface.toAction``.

    Raises:
        KeyError: if none of the lookups succeeds.
    """
    a = Interface.toAction(self)
    ret = Interface.getProperty(a, name)
    if ret.status > 0:
        return ret.data
    if hasattr(self.action, name):
        return getattr(self.action, name)
    if hasattr(a, name):
        return getattr(a, name)
    msg = 'Geant4Action::GetProperty [Unhandled]: Cannot access property '+a.name()+'.'+name
    raise KeyError(msg)


def _set(self, name, value):
    """``__setattr__`` hook for action handles: set property ``name``.

    List elements are converted with ``str()`` first, then the whole value
    is handed to ``Interface.setProperty`` as ``str(value)``.

    Raises:
        KeyError: if the property could not be set.
    """
    a = Interface.toAction(self)
    if isinstance(value, list):
        value = [str(x) for x in value]
    if Interface.setProperty(a, name, str(value)):
        return
    msg = 'Geant4Action::SetProperty [Unhandled]: Cannot set '+a.name()+'.'+name+' = '+str(value)
    raise KeyError(msg)


def _props(obj):
    """Import class ``obj`` from ``Sim`` and install the property hooks on it."""
    _import_class('Sim', obj)
    cl = getattr(current, obj)
    cl.__getattr__ = _get
    cl.__setattr__ = _set


_props('FilterHandle')
_props('ActionHandle')
_props('PhaseActionHandle')
_props('RunActionHandle')
_props('EventActionHandle')
_props('GeneratorActionHandle')
_props('PhysicsListHandle')
_props('TrackingActionHandle')
_props('SteppingActionHandle')
_props('StackingActionHandle')
_props('DetectorConstructionHandle')
_props('SensitiveHandle')
_props('UserInitializationHandle')
_props('Geant4ParticleHandler')
_props('Geant4UserParticleHandler')
_props('GeneratorActionSequenceHandle')
_props('RunActionSequenceHandle')
_props('EventActionSequenceHandle')
_props('TrackingActionSequenceHandle')
_props('SteppingActionSequenceHandle')
_props('StackingActionSequenceHandle')
_props('DetectorConstructionSequenceHandle')
_props('PhysicsListActionSequenceHandle')
_props('SensDetActionSequenceHandle')
_props('UserInitializationSequenceHandle')
_props('Geant4PhysicsListActionSequence')


class Geant4:
    """Helper object bundling frequently used DDG4 setup fragments.

    Groups the python snippets that are needed over and over again when
    configuring a Geant4 simulation, so they do not have to be retyped.

    Author:  M.Frank
    Version: 1.0
    """

    def __init__(self, kernel=None,
                 calo='Geant4CalorimeterAction',
                 tracker='Geant4SimpleTrackerAction'):
        """Attach to ``kernel`` (a new ``Kernel()`` if omitted) and set defaults.

        Args:
            kernel:  kernel handle to configure; created when ``None``.
            calo:    default sensitive action type for calorimeters.
            tracker: default sensitive action type for trackers.
        """
        # The kernel must exist before it is configured; the default
        # ``kernel=None`` is resolved first.
        if kernel is None:
            kernel = Kernel()
        kernel.UI = "UI"
        kernel.printProperties()
        self._kernel = kernel
        self.description = self._kernel.detectorDescription()
        self.sensitive_types = {}
        self.sensitive_types['tracker'] = tracker
        self.sensitive_types['calorimeter'] = calo
        self.sensitive_types['escape_counter'] = 'Geant4EscapeCounter'

    def kernel(self):
        """Access the worker kernel object.

        Author: M.Frank
        """
        return self._kernel.worker()

    def master(self):
        """Access the master kernel object.

        Author: M.Frank
        """
        return self._kernel

    def setupUI(self, typ='csh', vis=False, ui=True, macro=None):
        """Configure the Geant4 command executive.

        Args:
            typ:   session type assigned to ``SessionType``.
            vis:   truthy to enable visualization (``HaveVIS``).
            ui:    truthy to enable the user interface (``HaveUI``).
            macro: optional macro assigned to ``SetupUI``.

        Returns:
            The UI action, already registered as global action of the master.

        Author: M.Frank
        """
        ui_action = Action(self.master(), "Geant4UIManager/UI")
        # Normalise to real booleans: only True/False are passed on.
        ui_action.HaveVIS = bool(vis)
        ui_action.HaveUI = bool(ui)
        ui_action.SessionType = typ
        if macro:
            ui_action.SetupUI = macro
        self.master().registerGlobalAction(ui_action)
        return ui_action

    def setupCshUI(self, typ='csh', vis=False, ui=True, macro=None):
        """Configure the Geant4 command executive with a csh like command prompt.

        ``typ`` is accepted for signature compatibility only; the session
        type is always 'csh'.

        Author: M.Frank
        """
        return self.setupUI(typ='csh', vis=vis, ui=ui, macro=macro)

    def addUserInitialization(self, worker, worker_args=None, master=None, master_args=None):
        """Configure Geant4 user initialization for optional multi-threading mode.

        Args:
            worker:      worker setup callable (mandatory).
            worker_args: arguments handed to the worker setup.
            master:      optional master setup callable.
            master_args: arguments handed to the master setup.

        Returns:
            Tuple ``(init_seq, init_action)``.

        Raises:
            RuntimeError: if ``worker`` is not given.

        Author: M.Frank
        """
        init_seq = self.master().userInitialization(True)
        init_action = UserInitialization(self.master(), 'Geant4PythonInitialization/PyG4Init')
        #
        if worker:
            init_action.setWorkerSetup(worker, worker_args)
        else:
            raise RuntimeError('Invalid argument for Geant4 worker initialization')
        #
        if master:
            init_action.setMasterSetup(master, master_args)
        #
        init_seq.adopt(init_action)
        return init_seq, init_action

    def detectorConstruction(self):
        """Return the detector construction sequence of the master kernel."""
        seq = self.master().detectorConstruction(True)
        return seq

    def addDetectorConstruction(self, name_type,
                                field=None, field_args=None,
                                geometry=None, geometry_args=None,
                                sensitives=None, sensitives_args=None,
                                allow_threads=False):
        """Add a detector construction action to the master kernel.

        Optional python callbacks may be supplied for the geometry, the field
        and the sensitive detectors.  With ``allow_threads`` a
        'Geant4PythonDetectorConstructionLast' action is appended as well.

        Returns:
            Tuple ``(init_seq, init_action)``.

        Author: M.Frank
        """
        init_seq = self.master().detectorConstruction(True)
        init_action = DetectorConstruction(self.master(), name_type)
        #
        if geometry:
            init_action.setConstructGeo(geometry, geometry_args)
        #
        if field:
            init_action.setConstructField(field, field_args)
        #
        if sensitives:
            init_action.setConstructSensitives(sensitives, sensitives_args)
        #
        init_seq.adopt(init_action)
        if allow_threads:
            last_action = DetectorConstruction(self.master(), "Geant4PythonDetectorConstructionLast/LastDetectorAction")
            init_seq.adopt(last_action)

        return init_seq, init_action

    def addPhaseAction(self, phase_name, factory_specification, ui=True, instance=None):
        """Add a new phase action to an arbitrary step.

        Args:
            phase_name:            name of the phase the action is added to.
            factory_specification: "<factory_name>/<instance name>".
            ui:                    enable the UI of the action when true.
            instance:              kernel to use; defaults to ``self.kernel()``.

        Author: M.Frank
        """
        if instance is None:
            instance = self.kernel()
        action = PhaseAction(instance, factory_specification)
        instance.phase(phase_name).add(action)
        if ui:
            action.enableUI()
        return action

    def addConfig(self, factory_specification):
        """Add a new phase action to the 'configure' step.

        Called at the beginning of Geant4Exec::configure.
        The factory specification is the typical string
        "<factory_name>/<instance name>".  If no instance name is specified
        it defaults to the factory name.

        Author: M.Frank
        """
        return self.addPhaseAction('configure', factory_specification, instance=self.master())

    def addInit(self, factory_specification):
        """Add a new phase action to the 'initialize' step.

        Called at the beginning of Geant4Exec::initialize.
        The factory specification is the typical string
        "<factory_name>/<instance name>".  If no instance name is specified
        it defaults to the factory name.

        Author: M.Frank
        """
        return self.addPhaseAction('initialize', factory_specification)

    def addStart(self, factory_specification):
        """Add a new phase action to the 'start' step.

        Called at the beginning of Geant4Exec::run.
        The factory specification is the typical string
        "<factory_name>/<instance name>".  If no instance name is specified
        it defaults to the factory name.

        Author: M.Frank
        """
        return self.addPhaseAction('start', factory_specification)

    def addStop(self, factory_specification):
        """Add a new phase action to the 'stop' step.

        Called at the end of Geant4Exec::run.
        The factory specification is the typical string
        "<factory_name>/<instance name>".  If no instance name is specified
        it defaults to the factory name.

        Author: M.Frank
        """
        return self.addPhaseAction('stop', factory_specification)

    def execute(self):
        """Execute the Geant 4 program with all steps.

        Runs configure, initialize, run and terminate on the worker kernel.

        Author: M.Frank
        """
        self.kernel().configure()
        self.kernel().initialize()
        self.kernel().run()
        self.kernel().terminate()
        return self

    def printDetectors(self):
        """Log all sensitive detectors together with their sensitive action type."""
        logger.info('+++ List of sensitive detectors:')
        for i in self.description.detectors():
            o = DetElement(i.second.ptr())
            sd = self.description.sensitiveDetector(str(o.name()))
            if sd.isValid():
                typ = sd.type()
                # dict.get instead of dict.has_key, which is gone in Python 3.
                sdtyp = self.sensitive_types.get(typ, 'Unknown')
                logger.info('+++ %-32s type:%-12s --> Sensitive type: %s', o.name(), typ, sdtyp)

    def setupSensitiveSequencer(self, name, action):
        """Create the sensitive detector sequence for detector ``name``.

        ``action`` is accepted for interface compatibility; it does not
        influence the created sequence.
        """
        seq = SensitiveSequence(self.kernel(), 'Geant4SensDetActionSequence/'+name)
        seq.enableUI()
        return seq

    def setupDetector(self, name, action, collections=None):
        """Create the sensitive sequence and handler action(s) for a detector.

        Args:
            name:        name of the sensitive detector.
            action:      sensitive action type, or a tuple/list
                         ``(type, parameter_dict)``; the dictionary may be
                         omitted.
            collections: optional collection specifications.  Each entry is a
                         collection name or a tuple/list
                         ``(name[, type[, parameter_dict]])``.  When ``None``
                         the collection names of the detector's readout are
                         used.  If there are no collections at all, a single
                         handler ``<name>Handler`` is created.

        Returns:
            ``(seq, acts)`` if more than one action was created, otherwise
            ``(seq, act)`` with the single action.
        """
        # fg: allow the action to be a tuple with parameter dictionary
        parameterDict = {}
        if isinstance(action, (tuple, list)):
            sensitive_type = action[0]
            # The parameter dictionary is optional.
            if len(action) > 1:
                parameterDict = action[1]
        else:
            sensitive_type = action

        seq = SensitiveSequence(self.kernel(), 'Geant4SensDetActionSequence/'+name)
        seq.enableUI()
        acts = []
        if collections is None:
            sd = self.description.sensitiveDetector(str(name))
            ro = sd.readout()
            collections = ro.collectionNames()

        # No collections (empty readout or explicit empty list): fall back to
        # one default handler so that there is always an action to return.
        if len(collections) == 0:
            act = SensitiveAction(self.kernel(), sensitive_type+'/'+name+'Handler', name)
            for parameter, value in six.iteritems(parameterDict):
                setattr(act, parameter, value)
            acts.append(act)

        # Work down the collections if present
        for coll in collections:
            params = {}
            if isinstance(coll, (tuple, list)):
                coll_nam = coll[0]
                # A type given here also applies to the following plain
                # collection names (behaviour of the original code).
                if len(coll) > 1:
                    sensitive_type = coll[1]
                if len(coll) > 2:
                    params = coll[2]
            else:
                coll_nam = coll
            act = SensitiveAction(self.kernel(), sensitive_type+'/'+coll_nam+'Handler', name)
            act.CollectionName = coll_nam
            for parameter, value in six.iteritems(params):
                setattr(act, parameter, value)
            acts.append(act)

        for act in acts:
            act.enableUI()
            seq.add(act)
        if len(acts) > 1:
            return (seq, acts)
        return (seq, acts[0])

    def setupCalorimeter(self, name, type=None, collections=None):
        """Set up detector ``name`` with the default calorimeter action type.

        ``type`` overrides ``self.sensitive_types['calorimeter']``.
        """
        # NOTE(review): result unused; lookup kept in case it validates the name.
        self.description.sensitiveDetector(str(name))
        # sd.setType('calorimeter')
        if type is None:
            type = self.sensitive_types['calorimeter']
        return self.setupDetector(name, type, collections)

    def setupTracker(self, name, type=None, collections=None):
        """Set up detector ``name`` with the default tracker action type.

        ``type`` overrides ``self.sensitive_types['tracker']``.
        """
        # NOTE(review): result unused; lookup kept in case it validates the name.
        self.description.sensitiveDetector(str(name))
        # sd.setType('tracker')
        if type is None:
            type = self.sensitive_types['tracker']
        return self.setupDetector(name, type, collections)

    def _private_setupField(self, field, stepper, equation, prt):
        """Apply the default tracking parameters to ``field``.

        Args:
            field:    field setup action to configure.
            stepper:  value for ``field.stepper``.
            equation: value for ``field.equation``.
            prt:      log the resulting settings when true.

        Returns:
            The configured ``field``.
        """
        import SystemOfUnits
        field.stepper = stepper
        field.equation = equation
        field.eps_min = 5e-05*SystemOfUnits.mm
        field.eps_max = 0.001*SystemOfUnits.mm
        field.min_chord_step = 0.01*SystemOfUnits.mm
        field.delta_chord = 0.25*SystemOfUnits.mm
        field.delta_intersection = 0.001*SystemOfUnits.mm
        field.delta_one_step = 0.01*SystemOfUnits.mm
        field.largest_step = 1000*SystemOfUnits.m
        if prt:
            # (label, property name, unit label) in the order they are logged.
            settings = (
                ('-> stepper = ', 'stepper', ''),
                ('-> equation = ', 'equation', ''),
                ('-> eps_min = ', 'eps_min', '[mm]'),
                ('-> eps_max = ', 'eps_max', '[mm]'),
                ('-> delta_chord = ', 'delta_chord', '[mm]'),
                ('-> min_chord_step = ', 'min_chord_step', '[mm]'),
                ('-> delta_one_step = ', 'delta_one_step', '[mm]'),
                ('-> delta_intersection = ', 'delta_intersection', '[mm]'),
                ('-> largest_step = ', 'largest_step', '[mm]'),
            )
            for label, prop, unit in settings:
                logger.info('+++++> %s %s %s %s ', field.name, label, str(getattr(field, prop)), unit)
        return field

    def setupTrackingFieldMT(self, name='MagFieldTrackingSetup', stepper='ClassicalRK4',
                             equation='Mag_UsualEqRhs', prt=False):
        """Set up the tracking field as a detector construction action.

        Returns:
            Tuple ``(seq, fld)`` of construction sequence and field action.
        """
        seq, fld = self.addDetectorConstruction("Geant4FieldTrackingConstruction/"+name)
        self._private_setupField(fld, stepper, equation, prt)
        return (seq, fld)

    def setupTrackingField(self, name='MagFieldTrackingSetup', stepper='ClassicalRK4',
                           equation='Mag_UsualEqRhs', prt=False):
        """Set up the tracking field as a phase action of the 'configure' step.

        Returns:
            The configured field setup action.
        """
        field = self.addConfig('Geant4FieldTrackingSetupAction/'+name)
        self._private_setupField(field, stepper, equation, prt)
        return field

    def setupPhysics(self, name):
        """Configure the physics list of the master kernel to extend ``name``.

        Enables decays and the UI, dumps the list and returns it.
        """
        phys = self.master().physicsList()
        phys.extends = name
        phys.decays = True
        phys.enableUI()
        phys.dump()
        return phys

    def addPhysics(self, name):
        """Create physics list action ``name`` and adopt it into the master's physics list."""
        phys = self.master().physicsList()
        opt = PhysicsList(self.master(), name)
        opt.enableUI()
        phys.adopt(opt)
        return opt

    def setupGun(self, name, particle, energy, isotrop=True, multiplicity=1,
                 position=(0.0, 0.0, 0.0), register=True, **args):
        """Create a 'Geant4ParticleGun' generator action.

        Extra keyword arguments are set as properties first; the explicit
        arguments are applied afterwards.

        Args:
            register: add the gun to the generator action sequence when true.

        Returns:
            The gun action.
        """
        gun = GeneratorAction(self.kernel(), "Geant4ParticleGun/"+name, True)
        for prop, value in args.items():
            setattr(gun, prop, value)
        gun.energy = energy
        gun.particle = particle
        gun.multiplicity = multiplicity
        gun.position = position
        gun.isotrop = isotrop
        gun.enableUI()
        if register:
            self.kernel().generatorAction().add(gun)
        return gun

    def setupROOTOutput(self, name, output, mc_truth=True):
        """Configure ROOT output for the simulated events.

        '.root' is appended to ``output`` unless it already ends with it.

        Author: M.Frank
        """
        evt_root = EventAction(self.kernel(), 'Geant4Output2ROOT/'+name, True)
        evt_root.HandleMCTruth = mc_truth
        evt_root.Control = True
        if not output.endswith('.root'):
            output = output + '.root'
        evt_root.Output = output
        evt_root.enableUI()
        self.kernel().eventAction().add(evt_root)
        return evt_root

    def setupLCIOOutput(self, name, output):
        """Configure LCIO output for the simulated events.

        Author: M.Frank
        """
        evt_lcio = EventAction(self.kernel(), 'Geant4Output2LCIO/'+name, True)
        evt_lcio.Control = True
        evt_lcio.Output = output
        evt_lcio.enableUI()
        self.kernel().eventAction().add(evt_lcio)
        return evt_lcio

    def buildInputStage(self, generator_input_modules, output_level=None, have_mctruth=True):
        """Generic build of the input stage with multiple input modules.

        Actions executed are:
          1) Register Generation initialization action
          2) Append all modules to build the complete input record.
             These modules are readers/particle sources, boosters and/or
             smearing actions.
          3) Merge all existing interaction records
          4) Add the MC truth handler (only if ``have_mctruth``)

        ``output_level``, when given, is applied to every action.

        Author: M.Frank
        """
        ga = self.kernel().generatorAction()
        # Register Generation initialization action
        gen = GeneratorAction(self.kernel(), "Geant4GeneratorActionInit/GenerationInit")
        if output_level is not None:
            gen.OutputLevel = output_level
        ga.adopt(gen)

        # Now append all modules to build the complete input record
        # These modules are readers/particle sources, boosters and/or smearing actions
        for gen in generator_input_modules:
            gen.enableUI()
            if output_level is not None:
                gen.OutputLevel = output_level
            ga.adopt(gen)

        # Merge all existing interaction records
        gen = GeneratorAction(self.kernel(), "Geant4InteractionMerger/InteractionMerger")
        gen.enableUI()
        if output_level is not None:
            gen.OutputLevel = output_level
        ga.adopt(gen)

        # Finally generate Geant4 primaries
        if have_mctruth:
            gen = GeneratorAction(self.kernel(), "Geant4PrimaryHandler/PrimaryHandler")
            gen.RejectPDGs = "{1,2,3,4,5,6,21,23,24}"
            gen.enableUI()
            if output_level is not None:
                gen.OutputLevel = output_level
            ga.adopt(gen)
        # Puuuhh! All done.
        return self

    def run(self):
        """Execute the main Geant4 action.

        Hands the master kernel to ``PyDDG4.run``.

        Author: M.Frank
        """
        from ROOT import PyDDG4
        PyDDG4.run(self.master().get())
        return self


Simple = Geant4