// @(#)root/netx:$Name: $:$Id: TXPhyConnection.cxx,v 1.6 2005/01/05 01:55:13 rdm Exp $
// Author: Alvise Dorigo, Fabrizio Furano
/*************************************************************************
* Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TXPhyConnection //
// //
// Class handling physical connections to xrootd servers. //
// //
//////////////////////////////////////////////////////////////////////////
#include "TXPhyConnection.h"
#include "TError.h"
#include "TXDebug.h"
#include "TXMessage.h"
#include "TString.h"
#include "TEnv.h"
#include "TROOT.h"
#include "TSystem.h"
#include "TThread.h"
#include "TApplication.h"
#include "Riostream.h"
ClassImp(TXPhyConnection);
//____________________________________________________________________________
TThread::VoidFunc_t SocketReaderThread(void * arg)
{
// This thread is the base for the async capabilities of TXPhyConnection
// It repeatedly keeps reading from the socket, while feeding the
// MsqQ with a stream of TXMessages containing what's happening
// at the socket level
TXPhyConnection *thisObj;
if (DebugLevel() >= kHIDEBUG)
Info("SocketReaderThread", "Reader Thread starting");
// It should be possible to cancel the thread as soon as the
// cancellation request is received.
TThread::SetCancelAsynchronous();
TThread::SetCancelOn();
if (DebugLevel() >= kHIDEBUG)
Info("SocketReaderThread", "Reader Thread started");
thisObj = (TXPhyConnection *)arg;
// Set running state ...
thisObj->ReaderStarted();
while (!(thisObj->ReaderThreadKilled())) {
thisObj->BuildXMessage(kDefault, kTRUE, kTRUE);
if (!thisObj->ReaderThreadKilled())
thisObj->CheckAutoTerm();
}
if (DebugLevel() >= kHIDEBUG)
Info("SocketReaderThread","Reader Thread exiting");
return 0;
}
//____________________________________________________________________________
TXPhyConnection::TXPhyConnection(TXAbsUnsolicitedMsgHandler *h)
{
// Constructor
// Initialization of lock mutex
if (!(fRwMutex = new TMutex(kTRUE)))
Error("TXPhyConnection",
"can't create mutex for read/write: out of system resources");
if (!(fMutex = new TMutex(kTRUE)))
Error("TXPhyConnection",
"can't create mutex for local locks: out of system resources");
Touch();
fServer = kUnknown;
SetLogged(kNo);
fRequestTimeout = gEnv->GetValue("XNet.RequestTimeout",
DFLT_REQUESTTIMEOUT);
UnsolicitedMsgHandler = h;
fReaderthreadhandler = 0;
fReaderthreadrunning = kFALSE;
fReaderthreadkilled = kFALSE;
fReaderCV = new TCondition();
}
//____________________________________________________________________________
TXPhyConnection::~TXPhyConnection()
{
// Destructor
Disconnect();
SafeDelete(fRwMutex);
SafeDelete(fReaderCV);
}
//____________________________________________________________________________
Bool_t TXPhyConnection::Connect(TString TcpAddress, Int_t TcpPort,
Int_t TcpWindowSize)
{
// Connect to remote server
R__LOCKGUARD(fMutex);
if (DebugLevel() >= kHIDEBUG)
Info("Connect", "Connecting to [%s:%d]", TcpAddress.Data(), TcpPort);
fSocket = new TXSocket(TcpAddress, TcpPort, TcpWindowSize);
if(!fSocket) {
Error("Connect","Fatal ERROR *** Object creation with new failed !"
" Probable system resources exhausted.");
gSystem->Abort();
}
fSocket->TryConnect();
if (!fSocket->IsValid()) {
Error("Connect",
"can't open connection to xrootd/rootd on host [%s:%d]",
TcpAddress.Data(), TcpPort);
Disconnect();
return kFALSE;
}
fSocket->SetOption(kNoDelay, 1);
Touch();
fTTLsec = DATA_TTL;
if (DebugLevel() >= kHIDEBUG)
Info("Connect", "Connected to host [%s:%d].",
TcpAddress.Data(), TcpPort);
fRemoteAddress = TcpAddress;
fRemotePort = TcpPort;
fReaderthreadrunning = kFALSE;
return kTRUE;
}
//____________________________________________________________________________
void TXPhyConnection::StartReader()
{
// Start reader thread
bool running;
{
R__LOCKGUARD(fMutex);
running = fReaderthreadrunning;
}
// Parametric asynchronous stuff.
// If we are going Sync, then nothing has to be done,
// otherwise the reader thread must be started
if ( !running &&
gEnv->GetValue("XNet.GoAsynchronous", DFLT_GOASYNC) ) {
if (DebugLevel() >= kHIDEBUG)
Info("StartReader", "Starting reader thread...");
// Now we launch the reader thread
fReaderthreadhandler =
new TThread((TThread::VoidFunc_t) SocketReaderThread, this);
if (!fReaderthreadhandler)
Info("StartReader",
"Can't create reader thread: out of system resources");
else {
// Start the thread
fReaderthreadhandler->Run();
// Make sure that it is really running
do {
{
R__LOCKGUARD(fMutex);
running = fReaderthreadrunning;
}
if (!running) {
if (DebugLevel() >= kHIDEBUG)
Info("StartReader","Waiting a little bit ...");
fReaderCV->TimedWaitRelative(100);
}
} while (!running);
}
}
}
//____________________________________________________________________________
void TXPhyConnection::ReaderStarted()
{
// Called inside SocketReaderThread to flag the running status
// of the thread
R__LOCKGUARD(fMutex);
fReaderthreadrunning = kTRUE;
fReaderCV->Signal();
}
//____________________________________________________________________________
Bool_t TXPhyConnection::ReConnect(TString TcpAddress, Int_t TcpPort,
Int_t TcpWindowSize)
{
// Re-connection attempt
Disconnect();
return Connect(TcpAddress, TcpPort, TcpWindowSize);
}
//____________________________________________________________________________
TSocket *TXPhyConnection::SaveSocket()
{
// Return copy of the TSocket part of the existing socket
// Used to save an open connection to rootd daemons
TSocket *opensock = 0;
if (fSocket) {
if (fReaderthreadrunning) {
fReaderthreadkilled = kTRUE;
fReaderthreadhandler->Kill();
}
// Extract TSocket part of TXSocket
opensock = fSocket->ExtractSocket();
// Signal deactivation
fTTLsec = 0;
}
return opensock;
}
//____________________________________________________________________________
void TXPhyConnection::Disconnect()
{
// Terminate connection
if (fReaderthreadrunning) {
fReaderthreadkilled = kTRUE;
fReaderthreadhandler->Kill();
}
fSocket = 0;
}
//____________________________________________________________________________
void TXPhyConnection::Touch()
{
// Set last-use-time to present time
R__LOCKGUARD(fMutex);
time_t t = time(0);
if (DebugLevel() >= kDUMPDEBUG)
Info("Touch", "Setting 'fLastUseTimestamp' to current time: %d",t);
fLastUseTimestamp = t;
}
//____________________________________________________________________________
Int_t TXPhyConnection::ReadRaw(void *buf, Int_t len, ESendRecvOptions opt)
{
// Receive 'len' bytes from the connected server and store them in 'buf'.
// Return number of bytes received.
Int_t res;
Touch();
if (IsValid()) {
if (DebugLevel() >= kDUMPDEBUG)
Info("ReadRaw", "Reading from socket: %d [%s:%d]",
fSocket->GetDescriptor(), fRemoteAddress.Data(), fRemotePort);
res = fSocket->RecvRaw(buf, len, opt);
if ((res <= 0) && (res != TXSOCK_ERR_TIMEOUT) && fReaderthreadkilled &&
(DebugLevel() >= kHIDEBUG)) {
Info("ReadRaw", "Reader thread has been killed");
return res;
}
if ((res <= 0) && (res != TXSOCK_ERR_TIMEOUT) &&
(DebugLevel() >= kHIDEBUG) && (gSystem->GetErrno()) )
Info("ReadRaw", "Read error [%s:%d]. Errno %d:'%s'.",
fRemoteAddress.Data(), fRemotePort, gSystem->GetErrno(),
gSystem->GetError() );
// If a socket error comes, then we disconnect (and destroy the fSocket)
// but we have not to disconnect in the case of a timeout
if (((res <= 0) && (res != TXSOCK_ERR_TIMEOUT)) ||
(!fSocket->IsValid())) {
if (DebugLevel() >= kHIDEBUG)
Info("ReadRaw",
"Socket reported a disconnection (server[%s:%d]). Closing.",
fRemoteAddress.Data(), fRemotePort);
Disconnect();
}
Touch();
return res;
}
else {
// Socket already destroyed or disconnected
if (DebugLevel() >= kDUMPDEBUG)
Info("ReadRaw", "Socket is disconnected (server [%s:%d])",
fRemoteAddress.Data(), fRemotePort);
return TXSOCK_ERR;
}
}
//____________________________________________________________________________
TXMessage *TXPhyConnection::ReadXMessage(Int_t streamid)
{
// Gets a full loaded TXMessage from this phyconn.
// May be a pure msg pick from a queue
Touch();
return fMsgQ.GetMsg(streamid, fRequestTimeout );
}
//____________________________________________________________________________
TXMessage *TXPhyConnection::BuildXMessage(ESendRecvOptions opt,
Bool_t IgnoreTimeouts, Bool_t Enqueue)
{
// Builds an TXMessage, and makes it read its header/data from the socket
// Also put automatically the msg into the queue
TXMessage *m;
m = new TXMessage();
if (!m) {
Error("BuildXMessage", "Fatal ERROR *** Object creation with new failed !"
" Probable system resources exhausted.");
gSystem->Abort();
}
m->ReadRaw(this, opt);
if (fReaderthreadkilled) {
if (m) delete m;
return (TXMessage *)0;
}
if (m->IsAttn()) {
// Here we insert the PhyConn-level support for unsolicited responses
// Some of them will be propagated in some way to the upper levels
// TLogConn, TConnMgr, TXNetConn
HandleUnsolicited(m);
// The purpose of this message ends here
delete m;
m = 0;
}
else
if (Enqueue) {
// If we have to ignore the socket timeouts, then we have not to
// feed the queue with them. In this case, the newly created TXMessage
// has to be freed.
bool waserror;
if (IgnoreTimeouts) {
if (m->GetStatusCode() != TXMessage::kXMSC_timeout) {
waserror = m->IsError();
fMsgQ.PutMsg(m);
if (waserror)
for (int kk=0; kk < 10; kk++)
fMsgQ.PutMsg(0);
} else {
delete m;
m = 0;
}
} else
fMsgQ.PutMsg(m);
}
return m;
}
//____________________________________________________________________________
void TXPhyConnection::CheckAutoTerm()
{
// Check if auto-termination is needed
Bool_t doexit = kFALSE;
{
R__LOCKGUARD(fMutex);
// Parametric asynchronous stuff
// If we are going async, we might be willing to term ourself
if (!IsValid() && gEnv->GetValue("XNet.GoAsynchronous", DFLT_GOASYNC)) {
if (TThread::SelfId() == fReaderthreadhandler->GetId()) {
// Notify termination, if requested
if (DebugLevel() >= kHIDEBUG)
Info("CheckAutoTerm", "self-Cancelling reader thread.");
// Reset thread handlers (real termination will be done
// at ::Exit() )
fReaderthreadhandler = 0;
fReaderthreadrunning = kFALSE;
// Destroy the socket
delete fSocket;
fSocket = 0;
// We are going to exit
doexit = kTRUE;
}
}
}
// Now exit, if requested
if (doexit) {
UnlockChannel();
TThread::Exit(0);
}
}
//____________________________________________________________________________
void TXPhyConnection::HandleUnsolicited(TXMessage *m)
{
// Local processing of unsolicited responses is done here
Bool_t ProcessingToGo = kTRUE;
struct ServerResponseBody_Attn *attnbody;
Touch();
// Local processing of the unsolicited TXMessage
attnbody = (struct ServerResponseBody_Attn *)m->GetData();
if (attnbody) {
switch (attnbody->actnum) {
case kXR_asyncms:
// A message arrived from the server. Let's print it.
Info("HandleUnsolicited",
"Message from server at socket %d[%s:%d]: '%s'.",
fSocket->GetDescriptor(), fRemoteAddress.Data(),
fRemotePort, attnbody->parms);
ProcessingToGo = kFALSE;
break;
}
}
// Now we propagate the message to the interested object, if any
// It could be some sort of upper layer of the architecture
if (ProcessingToGo)
SendUnsolicitedMsg(this, m);
}
//____________________________________________________________________________
Int_t TXPhyConnection::WriteRaw(const void *buf, Int_t len, ESendRecvOptions opt)
{
// Send 'len' bytes located at 'buf' to the connected server.
// Return number of bytes sent.
Int_t res;
Touch();
if (IsValid()) {
if (DebugLevel() >= kDUMPDEBUG)
Info("WriteRaw", "Writing to socket %d[%s:%d]",
fSocket->GetDescriptor(), fRemoteAddress.Data(), fRemotePort);
res = fSocket->SendRaw(buf, len, opt);
if ((res <= 0) && (res != TXSOCK_ERR_TIMEOUT) &&
(DebugLevel() >= kHIDEBUG) && (gSystem->GetErrno()))
Info("WriteRaw", "Write error [%s:%d]. Errno: %d:'%s'.",
fRemoteAddress.Data(), fRemotePort, gSystem->GetErrno(),
gSystem->GetError() );
// If a socket error comes, then we disconnect (and destroy the fSocket)
if ((res < 0) || (!fSocket->IsValid())) {
if (DebugLevel() >= kHIDEBUG)
Info("WriteRaw",
"Socket reported a disconnection (server[%s:%d]). Closing.",
fRemoteAddress.Data(), fRemotePort);
Disconnect();
}
Touch();
return( res );
}
else {
// Socket already destroyed or disconnected
if (DebugLevel() >= kDUMPDEBUG)
Info("WriteRaw", "Socket is disconnected (server [%s:%d])",
fRemoteAddress.Data(), fRemotePort);
return TXSOCK_ERR;
}
}
//____________________________________________________________________________
UInt_t TXPhyConnection::GetBytesSent()
{
// Return number of bytes sent
if (IsValid())
return fSocket->GetBytesSent();
else {
// Socket already destroyed or disconnected
if (DebugLevel() >= kDUMPDEBUG)
Info("GetBytesSent",
"Socket is disconnected (server [%s:%d])",
fRemoteAddress.Data(), fRemotePort);
return 0;
}
}
//____________________________________________________________________________
UInt_t TXPhyConnection::GetBytesRecv()
{
// Return number of bytes received
if (IsValid())
return fSocket->GetBytesRecv();
else {
// Socket already destroyed or disconnected
if(DebugLevel() >= kDUMPDEBUG)
Info("GetBytesRecv",
"Socket is disconnected (server [%s:%d])",
fRemoteAddress.Data(), fRemotePort);
return 0;
}
}
//____________________________________________________________________________
UInt_t TXPhyConnection::GetSocketBytesSent()
{
// Return number of bytes sent
if (IsValid())
return fSocket->GetSocketBytesSent();
else {
// Socket already destroyed or disconnected
if(DebugLevel() >= kDUMPDEBUG)
Info("GetSocketBytesSent",
"Socket is disconnected (server [%s:%s])",
fRemoteAddress.Data(), fRemotePort);
return 0;
}
}
//____________________________________________________________________________
UInt_t TXPhyConnection::GetSocketBytesRecv()
{
// Return number of bytes received
if (IsValid())
return fSocket->GetSocketBytesRecv();
else {
// Socket already destroyed or disconnected
if(DebugLevel() >= kDUMPDEBUG)
Info("GetSocketBytesRecv",
"Socket is disconnected (server [%s:%s])",
fRemoteAddress.Data(), fRemotePort);
return 0;
}
}
//____________________________________________________________________________
Bool_t TXPhyConnection::ExpiredTTL()
{
// Check expiration time
return( (time(0) - fLastUseTimestamp) > fTTLsec ? kTRUE : kFALSE);
}
//____________________________________________________________________________
void TXPhyConnection::LockChannel()
{
// Lock
fRwMutex->Lock();
}
//____________________________________________________________________________
void TXPhyConnection::UnlockChannel()
{
// Unlock
fRwMutex->UnLock();
}
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.