Class Reference for E1039 Core & Analysis Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
OnlMonClient.cc
Go to the documentation of this file.
1 #include <iomanip>
2 #include <algorithm>
3 #include <TStyle.h>
4 #include <TSystem.h>
5 #include <TH1D.h>
6 #include <TSocket.h>
7 #include <TClass.h>
8 #include <TMessage.h>
9 #include <interface_main/SQRun.h>
10 #include <interface_main/SQEvent.h>
13 #include <phool/PHNodeIterator.h>
14 #include <phool/PHIODataNode.h>
15 #include <phool/getClass.h>
16 #include <UtilAna/UtilOnline.h>
17 #include "OnlMonServer.h"
18 #include "OnlMonCanvas.h"
19 #include "OnlMonComm.h"
20 #include "OnlMonClient.h"
21 using namespace std;
22 
23 std::vector<OnlMonClient*> OnlMonClient::m_list_us;
24 bool OnlMonClient::m_bl_clear_us = true;
25 
27  : SubsysReco("OnlMonClient")
28  , m_title("Client Title")
29  , m_hm(0)
30  , m_n_can(1)
31  , m_h1_basic_id(0)
32  , m_h1_basic_cnt(0)
33  , m_spill_id_pre(-1)
34  , m_make_sp_hist(true)
35 {
36  memset(m_list_can, 0, sizeof(m_list_can));
37  m_list_us.push_back(this);
38 }
39 
41 {
42  if (! m_hm) delete m_hm;
43  ClearSpillHist();
44  ClearHistList(m_list_h1);
45  ClearCanvasList();
46  m_list_us.erase( find(m_list_us.begin(), m_list_us.end(), this) );
47 }
48 
50 {
51  cerr << "!!ERROR!! OnlMonClient::Clone(): virtual function called. Abort." << endl;
52  exit(1);
53 }
54 
56 {
57  if (! OnlMonServer::instance()->GetOnline()) DisableSpillHist();
58  return InitOnlMon(topNode);
59 }
60 
62 {
67  gStyle->SetLabelSize (0.05, "X");
68  gStyle->SetTitleSize (0.05, "X");
69  gStyle->SetLabelSize (0.06, "YZ");
70  gStyle->SetTitleSize (0.06, "YZ");
71  gStyle->SetTitleOffset(0.90, "XY");
72  gStyle->SetTitleSize (0.10, "");
73 
74  m_hm = new Fun4AllHistoManager(Name());
76  m_h1_basic_id = new TH1D("h1_basic_id" , "", 10, 0.5, 10.5);
77  m_h1_basic_cnt = new TH1D("h1_basic_cnt", "", 10, 0.5, 10.5);
78  m_hm->registerHisto(m_h1_basic_id);
79  m_hm->registerHisto(m_h1_basic_cnt);
80 
81  SQRun* run_header = findNode::getClass<SQRun>(topNode, "SQRun");
82  if (!run_header) return Fun4AllReturnCodes::ABORTEVENT;
83  m_h1_basic_id->SetBinContent(BIN_RUN, run_header->get_run_id());
84 
85  return InitRunOnlMon(topNode);
86 }
87 
89 {
90  SQEvent* event = findNode::getClass<SQEvent>(topNode, "SQEvent");
91  if (!event) return Fun4AllReturnCodes::ABORTEVENT;
92 
93  pthread_mutex_t* mutex = OnlMonServer::instance()->GetMutex();
94  int ret_mutex = pthread_mutex_lock(mutex);
95  if (ret_mutex != 0) cout << "WARNING: mutex_lock returned " << ret_mutex << "." << endl;
96 
97  int sp_id = event->get_spill_id();
98  if (sp_id != m_spill_id_pre) { // New spill
100  if (m_spill_id_pre >= 0 && m_make_sp_hist) { // Not first spill
101  if (comm->GetNumSpills() <= comm->GetMaxNumSelSpills()) {
102  MakeSpillHist(m_spill_id_pre, sp_id);
103  } else {
104  cout << Name() << ": Disable spill-by-spill hists." << endl;
105  MakeSpillHist(m_spill_id_pre);
106  DisableSpillHist();
107  }
108  }
109  m_spill_id_pre = sp_id;
110  OnlMonComm::instance()->AddSpill(sp_id);
111  m_h1_basic_cnt->AddBinContent(BIN_N_SP, 1);
112  m_h1_basic_id ->SetBinContent(BIN_RUN , event->get_run_id());
113  m_h1_basic_id ->SetBinContent(BIN_SPILL, sp_id);
114  m_h1_basic_id ->SetBinContent(BIN_EVENT, event->get_event_id());
115  int sp_curr = m_h1_basic_id->GetBinContent(BIN_SPILL_MIN);
116  if (sp_curr <= 0 || sp_curr > sp_id) m_h1_basic_id->SetBinContent(BIN_SPILL_MIN, sp_id);
117  sp_curr = m_h1_basic_id->GetBinContent(BIN_SPILL_MAX);
118  if (sp_curr < sp_id) m_h1_basic_id->SetBinContent(BIN_SPILL_MAX, sp_id);
119  }
120  m_h1_basic_cnt->AddBinContent(BIN_N_EVT, 1);
121 
122  int ret = ProcessEventOnlMon(topNode);
123  ret_mutex = pthread_mutex_unlock(mutex);
124  if (ret_mutex != 0) cout << "WARNING: mutex_unlock returned " << ret_mutex << "." << endl;
125  return ret;
126 }
127 
129 {
130  if (! m_hm) return Fun4AllReturnCodes::EVENT_OK;
131 
132  if (m_spill_id_pre >= 0 && m_make_sp_hist) {
133  MakeSpillHist(m_spill_id_pre);
134  DisableSpillHist(); // Necessary here to merge all spill hists.
135  }
136  ClearHistList(m_list_h1);
137  for (unsigned int ih = 0; ih < m_hm->nHistos(); ih++) {
138  TH1* h1_org = (TH1*)m_hm->getHisto(ih);
139  TH1* h1 = (TH1*)h1_org->Clone(h1_org->GetName());
140  m_list_h1.push_back(h1);
141  }
142 
143  ClearCanvasList();
144  int ret = DrawCanvas(true);
145  if (ret != 0) {
146  cerr << "WARNING: OnlMonClient::End(): ret = " << ret << endl;
147  }
148 
149  int run_id;
150  GetBasicID(&run_id);
151 
152  ostringstream oss;
153  oss << UtilOnline::GetOnlMonDir() << "/" << setfill('0') << setw(6) << run_id;
154  gSystem->mkdir(oss.str().c_str(), true);
155  gSystem->Chmod(oss.str().c_str(), 0775);
156  oss << "/" << Name() << ".root";
157  m_hm->dumpHistos(oss.str());
158  gSystem->Chmod(oss.str().c_str(), 0664);
159 
160  return EndOnlMon(topNode);
161 }
162 
163 void OnlMonClient::GetBasicID(int* run_id, int* spill_id, int* event_id, int* spill_id_min, int* spill_id_max)
164 {
165  if (run_id ) *run_id = (int)m_h1_basic_id->GetBinContent(BIN_RUN);
166  if (spill_id ) *spill_id = (int)m_h1_basic_id->GetBinContent(BIN_SPILL);
167  if (event_id ) *event_id = (int)m_h1_basic_id->GetBinContent(BIN_EVENT);
168  if (spill_id_min) *spill_id_min = (int)m_h1_basic_id->GetBinContent(BIN_SPILL_MIN);
169  if (spill_id_max) *spill_id_max = (int)m_h1_basic_id->GetBinContent(BIN_SPILL_MAX);
170 }
171 
172 void OnlMonClient::GetBasicCount(int* n_evt, int* n_sp)
173 {
174  if (n_evt) *n_evt = (int)m_h1_basic_cnt->GetBinContent(BIN_N_EVT);
175  if (n_sp ) *n_sp = (int)m_h1_basic_cnt->GetBinContent(BIN_N_SP );
176 }
177 
178 TH1* OnlMonClient::FindMonHist(const std::string name, const bool non_null)
179 {
180  for (HistList_t::iterator it = m_list_h1.begin(); it != m_list_h1.end(); it++) {
181  if (name == (*it)->GetName()) return *it;
182  }
183  if (non_null) {
184  cerr << "!!ERROR!! OnlMonClient::FindMonHist() cannot find '" << name << "'. Abort." << endl;
185  exit(1);
186  }
187  return 0;
188 }
189 
191 {
192  cerr << "!!ERROR!! OnlMonClient::InitOnlMon(): virtual function called. Abort." << endl;
193  exit(1);
194 }
195 
197 {
198  cerr << "!!ERROR!! OnlMonClient::InitRunOnlMon(): virtual function called. Abort." << endl;
199  exit(1);
200 }
201 
203 {
204  cerr << "!!ERROR!! OnlMonClient::ProcessEventOnlMon(): virtual function called. Abort." << endl;
205  exit(1);
206 }
207 
209 {
210  cerr << "!!ERROR!! OnlMonClient::EndOnlMon(): virtual function called. Abort." << endl;
211  exit(1);
212 }
213 
215 {
216  cerr << "!!ERROR!! OnlMonClient::FindAllMonHist(): virtual function called. Abort." << endl;
217  exit(1);
218 }
219 
221 {
222  cerr << "!!ERROR!! OnlMonClient::DrawMonitor(): virtual function called. Abort." << endl;
223  exit(1);
224 }
225 
227 {
228  if (GetClearUsFlag()) {
229  for (SelfList_t::iterator it = m_list_us.begin(); it != m_list_us.end(); it++) {
230  (*it)->ClearCanvasList();
231  }
232  } else { // Clear only its own canvases
233  ClearCanvasList();
234  }
235  int ret = ReceiveHist();
236  if (ret == 1) {
237  cout << "WARNING: The OnlMon server is NOT running." << endl;
238  return 1;
239  } else if (ret == 2) {
240  cout << "WARNING: The OnlMon server is NOT ready." << endl;
241  return 1;
242  } else if (ret != 0) {
243  cout << "WARNING: Unknown error in connecting to the OnlMon server." << endl;
244  return 1;
245  }
246 
247  return DrawCanvas();
248 }
249 
250 void OnlMonClient::RegisterHist(TH1* h1, const HistMode_t mode)
251 {
252  if (m_hm) {
253  m_hm->registerHisto(h1);
254  m_hist_mode[h1->GetName()] = mode;
255  } else {
256  cerr << "WARNING: OnlMonClient::RegisterHist(): Cannot register hist (" << h1->GetName()
257  << "). You must call this function in InitRunOnlMon(), not InitOnlMon(). Do nothing." << endl;
258  }
259 }
260 
261 int OnlMonClient::SendHist(TSocket* sock, int sp_min, int sp_max)
262 {
263  if (! m_hm) {
264  //if (Verbosity() > 2) cout << " HM not ready." << endl;
265  sock->Send("NotReady");
266  return 1; // Not ready
267  }
268 
269  pthread_mutex_t* mutex = OnlMonServer::instance()->GetMutex();
270  int ret_mutex = pthread_mutex_lock(mutex);
271  if (ret_mutex != 0) cout << "WARNING: mutex_lock returned " << ret_mutex << "." << endl;
272 
273  HistList_t list_h1;
274  if (m_make_sp_hist) {
275  if (sp_min > 0 && sp_max == 0) { // "sp_min" means N of spills
276  int min0, max0;
278  sp_max = max0;
279  sp_min = max0 - sp_min + 1;
280  }
281  MakeMergedHist(list_h1, sp_min, sp_max);
282  } else {
283  for (unsigned int ih = 0; ih < m_hm->nHistos(); ih++) {
284  TH1* h1_org = (TH1*)m_hm->getHisto(ih);
285  TH1* h1 = (TH1*)h1_org->Clone(h1_org->GetName());
286  list_h1.push_back(h1);
287  }
288  }
289 
290  ret_mutex = pthread_mutex_unlock(mutex);
291  if (ret_mutex != 0) cout << "WARNING: mutex_unlock returned " << ret_mutex << "." << endl;
292 
293  TMessage outgoing(kMESS_OBJECT);
294  //if (Verbosity() > 2) cout << " SUBSYS: " << name_subsys << " " << hm->nHistos() << endl;
295  for (HistList_t::iterator it = list_h1.begin(); it != list_h1.end(); it++) {
296  TH1* h1 = *it;
297  outgoing.Reset();
298  outgoing.WriteObject(h1);
299  sock->Send(outgoing);
300  outgoing.Reset();
301  TMessage* mess = 0;
302  sock->Recv(mess); // Just check a response.
303  delete mess;
304  }
305  ClearHistList(list_h1);
306 
307  sock->Send("Finished");
308  return 0;
309 }
310 
311 void OnlMonClient::ClearSpillHist()
312 {
313  for (Name2SpillHistMap_t::iterator it1 = m_map_hist_sp.begin(); it1 != m_map_hist_sp.end(); it1++) {
314  SpillHistMap_t* map_hist = &it1->second;
315  for (SpillHistMap_t::iterator it2 = map_hist->begin(); it2 != map_hist->end(); it2++) {
316  delete it2->second;
317  }
318  }
319  m_map_hist_sp.clear();
320 }
321 
328 void OnlMonClient::MakeSpillHist(const int spill_id, const int spill_id_new)
329 {
330  bool add_dir = TH1::AddDirectoryStatus();
331  TH1::AddDirectory(false); // Significantly speeds up deleting hists in dtor.
332  for (unsigned int ih = 0; ih < m_hm->nHistos(); ih++) {
333  TH1* h1 = (TH1*)m_hm->getHisto(ih);
334  string name = h1->GetName();
335  ostringstream oss;
336  oss << name << "_sp" << spill_id;
337  m_map_hist_sp[name][spill_id] = (TH1*)h1->Clone(oss.str().c_str());
338  h1->Reset("M");
339  if (spill_id_new > 0) m_map_hist_sp[name][spill_id_new] = h1;
340  }
341  TH1::AddDirectory(add_dir);
342 }
343 
348 void OnlMonClient::DisableSpillHist()
349 {
350  if (! m_make_sp_hist) return;
351  if (m_map_hist_sp.size() > 0) { // Merge existing spill hists
352  HistList_t list_h1;
353  MakeMergedHist(list_h1);
354  for (unsigned int ih = 0; ih < m_hm->nHistos(); ih++) {
355  TH1* h1 = (TH1*)m_hm->getHisto(ih);
356  h1->Reset("M");
357  h1->Add(list_h1.at(ih));
358  }
359  ClearHistList(list_h1);
360  ClearSpillHist();
361  }
362  m_make_sp_hist = false;
364 }
365 
366 void OnlMonClient::MakeMergedHist(HistList_t& list_h1, const int sp_min, const int sp_max)
367 {
368  bool add_dir = TH1::AddDirectoryStatus();
369  TH1::AddDirectory(false);
370  ClearHistList(list_h1);
371  for (unsigned int ih = 0; ih < m_hm->nHistos(); ih++) {
372  TH1* h1_org = (TH1*)m_hm->getHisto(ih);
373  string name = h1_org->GetName();
374  TH1* h1 = (TH1*)h1_org->Clone(name.c_str());
375  h1->Reset("M");
376  SpillHistMap_t* map_hist = &m_map_hist_sp[name];
377  for (SpillHistMap_t::iterator it = map_hist->begin(); it != map_hist->end(); it++) {
378  int sp_id = it->first;
379  if (sp_min > 0 && sp_id < sp_min) continue;
380  if (sp_max > 0 && sp_id > sp_max) continue;
381  if (name == "h1_basic_id") {
382  h1->SetBinContent(BIN_RUN , TMath::Max(h1->GetBinContent(BIN_RUN ), it->second->GetBinContent(BIN_RUN )));
383  h1->SetBinContent(BIN_SPILL, TMath::Max(h1->GetBinContent(BIN_SPILL), it->second->GetBinContent(BIN_SPILL)));
384  h1->SetBinContent(BIN_EVENT, TMath::Max(h1->GetBinContent(BIN_EVENT), it->second->GetBinContent(BIN_EVENT)));
385  int sp_curr = h1->GetBinContent(BIN_SPILL_MIN);
386  if (sp_curr <= 0 || sp_curr > sp_id) h1->SetBinContent(BIN_SPILL_MIN, sp_id);
387  sp_curr = h1->GetBinContent(BIN_SPILL_MAX);
388  if (sp_curr < sp_id) h1->SetBinContent(BIN_SPILL_MAX, sp_id);
389  } else if (m_hist_mode[name] == MODE_UPDATE) {
390  if (it == --map_hist->end()) h1->Add(it->second); // Take the last one
391  } else { // MODE_ADD
392  h1->Add(it->second);
393  }
394  }
395  list_h1.push_back(h1);
396  }
397  TH1::AddDirectory(add_dir);
398 }
399 
400 int OnlMonClient::ReceiveHist()
401 {
402  TSocket* sock = OnlMonComm::instance()->ConnectServer();
403  if (! sock) return 1;
404 
405  int sp_lo, sp_hi;
406  switch (OnlMonComm::instance()->GetSpillMode()) {
407  case OnlMonComm::SP_ALL:
408  sp_lo = sp_hi = 0;
409  break;
410  case OnlMonComm::SP_LAST:
411  sp_lo = OnlMonComm::instance()->GetSpillNum();
412  sp_hi = 0;
413  break;
415  OnlMonComm::instance()->GetSpillRange(sp_lo, sp_hi);
416  break;
417  }
418 
419  ostringstream oss;
420  oss << "SUBSYS:" << Name() << " " << sp_lo << " " << sp_hi;
421  sock->Send(oss.str().c_str());
422 
423  ClearHistList(m_list_h1);
424 
425  int ret = 0;
426  TMessage *mess = NULL;
427  while (true) { // incoming hist
428  sock->Recv(mess);
429  if (!mess) {
430  break;
431  } else if (mess->What() == kMESS_STRING) {
432  char str[200];
433  mess->ReadString(str, 200);
434  delete mess;
435  mess = 0;
436  if (strcmp(str, "Finished") == 0) break;
437  else if (strcmp(str, "NotReady") == 0) {
438  ret = 2;
439  break;
440  }
441  } else if (mess->What() == kMESS_OBJECT) {
442  TClass* cla = mess->GetClass();
443  TObject* obj = mess->ReadObject(cla);
444  cout << " Receive: " << cla->GetName() << " " << obj->GetName() << endl;
445  m_list_h1.push_back( (TH1*)obj->Clone() ); // copy
446  delete mess;
447  mess = 0;
448  sock->Send("NEXT"); // Any text is ok for now
449  }
450  }
451  sock->Close();
452  delete sock;
453  return ret;
454 }
455 
456 void OnlMonClient::ClearHistList(HistList_t& list_h1)
457 {
458  for (HistList_t::iterator it = list_h1.begin(); it != list_h1.end(); it++) {
459  delete *it;
460  }
461  list_h1.clear();
462 }
463 
465 {
466  if (num >= m_n_can) {
467  cerr << "ERROR OnlMonClient::GetCanvas(): Num out of range (" << num << " >= " << m_n_can << "). Abort.";
468  exit(1);
469  }
470  return m_list_can[num];
471 }
472 
473 void OnlMonClient::ClearCanvasList()
474 {
475  for (int ii = 0; ii < m_n_can; ii++) {
476  if (m_list_can[ii]) {
477  delete m_list_can[ii];
478  m_list_can[ii] = 0;
479  }
480  }
481 }
482 
483 int OnlMonClient::DrawCanvas(const bool at_end)
484 {
485  m_h1_basic_id = FindMonHist("h1_basic_id");
486  m_h1_basic_cnt = FindMonHist("h1_basic_cnt");
487  int ret = FindAllMonHist();
488  if (m_h1_basic_id == 0 || m_h1_basic_cnt == 0 || ret != 0) {
489  cout << "WARNING: Cannot find OnlMon histogram(s)." << endl;
490  return 2;
491  }
492 
493  int run_id, spill_id, event_id, spill_id_min, spill_id_max;
494  GetBasicID(&run_id, &spill_id, &event_id, &spill_id_min, &spill_id_max);
495  int n_evt, n_sp;
496  GetBasicCount(&n_evt, &n_sp);
497  for (int ii = 0; ii < m_n_can; ii++) {
498  m_list_can[ii] = new OnlMonCanvas(Name(), Title(), ii);
499  m_list_can[ii]->SetBasicID(run_id, spill_id, event_id, spill_id_min, spill_id_max);
500  m_list_can[ii]->SetBasicCount(n_evt, n_sp);
501  m_list_can[ii]->PreDraw(at_end);
502  }
503 
504  ret = DrawMonitor();
505 
506  for (int ii = 0; ii < m_n_can; ii++) {
507  m_list_can[ii]->PostDraw(at_end);
508  }
509 
510  return ret;
511 }
int GetNumSpills()
Definition: OnlMonComm.h:46
virtual int EndOnlMon(PHCompositeNode *topNode)
void SetBasicCount(const int n_evt=0, const int n_sp=0)
Definition: OnlMonCanvas.cc:44
OnlMonCanvas * GetCanvas(const int num=0)
void SetBasicID(const int run_id, const int spill_id=0, const int event_id=0, const int spill_id_min=0, const int spill_id_max=0)
Definition: OnlMonCanvas.cc:35
int SendHist(TSocket *sock, int sp_min, int sp_max)
int GetSpillNum()
Definition: OnlMonComm.h:36
void GetBasicCount(int *n_evt=0, int *n_sp=0)
unsigned int nHistos() const
int process_event(PHCompositeNode *topNode)
Definition: OnlMonClient.cc:88
pthread_mutex_t * GetMutex()
Definition: OnlMonServer.h:49
virtual int InitOnlMon(PHCompositeNode *topNode)
int registerHistoManager(Fun4AllHistoManager *manager)
void AddSpill(const int id)
Definition: OnlMonComm.cc:62
void GetSpillRange(int &sp_lo, int &sp_hi)
Definition: OnlMonComm.cc:56
#define NULL
Definition: Pdb.h:9
void PreDraw(const bool at_end=false)
Definition: OnlMonCanvas.cc:80
TSocket * ConnectServer()
Definition: OnlMonComm.cc:111
int StartMonitor()
void RegisterHist(TH1 *h1, const HistMode_t mode=MODE_ADD)
int GetMaxNumSelSpills()
Definition: OnlMonComm.h:44
std::string Title()
Definition: OnlMonClient.h:73
void SetSpillSelectability(const bool val)
Definition: OnlMonComm.h:32
static std::string GetOnlMonDir()
Definition: UtilOnline.h:27
virtual const std::string Name() const
Returns the name of this module.
Definition: Fun4AllBase.h:23
virtual int FindAllMonHist()
TNamed * getHisto(const std::string &hname) const
int InitRun(PHCompositeNode *topNode)
Definition: OnlMonClient.cc:61
int dumpHistos(const std::string &filename="", const std::string &openmode="RECREATE")
virtual int ProcessEventOnlMon(PHCompositeNode *topNode)
void PostDraw(const bool at_end=false)
void GetBasicID(int *run_id=0, int *spill_id=0, int *event_id=0, int *spill_id_min=0, int *spill_id_max=0)
An SQ interface class to hold one event header.
Definition: SQEvent.h:17
void FindFullSpillRange(int &id_min, int &id_max)
Definition: OnlMonComm.cc:67
virtual int get_run_id() const
Return the run ID.
Definition: SQRun.h:32
bool registerHisto(const std::string &hname, TNamed *h1d, const int replace=0)
static OnlMonComm * instance()
Definition: OnlMonComm.cc:16
virtual OnlMonClient * Clone()
Definition: OnlMonClient.cc:49
int End(PHCompositeNode *topNode)
Called at the end of all processing.
int Init(PHCompositeNode *topNode)
Definition: OnlMonClient.cc:55
An SQ interface class to hold the run-level info.
Definition: SQRun.h:18
static bool GetClearUsFlag()
Definition: OnlMonClient.h:93
virtual ~OnlMonClient()
Definition: OnlMonClient.cc:40
static OnlMonServer * instance()
Definition: OnlMonServer.cc:34
TH1 * FindMonHist(const std::string name, const bool non_null=true)
virtual int DrawMonitor()
virtual int InitRunOnlMon(PHCompositeNode *topNode)
Base class for the OnlMon subsystem module.
Definition: OnlMonClient.h:35