xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
XrdSysLinuxSemaphore.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_SYS_LINUX_SEMAPHORE__
26 #define __XRD_SYS_LINUX_SEMAPHORE__
27 
28 #if defined(__linux__) && defined(HAVE_ATOMICS)
29 
30 #include <pthread.h>
31 #include <linux/futex.h>
32 #include <sys/syscall.h>
33 #include <unistd.h>
34 #include <cerrno>
35 #include <exception>
36 #include <string>
37 #include <cstdlib>
38 
namespace XrdSys
{
  //----------------------------------------------------------------------------
  //! Exception thrown by LinuxSemaphore when an operation cannot be completed
  //! (counter overflow, too many waiters, allocation or futex failure).
  //----------------------------------------------------------------------------
  class LinuxSemaphoreError: public std::exception
  {
    public:
      //------------------------------------------------------------------------
      //! Constructor
      //!
      //! @param error human readable description returned later by what()
      //------------------------------------------------------------------------
      LinuxSemaphoreError( const std::string &error ): pError( error ) {}

      //------------------------------------------------------------------------
      //! Destructor
      //------------------------------------------------------------------------
      virtual ~LinuxSemaphoreError() throw() {}

      //------------------------------------------------------------------------
      //! Get the error description
      //!
      //! @return pointer to the message; valid as long as this object lives
      //------------------------------------------------------------------------
      virtual const char *what() const throw()
      {
        return pError.c_str();
      }

    private:
      std::string pError;
  };

  //----------------------------------------------------------------------------
  //! Counting semaphore built on a single heap-allocated int that is updated
  //! with atomic compare-and-swap and used as a Linux futex word.
  //!
  //! Layout of the int:
  //!   - bits  0-19 (ValueMask):                 the semaphore value
  //!   - bits 20-31 (WaitersOffset/WaitersMask): number of sleeping waiters
  //!
  //! Objects of this class own the futex word and cannot be copied.
  //----------------------------------------------------------------------------
  class LinuxSemaphore
  {
    public:
      //------------------------------------------------------------------------
      //! Try to acquire the semaphore without blocking
      //!
      //! @return 1 if the value was positive and has been decremented,
      //!         0 if the value was 0 (nothing was changed)
      //------------------------------------------------------------------------
      inline int CondWait()
      {
        int value   = 0;
        int val     = 0;
        int waiters = 0;

        //----------------------------------------------------------------------
        // Read the current state and try to atomically decrement the value if
        // it is larger than 0. Retry whenever another thread changed the
        // state between our read and our compare-and-swap.
        //----------------------------------------------------------------------
        while( 1 )
        {
          Unpack( pValue, value, val, waiters );
          if( val == 0 )
            return 0;

          const int newVal = Pack( val - 1, waiters );
          if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
            return 1;
        }
      }

      //------------------------------------------------------------------------
      //! Acquire the semaphore, sleeping on the futex while its value is 0.
      //!
      //! While the calling thread sleeps its cancel type is switched to
      //! asynchronous and a cleanup handler is installed that removes the
      //! thread from the waiter count if it gets cancelled. The caller's
      //! cancel type is restored before returning or throwing.
      //!
      //! @throw LinuxSemaphoreError if the waiter counter is saturated or the
      //!        FUTEX_WAIT syscall fails with an unexpected error
      //------------------------------------------------------------------------
      inline void Wait()
      {
        //----------------------------------------------------------------------
        // Examine the state of the semaphore and atomically decrement it if
        // possible. If CondWait fails, it means that the semaphore value was 0.
        // In this case we atomically bump the number of waiters and go to sleep
        //----------------------------------------------------------------------
        while( !CondWait() )
        {
          int value      = 0;
          int val        = 0;
          int waiters    = 0;
          int cancelType = 0;

          Unpack( pValue, value, val, waiters );

          //--------------------------------------------------------------------
          // We need to make sure again that the value of the semaphore is 0
          // because we fetched it again (first time was in CondWait()) and
          // it may have changed in the meantime.
          //--------------------------------------------------------------------
          if( val != 0 )
            continue;

          if( waiters == WaitersMask )
            throw LinuxSemaphoreError( "Reached maximum number of waiters" );

          const int newVal = Pack( val, waiters + 1 );

          //--------------------------------------------------------------------
          // We have bumped the number of waiters successfully if neither the
          // semaphore value nor the number of waiters changed in the meantime.
          // We can then safely go to sleep.
          //
          // Once the number of waiters was bumped we cannot get cancelled
          // without decrementing it, hence the deferred cancel type here.
          //--------------------------------------------------------------------
          pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, &cancelType );
          if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
          {
            bool futexFailed = false;

            while( 1 )
            {
              // Declared before pthread_cleanup_push because the push/pop
              // macros open and close their own scope.
              long r   = 0;
              int  err = 0;

              pthread_cleanup_push( Cleanup, pValue );
              pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, 0 );

              // Pointer-typed futex arguments are passed as NULL rather than
              // a plain int 0 so that they are pointer-sized in the variadic
              // call.
              r   = syscall( SYS_futex, pValue, FUTEX_WAIT, newVal,
                             NULL, NULL, 0 );
              err = errno; // save before any other call can overwrite it

              pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
              pthread_cleanup_pop( 0 );

              if( r == 0 ) // we've been woken up
                break;

              if( err == EINTR ) // interrupted, go back to sleep
                continue;

              if( err == EWOULDBLOCK ) // futex value changed
                break;

              futexFailed = true; // unexpected error, reported below
              break;
            }

            //------------------------------------------------------------------
            // We are not sleeping anymore (woken up, state changed or error),
            // so we need to decrement the number of waiters
            //------------------------------------------------------------------
            Cleanup( pValue );

            if( futexFailed )
            {
              // Leave the thread in the state we found it in before reporting
              pthread_setcanceltype( cancelType, 0 );
              throw LinuxSemaphoreError( "FUTEX_WAIT syscall error" );
            }
          }

          //--------------------------------------------------------------------
          // We are here if:
          // 1) we were unable to increase the number of waiters because the
          //    atomic changed in the meantime in another execution thread
          // 2) *pValue != newVal upon futex call, this indicates the state
          //    change in another thread
          // 3) we have been woken up by another thread
          //
          // In any of the above cases we need to re-examine the atomic and
          // decide whether we need to sleep or are free to proceed
          //--------------------------------------------------------------------
          pthread_setcanceltype( cancelType, 0 );
        }
      }

      //------------------------------------------------------------------------
      //! Increment the semaphore value and, if any thread is registered as a
      //! waiter, wake one of them up.
      //!
      //! @throw LinuxSemaphoreError if the value already equals ValueMask
      //------------------------------------------------------------------------
      inline void Post()
      {
        int value   = 0;
        int val     = 0;
        int waiters = 0;

        //----------------------------------------------------------------------
        // We atomically increment the value of the semaphore and wake one of
        // the threads that was waiting for the semaphore value to change
        //----------------------------------------------------------------------
        while( 1 )
        {
          Unpack( pValue, value, val, waiters );

          if( val == ValueMask )
            throw LinuxSemaphoreError( "Reached maximum value" );

          const int newVal = Pack( val + 1, waiters );
          if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
          {
            if( waiters )
              syscall( SYS_futex, pValue, FUTEX_WAKE, 1, NULL, NULL, 0 );
            return;
          }
        }
      }

      //------------------------------------------------------------------------
      //! Get the current semaphore value (the waiter count is masked out)
      //------------------------------------------------------------------------
      int GetValue() const
      {
        // Adding 0 is used as an atomic load of the futex word
        const int value = __sync_fetch_and_add( pValue, 0 );
        return value & ValueMask;
      }

      //------------------------------------------------------------------------
      //! Construct the semaphore
      //!
      //! @param value initial value; only the bits covered by ValueMask are
      //!              kept
      //! @throw LinuxSemaphoreError if the futex word cannot be allocated
      //------------------------------------------------------------------------
      LinuxSemaphore( int value ): pValue( 0 )
      {
        pValue = static_cast<int *>( malloc( sizeof( int ) ) );
        if( !pValue )
          throw LinuxSemaphoreError( "Unable to allocate the semaphore" );
        *pValue = ( value & ValueMask );
      }

      //------------------------------------------------------------------------
      //! Destructor, releases the futex word
      //------------------------------------------------------------------------
      ~LinuxSemaphore()
      {
        free( pValue );
      }

    private:
      //------------------------------------------------------------------------
      // Copying would make two objects free the same futex word, so it is
      // disabled (declared but intentionally not defined)
      //------------------------------------------------------------------------
      LinuxSemaphore( const LinuxSemaphore & );
      LinuxSemaphore &operator = ( const LinuxSemaphore & );

      static const int ValueMask     = 0x000fffff;
      static const int WaitersOffset = 20;
      static const int WaitersMask   = 0x00000fff;

      //------------------------------------------------------------------------
      // Unpack the semaphore value: atomically read *sourcePtr into source
      // and split it into the semaphore value and the number of waiters.
      // The split is done on the unsigned representation so that a set top
      // bit never goes through a signed right shift.
      //------------------------------------------------------------------------
      static inline void Unpack( int *sourcePtr,
                                 int &source,
                                 int &value,
                                 int &nwaiters )
      {
        source = __sync_fetch_and_add( sourcePtr, 0 );
        const unsigned int bits = static_cast<unsigned int>( source );
        value    = static_cast<int>( bits & ValueMask );
        nwaiters = static_cast<int>( ( bits >> WaitersOffset ) & WaitersMask );
      }

      //------------------------------------------------------------------------
      // Pack the semaphore value: combine the value (masked with ValueMask)
      // and the number of waiters into one futex word. Done on unsigned
      // operands so that shifting into the top bit is not a signed overflow.
      //------------------------------------------------------------------------
      static inline int Pack( int value, int nwaiters )
      {
        const unsigned int packed =
          ( static_cast<unsigned int>( nwaiters ) << WaitersOffset ) |
          ( static_cast<unsigned int>( value ) & ValueMask );
        return static_cast<int>( packed );
      }

      //------------------------------------------------------------------------
      // Cancellation cleaner: atomically decrement the number of waiters
      // stored in the futex word pointed to by param. Also used by Wait()
      // itself once it stops sleeping.
      //------------------------------------------------------------------------
      static void Cleanup( void *param )
      {
        int *iParam  = static_cast<int *>( param );
        int  value   = 0;
        int  val     = 0;
        int  waiters = 0;
        int  newVal  = 0;

        do
        {
          Unpack( iParam, value, val, waiters );
          newVal = Pack( val, waiters - 1 );
        }
        while( !__sync_bool_compare_and_swap( iParam, value, newVal ) );
      }

      int *pValue;
  };
}
315 
316 #endif // __linux__ && HAVE_ATOMICS
317 
318 #endif // __XRD_SYS_LINUX_SEMAPHORE__