TBCI Numerical high perf. C++ Library  2.8.0
smp.cc
Go to the documentation of this file.
1 
52 #define _GNU_SOURCE 1
53 
54 #define NEED_SMP_DECLS 1
55 #include "tbci/basics.h"
56 #include "tbci/smp.h"
57 #include <stdio.h>
58 #include <time.h>
59 
60 #include <errno.h>
61 #include <fcntl.h>
62 
63 #ifdef _POSIX_THREADS
64 #ifdef SMP
65 
66 //#include <signum.h>
67 #include <stdlib.h>
68 
69 #ifdef HAVE_SYS_SYSINFO_H
70 # include <sys/sysinfo.h>
71 #endif
72 
73 #ifdef HAVE_SCHED_H
74 # include <sched.h>
75 #endif
76 
77 #ifdef HAVE_NUMA_H
78 # include <numa.h>
79 # include <numaif.h>
80 #endif
81 
82 #ifndef MAX_THREADS
83 # define MAX_THREADS 160
84 #endif
85 
86 #include "tbci/list.h"
87 
88 #ifdef __linux__
89 # include <sys/syscall.h>
90 # include <unistd.h>
/* Return the kernel thread id (LWP) of the calling thread via the raw
 * syscall — older glibc provided no gettid() wrapper. */
inline static pid_t gettid()
{
  return syscall(SYS_gettid);
}
95 #else
96 # define gettid getpid
97 #endif
98 
100 
/* Global SMP state (declarations in tbci/smp.h). */
int num_threads = 0;              /* number of worker threads (0 = serial run) */
int threads_busy = 0;             /* outstanding jobs / disable nesting counter */
int numa_avail = 0;               /* nonzero once NUMA was detected and enabled */
struct thr_struct *threads = 0;   /* per-thread control array from init_threads() */
pid_t main_thread_pid = 0;        /* pid recorded by init_threads() */
bool threads_bound = false;       /* set by bind_threads() */
bool bound_main = false;          /* was the main thread pinned as well? */
THREAD__ int thrno = 0;           /* per-thread index; 0 = main thread */

#ifdef HAVE_LIBNUMA
unsigned page_size;               /* NUMA page size, set in bind_threads() */
unsigned long page_mask;          /* mask selecting a page's base address */
#endif
117 
118 #ifdef DEBUG_THREAD
119 # define TCHK(x) if (UNLIKELY(err = x)) fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err))
120 # define ERRDECL int err = 0
121 #else
122 # define TCHK(x) x
123 # define ERRDECL
124 #endif
125 
126 #ifdef HAVE_SCHED_GETAFFINITY
127 static cpu_set_t saved_cpuset;
128 
129 #ifndef HAVE_CPU_COUNT
130 #undef CPU_COUNT
131 static int CPU_COUNT(const cpu_set_t *cpus)
132 {
133  int cnt = 0;
134  for (int i = 0; i < CPU_SETSIZE; ++i)
135  if (CPU_ISSET(i, cpus))
136  ++cnt;
137  return cnt;
138 }
139 #endif
140 #ifndef CPU_XOR
141 static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
142 {
143  for (int i = 0; i < CPU_SETSIZE; ++i)
144  if (CPU_ISSET(i, src1) ^ CPU_ISSET(i, src2))
145  CPU_SET(i, dest);
146  else
147  CPU_CLR(i, dest);
148 }
149 #endif
150 #ifndef CPU_AND
151 static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
152 {
153  for (int i = 0; i < CPU_SETSIZE; ++i)
154  if (CPU_ISSET(i, src1) & CPU_ISSET(i, src2))
155  CPU_SET(i, dest);
156  else
157  CPU_CLR(i, dest);
158 }
159 #endif
160 
161 
162 static int next_set_bit(const int start, const cpu_set_t *cpus)
163 {
164  for (int i = start; i < CPU_SETSIZE; ++i)
165  if (CPU_ISSET(i, cpus))
166  return i;
167  return -1;
168 }
169 
/* Scratch storage for cpu_str(): carved into 128-byte slices so up to 8
 * results can coexist (e.g. two cpu_str() calls in one fprintf). */
static char cpu_buf[1024];
/* Format the CPUs present in *cpus as a space-separated decimal list.
 * Returns a pointer into the rotating static buffer — NOT thread-safe;
 * the volatile offset only makes concurrent clobbering less likely. */
static const char* cpu_str(const cpu_set_t *cpus)
{
  static volatile int off = 0;
  int old_off = off;
  /* Advance to the next 128-byte slice, wrapping after slice 7. */
  if (off < 896)
    off += 128;
  else
    off = 0;
  char* ret = cpu_buf+old_off+1;  /* +1 skips the leading space */
  int ptr = old_off;
  cpu_buf[ptr+1] = 0;  /* yields "" when no CPU is set */
  for (int i = 0; i < CPU_SETSIZE && ptr < 1016; ++i)
    if (CPU_ISSET(i, cpus))
      ptr += sprintf(cpu_buf+ptr, " %i", i);
  return ret;
}
187 #define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
188 #if defined(__linux__)
189 
/* Parse /sys/.../cpuN/topology/thread_siblings for one CPU.
 * remove == 0: overwrite *cpus with the sibling set of 'cpu'.
 * remove != 0: clear all siblings of 'cpu' (except 'cpu' itself) from *cpus.
 * Quietly a no-op when the sysfs file cannot be opened. */
static void parse_siblings(const int cpu, cpu_set_t *cpus, const int remove)
{
  char fn[80];
  sprintf(fn, "/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
  FILE *f = fopen(fn, "r");
  if (!f)
    return;
  int sibl[(CPU_SETSIZE+31)/32];
  int nr_ints = 0;
  bool end = false;
  /* Linux reports this as field of ints, last int covers first 32 CPUs */
  while (!end) {
    int msk;
    if (fscanf(f, "%x,", &msk) == EOF) {
      if (fscanf(f, "%x", &msk) == EOF)
        break;
      end = true;
    }
    sibl[nr_ints++] = msk;
  }
  fclose(f);
  /* Convert to cpu_set_t data structure (machine endian field of longs)
   * first long covers first 32/64 CPUs */
  cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
  long* sibl_ptr = (long*)&sibl_set;
#if __BYTE_ORDER == __LITTLE_ENDIAN
  /* NOTE(review): each 32-bit chunk lands in a full long; on 64-bit
   * targets this leaves a 32-bit gap per long once the kernel emits more
   * than one chunk (> 32 CPUs) — TODO confirm against such a machine. */
  for (int j = nr_ints-1; j >= 0; --j)
    *sibl_ptr++ = sibl[j];
#else
  long* sibl_ptr2 = (long*)&sibl;
  for (int j = (nr_ints*sizeof(int)/sizeof(long))-1; j >= 0; --j)
    *sibl_ptr++ = sibl_ptr2[j];
#endif
  //fprintf(stderr, "Siblings for cpu%i: %s\n", cpu, cpu_str(&sibl_set));
  //fprintf(stderr, "CPUS before rmv: %s\n", cpu_str(cpus));
  /* Every CPU must be its own sibling; anything else means we mis-parsed. */
  if (!CPU_ISSET(cpu, &sibl_set))
    abort();
  if (!remove) {
    memcpy(cpus, &sibl_set, CPU_SETBYTES);
    return;
  }
  /* Drop from *cpus exactly those siblings (other than 'cpu') present there. */
  CPU_CLR(cpu, &sibl_set);
  CPU_AND(&sibl_set, &sibl_set, cpus);
  CPU_XOR(cpus, cpus, &sibl_set);
  //fprintf(stderr, "CPUS after rmvl: %s\n", cpu_str(cpus));
}
236 
/* Replace *cpus with the full hyperthread sibling set of 'cpu'
 * (parse_siblings in overwrite mode). */
static void add_siblings(const int cpu, cpu_set_t *cpus)
{
  parse_siblings(cpu, cpus, 0);
}
241 
/* Thin out *cpus so only one logical CPU per physical core remains:
 * scan top-down and let parse_siblings() clear the remaining siblings
 * of every CPU still present. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
#ifdef THREAD_STAT
  fprintf(stderr, "CPU set before HT removal: %s\n", cpu_str(cpus));
#endif
  for (int i = CPU_SETSIZE-1; i >=0; --i) {
    if (!CPU_ISSET(i, cpus))
      continue;
    parse_siblings(i, cpus, 1);
  }
#ifdef THREAD_STAT
  fprintf(stderr, "CPU set after HT removal: %s\n", cpu_str(cpus));
#endif
}
256 #else
/* Non-Linux fallback: sibling topology unknown, so leave the set alone. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
}
260 #endif
261 
262 #endif
263 
264 /* detect the # of cpus for glibc and/or linux systems */
265 static int detect_num_cpu (int rmv_ht)
266 {
267  int cpus = 0;
268 #ifdef HAVE_SCHED_GETAFFINITY
269  pid_t pid = gettid();
270  if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
271  if (rmv_ht)
272  remove_hyperthreads(&saved_cpuset);
273  return CPU_COUNT(&saved_cpuset);
274  }
275 #endif
276 #ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
277  cpus = get_nprocs ();
278  if (cpus <= 0)
279  return 2;
280 #elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
281  FILE* cpuinfo;
282  char buf[128];
283  cpuinfo = fopen ("/proc/cpuinfo", "r");
284  if (cpuinfo <= 0)
285  return 2; /* return 2, if we can't find out */
286  while (!feof (cpuinfo)) {
287  fgets (buf, 128, cpuinfo);
288  if (CSTD__ memcmp (buf+1, "rocessor", 8) == 0)
289  cpus++;
290  }
291 #endif
292 #ifdef DEBUG_THREAD
293  fprintf (stderr, "%i CPUs detected.\n", cpus);
294 #endif
295  return (cpus == 0 ? 1: cpus);
296 }
297 
298 #ifdef HAVE_GETLOADAVG
/* Sample the system load average, truncated to int.
 * Prefers the 5-minute value when available, otherwise the 1-minute
 * value, and returns 0 when getloadavg() yields no samples or fails. */
static int loadavg ()
{
  double samples[3];
  const int got = getloadavg (samples, 3);
  if (got > 1)
    return (int)samples[1];
  if (got == 1)
    return (int)samples[0];
  return 0;
}
310 #endif
311 
312 class cback {
313  public:
314  cbackfn *ctor;
315  cbackfn *dtor;
316  void *parm;
317  cback(cbackfn ct, cbackfn dt, void *p)
318  : ctor(ct), dtor(dt), parm(p) {};
320  : ctor(NULL), dtor(NULL), parm(NULL) {};
321  void init(const int thr)
322  { ctor(parm, thr); }
323  void deinit(const int thr)
324  { dtor(parm, thr); }
325 };
326 
328 /* Instantiate */
329 template class List<cback>;
330 
/* Register a (ctor, dtor, parm) callback triple; ctor/dtor are later
 * invoked with the thread count when threads are created/destroyed.
 * NOTE(review): 'callback' is a stack temporary — assumes List::append()
 * stores a copy; confirm against list.h. */
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
{
  cback callback(ctor, dtor, parm);
  thread_cbacks.append(&callback);
  //fprintf(stderr, "Register callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
}
337 
338 void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
339 {
340  cback callback(ctor, dtor, parm);
341  cback *cbk = thread_cbacks.setcurr(&callback);
342  BCHK(!cbk, NumErr, Deregister unrgistered callback, (long int)ctor, );
343  thread_cbacks.delcurr();
344  //fprintf(stderr, "Deregister callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
345 }
346 
347 
348 void lina_err (struct thr_ctrl *tc)
349 {
350  fprintf (stderr, " Thread (%i) synchronization problem!\n", tc->t_no);
351  fflush (stderr); abort ();
352 }
353 
/* No-op job: used where a thr_job_t callback is required but nothing
 * needs to be done. */
void lina_empty (struct thr_ctrl *dummy)
{
  /* nothing */
}
358 
359 void* empty_thread (void *dummy)
360 {
361  return NULL;
362 }
363 
364 #ifdef HAVE_LIBNUMA
365 
/* Map a CPU number to its NUMA node by probing the sysfs node
 * directories for a cpu%i entry.
 * Returns the node number, or -1 when no node lists this CPU. */
static int cpu_to_node(int cpu)
{
  char fn[80];
  for (int n = 0; n < numa_num_possible_nodes(); ++n) {
    sprintf(fn, "/sys/devices/system/node/node%i/cpu%i", n, cpu);
    if (!access(fn, R_OK))
      return n;
  }
  return -1;
}
376 
/* Bind thread 'ts' to 'cpu' (plus its hyperthread siblings) and prefer
 * memory from that CPU's NUMA node; also migrate the page containing
 * 'stack' to the node. Returns the node number used.
 * NOTE(review): cpu_to_node() can return -1 on sysfs failure — that
 * value flows unchecked into the nodemask calls; TODO guard. */
int do_numa_init(unsigned int cpu, struct thr_struct *ts, unsigned long stack)
{
  int node = cpu_to_node(cpu);
  struct bitmask *nodes = numa_allocate_nodemask();
  //fprintf(stderr, "Thr %i: CPU %i Node %i\n", ts->t_no, cpu, my_node);
  cpu_set_t cpuset;
  numa_bitmask_clearall(nodes);
  numa_bitmask_setbit(nodes, node);
  numa_set_preferred(node);
#ifdef HAVE_SCHED_GETAFFINITY
  /* CPU binding is done below via the affinity calls; bind memory only. */
  numa_set_membind(nodes);
#else
  numa_bind(nodes);
#endif
  //sched_yield();
  //numa_set_localalloc();
  ts->numa_node = node;
#ifdef HAVE_SCHED_GETAFFINITY
  CPU_ZERO(&cpuset);
  CPU_SET(cpu, &cpuset);
  /* Allow all hyperthread siblings of 'cpu', not just the single CPU. */
  parse_siblings(cpu, &cpuset, 0);
#ifdef HAVE_PTHREAD_GETAFFINITY_NP
  pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
#else
  sched_setaffinity(ts->t_pid, CPU_SETBYTES, &cpuset);
#endif
#endif
  /* Move stack page to my node */
  if (1) {
    void *pages[] = {(void*)(stack & page_mask)};
    /* NOTE(review): this local array shadows the bitmask* 'nodes' above. */
    const int nodes[] = {node};
    int status = node, oldstat;
    long err = 1;
    /* Per move_pages(2): a NULL node list only queries page placement. */
    numa_move_pages(0, 1, pages,
      NULL, &status, MPOL_MF_MOVE);
    oldstat = status;
    if (oldstat != node)
      err = numa_move_pages(0, 1, pages,
        nodes, &status, MPOL_MF_MOVE);
#ifdef THREAD_DEBUG
    fprintf(stderr, "Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
      ts->t_no, (void*)ts->t_id, ts->t_pid,
      cpu_str(&cpuset), node, pages[0], oldstat,
      status, err);
#endif
  }
  numa_free_nodemask(nodes);
  return node;
}
426 
/* Job wrapper so a worker thread can run do_numa_init() on itself:
 * t_size carries the CPU, t_par[0] the thr_struct*, and the thr_ctrl's
 * own (stack) address serves as the stack sample to migrate. */
void numa_init_job(struct thr_ctrl *tc)
{
  do_numa_init(tc->t_size, (struct thr_struct*)tc->t_par[0], (unsigned long)tc);
}
431 
433 #define PG_PC 512
/* Migrate the pages spanning [firstaddr, lastaddr) to 'node' in batches
 * of PG_PC. fault_in != 0 touches each page first so not-yet-allocated
 * pages exist before migration. Returns the number of pages found on a
 * different node (i.e. migration candidates). */
int do_numa_move_pages(int node, int fault_in, unsigned long firstaddr, unsigned long lastaddr)
{
  void* pages[PG_PC];
  int nodes[PG_PC], status[PG_PC];
  int nr_pages = 0; int nr_moved = 0;
  unsigned long orig_addr = firstaddr;
#ifdef VALGRIND
  memset(status, 0, PG_PC*sizeof(int));
#endif
  /* Round both ends to page boundaries.
   * NOTE(review): the direction (round DOWN when the in-page offset is
   * large, UP when small) is the opposite of nearest-page rounding —
   * TODO confirm this is intended (it biases toward including pages the
   * range covers more than half of). */
  if (firstaddr - (firstaddr & page_mask) >= page_size/2)
    firstaddr = firstaddr & page_mask;
  else
    firstaddr = (firstaddr + (page_size - 1)) & page_mask;
  if (lastaddr - (lastaddr & page_mask) > page_size/2)
    lastaddr = lastaddr & page_mask;
  else
    lastaddr = (lastaddr + (page_size - 1)) & page_mask;
#if 0
  fprintf(stderr, "numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
    (void*)firstaddr, (void*)lastaddr, node,
    (lastaddr-firstaddr)/page_size,
    (lastaddr-firstaddr)/page_size/PG_PC);
#endif
  while (lastaddr > firstaddr) {
    int i;
    /* Gather up to PG_PC page addresses for this batch. */
    for (i = 0; (i < PG_PC) && (firstaddr < lastaddr); ++i, firstaddr += page_size) {
      pages[i] = (void*)firstaddr;
      nodes[i] = node;
      /* Note: Pages might be not allocated yet and won't be ... */
      if (fault_in) {
        if (UNLIKELY(firstaddr < orig_addr))
          *(volatile int*)orig_addr = *(volatile int*)orig_addr;
        else
          *(volatile int*)firstaddr = *(volatile int*)firstaddr;
      }
    }
    nr_pages += i;
    /* Per move_pages(2): NULL node list queries current placement only. */
    long err = numa_move_pages(0, i, pages,
      NULL, status, MPOL_MF_MOVE);
    if (err)
      fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
        pages[0], pages[i-1], node, strerror(errno));
    int to_move = 0; err = 0;
    for (int j = 0; j < i; ++j)
      if (status[j] != node)
        ++to_move;
#if 0
    fprintf(stderr, "numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
      pages[0], pages[i-1], i, to_move, node, status[0]);
#endif
    nr_moved += to_move;
    /* Only issue the real migration when something is misplaced. */
    if (to_move)
      err = numa_move_pages(0, i, pages,
        nodes, status, MPOL_MF_MOVE);
    if (err)
      fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
        pages[0], pages[i-1], node, strerror(errno));
  }
  return nr_moved;
}
494 
/* Job wrapper: thr_ctrl fields carry the do_numa_move_pages() arguments
 * (t_size = node, t_off = fault_in, t_par[0]/t_par[1] = address range);
 * the number of migration candidates is returned in t_res_l. */
void numa_move_pages_job(struct thr_ctrl *tc)
{
  tc->t_res_l =
    do_numa_move_pages(tc->t_size, tc->t_off,
      (unsigned long)tc->t_par[0],
      (unsigned long)tc->t_par[1]);
}
502 
503 #endif
504 
505 #if defined(__i386__) || defined(__x86_64__)
506 # define _cpu_relax() asm ("rep; nop")
507 #else
508 # define _cpu_relax() do {} while(0)
509 #endif
510 
511 /* Sidenote: We do no locking, approx. numbers are OK here */
512 unsigned long poll_succ = 0;
513 unsigned long poll_usucc = 0;
514 unsigned long poll_fail = 0;
515 unsigned long poll_efail = 0;
516 
517 #ifndef POLL_REP
518 # define POLL_REP 1
519 #endif
520 #ifndef POLL_REP2
521 # define POLL_REP2 168
522 #endif
/* Read exactly 'sz' bytes from a non-blocking fd, spinning up to 'rep'
 * polls before falling back to a single blocking read.
 * Returns the byte count of the last read() (== sz on the fast path).
 * The poll_* counters are updated without locking (approximate by design).
 * NOTE(review): the blocking fallback read can still come back short —
 * callers compare the result against sz and abort on mismatch. */
static int busy_read(int fd, void* ptr, size_t sz, int rep=POLL_REP)
{
  size_t rd = 0;
  int w = rep;
  /* Fast path: poll the non-blocking fd up to 'rep' times. */
  while (w-- && (signed)(rd = read(fd, ptr, sz)) < (signed)sz)
    _cpu_relax();
  if (sz == rd) {
    if (rep == POLL_REP)
      ++poll_usucc;  /* unexpected: small default budget sufficed */
    else
      ++poll_succ;
    return rd;
  }
  // slow path
  if (rep == POLL_REP)
    ++poll_efail;  /* expected: small budget ran out */
  else
    ++poll_fail;
  /* Temporarily switch to blocking mode for one full read. */
  fcntl(fd, F_SETFL, 0);
  _cpu_relax();
  rd = read(fd, ptr, sz);
  PREFETCH_R(ptr, 2);
  fcntl(fd, F_SETFL, O_NONBLOCK);
  return rd;
}
548 
/* Worker thread main loop.
 * Repeatedly reads a job_input record from the thread's command pipe,
 * runs the job, and writes a job_output record back on the result pipe.
 * A NULL job pointer is the shutdown request; the thread then reports
 * its consumed CPU time and exits. */
void* lina_thread (void* thr)
{
  struct thr_struct *ts = (struct thr_struct*) thr;
  struct thr_ctrl tc;
  struct job_output out;
  int err;
#if defined(HAVE_TLS) || defined(HAVE_DTLS)
  /* Publish per-thread identity via thread-local storage. */
  ismainthread = 0;
  thrno = ts->t_no+1;
  this_thread = ts;
#endif
#ifdef VALGRIND
  memset(&tc, 0, sizeof(tc));
  memset(&out, 0, sizeof(out));
#endif
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Try to get setup lock\n", tc.t_no);
#endif
  /* clock(): some Un*xes only start accounting after the first call. */
  ts->t_pid = gettid (); clock ();
  tc.t_no = ts->t_no;
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
    ts->t_no, ts->t_id, ts->t_pid);
#endif
  /* Now wait for some work to do. Finish when a NULL job becomes runnable */
  while (1) {
    /* Set job to error! */
    tc.t_job = lina_err;
    //memset(tc.t_res_dummy, 0, sizeof(tc.t_res_dummy));
    /* Ready to accept a job: Wait for it */
    if (UNLIKELY((err = busy_read(ts->t_pipe_to_thread[0], &tc, sizeof(struct job_input))) != sizeof(struct job_input))) {
      fprintf(stderr, "Thread %i: Reading failed! %i\n", tc.t_no, err);
      abort();
    }
#ifdef DEBUG_THREAD
    fprintf (stderr, " Thread %i: Start job %li @ %p\n", ts->t_no, tc.t_job_no, tc.t_job);
    fflush (stderr);
#endif
    ts->t_done_var++;
    out.t_job_output_no = tc.t_job_no;
    if (!tc.t_job)
      break; /* The end */
    else
      (*tc.t_job) (&tc);
#ifdef DEBUG_THREAD
    fprintf (stderr, " Thread %i: Job done!\n", ts->t_no);
    fflush (stderr);
#endif
    /* Copy the job's raw result area into the output record. */
    memcpy((void*)out.t_res_dummy, (const void*)tc.t_res_dummy, sizeof(out.t_res_dummy));
    /* Do we need some memory barrier here? But we are cache-coherent, no? */
    if (UNLIKELY((err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output))) != sizeof(struct job_output))) {
      fprintf(stderr, "Thread %i: Writing failed! %i\n", ts->t_no, err);
      abort();
    }
#ifdef DEBUG_THREAD
    fprintf (stderr, " Thread %i: Signaled Job done!\n", ts->t_no);
    fflush (stderr);
#endif
  }
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: exit!\n", ts->t_no);
#endif
  /* Report consumed CPU time back (read by free_threads). */
  out.t_retval = clock ();
  ts->t_done_var++;

#if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
  /* Prefer the per-thread CPU clock when available. */
  struct timespec tm;
  if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
    const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
    out.t_retval = (long)(secs * CLOCKS_PER_SEC);
  }
#endif
  err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output));
  pthread_exit(&out.t_retval);
  return NULL;
}
625 
626 
627 int init_threads (const int num_cpu, const bool load_magic)
628 {
629  int t, err, det_cpus, all_cpus;
630  pthread_mutexattr_t mutattr;
631 
632  main_thread_pid = getpid ();
633  /* No of CPUs */
634  all_cpus = detect_num_cpu(0);
635  det_cpus = detect_num_cpu(1);
636  /* How many ? */
637  if (num_cpu <= 0) {
638  num_threads = det_cpus;
639 #ifdef HAVE_GETLOADAVG
640  if (load_magic) {
641  /* Subtract load */
642  int lavg = loadavg ();
643  /* If load exceeds no of CPUs, use 2 CPUs */
644  if (all_cpus - lavg < num_threads) {
645  num_threads = MAX(1, all_cpus - lavg);
646  if (num_threads < 2 && det_cpus > 2)
647  num_threads = 2;
648  }
649  }
650 #endif
651  if (num_cpu && num_threads > -num_cpu)
652  num_threads = -num_cpu;
653  } else
654  num_threads = num_cpu;
655  /* Max is MAX_THREADS */
656  if (num_threads > MAX_THREADS)
658  /* No threads? */
659  if (num_threads == 1)
660  num_threads = 0;
661  /* Sanity check */
662  if (num_threads > det_cpus)
663  fprintf(stderr, "Warning: Number of threads %i larger than no of CPU cores %i!\n",
664  num_threads, det_cpus);
665 #ifdef HAVE_SCHED_GETAFFINITY
666  else
667  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
668 #endif
669 #ifdef TBCI_OMP
670  omp_set_num_threads(num_threads);
671 #endif
672  /* Alloc mem for control structs */
673  if (num_threads >= 1)
674  threads = (struct thr_struct *) memalign(128, sizeof(struct thr_struct) * (num_threads));
675  //threads = (struct thr_ctrl *) CSTD__ malloc (sizeof(struct thr_ctrl) * (num_threads));
676  else
677  threads = NULL;
678  if (threads)
679  CSTD__ memset (threads, 0, sizeof(struct thr_struct) * (num_threads));
680  /* Some Un*xes only start the clock after the first call to clock() */
681  clock ();
682 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
683  // If we don't have thread local storage (and no locking),
684  // we need to disable // concurrent memory caching allocations
685  // now globally :-(
686  if (num_threads >= 1) {
687  ismainthread = 0;
688  thrno = -1;
689  }
690 #endif
691  /* Now init threads */
692  for (t = 0; t < num_threads; ++t) {
693  int err;
694  struct thr_struct *ts = &threads[t];
695  //fprintf(stderr, "Debug: thread %i @ %p\n", t, tc);
696  ts->t_no = t;
697  /* Create pipes for thread sync */
698  err = pipe(ts->t_pipe_to_thread);
699  if (err) {
700  fprintf(stderr, "Failed to create pipe for thread %i\n", t);
701  abort();
702  }
703  err = pipe(ts->t_pipe_from_thread);
704  if (err) {
705  fprintf(stderr, "Failed to create pipe for thread %i\n", t);
706  abort();
707  }
708  fcntl(ts->t_pipe_to_thread[0], F_SETFL, O_NONBLOCK);
709  fcntl(ts->t_pipe_from_thread[0], F_SETFL, O_NONBLOCK);
710  TCHK(pthread_create (&ts->t_id, NULL, lina_thread, ts));
711  }
712 #ifdef DEBUG_THREAD
713  fprintf (stderr, "%i threads @%p (sz=%zi) set up.\n",
714  num_threads, threads, sizeof(struct thr_struct));
715 #endif
716  for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
717  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
718  c->ctor(c->parm, num_threads);
719  }
720  //fprintf(stderr, "%li Callbacks registered\n", thread_cbacks.size());
721 
722  return num_threads;
723 }
724 
725 void bind_threads (bool bind_main, bool enable_numa, bool add_sibl)
726 {
727 #ifdef HAVE_SCHED_GETAFFINITY
728  /* Save CPU affinity mask */
729  CPU_ZERO(&saved_cpuset);
730  sched_getaffinity (main_thread_pid, CPU_SETBYTES, &saved_cpuset);
731  if (num_threads > CPU_COUNT(&saved_cpuset))
732  fprintf(stderr, "TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
733  num_threads, CPU_COUNT(&saved_cpuset));
734  cpu_set_t cpuset; CPU_ZERO(&cpuset);
735 
736  /* TODO: Be clever to which CPUs to bind to, e.g. avoid hyperthreads */
737  int next_cpu = next_set_bit(0, &saved_cpuset);
738  if (next_cpu < 0)
739  abort();
740  bound_main = bind_main;
741 #ifdef HAVE_LIBNUMA
742  numa_avail = (numa_available() == 0);
743  if (numa_max_node() == 0)
744  numa_avail = 0;
745  if (numa_avail) {
746  page_size = numa_pagesize();
747  page_mask = ~(page_size-1ULL);
748  }
749  if (!enable_numa)
750  numa_avail = 0;
751  fprintf(stderr, "NUMA: avail %i, maxnode %i\n", numa_avail, numa_max_node());
752 #endif /* HAVE_LIBNUMA */
753  if (bind_main) {
754  CPU_SET(next_cpu, &cpuset);
755  if (add_sibl)
756  add_siblings(next_cpu, &cpuset);
757 #ifdef HAVE_LIBNUMA
758  if (numa_avail) {
759  struct thr_struct ts;
760  ts.t_no = -1; ts.t_pid = main_thread_pid;
761  ts.t_id = pthread_self();
762  main_numa_node = do_numa_init(next_cpu, &ts, (unsigned long)&ts);
763  } else
764 #endif /* HAVE_LIBNUMA */
765 #ifdef HAVE_SCHED_GETAFFINITY
766  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
767 #else
768  do {} while(0);
769 #endif
770 #ifdef THREAD_DEBUG
771  fprintf(stderr, "Set Main Thread %i: CPU %i (%s)\n", main_thread_pid,
772  next_cpu, cpu_str(&cpuset));
773 #endif
774 #ifdef DEBUG_THREAD
775  sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
776  fprintf(stderr, "Get Main Thread %i: CPU %i (%s)\n", main_thread_pid,
777  next_cpu, cpu_str(&cpuset));
778 #endif
779  CPU_ZERO(&cpuset);
780  }
781 #endif /* HAVE_SCHED_GETAFFINITY */
782  threads_bound = true;
783  int several_nodes = 0;
784  for (int t = 0; t < num_threads; ++t) {
785  struct thr_struct *ts = &threads[t];
786  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
787  if (next_cpu == -1)
788  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
789  CPU_SET(next_cpu, &cpuset);
790  if (add_sibl)
791  add_siblings(next_cpu, &cpuset);
792 #ifdef HAVE_LIBNUMA
793  if (numa_avail && cpu_to_node(next_cpu) != main_numa_node)
794  ++several_nodes;
795  if (numa_avail) {
796  thread_start(t, numa_init_job, next_cpu,
797  (void*)ts, (void*)0);
798  } else
799 #endif
800 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
801  pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
802 #else
803  do {} while(0);
804 #endif
805 #ifdef THREAD_DEBUG
806  fprintf(stderr, "Set Thread %i: CPUs %s\n", t,
807  cpu_str(&cpuset));
808 #endif
809 #ifdef DEBUG_THREAD
810  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
811  fprintf(stderr, "Get Thread %i: CPUs %s\n", t,
812  cpu_str(&cpuset));
813 #endif
814  CPU_ZERO(&cpuset);
815  }
816 #ifdef HAVE_LIBNUMA
817  if (numa_avail)
818  for (int t = 0; t < num_threads; ++t)
819  thread_wait(t);
820 #endif
821 #ifdef THREAD_DEBUG
822 #ifdef HAVE_SCHED_GETAFFINITY
823  sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
824  fprintf(stderr, "Main thread: CPUs %s, Node %i\n",
825  cpu_str(&cpuset), main_numa_node);
826 #endif
827 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
828  for (int t = 0; t < num_threads; ++t) {
829  struct thr_struct *ts = &threads[t];
830  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
831  fprintf(stderr, "Thread %i: CPUs %s, Node %i\n",
832  ts->t_no, cpu_str(&cpuset), ts->numa_node);
833  }
834 #endif
835  if (!several_nodes) {
836  numa_avail = 0;
837 #ifdef THREAD_DEBUG
838  fprintf(stderr, "All threads on node %i, disabling NUMA\n",
840 #endif
841  }
842 #endif /* THREAD_DEBUG */
843 #ifdef TBCI_OMP
844  int omp_thr = omp_get_max_threads();
845 #ifdef DEBUG_THREAD
846  fprintf(stderr, "Now setting affinity for %i OpenMP threads ...\n", omp_thr);
847 #endif
848 #pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
849  for (int t = 0; t < omp_thr; ++t) {
850  int tid = omp_get_thread_num();
851  next_cpu = next_set_bit(0, &saved_cpuset);
852  for (int i = 0; i < t; ++i) {
853  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
854  if (next_cpu == -1)
855  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
856  }
857  CPU_ZERO(&cpuset);
858  CPU_SET(next_cpu, &cpuset);
859  if (add_sibl)
860  add_siblings(next_cpu, &cpuset);
861  sched_setaffinity(0, CPU_SETBYTES, &cpuset);
862 #ifdef THREAD_DEBUG
863 #pragma omp critical
864  fprintf(stderr, "Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
865 #endif
866  }
867 #endif /* TBCI_OMP */
868 }
869 
872 {
873  int t;
874  tot_cpu_tm = 0;
875  ERRDECL;
876  //fprintf (stderr, "Free threads ...\n");
877  for (t = 0; t < num_threads; ++t) {
878  int err;
879  struct thr_struct *ts = &threads[t];
880  struct job_input job;
881  struct job_output out;
882 #ifdef VALGRIND
883  memset(&job, 0, sizeof(job));
884 #endif
885  job.t_job = 0;
886  fcntl(ts->t_pipe_from_thread[0], F_SETFL, 0);
887  err = write(ts->t_pipe_to_thread[1], &job, sizeof(job));
888  err = read(ts->t_pipe_from_thread[0], &out, sizeof(out));
889  void *res;
890  TCHK(pthread_join (ts->t_id, &res));
895  long tm = out.t_retval;
896  //long tm = *(long*)res;
897  tot_cpu_tm += tm;
898 #ifdef THREAD_STAT
899  fprintf (stderr, " CPU time for thread %i :%7.3f s\n",
900  ts->t_no, (double)(tm)/CLOCKS_PER_SEC);
901 #endif
902  }
903  for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
904  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
905  c->dtor(c->parm, num_threads);
906  }
907  clock_t mainclock = clock();
908 #ifdef HAVE_CLOCK_GETTIME
909  struct timespec tm;
911  if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
912  double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
913  mainclock = (long)(secs*CLOCKS_PER_SEC);
914  }
915 #endif
916  tot_cpu_tm += mainclock;
917 #ifdef THREAD_STAT
918  fprintf (stderr, " CPU time for main thr :%7.3f s\n",
919  (double)mainclock/CLOCKS_PER_SEC);
920  fprintf (stderr, " CPU for all threads :%7.3f s\n",
921  (double)(tot_cpu_tm)/CLOCKS_PER_SEC);
922  fprintf (stderr, " Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
923  poll_succ, poll_usucc, poll_fail, poll_efail);
924  fflush (stderr);
925 #endif
926  if (num_threads > 0)
927  CSTD__ free (threads);
928  num_threads = 0; threads = 0;
929 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
930  ismainthread = 1;
931  thrno = 0;
932 #endif
933 #ifdef HAVE_SCHED_GETAFFINITY
934  if (bound_main) {
935  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
936  bound_main = false;
937  }
938  threads_bound = false;
939 #endif
940 }
941 
942 static unsigned long job_no = 0;
943 
/* Common worker for thread_start()/thread_start_off(): package the job
 * plus the NULL-terminated vararg pointer list into a job_input record
 * and push it down thread thr_no's command pipe.
 * 'vl' must already be positioned at the first extra argument. */
void _thread_start_off (const int thr_no, thr_job_t ljob,
  const unsigned long off, const unsigned long sz,
  va_list vl)
{
  void * par; unsigned t = 0;
  struct thr_struct* ts = &threads[thr_no];
  struct job_input job;
  ERRDECL;
#ifdef VALGRIND
  memset(&job, 0, sizeof(job));
#endif
  //va_list (vl);
  BCHKNR(thr_no >= num_threads, NumErr, Starting thread no outside range, thr_no);
  threads_busy++;
  job.t_job = ljob; job.t_size = sz; job.t_off = off; job.t_job_no = job_no++;
  //va_start (vl, sz);
  /* Copy caller-supplied pointers until the NULL terminator.
   * NOTE(review): t_par is filled before the range check below — more
   * than THREAD_MAX_ARGS arguments would overrun it first; confirm. */
  while ((par = va_arg (vl, void*)))
    job.t_par[t++] = par;
  //va_end (vl);
  BCHKNR(t > THREAD_MAX_ARGS, NumErr, Too many arguments to thread_start, t-1);
#ifdef DEBUG_THREAD
  fprintf (stderr, "Signal thread %i\n", ts->t_no);
  fflush (stderr);
#endif
  if (write(ts->t_pipe_to_thread[1], &job, sizeof(job)) != sizeof(job)) {
    fprintf(stderr, "Signaling job %li to thread %i failed",
      job.t_job_no, ts->t_no);
    abort();
  }
  /* Immediatly reschedule */
  //sched_yield ();
}
976 
/* Start 'job' on thread thr_no with an explicit offset; the trailing
 * pointer arguments must be terminated by NULL. */
void thread_start_off (const int thr_no, thr_job_t job,
  const unsigned long off, const unsigned long sz, ...)
{
  va_list vl;
  va_start(vl, sz);
  _thread_start_off(thr_no, job, off, sz, vl);
  va_end(vl);
}
985 
/* Start 'job' on thread thr_no (offset 0); the trailing pointer
 * arguments must be terminated by NULL. */
void thread_start ( const int thr_no, thr_job_t job,
  const unsigned long sz, ...)
{
  va_list vl;
  va_start(vl, sz);
  _thread_start_off(thr_no, job, 0, sz, vl);
  va_end(vl);
}
994 
/* Block until thread thr_no completes its current job.
 * When 'out' is non-NULL the job's output record is stored there,
 * otherwise it is drained into a scratch record and discarded. */
void thread_wait (const int thr_no, struct job_output *out)
{
  struct thr_struct *ts = &threads[thr_no];
  struct job_output out2;
  BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
  /* Tell the thread that we wait for completion */
#ifdef DEBUG_THREAD
  fprintf (stderr, "Wait for thread %i\n", ts->t_no);
#endif
  /* POLL_REP2: a longer poll budget, since a real wait is expected. */
  if (busy_read(ts->t_pipe_from_thread[0], out? out: &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
    fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
    abort();
  }
  threads_busy--;
#ifdef DEBUG_THREAD
  fprintf (stderr, "Thread %i signaled completion.\n", ts->t_no);
#endif
}
1013 
1014 
1015 double thread_wait_result (const int thr_no)
1016 {
1017  struct thr_struct *ts = &threads[thr_no];
1018  struct job_output out2;
1019  int err;
1020  BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
1021 #ifdef DEBUG_THREAD
1022  fprintf (stderr, "Wait for result of thread %i\n", ts->t_no);
1023 #endif
1024  if (busy_read(ts->t_pipe_from_thread[0], &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
1025  fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
1026  abort();
1027  }
1028  threads_busy--;
1029 #ifdef DEBUG_THREAD
1030  fprintf (stderr, "Thread %i signaled completion\n", ts->t_no);
1031 #endif
1032  return /*(volatile double)*/out2.t_res_d;
1033 }
1034 
1036 {
1037  threads_busy++;
1038 #ifdef TBCI_OMP
1039  omp_set_num_threads(0);
1040 #endif
1041 }
1042 
1044 {
1045  if (threads_busy)
1046  threads_busy--;
1047  else
1048  fprintf (stderr, "reenable_threads(): Threads already enabled.!\n");
1049 #ifdef TBCI_OMP
1050  omp_set_num_threads(num_threads);
1051 #endif
1052 }
1053 
1055 
1057 
1058 #else /* no SMP */
1059 /* These are for compatibility ... */
1060 
/* Weak fallback so non-SMP builds still link. */
pid_t main_thread_pid WEAKA = 0;

/* Non-SMP stub: record the pid, start the clock, report 0 threads. */
WEAK(int init_threads (const int c, const bool load))
{ main_thread_pid = getpid(); clock (); return 0; }
/* Non-SMP stub: nothing to bind. */
WEAK(void bind_threads(bool, bool, bool)) {}
/* Non-SMP stub: nothing to free. */
WEAK(void free_threads ()) {}
/* Non-SMP stub: only propagates to OpenMP when compiled in. */
WEAK(void disable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(0); }
#else
{}
#endif
/* Non-SMP stub: restores the OpenMP thread count when compiled in. */
WEAK(void reenable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(omp_get_num_procs()); }
#else
{}
#endif
/* Non-SMP fallback: warn (or abort with ABORT_ON_ERR), then run the job
 * synchronously in the caller with a stack-local thr_ctrl. */
WEAK(void thread_start (const int tno, thr_job_t job,
  const unsigned long sz, ...))
{
  void * par;
  struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
  unsigned int t = 0;
  va_list (vl);
  tc->t_job = job; tc->t_size = sz; tc->t_off = 0;
  va_start (vl, sz);
  /* Collect the NULL-terminated argument pointers. */
  while ((par = va_arg (vl, void*)))
    tc->t_par[t++] = par;
  va_end (vl);
  fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
  abort ();
# endif
  (*job)(tc);
}
/* Non-SMP fallback of thread_start_off(): same as the thread_start stub
 * but forwarding the explicit offset. */
WEAK(void thread_start_off (const int tno, thr_job_t job,
  const unsigned long off, const unsigned long sz, ...))
{
  void * par;
  struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
  unsigned int t = 0;
  va_list (vl);
  tc->t_job = job; tc->t_size = sz; tc->t_off = off;
  va_start (vl, sz);
  /* Collect the NULL-terminated argument pointers. */
  while ((par = va_arg (vl, void*)))
    tc->t_par[t++] = par;
  va_end (vl);
  fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
  abort ();
#endif
  (*job)(tc);
}
/* Non-SMP stubs: with no workers, waiting is trivially complete. */
WEAK(void thread_wait (const int t)) {}
WEAK(void* thread_wait_useful (const int t, useful_job_t j, void* a))
{ return 0; }
WEAK(double thread_wait_result (const int thr_no))
{ return 0; }

/* Weak thread-count bookkeeping referenced from other translation units. */
unsigned int curr_n_thr WEAKA;
unsigned int last_n_thr WEAKA;
unsigned int prev_n_thr WEAKA;
1125 
1127 
1128 #endif /* SMP */
1129 
1131 unsigned int tbci_control WEAKA = _TBCI_CWD_DEFAULT;
1133 
1134 #endif /* _POSIX_THREADS */
void * t_par[6]
Definition: smp.h:173
#define BCHKNR(cond, exc, txt, ind)
Definition: basics.h:579
int t_no
Definition: smp.h:158
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition: smp.cc:338
Matrix< T > a(10, 10)
#define _TBCI_CWD_DEFAULT
Definition: tbci_param.h:69
volatile char t_res_dummy[16]
Definition: smp.h:176
long t_res_l
Definition: smp.h:179
unsigned int tbci_control
const Vector< T > const Vector< T > const Vector< T > & p
Definition: LM_fit.h:97
#define MAX_THREADS
Definition: smp.cc:83
void * parm
Definition: smp.cc:316
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
Definition: smp.cc:944
void deinit(const int thr)
Definition: smp.cc:323
bool threads_bound
Definition: smp.cc:106
int numa_avail
Definition: smp.cc:103
cbackfn * dtor
Definition: smp.cc:315
unsigned long poll_efail
Definition: smp.cc:515
unsigned long t_job_output_no
Definition: smp.h:142
return c
Definition: f_matrix.h:760
#define POLL_REP
Definition: smp.cc:518
void lina_empty(struct thr_ctrl *dummy)
Definition: smp.cc:354
#define NAMESPACE_TBCI
Definition: basics.h:310
int t_pipe_from_thread[2]
Definition: smp.h:161
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
Definition: smp.cc:986
unsigned long poll_succ
Definition: smp.cc:512
void reenable_threads()
Definition: smp.cc:1043
void init(const int thr)
Definition: smp.cc:321
exception base class for the TBCI NumLib
Definition: except.h:58
bool bound_main
Definition: smp.cc:107
void * t_par[6]
Definition: smp.h:137
int threads_busy
Definition: smp.cc:102
cbackfn * ctor
Definition: smp.cc:314
static List< cback > thread_cbacks
Definition: smp.cc:327
unsigned int curr_n_thr
Definition: smp.cc:1054
#define BCHK(cond, exc, txt, ind, rtval)
Definition: basics.h:568
#define NULL
Definition: basics.h:243
cback(cbackfn ct, cbackfn dt, void *p)
Definition: smp.cc:317
NAMESPACE_END NAMESPACE_TBCI unsigned int tbci_control WEAKA
Definition: smp.cc:1131
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
Definition: smp.cc:725
#define WEAK(x)
Definition: basics.h:478
#define UNLIKELY(expr)
Definition: basics.h:101
T * getfirst() const
Definition: list.h:617
double t_res_d
Definition: smp.h:147
Definition: smp.h:168
#define TCHK(x)
Definition: smp.cc:122
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
Definition: smp.h:126
NAMESPACE_TBCI int num_threads
Definition: smp.cc:101
Definition: smp.h:132
unsigned long t_job_no
Definition: smp.h:169
unsigned long t_off
Definition: smp.h:136
unsigned long t_job_no
Definition: smp.h:133
void * lina_thread(void *thr)
Definition: smp.cc:549
Definition: list.h:59
struct thr_struct * threads
Definition: smp.cc:104
#define CSTD__
Definition: basics.h:333
unsigned long t_size
Definition: smp.h:171
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
Definition: basics.h:741
long t_retval
Definition: smp.h:143
static int detect_num_cpu(int rmv_ht)
Definition: smp.cc:265
unsigned int last_n_thr
Definition: smp.cc:1054
F_TSMatrix< T > ts
Definition: f_matrix.h:1051
#define _cpu_relax()
Definition: smp.cc:508
THREAD__ int thrno
Definition: smp.cc:110
void * empty_thread(void *dummy)
Definition: smp.cc:359
Definition: smp.cc:312
#define POLL_REP2
Definition: smp.cc:521
T * getnext() const
Definition: list.h:629
#define THREAD__
Definition: basics.h:755
void disable_threads()
Definition: smp.cc:1035
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition: smp.cc:331
int tot_cpu_tm
Definition: smp.cc:870
void free_threads()
Definition: smp.cc:871
volatile char t_res_dummy[16]
Definition: smp.h:145
int numa_node
Definition: smp.h:163
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
Definition: smp.cc:523
#define THREAD_MAX_ARGS
Definition: smp.h:129
THREAD__ struct thr_struct * this_thread
Definition: smp.cc:111
pthread_t t_id
Definition: smp.h:160
cback()
Definition: smp.cc:319
int i
Definition: LM_fit.h:71
#define ERRDECL
Definition: smp.cc:123
void *(* useful_job_t)(void *)
Definition: smp.h:127
unsigned long t_off
Definition: smp.h:172
void thread_wait(const int thr_no, struct job_output *out)
Definition: smp.cc:995
double thread_wait_result(const int thr_no)
Definition: smp.cc:1015
#define MAX(a, b)
Definition: basics.h:649
#define NAMESPACE_END
Definition: basics.h:316
unsigned long poll_fail
Definition: smp.cc:514
THREAD__ int ismainthread
Definition: smp.cc:109
unsigned long poll_usucc
Definition: smp.cc:513
T * append()
Definition: list.h:436
unsigned int prev_n_thr
Definition: smp.cc:1054
int main_numa_node
Definition: smp.cc:108
int init_threads(const int num_cpu, const bool load_magic)
Definition: smp.cc:627
int t_no
Definition: smp.h:183
const Vector< T > const Vector< T > const Vector< T > int T T & err
Definition: LM_fit.h:102
static unsigned long job_no
Definition: smp.cc:942
int t_pipe_to_thread[2]
Definition: smp.h:161
thr_job_t t_job
Definition: smp.h:170
T * setcurr(const T *rec) const
Definition: list.h:354
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
Definition: smp.cc:977
unsigned int t_done_var
Definition: smp.h:162
static pid_t gettid()
Definition: smp.cc:91
void lina_err(struct thr_ctrl *tc)
Definition: smp.cc:348
T * delcurr()
Definition: list.h:475
thr_job_t t_job
Definition: smp.h:134
unsigned long t_size
Definition: smp.h:135
pid_t main_thread_pid
Definition: smp.cc:105
pid_t t_pid
Definition: smp.h:159