16 #include <phfield/PHFieldConfig_v3.h>
17 #include <phfield/PHFieldUtility.h>
18 #include <phgeom/PHGeomUtility.h>
34 int KScheduler::verb = 0;
35 int KScheduler::n_threads = 16;
36 int KScheduler::input_pipe_depth = 32;
37 int KScheduler::output_pipe_depth = 32;
38 int KScheduler::print_freq = 100;
39 int KScheduler::save_num = 5000;
40 bool KScheduler::save_raw_evt =
false;
43 TString KScheduler::inputFilename=
"";
44 TString KScheduler::outputFilename=
"";
45 int KScheduler::completedEvents = 0;
48 : use_tracklet_reco(false)
56 std::cout <<
"Initialization of KScheduler ..." << std::endl;
57 std::cout <<
"================================" << std::endl;
60 std::cout <<
"starting KScheduler services\n";
65 string evt_red_opt = rc->
get_CharFlag(
"EventReduceOpts");
66 for (
int i = 0; i < n_threads; i++) {
68 if (evt_red_opt !=
"none") {
72 eventReducerQueue.push(eventReducer);
73 vec_eventReducer.push_back(eventReducer);
78 for (
int i = 0; i < n_threads; i++) {
79 if (! use_tracklet_reco) kFastTracker =
new KalmanFastTracking (phfield, t_geo_manager,
false);
82 kFastTrkQueue.push(kFastTracker);
83 vec_kFastTrk.push_back(kFastTracker);
90 TClonesArray* trackletArray = 0;
91 for (
int i = 0; i < n_threads; i++) {
92 trackletArray=
new TClonesArray(
"Tracklet",1000);
93 trackletArray->BypassStreamer();
94 assert(trackletArray);
99 workThreadArr.resize(n_threads, 0);
102 avgTimer =
new TStopwatch();
103 totalTimer =
new TStopwatch();
106 wArgMutex =
new TMutex();
107 fReaderMutex =
new TMutex();
110 newJobQueuePutMutex =
new TMutex();
111 newJobQueueTakeMutex =
new TMutex();
114 evRedQueuePutMutex =
new TMutex();
115 evRedQueueTakeMutex =
new TMutex();
118 kFTrkQueuePutMutex =
new TMutex();
119 kFTrkQueueTakeMutex =
new TMutex();
122 cmpJobQueuePutMutex =
new TMutex();
123 cmpJobQueueTakeMutex =
new TMutex();
129 njqFSem =
new TSemaphore(0);
130 njqESem =
new TSemaphore(input_pipe_depth);
133 erqFSem =
new TSemaphore(n_threads);
134 erqESem =
new TSemaphore(0);
137 kftqFSem =
new TSemaphore(n_threads);
138 kftqESem =
new TSemaphore(0);
145 cjqFSem =
new TSemaphore(0);
146 cjqESem =
new TSemaphore(output_pipe_depth);
170 for(i=0;i<n_threads;i++){
171 assert(workThreadArr[i]);
172 delete workThreadArr[i];
180 delete newJobQueuePutMutex;
181 delete newJobQueueTakeMutex;
184 delete evRedQueuePutMutex;
185 delete evRedQueueTakeMutex;
188 for(i=0;i<n_threads;i++){
189 er = eventReducerQueue.front();
190 eventReducerQueue.pop();
196 delete kFTrkQueuePutMutex;
197 delete kFTrkQueueTakeMutex;
200 for(i=0; i<n_threads; i++){
201 ft = kFastTrkQueue.front();
206 TClonesArray* trkArr= 0;
207 for(i=0; i<n_threads; i++){
215 delete cmpJobQueuePutMutex;
216 delete cmpJobQueueTakeMutex;
243 TThread::Printf(
"Total Time and average per event below:");
247 Double_t realt = totalTimer->RealTime();
248 Double_t cput = totalTimer->CpuTime();
249 Double_t avgt = totalTimer->RealTime()/(double)completedEvents;
251 Int_t hours = Int_t(realt / 3600);
252 realt -= hours * 3600;
253 Int_t min = Int_t(realt / 60);
257 Int_t avhours = Int_t(avgt / 3600);
258 avgt -= avhours * 3600;
259 Int_t avmin = Int_t(avgt / 60);
263 if (realt < 0) realt = 0;
264 if (cput < 0) cput = 0;
265 if (totalTimer->Counter() > 1) {
266 TThread::Printf(
"Real time %d:%02d:%06.3f, CP time %.3f, %d slices",
267 hours, min, realt, cput, totalTimer->Counter());
270 TThread::Printf(
"Real time %d:%02d:%06.3f, CP time %.3f",
271 hours, min, realt, cput);
272 TThread::Printf(
"Average time per event: %d:%02d:%06.3f",
273 avhours, avmin, avgt);
288 std::cout <<
"KScheduler spawning threads..." << std::endl;
292 std::cout <<
"KScheduler spawning reaperThread..." << std::endl;
293 ret = this->startReaperThread();
295 std::cout <<
"KScheduler spawning worker threads..." << std::endl;
296 ret = this->startWorkerThreads();
306 TThread::Join(fRPPtr->GetId(),
NULL);
310 for (
int i=0; i < n_threads; i++) {
311 wPtr = workThreadArr[i];
312 TThread::Join(wPtr->GetId(),
NULL);
319 return inputFilename;
322 inputFilename = name;
326 return outputFilename;
330 outputFilename = name;
334 if(
Verbose() > 0 && completedEvents % print_freq == 0){
336 TThread::Printf(
"completed: %i events, last %i in time shown below:",
337 completedEvents, print_freq);
339 avgTimer->Print(
"u");
449 static int job_id = 0;
450 KJob* newKJobPtr =
new KJob(job_id++, sraw,
this, copy);
455 newJobQueuePutMutex->Lock();
456 newJobQueue.push(newKJobPtr);
457 newJobQueuePutMutex->UnLock();
463 TThread::Printf(
"fReaderThread: finished reading all events... dumping poison...");
464 for (
int i = 0; i < n_threads; i++) {
465 if(
Verbose() > 2) TThread::Printf(
" poisoning...");
468 newJobQueuePutMutex->Lock();
469 newJobQueue.push(newKJobPtr);
470 newJobQueuePutMutex->UnLock();
477 void* KScheduler::fReaperThread(
void* reaperArg){
483 TThread::Printf(
"Starting fReaper thread\n");
489 TClonesArray* outputTracklets =
new TClonesArray(
"Tracklet",1000);
491 int sizeTrackArr = 0;
494 TThread::Printf(
"opening output file\n");
495 TFile* saveFile =
new TFile(outputFilename.Data(),
"recreate");
496 TTree* saveTree =
new TTree(
"save",
"save");
497 if (save_raw_evt) saveTree->Branch(
"rawEvent", &outputRawEventPtr, 256000, 99);
498 saveTree->Branch(
"recEvent", &outputRecEventPtr, 256000, 99);
500 saveTree->Branch(
"outputTracklets", &outputTracklets, 256000, 99);
501 saveTree->Branch(
"nTracklets", &nTracklets,
"nTracklets/I");
502 saveTree->Branch(
"sizeTrackArr", &sizeTrackArr,
"nTracklets/I");
509 kschd->cjqFSem->Wait();
510 kschd->cmpJobQueuePutMutex->Lock();
511 KJob* tCompleteJobPtr = kschd->cmpJobQueue.front();
512 kschd->cmpJobQueue.pop();
513 kschd->cmpJobQueuePutMutex->UnLock();
514 kschd->cjqESem->Post();
519 if(
Verbose() > 1) TThread::Printf(
"ReaperThread got a poison pill...");
520 delete tCompleteJobPtr;
522 if (poisonPills == n_threads) {
523 TThread::Printf(
"ReaperThread caught all pills...");
533 delete tCompleteJobPtr;
539 TThread::Printf(
"fReaper gets jobId: %i, eventID: %i",
541 tCompleteJobPtr->
jobMutex->UnLock();
556 outputRawEventPtr = tCompleteJobPtr->
evData;
557 outputRecEventPtr = tCompleteJobPtr->
recEvData;
561 *outputTracklets = *(tCompleteJobPtr->
tracklets);
563 TClonesArray& ref_output = *outputTracklets;
565 sizeTrackArr = outputTracklets->GetEntries();
566 assert(outputRawEventPtr);
567 assert(outputRecEventPtr);
568 assert(outputTracklets);
570 TThread::Printf(
"got tracklets: %i for disk for event: %i\n",outputTracklets->GetEntries(), tCompleteJobPtr->
evData->
getEventID());
571 TThread::Printf(
"outputTracklets pointer is: %p\n", outputTracklets);
573 if(outputTracklets->GetEntries()>0){
575 TThread::Printf(
"first tracklet for disk is%p\n",printer);
576 if(printer) printer->
print();
581 if(saveTree->GetEntries() % save_num == 0){
582 TThread::Printf(
"fReaper saving another %i jobs", save_num);
583 saveTree->AutoSave(
"SaveSelf");
585 outputTracklets->Clear();
587 delete tCompleteJobPtr;
591 TThread::Printf(
"ReaperThread attempting to save tree");
593 saveTree->AutoSave(
"SaveSelf");
605 void* KScheduler::fWorkerThread(
void* wArgPtr)
608 workerArg* dwArgPtr = (workerArg*) wArgPtr;
609 unsigned threadId = dwArgPtr->threadId;
619 TThread::Printf(
"Starting worker thread%u\n",threadId);
626 kschd->njqFSem->Wait();
627 kschd->newJobQueuePutMutex->Lock();
628 KJob* tCompleteJobPtr = kschd->newJobQueue.front();
629 assert(tCompleteJobPtr);
630 kschd->newJobQueue.pop();
631 kschd->newJobQueuePutMutex->UnLock();
632 kschd->njqESem->Post();
636 if(
Verbose() > 2) TThread::Printf(
"WorkerThread got a poison pill...");
639 kschd->cjqESem->Wait();
640 kschd->cmpJobQueuePutMutex->Lock();
641 kschd->cmpJobQueue.push(tCompleteJobPtr);
642 kschd->cmpJobQueuePutMutex->UnLock();
643 kschd->cjqFSem->Post();
648 assert(tCompleteJobPtr->
evData);
654 TThread::Printf(
"Worker %u gets jobId: %i, eventID: %i\n",
658 EventReducer* evReducer = kschd->vec_eventReducer.at(threadId);
677 TThread::Printf(
"Worker %u reduced %u hits (%u -> %u).\n", threadId, n_red, n_hits+n_red, n_hits);
703 if(
Verbose() > 1) TThread::Printf(
"Worker %u gets kFastTracker: %p\n", threadId, kFastTracker);
707 if((recStatus != 0 &&
Verbose() > 1) ||
Verbose() > 2) TThread::Printf(
"kFastTrackRecStatus: %i", recStatus);
712 TThread::Printf(
"Worker %u completed setRawEvent: %p, trackletsize = %i\n", threadId, kFastTracker, (
int)rec_tracklets.size());
717 TClonesArray* _tracklets = tCompleteJobPtr->
tracklets;
719 int nFittedTracks = 0;
720 for(std::list<Tracklet>::iterator iter = rec_tracklets.begin(); iter != rec_tracklets.end(); ++iter){
738 new((*_tracklets)[nTracklets])
Tracklet(*iter);
744 TThread::Printf(
"Worker %u: Fitter: nTracklets = %i, nFittedTracks = %i\n", threadId, nTracklets, nFittedTracks);
776 kschd->cjqESem->Wait();
777 kschd->cmpJobQueuePutMutex->Lock();
778 kschd->cmpJobQueue.push(tCompleteJobPtr);
779 kschd->cmpJobQueuePutMutex->UnLock();
780 kschd->cjqFSem->Post();
814 Int_t KScheduler::startReaperThread(){
816 std::cout <<
"Booting fReaperThread" << std::endl;
819 fRPPtr =
new TThread(
"fReaperThread",
820 (TThread::VoidRtnFunc_t) &fReaperThread, (
void*)
this );
827 Int_t KScheduler::stopReaperThread(){
829 TThread::Delete(fRPPtr);
839 Int_t KScheduler::startWorkerThread(
unsigned threadId){
841 std::cout <<
"Booting fWorkerThread:" << threadId << std::endl;
842 std::string threadnm =
"workerThread" + std::to_string(threadId);
843 const char* formatTnm = threadnm.c_str();
845 workerArg* wArgPtr =
new workerArg;
846 wArgPtr->kschdPtr =
this;
847 wArgPtr->threadId = threadId;
849 Printf(
"thread %u Gets Ptr:%p", threadId, wArgPtr);
850 if(!workThreadArr[threadId]){
853 workThreadArr[threadId] =
new TThread(formatTnm,
854 (TThread::VoidRtnFunc_t) &fWorkerThread, (
void*) wArgPtr);
855 thisThread = workThreadArr[threadId];
856 assert(thisThread != 0);
864 Int_t KScheduler::startWorkerThreads(){
867 for(
int i = 0; i < n_threads; i++){
868 ret = startWorkerThread(i);
872 std::cout <<
"started thread:" << i << std::endl;
892 void KScheduler::DoTrackFitting(TClonesArray* tracklets,
SRecEvent* srec)
894 int n_trk = tracklets->GetEntries();
896 for (
int i_trk = 0; i_trk < n_trk; i_trk++) {
903 if (kfitter) fitOK = fitTrackCand(*tracklet, kfitter, recTrack);
904 else if (gfitter) fitOK = fitTrackCand(*tracklet, gfitter, recTrack);
918 cout <<
"DoTrackFitting: run " << run_id <<
" spill " << spill_id <<
" event " << event_id <<
" | " << n_trk_fit <<
" / " << n_trk << endl;
938 LogDebug(
"kFitter failed to converge");
946 LogDebug(
"kmtrk quality cut failed");
976 LogDebug(
"gFitter failed to converge.");
int reduceEvent(SRawEvent *rawEvent)
void postCompletedEvent()
static void setInputFilename(TString name)
static TString getOutputFilename()
static void setOutputFilename(TString name)
std::queue< TClonesArray * > kTrackArrQueue
static TString getInputFilename()
SQGenFit::GFFitter * GetGFitter()
void Init(PHField *phfield, TGeoManager *t_geo_manager, KalmanFitter *kfitter, SQGenFit::GFFitter *gfitter, SQGenFit::GFField *gfield)
KalmanFitter * GetKFitter()
KScheduler(TString inFile, TString outFile)
void PushEvent(SRawEvent *sraw, bool copy)
std::list< Tracklet > & getFinalTracklets()
Final output.
virtual int setRawEvent(SRawEvent *event_input)
int processOneTrack(KalmanTrack &_track)
void updateTrack(KalmanTrack &_track)
SRecTrack getSRecTrack()
Output to SRecTrack.
std::list< Node > & getNodeList()
void setTracklet(Tracklet &tracklet, bool wildseedcov=true)
bool isValid()
Self check to see if it is null.
transient DST object for field storage and access
virtual const std::string get_CharFlag(const std::string &flag) const
int processTrack(GFTrack &track, bool display=false)
void setTracklet(Tracklet &tracklet, double z_reference=590., bool wildseedcov=false)
Int_t getNChamberHitsAll()
void setRecStatus(int status)
void insertTrack(SRecTrack trk)
Insert tracks.
void setRawEvent(SRawEvent *rawEvent)
directly setup everything by raw event
void setNHitsInPT(Int_t nHitsX, Int_t nHitsY)
void setPTSlope(Double_t slopeX, Double_t slopeY)
Prop. tube muon ID info.
void setTriggerRoad(Int_t roadID)
Trigger road info.
void setKalmanStatus(Int_t status)
SRecTrack getSRecTrack(bool hyptest=true)
void print(std::ostream &os=std::cout)
static recoConsts * instance()