XRootD
Loading...
Searching...
No Matches
XrdClJobManager.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
20#include "XrdCl/XrdClLog.hh"
23#include "XrdSys/XrdSysE2T.hh"
24
25//------------------------------------------------------------------------------
26// The thread
27//------------------------------------------------------------------------------
28extern "C"
29{
30 static void *RunRunnerThread( void *arg )
31 {
32 using namespace XrdCl;
33 JobManager *mgr = (JobManager*)arg;
34 mgr->RunJobs();
35 return 0;
36 }
37}
38
39namespace XrdCl
40{
41 //----------------------------------------------------------------------------
42 // Initialize the job manager
43 //----------------------------------------------------------------------------
45 {
46 return true;
47 }
48
49 //----------------------------------------------------------------------------
50 // Finalize the job manager, clear the queues
51 //----------------------------------------------------------------------------
53 {
54 pJobs.Clear();
55 return true;
56 }
57
58 //----------------------------------------------------------------------------
59 // Start the workers
60 //----------------------------------------------------------------------------
62 {
63 XrdSysMutexHelper scopedLock( pMutex );
64 Log *log = DefaultEnv::GetLog();
65 log->Debug( JobMgrMsg, "Starting the job manager..." );
66
67 if( pRunning )
68 {
69 log->Error( JobMgrMsg, "The job manager is already running" );
70 return false;
71 }
72
73 for( uint32_t i = 0; i < pWorkers.size(); ++i )
74 {
75 int ret = ::pthread_create( &pWorkers[i], 0, ::RunRunnerThread, this );
76 if( ret != 0 )
77 {
78 log->Error( JobMgrMsg, "Unable to spawn a job worker thread: %s",
79 XrdSysE2T( errno ) );
80 if( i > 0 )
81 StopWorkers( i );
82 return false;
83 }
84 }
85 pRunning = true;
86 log->Debug( JobMgrMsg, "Job manager started, %d workers", pWorkers.size() );
87 return true;
88 }
89
90 //----------------------------------------------------------------------------
91 // Stop the workers
92 //----------------------------------------------------------------------------
94 {
95 XrdSysMutexHelper scopedLock( pMutex );
96 Log *log = DefaultEnv::GetLog();
97 log->Debug( JobMgrMsg, "Stopping the job manager..." );
98 if( !pRunning )
99 {
100 log->Error( JobMgrMsg, "The job manager is not running" );
101 return false;
102 }
103
104 StopWorkers( pWorkers.size() );
105
106 pRunning = false;
107 log->Debug( JobMgrMsg, "Job manager stopped" );
108 return true;
109 }
110
111 //----------------------------------------------------------------------------
112 // Stop all workers up to n'th
113 //----------------------------------------------------------------------------
114 void JobManager::StopWorkers( uint32_t n )
115 {
116 Log *log = DefaultEnv::GetLog();
117 for( uint32_t i = 0; i < n; ++i )
118 {
119 void *threadRet;
120 log->Dump( JobMgrMsg, "Stopping worker #%d...", i );
121 int rc = pthread_cancel( pWorkers[i] );
122 if( rc != 0 )
123 {
124 log->Error( TaskMgrMsg, "Unable to cancel worker #%d: %s", i,
125 XrdSysE2T( errno ) );
126 if( rc == ESRCH ) continue;
127 abort();
128 }
129
130 rc = pthread_join( pWorkers[i], (void**)&threadRet );
131 if( rc != 0 )
132 {
133 log->Error( TaskMgrMsg, "Unable to join worker #%d: %s", i,
134 XrdSysE2T( errno ) );
135 if( rc == ESRCH ) continue;
136 abort();
137 }
138
139 log->Dump( JobMgrMsg, "Worker #%d stopped", i );
140 }
141 }
142
143 //----------------------------------------------------------------------------
// Run the jobs
145 //----------------------------------------------------------------------------
147 {
148 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
149 for( ;; )
150 {
151 JobHelper h = pJobs.Get();
152 pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
153 h.job->Run( h.arg );
154 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
155 }
156 }
157}
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
A synchronized queue.
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
void RunJobs()
Run the jobs.
bool Stop()
Stop the workers.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
void Clear()
Clear the queue.
Item Get()
Get the item from the front of the queue.
const uint64_t TaskMgrMsg
const uint64_t JobMgrMsg