// @(#)root/netx:$Name: $:$Id: TXConnectionMgr.cxx,v 1.5 2005/05/08 13:49:31 rdm Exp $
// Author: Alvise Dorigo, Fabrizio Furano
/*************************************************************************
* Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TXConnectionMgr //
// //
// Authors: Alvise Dorigo, Fabrizio Furano //
// INFN Padova, 2003 //
// //
// The Connection Manager handles socket communications for TXNetFile //
// action: connect, disconnect, read, write. It is a static object of //
// the TXNetFile class such that within a single application multiple //
// TXNetFile objects share the same connection manager. //
// The connection manager maps multiple logical connections on a single //
// physical connection. //
// There is one and only one logical connection per client (TXNetFile //
// object), and one and only one physical connection per server:port. //
// Thus multiple TXNetFile objects within a given application share //
// the same physical TCP channel to communicate with the server. //
// This reduces the time overhead for socket creation and reduces also //
// the server load due to handling many sockets. //
// //
//////////////////////////////////////////////////////////////////////////
#include "TEnv.h"
#include "TXConnectionMgr.h"
#include "TXNetConn.h"
#include "TXDebug.h"
#include "TXMessage.h"
#include "TError.h"
// Brings the standard-library names used below (e.g. vector) into scope.
using namespace std;
// Global ROOT environment object; queried below for the XNet.* settings.
extern TEnv *gEnv;
// ROOT class-implementation macro for TXConnectionMgr.
ClassImp(TXConnectionMgr);
//
// Hook towards TXNetConn that becomes active once libNetx.so has been
// loaded: the file-scope manager instance defined here is handed over to
// TXNetConn by the constructor of a static helper object.
//
static TXConnectionMgr gInstance;

struct TXConnectionMgrInit {
   // Registers the file-scope manager instance with TXNetConn.
   TXConnectionMgrInit()
   {
      TXNetConn::SetTXConnectionMgr(&gInstance);
   }
};

// Constructing this object performs the registration; it is defined after
// gInstance so that the instance it points to already exists.
static TXConnectionMgrInit txconnectionmgr_init;
//_____________________________________________________________________________
extern "C" void * GarbageCollectorThread(void * arg)
{
// Function executed in the garbage collector thread
int i;
TXConnectionMgr *thisObj = (TXConnectionMgr *)arg;
TThread::SetCancelDeferred();
TThread::SetCancelOn();
while (!(thisObj->ThreadKilled())) {
TThread::CancelPoint();
if (!thisObj->ThreadKilled())
thisObj->GarbageCollect();
if (!thisObj->ThreadKilled()) {
for (i = 0; i < 10; i++) {
TThread::CancelPoint();
if (gSystem)
gSystem->Sleep(200);
}
}
}
return 0;
}
//____________________________________________________
TXConnectionMgr::TXConnectionMgr()
{
   // Constructor.
   // Only puts the object in its "not yet initialized" state: no mutex, no
   // garbage collector thread. It is meant for the static instance created
   // when libNetx is loaded; the real setup is done by Init().

   fInitialized   = kFALSE;
   fThreadKilled  = kFALSE;
   fThreadHandler = 0;
   fMutex         = 0;
}
//____________________________________________________
void TXConnectionMgr::Init()
{
   // Complete, one-time initialization of the connection manager.
   // Creates the mutex protecting the connection vectors and, unless the
   // XNet.StartGarbageCollectorThread setting disables it, creates and
   // starts the garbage collector thread. Calls made after a successful
   // initialization return immediately. If the mutex or the thread cannot
   // be created the manager is left uninitialized.

   if (fInitialized)
      return;

   // Mutex guarding fLogVec / fPhyVec. Several methods of this class take
   // the lock again while already holding it (e.g. GarbageCollect() calling
   // GetPhyConnectionRefCount()).
   if (!fMutex)
      fMutex = new TMutex(kTRUE);

   if (!fMutex) {
      Info("Init", "Can't create mutex: out of system resources");
      return;
   }

   if (!gEnv->GetValue("XNet.StartGarbageCollectorThread",
                       DFLT_STARTGARBAGECOLLECTORTHREAD)) {
      // The user asked for no collector thread: just mention it
      if (DebugLevel() >= kHIDEBUG)
         Info("Init",
              "Explicitly requested not to start the garbage collector"
              " thread. Are you sure?");
   } else {
      // The original authors note that the type of the thread function
      // makes this a detached thread.
      fThreadHandler =
         new TThread((TThread::VoidFunc_t) GarbageCollectorThread, this);

      if (!fThreadHandler) {
         Info("Init","Can't create garbage collector"
                     " thread: out of system resources");
         return;
      }

      // Start the thread
      fThreadHandler->Run();
   }

   // From now on Init() is a no-op
   fInitialized = kTRUE;
}
//_____________________________________________________________________________
TXConnectionMgr::~TXConnectionMgr()
{
   // Destructor.
   // Destroys every logical and physical connection still registered and
   // stops the garbage collector thread. The mutex object itself is not
   // deleted here.

   // Raise the stop flag before touching the connections, so that the
   // collector loop does not begin a new pass while they are being torn down.
   fThreadKilled = kTRUE;

   {
      R__LOCKGUARD(fMutex);

      UInt_t i;

      // Logical connections are released in place: the vector is not
      // resized while it is being traversed, so no entry can be skipped.
      for (i = 0; i < fLogVec.size(); i++) {
         if (fLogVec[i]) {
            delete fLogVec[i];
            fLogVec[i] = 0;
         }
      }

      for (i = 0; i < fPhyVec.size(); i++) {
         if (fPhyVec[i]) {
            delete fPhyVec[i];
            fPhyVec[i] = 0;
         }
      }

      // Leave no stale pointer behind: if the collector thread still gets
      // the lock before it is killed, it must find nothing to work on.
      fLogVec.clear();
      fPhyVec.clear();
   }

   // The thread is killed only after the lock has been released, so it is
   // never cancelled because of us while we are waiting for the mutex.
   if (fThreadHandler)
      fThreadHandler->Kill();
}
//_____________________________________________________________________________
void TXConnectionMgr::GarbageCollect()
{
// Get rid of unused physical connections. 'Unused' means not used for a
// TTL time from any logical one. The TTL depends on the kind of remote
// server. For a load balancer the TTL is very high, while for a data server
// is quite small.
// Mutual exclusion on the vectors and other vars
{
R__LOCKGUARD(fMutex);
if (DebugLevel() >= kDUMPDEBUG)
Info("GarbageCollect",
"%d: collecting gargage (%d phys conn, %d log conn)",
TThread::SelfId(),fPhyVec.size(),fLogVec.size());
// We cycle all the physical connections
for (unsigned short int i = 0; i < fPhyVec.size(); i++) {
// If a single physical connection has no linked logical connections,
// then we kill it if its TTL has expired
if ( fPhyVec[i] && (GetPhyConnectionRefCount(fPhyVec[i]) <= 0) &&
fPhyVec[i]->ExpiredTTL() ) {
if (DebugLevel() >= kDUMPDEBUG)
Info("GarbageCollect", "Purging physical connection %d", i);
// Wait until the physical connection is unlocked (it may be
// in use by slow processes)
if (fPhyVec[i])
delete(fPhyVec[i]);
RemovePhyConn(fPhyVec[i]);
if (DebugLevel() >= kHIDEBUG)
Info("GarbageCollect", "Purged physical connection %d", i);
}
}
}
}
//_____________________________________________________________________________
short int TXConnectionMgr::Connect(TString RemoteAddress,
                                   Int_t TcpPort, Int_t TcpWindowSize)
{
   // Connects to the remote server:
   //  - Looks for an existing, valid physical connection already bound to
   //    RemoteAddress:TcpPort;
   //  - If none is found, creates a new physical connection and connects it
   //    to RemoteAddress:TcpPort using TcpWindowSize;
   //  - Creates a logical connection and binds it to that physical
   //    connection;
   //  - Returns the logical connection ID, i.e. the position of the new
   //    logical connection inside fLogVec. Every client will use this ID
   //    to deal with the server.
   // Returns -1 if the physical connection cannot be established; in that
   // case nothing is registered and the temporary objects are released.
   // The manager mutex is NOT held while the new physical connection is
   // being established.

   TXLogConnection *logconn;
   TXPhyConnection *phyconn = 0;
   short int newid;
   Bool_t phyfound;

   // First we get a new logical connection object
   if (DebugLevel() >= kHIDEBUG)
      Info("Connect", "Creating a logical connection...");

   logconn = new TXLogConnection();
   if (!logconn) {
      Error("Connect","Fatal ERROR *** Object creation with new failed !"
                      " Probable system resources exhausted.");
      gSystem->Abort();
   }

   if (DebugLevel() >= kDUMPDEBUG)
      Info("Connect", "Getting lock...");

   {
      R__LOCKGUARD(fMutex);

      // If we already have a physical connection to that host:port,
      // then we use that
      phyfound = kFALSE;

      if (DebugLevel() >= kHIDEBUG)
         Info("Connect","Looking for an available physical"
                        " connection for address [%s:%d]",
              RemoteAddress.Data(), TcpPort);

      for (UInt_t i = 0; i < fPhyVec.size(); i++) {
         if (fPhyVec[i] && fPhyVec[i]->IsValid() &&
             fPhyVec[i]->IsPort(TcpPort) &&
             fPhyVec[i]->IsAddress(RemoteAddress)) {
            // We link that physical connection to the new logical connection
            fPhyVec[i]->Touch();
            logconn->SetPhyConnection( fPhyVec[i] );
            phyfound = kTRUE;
            break;
         }
      }
   }

   if (!phyfound) {

      if (DebugLevel() >= kHIDEBUG)
         Info("Connect",
              "Physical connection not found. Creating a new one...");

      // If not already present, then we must build a new physical
      // connection, and try to connect it.
      // While we are trying to connect, the mutex must be unlocked.
      // Note that at this point logconn is a pure local instance, so it
      // does not need to be protected by mutex
      phyconn = new TXPhyConnection(this);
      if (!phyconn) {
         Error("Connect","Fatal ERROR *** Object creation with new failed !"
                         " Probable system resources exhausted.");
         gSystem->Abort();
      }

      if (phyconn &&
          phyconn->Connect(RemoteAddress, TcpPort, TcpWindowSize)) {

         logconn->SetPhyConnection(phyconn);

         if (DebugLevel() >= kHIDEBUG)
            Info("Connect", "New physical connection to server [%s:%d]"
                            " succesfully created.",
                 RemoteAddress.Data(), TcpPort);
      } else {
         // Neither object has been registered in the vectors yet, so
         // nobody else can free them: release both before giving up.
         delete logconn;
         delete phyconn;
         return -1;
      }
   }

   // Now, we are connected to the host desired.
   // The physical connection can be old or newly created
   {
      R__LOCKGUARD(fMutex);

      // Then, if needed, we push the physical connection into its vector
      if (!phyfound)
         fPhyVec.push_back(phyconn);

      // Then we push the logical connection into its vector
      fLogVec.push_back(logconn);

      // Its ID is its position inside the vector, we must return it later
      newid = (short int)(fLogVec.size() - 1);

      // Now some debug log
      if (DebugLevel() >= kHIDEBUG) {
         Int_t logCnt = 0, phyCnt = 0;

         for (UInt_t i = 0; i < fPhyVec.size(); i++)
            if (fPhyVec[i])
               phyCnt++;

         for (UInt_t i = 0; i < fLogVec.size(); i++)
            if (fLogVec[i])
               logCnt++;

         // Arguments follow the order of the format: size first, then count
         Info("Connect",
              "LogConn: size:%d, count:%d - PhyConn: size:%d, count:%d",
              (Int_t)fLogVec.size(), logCnt, (Int_t)fPhyVec.size(), phyCnt);
      }
   }

   return newid;
}
//_____________________________________________________________________________
void TXConnectionMgr::Disconnect(short int LogConnectionID,
Bool_t ForcePhysicalDisc)
{
// Deletes a logical connection.
// Also deletes the related physical one if ForcePhysicalDisc=TRUE.
if (DebugLevel() >= kDUMPDEBUG)
Info("Disconnect", "Getting lock...");
{
R__LOCKGUARD(fMutex);
if ((UInt_t(LogConnectionID) >= fLogVec.size()) ||
(!fLogVec[LogConnectionID])) {
Error("Disconnect", "Destroying nonexistent logconnid %d.",
LogConnectionID);
return;
}
if (ForcePhysicalDisc) {
// We disconnect the phyconn, but it will be removed by the
// garbagecollector as soon as possible. Note that here we
// cannot destroy the phyconn, since there can be other logconns
// pointing to it the phyconn will be removed when there are no
// more logconns pointing to it
fLogVec[LogConnectionID]->GetPhyConnection()->SetTTL(0);
fLogVec[LogConnectionID]->GetPhyConnection()->Disconnect();
}
fLogVec[LogConnectionID]->GetPhyConnection()->Touch();
SafeDelete(fLogVec[LogConnectionID]);
RemoveLogConn(fLogVec[LogConnectionID]);
fLogVec[LogConnectionID] = 0;
if (DebugLevel() >= kDUMPDEBUG)
Info("Disconnect", "Unlocking...");
}
}
//_____________________________________________________________________________
Int_t TXConnectionMgr::ReadRaw(short int LogConnectionID, void *buffer,
                               Int_t BufferLength, ESendRecvOptions opt)
{
   // Reads BufferLength bytes into 'buffer' through the logical connection
   // LogConnectionID, forwarding 'opt' to it.
   // Returns what the logical connection's ReadRaw() returns, or -1 when
   // no logical connection is registered under that ID.

   TXLogConnection *conn = GetConnection(LogConnectionID);

   if (!conn) {
      Info("ReadRaw", "There's not a logical connection with id=%d",
           LogConnectionID);
      return -1;
   }

   if (DebugLevel() >= kDUMPDEBUG)
      Info("ReadRaw", "Reading from logical connection %d",
           LogConnectionID);

   return conn->ReadRaw(buffer, BufferLength, opt);
}
//_____________________________________________________________________________
TXMessage *TXConnectionMgr::ReadMsg(short int LogConnectionID,
                                    ESendRecvOptions opt)
{
   // Reads a message for the logical connection LogConnectionID.
   // When the XNet.GoAsynchronous setting is off the message is built
   // directly from the socket by the physical connection (BuildXMessage);
   // otherwise it is taken from the physical connection's queue
   // (ReadXMessage).
   // Returns the message, or 0 when there is no logical connection with
   // that ID or it has no physical connection attached.

   TXLogConnection *logconn = GetConnection(LogConnectionID);
   if (!logconn) {
      Info("ReadMsg", "There's not a logical connection with id=%d",
           LogConnectionID);
      return 0;
   }

   TXPhyConnection *phyconn = logconn->GetPhyConnection();
   if (!phyconn) {
      Info("ReadMsg", "Logical connection %d has no physical connection",
           LogConnectionID);
      return 0;
   }

   TXMessage *mex;

   // Parametric asynchronous stuff.
   // If we are going Sync, then we must build the message here,
   // otherwise the messages come directly from the queue
   if ( !gEnv->GetValue("XNet.GoAsynchronous", DFLT_GOASYNC) ) {
      // We get a new message directly from the socket.
      // The message gets inserted inside the phyconn queue.
      // Timeouts must not be ignored here, indeed they are an error
      // because we are waiting for a message that must come quickly
      mex = phyconn->BuildXMessage(opt, kFALSE, kFALSE);
   }
   else {
      // Now we get the message from the queue, with the timeouts needed
      mex = phyconn->ReadXMessage(LogConnectionID);
   }

   return mex;
}
//_____________________________________________________________________________
Int_t TXConnectionMgr::WriteRaw(short int LogConnectionID, const void *buffer,
                                Int_t BufferLength, ESendRecvOptions opt)
{
   // Writes BufferLength bytes taken from 'buffer' through the logical
   // connection LogConnectionID, forwarding 'opt' to it.
   // Returns what the logical connection's WriteRaw() returns, or -1 when
   // no logical connection is registered under that ID.

   TXLogConnection *conn = GetConnection(LogConnectionID);

   if (!conn) {
      Info("WriteRaw", "There's not a logical connection with id=%d",
           LogConnectionID);
      return -1;
   }

   if (DebugLevel() >= kDUMPDEBUG)
      Info("WriteRaw", "Writing %d bytes to logical connection %d.",
           BufferLength, LogConnectionID);

   return conn->WriteRaw(buffer, BufferLength, opt);
}
//_____________________________________________________________________________
TXLogConnection *TXConnectionMgr::GetConnection(short int LogConnectionID)
{
   // Returns the logical connection object that has LogConnectionID as its
   // ID (its position in fLogVec), or 0 when the ID is out of range or the
   // slot is empty. The lookup is done with the manager mutex held.

   R__LOCKGUARD(fMutex);

   // The unsigned conversion makes negative IDs fail the range test too
   if (UInt_t(LogConnectionID) >= fLogVec.size())
      return 0;

   return fLogVec[LogConnectionID];
}
//_____________________________________________________________________________
short int TXConnectionMgr::GetPhyConnectionRefCount(TXPhyConnection *PhyConn)
{
   // Counts the logical connections currently bound to the physical
   // connection 'PhyConn' and returns that number. Empty slots of the
   // logical connection vector are ignored. Runs with the manager mutex held.

   int users = 0;

   {
      R__LOCKGUARD(fMutex);

      vector<TXLogConnection *>::iterator it;
      for (it = fLogVec.begin(); it != fLogVec.end(); ++it) {
         if (!(*it))
            continue;
         if ((*it)->GetPhyConnection() == PhyConn)
            ++users;
      }
   }

   return users;
}
//_____________________________________________________________________________
Bool_t TXConnectionMgr::ProcessUnsolicitedMsg(TXUnsolicitedMsgSender *sender,
                                              TXMessage *unsolmsg)
{
   // We are here if an unsolicited response comes from a physical connection.
   // The response comes in the form of a TXMessage *, that must NOT be
   // destroyed after processing. It is destroyed by the first sender.
   // Remember that we are in a separate thread, since unsolicited responses
   // are asynchronous by nature.
   // The message is forwarded to every logical connection whose physical
   // connection is 'sender'. Always returns kTRUE.

   // Report under this method's own name (it used to be logged as "Write")
   Info("ProcessUnsolicitedMsg", "Processing unsolicited response");

   // Local processing ....

   // Now we propagate the message to the interested objects.
   // In our architecture, the interested objects are the objects which
   // self-registered in the logical connections belonging to the Phyconn
   // which threw the event.
   // So we throw the evt towards each logical connection
   {
      R__LOCKGUARD(fMutex);

      for (UInt_t i = 0; i < fLogVec.size(); i++) {
         if (fLogVec[i] &&
             (fLogVec[i]->GetPhyConnection() == sender)) {
            fLogVec[i]->ProcessUnsolicitedMsg(sender, unsolmsg);
         }
      }
   }

   return kTRUE;
}
//_____________________________________________________________________________
void TXConnectionMgr::RemoveLogConn(TXLogConnection *logc)
{
   // Erases from the vector of logical connections the first entry equal
   // to 'logc'; does nothing if there is no such entry. The object pointed
   // to is not deleted. Runs with the manager mutex held.
   // NOTE: erasing moves every later entry one position down, while
   // Connect() hands out vector positions as logical connection IDs.

   R__LOCKGUARD(fMutex);

   vector<TXLogConnection *>::iterator pos = fLogVec.begin();
   while (pos != fLogVec.end() && *pos != logc)
      ++pos;

   if (pos != fLogVec.end())
      fLogVec.erase(pos);
}
//_____________________________________________________________________________
void TXConnectionMgr::RemovePhyConn(TXPhyConnection *phyc)
{
   // Erases from the vector of physical connections the first entry equal
   // to 'phyc'; does nothing if there is no such entry. The object pointed
   // to is not deleted. Runs with the manager mutex held.

   R__LOCKGUARD(fMutex);

   vector<TXPhyConnection *>::iterator pos = fPhyVec.begin();
   while (pos != fPhyVec.end() && *pos != phyc)
      ++pos;

   if (pos != fPhyVec.end())
      fPhyVec.erase(pos);
}
// ROOT page - Class index - Class Hierarchy - Top of the page
// This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.