SpatialOps
MemoryPool.cpp
#include <spatialops/structured/MemoryPool.h>
#include <spatialops/structured/ExternalAllocators.h>

#include <boost/type_traits.hpp>
#include <boost/foreach.hpp>
#ifdef ENABLE_THREADS
# include <boost/thread/mutex.hpp>
#endif

#include <limits>
#include <sstream>
#include <stdexcept>
#include <iostream>
#include <cassert>

namespace SpatialOps{

#ifdef ENABLE_THREADS
  // serializes access to the pool when thread support is enabled
  inline static boost::mutex& get_mutex(){ static boost::mutex m; return m; }
#endif

template< typename T >
Pool<T>::Pool() : deviceIndex_(0)
{
  destroyed_ = false;
  accessCounter_ = 0;  // start the usage counter from a defined value
  cpuhighWater_ = 0;
# ifdef ENABLE_CUDA
  pinned_ = true;
  gpuhighWater_ = 0;
# endif
}

template< typename T >
Pool<T>::~Pool()
{
  destroyed_ = true;

  for( typename FQSizeMap::iterator i=cpufqm_.begin(); i!=cpufqm_.end(); ++i ){
    typename Pool<T>::FieldQueue& fq = i->second;
    while( !fq.queue.empty() ){
#     ifdef ENABLE_CUDA
      if( pinned_ ){ ema::cuda::CUDADeviceInterface::self().releaseHost( fq.queue.top() ); }
      else         { delete [] fq.queue.top(); }
#     else
      delete [] fq.queue.top();
#     endif
      fq.queue.pop();
    }
  }

# ifdef ENABLE_CUDA
  for( typename FQSizeMap::iterator i=gpufqm_.begin(); i!=gpufqm_.end(); ++i ){
    typename Pool<T>::FieldQueue& fq = i->second;
    while( !fq.queue.empty() ){
      ema::cuda::CUDADeviceInterface::self().release( fq.queue.top(), deviceIndex_ );
      fq.queue.pop();
    }
  }
# endif
}

template< typename T >
Pool<T>&
Pool<T>::self()
{
# ifdef ENABLE_CUDA
  // ensure the CUDA driver is loaded before the pool is initialized
  int deviceCount = 0;
  static cudaError_t err = cudaGetDeviceCount(&deviceCount);
  if( cudaSuccess != err ){
    std::ostringstream msg;
    msg << "Error at cudaGetDeviceCount() API, at " << __FILE__ << " : " << __LINE__
        << std::endl;
    msg << "\t - " << cudaGetErrorString(err);
    throw(std::runtime_error(msg.str()));
  }
# endif
  // see Modern C++ Design (Alexandrescu), chapter 6, for an excellent discussion of singleton implementation
  static Pool<T> p;
  return p;
}

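// Illustrative note: because self() hands back a function-local static, every call
// for a given element type T refers to the same pool instance, e.g.
//
//   Pool<double>& a = Pool<double>::self();
//   Pool<double>& b = Pool<double>::self();
//   assert( &a == &b );   // one pool per element type
//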
template< typename T >
T*
Pool<T>::get( const short int deviceLocation, const size_t _n )
{
  Pool<T>& pool = Pool<T>::self();

# ifdef ENABLE_THREADS
  boost::mutex::scoped_lock lock( get_mutex() );
# endif

  // reset access counters before they go out of range
  pool.accessCounter_++;
  if( pool.accessCounter_ >= ( std::numeric_limits<size_t>::max()-1 ) ){
    pool.accessCounter_ = 0;
    BOOST_FOREACH( typename FQSizeMap::value_type& fq, pool.cpufqm_ ) fq.second.accessCounter = 0;
    BOOST_FOREACH( typename FQSizeMap::value_type& fq, pool.gpufqm_ ) fq.second.accessCounter = 0;
  }

  // remove (free) entries that haven't been visited in the past nskip queries to the pool
  const size_t nskip = 200;
  if( pool.accessCounter_ % nskip == 0 ) flush_unused( nskip );
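  // Illustrative note on the heuristic above: every nskip-th request triggers
  // flush_unused(nskip), which frees the queued fields of any size bin whose
  // accessCounter has fallen more than nskip behind pool.accessCounter_.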

  assert( !pool.destroyed_ );

  size_t nentries = 1.2*_n;  // 20% padding on field size

  if( deviceLocation == CPU_INDEX ){
    T* field = NULL;
    typename FQSizeMap::iterator ifq = pool.cpufqm_.lower_bound( nentries );
    if( ifq == pool.cpufqm_.end() ) ifq = pool.cpufqm_.insert( ifq, std::make_pair(nentries,FieldQueue()) );
    else nentries = ifq->first;

    typename Pool<T>::FieldQueue& fq = ifq->second;
    fq.accessCounter++;

    if( fq.queue.empty() ){
      ++pool.cpuhighWater_;
#     ifdef NEBO_REPORT_BACKEND
      std::cout << "Allocating CPU memory" << std::endl;
#     endif
      try{
#       ifdef ENABLE_CUDA
        /* Pinned memory mode:
         * Because pinned-memory allocation and deallocation carry a high overhead,
         * they are performed at the memory-pool level, which is created and
         * destroyed only once.
         */
        ema::cuda::CUDADeviceInterface& CDI = ema::cuda::CUDADeviceInterface::self();
        field = (T*)CDI.get_pinned_pointer( nentries*sizeof(T) );
#       else
        // pageable memory mode
        field = new T[nentries];
#       endif
      }
      catch( std::runtime_error& e ){
        std::ostringstream msg;
        msg << __FILE__ << " : " << __LINE__ << std::endl
            << "Error occurred while allocating memory on CPU" << std::endl
            << e.what() << std::endl;
        throw std::runtime_error( msg.str() );
      }
      pool.fsm_[field] = nentries;
    }
    else{
      field = fq.queue.top(); fq.queue.pop();
    }
    return field;
  }
# ifdef ENABLE_CUDA
  else if( IS_GPU_INDEX(deviceLocation) ){
    T* field = NULL;
    typename FQSizeMap::iterator ifq = pool.gpufqm_.lower_bound( nentries );
    if( ifq == pool.gpufqm_.end() ) ifq = pool.gpufqm_.insert( ifq, std::make_pair(nentries,typename Pool<T>::FieldQueue()) );
    else nentries = ifq->first;

    typename Pool<T>::FieldQueue& fq = ifq->second;
    if( fq.queue.empty() ){
#     ifdef NEBO_REPORT_BACKEND
      std::cout << "Allocating GPU memory" << std::endl;
#     endif
      ++pool.gpuhighWater_;
      ema::cuda::CUDADeviceInterface& CDI = ema::cuda::CUDADeviceInterface::self();
      field = (T*)CDI.get_raw_pointer( nentries*sizeof(T), deviceLocation );
      pool.fsm_[field] = nentries;
    }
    else{
      field = fq.queue.top(); fq.queue.pop();
    }
    return field;
  }
# endif
  else{
    std::ostringstream msg;
    msg << "Attempt to get unsupported memory pool ( "
        << DeviceTypeTools::get_memory_type_description(deviceLocation)
        << " ) \n"
        << "\t " << __FILE__ << " : " << __LINE__;
    throw(std::runtime_error(msg.str()));
  }
}

template< typename T >
void
Pool<T>::put( const short int deviceLocation, T* t )
{
  Pool<T>& pool = Pool<T>::self();
# ifdef ENABLE_THREADS
  boost::mutex::scoped_lock lock( get_mutex() );
# endif
  // In some cases (notably in the LBMS code), singleton destruction order
  // causes the pool to be prematurely deleted, and subsequent calls here
  // would result in undefined behavior. If the singleton has been destroyed,
  // we simply ignore calls to return resources to the pool. This leaks
  // memory at shut-down in those cases.
  if( pool.destroyed_ ) return;

  if( deviceLocation == CPU_INDEX ){
    const size_t n = pool.fsm_[t];
    const typename FQSizeMap::iterator ifq = pool.cpufqm_.lower_bound( n );
    assert( ifq != pool.cpufqm_.end() );
    ifq->second.queue.push(t);
  }
# ifdef ENABLE_CUDA
  else if( IS_GPU_INDEX(deviceLocation) ){
    const size_t n = pool.fsm_[t];
    const typename FQSizeMap::iterator ifq = pool.gpufqm_.lower_bound( n );
    assert( ifq != pool.gpufqm_.end() );
    ifq->second.queue.push(t);
  }
# endif
  else{
    std::ostringstream msg;
    msg << "Error occurred while restoring memory back to the memory pool ( "
        << DeviceTypeTools::get_memory_type_description(deviceLocation)
        << " ) \n";
    msg << "\t " << __FILE__ << " : " << __LINE__;
    throw(std::runtime_error(msg.str()));
  }
}

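// Illustrative usage sketch: a block obtained from get() is recycled by handing it
// back through put() for the same device location (the pool queues it for reuse
// rather than freeing it). Here "npts" is a placeholder value, not defined in this file:
//
//   double* tmp = Pool<double>::get( CPU_INDEX, npts );
//   // ... use tmp as scratch space for at least npts entries ...
//   Pool<double>::put( CPU_INDEX, tmp );
//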
template<typename T>
size_t
Pool<T>::active()
{
  Pool<T>& pool = Pool<T>::self();
  size_t n=0;
  for( typename FQSizeMap::const_iterator ifq=pool.cpufqm_.begin(); ifq!=pool.cpufqm_.end(); ++ifq ){
    n += ifq->second.queue.size();
  }
  return pool.cpuhighWater_ - n;
}

template<typename T>
size_t
Pool<T>::total()
{
  return Pool<T>::self().cpuhighWater_;
}

template<typename T>
size_t
Pool<T>::flush_unused( const size_t n )
{
  size_t nflushed = 0;

  Pool<T>& pool = Pool<T>::self();

  for( typename FQSizeMap::iterator ifq = pool.cpufqm_.begin(); ifq!=pool.cpufqm_.end(); ++ifq ){
    FieldQueue& fq = ifq->second;
    if( fq.accessCounter < (pool.accessCounter_ - n) ){
      // Delete all existing entries in the queue but leave the FQSizeMap entry
      // alone since it is possible that there are outstanding fields that need
      // to be returned to the pool.
//    std::cout << "Flushing " << fq.queue.size() << " CPU fields of size " << ifq->first << " because they have not been recently accessed\n";
      while( !fq.queue.empty() ){
        T* t = fq.queue.top();
        fq.queue.pop();
        pool.fsm_.erase( t );
#       ifdef ENABLE_CUDA
        if( pool.pinned_ ){ ema::cuda::CUDADeviceInterface::self().releaseHost( t ); }
        else delete [] t;
#       else
        delete [] t;
#       endif
        ++nflushed;
      }
    }
  }
# ifdef ENABLE_CUDA
  for( typename FQSizeMap::iterator ifq = pool.gpufqm_.begin(); ifq!=pool.gpufqm_.end(); ++ifq ){
    FieldQueue& fq = ifq->second;
    if( fq.accessCounter < (pool.accessCounter_ - n) ){
//    std::cout << "Flushing " << fq.queue.size() << " GPU fields of size " << ifq->first << " because they have not been recently accessed\n";
      while( !fq.queue.empty() ){
        T* t = fq.queue.top();
        fq.queue.pop();
        pool.fsm_.erase( t );
        ema::cuda::CUDADeviceInterface::self().release( t, pool.deviceIndex_ );
        ++nflushed;
      }
    }
  }
# endif
//std::cout << nflushed << " fields flushed from Pool\n";
  return nflushed;
}

// explicit instantiation for supported pool types
template class Pool<double>;
template class Pool<float>;
template class Pool<unsigned int>;
template class Pool<int>;
template class Pool<double *>;

} // namespace SpatialOps
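
Because the Pool member functions are defined in this translation unit, the pool is usable only for the element types explicitly instantiated above; a hypothetical additional element type would need its own instantiation line here, for example:

  template class Pool<char>;   // hypothetical, not part of this file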