Class Reference for E1039 Core & Analysis Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
OnlMonServer.cc
Go to the documentation of this file.
1 #include <string>
2 #include <sstream>
3 #include <pthread.h>
4 #include <TMessage.h>
5 #include <TServerSocket.h>
6 #include <TSocket.h>
7 #include <TROOT.h>
8 #include <TH1.h>
9 #include "OnlMonComm.h"
10 #include "OnlMonClient.h"
11 #include "OnlMonServer.h"
12 
13 using namespace std;
14 
// Threading backend selection.
// Leave ROOTTHREAD undefined to run the server in a plain pthread (SERVER mode);
// define it to use a ROOT TThread instead.
//#define ROOTTHREAD
#if !defined(ROOTTHREAD)
#define SERVER
#endif

#if defined(SERVER)
// pthread mode: integer that is tested and assigned when the server thread is started.
int ServerThread = 0;
#endif

#if defined(ROOTTHREAD)
// ROOT mode: handle of the TThread that runs the server.
static TThread *ServerThread = NULL;
#endif
27 
28 //std::string OnlMonServer::m_out_dir = "/data2/e1039/onlmon/plots";
29 std::string OnlMonServer::m_mon_host = "localhost";
30 int OnlMonServer::m_mon_port = 9081;
31 int OnlMonServer::m_mon_port_0 = 9081;
32 int OnlMonServer::m_mon_n_port = 5;
33 
35 {
36  if (! __instance)
37  {
38  __instance = new OnlMonServer();
39  }
40  OnlMonServer *onlmonserver = dynamic_cast<OnlMonServer *> (__instance);
41  return onlmonserver;
42 }
43 
44 OnlMonServer::OnlMonServer(const std::string &name)
45  : Fun4AllServer(name)
46  , m_is_online(true)
47  , m_go_end(false)
48  , m_svr_ready(false)
49 {
50  pthread_mutexattr_t mutex_attr;
51  pthread_mutexattr_init(&mutex_attr);
52  pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
53 
54  int ret = pthread_mutex_init(&mutex, &mutex_attr);
55  if (ret != 0) {
56  cout << "WARNING: pthread_mutex_init() returned " << ret << "." << endl;
57  }
58  return;
59 }
60 
62 {
63  return;
64 }
65 
67 {
68  cout << "OnlMonServer::StartServer(): Start." << endl;
69  // gBenchmark->Start("phnxmon");
70 
71  //GetMutex(mutex);
72  //pthread_mutex_lock(&mutex);
73 #if defined(SERVER) || defined(ROOTTHREAD)
74 
75  pthread_t ThreadId = 0;
76  if (!ServerThread)
77  {
78  if (Verbosity() > 2) cout << " Creating server thread." << endl;
79 #ifdef SERVER
80  ServerThread = pthread_create(&ThreadId, NULL, FuncServer, this);
81  SetThreadId(ThreadId);
82 #endif
83 #ifdef ROOTTHREAD
84  ServerThread = new TThread(server, (void *)0);
85  ServerThread->Run();
86 #endif
87  }
88 
89 #endif
90  //pthread_mutex_unlock(&mutex);
91  //if (Verbosity() > 1) cout << "OnlMonServer::StartServer(): finish." << endl;
92  cout << " Started." << endl;
93  return;
94 }
95 
97 {
98  m_go_end = true;
99  int ret = Fun4AllServer::End();
100  for (int ii = 0; ii < 30; ii++) { // Wait for one minute at max
101  if (m_svr_ready) break;
102  sleep(2);
103  }
104  return ret;
105 
106 //#if defined(SERVER) || defined(ROOTTHREAD)
107 // if (! serverthreadid) return;
108 // SetGoEnd(true);
109 //
110 //#ifdef SERVER
111 // pthread_join(serverthreadid, 0);
112 // cout << " Joined!" << endl;
113 //#endif
114 //#ifdef ROOTTHREAD
115 // ServerThread->Join(server); // not supported
116 //#endif
117 //
118 //#endif
119 }
120 
122 
128 {
129  TSocket sock(m_mon_host.c_str(), port);
130  if (sock.IsValid()) {
131  cout << " Close the existing onlmon server at " << port << "." << endl;
132  sock.Send("Suicide");
133  //TMessage *mess = 0;
134  //sock.Recv(mess); // Just check a response.
135  //delete mess;
136  return true;
137  } else {
138  //cout << "No onlmon server exists." << endl;
139  return false;
140  }
141 }
142 
143 void* OnlMonServer::FuncServer(void* arg)
144 {
145  OnlMonServer* se = (OnlMonServer*)arg;
146  if (se->Verbosity() >= 0) cout << "OnlMonServer::FuncServer(): start." << endl;
147 
148  sleep(5);
149  TServerSocket* ss = 0;
150  int port;
151  for (port = m_mon_port_0; port < m_mon_port_0 + m_mon_n_port; port++) {
152  if (se->CloseExistingServer(port)) continue; // Not try to use the port closed now
153  ss = new TServerSocket(port, kTRUE);
154  if (ss->IsValid()) break;
155  delete ss;
156  ss = 0;
157  }
158  if (! ss) {
159  cout << "Too many online-monitor servers are running. Start none." << endl;
160  return 0;
161  }
162  m_mon_port = port;
163  cout << " Port = " << port << endl;
164 
165  // root keeps a list of sockets and tries to close them when quitting.
166  // this interferes with my own threading and makes valgrind crash
167  // The solution is to remove the TServerSocket *ss from roots list of
168  // sockets. Then it will leave this socket alone.
169 
170  if (se->Verbosity() >= 0) cout << "OnlMonServer::RemoveSockets():" << endl;
171  int isock = gROOT->GetListOfSockets()->IndexOf(ss);
172  gROOT->GetListOfSockets()->RemoveAt(isock);
173  sleep(5);
174  se->SetServerReady(true);
175 
176  again:
177  if (se->Verbosity() >= 0) cout << "OnlMonServer::WaitForConnection():" << endl;
178  TSocket *s0 = ss->Accept();
179  if (!s0) {
180  cout << "Server socket " << port << " in use, either go to a different node or change the port and recompile server and client. Abort." << endl;
181  exit(1);
182  }
183  // mutex protected since writing of histo
184  // to outgoing buffer and updating by other thread do not
185  // go well together
186  TInetAddress adr = s0->GetInetAddress();
187  if (se->Verbosity() >= 0) {
188  cout << "Connection from " << adr.GetHostName() << "/" << adr.GetHostAddress() << ":" << adr.GetPort() << endl;
189  }
190  UInt_t ip0 = adr.GetAddress();
191  if ((ip0 >> 16) == (192 << 8) + 168 || ip0 == (127 << 24) + 1) {
192  se->HandleConnection(s0);
193  } else {
194  cout << "OnlMonServer::FuncServer(): Ignore a connection from WAN.\n ";
195  adr.Print();
196  }
197 
198  delete s0;
199  //s0->Close();
200  if (se->GetGoEnd()) {
201  cout << "OnlMonServer::FuncServer(): End." << endl;
202  return 0;
203  }
204  goto again;
205 }
206 
208 {
209  /*
210  int val;
211  sock->GetOption(kSendBuffer, val);
212  printf("sendbuffer size: %d\n", val);
213  sock->GetOption(kRecvBuffer, val);
214  printf("recvbuffer size: %d\n", val);
215  */
216  TMessage *mess = NULL;
217  while (1) {
218  if (Verbosity() > 2) cout << "OnlMonServer::HandleConnection(): while loop." << endl;
219  sock->Recv(mess);
220  if (! mess) {
221  if (Verbosity() > 2) cout << " Broken Connection, closing socket." << endl;
222  break;
223  }
224  if (m_go_end) {
225  if (Verbosity() > 2) cout << " Already going to end, closing socket." << endl;
226  break;
227  }
228 
229  if (mess->What() == kMESS_STRING) {
230  char msg_str_c[64];
231  mess->ReadString(msg_str_c, 64);
232  string msg_str = msg_str_c;
233  delete mess;
234  mess = 0;
235  if (Verbosity() > 2) cout << " Received message: " << msg_str << endl;
236 
237  if (msg_str == "Finished") {
238  break;
239  } else if (msg_str == "Suicide") {
240  cout << "OnlMonServer::HandleConnection(): Suicide." << endl;
241  sock->Send("OK");
242  m_go_end = true;
243  break;
244  } else if (msg_str == "Ping") {
245  if (Verbosity() > 2) cout << " Ping." << endl;
246  sock->Send("Pong");
247  } else if (msg_str == "Spill") {
248  if (Verbosity() > 2) cout << " Spill." << endl;
249  int id_min, id_max;
251  comm->FindFullSpillRange(id_min, id_max);
252  ostringstream oss;
253  oss << id_min << " " << id_max << " " << comm->GetSpillSelectability();
254  sock->Send(oss.str().c_str());
255  } else if (msg_str.substr(0, 7) == "SUBSYS:") {
256  istringstream iss(msg_str.substr(7));
257  string name_subsys;
258  int sp_min, sp_max;
259  iss >> name_subsys >> sp_min >> sp_max;
260  cout << " Subsystem " << name_subsys << endl;
261  SubsysReco* sub = getSubsysReco(name_subsys);
262  if (! sub) {
263  cout << " ... Not available." << endl;
264  sock->Send("NotReady");
265  } else {
266  OnlMonClient* cli = dynamic_cast<OnlMonClient*>(sub);
267  cli->SendHist(sock, sp_min, sp_max);
268  }
269  } else {
270  //if (Verbosity() > 2)
271  cout << " Unexpected string message (" << msg_str << "). Ignore it." << endl;
272  break;
273  }
274  } else {
275  cerr << "OnlMonServer::HandleConnection(): Unexpected message ("
276  << mess->What() << "). Ignore it." << endl;
277  delete mess;
278  mess = 0;
279  break;
280  }
281  }
282 
283  sock->Close();
284  return;
285 }
bool GetSpillSelectability()
Definition: OnlMonComm.h:33
virtual int End()
OnlMonServer(const std::string &name="OnlMonServer")
Definition: OnlMonServer.cc:44
void HandleConnection(TSocket *sock)
int SendHist(TSocket *sock, int sp_min, int sp_max)
int ServerThread
Definition: OnlMonServer.cc:22
Fun4AllServer * se
virtual ~OnlMonServer()
Definition: OnlMonServer.cc:61
#define NULL
Definition: Pdb.h:9
bool GetGoEnd()
Definition: OnlMonServer.h:44
void SetThreadId(pthread_t &id)
Definition: OnlMonServer.h:51
void SetServerReady(const bool val)
Definition: OnlMonServer.h:45
virtual int Verbosity() const
Gets the verbosity of this module.
Definition: Fun4AllBase.h:64
bool CloseExistingServer(const int port)
Close an existing server process if such exists.
void FindFullSpillRange(int &id_min, int &id_max)
Definition: OnlMonComm.cc:67
void StartServer()
Definition: OnlMonServer.cc:66
static OnlMonComm * instance()
Definition: OnlMonComm.cc:16
SubsysReco * getSubsysReco(const std::string &name)
virtual void Verbosity(const int ival)
Sets the verbosity of this module (0 by default=quiet).
Definition: Fun4AllBase.h:58
static OnlMonServer * instance()
Definition: OnlMonServer.cc:34
static void * FuncServer(void *arg)
pthread_mutex_t mutex
Definition: OnlMonServer.h:58
Base class for the OnlMon subsystem module.
Definition: OnlMonClient.h:35