GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
pthread_tools.hpp
Go to the documentation of this file.
1 #ifndef DEF_PTHREAD_TOOLS_HPP
2 #define DEF_PTHREAD_TOOLS_HPP
3 
4 // Stolen from GraphLab
5 
6 #include <cstdlib>
7 #include <memory.h>
8 #include <pthread.h>
9 #include <semaphore.h>
10 #include <sched.h>
11 #include <signal.h>
12 #include <sys/time.h>
13 #include <vector>
14 #include <cassert>
15 #include <list>
16 #include <iostream>
17 
18 #undef _POSIX_SPIN_LOCKS
19 #define _POSIX_SPIN_LOCKS -1
20 
21 
22 
23 
27 namespace graphchi {
28 
29 
30 
37  class mutex {
38  private:
39  // mutable not actually needed
40  mutable pthread_mutex_t m_mut;
41  public:
42  mutex() {
43  int error = pthread_mutex_init(&m_mut, NULL);
44  assert(!error);
45  }
46  inline void lock() const {
47  int error = pthread_mutex_lock( &m_mut );
48  assert(!error);
49  }
50  inline void unlock() const {
51  int error = pthread_mutex_unlock( &m_mut );
52  assert(!error);
53  }
54  inline bool try_lock() const {
55  return pthread_mutex_trylock( &m_mut ) == 0;
56  }
57  ~mutex(){
58  int error = pthread_mutex_destroy( &m_mut );
59  assert(!error);
60  }
61  friend class conditional;
62  }; // End of Mutex
63 
#if _POSIX_SPIN_LOCKS >= 0
 // We should change this to use a test for posix_spin_locks eventually

 // #ifdef __linux__
 // NOTE(review): _POSIX_SPIN_LOCKS is redefined to -1 near the top of
 // this header, so this branch is never compiled in practice and
 // `spinlock` is the mutex typedef in the #else branch below.
 /**
  * Busy-waiting lock built on POSIX spinlocks. Exposes the same
  * const-qualified lock/unlock/try_lock interface as the mutex class.
  */
 class spinlock {
 private:
  // mutable so the const-qualified lock methods can modify it
  mutable pthread_spinlock_t m_spin;
 public:
  spinlock () {
   // PTHREAD_PROCESS_PRIVATE: not shared across processes
   int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
   assert(!error);
  }

  //! Spins (busy-waits) until the lock is acquired.
  inline void lock() const {
   int error = pthread_spin_lock( &m_spin );
   assert(!error);
  }
  //! Releases the lock; the caller must currently hold it.
  inline void unlock() const {
   int error = pthread_spin_unlock( &m_spin );
   assert(!error);
  }
  //! Non-blocking acquire; returns true iff the lock was taken.
  inline bool try_lock() const {
   return pthread_spin_trylock( &m_spin ) == 0;
  }
  ~spinlock(){
   int error = pthread_spin_destroy( &m_spin );
   assert(!error);
  }
  friend class conditional;
 }; // End of spinlock
#define SPINLOCK_SUPPORTED 1
#else

 // Spinlocks unavailable: fall back to the pthread mutex wrapper above.
 typedef mutex spinlock;
#define SPINLOCK_SUPPORTED 0
#endif
109 
110 
115  class conditional {
116  private:
117  mutable pthread_cond_t m_cond;
118  public:
119  conditional() {
120  int error = pthread_cond_init(&m_cond, NULL);
121  assert(!error);
122  }
123  inline void wait(const mutex& mut) const {
124  int error = pthread_cond_wait(&m_cond, &mut.m_mut);
125  assert(!error);
126  }
127  inline int timedwait(const mutex& mut, int sec) const {
128  struct timespec timeout;
129  struct timeval tv;
130  struct timezone tz;
131  gettimeofday(&tv, &tz);
132  timeout.tv_nsec = 0;
133  timeout.tv_sec = tv.tv_sec + sec;
134  return pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
135  }
136  inline void signal() const {
137  int error = pthread_cond_signal(&m_cond);
138  assert(!error);
139  }
140  inline void broadcast() const {
141  int error = pthread_cond_broadcast(&m_cond);
142  assert(!error);
143  }
144  ~conditional() {
145  int error = pthread_cond_destroy(&m_cond);
146  assert(!error);
147  }
148  }; // End conditional
149 
154  class semaphore {
155  private:
156  mutable sem_t m_sem;
157  public:
158  semaphore() {
159  int error = sem_init(&m_sem, 0,0);
160  assert(!error);
161  }
162  inline void post() const {
163  int error = sem_post(&m_sem);
164  assert(!error);
165  }
166  inline void wait() const {
167  int error = sem_wait(&m_sem);
168  assert(!error);
169  }
170  ~semaphore() {
171  int error = sem_destroy(&m_sem);
172  assert(!error);
173  }
174  }; // End semaphore
175 
176 
177 
178 
#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)

 /**
  * Fair, ticket-based spinning reader/writer lock.
  *
  * Each acquirer takes a ticket by atomically bumping `users`; it then
  * spins until `write` (for writers) or `read` (for readers) reaches its
  * ticket value, so lock grants are strictly FIFO. The `writing` flag
  * records whether the current holder is a writer so unlock() can pick
  * the right release path. NOTE: uses x86 `mfence` and GCC __sync
  * builtins, so this class is x86/GCC-specific.
  */
 class spinrwlock {

  union rwticket {
   unsigned u;
   unsigned short us;
   __extension__ struct {
    unsigned char write;  // ticket currently allowed to write
    unsigned char read;   // ticket currently allowed to read
    unsigned char users;  // next ticket to hand out
   } s;
  };
  // Consulted by unlock() to choose rdunlock() vs wrunlock().
  mutable bool writing;
  mutable volatile rwticket l;
  public:
  spinrwlock() : writing(false) {
   // Fix: `writing` was previously left uninitialized (the memset only
   // clears the ticket word), so unlock() after a first readlock() read
   // an indeterminate bool.
   memset(const_cast<rwticket*>(&l), 0, sizeof(rwticket));
  }
  //! Acquires the lock exclusively, spinning until it is this
  //! ticket's turn and all earlier holders have released.
  inline void writelock() const {
   unsigned me = atomic_xadd(&l.u, (1<<16));
   unsigned char val = me >> 16;

   while (val != l.s.write) sched_yield();
   writing = true;
  }

  //! Releases a write lock, admitting both the next reader and the
  //! next writer ticket.
  inline void wrunlock() const{
   rwticket t = *const_cast<rwticket*>(&l);

   t.s.write++;
   t.s.read++;

   // Publish write+read together via the 16-bit alias.
   *(volatile unsigned short *) (&l) = t.us;
   writing = false;
   __asm("mfence");
  }

  //! Acquires the lock shared; multiple readers may hold it at once.
  inline void readlock() const {
   unsigned me = atomic_xadd(&l.u, (1<<16));
   unsigned char val = me >> 16;

   while (val != l.s.read) sched_yield();
   // Safe non-atomic bump: only the thread whose turn it is executes
   // this, and it immediately admits the next reader.
   l.s.read++;
  }

  //! Releases a read lock by admitting the next writer ticket.
  inline void rdunlock() const {
   atomic_inc(&l.s.write);
  }

  //! Releases whichever kind of lock the caller holds.
  inline void unlock() const {
   if (!writing) rdunlock();
   else wrunlock();
  }
 };

#undef atomic_xadd
#undef cmpxchg
#undef atomic_inc
247 
248 
253  class rwlock {
254  private:
255  mutable pthread_rwlock_t m_rwlock;
256  public:
257  rwlock() {
258  int error = pthread_rwlock_init(&m_rwlock, NULL);
259  assert(!error);
260  }
261  ~rwlock() {
262  int error = pthread_rwlock_destroy(&m_rwlock);
263  assert(!error);
264  }
265  inline void readlock() const {
266  pthread_rwlock_rdlock(&m_rwlock);
267  //assert(!error);
268  }
269  inline void writelock() const {
270  pthread_rwlock_wrlock(&m_rwlock);
271  //assert(!error);
272  }
273  inline void unlock() const {
274  pthread_rwlock_unlock(&m_rwlock);
275  //assert(!error);
276  }
277  inline void rdunlock() const {
278  unlock();
279  }
280  inline void wrunlock() const {
281  unlock();
282  }
283  }; // End rwlock
284 
289 #ifdef __linux__
290 
294  class barrier {
295  private:
296  mutable pthread_barrier_t m_barrier;
297  public:
298  barrier(size_t numthreads) { pthread_barrier_init(&m_barrier, NULL, numthreads); }
299  ~barrier() { pthread_barrier_destroy(&m_barrier); }
300  inline void wait() const { pthread_barrier_wait(&m_barrier); }
301  };
302 
303 #else
304 
 /**
  * Portable barrier fallback for platforms without pthread barriers
  * (the Linux branch above wraps pthread_barrier_t instead).
  * Built from a mutex and a condition variable; per-thread `waiting`
  * flags guard against spurious condvar wakeups, and the barrier
  * resets itself (called -> 0) so it can be reused for another round.
  */
 class barrier {
 private:
  mutex m;
  int needed;    // number of threads that must arrive before release
  int called;    // number of threads that have arrived in this round
  conditional c;

  // we need the following to protect against spurious wakeups
  std::vector<unsigned char> waiting;
 public:

  barrier(size_t numthreads) {
   needed = (int)numthreads;
   called = 0;
   waiting.resize(numthreads);
   std::fill(waiting.begin(), waiting.end(), 0);
  }

  ~barrier() {}


  //! Blocks until `needed` threads have entered wait(); the last
  //! arrival wakes the others and resets the barrier for reuse.
  inline void wait() {
   m.lock();
   // set waiting;
   // My slot is my arrival order this round (called is reset each round).
   size_t myid = called;
   waiting[myid] = 1;
   called++;

   if (called == needed) {
    // if I have reached the required limit, wait up. Set waiting
    // to 0 to make sure everyone wakes up

    called = 0;
    // clear all waiting
    std::fill(waiting.begin(), waiting.end(), 0);
    c.broadcast();
   }
   else {
    // while no one has broadcasted, sleep
    // (re-check the flag so a spurious wakeup just goes back to sleep)
    while(waiting[myid]) c.wait(m);
   }
   m.unlock();
  }
 };
352 #endif
353 
354 
355 
356  inline void prefetch_range(void *addr, size_t len) {
357  char *cp;
358  char *end = (char*)(addr) + len;
359 
360  for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 0);
361  }
362  inline void prefetch_range_write(void *addr, size_t len) {
363  char *cp;
364  char *end = (char*)(addr) + len;
365 
366  for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 1);
367  }
368 
369 
370 };
371 #endif
372