54 #define NEED_SMP_DECLS 1
55 #include "tbci/basics.h"
69 #ifdef HAVE_SYS_SYSINFO_H
70 # include <sys/sysinfo.h>
83 # define MAX_THREADS 160
86 #include "tbci/list.h"
89 # include <sys/syscall.h>
93 return syscall(SYS_gettid);
96 # define gettid getpid
115 unsigned long page_mask;
/*
 * TCHK(x): evaluate the call x exactly once, store its result in the local
 * variable `err` (the one ERRDECL declares) and, if that result is non-zero,
 * print the text of x together with strerror(err) to stderr.
 *
 * The expansion is wrapped in do { } while (0) so that it is exactly one
 * statement.  The previous bare `if (...) fprintf(...)` form captured a
 * following `else` of the caller:  if (c) TCHK(f()); else g();  attached the
 * else to the macro's own if.  The argument is parenthesised so that any
 * expression passed as x is assigned as a whole.
 */
# define TCHK(x) \
	do { \
		if (UNLIKELY((err = (x)) != 0)) \
			fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err)); \
	} while (0)
120 # define ERRDECL int err = 0
126 #ifdef HAVE_SCHED_GETAFFINITY
127 static cpu_set_t saved_cpuset;
129 #ifndef HAVE_CPU_COUNT
131 static int CPU_COUNT(
const cpu_set_t *cpus)
134 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
135 if (CPU_ISSET(
i, cpus))
141 static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
143 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
144 if (CPU_ISSET(
i, src1) ^ CPU_ISSET(
i, src2))
151 static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
153 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
154 if (CPU_ISSET(
i, src1) & CPU_ISSET(
i, src2))
162 static int next_set_bit(
const int start,
const cpu_set_t *cpus)
164 for (
int i = start;
i < CPU_SETSIZE; ++
i)
165 if (CPU_ISSET(
i, cpus))
170 static char cpu_buf[1024];
171 static const char* cpu_str(
const cpu_set_t *cpus)
173 static volatile int off = 0;
179 char* ret = cpu_buf+old_off+1;
182 for (
int i = 0;
i < CPU_SETSIZE && ptr < 1016; ++
i)
183 if (CPU_ISSET(
i, cpus))
184 ptr += sprintf(cpu_buf+ptr,
" %i",
i);
/* Number of bytes needed for a bitmask of CPU_SETSIZE bits (rounded up to
 * whole bytes).  Used in this file as the size argument of the
 * sched_*affinity / pthread_*affinity_np calls and of the memcpy between
 * cpu_set_t objects. */
187 #define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
188 #if defined(__linux__)
190 static void parse_siblings(
const int cpu, cpu_set_t *cpus,
const int remove)
193 sprintf(fn,
"/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
194 FILE *f = fopen(fn,
"r");
197 int sibl[(CPU_SETSIZE+31)/32];
203 if (fscanf(f,
"%x,", &msk) == EOF) {
204 if (fscanf(f,
"%x", &msk) == EOF)
208 sibl[nr_ints++] = msk;
213 cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
214 long* sibl_ptr = (
long*)&sibl_set;
215 #if __BYTE_ORDER == __LITTLE_ENDIAN
216 for (
int j = nr_ints-1; j >= 0; --j)
217 *sibl_ptr++ = sibl[j];
219 long* sibl_ptr2 = (
long*)&sibl;
220 for (
int j = (nr_ints*
sizeof(
int)/
sizeof(long))-1; j >= 0; --j)
221 *sibl_ptr++ = sibl_ptr2[j];
225 if (!CPU_ISSET(cpu, &sibl_set))
228 memcpy(cpus, &sibl_set, CPU_SETBYTES);
231 CPU_CLR(cpu, &sibl_set);
232 CPU_AND(&sibl_set, &sibl_set, cpus);
233 CPU_XOR(cpus, cpus, &sibl_set);
237 static void add_siblings(
const int cpu, cpu_set_t *cpus)
239 parse_siblings(cpu, cpus, 0);
242 static void remove_hyperthreads(cpu_set_t *cpus)
245 fprintf(stderr,
"CPU set before HT removal: %s\n", cpu_str(cpus));
247 for (
int i = CPU_SETSIZE-1;
i >=0; --
i) {
248 if (!CPU_ISSET(
i, cpus))
250 parse_siblings(
i, cpus, 1);
253 fprintf(stderr,
"CPU set after HT removal: %s\n", cpu_str(cpus));
257 static void remove_hyperthreads(cpu_set_t *cpus)
268 #ifdef HAVE_SCHED_GETAFFINITY
270 if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
272 remove_hyperthreads(&saved_cpuset);
273 return CPU_COUNT(&saved_cpuset);
276 #ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
277 cpus = get_nprocs ();
280 #elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
283 cpuinfo = fopen (
"/proc/cpuinfo",
"r");
286 while (!feof (cpuinfo)) {
287 fgets (buf, 128, cpuinfo);
288 if (
CSTD__ memcmp (buf+1,
"rocessor", 8) == 0)
293 fprintf (stderr,
"%i CPUs detected.\n", cpus);
295 return (cpus == 0 ? 1: cpus);
298 #ifdef HAVE_GETLOADAVG
299 static int loadavg ()
302 int err = getloadavg (loads, 3);
304 return (
int)loads[0];
306 return (
int)loads[1];
333 cback callback(ctor, dtor, parm);
334 thread_cbacks.
append(&callback);
340 cback callback(ctor, dtor, parm);
342 BCHK(!cbk,
NumErr, Deregister unrgistered callback, (
long int)ctor, );
350 fprintf (stderr,
" Thread (%i) synchronization problem!\n", tc->
t_no);
351 fflush (stderr); abort ();
366 static int cpu_to_node(
int cpu)
369 for (
int n = 0; n < numa_num_possible_nodes(); ++n) {
370 sprintf(fn,
"/sys/devices/system/node/node%i/cpu%i", n, cpu);
371 if (!access(fn, R_OK))
377 int do_numa_init(
unsigned int cpu,
struct thr_struct *
ts,
unsigned long stack)
379 int node = cpu_to_node(cpu);
380 struct bitmask *nodes = numa_allocate_nodemask();
383 numa_bitmask_clearall(nodes);
384 numa_bitmask_setbit(nodes, node);
385 numa_set_preferred(node);
386 #ifdef HAVE_SCHED_GETAFFINITY
387 numa_set_membind(nodes);
394 #ifdef HAVE_SCHED_GETAFFINITY
396 CPU_SET(cpu, &cpuset);
397 parse_siblings(cpu, &cpuset, 0);
398 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
399 pthread_setaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
401 sched_setaffinity(ts->
t_pid, CPU_SETBYTES, &cpuset);
406 void *pages[] = {(
void*)(stack & page_mask)};
407 const int nodes[] = {node};
408 int status = node, oldstat;
410 numa_move_pages(0, 1, pages,
411 NULL, &status, MPOL_MF_MOVE);
414 err = numa_move_pages(0, 1, pages,
415 nodes, &status, MPOL_MF_MOVE);
417 fprintf(stderr,
"Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
419 cpu_str(&cpuset), node, pages[0], oldstat,
423 numa_free_nodemask(nodes);
427 void numa_init_job(
struct thr_ctrl *tc)
434 int do_numa_move_pages(
int node,
int fault_in,
unsigned long firstaddr,
unsigned long lastaddr)
437 int nodes[PG_PC], status[PG_PC];
438 int nr_pages = 0;
int nr_moved = 0;
439 unsigned long orig_addr = firstaddr;
441 memset(status, 0, PG_PC*
sizeof(
int));
443 if (firstaddr - (firstaddr & page_mask) >= page_size/2)
444 firstaddr = firstaddr & page_mask;
446 firstaddr = (firstaddr + (page_size - 1)) & page_mask;
447 if (lastaddr - (lastaddr & page_mask) > page_size/2)
448 lastaddr = lastaddr & page_mask;
450 lastaddr = (lastaddr + (page_size - 1)) & page_mask;
452 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
453 (
void*)firstaddr, (
void*)lastaddr, node,
454 (lastaddr-firstaddr)/page_size,
455 (lastaddr-firstaddr)/page_size/PG_PC);
457 while (lastaddr > firstaddr) {
459 for (i = 0; (i < PG_PC) && (firstaddr < lastaddr); ++
i, firstaddr += page_size) {
460 pages[
i] = (
void*)firstaddr;
464 if (
UNLIKELY(firstaddr < orig_addr))
465 *(
volatile int*)orig_addr = *(
volatile int*)orig_addr;
467 *(
volatile int*)firstaddr = *(
volatile int*)firstaddr;
471 long err = numa_move_pages(0, i, pages,
472 NULL, status, MPOL_MF_MOVE);
474 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
475 pages[0], pages[i-1], node, strerror(errno));
476 int to_move = 0; err = 0;
477 for (
int j = 0; j <
i; ++j)
478 if (status[j] != node)
481 fprintf(stderr,
"numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
482 pages[0], pages[i-1], i, to_move, node, status[0]);
486 err = numa_move_pages(0, i, pages,
487 nodes, status, MPOL_MF_MOVE);
489 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
490 pages[0], pages[i-1], node, strerror(errno));
495 void numa_move_pages_job(
struct thr_ctrl *tc)
499 (
unsigned long)tc->
t_par[0],
500 (
unsigned long)tc->
t_par[1]);
505 #if defined(__i386__) || defined(__x86_64__)
/* x86 / x86-64 variant: "rep; nop" is the encoding of the PAUSE instruction,
 * the usual hint for busy-wait loops. */
506 # define _cpu_relax() asm ("rep; nop")
/* Alternative definition that expands to an empty statement -- presumably the
 * #else branch for non-x86 targets (the directive itself is not visible in
 * this excerpt; confirm against the full file). */
508 # define _cpu_relax() do {} while(0)
521 # define POLL_REP2 168
527 while (w-- && (
signed)(rd = read(fd, ptr, sz)) < (
signed)sz)
541 fcntl(fd, F_SETFL, 0);
543 rd = read(fd, ptr, sz);
545 fcntl(fd, F_SETFL, O_NONBLOCK);
555 #if defined(HAVE_TLS) || defined(HAVE_DTLS)
561 memset(&tc, 0,
sizeof(tc));
562 memset(&out, 0,
sizeof(out));
565 fprintf (stderr,
" Thread %i: Try to get setup lock\n", tc.
t_no);
570 fprintf (stderr,
" Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
580 fprintf(stderr,
"Thread %i: Reading failed! %i\n", tc.
t_no, err);
584 fprintf (stderr,
" Thread %i: Start job %li @ %p\n", ts->
t_no, tc.
t_job_no, tc.
t_job);
594 fprintf (stderr,
" Thread %i: Job done!\n", ts->
t_no);
600 fprintf(stderr,
"Thread %i: Writing failed! %i\n", ts->
t_no, err);
604 fprintf (stderr,
" Thread %i: Signaled Job done!\n", ts->
t_no);
609 fprintf (stderr,
" Thread %i: exit!\n", ts->
t_no);
614 #if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
616 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
617 const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
618 out.
t_retval = (
long)(secs * CLOCKS_PER_SEC);
629 int t,
err, det_cpus, all_cpus;
630 pthread_mutexattr_t mutattr;
639 #ifdef HAVE_GETLOADAVG
642 int lavg = loadavg ();
646 if (num_threads < 2 && det_cpus > 2)
663 fprintf(stderr,
"Warning: Number of threads %i larger than no of CPU cores %i!\n",
665 #ifdef HAVE_SCHED_GETAFFINITY
682 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
700 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
705 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
713 fprintf (stderr,
"%i threads @%p (sz=%zi) set up.\n",
714 num_threads, threads,
sizeof(
struct thr_struct));
718 c->ctor(
c->parm, num_threads);
727 #ifdef HAVE_SCHED_GETAFFINITY
729 CPU_ZERO(&saved_cpuset);
732 fprintf(stderr,
"TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
734 cpu_set_t cpuset; CPU_ZERO(&cpuset);
737 int next_cpu = next_set_bit(0, &saved_cpuset);
743 if (numa_max_node() == 0)
746 page_size = numa_pagesize();
747 page_mask = ~(page_size-1ULL);
751 fprintf(stderr,
"NUMA: avail %i, maxnode %i\n",
numa_avail, numa_max_node());
754 CPU_SET(next_cpu, &cpuset);
756 add_siblings(next_cpu, &cpuset);
761 ts.
t_id = pthread_self();
765 #ifdef HAVE_SCHED_GETAFFINITY
772 next_cpu, cpu_str(&cpuset));
777 next_cpu, cpu_str(&cpuset));
783 int several_nodes = 0;
786 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
788 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
789 CPU_SET(next_cpu, &cpuset);
791 add_siblings(next_cpu, &cpuset);
797 (
void*)ts, (
void*)0);
800 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
801 pthread_setaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
806 fprintf(stderr,
"Set Thread %i: CPUs %s\n", t,
810 pthread_getaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
811 fprintf(stderr,
"Get Thread %i: CPUs %s\n", t,
822 #ifdef HAVE_SCHED_GETAFFINITY
824 fprintf(stderr,
"Main thread: CPUs %s, Node %i\n",
827 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
830 pthread_getaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
831 fprintf(stderr,
"Thread %i: CPUs %s, Node %i\n",
835 if (!several_nodes) {
838 fprintf(stderr,
"All threads on node %i, disabling NUMA\n",
844 int omp_thr = omp_get_max_threads();
846 fprintf(stderr,
"Now setting affinity for %i OpenMP threads ...\n", omp_thr);
848 #pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
849 for (
int t = 0; t < omp_thr; ++t) {
850 int tid = omp_get_thread_num();
851 next_cpu = next_set_bit(0, &saved_cpuset);
852 for (
int i = 0; i < t; ++
i) {
853 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
855 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
858 CPU_SET(next_cpu, &cpuset);
860 add_siblings(next_cpu, &cpuset);
861 sched_setaffinity(0, CPU_SETBYTES, &cpuset);
864 fprintf(stderr,
"Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
883 memset(&job, 0,
sizeof(job));
890 TCHK(pthread_join (ts->
t_id, &res));
899 fprintf (stderr,
" CPU time for thread %i :%7.3f s\n",
900 ts->
t_no, (
double)(tm)/CLOCKS_PER_SEC);
905 c->dtor(
c->parm, num_threads);
907 clock_t mainclock = clock();
908 #ifdef HAVE_CLOCK_GETTIME
911 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
912 double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
913 mainclock = (
long)(secs*CLOCKS_PER_SEC);
916 tot_cpu_tm += mainclock;
918 fprintf (stderr,
" CPU time for main thr :%7.3f s\n",
919 (
double)mainclock/CLOCKS_PER_SEC);
920 fprintf (stderr,
" CPU for all threads :%7.3f s\n",
921 (
double)(tot_cpu_tm)/CLOCKS_PER_SEC);
922 fprintf (stderr,
" Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
923 poll_succ, poll_usucc, poll_fail, poll_efail);
928 num_threads = 0; threads = 0;
929 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
933 #ifdef HAVE_SCHED_GETAFFINITY
945 const unsigned long off,
const unsigned long sz,
948 void * par;
unsigned t = 0;
953 memset(&job, 0,
sizeof(job));
960 while ((par = va_arg (vl,
void*)))
961 job.
t_par[t++] = par;
965 fprintf (stderr,
"Signal thread %i\n", ts->
t_no);
969 fprintf(stderr,
"Signaling job %li to thread %i failed",
978 const unsigned long off,
const unsigned long sz, ...)
987 const unsigned long sz, ...)
1002 fprintf (stderr,
"Wait for thread %i\n", ts->
t_no);
1005 fprintf(stderr,
"Waiting for thread %i failed!\n", ts->
t_no);
1010 fprintf (stderr,
"Thread %i signaled completion.\n", ts->
t_no);
1022 fprintf (stderr,
"Wait for result of thread %i\n", ts->
t_no);
1025 fprintf(stderr,
"Waiting for thread %i failed!\n", ts->
t_no);
1030 fprintf (stderr,
"Thread %i signaled completion\n", ts->
t_no);
1039 omp_set_num_threads(0);
1048 fprintf (stderr,
"reenable_threads(): Threads already enabled.!\n");
1070 { omp_set_num_threads(0); }
1076 { omp_set_num_threads(omp_get_num_procs()); }
1081 const unsigned long sz, ...))
1089 while ((par = va_arg (vl,
void*)))
1090 tc->
t_par[t++] = par;
1092 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
1093 # ifdef ABORT_ON_ERR
1099 const unsigned long off,
const unsigned long sz, ...))
1107 while ((par = va_arg (vl,
void*)))
1108 tc->
t_par[t++] = par;
1110 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
1111 # ifdef ABORT_ON_ERR
1122 unsigned int curr_n_thr
WEAKA;
1123 unsigned int last_n_thr
WEAKA;
1124 unsigned int prev_n_thr
WEAKA;
#define BCHKNR(cond, exc, txt, ind)
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
#define _TBCI_CWD_DEFAULT
volatile char t_res_dummy[16]
unsigned int tbci_control
const Vector< T > const Vector< T > const Vector< T > & p
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
void deinit(const int thr)
unsigned long t_job_output_no
void lina_empty(struct thr_ctrl *dummy)
int t_pipe_from_thread[2]
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
exception base class for the TBCI NumLib
static List< cback > thread_cbacks
#define BCHK(cond, exc, txt, ind, rtval)
cback(cbackfn ct, cbackfn dt, void *p)
NAMESPACE_END NAMESPACE_TBCI unsigned int tbci_control WEAKA
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
NAMESPACE_TBCI int num_threads
void * lina_thread(void *thr)
struct thr_struct * threads
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
static int detect_num_cpu(int rmv_ht)
void * empty_thread(void *dummy)
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
volatile char t_res_dummy[16]
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
THREAD__ struct thr_struct * this_thread
void *(* useful_job_t)(void *)
void thread_wait(const int thr_no, struct job_output *out)
double thread_wait_result(const int thr_no)
THREAD__ int ismainthread
int init_threads(const int num_cpu, const bool load_magic)
const Vector< T > const Vector< T > const Vector< T > int T T & err
static unsigned long job_no
T * setcurr(const T *rec) const
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
void lina_err(struct thr_ctrl *tc)