Class Reference for E1039 Core & Analysis Software
KScheduler.cc
Go to the documentation of this file.
1 /*
2  * KScheduler.cxx
3  * Scheduler for KEventJobs in the KJobQueue
4  * Author: Noah Wuerfel, nwuerfel@umich.edu
5  * created: 10/14/2020
6  */
7 #include <iostream>
8 #include <algorithm>
9 #include <cmath>
11 #include <interface_main/SQEvent.h>
12 #include <interface_main/SQRun.h>
15 #include <phool/recoConsts.h>
16 #include <phfield/PHFieldConfig_v3.h>
17 #include <phfield/PHFieldUtility.h>
18 #include <phgeom/PHGeomUtility.h>
20 #include "KJob.h"
21 #include "KScheduler.h"
22 using namespace std;
23 
24 //sure there's a better way for passing compound args...
25 // thread_local may not be right?.... root is annoying
26 struct workerArg{
27  KScheduler* kschdPtr;
28  unsigned threadId;
29 };
30 
31 // FUCK this was tricky need to initialize blecause static
32 // need static to acess via the member threads because pthreads are typically
33 // static code
34 int KScheduler::verb = 0;
35 int KScheduler::n_threads = 16;
36 int KScheduler::input_pipe_depth = 32; // play with iobuffer to load memory vs do computation...
37 int KScheduler::output_pipe_depth = 32;
38 int KScheduler::print_freq = 100; // num events per timeupdate TODO post fraction need update from fReaderThread
39 int KScheduler::save_num = 5000;
40 bool KScheduler::save_raw_evt = false;
41 
42 //bool KScheduler::use_e906_data = true;
43 TString KScheduler::inputFilename="";
44 TString KScheduler::outputFilename="";
45 int KScheduler::completedEvents = 0;
46 
47 KScheduler::KScheduler(TString inFile, TString outFile)
48  : use_tracklet_reco(false)
49 {
50  this->setInputFilename(inFile);
51  this->setOutputFilename(outFile);
52 }
53 
54 void KScheduler::Init(PHField* phfield, TGeoManager* t_geo_manager, KalmanFitter* kfitter, SQGenFit::GFFitter* gfitter, SQGenFit::GFField* gfield)
55 {
56  std::cout << "Initialization of KScheduler ..." << std::endl;
57  std::cout << "================================" << std::endl;
58 
59  // init services
60  std::cout << "starting KScheduler services\n";
61  //GeomSvc* p_geomSvc = GeomSvc::instance();
63 
64  //Initialize event reducer
65  string evt_red_opt = rc->get_CharFlag("EventReduceOpts");
66  for (int i = 0; i < n_threads; i++) {
67  EventReducer* eventReducer = 0;
68  if (evt_red_opt != "none") {
69  eventReducer = new EventReducer(evt_red_opt);
70  assert(eventReducer);
71  }
72  eventReducerQueue.push(eventReducer);
73  vec_eventReducer.push_back(eventReducer);
74  }
75 
76  //Initialize the kfasttrackers
77  KalmanFastTracking* kFastTracker = 0;
78  for (int i = 0; i < n_threads; i++) {
79  if (! use_tracklet_reco) kFastTracker = new KalmanFastTracking (phfield, t_geo_manager, false);
80  else kFastTracker = new KalmanFastTrackletting(phfield, t_geo_manager, false);
81  assert(kFastTracker);
82  kFastTrkQueue.push(kFastTracker);
83  vec_kFastTrk.push_back(kFastTracker);
84  }
85 
86  m_kfitter = kfitter;
87  m_gfitter = gfitter;
88 
89  // build TClonesArrays for tracklet outputs
90  TClonesArray* trackletArray = 0;
91  for (int i = 0; i < n_threads; i++) {
92  trackletArray= new TClonesArray("Tracklet",1000);
93  trackletArray->BypassStreamer();
94  assert(trackletArray);
95  kTrackArrQueue.push(trackletArray);
96  }
97 
98  // worker threads
99  workThreadArr.resize(n_threads, 0);
100 
101  // net time for alljobs
102  avgTimer = new TStopwatch();
103  totalTimer = new TStopwatch();
104 
105  // lock for wArg structure when initializing structs
106  wArgMutex = new TMutex();
107  fReaderMutex = new TMutex();
108 
109  // pipeline inside
110  newJobQueuePutMutex = new TMutex();
111  newJobQueueTakeMutex = new TMutex();
112 
113  // pipeline evReducer
114  evRedQueuePutMutex = new TMutex();
115  evRedQueueTakeMutex = new TMutex();
116 
117  // pipeline finder
118  kFTrkQueuePutMutex = new TMutex();
119  kFTrkQueueTakeMutex = new TMutex();
120 
121  // pipeline out
122  cmpJobQueuePutMutex = new TMutex();
123  cmpJobQueueTakeMutex = new TMutex();
124 
125  //tracklet arr
126  ktrkQueueMutex = new TMutex();
127 
128 // jobSem = new TSemaphore(MAXJOBS);
129  njqFSem = new TSemaphore(0);
130  njqESem = new TSemaphore(input_pipe_depth);
131 
132  // evReducer
133  erqFSem = new TSemaphore(n_threads);
134  erqESem = new TSemaphore(0);
135 
136  // finder
137  kftqFSem = new TSemaphore(n_threads);
138  kftqESem = new TSemaphore(0);
139 
140  // trackletArrays
141  ktrkqFSem = new TSemaphore(n_threads);
142  ktrkqESem = new TSemaphore(0);
143 
144  // output-stage pipeline
145  cjqFSem = new TSemaphore(0);
146  cjqESem = new TSemaphore(output_pipe_depth);
147 
148  // inits TThreads to 0
149  fRDPtr = 0;
150  fRPPtr = 0;
151 
152  return;
153 };
154 
156 
157  int i = 0;
158 
159  //TODO kill remaning threads and clean memory for them
160  /*
161  for(int i = 0; i<n_threads; i++){
162  delete jobMutexArr[i];
163  }
164  */
165 
166  delete fRDPtr;
167  fRDPtr = 0;
168  delete fRPPtr;
169  fRPPtr = 0;
170  for(i=0;i<n_threads;i++){
171  assert(workThreadArr[i]);
172  delete workThreadArr[i];
173  workThreadArr[i]=0;
174  }
175 
176  delete wArgMutex;
177  delete fReaderMutex;
178 
179  // first stage of pipeline
180  delete newJobQueuePutMutex;
181  delete newJobQueueTakeMutex;
182 
183  // eventReducer
184  delete evRedQueuePutMutex;
185  delete evRedQueueTakeMutex;
186  // TODO CLEAN UP EVENTREDUCERS
187  EventReducer* er = 0;
188  for(i=0;i<n_threads;i++){
189  er = eventReducerQueue.front();
190  eventReducerQueue.pop();
191  delete er;
192  }
193  //delete eventReducerQueue;
194 
195  // finder
196  delete kFTrkQueuePutMutex;
197  delete kFTrkQueueTakeMutex;
198 
199  KalmanFastTracking* ft = 0;
200  for(i=0; i<n_threads; i++){
201  ft = kFastTrkQueue.front();
202  kFastTrkQueue.pop();
203  delete ft;
204  }
205 
206  TClonesArray* trkArr= 0;
207  for(i=0; i<n_threads; i++){
208  trkArr = kTrackArrQueue.front();
209  assert(trkArr);
210  kTrackArrQueue.pop();
211  delete trkArr;
212  }
213 
214  // second stage of pipeline
215  delete cmpJobQueuePutMutex;
216  delete cmpJobQueueTakeMutex;
217 
218  //tracklet arr mutex
219  delete ktrkQueueMutex;
220 
221  // first stage of pipeline sems
222  delete njqFSem;
223  delete njqESem;
224 
225  // ev reducer
226  delete erqFSem;
227  delete erqESem;
228 
229  // finder
230  delete kftqFSem;
231  delete kftqESem;
232 
233  // tracklet arrays sems
234  delete ktrkqFSem;
235  delete ktrkqESem;
236 
237  //output stage of pipeline sems
238  delete cjqFSem;
239  delete cjqESem;
240 
241  // timer
242  delete avgTimer;
243  TThread::Printf("Total Time and average per event below:");
244  // something with printing or formatting is wrong here...
245  // yeah the formatting is stupid.. I pulled this from TStopwatch::Print()...
246 // TThread::Printf("Total: %d, AvgPerEntry: %d",ttimeelapsed,ttimeelapsed/(double)completedEvents);
247  Double_t realt = totalTimer->RealTime();
248  Double_t cput = totalTimer->CpuTime();
249  Double_t avgt = totalTimer->RealTime()/(double)completedEvents;
250 
251  Int_t hours = Int_t(realt / 3600);
252  realt -= hours * 3600;
253  Int_t min = Int_t(realt / 60);
254  realt -= min * 60;
255  //Int_t sec = Int_t(realt);
256 
257  Int_t avhours = Int_t(avgt / 3600);
258  avgt -= avhours * 3600;
259  Int_t avmin = Int_t(avgt / 60);
260  avgt -= avmin * 60;
261  //Int_t avsec = Int_t(avgt);
262 
263  if (realt < 0) realt = 0;
264  if (cput < 0) cput = 0;
265  if (totalTimer->Counter() > 1) {
266  TThread::Printf("Real time %d:%02d:%06.3f, CP time %.3f, %d slices",
267  hours, min, realt, cput, totalTimer->Counter());
268  }
269  else {
270  TThread::Printf("Real time %d:%02d:%06.3f, CP time %.3f",
271  hours, min, realt, cput);
272  TThread::Printf("Average time per event: %d:%02d:%06.3f",
273  avhours, avmin, avgt);
274  }
275 
276 
277 // totalTimer->Print("m");
278 
279 // TThread::Printf("total events: %i",completedEvents);
280  delete totalTimer;
281 
282 
283  return;
284 }
285 
287  Int_t ret;
288  std::cout << "KScheduler spawning threads..." << std::endl;
289 // std::cout << "KScheduler spawning readerThread..." << std::endl;
290 // ret = this->startReaderThread();
291 // assert(ret == 0);
292  std::cout << "KScheduler spawning reaperThread..." << std::endl;
293  ret = this->startReaperThread();
294  assert(ret == 0);
295  std::cout << "KScheduler spawning worker threads..." << std::endl;
296  ret = this->startWorkerThreads();
297  assert(ret == 0);
298  return 0;
299 }
300 
302 {
303  //TThread::Join(fRDPtr->GetId(),NULL);
304  //delete fRDPtr;
305  //fRDPtr = 0;
306  TThread::Join(fRPPtr->GetId(),NULL);
307  delete fRPPtr;
308  fRPPtr = 0;
309  TThread* wPtr = 0;
310  for (int i=0; i < n_threads; i++) {
311  wPtr = workThreadArr[i];
312  TThread::Join(wPtr->GetId(),NULL);
313  }
314  return 0;
315 }
316 
317 // static getters and setters
319  return inputFilename;
320 }
321 void KScheduler::setInputFilename(TString name){
322  inputFilename = name;
323 }
324 // static getters and setters
326  return outputFilename;
327 }
328 
329 void KScheduler::setOutputFilename(TString name){
330  outputFilename = name;
331 }
332 
334  if(Verbose() > 0 && completedEvents % print_freq == 0){
335  avgTimer->Stop();
336  TThread::Printf("completed: %i events, last %i in time shown below:",
337  completedEvents, print_freq);
338  // uhoh gap here?
339  avgTimer->Print("u");
340  avgTimer->Start();
341  //TThread::Printf("\n");
342 
343  }
344  completedEvents++;
345 }
346 
347 // allocates memory for KJOBs - need to delete them in reaperThread
352 //void* KScheduler::fReaderThread(void* readerArgPtr){
353 //
354 // int i;
355 //
356 // // wack... outdated instructions for ROOT?
357 // KScheduler* kschd = (KScheduler*) readerArgPtr;
358 // TString filename = kschd->getInputFilename();
359 // if(Verbose() > 0){
360 // TThread::Printf("Start of fReaderThread - ifile is: %s\n", filename.Data());
361 // }
362 //
363 // // to be filled from Tree
364 // // or from converter in e1039
365 // SRawEvent* localEventPtr=0;
366 //
367 // // in case we have E1039 data
368 // SQEvent* SQEvPtr = 0;
369 // SQHitVector* SQHitVecPtr = 0;
370 // SQHitVector* SQTrigHitVecPtr = 0;
371 //
372 // // rules of synch for ROOT obj are still not clear, so i just sync
373 // // everything.. Not sure if i need to use global mutex for aLL root obj or
374 // // local ones are good enough with the EnableThreadSafety feature of R6...
375 // // if its global i'm boned unless i set up a sleeping system for queue
376 // // filling...
377 // //
378 // // GetEntries does make a new TObject... BUT IN ROOT6
379 // // ENABLETHREADSAFETYCLAIMSTHAT THIS IS ACCEPTABLEEEEEEEEE.... yikes
380 //
381 //
382 // // just lock everything for now... performace issues are better than race
383 // // cond.
384 // kschd->fReaderMutex->Lock();
385 // TFile* inputFile = new TFile(filename, "READ");
386 // TTree* dataTree = 0;
387 // if(use_e906_data){
388 // dataTree = (TTree*) inputFile->Get("save");
389 // dataTree->SetBranchAddress("rawEvent",&localEventPtr);
390 // }
391 // // e1039 data formatting...
392 // else{
393 // dataTree = (TTree*) inputFile->Get("T");
394 // dataTree->SetBranchAddress("DST.SQEvent",&SQEvPtr);
395 // dataTree->SetBranchAddress("DST.SQHitVector", &SQHitVecPtr);
396 // dataTree->SetBranchAddress("DST.SQTriggerHitVector", &SQTrigHitVecPtr);
397 // }
398 // int nEvents = dataTree->GetEntries();
399 // TThread::Printf("fReaderThread: starting read of %i events...\n",nEvents);
400 // kschd->fReaderMutex->UnLock();
401 //
402 // // KJob pointer to add to queue
403 // KJob* newKJobPtr = NULL;
404 //
405 // bool copy = true;
406 //
407 // // TThread sched_yield? for IO block?
408 // for(i=0; i<nEvents; i++){
409 // dataTree->GetEntry(i);
410 //
411 // // convert SQHitVector to SRawEvent
412 // // TODO memory leak from new alloc in BuildSRawEvent?
413 // if(! use_e906_data){
414 // localEventPtr = KScheduler::BuildSRawEvent(SQEvPtr, SQHitVecPtr, SQTrigHitVecPtr);
415 // copy = false;
416 // }
417 //
418 // // makes a copy of the SRawEvent
419 // // takes a mutex when allocating memory for the event
420 // newKJobPtr = new KJob(i,localEventPtr,kschd,copy);
421 //
422 // // need shared queue class
423 // // for now theres just one mutex....
424 // kschd->njqESem->Wait();
425 // kschd->newJobQueuePutMutex->Lock();
426 // kschd->newJobQueue.push(newKJobPtr);
427 // kschd->newJobQueuePutMutex->UnLock();
428 // kschd->njqFSem->Post();
429 // }
430 //
431 // TThread::Printf("fReaderThread: finished reading all events... dumping poison...");
432 //
433 // for(i=0;i<n_threads;i++){
434 // if(Verbose() > 0){
435 // TThread::Printf("poisoning...");
436 // }
437 // newKJobPtr = new KJob(true);
438 // kschd->njqESem->Wait();
439 // kschd->newJobQueuePutMutex->Lock();
440 // kschd->newJobQueue.push(newKJobPtr);
441 // kschd->newJobQueuePutMutex->UnLock();
442 // kschd->njqFSem->Post();
443 // }
444 // return 0;
445 //}
446 
447 void KScheduler::PushEvent(SRawEvent* sraw, bool copy)
448 {
449  static int job_id = 0;
450  KJob* newKJobPtr = new KJob(job_id++, sraw, this, copy);
451 
452  // need shared queue class
453  // for now theres just one mutex....
454  njqESem->Wait();
455  newJobQueuePutMutex->Lock();
456  newJobQueue.push(newKJobPtr);
457  newJobQueuePutMutex->UnLock();
458  njqFSem->Post();
459 }
460 
462 {
463  TThread::Printf("fReaderThread: finished reading all events... dumping poison...");
464  for (int i = 0; i < n_threads; i++) {
465  if(Verbose() > 2) TThread::Printf(" poisoning...");
466  KJob* newKJobPtr = new KJob(true);
467  njqESem->Wait();
468  newJobQueuePutMutex->Lock();
469  newJobQueue.push(newKJobPtr);
470  newJobQueuePutMutex->UnLock();
471  njqFSem->Post();
472  }
473 }
474 
475 
476 // reaper thread
477 void* KScheduler::fReaperThread(void* reaperArg){
478 
479  KScheduler* kschd = (KScheduler*) reaperArg;
480  //recoConsts* rc = recoConsts::instance();
481 
482 // TThread::Printf("worker #%u got ptr:%p",dwArgPtr->threadId,wArgPtr);
483  TThread::Printf("Starting fReaper thread\n");
484 
485  // output fields for saveTree->Fill
486  TString outputFilename = kschd->getOutputFilename();
487  SRawEvent* outputRawEventPtr = 0;
488  SRecEvent* outputRecEventPtr = 0;
489  TClonesArray* outputTracklets = new TClonesArray("Tracklet",1000);
490  int nTracklets = 0;
491  int sizeTrackArr = 0;
492 
493  // setup the output files...
494  TThread::Printf("opening output file\n");
495  TFile* saveFile = new TFile(outputFilename.Data(), "recreate");
496  TTree* saveTree = new TTree("save","save");
497  if (save_raw_evt) saveTree->Branch("rawEvent", &outputRawEventPtr, 256000, 99);
498  saveTree->Branch("recEvent", &outputRecEventPtr, 256000, 99);
499  //saveTree->Branch("time", &time, "time/D");
500  saveTree->Branch("outputTracklets", &outputTracklets, 256000, 99);
501  saveTree->Branch("nTracklets", &nTracklets, "nTracklets/I");
502  saveTree->Branch("sizeTrackArr", &sizeTrackArr, "nTracklets/I");
503 
504  // try to reap jobs forever
505  int poisonPills = 0;
506  bool running = true;
507  while(running){
508  // try to acquire a job from the queue...
509  kschd->cjqFSem->Wait();
510  kschd->cmpJobQueuePutMutex->Lock();
511  KJob* tCompleteJobPtr = kschd->cmpJobQueue.front();
512  kschd->cmpJobQueue.pop();
513  kschd->cmpJobQueuePutMutex->UnLock();
514  kschd->cjqESem->Post();
515 
516  // check for poison
517  if(tCompleteJobPtr->isPoison){
518  // cleanup job and die...
519  if(Verbose() > 1) TThread::Printf("ReaperThread got a poison pill...");
520  delete tCompleteJobPtr;
521  poisonPills++;
522  if (poisonPills == n_threads) {
523  TThread::Printf("ReaperThread caught all pills...");
524  break;
525  } else {
526  continue;
527  }
528  }
529 
530  // check for halted thread
531  if(tCompleteJobPtr->p_JobStatus==HALTED){
532  // job failed for some reason...
533  delete tCompleteJobPtr;
534  continue;
535  }
536 
537  if(Verbose() > 1){
538  tCompleteJobPtr->jobMutex->Lock();
539  TThread::Printf("fReaper gets jobId: %i, eventID: %i",
540  tCompleteJobPtr->jobId, tCompleteJobPtr->evData->getEventID());
541  tCompleteJobPtr->jobMutex->UnLock();
542  }
543 
544  // TODO CHECK JOB STATUS
545  // otherwise no atomic way to safely delete... because i'd be deleting
546  // mutex too...
547  // TODO PRAY .... is there a race conidition here? I'm 99% sure the
548  // pipeline enforces enough serialism here tobe safe
549  //
550  // TODO UNLESS there's a 3rd party daemon that checsk events or jobs...
551  // maybe when doing time scheduling? ... careful here...
552 
554  kschd->DoTrackFitting(tCompleteJobPtr->tracklets, tCompleteJobPtr->recEvData);
555 
556  outputRawEventPtr = tCompleteJobPtr->evData;
557  outputRecEventPtr = tCompleteJobPtr->recEvData;
558  //outputTracklets = tCompleteJobPtr->tracklets;
559 
560  //outputTracklets = new TClonesArray(*(tCompleteJobPtr->tracklets));
561  *outputTracklets = *(tCompleteJobPtr->tracklets);
562 
563  TClonesArray& ref_output = *outputTracklets;
564  nTracklets = tCompleteJobPtr->nTracklets;
565  sizeTrackArr = outputTracklets->GetEntries();
566  assert(outputRawEventPtr);
567  assert(outputRecEventPtr);
568  assert(outputTracklets);
569  if(Verbose() > 1){
570  TThread::Printf("got tracklets: %i for disk for event: %i\n",outputTracklets->GetEntries(), tCompleteJobPtr->evData->getEventID());
571  TThread::Printf("outputTracklets pointer is: %p\n", outputTracklets);
572 
573  if(outputTracklets->GetEntries()>0){
574  Tracklet* printer = (Tracklet*) ref_output[0];
575  TThread::Printf("first tracklet for disk is%p\n",printer);
576  if(printer) printer->print();
577  }
578  }
579 
580  saveTree->Fill();
581  if(saveTree->GetEntries() % save_num == 0){
582  TThread::Printf("fReaper saving another %i jobs", save_num);
583  saveTree->AutoSave("SaveSelf");
584  }
585  outputTracklets->Clear();
586  // need to reuse these job objects because tracklets are large...
587  delete tCompleteJobPtr;
588  kschd->postCompletedEvent();
589  }
590 
591  TThread::Printf("ReaperThread attempting to save tree");
592  // save outputs
593  saveTree->AutoSave("SaveSelf");
594  saveFile->cd();
595  saveTree->Write();
596  saveFile->Close();
597 
598  // cleanup
599  // oh no how do i clean these all up...
600  // outputTracklets? do i need to delete shallow copy?
601  return 0;
602 }
603 
604 // need to delete the memory for the worker thread for now
605 void* KScheduler::fWorkerThread(void* wArgPtr)
606 {
607  //workerArg lwArg;
608  workerArg* dwArgPtr = (workerArg*) wArgPtr;
609  unsigned threadId = dwArgPtr->threadId;
610  KScheduler* kschd = dwArgPtr->kschdPtr;
611  //recoConsts* rc = recoConsts::instance();
612  //KJob* tCompleteJobPtr = NULL;
613 
614  // worker tool pointers
615  //int nEvents_dimuon=0;
616 // std::list<Tracklet>& rec_tracklets = NULL;
617 // TClonesArray& arr_tracklets = NULL;
618 // TThread::Printf("worker #%u got ptr:%p",dwArgPtr->threadId,wArgPtr);
619  TThread::Printf("Starting worker thread%u\n",threadId);
620 
621  // try to get jobs forever
622  bool running = true;
623  while(running){
624 
625  // try to acquire a job from the queue...
626  kschd->njqFSem->Wait();
627  kschd->newJobQueuePutMutex->Lock();
628  KJob* tCompleteJobPtr = kschd->newJobQueue.front();
629  assert(tCompleteJobPtr);
630  kschd->newJobQueue.pop();
631  kschd->newJobQueuePutMutex->UnLock();
632  kschd->njqESem->Post();
633 
634  // TODO check for poison pill
635  if(tCompleteJobPtr->isPoison){
636  if(Verbose() > 2) TThread::Printf("WorkerThread got a poison pill...");
637  running = false;
638  //put job in complete queue to kill next part of pipeline
639  kschd->cjqESem->Wait();
640  kschd->cmpJobQueuePutMutex->Lock();
641  kschd->cmpJobQueue.push(tCompleteJobPtr);
642  kschd->cmpJobQueuePutMutex->UnLock();
643  kschd->cjqFSem->Post();
644  break;
645  }
646 
647  // check poison stream so no crash
648  assert(tCompleteJobPtr->evData);
649 
650  // try to acquire a job from the queue...
652  // semaphore enforces synchronization... (I hope...)
653  if(Verbose() > 1){
654  TThread::Printf("Worker %u gets jobId: %i, eventID: %i\n",
655  threadId, tCompleteJobPtr->jobId, tCompleteJobPtr->evData->getEventID());
656  }
657 
658  EventReducer* evReducer = kschd->vec_eventReducer.at(threadId);
659 
661  //kschd->erqFSem->Wait();
662  //kschd->evRedQueuePutMutex->Lock();
663  //EventReducer* evReducer = kschd->eventReducerQueue.front();
664  //assert(evReducer);
665  //kschd->eventReducerQueue.pop();
666  //kschd->evRedQueuePutMutex->UnLock();
667  //kschd->erqESem->Post();
668  //
669  //if(Verbose() > 1){
670  // TThread::Printf("Worker %u gets evReducer: %p\n",threadId,evReducer);
671  //}
672 
673  // reduce the event...
674  int n_red = evReducer->reduceEvent(tCompleteJobPtr->evData);
675  if(Verbose() > 2){
676  int n_hits = tCompleteJobPtr->evData->getNChamberHitsAll();
677  TThread::Printf("Worker %u reduced %u hits (%u -> %u).\n", threadId, n_red, n_hits+n_red, n_hits);
678  }
679 
680  // TODO update hit hinfo for the SQhitvector? needs a lot of
681  // bookkeeping...
682 
684  //kschd->erqESem->Wait();
685  //kschd->evRedQueuePutMutex->Lock();
686  //kschd->eventReducerQueue.push(evReducer);
687  //kschd->evRedQueuePutMutex->UnLock();
688  //kschd->erqFSem->Post();
689 
690  KalmanFastTracking* kFastTracker = kschd->vec_kFastTrk.at(threadId);
691 
693  //kschd->kftqFSem->Wait();
694  //kschd->kFTrkQueuePutMutex->Lock();
695  //KalmanFastTracking* kFastTracker = kschd->kFastTrkQueue.front();
697  //assert(kFastTracker);
698  //kschd->kFastTrkQueue.pop();
699  //kschd->kFTrkQueuePutMutex->UnLock();
700  //kschd->kftqESem->Post();
701 
702  // do something with the tracker
703  if(Verbose() > 1) TThread::Printf("Worker %u gets kFastTracker: %p\n", threadId, kFastTracker);
704 
705  // set the event
706  int recStatus = kFastTracker->setRawEvent(tCompleteJobPtr->evData);
707  if((recStatus != 0 && Verbose() > 1) || Verbose() > 2) TThread::Printf("kFastTrackRecStatus: %i", recStatus);
708  tCompleteJobPtr->recEvData->setRecStatus(recStatus);
709  tCompleteJobPtr->recEvData->setRawEvent(tCompleteJobPtr->evData);
710  std::list<Tracklet>& rec_tracklets = kFastTracker->getFinalTracklets();
711  if(Verbose() > 1) {
712  TThread::Printf("Worker %u completed setRawEvent: %p, trackletsize = %i\n", threadId, kFastTracker, (int)rec_tracklets.size());
713  // tCompleteJobPtr->tracklets->GetEntries()
714  //TThread::Printf("job pointer for tracklets is%p\n",tCompleteJobPtr->tracklets);
715  }
716 
717  TClonesArray* _tracklets = tCompleteJobPtr->tracklets; // Output
718  int nTracklets = 0;
719  int nFittedTracks = 0;
720  for(std::list<Tracklet>::iterator iter = rec_tracklets.begin(); iter != rec_tracklets.end(); ++iter){
721  iter->calcChisq();
722 
723  //KalmanFitter* kfitter = kschd->GetKFitter();
724  //SQGenFit::GFFitter* gfitter = kschd->GetGFitter();
725  //SRecTrack recTrack;
726  //bool fitOK = false;
727  //if (kfitter) fitOK = kschd->fitTrackCand(*iter, kfitter, recTrack);
728  //else if (gfitter) fitOK = kschd->fitTrackCand(*iter, gfitter, recTrack);
729  //
730  //if (!fitOK) {
731  // recTrack = iter->getSRecTrack(kfitter != 0);
732  // recTrack.setKalmanStatus(-1);
733  // //fillRecTrack(recTrack);
734  //} else {
735  // ++nFittedTracks;
736  //}
737 
738  new((*_tracklets)[nTracklets]) Tracklet(*iter);
739  ++nTracklets;
740  tCompleteJobPtr->nTracklets++;
741  //tCompleteJobPtr->recEvData->insertTrack(recTrack);
742  }
743  if(Verbose() > 1) {
744  TThread::Printf("Worker %u: Fitter: nTracklets = %i, nFittedTracks = %i\n", threadId, nTracklets, nFittedTracks);
745  }
746 
748 // for (int idx = 0; idx <= 4; idx++) {
749 // std::list<Tracklet>& tracklets_temp = kFastTracker->getTrackletList(idx);
750 // for(std::list<Tracklet>::iterator iter = tracklets_temp.begin(); iter != tracklets_temp.end(); ++iter){
751 // iter->calcChisq();
752 // //TODO tracklets
753 // new(arr_tracklets[nTracklets++]) Tracklet(*iter);
754 // tCompleteJobPtr->nTracklets++;
755 // }
756 // }
757 // if(Verbose() > 0){
758 // TThread::Printf("arr_tracklet has %i entries for eventID: %i\n",arr_tracklets.GetEntries(),
759 // tCompleteJobPtr->evData->getEventID());
760 // if(arr_tracklets.GetEntries() > 0){
761 // Tracklet* printer = (Tracklet*) arr_tracklets[0];
762 // TThread::Printf("first tracklet is:%p for eventID: %i\n",printer, tCompleteJobPtr->evData->getEventID());
763 // if(printer)
764 // printer->print();
765 // }
766 // }
767 
769  //kschd->kftqESem->Wait();
770  //kschd->kFTrkQueuePutMutex->Lock();
771  //kschd->kFastTrkQueue.push(kFastTracker);
772  //kschd->kFTrkQueuePutMutex->UnLock();
773  //kschd->kftqFSem->Post();
774 
775  //put job in complete queue
776  kschd->cjqESem->Wait();
777  kschd->cmpJobQueuePutMutex->Lock();
778  kschd->cmpJobQueue.push(tCompleteJobPtr);
779  kschd->cmpJobQueuePutMutex->UnLock();
780  kschd->cjqFSem->Post();
781  }
782 
783  // figure out right place to do this... malloced in startWorkerThread
784  delete dwArgPtr;
785  return 0;
786 }
787 
788 // takes mem for thred
789 //Int_t KScheduler::startReaderThread(){
790 // // check threadstatus
791 // std::cout << "Booting fReaderThread" << std::endl;
792 // if(!fRDPtr){
794 // fRDPtr = new TThread("fReaderThread",
795 // (TThread::VoidRtnFunc_t) &fReaderThread, (void*) this );
796 //
797 // fRDPtr->Run();
798 // return 0;
799 // }
800 // return 1;
801 //}
802 // frees mem for thred
803 //Int_t KScheduler::stopReaderThread(){
804 // if(fRDPtr){
805 // TThread::Delete(fRDPtr);
806 // delete fRDPtr;
807 // fRDPtr = 0;
808 // return 0;
809 // }
810 // return 1;
811 //}
812 
813 // takes mem for thred
814 Int_t KScheduler::startReaperThread(){
815  // check threadstatus
816  std::cout << "Booting fReaperThread" << std::endl;
817  if(!fRPPtr){
818 // fRPPtr = new TThread("fReaperThread", (void(*)(void*)) &fReaperThread, (void*) this );
819  fRPPtr = new TThread("fReaperThread",
820  (TThread::VoidRtnFunc_t) &fReaperThread, (void*) this );
821 
822  fRPPtr->Run();
823  return 0;
824  }
825  return 1;
826 }
827 Int_t KScheduler::stopReaperThread(){
828  if(fRPPtr){
829  TThread::Delete(fRPPtr);
830  delete fRPPtr;
831  fRPPtr = 0;
832  return 0;
833  }
834  return 1;
835 }
836 
837 // takes mem for argptr, needs to be freed by the Kthread
838 // TODO FIX INPUT ARG TO THIS NO SLOT ID NEEDED
839 Int_t KScheduler::startWorkerThread(unsigned threadId){
840  // check threadstatus
841  std::cout << "Booting fWorkerThread:" << threadId << std::endl;
842  std::string threadnm = "workerThread" + std::to_string(threadId);
843  const char* formatTnm = threadnm.c_str();
844  TThread* thisThread;
845  workerArg* wArgPtr = new workerArg;
846  wArgPtr->kschdPtr = this;
847  wArgPtr->threadId = threadId;
848 
849  Printf("thread %u Gets Ptr:%p", threadId, wArgPtr);
850  if(!workThreadArr[threadId]){
851 // workThreadArr[threadId] = new TThread(formatTnm, (void(*)(void*)) &fWorkerThread,
852 // (void*) wArgPtr );
853  workThreadArr[threadId] = new TThread(formatTnm,
854  (TThread::VoidRtnFunc_t) &fWorkerThread, (void*) wArgPtr);
855  thisThread = workThreadArr[threadId];
856  assert(thisThread != 0);
857  thisThread->Run();
858  return 0;
859  }
860  return 1;
861 }
862 
863 // wrapper
864 Int_t KScheduler::startWorkerThreads(){
865  Int_t ret;
866  //TThread* thisThread;
867  for(int i = 0; i < n_threads; i++){
868  ret = startWorkerThread(i);
869  // delay in print here is enough for the wArg to work out from run...
870  // TODO NEED REAL SYNC FOR THAT (sig after set warg?)
871  // THIS IS REALLY BAD TODO
872  std::cout << "started thread:" << i << std::endl;
873  assert(ret == 0);
874  }
875  return 0;
876 }
877 
878 // TODO this is a little more confusing with worker ids
879 /*
880 Int_t KScheduler::stopWorkerThread(){
881  if(workThreadArr[]){
882  TThread::Delete(fRPPtr);
883  delete fRPPtr;
884  fRPPtr = 0;
885  return 0;
886  }
887  return 1;
888 
889 }
890 */
891 
892 void KScheduler::DoTrackFitting(TClonesArray* tracklets, SRecEvent* srec)
893 {
894  int n_trk = tracklets->GetEntries();
895  int n_trk_fit = 0;
896  for (int i_trk = 0; i_trk < n_trk; i_trk++) {
897  Tracklet* tracklet = (Tracklet*)tracklets->At(i_trk);
898 
899  KalmanFitter* kfitter = GetKFitter();
900  SQGenFit::GFFitter* gfitter = GetGFitter();
901  SRecTrack recTrack;
902  bool fitOK = false;
903  if (kfitter) fitOK = fitTrackCand(*tracklet, kfitter, recTrack);
904  else if (gfitter) fitOK = fitTrackCand(*tracklet, gfitter, recTrack);
905 
906  if (!fitOK) {
907  recTrack = tracklet->getSRecTrack(kfitter != 0);
908  recTrack.setKalmanStatus(-1);
909  } else {
910  n_trk_fit++;
911  }
912  srec->insertTrack(recTrack);
913  }
914  if ((Verbose() > 0 && n_trk > 0) || Verbose() > 1) {
915  int run_id = srec->getRunID();
916  int spill_id = srec->getSpillID();
917  int event_id = srec->getEventID();
918  cout << "DoTrackFitting: run " << run_id << " spill " << spill_id << " event " << event_id << " | " << n_trk_fit << " / " << n_trk << endl;
919  }
920 }
921 
925 bool KScheduler::fitTrackCand(Tracklet& tracklet, KalmanFitter* fitter, SRecTrack& strack)
926 {
927  KalmanTrack kmtrk;
928  kmtrk.setTracklet(tracklet);
929 
930  if(kmtrk.getNodeList().empty())
931  {
932  LogDebug("kmtrk nodelist empty");
933  return false;
934  }
935 
936  if(fitter->processOneTrack(kmtrk) == 0)
937  {
938  LogDebug("kFitter failed to converge");
939  return false;
940  }
941 
942  fitter->updateTrack(kmtrk);//update after fitting
943 
944  if(!kmtrk.isValid())
945  {
946  LogDebug("kmtrk quality cut failed");
947  return false;
948  }
949 
950  //SRecTrack strack = kmtrk.getSRecTrack();
951  strack = kmtrk.getSRecTrack();
952 
953  //Set trigger road ID
954  TriggerRoad road(tracklet);
955  strack.setTriggerRoad(road.getRoadID());
956 
957  //Set prop tube slopes
958  strack.setNHitsInPT(tracklet.seg_x.getNHits(), tracklet.seg_y.getNHits());
959  strack.setPTSlope(tracklet.seg_x.a, tracklet.seg_y.a);
960  strack.setKalmanStatus(1);
961  //fillRecTrack(strack);
962  return true;
963 }
964 
968 bool KScheduler::fitTrackCand(Tracklet& tracklet, SQGenFit::GFFitter* fitter, SRecTrack& strack)
969 {
970  SQGenFit::GFTrack gftrk;
971  gftrk.setTracklet(tracklet);
972 
973  int fitOK = fitter->processTrack(gftrk);
974  if(fitOK != 0)
975  {
976  LogDebug("gFitter failed to converge.");
977  return false;
978  }
979 
980  //if(Verbosity() > Fun4AllBase::VERBOSITY_A_LOT)
981  //{
982  // gftrk.postFitUpdate();
983  // gftrk.print(2);
984  //}
985 
986  //TODO: A gtrack quality cut?
987 
988  //SRecTrack strack = gftrk.getSRecTrack();
989  strack = gftrk.getSRecTrack();
990 
991  //Set trigger road ID
992  TriggerRoad road(tracklet);
993  strack.setTriggerRoad(road.getRoadID());
994 
995  //Set prop tube slopes
996  strack.setNHitsInPT(tracklet.seg_x.getNHits(), tracklet.seg_y.getNHits());
997  strack.setPTSlope(tracklet.seg_x.a, tracklet.seg_y.a);
998 
999  //fillRecTrack(strack);
1000  return true;
1001 }
#define LogDebug(exp)
@ HALTED
Definition: KJob.h:18
#define NULL
Definition: Pdb.h:9
int reduceEvent(SRawEvent *rawEvent)
Definition: KJob.h:21
KJobStatus p_JobStatus
Definition: KJob.h:47
int jobId
Definition: KJob.h:34
TClonesArray * tracklets
Definition: KJob.h:44
int nTracklets
Definition: KJob.h:36
bool isPoison
Definition: KJob.h:37
SRawEvent * evData
Definition: KJob.h:41
SRecEvent * recEvData
Definition: KJob.h:46
TMutex * jobMutex
Definition: KJob.h:39
TSemaphore * ktrkqESem
Definition: KScheduler.h:95
void postCompletedEvent()
Definition: KScheduler.cc:333
static void setInputFilename(TString name)
Definition: KScheduler.cc:321
static TString getOutputFilename()
Definition: KScheduler.cc:325
TMutex * ktrkQueueMutex
Definition: KScheduler.h:91
static void setOutputFilename(TString name)
Definition: KScheduler.cc:329
std::queue< TClonesArray * > kTrackArrQueue
Definition: KScheduler.h:98
Int_t endThreads()
Definition: KScheduler.cc:301
static TString getInputFilename()
Definition: KScheduler.cc:318
SQGenFit::GFFitter * GetGFitter()
Definition: KScheduler.h:86
TSemaphore * ktrkqFSem
Definition: KScheduler.h:94
void Init(PHField *phfield, TGeoManager *t_geo_manager, KalmanFitter *kfitter, SQGenFit::GFFitter *gfitter, SQGenFit::GFField *gfield)
Definition: KScheduler.cc:54
KalmanFitter * GetKFitter()
Definition: KScheduler.h:85
KScheduler(TString inFile, TString outFile)
Definition: KScheduler.cc:47
void PushPoison()
Definition: KScheduler.cc:461
static int Verbose()
Definition: KScheduler.h:59
void PushEvent(SRawEvent *sraw, bool copy)
Definition: KScheduler.cc:447
Int_t runThreads()
Definition: KScheduler.cc:286
std::list< Tracklet > & getFinalTracklets()
Final output.
virtual int setRawEvent(SRawEvent *event_input)
int processOneTrack(KalmanTrack &_track)
void updateTrack(KalmanTrack &_track)
SRecTrack getSRecTrack()
Output to SRecTrack.
std::list< Node > & getNodeList()
Definition: KalmanTrack.h:84
void setTracklet(Tracklet &tracklet, bool wildseedcov=true)
Definition: KalmanTrack.cxx:62
bool isValid()
Self check to see if it is null.
Definition: KalmanTrack.cxx:96
transient DST object for field storage and access
Definition: PHField.h:14
virtual const std::string get_CharFlag(const std::string &flag) const
Definition: PHFlag.cc:13
int getNHits() const
int processTrack(GFTrack &track, bool display=false)
Definition: GFFitter.cxx:50
SRecTrack getSRecTrack()
Definition: GFTrack.cxx:394
void setTracklet(Tracklet &tracklet, double z_reference=590., bool wildseedcov=false)
Definition: GFTrack.cxx:338
Int_t getEventID()
Definition: SRawEvent.h:150
Int_t getNChamberHitsAll()
Definition: SRawEvent.cxx:381
void setRecStatus(int status)
Definition: SRecEvent.h:432
Int_t getEventID()
Definition: SRecEvent.h:442
void insertTrack(SRecTrack trk)
Insert tracks.
Definition: SRecEvent.h:466
void setRawEvent(SRawEvent *rawEvent)
directly setup everything by raw event
Definition: SRecEvent.cxx:764
Int_t getRunID()
Definition: SRecEvent.h:440
Int_t getSpillID()
Definition: SRecEvent.h:441
void setNHitsInPT(Int_t nHitsX, Int_t nHitsY)
Definition: SRecEvent.h:226
void setPTSlope(Double_t slopeX, Double_t slopeY)
Prop. tube muon ID info.
Definition: SRecEvent.h:225
void setTriggerRoad(Int_t roadID)
Trigger road info.
Definition: SRecEvent.h:221
void setKalmanStatus(Int_t status)
Definition: SRecEvent.h:148
PropSegment seg_x
Definition: FastTracklet.h:231
PropSegment seg_y
Definition: FastTracklet.h:232
SRecTrack getSRecTrack(bool hyptest=true)
void print(std::ostream &os=std::cout)
static recoConsts * instance()
Definition: recoConsts.cc:7