SpatialOps
ThreadPool.cpp
1 #include <spatialops/ThreadPool.h>
2 
3 namespace SpatialOps {
4 
5  //===========================================================================
6 
7  class ThreadPoolResourceManager {
8  ThreadPoolResourceManager(){};
9  ~ThreadPoolResourceManager(){};
10 
11  public:
12  static ThreadPoolResourceManager& self();
13 
14  template<class VoidType> static bool insert( VoidType& rID, int threads );
15  template<class VoidType> static bool remove( VoidType& rID, const int threads );
16  template<class VoidType> static int resize( VoidType& rID, int threads );
17  template<class VoidType> static int resize_active( VoidType& rID, const int threads );
18  template<class VoidType> static int get_worker_count( VoidType& rID );
19  template<class VoidType> static int get_max_active_worker_count( VoidType& rID );
20 
21  private:
22 
23  typedef std::map<void*, int> ResourceMap;
24  typedef std::map<void*,int>::iterator ResourceIter;
25  ResourceMap resourceMap_;
26 
27  class ExecutionMutex{
28  const boost::mutex::scoped_lock lock;
29  inline boost::mutex& get_mutex() const { static boost::mutex m; return m; }
30  public:
31  ExecutionMutex() : lock( get_mutex() ){}
32  ~ExecutionMutex(){}
33  };
34  };
35 
36  ThreadPoolResourceManager& ThreadPoolResourceManager::self()
37  {
38  static ThreadPoolResourceManager tprm;
39  return tprm;
40  }
41 
42  template<class VoidType>
43  bool ThreadPoolResourceManager::insert( VoidType& rID, int threads )
44  {
45  ExecutionMutex lock;
46  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
47  if( threads < 1 ) { threads = 1; }
48  //Make sure we don't have the threadpool
49  ResourceIter rit = tprm.resourceMap_.find(&rID);
50  if ( rit == tprm.resourceMap_.end() ){
51  tprm.resourceMap_.insert(std::make_pair(&rID, threads));
52  } else {
53  printf("Warning: attempting to insert a ThreadPool that already exists!\n");
54  return false;
55  }
56  return true;
57  }
58 
59  template<class VoidType>
60  bool ThreadPoolResourceManager::remove( VoidType& rID, const int threads )
61  {
62  ExecutionMutex lock;
63  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
64  ResourceIter rit = tprm.resourceMap_.find(&rID);
65  if ( rit != tprm.resourceMap_.end() ){
66  tprm.resourceMap_.erase(rit);
67  } else {
68  printf("Warning: attempting to remove ThreadPool that does not exist!\n");
69  return false;
70  }
71  return true;
72  }
73 
74  template<class VoidType>
75  int ThreadPoolResourceManager::resize( VoidType& rID, int threads )
76  {
77  ExecutionMutex lock;
78  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
79  //Make sure we have the threadpool
80  ResourceIter rit = tprm.resourceMap_.find(&rID);
81  if( rit == tprm.resourceMap_.end() ) {
82  fprintf(stderr, "Error: ThreadPool does not exist!\n");
83  return -1;
84  }
85 
86  //Fast exit
87  if( rit->second == threads ) { return threads; }
88 
89  //Connect the right resource interface
90  VoidType* resource = (VoidType*)rit->first;
91 
92  if( threads < 1 ) { threads = 1; }
93  rit->second = threads;
94  resource->size_controller().resize(threads);
95  return threads;
96  }
97 
98  template<class VoidType>
99  int ThreadPoolResourceManager::resize_active( VoidType& rID, const int threads )
100  {
101  ExecutionMutex lock;
102  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
103  //Make sure we have the threadpool
104  ResourceIter rit = tprm.resourceMap_.find(&rID);
105  if( rit == tprm.resourceMap_.end() ) {
106  fprintf(stderr, "Error: ThreadPool does not exist!\n");
107  return -1;
108  }
109  //Connect the right resource interface
110  VoidType* resource = (VoidType*)rit->first;
111  resource->size_controller().resize_active(std::max(1,threads));
112  return threads;
113  }
114 
115  template<class VoidType>
116  int ThreadPoolResourceManager::get_worker_count( VoidType& rID )
117  {
118  ExecutionMutex lock;
119  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
120  ResourceIter rit = tprm.resourceMap_.find(&rID);
121  if( rit == tprm.resourceMap_.end() ) {
122  fprintf(stderr, "Error: Threadpool does not exist!\n");
123  return -1;
124  }
125  VoidType* resource = (VoidType*)rit->first;
126  return resource->size();
127  }
128 
129  template<class VoidType>
130  int ThreadPoolResourceManager::get_max_active_worker_count( VoidType& rID )
131  {
132  ExecutionMutex lock;
133  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
134  //Make sure we have the threadpool
135  ResourceIter rit = tprm.resourceMap_.find(&rID);
136  if( rit == tprm.resourceMap_.end() ) {
137  fprintf(stderr, "Error: ThreadPool does not exist!\n");
138  return -1;
139  }
140 
141  //Connect the right resource interface
142  VoidType* resource = (VoidType*)rit->first;
143  return resource->max_active();
144  }
145 
146  //===========================================================================
147 
148  ThreadPool::ThreadPool( const int nthreads )
149  : boost::threadpool::prio_pool( nthreads )
150  {
151  init_ = false;
152  }
153 
154  ThreadPool::~ThreadPool()
155  {}
156 
157  int ThreadPool::resize_pool( const int threadCount ){
158  return ThreadPoolResourceManager::resize_active(self(),threadCount);
159  }
161  return ThreadPoolResourceManager::get_max_active_worker_count(self());
162  }
163 
164  int ThreadPool::set_pool_capacity( const int threadCount ){
165  return ThreadPoolResourceManager::resize(self(),threadCount);
166  }
168  return ThreadPoolResourceManager::get_worker_count(self());
169  }
170 
172  {
173  static ThreadPool tp(NTHREADS);
174  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
175  if( tp.init_ == false ){
176  tprm.insert<boost::threadpool::prio_pool>(tp, NTHREADS);
177  tp.init_ = true;
178  }
179  return tp;
180  }
181 
182  //===========================================================================
183 
184  ThreadPoolFIFO::ThreadPoolFIFO( const int nthreads )
185  : boost::threadpool::fifo_pool( nthreads )
186  {
187  init_ = false;
188  }
189 
190  ThreadPoolFIFO::~ThreadPoolFIFO()
191  {}
192 
195  {
196  static ThreadPoolFIFO tp( NTHREADS );
197  ThreadPoolResourceManager& tprm = ThreadPoolResourceManager::self();
198  if( tp.init_ == false ){
199  tprm.insert<boost::threadpool::fifo_pool>( tp, NTHREADS );
200  tp.init_ = true;
201  }
202  return tp;
203  }
204 
205  int ThreadPoolFIFO::resize_pool( const int threadCount ){
206  return ThreadPoolResourceManager::resize_active(self(),threadCount);
207  }
209  return ThreadPoolResourceManager::get_max_active_worker_count(self());
210  }
211 
212  int ThreadPoolFIFO::set_pool_capacity( const int threadCount ){
213  return ThreadPoolResourceManager::resize(self(),threadCount);
214  }
216  return ThreadPoolResourceManager::get_worker_count(self());
217  }
218 
219  //===========================================================================
220 
221 } // namespace SpatialOps
static int get_pool_size()
Definition: ThreadPool.cpp:160
Wrapper for a FIFO thread pool.
Definition: ThreadPool.h:80
static int get_pool_capacity()
Definition: ThreadPool.cpp:167
static int resize_pool(const int threadCount)
set the number of active worker threads in the pool.
Definition: ThreadPool.cpp:157
static int get_pool_capacity()
Definition: ThreadPool.cpp:215
static int set_pool_capacity(const int threadCount)
set the maximum number of worker threads in the pool.
Definition: ThreadPool.cpp:212
static int set_pool_capacity(const int threadCount)
set the maximum number of worker threads in the pool.
Definition: ThreadPool.cpp:164
static int resize_pool(const int threadCount)
set the number of active worker threads in the pool.
Definition: ThreadPool.cpp:205
static ThreadPool & self()
obtain the singleton instance of ThreadPool
Definition: ThreadPool.cpp:171
static ThreadPoolFIFO & self()
obtain the singleton instance of ThreadPoolFIFO
Definition: ThreadPool.cpp:194
Wrapper for a priority thread pool.
Definition: ThreadPool.h:39