threads.c
1 /*
2  * threads.c - code for spawning threads on various platforms.
3  *
4  * (C) Copyright 1994-2022 John E. Stone
5  * SPDX-License-Identifier: BSD-3-Clause
6  *
7  * $Id: threads.c,v 1.111 2022/04/08 08:06:29 johns Exp $
8  *
9  */
10 
18 /*
19  * XXX will need to rename threads.[ch] src to avoid collision with
20  * the new headers included in the C11 standard and later
21  */
22 
23 /*
24  To use the WorkForce threading routines outside of Tachyon,
25  run the following sed commands to generate the WKF variants of the
26  function names and macros. This method supersedes the old hand-edited
27  versions that I previously maintained, and retains the support for
28  UI threads etc. These are written in c-shell (/bin/csh) syntax:
29  sed -e 's/rt_/wkf_/g' threads.c >! /tmp/tmp1
30  sed -e 's/defined(THR)/defined(WKFTHREADS)/g' /tmp/tmp1 >! /tmp/tmp2
31  sed -e 's/#ifdef THR/#ifdef WKFTHREADS/g' /tmp/tmp2 >! /tmp/tmp1
32  sed -e 's/#ifndef THR/#ifndef WKFTHREADS/g' /tmp/tmp1 >! /tmp/tmp2
33  sed -e 's/RTUSE/WKFUSE/g' /tmp/tmp2 >! /tmp/tmp1
34  sed -e 's/RTFORCE/WKFFORCE/g' /tmp/tmp1 >! /tmp/tmp2
35  sed -e 's/threads.h/WKFThreads.h/g' /tmp/tmp2 >! /tmp/tmp1
36  sed -e 's/RT_/WKF_/g' /tmp/tmp1 >! /tmp/tmp2
37  sed -e 's/STAWKF_ROUTINE/START_ROUTINE/g' /tmp/tmp2 >! /tmp/tmp1
38  sed -e 's/RTTHREAD/WKFTHREAD/g' /tmp/tmp1 >! /tmp/tmp2
39  sed -e 's/ THR / WKFTHREADS /g' /tmp/tmp2 >! /tmp/tmp1
40  sed -e 's/THRUSE/WKFUSE/g' /tmp/tmp1 >! /tmp/WKFThreads.C
41 
42  sed -e 's/rt_/wkf_/g' threads.h >! /tmp/tmp1
43  sed -e 's/defined(THR)/defined(WKFTHREADS)/g' /tmp/tmp1 >! /tmp/tmp2
44  sed -e 's/#ifdef THR/#ifdef WKFTHREADS/g' /tmp/tmp2 >! /tmp/tmp1
45  sed -e 's/#ifndef THR/#ifndef WKFTHREADS/g' /tmp/tmp1 >! /tmp/tmp2
46  sed -e 's/RTUSE/WKFUSE/g' /tmp/tmp2 >! /tmp/tmp1
47  sed -e 's/RTFORCE/WKFFORCE/g' /tmp/tmp1 >! /tmp/tmp2
48  sed -e 's/threads.h/WKFThreads.h/g' /tmp/tmp2 >! /tmp/tmp1
49  sed -e 's/RT_/WKF_/g' /tmp/tmp1 >! /tmp/tmp2
50  sed -e 's/STAWKF_ROUTINE/START_ROUTINE/g' /tmp/tmp2 >! /tmp/tmp1
51  sed -e 's/RTTHREAD/WKFTHREAD/g' /tmp/tmp1 >! /tmp/tmp2
52  sed -e 's/ THR / WKFTHREADS /g' /tmp/tmp2 >! /tmp/tmp1
53  sed -e 's/THRUSE/WKFUSE/g' /tmp/tmp1 >! /tmp/WKFThreads.h
54  */
55 
56 #include <stdio.h>
57 #include <stdlib.h>
58 #include <string.h>
59 
64 #if defined(__linux)
65 #define _GNU_SOURCE 1
66 #include <sched.h>
67 #endif
68 
69 #include "threads.h"
70 
71 /* needed for CPU info APIs and flag macros */
72 #if (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300)) || (defined(_MSC_VER) && (_MSC_VER >= 1916))
73 #include <emmintrin.h>
74 #include <immintrin.h>
75 #endif
76 
77 
78 #ifdef _MSC_VER
79 #if 0
80 #define RTUSENEWWIN32APIS 1
81 #define _WIN32_WINNT 0x0400
82 #define WINVER 0x0400
83 #endif
84 #include <windows.h>
85 #include <winbase.h>
86 #endif
87 
88 #if defined(_AIX) || defined(_CRAY) || defined(__irix) || defined(__linux) || defined(__osf__) || defined(__sun)
89 #include <unistd.h>
90 #endif
91 
92 #if defined(__APPLE__) && defined(THR)
93 #if 1
94 #include <sys/types.h>
95 #include <sys/sysctl.h>
96 #else
97 #include <Carbon/Carbon.h>
98 #endif
99 #endif
100 
101 #if defined(__linux) && (defined(ARCH_LINUXARM64) || defined(__ARM_ARCH_ISA_A64) || defined(__ARM_NEON))
102 #include <sys/auxv.h>
103 #endif
104 
105 #if defined(__hpux)
106 #include <sys/mpctl.h>
107 #endif
108 
109 
110 #ifdef __cplusplus
111 extern "C" {
112 #endif
113 
114 int rt_thread_numphysprocessors(void) {
115  int a=1;
116 
117 #ifdef THR
118 #if defined(__APPLE__)
119 #if 1
120  int rc;
121  int mib[2];
122  u_int miblen;
123  size_t alen = sizeof(a);
124  mib[0] = CTL_HW;
125  mib[1] = HW_AVAILCPU;
126  miblen = 2;
127  rc = sysctl(mib, miblen, &a, &alen, NULL, 0);
128  if (rc < 0) {
129  perror("Error during sysctl() query for CPU count");
130  a = 1;
131  }
132 #else
133  a = MPProcessorsScheduled();
134 #endif
135 #endif
136 
137 #ifdef _MSC_VER
138  struct _SYSTEM_INFO sysinfo;
139  GetSystemInfo(&sysinfo);
140  a = sysinfo.dwNumberOfProcessors;
141 #endif /* _MSC_VER */
142 
143 #if defined(__PARAGON__)
144  a=2;
145 #endif /* __PARAGON__ */
146 
147 #if defined(_CRAY)
148  a = sysconf(_SC_CRAY_NCPU);
149 #endif
150 
151 #if defined(ANDROID) || defined(USEPHYSCPUCOUNT)
152  /* Android devices and the NVIDIA/SECO "CARMA" and "Kayla" */
153  /* boards toggle cores on/off according to system activity, */
154  /* thermal management, and battery state. For now, we will */
155  /* use as many threads as the number of physical cores since */
156  /* the number that are online may vary even over a 2 second */
157  /* time window. We will likely have this issue on many more */
158  /* platforms as power management becomes more important... */
159 
160  /* use sysconf() for an initial guess, although it produces incorrect */
161  /* results on the older android releases due to a bug in the platform */
162  a = sysconf(_SC_NPROCESSORS_CONF);
164  /* check CPU count by parsing /sys/devices/system/cpu/present and use */
165  /* whichever result gives the larger CPU count... */
166  {
167  int rc=0, b=1, i=-1, j=-1;
168  FILE *ifp;
169 
170  ifp = fopen("/sys/devices/system/cpu/present", "r");
171  if (ifp != NULL) {
172  rc = fscanf(ifp, "%d-%d", &i, &j); /* read and interpret line */
173  fclose(ifp);
174 
175  if (rc == 2 && i == 0) {
176  b = j+1; /* 2 or more cores exist */
177  }
178  }
179 
180  /* return the greater CPU count result... */
181  a = (a > b) ? a : b;
182  }
183 #else
184 #if defined(__sun) || defined(__linux) || defined(__osf__) || defined(_AIX)
185  a = sysconf(_SC_NPROCESSORS_ONLN);
186 #endif /* SunOS, and similar... */
187 #endif /* Android */
188 
189 #if defined(__irix)
190  a = sysconf(_SC_NPROC_ONLN);
191 #endif /* IRIX */
192 
193 #if defined(__hpux)
194  a = mpctl(MPC_GETNUMSPUS, 0, 0);
195 #endif /* HPUX */
196 #endif /* THR */
197 
198  return a;
199 }
200 
201 
202 int rt_thread_numprocessors(void) {
203  int a=1;
204 
205 #ifdef THR
206  /* Allow the user to override the number of CPUs for use */
207  /* in scalability testing, debugging, etc. */
208  char *forcecount = getenv("RTFORCECPUCOUNT");
209  if (forcecount != NULL) {
210  if (sscanf(forcecount, "%d", &a) == 1) {
211  return a; /* if we got a valid count, return it */
212  } else {
213  a=1; /* otherwise use the real available hardware CPU count */
214  }
215  }
216 
217  /* otherwise return the number of physical processors currently available */
218  a = rt_thread_numphysprocessors();
219 
220  /* XXX we should add checking for the current CPU affinity masks here, */
221  /* and return the min of the physical processor count and CPU affinity */
222  /* mask enabled CPU count. */
223 #endif /* THR */
224 
225  return a;
226 }
227 
228 
229 /*
230  * Functions supporting processor-specific runtime dispatch for hand-written
231  * kernels using SIMD vector intrinsics or other highly specialized routines.
232  */
233 #define RT_USEINTCPUID 1
234 #if defined(RT_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER) || (defined(_MSC_VER) && (_MSC_VER >= 1916))) && (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86) || defined(_M_AMD64))
235 #if 1
236 //static void rt_cpuid(uint32_t eax, uint32_t ecx, uint32_t* abcd) {
237 static void rt_cpuid(unsigned int eax, unsigned int ecx, unsigned int* abcd) {
238 #if defined(_MSC_VER)
239  __cpuidex((int*)abcd, eax, ecx);
240 #else
241 // uint32_t ebx, edx;
242  unsigned int ebx=0, edx=0;
243 #if defined(__i386__) && defined (__PIC__)
244  /* in case of PIC under 32-bit EBX cannot be clobbered */
245  __asm__("movl %%ebx, %%edi \n\t cpuid \n\t xchgl %%ebx, %%edi" : "=D" (ebx),
246 #else
247  __asm__("cpuid" : "+b" (ebx),
248 #endif
249  "+a" (eax), "+c" (ecx), "=d" (edx));
250  abcd[0] = eax; abcd[1] = ebx; abcd[2] = ecx; abcd[3] = edx;
251 #endif
252  }
253 #else
254 static void rt_cpuid(unsigned int eax, unsigned int ecx, unsigned int *info) {
255  __asm__ __volatile__(
256  "xchg %%ebx, %%edi;"
257  "cpuid;"
258  "xchg %%ebx, %%edi;"
259  :"=a" (info[0]), "=D" (info[1]), "=c" (info[2]), "=d" (info[3])
260  :"0" (eax)
261  );
262 }
263 #endif
264 
265 static unsigned long long rt_xgetbv(unsigned int index) {
266 #if defined(_MSC_VER)
267  return _xgetbv(index);
268 #else
269  unsigned int eax=0, edx=0;
270  __asm__ __volatile__(
271  "xgetbv;"
272  : "=a" (eax), "=d"(edx)
273  : "c" (index)
274  );
275  return ((unsigned long long) edx << 32) | eax;
276 #endif
277 }
278 #endif
279 
280 
281 int rt_cpu_capability_flags(rt_cpu_caps_t * cpucaps) {
282  int flags=CPU_UNKNOWN;
283  int smtdepth = CPU_SMTDEPTH_UNKNOWN;
284 
285 #if defined(RT_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER) || (defined(_MSC_VER) && (_MSC_VER >= 1916))) && (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86) || defined(_M_AMD64))
286 #define RT_INTERNAL_ENABLE_CPUCAP_BAILOUT 1
287  // https://software.intel.com/content/www/us/en/develop/articles/how-to-detect-new-instruction-support-in-the-4th-generation-intel-core-processor-family.html
288  // https://stackoverflow.com/questions/6121792/how-to-check-if-a-cpu-supports-the-sse3-instruction-set
289  // https://gist.github.com/hi2p-perim/7855506
290  // http://www.hugi.scene.org/online/coding/hugi%2016%20-%20corawhd4.htm
291  // http://www.geoffchappell.com/studies/windows/km/cpu/precpuid.htm
292  // http://www.geoffchappell.com/studies/windows/km/cpu/cpuid/index.htm
293  // https://www.sandpile.org/x86/cpuid.htm
294  // https://lemire.me/blog/2020/07/17/the-cost-of-runtime-dispatch/
295  // https://github.com/google/cpu_features/
296  // https://github.com/klauspost/cpuid
297  // https://github.com/anrieff/libcpuid/tree/master/libcpuid
298  // Considerations about clock rate capping and false dependencies
299  // when high AVX/AVX-512 registers are considered "in use" with
300  // stale data, unless cleared, e.g., by _mm256_zeroupper():
301  // https://blog.cloudflare.com/on-the-dangers-of-intels-frequency-scaling/
302  // https://www.agner.org/optimize/blog/read.php?i=857
303  unsigned int vendcpuinfo[4] = { 0 };
304  unsigned int cpuinfo[4] = { 0 };
305  unsigned long long xcrFeatureMask = 0;
306  int havexmmymm = 0;
307  int havezmmmask = 0;
308  int haveosxsave = 0;
309 
310  rt_cpuid(0, 0, vendcpuinfo); /* get vendor string, highest function code */
311  if (vendcpuinfo[0] == 0)
312  goto nocpuinfo; /* bail on very primitive CPU type, max fctn code==0 */
313 
314  rt_cpuid(1, 0, cpuinfo); /* get various SIMD extension flags */
315  haveosxsave = (cpuinfo[2] & (1 << 27)) != 0; /* OS save/restore xmm regs */
316 
317  flags = 0;
318  flags |= ((cpuinfo[2] & (1 << 19)) != 0) * CPU_SSE4_1;
319  flags |= ((cpuinfo[2] & (1 << 29)) != 0) * CPU_F16C;
320  flags |= ((cpuinfo[2] & (1 << 31)) != 0) * CPU_HYPERVISOR;
321  flags |= ((cpuinfo[3] & (1 << 26)) != 0) * CPU_SSE2;
322  flags |= ((cpuinfo[3] & (1 << 28)) != 0) * CPU_HT;
323 
324  /* if we have AVX, we need to call xgetbv too */
325  if ((cpuinfo[2] & (1 << 28)) != 0) {
326  xcrFeatureMask = rt_xgetbv(0);
327  havexmmymm = (xcrFeatureMask & 0x06) == 0x06;
328  havezmmmask = (xcrFeatureMask & 0xE6) == 0xE6;
329  }
330 
331  flags |= (((cpuinfo[2] & (1 << 12)) != 0) &&
332  havexmmymm && haveosxsave) * CPU_FMA;
333 
334  flags |= (((cpuinfo[2] & (1 << 28)) != 0) &&
335  havexmmymm && haveosxsave) * CPU_AVX;
336 
337  /* check that we can call CPUID function 7 */
338  if (cpuinfo[0] >= 0x7) {
339  unsigned int extcpuinfo[4] = { 0 };
340  rt_cpuid(7, 0, extcpuinfo);
341 
342  flags |= (((extcpuinfo[1] & (1 << 5)) != 0) &&
343  havexmmymm && haveosxsave) * CPU_AVX2;
344 
345  flags |= (((extcpuinfo[1] & (1 << 16)) != 0) &&
346  havezmmmask && haveosxsave) * CPU_AVX512F;
347  flags |= (((extcpuinfo[1] & (1 << 26)) != 0) &&
348  havezmmmask && haveosxsave) * CPU_AVX512PF;
349  flags |= (((extcpuinfo[1] & (1 << 27)) != 0) &&
350  havezmmmask && haveosxsave) * CPU_AVX512ER;
351  flags |= (((extcpuinfo[1] & (1 << 28)) != 0) &&
352  havezmmmask && haveosxsave) * CPU_AVX512CD;
353  }
354 
355  smtdepth = 1;
356  if (flags & CPU_HT) {
357 #if 1
358  /* XXX correct this for Phi, OS/BIOS settings */
359  smtdepth = 2;
360 
361  /* XXX Hack to detect Xeon Phi CPUs since no other CPUs */
362  /* support AVX-512ER or AVX-512PF (yet...) */
363  if ((flags & CPU_AVX512ER) && (flags & CPU_AVX512PF)) {
364  smtdepth = 4;
365  }
366 #else
367  int logicalcores = (cpuinfo[1] >> 16) & 0xFF; /* bitwise mask, not logical AND */
368  int physicalcores = logicalcores;
369  char vendor[16] = { 0 };
370  ((unsigned *)vendor)[0] = vendcpuinfo[1];
371  ((unsigned *)vendor)[1] = vendcpuinfo[3];
372  ((unsigned *)vendor)[2] = vendcpuinfo[2];
373 
374  /* hmm, not quite right yet */
375  if (!strcmp(vendor, "GenuineIntel")) {
376  unsigned int corecpuinfo[4] = { 0 };
377  rt_cpuid(4, 0, corecpuinfo);
378  physicalcores = ((corecpuinfo[0] >> 26) & 0x3f) + 1;
379  } else if (!strcmp(vendor, "AuthenticAMD")) {
380  unsigned int corecpuinfo[4] = { 0 };
381  rt_cpuid(0x80000008, 0, corecpuinfo);
382  physicalcores = (corecpuinfo[2] & 0xFF) + 1;
383  }
384 
385 printf("cpuinfo: %d / %d vend: %s\n", logicalcores, physicalcores, vendor);
386 
387  smtdepth = logicalcores / physicalcores;
388 #endif
389  }
390 
391 #elif defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300)
392 
393  // https://software.intel.com/content/www/us/en/develop/documentation/cpp-compiler-developer-guide-and-reference/top/compiler-reference/intrinsics/intrinsics-for-all-intel-architectures/may-i-use-cpu-feature.html
394  flags = 0;
395  flags |= _may_i_use_cpu_feature(_FEATURE_SSE2) * CPU_SSE2;
396  flags |= _may_i_use_cpu_feature(_FEATURE_SSE4_1) * CPU_SSE4_1;
397  flags |= _may_i_use_cpu_feature(_FEATURE_AVX) * CPU_AVX;
398  flags |= _may_i_use_cpu_feature(_FEATURE_AVX2) * CPU_AVX2;
399  flags |= _may_i_use_cpu_feature(_FEATURE_FMA) * CPU_FMA;
400  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512F) * CPU_AVX512F;
401  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512CD) * CPU_AVX512CD;
402  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512ER) * CPU_AVX512ER;
403  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512PF) * CPU_AVX512PF;
404 
405 #elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
406 
407  // https://gcc.gnu.org/onlinedocs/gcc/x86-Built-in-Functions.html
408  flags = 0;
409  __builtin_cpu_init();
410  flags |= (__builtin_cpu_supports("sse2")!=0) * CPU_SSE2;
411  flags |= (__builtin_cpu_supports("sse4.1")!=0) * CPU_SSE4_1;
412  flags |= (__builtin_cpu_supports("avx")!=0) * CPU_AVX;
413  flags |= (__builtin_cpu_supports("avx2")!=0) * CPU_AVX2;
414  flags |= (__builtin_cpu_supports("fma")!=0) * CPU_FMA;
415  flags |= (__builtin_cpu_supports("avx512f")!=0) * CPU_AVX512F;
416  flags |= (__builtin_cpu_supports("avx512cd")!=0) * CPU_AVX512CD;
417  flags |= (__builtin_cpu_supports("avx512er")!=0) * CPU_AVX512ER;
418  flags |= (__builtin_cpu_supports("avx512pf")!=0) * CPU_AVX512PF;
419 
420 #elif defined(__linux) && (defined(ARCH_LINUXARM64) || defined(__ARM_ARCH_ISA_A64) || defined(__ARM_NEON))
421 
422  // https://golang.org/src/internal/cpu/cpu_arm64.go
423  // https://code.woboq.org/qt5/qtbase/src/corelib/tools/qsimd.cpp.html
424  // https://www.kernel.org/doc/html/latest/arm64/elf_hwcaps.html
425  // https://man7.org/linux/man-pages/man3/getauxval.3.html
426  // https://lists.cs.columbia.edu/pipermail/kvmarm/2017-August/026715.html
427  unsigned long auxval1=0;
428 // unsigned long auxval2=0;
429  auxval1 = getauxval(AT_HWCAP);
430 // auxval2 = getauxval(AT_HWCAP2);
431 // printf("WKFThreadsARM: %016lx %016lx\n", auxval1, auxval2);
432 
433  flags = 0;
434  flags |= ((auxval1 & HWCAP_FP) != 0) * CPU_ARM64_FP;
435 
436  flags |= ((auxval1 & HWCAP_ASIMD) != 0) * CPU_ARM64_ASIMD;
437  flags |= ((auxval1 & HWCAP_ASIMDHP) != 0) * CPU_ARM64_ASIMDHP;
438  flags |= ((auxval1 & HWCAP_ASIMDRDM) != 0) * CPU_ARM64_ASIMDRDM;
439  flags |= ((auxval1 & HWCAP_ASIMDDP) != 0) * CPU_ARM64_ASIMDDP;
440  flags |= ((auxval1 & HWCAP_ASIMDFHM) != 0) * CPU_ARM64_ASIMDFHM;
441 
442  flags |= ((auxval1 & HWCAP_SVE) != 0) * CPU_ARM64_SVE;
443 
444  flags |= ((auxval1 & HWCAP_AES) != 0) * CPU_ARM64_AES;
445  flags |= ((auxval1 & HWCAP_CRC32) != 0) * CPU_ARM64_CRC32;
446  flags |= ((auxval1 & HWCAP_SHA1) != 0) * CPU_ARM64_SHA1;
447  flags |= ((auxval1 & HWCAP_SHA2) != 0) * CPU_ARM64_SHA2;
448  flags |= ((auxval1 & HWCAP_SHA3) != 0) * CPU_ARM64_SHA3;
449  flags |= ((auxval1 & HWCAP_SHA512) != 0) * CPU_ARM64_SHA512;
450 
451 #endif
452 
453 #if defined(RT_INTERNAL_ENABLE_CPUCAP_BAILOUT)
454 nocpuinfo:
455 #endif
456  cpucaps->flags = flags;
457  cpucaps->smtdepth = smtdepth;
458 
459  if (flags == CPU_UNKNOWN)
460  return 1;
461 
462  return 0;
463 }
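/* Usage sketch (illustrative addition): a caller can use                   */
/* rt_cpu_capability_flags() to pick a SIMD code path at runtime.  This     */
/* sketch only reports which kernel family would be dispatched.             */
static void example_simd_dispatch_report(void) {
  rt_cpu_caps_t caps;
  if (rt_cpu_capability_flags(&caps) == 0) {   /* 0 means flags are valid */
    if (caps.flags & CPU_AVX512F)
      printf("dispatch: AVX-512 kernels\n");
    else if (caps.flags & CPU_AVX2)
      printf("dispatch: AVX2 kernels\n");
    else if (caps.flags & CPU_SSE2)
      printf("dispatch: SSE2 kernels\n");
    else
      printf("dispatch: scalar kernels\n");
    printf("SMT depth: %d\n", caps.smtdepth);
  } else {
    printf("dispatch: CPU capabilities unknown, using scalar kernels\n");
  }
}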
464 
465 
466 int rt_cpu_smt_depth(void) {
467  int smtdepth = CPU_SMTDEPTH_UNKNOWN;
468 
469 #if defined(RT_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER)) && (defined(__i386__) || defined(__x86_64__))
470  // x86 examples:
471  // https://software.intel.com/en-us/articles/methods-to-utilize-intels-hyper-threading-technology-with-linux
472  // https://stackoverflow.com/questions/2901694/how-to-detect-the-number-of-physical-processors-cores-on-windows-mac-and-linu
473  rt_cpu_caps_t cpucaps;
474  if (!rt_cpu_capability_flags(&cpucaps)) {
475  smtdepth = cpucaps.smtdepth;
476  }
477 #endif
478 
479  return smtdepth;
480 }
481 
482 
483 int * rt_cpu_affinitylist(int *cpuaffinitycount) {
484  int *affinitylist = NULL;
485  *cpuaffinitycount = -1; /* return count -1 if unimplemented or err occurs */
486 
487 /* Win32 process affinity mask query */
488 #if 0 && defined(_MSC_VER)
489  /* XXX untested, but based on the linux code, may work with a few tweaks */
490  HANDLE myproc = GetCurrentProcess(); /* returns a pseudo-handle */
491  DWORD affinitymask, sysaffinitymask;
492 
493  if (!GetProcessAffinityMask(myproc, &affinitymask, &sysaffinitymask)) {
494  /* count length of affinity list */
495  int affinitycount=0;
496  int i;
497  for (i=0; i<31; i++) {
498  affinitycount += (affinitymask >> i) & 0x1;
499  }
500 
501  /* build affinity list */
502  if (affinitycount > 0) {
503  affinitylist = (int *) malloc(affinitycount * sizeof(int));
504  if (affinitylist == NULL)
505  return NULL;
506 
507  int curcount = 0;
508  for (i=0; i<CPU_SETSIZE; i++) {
509  if (CPU_ISSET(i, &affinitymask)) {
510  affinitylist[curcount] = i;
511  curcount++;
512  }
513  }
514  }
515 
516  *cpuaffinitycount = affinitycount; /* return final affinity list */
517  }
518 #endif
519 
520 /* Linux process affinity mask query */
521 #if defined(__linux)
522 
523 /* protect ourselves from some older Linux distros */
524 #if defined(CPU_SETSIZE)
525  int i;
526  cpu_set_t affinitymask;
527  int affinitycount=0;
528 
529  /* PID 0 refers to the current process */
530  if (sched_getaffinity(0, sizeof(affinitymask), &affinitymask) < 0) {
531  perror("rt_cpu_affinitylist: sched_getaffinity");
532  return NULL;
533  }
534 
535  /* count length of affinity list */
536  for (i=0; i<CPU_SETSIZE; i++) {
537  affinitycount += CPU_ISSET(i, &affinitymask);
538  }
539 
540  /* build affinity list */
541  if (affinitycount > 0) {
542  affinitylist = (int *) malloc(affinitycount * sizeof(int));
543  if (affinitylist == NULL)
544  return NULL;
545 
546  int curcount = 0;
547  for (i=0; i<CPU_SETSIZE; i++) {
548  if (CPU_ISSET(i, &affinitymask)) {
549  affinitylist[curcount] = i;
550  curcount++;
551  }
552  }
553  }
554 
555  *cpuaffinitycount = affinitycount; /* return final affinity list */
556 #endif
557 #endif
558 
559  /* MacOS X 10.5.x has a CPU affinity query/set capability finally */
560  /* http://developer.apple.com/releasenotes/Performance/RN-AffinityAPI/ */
561 
562  /* Solaris and HP-UX use pset_bind() and related functions, and they */
563  /* don't use the single-level mask-based scheduling mechanism that */
564  /* the others use. Instead, they use a hierarchical tree of */
565  /* processor sets and processes float within those, or are tied to one */
566  /* processor that's a member of a particular set. */
567 
568  return affinitylist;
569 }
570 
571 
572 int rt_thread_set_self_cpuaffinity(int cpu) {
573  int status=-1; /* unsupported by default */
574 
575 #ifdef THR
576 
577 #if defined(__linux) && defined(CPU_ZERO) && defined(CPU_SET)
578 #if defined(__MIC__)
579  /* XXX this is available on Intel MIC */
580  /* XXX this code is too new even for RHEL4, though it runs on Fedora 7 */
581  /* and other newer revs. */
582  /* NPTL systems can assign per-thread affinities this way */
583  cpu_set_t affinitymask;
584  CPU_ZERO(&affinitymask);
585  CPU_SET(cpu, &affinitymask);
586  status = pthread_setaffinity_np(pthread_self(), sizeof(affinitymask), &affinitymask);
587 #else
588  /* non-NPTL systems based on the clone() API must use this method */
589  cpu_set_t affinitymask;
590  CPU_ZERO(&affinitymask);
591  CPU_SET(cpu, &affinitymask);
592 
593  /* PID 0 refers to the current process */
594  if ((status=sched_setaffinity(0, sizeof(affinitymask), &affinitymask)) < 0) {
595  perror("rt_thread_set_self_cpuaffinitylist: sched_setaffinity");
596  return status;
597  }
598 #endif
599 
600  /* call sched_yield() so new affinity mask takes effect immediately */
601  sched_yield();
602 #endif /* linux */
603 
604  /* MacOS X 10.5.x has a CPU affinity query/set capability finally */
605  /* http://developer.apple.com/releasenotes/Performance/RN-AffinityAPI/ */
606 
607  /* Solaris and HP-UX use pset_bind() and related functions, and they */
608  /* don't use the single-level mask-based scheduling mechanism that */
609  /* the others use. Instead, they use a hierarchical tree of */
610  /* processor sets and processes float within those, or are tied to one */
611  /* processor that's a member of a particular set. */
612 #endif
613 
614  return status;
615 }
616 
617 
618 int rt_thread_setconcurrency(int nthr) {
619  int status=0;
620 
621 #ifdef THR
622 #if defined(__sun)
623 #ifdef USEPOSIXTHREADS
624  status = pthread_setconcurrency(nthr);
625 #else
626  status = thr_setconcurrency(nthr);
627 #endif
628 #endif /* SunOS */
629 
630 #if defined(__irix) || defined(_AIX)
631  status = pthread_setconcurrency(nthr);
632 #endif
633 #endif /* THR */
634 
635  return status;
636 }
637 
638 
639 /*
640  * Thread creation/management
641  */
643 typedef void * (*RTTHREAD_START_ROUTINE)(void *);
644 
645 int rt_thread_create(rt_thread_t * thr, void * fctn(void *), void * arg) {
646  int status=0;
647 
648 #ifdef THR
649 #ifdef _MSC_VER
650  DWORD tid; /* thread id, msvc only */
651  *thr = CreateThread(NULL, 8192, (LPTHREAD_START_ROUTINE) fctn, arg, 0, &tid);
652  if (*thr == NULL) {
653  status = -1;
654  }
655  // If we want to spawn the thread "detached" without ever joining it in the
656  // future, such that it's totally on its own, we need to call CloseHandle()
657  // immediately on creation so the handle doesn't leak. If we need to join
658  // later, we call CloseHandle() at the end of the join sync-up.
659  // CloseHandle(thr);
660 #endif /* _MSC_VER */
661 
662 #ifdef USEPOSIXTHREADS
663 #if defined(_AIX)
664  /* AIX doesn't schedule threads in system scope by default, so ask for it explicitly */
665  {
666  pthread_attr_t attr;
667  pthread_attr_init(&attr);
668  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
669  status = pthread_create(thr, &attr, (RTTHREAD_START_ROUTINE)fctn, arg);
670  pthread_attr_destroy(&attr);
671  }
672 #elif defined(__PARAGON__)
673  status = pthread_create(thr, pthread_attr_default, fctn, arg);
674 #else
675  status = pthread_create(thr, NULL, (RTTHREAD_START_ROUTINE)fctn, arg);
676 #endif
677 #endif /* USEPOSIXTHREADS */
678 
679 #ifdef USEUITHREADS
680  status = thr_create(NULL, 0, (RTTHREAD_START_ROUTINE)fctn, arg, 0, thr);
681 #endif /* USEUITHREADS */
682 #endif /* THR */
683 
684  return status;
685 }
686 
687 
688 int rt_thread_join(rt_thread_t thr, void ** stat) {
689  int status=0;
690 
691 #ifdef THR
692 #ifdef _MSC_VER
693  DWORD wstatus = 0;
694 
695  wstatus = WAIT_TIMEOUT;
696 
697  while (wstatus != WAIT_OBJECT_0) {
698  wstatus = WaitForSingleObject(thr, INFINITE);
699  }
700  // Windows won't free the thread handle until both the thread terminates
701  // AND all existing handles to it are explicitly closed
702  CloseHandle(thr);
703 #endif /* _MSC_VER */
704 
705 #ifdef USEPOSIXTHREADS
706  status = pthread_join(thr, stat);
707 #endif /* USEPOSIXTHREADS */
708 
709 #ifdef USEUITHREADS
710  status = thr_join(thr, NULL, stat);
711 #endif /* USEPOSIXTHREADS */
712 #endif /* THR */
713 
714  return status;
715 }
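/* Usage sketch (illustrative addition): spawn a batch of worker threads    */
/* with rt_thread_create() and wait for all of them with rt_thread_join().  */
static void * example_thread_fctn(void *arg) {
  int *id = (int *) arg;
  printf("worker thread %d running\n", *id);
  return NULL;
}

static void example_spawn_and_join(int nthreads) {
  rt_thread_t *threads = (rt_thread_t *) malloc(nthreads * sizeof(rt_thread_t));
  int *ids = (int *) malloc(nthreads * sizeof(int));
  int i;
  for (i=0; i<nthreads; i++) {
    ids[i] = i;
    rt_thread_create(&threads[i], example_thread_fctn, &ids[i]);
  }
  for (i=0; i<nthreads; i++) {
    rt_thread_join(threads[i], NULL);   /* block until each worker finishes */
  }
  free(ids);
  free(threads);
}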
716 
717 
718 /*
719  * Mutexes
720  */
721 int rt_mutex_init(rt_mutex_t * mp) {
722  int status=0;
723 
724 #ifdef THR
725 #ifdef _MSC_VER
726  InitializeCriticalSection(mp);
727 #endif /* _MSC_VER */
728 
729 #ifdef USEPOSIXTHREADS
730  status = pthread_mutex_init(mp, 0);
731 #endif /* USEPOSIXTHREADS */
732 
733 #ifdef USEUITHREADS
734  status = mutex_init(mp, USYNC_THREAD, NULL);
735 #endif /* USEUITHREADS */
736 #endif /* THR */
737 
738  return status;
739 }
740 
741 
742 int rt_mutex_lock(rt_mutex_t * mp) {
743  int status=0;
744 
745 #ifdef THR
746 #ifdef _MSC_VER
747  EnterCriticalSection(mp);
748 #endif /* _MSC_VER */
749 
750 #ifdef USEPOSIXTHREADS
751  status = pthread_mutex_lock(mp);
752 #endif /* USEPOSIXTHREADS */
753 
754 #ifdef USEUITHREADS
755  status = mutex_lock(mp);
756 #endif /* USEUITHREADS */
757 #endif /* THR */
758 
759  return status;
760 }
761 
762 
763 int rt_mutex_trylock(rt_mutex_t * mp) {
764  int status=0;
765 
766 #ifdef THR
767 #ifdef _MSC_VER
768 #if defined(THRUSENEWWIN32APIS)
769  /* TryEnterCriticalSection() is only available on newer */
770  /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400 */
771  status = (!(TryEnterCriticalSection(mp)));
772 #endif
773 #endif /* _MSC_VER */
774 
775 #ifdef USEPOSIXTHREADS
776  status = (pthread_mutex_lock(mp) != 0);
777 #endif /* USEPOSIXTHREADS */
778 #endif /* THR */
779 
780  return status;
781 }
782 
783 
784 int rt_mutex_spin_lock(rt_mutex_t * mp) {
785  int status=0;
786 
787 #ifdef THR
788 #ifdef _MSC_VER
789 #if defined(THRUSENEWWIN32APIS)
790  /* TryEnterCriticalSection() is only available on newer */
791  /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400 */
792  while (!TryEnterCriticalSection(mp));
793 #else
794  EnterCriticalSection(mp);
795 #endif
796 #endif /* _MSC_VER */
797 
798 #ifdef USEPOSIXTHREADS
799  while ((status = pthread_mutex_trylock(mp)) != 0);
800 #endif /* USEPOSIXTHREADS */
801 #endif /* THR */
802 
803  return status;
804 }
805 
806 
807 int rt_mutex_unlock(rt_mutex_t * mp) {
808  int status=0;
809 
810 #ifdef THR
811 #ifdef _MSC_VER
812  LeaveCriticalSection(mp);
813 #endif /* _MSC_VER */
814 
815 #ifdef USEPOSIXTHREADS
816  status = pthread_mutex_unlock(mp);
817 #endif /* USEPOSIXTHREADS */
818 
819 #ifdef USEUITHREADS
820  status = mutex_unlock(mp);
821 #endif /* USEUITHREADS */
822 #endif /* THR */
823 
824  return status;
825 }
826 
827 
828 int rt_mutex_destroy(rt_mutex_t * mp) {
829  int status=0;
830 
831 #ifdef THR
832 #ifdef _MSC_VER
833  DeleteCriticalSection(mp);
834 #endif /* _MSC_VER */
835 
836 #ifdef USEPOSIXTHREADS
837  status = pthread_mutex_destroy(mp);
838 #endif /* USEPOSIXTHREADS */
839 
840 #ifdef USEUITHREADS
841  status = mutex_destroy(mp);
842 #endif /* USEUITHREADS */
843 #endif /* THR */
844 
845  return status;
846 }
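/* Usage sketch (illustrative addition): protect a shared update with a     */
/* mutex, using the init/lock/unlock/destroy sequence defined above.        */
static void example_mutex_usage(void) {
  rt_mutex_t lock;
  int counter = 0;
  rt_mutex_init(&lock);
  rt_mutex_lock(&lock);
  counter++;                  /* critical section: protected update */
  rt_mutex_unlock(&lock);
  rt_mutex_destroy(&lock);
  printf("counter: %d\n", counter);
}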
847 
848 
849 /*
850  * Condition variables
851  */
852 int rt_cond_init(rt_cond_t * cvp) {
853  int status=0;
854 
855 #ifdef THR
856 #ifdef _MSC_VER
857 #if defined(RTUSEWIN2008CONDVARS)
858  InitializeConditionVariable(cvp);
859 #else
860  /* XXX not implemented */
861  cvp->waiters = 0;
862 
863  /* Create an auto-reset event. */
864  cvp->events[RT_COND_SIGNAL] = CreateEvent(NULL, /* no security */
865  FALSE, /* auto-reset event */
866  FALSE, /* non-signaled initially */
867  NULL); /* unnamed */
868 
869  /* Create a manual-reset event. */
870  cvp->events[RT_COND_BROADCAST] = CreateEvent(NULL, /* no security */
871  TRUE, /* manual-reset */
872  FALSE, /* non-signaled initially */
873  NULL); /* unnamed */
874 #endif
875 #endif /* _MSC_VER */
876 
877 #ifdef USEPOSIXTHREADS
878  status = pthread_cond_init(cvp, NULL);
879 #endif /* USEPOSIXTHREADS */
880 #ifdef USEUITHREADS
881  status = cond_init(cvp, USYNC_THREAD, NULL);
882 #endif
883 #endif /* THR */
884 
885  return status;
886 }
887 
888 int rt_cond_destroy(rt_cond_t * cvp) {
889  int status=0;
890 
891 #ifdef THR
892 #ifdef _MSC_VER
893 #if defined(RTUSEWIN2008CONDVARS)
894  /* XXX not implemented */
895 #else
896  CloseHandle(cvp->events[RT_COND_SIGNAL]);
897  CloseHandle(cvp->events[RT_COND_BROADCAST]);
898 #endif
899 #endif /* _MSC_VER */
900 
901 #ifdef USEPOSIXTHREADS
902  status = pthread_cond_destroy(cvp);
903 #endif /* USEPOSIXTHREADS */
904 #ifdef USEUITHREADS
905  status = cond_destroy(cvp);
906 #endif
907 #endif /* THR */
908 
909  return status;
910 }
911 
912 int rt_cond_wait(rt_cond_t * cvp, rt_mutex_t * mp) {
913  int status=0;
914 #if defined(THR) && defined(_MSC_VER)
915  int result=0;
916  LONG last_waiter;
917  LONG my_waiter;
918 #endif
919 
920 #ifdef THR
921 #ifdef _MSC_VER
922 #if defined(RTUSEWIN2008CONDVARS)
923  SleepConditionVariableCS(cvp, mp, INFINITE)
924 #else
925 #if !defined(RTUSEINTERLOCKEDATOMICOPS)
926  EnterCriticalSection(&cvp->waiters_lock);
927  cvp->waiters++;
928  LeaveCriticalSection(&cvp->waiters_lock);
929 #else
930  InterlockedIncrement(&cvp->waiters);
931 #endif
932 
933  LeaveCriticalSection(mp); /* SetEvent() keeps state, avoids lost wakeup */
934 
935  /* Wait for either a single or broadcast event to become signaled */
936  result = WaitForMultipleObjects(2, cvp->events, FALSE, INFINITE);
937 
938 #if !defined(RTUSEINTERLOCKEDATOMICOPS)
939  EnterCriticalSection (&cvp->waiters_lock);
940  cvp->waiters--;
941  last_waiter =
942  ((result == (WAIT_OBJECT_0 + RT_COND_BROADCAST)) && cvp->waiters == 0);
943  LeaveCriticalSection (&cvp->waiters_lock);
944 #else
945  my_waiter = InterlockedDecrement(&cvp->waiters);
946  last_waiter =
947  ((result == (WAIT_OBJECT_0 + RT_COND_BROADCAST)) && my_waiter == 0);
948 #endif
949 
950  /* Some thread called cond_broadcast() */
951  if (last_waiter)
952  /* We're the last waiter to be notified or to stop waiting, so */
953  /* reset the manual event. */
954  ResetEvent(cvp->events[RT_COND_BROADCAST]);
955 
956  EnterCriticalSection(mp);
957 #endif
958 #endif /* _MSC_VER */
959 
960 #ifdef USEPOSIXTHREADS
961  status = pthread_cond_wait(cvp, mp);
962 #endif /* USEPOSIXTHREADS */
963 #ifdef USEUITHREADS
964  status = cond_wait(cvp, mp);
965 #endif
966 #endif /* THR */
967 
968  return status;
969 }
970 
971 int rt_cond_signal(rt_cond_t * cvp) {
972  int status=0;
973 
974 #ifdef THR
975 #ifdef _MSC_VER
976 #if defined(RTUSEWIN2008CONDVARS)
977  WakeConditionVariable(cvp);
978 #else
979 #if !defined(RTUSEINTERLOCKEDATOMICOPS)
980  EnterCriticalSection(&cvp->waiters_lock);
981  int have_waiters = (cvp->waiters > 0);
982  LeaveCriticalSection(&cvp->waiters_lock);
983  if (have_waiters)
984  SetEvent (cvp->events[RT_COND_SIGNAL]);
985 #else
986  if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
987  SetEvent(cvp->events[RT_COND_SIGNAL]);
988 #endif
989 #endif
990 #endif /* _MSC_VER */
991 
992 #ifdef USEPOSIXTHREADS
993  status = pthread_cond_signal(cvp);
994 #endif /* USEPOSIXTHREADS */
995 #ifdef USEUITHREADS
996  status = cond_signal(cvp);
997 #endif
998 #endif /* THR */
999 
1000  return status;
1001 }
1002 
1003 int rt_cond_broadcast(rt_cond_t * cvp) {
1004  int status=0;
1005 
1006 #ifdef THR
1007 #ifdef _MSC_VER
1008 #if defined(RTUSEWIN2008CONDVARS)
1009  WakeAllConditionVariable(cvp);
1010 #else
1011 #if !defined(RTUSEINTERLOCKEDATOMICOPS)
1012  EnterCriticalSection(&cvp->waiters_lock);
1013  int have_waiters = (cvp->waiters > 0);
1014  LeaveCriticalSection(&cvp->waiters_lock);
1015  if (have_waiters)
1016  SetEvent(cvp->events[RT_COND_BROADCAST]);
1017 #else
1018  if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
1019  SetEvent(cvp->events[RT_COND_BROADCAST]);
1020 #endif
1021 
1022 #endif
1023 #endif /* _MSC_VER */
1024 
1025 #ifdef USEPOSIXTHREADS
1026  status = pthread_cond_broadcast(cvp);
1027 #endif /* USEPOSIXTHREADS */
1028 #ifdef USEUITHREADS
1029  status = cond_broadcast(cvp);
1030 #endif
1031 #endif /* THR */
1032 
1033  return status;
1034 }
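/* Usage sketch (illustrative addition): the standard predicate-loop        */
/* pattern for condition variables.  The waiter re-tests the predicate      */
/* after every wakeup, so spurious or broadcast wakeups are handled safely. */
typedef struct {
  rt_mutex_t lock;
  rt_cond_t cond;
  int ready;
} example_event_t;

static void example_event_wait(example_event_t *ev) {
  rt_mutex_lock(&ev->lock);
  while (!ev->ready)                      /* re-check predicate after wakeup  */
    rt_cond_wait(&ev->cond, &ev->lock);   /* releases the lock while waiting  */
  rt_mutex_unlock(&ev->lock);
}

static void example_event_post(example_event_t *ev) {
  rt_mutex_lock(&ev->lock);
  ev->ready = 1;                          /* update predicate under the lock  */
  rt_mutex_unlock(&ev->lock);
  rt_cond_broadcast(&ev->cond);           /* wake all waiting threads         */
}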
1035 
1036 
1037 /*
1038  * Atomic integer ops -- Ideally implemented by fast machine instruction
1039  * fetch-and-add operations. Worst-case implementation
1040  * based on mutex locks and math ops if no other choice.
1041  */
1042 
1043 int rt_atomic_int_init(rt_atomic_int_t * atomp, int val) {
1044  memset(atomp, 0, sizeof(rt_atomic_int_t));
1045 #ifdef THR
1046 #if defined(USEGCCATOMICS)
1047  /* nothing to do here */
1048 #elif defined(USENETBSDATOMICS)
1049  /* nothing to do here */
1050 #elif defined(USESOLARISATOMICS)
1051  /* nothing to do here */
1052 #elif defined(USEWIN32ATOMICS)
1053  /* nothing to do here */
1054 #else /* use mutexes */
1055  rt_mutex_init(&atomp->lock);
1056 #endif
1057 #else
1058  /* nothing to do for non-threaded builds */
1059 #endif
1060  atomp->val = val;
1061 
1062  return 0;
1063 }
1064 
1065 
1066 int rt_atomic_int_destroy(rt_atomic_int_t * atomp) {
1067 #ifdef THR
1068 #if defined(USEGCCATOMICS)
1069  /* nothing to do here */
1070 #elif defined(USENETBSDATOMICS)
1071  /* nothing to do here */
1072 #elif defined(USESOLARISATOMICS)
1073  /* nothing to do here */
1074 #elif defined(USEWIN32ATOMICS)
1075  /* nothing to do here */
1076 #else /* use mutexes */
1077  rt_mutex_destroy(&atomp->lock);
1078 #endif
1079 #else
1080  /* nothing to do for non-threaded builds */
1081 #endif
1082 
1083  return 0;
1084 }
1085 
1086 
1087 int rt_atomic_int_set(rt_atomic_int_t * atomp, int val) {
1088  int retval;
1089 
1090 #ifdef THR
1091 #if defined(USEGCCATOMICS)
1092  /* nothing special to do here? */
1093  atomp->val = val;
1094  retval = val;
1095 #elif defined(USENETBSDATOMICS)
1096  /* nothing special to do here? */
1097  atomp->val = val;
1098  retval = val;
1099 #elif defined(USESOLARISATOMICS)
1100  /* nothing special to do here? */
1101  atomp->val = val;
1102  retval = val;
1103 #elif defined(USEWIN32ATOMICS)
1104  /* nothing special to do here? */
1105  atomp->val = val;
1106  retval = val;
1107 #else /* use mutexes */
1108  rt_mutex_lock(&atomp->lock);
1109  atomp->val = val;
1110  retval = atomp->val;
1111  rt_mutex_unlock(&atomp->lock);
1112 #endif
1113 #else
1114  /* nothing special to do here */
1115  atomp->val = val;
1116  retval = atomp->val;
1117 #endif
1118 
1119  return retval;
1120 }
1121 
1122 
1123 int rt_atomic_int_get(rt_atomic_int_t * atomp) {
1124  int retval;
1125 
1126 #ifdef THR
1127 #if defined(USEGCCATOMICS)
1128  /* nothing special to do here? */
1129  retval = atomp->val;
1130 #elif defined(USENETBSDATOMICS)
1131  /* nothing special to do here? */
1132  retval = atomp->val;
1133 #elif defined(USESOLARISATOMICS)
1134  /* nothing special to do here? */
1135  retval = atomp->val;
1136 #elif defined(USEWIN32ATOMICS)
1137  /* nothing special to do here? */
1138  retval = atomp->val;
1139 #else /* use mutexes */
1140  rt_mutex_lock(&atomp->lock);
1141  retval = atomp->val;
1142  rt_mutex_unlock(&atomp->lock);
1143 #endif
1144 #else
1145  /* nothing special to do here */
1146  retval = atomp->val;
1147 #endif
1148 
1149  return retval;
1150 }
1151 
1152 int rt_atomic_int_fetch_and_add(rt_atomic_int_t * atomp, int inc) {
1153 #ifdef THR
1154 #if defined(USEGCCATOMICS)
1155  return __sync_fetch_and_add(&atomp->val, inc);
1156 #elif defined(USENETBSDATOMICS)
1157  /* value returned is the new value, so we have to subtract it off again */
1158  return atomic_add_int_nv(&atomp->val, inc) - inc;
1159 #elif defined(USESOLARISATOMICS)
1160  /* value returned is the new value, so we have to subtract it off again */
1161  return atomic_add_int_nv(&atomp->val, inc) - inc;
1162 #elif defined(USEWIN32ATOMICS)
1163  return InterlockedExchangeAdd(&atomp->val, inc);
1164 #else /* use mutexes */
1165  int retval;
1166  rt_mutex_lock(&atomp->lock);
1167  retval = atomp->val;
1168  atomp->val+=inc;
1169  rt_mutex_unlock(&atomp->lock);
1170  return retval;
1171 #endif
1172 #else
1173  int retval = atomp->val;
1174  atomp->val+=inc;
1175  return retval;
1176 #endif
1177 }
1178 
1179 
1180 int rt_atomic_int_add_and_fetch(rt_atomic_int_t * atomp, int inc) {
1181 #ifdef THR
1182 #if defined(USEGCCATOMICS)
1183  return __sync_add_and_fetch(&atomp->val, inc);
1184 #elif defined(USENETBSDATOMICS)
1185  return atomic_add_int_nv(&atomp->val, inc);
1186 #elif defined(USESOLARISATOMICS)
1187  return atomic_add_int_nv(&atomp->val, inc);
1188 #elif defined(USEWIN32ATOMICS)
1189  /* value returned is the old value, so we have to add it on again */
1190  return InterlockedExchangeAdd(&atomp->val, inc) + inc;
1191 #else /* use mutexes */
1192  int retval;
1193  rt_mutex_lock(&atomp->lock);
1194  atomp->val+=inc;
1195  retval = atomp->val;
1196  rt_mutex_unlock(&atomp->lock);
1197  return retval;
1198 #endif
1199 #else
1200  int retval;
1201  atomp->val+=inc;
1202  retval = atomp->val;
1203  return retval;
1204 #endif
1205 }
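/* Usage sketch (illustrative addition): an atomic counter shared by worker */
/* threads.  fetch-and-add returns the pre-increment value, add-and-fetch   */
/* returns the post-increment value; neither needs external locking.        */
static void example_atomic_counter(void) {
  rt_atomic_int_t counter;
  int oldval, newval;
  rt_atomic_int_init(&counter, 0);
  oldval = rt_atomic_int_fetch_and_add(&counter, 1);  /* oldval == 0 */
  newval = rt_atomic_int_add_and_fetch(&counter, 1);  /* newval == 2 */
  printf("old %d  new %d  current %d\n",
         oldval, newval, rt_atomic_int_get(&counter));
  rt_atomic_int_destroy(&counter);
}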
1206 
1207 
1208 
1209 /*
1210  * Reader/Writer locks -- slower than mutexes but good for some purposes
1211  */
1212 int rt_rwlock_init(rt_rwlock_t * rwp) {
1213  int status=0;
1214 
1215 #ifdef THR
1216 #ifdef _MSC_VER
1217  rt_mutex_init(&rwp->lock);
1218  rt_cond_init(&rwp->rdrs_ok);
1219  rt_cond_init(&rwp->wrtr_ok);
1220  rwp->rwlock = 0;
1221  rwp->waiting_writers = 0;
1222 #endif
1223 
1224 #ifdef USEPOSIXTHREADS
1225  pthread_mutex_init(&rwp->lock, NULL);
1226  pthread_cond_init(&rwp->rdrs_ok, NULL);
1227  pthread_cond_init(&rwp->wrtr_ok, NULL);
1228  rwp->rwlock = 0;
1229  rwp->waiting_writers = 0;
1230 #endif /* USEPOSIXTHREADS */
1231 
1232 #ifdef USEUITHREADS
1233  status = rwlock_init(rwp, USYNC_THREAD, NULL);
1234 #endif /* USEUITHREADS */
1235 #endif /* THR */
1236 
1237  return status;
1238 }
1239 
1240 
1241 int rt_rwlock_readlock(rt_rwlock_t * rwp) {
1242  int status=0;
1243 
1244 #ifdef THR
1245 #ifdef _MSC_VER
1246  rt_mutex_lock(&rwp->lock);
1247  while (rwp->rwlock < 0 || rwp->waiting_writers)
1248  rt_cond_wait(&rwp->rdrs_ok, &rwp->lock);
1249  rwp->rwlock++; /* increment number of readers holding the lock */
1250  rt_mutex_unlock(&rwp->lock);
1251 #endif
1252 
1253 #ifdef USEPOSIXTHREADS
1254  pthread_mutex_lock(&rwp->lock);
1255  while (rwp->rwlock < 0 || rwp->waiting_writers)
1256  pthread_cond_wait(&rwp->rdrs_ok, &rwp->lock);
1257  rwp->rwlock++; /* increment number of readers holding the lock */
1258  pthread_mutex_unlock(&rwp->lock);
1259 #endif /* USEPOSIXTHREADS */
1260 
1261 #ifdef USEUITHREADS
1262  status = rw_rdlock(rwp);
1263 #endif /* USEUITHREADS */
1264 #endif /* THR */
1265 
1266  return status;
1267 }
1268 
1269 
1270 int rt_rwlock_writelock(rt_rwlock_t * rwp) {
1271  int status=0;
1272 
1273 #ifdef THR
1274 #ifdef _MSC_VER
1275  rt_mutex_lock(&rwp->lock);
1276  while (rwp->rwlock != 0) {
1277  rwp->waiting_writers++;
1278  rt_cond_wait(&rwp->wrtr_ok, &rwp->lock);
1279  rwp->waiting_writers--;
1280  }
1281  rwp->rwlock=-1;
1282  rt_mutex_unlock(&rwp->lock);
1283 #endif
1284 
1285 #ifdef USEPOSIXTHREADS
1286  pthread_mutex_lock(&rwp->lock);
1287  while (rwp->rwlock != 0) {
1288  rwp->waiting_writers++;
1289  pthread_cond_wait(&rwp->wrtr_ok, &rwp->lock);
1290  rwp->waiting_writers--;
1291  }
1292  rwp->rwlock=-1;
1293  pthread_mutex_unlock(&rwp->lock);
1294 #endif /* USEPOSIXTHREADS */
1295 
1296 #ifdef USEUITHREADS
1297  status = rw_wrlock(rwp);
1298 #endif /* USEUITHREADS */
1299 #endif /* THR */
1300 
1301  return status;
1302 }
1303 
1304 
1305 int rt_rwlock_unlock(rt_rwlock_t * rwp) {
1306  int status=0;
1307 
1308 #ifdef THR
1309 #ifdef _MSC_VER
1310  int ww, wr;
1311  rt_mutex_lock(&rwp->lock);
1312  if (rwp->rwlock > 0) {
1313  rwp->rwlock--;
1314  } else {
1315  rwp->rwlock = 0;
1316  }
1317  ww = (rwp->waiting_writers && rwp->rwlock == 0);
1318  wr = (rwp->waiting_writers == 0);
1319  rt_mutex_unlock(&rwp->lock);
1320  if (ww)
1321  rt_cond_signal(&rwp->wrtr_ok);
1322  else if (wr)
1323  rt_cond_signal(&rwp->rdrs_ok);
1324 #endif
1325 
1326 #ifdef USEPOSIXTHREADS
1327  int ww, wr;
1328  pthread_mutex_lock(&rwp->lock);
1329  if (rwp->rwlock > 0) {
1330  rwp->rwlock--;
1331  } else {
1332  rwp->rwlock = 0;
1333  }
1334  ww = (rwp->waiting_writers && rwp->rwlock == 0);
1335  wr = (rwp->waiting_writers == 0);
1336  pthread_mutex_unlock(&rwp->lock);
1337  if (ww)
1338  pthread_cond_signal(&rwp->wrtr_ok);
1339  else if (wr)
1340  pthread_cond_signal(&rwp->rdrs_ok);
1341 #endif /* USEPOSIXTHREADS */
1342 
1343 #ifdef USEUITHREADS
1344  status = rw_unlock(rwp);
1345 #endif /* USEUITHREADS */
1346 #endif /* THR */
1347 
1348  return status;
1349 }
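/* Usage sketch (illustrative addition, using the reader/writer entry       */
/* points as reconstructed above): many readers may hold the lock           */
/* concurrently, while a writer gets exclusive access.                      */
static void example_rwlock_usage(rt_rwlock_t *rwp, int *shared_value) {
  rt_rwlock_readlock(rwp);        /* shared access for readers        */
  printf("value: %d\n", *shared_value);
  rt_rwlock_unlock(rwp);

  rt_rwlock_writelock(rwp);       /* exclusive access for the writer  */
  (*shared_value)++;
  rt_rwlock_unlock(rwp);
}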
1350 
1351 
1352 /*
1353  * Simple counting barrier primitive
1354  */
1355 rt_barrier_t * rt_thread_barrier_init(int n_clients) {
1356  rt_barrier_t *barrier = (rt_barrier_t *) malloc(sizeof(rt_barrier_t));
1357 
1358 #ifdef THR
1359  if (barrier != NULL) {
1360  barrier->n_clients = n_clients;
1361  barrier->n_waiting = 0;
1362  barrier->phase = 0;
1363  barrier->sum = 0;
1364  rt_mutex_init(&barrier->lock);
1365  rt_cond_init(&barrier->wait_cv);
1366  }
1367 #endif
1368 
1369  return barrier;
1370 }
1371 
1372 
1373 /* When rendering in the CAVE we use a special synchronization */
1374 /* mode so that shared memory mutexes and condition variables */
1375 /* will work correctly when accessed from multiple processes. */
1376 /* Inter-process synchronization involves the kernel to a greater */
1377 /* degree, so these barriers are substantially more costly to use */
1378 /* than the ones designed for use within a single-process. */
1379 int rt_thread_barrier_init_proc_shared(rt_barrier_t *barrier, int n_clients) {
1380 #ifdef THR
1381 #ifdef USEPOSIXTHREADS
1382  if (barrier != NULL) {
1383  barrier->n_clients = n_clients;
1384  barrier->n_waiting = 0;
1385  barrier->phase = 0;
1386  barrier->sum = 0;
1387 
1388  pthread_mutexattr_t mattr;
1389  pthread_condattr_t cattr;
1390 
1391  printf("Setting barriers to have system scope...\n");
1392 
1393  pthread_mutexattr_init(&mattr);
1394  if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
1395  printf("WARNING: could not set mutex to process shared scope\n");
1396  }
1397 
1398  pthread_condattr_init(&cattr);
1399  if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) != 0) {
1400  printf("WARNING: could not set condition variable to process shared scope\n");
1401  }
1402 
1403  pthread_mutex_init(&barrier->lock, &mattr);
1404  pthread_cond_init(&barrier->wait_cv, &cattr);
1405 
1406  pthread_condattr_destroy(&cattr);
1407  pthread_mutexattr_destroy(&mattr);
1408  }
1409 #endif
1410 #endif
1411 
1412  return 0;
1413 }
1414 
1415 
1416 void rt_thread_barrier_destroy(rt_barrier_t *barrier) {
1417 #ifdef THR
1418  rt_mutex_destroy(&barrier->lock);
1419  rt_cond_destroy(&barrier->wait_cv);
1420 #endif
1421  free(barrier);
1422 }
1423 
1424 
1425 int rt_thread_barrier(rt_barrier_t *barrier, int increment) {
1426 #ifdef THR
1427  int my_phase;
1428  int my_result;
1429 
1430  rt_mutex_lock(&barrier->lock);
1431  my_phase = barrier->phase;
1432  barrier->sum += increment;
1433  barrier->n_waiting++;
1434 
1435  if (barrier->n_waiting == barrier->n_clients) {
1436  barrier->result = barrier->sum;
1437  barrier->sum = 0;
1438  barrier->n_waiting = 0;
1439  barrier->phase = 1 - my_phase;
1440  rt_cond_broadcast(&barrier->wait_cv);
1441  }
1442 
1443  while (barrier->phase == my_phase) {
1444  rt_cond_wait(&barrier->wait_cv, &barrier->lock);
1445  }
1446 
1447  my_result = barrier->result;
1448 
1449  rt_mutex_unlock(&barrier->lock);
1450 
1451  return my_result;
1452 #else
1453  return 0;
1454 #endif
1455 }
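/* Usage sketch (illustrative addition): N worker threads synchronize at    */
/* the counting barrier; each contributes a value and every thread receives */
/* the sum computed for that barrier phase.                                  */
static void * example_barrier_worker(void *voidparms) {
  rt_barrier_t *barrier = (rt_barrier_t *) voidparms;
  int contribution = 1;
  int total = rt_thread_barrier(barrier, contribution);
  printf("all workers arrived, sum of contributions: %d\n", total);
  return NULL;
}
/* The barrier itself is created once with rt_thread_barrier_init(nthreads)  */
/* and released with rt_thread_barrier_destroy() when the workers are done.  */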
1456 
1457 
1458 /*
1459  * Barriers used for sleepable thread pools
1460  */
1461 /* symmetric run barrier for use within a single process */
1462 int rt_thread_run_barrier_init(rt_run_barrier_t *barrier, int n_clients) {
1463 #ifdef THR
1464  if (barrier != NULL) {
1465  barrier->n_clients = n_clients;
1466  barrier->n_waiting = 0;
1467  barrier->phase = 0;
1468  barrier->fctn = NULL;
1469 
1470  rt_mutex_init(&barrier->lock);
1471  rt_cond_init(&barrier->wait_cv);
1472  }
1473 #endif
1474 
1475  return 0;
1476 }
1477 
1478 void rt_thread_run_barrier_destroy(rt_run_barrier_t *barrier) {
1479 #ifdef THR
1480  rt_mutex_destroy(&barrier->lock);
1481  rt_cond_destroy(&barrier->wait_cv);
1482 #endif
1483 }
1484 
1485 
1490 void * (* rt_thread_run_barrier(rt_run_barrier_t *barrier,
1491  void * fctn(void*),
1492  void * parms,
1493  void **rsltparms))(void *) {
1494 #if defined(THR)
1495  int my_phase;
1496  void * (*my_result)(void*);
1497 
1498  rt_mutex_lock(&barrier->lock);
1499  my_phase = barrier->phase;
1500  if (fctn != NULL)
1501  barrier->fctn = fctn;
1502  if (parms != NULL)
1503  barrier->parms = parms;
1504  barrier->n_waiting++;
1505 
1506  if (barrier->n_waiting == barrier->n_clients) {
1507  barrier->rslt = barrier->fctn;
1508  barrier->rsltparms = barrier->parms;
1509  barrier->fctn = NULL;
1510  barrier->parms = NULL;
1511  barrier->n_waiting = 0;
1512  barrier->phase = 1 - my_phase;
1513  rt_cond_broadcast(&barrier->wait_cv);
1514  }
1515 
1516  while (barrier->phase == my_phase) {
1517  rt_cond_wait(&barrier->wait_cv, &barrier->lock);
1518  }
1519 
1520  my_result = barrier->rslt;
1521  if (rsltparms != NULL)
1522  *rsltparms = barrier->rsltparms;
1523 
1524  rt_mutex_unlock(&barrier->lock);
1525 #else
1526  void * (*my_result)(void*) = fctn;
1527  if (rsltparms != NULL)
1528  *rsltparms = parms;
1529 #endif
1530 
1531  return my_result;
1532 }
1533 
1534 
1536 int rt_thread_run_barrier_poll(rt_run_barrier_t *barrier) {
1537  int rc=0;
1538 #if defined(THR)
1539  rt_mutex_lock(&barrier->lock);
1540  if (barrier->n_waiting == (barrier->n_clients-1)) {
1541  rc=1;
1542  }
1543  rt_mutex_unlock(&barrier->lock);
1544 #endif
1545  return rc;
1546 }
1547 
1548 
1549 /*
1550  * task tile stack
1551  */
1552 int rt_tilestack_init(rt_tilestack_t *s, int size) {
1553  if (s == NULL)
1554  return -1;
1555 
1556 #if defined(THR)
1557  rt_mutex_init(&s->mtx);
1558 #endif
1559 
1560  s->growthrate = 512;
1561  s->top = -1;
1562 
1563  if (size > 0) {
1564  s->size = size;
1565  s->s = (rt_tasktile_t *) malloc(s->size * sizeof(rt_tasktile_t));
1566  } else {
1567  s->size = 0;
1568  s->s = NULL;
1569  }
1570 
1571  return 0;
1572 }
1573 
1574 
1575 void rt_tilestack_destroy(rt_tilestack_t *s) {
1576 #if defined(THR)
1577  rt_mutex_destroy(&s->mtx);
1578 #endif
1579  free(s->s);
1580  s->s = NULL; /* prevent access after free */
1581 }
1582 
1583 
1584 int rt_tilestack_compress(rt_tilestack_t *s) {
1585 #if defined(THR)
1586  rt_mutex_lock(&s->mtx);
1587 #endif
1588  if (s->size > (s->top + 1)) {
1589  int newsize = s->top + 1;
1590  rt_tasktile_t *tmp = (rt_tasktile_t *) realloc(s->s, newsize * sizeof(rt_tasktile_t));
1591  if (tmp == NULL) {
1592 #if defined(THR)
1593  rt_mutex_unlock(&s->mtx);
1594 #endif
1595  return -1; /* out of space! */
1596  }
1597  s->s = tmp;
1598  s->size = newsize;
1599  }
1600 #if defined(THR)
1601  rt_mutex_unlock(&s->mtx);
1602 #endif
1603 
1604  return 0;
1605 }
1606 
1607 
1608 int rt_tilestack_push(rt_tilestack_t *s, const rt_tasktile_t *t) {
1609 #if defined(THR)
1610  rt_mutex_lock(&s->mtx);
1611 #endif
1612  s->top++;
1613  if (s->top >= s->size) {
1614  int newsize = s->size + s->growthrate;
1615  rt_tasktile_t *tmp = (rt_tasktile_t *) realloc(s->s, newsize * sizeof(rt_tasktile_t));
1616  if (tmp == NULL) {
1617  s->top--;
1618 #if defined(THR)
1619  rt_mutex_unlock(&s->mtx);
1620 #endif
1621  return -1; /* out of space! */
1622  }
1623  s->s = tmp;
1624  s->size = newsize;
1625  }
1626 
1627  s->s[s->top] = *t; /* push onto the stack */
1628 
1629 #if defined(THR)
1630  rt_mutex_unlock(&s->mtx);
1631 #endif
1632 
1633  return 0;
1634 }
1635 
1636 
1637 int rt_tilestack_pop(rt_tilestack_t *s, rt_tasktile_t *t) {
1638 #if defined(THR)
1639  rt_mutex_lock(&s->mtx);
1640 #endif
1641 
1642  if (s->top < 0) {
1643 #if defined(THR)
1644  rt_mutex_unlock(&s->mtx);
1645 #endif
1646  return RT_TILESTACK_EMPTY; /* empty stack */
1647  }
1648 
1649  *t = s->s[s->top];
1650  s->top--;
1651 
1652 #if defined(THR)
1653  rt_mutex_unlock(&s->mtx);
1654 #endif
1655 
1656  return 0;
1657 }
1658 
1659 
1660 int rt_tilestack_popall(rt_tilestack_t *s) {
1661 #if defined(THR)
1662  rt_mutex_lock(&s->mtx);
1663 #endif
1664 
1665  s->top = -1;
1666 
1667 #if defined(THR)
1668  rt_mutex_unlock(&s->mtx);
1669 #endif
1670 
1671  return 0;
1672 }
1673 
1674 
1675 int rt_tilestack_empty(rt_tilestack_t *s) {
1676 #if defined(THR)
1677  rt_mutex_lock(&s->mtx);
1678 #endif
1679 
1680  if (s->top < 0) {
1681 #if defined(THR)
1682  rt_mutex_unlock(&s->mtx);
1683 #endif
1684  return 1;
1685  }
1686 
1687 #if defined(THR)
1688  rt_mutex_unlock(&s->mtx);
1689 #endif
1690 
1691  return 0;
1692 }
1693 
1694 
1695 /*
1696  * shared iterators
1697  */
1698 
1700 int rt_shared_iterator_init(rt_shared_iterator_t *it) {
1701  memset(it, 0, sizeof(rt_shared_iterator_t));
1702 #if defined(THR)
1703  rt_mutex_init(&it->mtx);
1704 #endif
1705  return 0;
1706 }
1707 
1708 
1710 int rt_shared_iterator_destroy(rt_shared_iterator_t *it) {
1711 #if defined(THR)
1712  rt_mutex_destroy(&it->mtx);
1713 #endif
1714  return 0;
1715 }
1716 
1717 
1719 int rt_shared_iterator_set(rt_shared_iterator_t *it,
1720  rt_tasktile_t *tile) {
1721 #if defined(THR)
1722  rt_mutex_lock(&it->mtx);
1723 #endif
1724  it->start = tile->start;
1725  it->current = tile->start;
1726  it->end = tile->end;
1727  it->fatalerror = 0;
1728 #if defined(THR)
1729  rt_mutex_unlock(&it->mtx);
1730 #endif
1731  return 0;
1732 }
1733 
1734 
1736 int rt_shared_iterator_next_tile(rt_shared_iterator_t *it, int reqsize,
1737  rt_tasktile_t *tile) {
1738  int rc=RT_SCHED_CONTINUE;
1739 
1740 #if defined(THR)
1741  rt_mutex_spin_lock(&it->mtx);
1742 #endif
1743  if (!it->fatalerror) {
1744  tile->start=it->current; /* set start to the current work unit */
1745  it->current+=reqsize; /* increment by the requested tile size */
1746  tile->end=it->current; /* set the (exclusive) endpoint */
1747 
1748  /* if start is beyond the last work unit, we're done */
1749  if (tile->start >= it->end) {
1750  tile->start=0;
1751  tile->end=0;
1752  rc = RT_SCHED_DONE;
1753  }
1754 
1755  /* if the endpoint (exclusive) for the requested tile size */
1756  /* is beyond the last work unit, roll it back as needed */
1757  if (tile->end > it->end) {
1758  tile->end = it->end;
1759  }
1760  } else {
1761  rc = RT_SCHED_DONE;
1762  }
1763 #if defined(THR)
1764  rt_mutex_unlock(&it->mtx);
1765 #endif
1766 
1767  return rc;
1768 }
1769 
1770 
1772 int rt_shared_iterator_setfatalerror(rt_shared_iterator_t *it) {
1773 #if defined(THR)
1774  rt_mutex_spin_lock(&it->mtx);
1775 #endif
1776  it->fatalerror=1;
1777 #if defined(THR)
1778  rt_mutex_unlock(&it->mtx);
1779 #endif
1780  return 0;
1781 }
1782 
1783 
1785 int rt_shared_iterator_getfatalerror(rt_shared_iterator_t *it) {
1786  int rc=0;
1787 #if defined(THR)
1788  rt_mutex_lock(&it->mtx);
1789 #endif
1790  if (it->fatalerror)
1791  rc = -1;
1792 #if defined(THR)
1793  rt_mutex_unlock(&it->mtx);
1794 #endif
1795  return rc;
1796 }
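/* Usage sketch (illustrative addition): a worker thread pulls tiles from a  */
/* shared iterator until the work range is exhausted.  The iterator is       */
/* seeded once with rt_shared_iterator_set() using a tile that spans the     */
/* full range of work units.                                                 */
static void example_iterator_consume(rt_shared_iterator_t *it) {
  rt_tasktile_t tile;
  while (rt_shared_iterator_next_tile(it, 16, &tile) != RT_SCHED_DONE) {
    int i;
    for (i=tile.start; i<tile.end; i++) {
      /* process work unit i */
    }
  }
}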
1797 
1798 
1799 #if defined(THR)
1800 /*
1801  * Thread pool.
1802  */
1803 static void * rt_threadpool_workerproc(void *voidparms) {
1804  void *(*fctn)(void*);
1805  rt_threadpool_workerdata_t *workerdata = (rt_threadpool_workerdata_t *) voidparms;
1806  rt_threadpool_t *thrpool = (rt_threadpool_t *) workerdata->thrpool;
1807 
1808  while ((fctn = rt_thread_run_barrier(&thrpool->runbar, NULL, NULL, &workerdata->parms)) != NULL) {
1809  (*fctn)(workerdata);
1810  }
1811 
1812  return NULL;
1813 }
1814 
1815 
1816 static void * rt_threadpool_workersync(void *voidparms) {
1817  return NULL;
1818 }
1819 #endif
1820 
1821 
1822 rt_threadpool_t * rt_threadpool_create(int workercount, int *devlist) {
1823  int i;
1824  rt_threadpool_t *thrpool = NULL;
1825  thrpool = (rt_threadpool_t *) malloc(sizeof(rt_threadpool_t));
1826  if (thrpool == NULL)
1827  return NULL;
1828 
1829  memset(thrpool, 0, sizeof(rt_threadpool_t));
1830 
1831 #if !defined(THR)
1832  workercount=1;
1833 #endif
1834 
1835  /* if caller provides a device list, use it, otherwise we assume */
1836  /* all workers are CPU cores */
1837  thrpool->devlist = (int *) malloc(sizeof(int) * workercount);
1838  if (devlist == NULL) {
1839  for (i=0; i<workercount; i++)
1840  thrpool->devlist[i] = -1; /* mark as a CPU core */
1841  } else {
1842  memcpy(thrpool->devlist, devlist, sizeof(int) * workercount);
1843  }
1844 
1845  /* initialize shared iterator */
1846  rt_shared_iterator_init(&thrpool->iter);
1847 
1848  /* initialize tile stack for error handling */
1849  rt_tilestack_init(&thrpool->errorstack, 64);
1850 
1851  /* create a run barrier with N+1 threads: N workers, 1 master */
1852  thrpool->workercount = workercount;
1853  rt_thread_run_barrier_init(&thrpool->runbar, workercount+1);
1854 
1855  /* allocate and initialize thread pool */
1856  thrpool->threads = (rt_thread_t *) malloc(sizeof(rt_thread_t) * workercount);
1857  thrpool->workerdata = (rt_threadpool_workerdata_t *) malloc(sizeof(rt_threadpool_workerdata_t) * workercount);
1858  memset(thrpool->workerdata, 0, sizeof(rt_threadpool_workerdata_t) * workercount);
1859 
1860  /* setup per-worker data */
1861  for (i=0; i<workercount; i++) {
1862  thrpool->workerdata[i].iter=&thrpool->iter;
1863  thrpool->workerdata[i].errorstack=&thrpool->errorstack;
1864  thrpool->workerdata[i].threadid=i;
1865  thrpool->workerdata[i].threadcount=workercount;
1866  thrpool->workerdata[i].devid=thrpool->devlist[i];
1867  thrpool->workerdata[i].devspeed=1.0f; /* must be reset by dev setup code */
1868  thrpool->workerdata[i].thrpool=thrpool;
1869  }
1870 
1871 #if defined(THR)
1872  /* launch thread pool */
1873  for (i=0; i<workercount; i++) {
1874  rt_thread_create(&thrpool->threads[i], rt_threadpool_workerproc, &thrpool->workerdata[i]);
1875  }
1876 #endif
1877 
1878  return thrpool;
1879 }
1880 
1881 
1882 int rt_threadpool_launch(rt_threadpool_t *thrpool,
1883  void *fctn(void *), void *parms, int blocking) {
1884  if (thrpool == NULL)
1885  return -1;
1886 
1887 #if defined(THR)
1888  /* wake sleeping threads to run fctn(parms) */
1889  rt_thread_run_barrier(&thrpool->runbar, fctn, parms, NULL);
1890  if (blocking)
1891  rt_thread_run_barrier(&thrpool->runbar, rt_threadpool_workersync, NULL, NULL);
1892 #else
1893  thrpool->workerdata[0].parms = parms;
1894  (*fctn)(&thrpool->workerdata[0]);
1895 #endif
1896  return 0;
1897 }
1898 
1899 
1900 int rt_threadpool_wait(rt_threadpool_t *thrpool) {
1901 #if defined(THR)
1902  rt_thread_run_barrier(&thrpool->runbar, rt_threadpool_workersync, NULL, NULL);
1903 #endif
1904  return 0;
1905 }
1906 
1907 
1908 int rt_threadpool_poll(rt_threadpool_t *thrpool) {
1909 #if defined(THR)
1910  return rt_thread_run_barrier_poll(&thrpool->runbar);
1911 #else
1912  return 1;
1913 #endif
1914 }
1915 
1916 
1917 int rt_threadpool_destroy(rt_threadpool_t *thrpool) {
1918 #if defined(THR)
1919  int i;
1920 #endif
1921 
1922  /* wake threads and tell them to shutdown */
1923  rt_thread_run_barrier(&thrpool->runbar, NULL, NULL, NULL);
1924 
1925 #if defined(THR)
1926  /* join the pool of worker threads */
1927  for (i=0; i<thrpool->workercount; i++) {
1928  rt_thread_join(thrpool->threads[i], NULL);
1929  }
1930 #endif
1931 
1932  /* destroy the thread barrier */
1933  rt_thread_run_barrier_destroy(&thrpool->runbar);
1934 
1935  /* destroy the shared iterator */
1936  rt_shared_iterator_destroy(&thrpool->iter);
1937 
1938  /* destroy tile stack for error handling */
1939  rt_tilestack_destroy(&thrpool->errorstack);
1940 
1941  free(thrpool->devlist);
1942  free(thrpool->threads);
1943  free(thrpool->workerdata);
1944  free(thrpool);
1945 
1946  return 0;
1947 }
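/* Usage sketch (illustrative addition, using the pool entry points as       */
/* reconstructed above): create a persistent pool of workers, run a task     */
/* function on all of them with a blocking launch, then shut the pool down.  */
/* Each worker receives its per-worker data pointer as the task argument.    */
static void * example_pool_task(void *voidparms) {
  int threadid, threadcount;
  rt_threadpool_worker_getid(voidparms, &threadid, &threadcount);
  printf("pool worker %d of %d\n", threadid, threadcount);
  return NULL;
}

static void example_pool_usage(void) {
  rt_threadpool_t *pool = rt_threadpool_create(4, NULL);   /* 4 CPU workers */
  if (pool == NULL)
    return;
  rt_threadpool_launch(pool, example_pool_task, NULL, 1);  /* blocking run  */
  rt_threadpool_destroy(pool);                             /* join and free */
}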
1948 
1949 
1951 int rt_threadpool_get_workercount(rt_threadpool_t *thrpool) {
1952  return thrpool->workercount;
1953 }
1954 
1955 
1957 int rt_threadpool_worker_getid(void *voiddata, int *threadid, int *threadcount) {
1958  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
1959  if (threadid != NULL)
1960  *threadid = worker->threadid;
1961 
1962  if (threadcount != NULL)
1963  *threadcount = worker->threadcount;
1964 
1965  return 0;
1966 }
1967 
1968 
1970 int rt_threadpool_worker_getdevid(void *voiddata, int *devid) {
1971  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
1972  if (devid != NULL)
1973  *devid = worker->devid;
1974 
1975  return 0;
1976 }
1977 
1978 
1985 int rt_threadpool_worker_setdevspeed(void *voiddata, float speed) {
1986  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
1987  worker->devspeed = speed;
1988  return 0;
1989 }
1990 
1991 
1996 int rt_threadpool_worker_getdevspeed(void *voiddata, float *speed) {
1997  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
1998  if (speed != NULL)
1999  *speed = worker->devspeed;
2000  return 0;
2001 }
2002 
2003 
2008 int rt_threadpool_worker_devscaletile(void *voiddata, int *tilesize) {
2009  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
2010  if (tilesize != NULL) {
2011  int scaledtilesize;
2012  scaledtilesize = (int) (worker->devspeed * ((float) (*tilesize)));
2013  if (scaledtilesize < 1)
2014  scaledtilesize = 1;
2015 
2016  *tilesize = scaledtilesize;
2017  }
2018 
2019  return 0;
2020 }
2021 
2022 
2024 int rt_threadpool_worker_getdata(void *voiddata, void **clientdata) {
2025  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voiddata;
2026  if (clientdata != NULL)
2027  *clientdata = worker->parms;
2028 
2029  return 0;
2030 }
2031 
2032 
2034 int rt_threadpool_sched_dynamic(rt_threadpool_t *thrpool, rt_tasktile_t *tile) {
2035  if (thrpool == NULL)
2036  return -1;
2037  return rt_shared_iterator_set(&thrpool->iter, tile);
2038 }
2039 
2040 
2042 int rt_threadpool_next_tile(void *voidparms, int reqsize,
2043  rt_tasktile_t *tile) {
2044  int rc;
2045  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voidparms;
2046  rc = rt_shared_iterator_next_tile(worker->iter, reqsize, tile);
2047  if (rc == RT_SCHED_DONE) {
2048  /* if the error stack is empty, then we're done, otherwise pop */
2049  /* a tile off of the error stack and retry it */
2050  if (rt_tilestack_pop(worker->errorstack, tile) != RT_TILESTACK_EMPTY)
2051  return RT_SCHED_CONTINUE;
2052  }
2053 
2054  return rc;
2055 }
2056 
2057 
2062 int rt_threadpool_tile_failed(void *voidparms, rt_tasktile_t *tile) {
2064  return rt_tilestack_push(worker->errorstack, tile);
2065 }
2066 
2067 
2068 /* worker thread calls this to indicate that an unrecoverable error occurred */
2069 int rt_threadpool_setfatalerror(void *voidparms) {
2070  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voidparms;
2071  rt_shared_iterator_setfatalerror(worker->iter);
2072  return 0;
2073 }
2074 
2075 
2076 /* worker thread calls this to query whether an unrecoverable error occurred */
2077 int rt_threadpool_getfatalerror(void *voidparms) {
2078  rt_threadpool_workerdata_t *worker = (rt_threadpool_workerdata_t *) voidparms;
2079  /* query error status for return to caller */
2080  return rt_shared_iterator_getfatalerror(worker->iter);
2081 }
2082 
2083 
2084 /* launch up to numprocs threads using shared iterator as a load balancer */
2085 int rt_threadlaunch(int numprocs, void *clientdata, void * fctn(void *),
2086  rt_tasktile_t *tile) {
2087  rt_shared_iterator_t iter;
2088  rt_threadlaunch_t *parms=NULL;
2089  rt_thread_t * threads=NULL;
2090  int i, rc;
2091 
2092  /* XXX have to ponder what the right thing to do is here */
2093 #if !defined(THR)
2094  numprocs=1;
2095 #endif
2096 
2097  /* initialize shared iterator and set the iteration and range */
2098  rt_shared_iterator_init(&iter);
2099  if (rt_shared_iterator_set(&iter, tile))
2100  return -1;
2101 
2102  /* allocate array of threads */
2103  threads = (rt_thread_t *) calloc(numprocs * sizeof(rt_thread_t), 1);
2104  if (threads == NULL)
2105  return -1;
2106 
2107  /* allocate and initialize array of thread parameters */
2108  parms = (rt_threadlaunch_t *) malloc(numprocs * sizeof(rt_threadlaunch_t));
2109  if (parms == NULL) {
2110  free(threads);
2111  return -1;
2112  }
2113  for (i=0; i<numprocs; i++) {
2114  parms[i].iter = &iter;
2115  parms[i].threadid = i;
2116  parms[i].threadcount = numprocs;
2117  parms[i].clientdata = clientdata;
2118  }
2119 
2120 #if defined(THR)
2121  if (numprocs == 1) {
2122  /* XXX we special-case the single worker thread */
2123  /* scenario because this greatly reduces the */
2124  /* GPU kernel launch overhead since a new */
2125  /* context doesn't have to be created, and */
2126  /* in the simplest case with a single GPU we */
2127  /* will just be using the same device anyway */
2128  /* Ideally we shouldn't need to do this.... */
2129  /* single thread does all of the work */
2130  fctn((void *) &parms[0]);
2131  } else {
2132  /* spawn child threads to do the work */
2133  for (i=0; i<numprocs; i++) {
2134  rt_thread_create(&threads[i], fctn, &parms[i]);
2135  }
2136 
2137  /* join the threads after work is done */
2138  for (i=0; i<numprocs; i++) {
2139  rt_thread_join(threads[i], NULL);
2140  }
2141  }
2142 #else
2143  /* single thread does all of the work */
2144  fctn((void *) &parms[0]);
2145 #endif
2146 
2147  /* free threads/parms */
2148  free(parms);
2149  free(threads);
2150 
2151  /* query error status for return to caller */
2152  rc = rt_shared_iterator_getfatalerror(&iter);
2153 
2154  /* destroy the shared iterator */
2155  rt_shared_iterator_destroy(&iter);
2156 
2157  return rc;
2158 }
2159 
2160 
2162 int rt_threadlaunch_getid(void *voidparms, int *threadid, int *threadcount) {
2163  rt_threadlaunch_t *worker = (rt_threadlaunch_t *) voidparms;
2164  if (threadid != NULL)
2165  *threadid = worker->threadid;
2166 
2167  if (threadcount != NULL)
2168  *threadcount = worker->threadcount;
2169 
2170  return 0;
2171 }
2172 
2173 
2175 int rt_threadlaunch_getdata(void *voidparms, void **clientdata) {
2176  rt_threadlaunch_t *worker = (rt_threadlaunch_t *) voidparms;
2177  if (clientdata != NULL)
2178  *clientdata = worker->clientdata;
2179 
2180  return 0;
2181 }
2182 
2183 
2185 int rt_threadlaunch_next_tile(void *voidparms, int reqsize,
2186  rt_tasktile_t *tile) {
2187  rt_threadlaunch_t *worker = (rt_threadlaunch_t *) voidparms;
2188  return rt_shared_iterator_next_tile(worker->iter, reqsize, tile);
2189 }
2190 
2191 
2193 int rt_threadlaunch_setfatalerror(void *voidparms) {
2194  rt_threadlaunch_t *worker = (rt_threadlaunch_t *) voidparms;
2195  return rt_shared_iterator_setfatalerror(worker->iter);
2196 }
2197 
2198 
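A minimal end-to-end sketch of the rt_threadlaunch() path and its helper calls above, assuming do_work_range() is a hypothetical client routine returning 0 on success:

/* Illustrative sketch (not from threads.c): one-shot use of rt_threadlaunch()
 * with its worker-side helper calls. */
#include "threads.h"

extern int do_work_range(void *clientdata, int start, int end);  /* hypothetical */

void * launch_worker(void *voidparms) {
  int threadid, threadcount;
  void *clientdata;
  rt_tasktile_t tile;

  rt_threadlaunch_getid(voidparms, &threadid, &threadcount);
  rt_threadlaunch_getdata(voidparms, &clientdata);

  /* pull half-open [start,end) tiles from the shared iterator */
  while (rt_threadlaunch_next_tile(voidparms, 8, &tile) != RT_SCHED_DONE) {
    if (do_work_range(clientdata, tile.start, tile.end) != 0) {
      rt_threadlaunch_setfatalerror(voidparms);  /* ask all workers to stop */
      break;
    }
  }
  return NULL;
}

int launch_example(void *clientdata, int numtasks) {
  rt_tasktile_t tile;
  tile.start = 0;
  tile.end = numtasks;
  /* rt_threadlaunch() returns the iterator's fatal error status when done */
  return rt_threadlaunch(rt_thread_numprocessors(), clientdata, launch_worker, &tile);
}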
2199 #ifdef __cplusplus
2200 }
2201 #endif
2202 
2203 
#define CPU_ARM64_AES
AES insns avail.
Definition: threads.h:75
rt_shared_iterator_t * iter
dynamic work scheduler
Definition: threads.h:485
rt_mutex_t mtx
Mutex lock for the structure.
Definition: threads.h:402
int * devlist
per-worker CPU/GPU device IDs
Definition: threads.h:500
iterator used for dynamic load balancing
Definition: threads.h:440
int n_clients
Number of threads to wait for at barrier.
Definition: threads.h:198
int rt_cpu_smt_depth(void)
CPU logical processors (SMT depth / aka hyperthreading)
Definition: threads.c:466
int rt_tilestack_push(rt_tilestack_t *s, const rt_tasktile_t *t)
push a task tile onto the stack
Definition: threads.c:1608
int rt_mutex_lock(rt_mutex_t *mp)
lock a mutex
Definition: threads.c:742
int rt_atomic_int_get(rt_atomic_int_t *atomp)
get an atomic int variable
Definition: threads.c:1123
int rt_shared_iterator_set(rt_shared_iterator_t *it, rt_tasktile_t *tile)
set shared iterator parameters
Definition: threads.c:1719
int rt_thread_numphysprocessors(void)
If compiling on Linux, enable the GNU CPU affinity functions in both libc and the libpthreads...
Definition: threads.c:114
void * clientdata
worker parameters
Definition: threads.h:588
int rt_cond_destroy(rt_cond_t *cvp)
destroy a condition variable
Definition: threads.c:888
int rt_thread_barrier(rt_barrier_t *barrier, int increment)
synchronize on counting barrier primitive
Definition: threads.c:1425
int result
Answer to be returned by barrier_wait.
Definition: threads.h:202
void *(* fctn)(void *)
Fctn ptr to call, or NULL if done.
Definition: threads.h:215
rt_mutex_t lock
Mutex lock for the structure.
Definition: threads.h:211
#define CPU_ARM64_CRC32
CRC32 insns avail.
Definition: threads.h:72
Routines to generate a pool of threads which then grind through a dynamically load balanced work queu...
Definition: threads.h:583
#define CPU_ARM64_FP
FP insns avail.
Definition: threads.h:73
#define CPU_ARM64_SVE
Scalable Vector Extns avail.
Definition: threads.h:82
int rt_cond_t
Definition: threads.h:167
int rt_threadpool_worker_getdevspeed(void *voiddata, float *speed)
worker thread calls this to get relative speed of this device as determined by the SM/core count and ...
Definition: threads.c:1996
rt_threadpool_t * rt_threadpool_create(int workercount, int *devlist)
create a thread pool with a specified number of worker threads
Definition: threads.c:1822
void rt_thread_barrier_destroy(rt_barrier_t *barrier)
destroy counting barrier primitive
Definition: threads.c:1416
#define CPU_ARM64_SHA2
SHA-2 insns avail.
Definition: threads.h:85
Task tile struct for stack, iterator, and scheduler routines; 'start' is inclusive, 'end' is exclusive.
Definition: threads.h:387
#define CPU_ARM64_ASIMD
Advanced SIMD avail.
Definition: threads.h:77
int rt_rwlock_init(rt_rwlock_t *rwp)
initialize a reader/writer lock
Definition: threads.c:1212
int rt_thread_set_self_cpuaffinity(int cpu)
set the CPU affinity of the current thread (if allowed by host system)
Definition: threads.c:572
float devspeed
speed scaling for this device
Definition: threads.h:490
void * rsltparms
parms to return to barrier wait callers
Definition: threads.h:218
rt_barrier_t * rt_thread_barrier_init(int n_clients)
initialize counting barrier primitive
Definition: threads.c:1355
#define CPU_HT
x86 Hyperthreading detected
Definition: threads.h:55
void rt_thread_run_barrier_destroy(rt_run_barrier_t *barrier)
destroy thread pool barrier
Definition: threads.c:1478
int rt_threadlaunch(int numprocs, void *clientdata, void *fctn(void *), rt_tasktile_t *tile)
launch up to numprocs threads using shared iterator as a load balancer
Definition: threads.c:2085
#define CPU_SSE2
SSE2 SIMD avail.
Definition: threads.h:57
int rt_tilestack_pop(rt_tilestack_t *s, rt_tasktile_t *t)
pop a task tile off of the stack
Definition: threads.c:1637
int size
current allocated stack size
Definition: threads.h:404
rt_threadpool_workerdata_t * workerdata
per-worker data
Definition: threads.h:504
rt_mutex_t mtx
mutex lock
Definition: threads.h:441
#define CPU_AVX
AVX SIMD avail.
Definition: threads.h:61
rt_cond_t wait_cv
Clients wait on condition variable to proceed.
Definition: threads.h:203
int rt_rwlock_t
Definition: threads.h:168
int * rt_cpu_affinitylist(int *cpuaffinitycount)
query CPU affinity of the calling process (if allowed by host system)
Definition: threads.c:483
rt_tasktile_t * s
stack of task tiles
Definition: threads.h:406
int top
index of top stack element
Definition: threads.h:405
int rt_thread_run_barrier_init(rt_run_barrier_t *barrier, int n_clients)
initialize thread pool barrier
Definition: threads.c:1462
#define RT_SCHED_DONE
Shared iterators intended for trivial CPU/GPU load balancing with no exception handling capability (a...
Definition: threads.h:436
int rt_mutex_destroy(rt_mutex_t *mp)
destroy a mutex
Definition: threads.c:828
#define CPU_ARM64_SHA512
SHA-512 insns avail.
Definition: threads.h:83
int rt_rwlock_readlock(rt_rwlock_t *rwp)
set reader lock
Definition: threads.c:1241
int rt_threadlaunch_getdata(void *voidparms, void **clientdata)
worker thread can call this to get its client data pointer
Definition: threads.c:2175
#define CPU_ARM64_SHA1
SHA-1 insns avail.
Definition: threads.h:84
int end
ending value (exclusive)
Definition: threads.h:443
int rt_atomic_int_init(rt_atomic_int_t *atomp, int val)
initialize an atomic int variable
Definition: threads.c:1043
#define CPU_UNKNOWN
Unknown CPU type.
Definition: threads.h:52
int n_clients
Number of threads to wait for at barrier.
Definition: threads.h:212
int rt_mutex_init(rt_mutex_t *mp)
initialize a mutex
Definition: threads.c:721
Tachyon cross-platform thread creation and management, atomic operations, and CPU feature query APIs...
int rt_threadpool_launch(rt_threadpool_t *thrpool, void *fctn(void *), void *parms, int blocking)
launch threads onto a new function, with associated parms
Definition: threads.c:1882
int rt_thread_setconcurrency(int nthr)
set the concurrency level and scheduling scope for threads
Definition: threads.c:618
rt_run_barrier_t runbar
master/worker run barrier
Definition: threads.h:505
#define RT_SCHED_CONTINUE
some work remains in the queue
Definition: threads.h:437
int rt_rwlock_unlock(rt_rwlock_t *rwp)
unlock reader/writer lock
Definition: threads.c:1305
#define CPU_ARM64_ASIMDRDM
Advanced SIMD RDM avail.
Definition: threads.h:80
int rt_atomic_int_fetch_and_add(rt_atomic_int_t *atomp, int inc)
fetch an atomic int and add inc to it, returning original value
Definition: threads.c:1152
int rt_mutex_trylock(rt_mutex_t *mp)
try to lock a mutex
Definition: threads.c:763
unsigned int flags
Definition: threads.h:89
int rt_shared_iterator_getfatalerror(rt_shared_iterator_t *it)
master thread calls this to query for fatal errors
Definition: threads.c:1785
rt_tilestack_t * errorstack
stack of tiles that failed
Definition: threads.h:486
int rt_threadlaunch_getid(void *voidparms, int *threadid, int *threadcount)
worker thread can call this to get its ID and number of peers
Definition: threads.c:2162
thread-specific handle data for workers
Definition: threads.h:483
#define RT_TILESTACK_EMPTY
Definition: threads.h:396
int rt_thread_numprocessors(void)
number of processors available, subject to user override
Definition: threads.c:202
int rt_tilestack_popall(rt_tilestack_t *s)
pop all of the task tiles off of the stack
Definition: threads.c:1660
int rt_threadpool_tile_failed(void *voidparms, rt_tasktile_t *tile)
worker thread calls this when a failure occurs on a tile it has already taken from the scheduler ...
Definition: threads.c:2062
#define CPU_ARM64_ASIMDHP
Advanced SIMD HP avail.
Definition: threads.h:79
#define CPU_ARM64_SHA3
SHA-3 insns avail.
Definition: threads.h:86
int end
ending task ID (exclusive)
Definition: threads.h:389
int rt_thread_t
Definition: threads.h:165
int val
Integer value to be atomically manipulated.
Definition: threads.h:188
int rt_rwlock_writelock(rt_rwlock_t *rwp)
set writer lock
Definition: threads.c:1270
int rt_threadpool_worker_getdata(void *voiddata, void **clientdata)
worker thread can call this to get its client data pointer
Definition: threads.c:2024
int sum
Sum of arguments passed to barrier_wait.
Definition: threads.h:201
int fatalerror
cancel processing immediately for all threads
Definition: threads.h:445
#define CPU_AVX512CD
AVX-512CD SIMD avail.
Definition: threads.h:64
int rt_cpu_capability_flags(rt_cpu_caps_t *cpucaps)
CPU optional instruction set capability flags.
Definition: threads.c:281
int threadcount
total number of worker threads
Definition: threads.h:488
#define CPU_AVX2
AVX2 SIMD avail.
Definition: threads.h:62
rt_mutex_t lock
Mutex lock for the structure.
Definition: threads.h:197
int start
starting task ID (inclusive)
Definition: threads.h:388
#define CPU_AVX512F
AVX-512F SIMD avail.
Definition: threads.h:63
void *(* rslt)(void *)
Fctn ptr to return to barrier wait callers.
Definition: threads.h:217
int rt_mutex_t
Definition: threads.h:166
int rt_tilestack_empty(rt_tilestack_t *s)
query if the task tile stack is empty or not
Definition: threads.c:1675
int rt_threadpool_worker_getid(void *voiddata, int *threadid, int *threadcount)
worker thread can call this to get its ID and number of peers
Definition: threads.c:1957
#define CPU_AVX512ER
AVX-512ER SIMD avail.
Definition: threads.h:65
void * parms
fctn parms for this worker
Definition: threads.h:491
int rt_tilestack_compact(rt_tilestack_t *s)
shrink memory buffers associated with task tile stack if possible
Definition: threads.c:1584
int rt_threadpool_get_workercount(rt_threadpool_t *thrpool)
return the number of worker threads currently in the pool
Definition: threads.c:1951
stack of work tiles, for error handling
Definition: threads.h:401
int rt_thread_run_barrier_poll(rt_run_barrier_t *barrier)
non-blocking poll to see if peers are already at the barrier
Definition: threads.c:1536
#define CPU_ARM64_ASIMDDP
Advanced SIMD DP avail.
Definition: threads.h:78
int n_waiting
Number of currently waiting threads.
Definition: threads.h:213
int rt_cond_signal(rt_cond_t *cvp)
signal a condition variable, waking at least one thread
Definition: threads.c:971
atomic int structure with padding to prevent false sharing
Definition: threads.h:178
int rt_threadlaunch_next_tile(void *voidparms, int reqsize, rt_tasktile_t *tile)
iterate the shared iterator over the requested half-open interval
Definition: threads.c:2185
int rt_thread_barrier_init_proc_shared(rt_barrier_t *barrier, int n_clients)
When rendering in the CAVE we use a special synchronization mode so that shared memory mutexes and co...
Definition: threads.c:1379
rt_shared_iterator_t iter
dynamic work scheduler
Definition: threads.h:501
int rt_mutex_unlock(rt_mutex_t *mp)
unlock a mutex
Definition: threads.c:807
void * thrpool
void ptr to thread pool struct
Definition: threads.h:492
int growthrate
stack growth chunk size
Definition: threads.h:403
rt_tilestack_t errorstack
stack of tiles that failed
Definition: threads.h:502
int rt_thread_join(rt_thread_t thr, void **stat)
join (wait for completion of, and merge with) a thread
Definition: threads.c:688
int rt_threadpool_getfatalerror(void *voidparms)
master thread calls this to query for fatal errors
Definition: threads.c:2077
int rt_threadpool_worker_devscaletile(void *voiddata, int *tilesize)
worker thread calls this to scale max tile size by worker speed as determined by the SM/core count an...
Definition: threads.c:2008
int rt_threadlaunch_setfatalerror(void *voidparms)
worker thread calls this to indicate that an unrecoverable error occurred
Definition: threads.c:2193
int n_waiting
Number of currently waiting threads.
Definition: threads.h:199
int rt_atomic_int_destroy(rt_atomic_int_t *atomp)
destroy an atomic int variable
Definition: threads.c:1066
#define CPU_FMA
FMA insns avail.
Definition: threads.h:60
barrier sync object with padding to prevent false sharing
Definition: threads.h:195
int rt_shared_iterator_next_tile(rt_shared_iterator_t *it, int reqsize, rt_tasktile_t *tile)
iterate the shared iterator, over a requested half-open interval
Definition: threads.c:1736
rt_mutex_t lock
Mutex lock for the structure.
Definition: threads.h:180
int rt_cond_broadcast(rt_cond_t *cvp)
signal a condition variable, waking all threads
Definition: threads.c:1003
int start
starting value (inclusive)
Definition: threads.h:442
#define CPU_AVX512PF
AVX-512PF SIMD avail.
Definition: threads.h:66
int workercount
number of worker threads
Definition: threads.h:499
int rt_threadpool_setfatalerror(void *voidparms)
worker thread calls this to indicate that an unrecoverable error occurred
Definition: threads.c:2069
int rt_threadpool_destroy(rt_threadpool_t *thrpool)
join all worker threads and free resources
Definition: threads.c:1917
int rt_atomic_int_set(rt_atomic_int_t *atomp, int val)
set an atomic int variable
Definition: threads.c:1087
int rt_shared_iterator_init(rt_shared_iterator_t *it)
initialize a shared iterator
Definition: threads.c:1700
#define CPU_SMTDEPTH_UNKNOWN
Unknown SMT depth.
Definition: threads.h:51
rt_shared_iterator_t * iter
dynamic scheduler iterator
Definition: threads.h:585
#define CPU_SSE4_1
SSE4.1 SIMD avail.
Definition: threads.h:58
void rt_tilestack_destroy(rt_tilestack_t *s)
destroy task tile stack
Definition: threads.c:1575
int rt_cond_wait(rt_cond_t *cvp, rt_mutex_t *mp)
wait on a condition variable
Definition: threads.c:912
int rt_mutex_spin_lock(rt_mutex_t *mp)
lock a mutex by spinning only
Definition: threads.c:784
rt_cond_t wait_cv
Clients wait on condition variable to proceed.
Definition: threads.h:219
int rt_threadpool_next_tile(void *voidparms, int reqsize, rt_tasktile_t *tile)
iterate the shared iterator over the requested half-open interval
Definition: threads.c:2042
int current
current value
Definition: threads.h:444
int phase
Flag to separate waiters from fast workers.
Definition: threads.h:200
void * parms
parms for fctn pointer
Definition: threads.h:216
void *(*)(void *) rt_thread_run_barrier(rt_run_barrier_t *barrier, void *fctn(void *), void *parms, void **rsltparms)
Wait until all threads reach barrier, and return the function pointer passed in by the master thread...
Definition: threads.c:1490
void *(* RTTHREAD_START_ROUTINE)(void *)
Typedef to eliminate compiler warning caused by C/C++ linkage conflict.
Definition: threads.c:643
rt_thread_t * threads
worker threads
Definition: threads.h:503
int rt_threadpool_worker_setdevspeed(void *voiddata, float speed)
worker thread calls this to set relative speed of this device as determined by the SM/core count and ...
Definition: threads.c:1985
#define CPU_HYPERVISOR
VM/Hypervisor environment.
Definition: threads.h:56
int rt_threadpool_worker_getdevid(void *voiddata, int *devid)
worker thread can call this to get its CPU/GPU device ID
Definition: threads.c:1970
int threadid
ID of worker thread.
Definition: threads.h:586
int threadid
worker thread's id
Definition: threads.h:487
int rt_shared_iterator_setfatalerror(rt_shared_iterator_t *it)
worker thread calls this to indicate a fatal error
Definition: threads.c:1772
#define CPU_F16C
F16C insns avail.
Definition: threads.h:59
int rt_atomic_int_add_and_fetch(rt_atomic_int_t *atomp, int inc)
fetch an atomic int and add inc to it, returning new value
Definition: threads.c:1180
int rt_cond_init(rt_cond_t *cvp)
initialize a condition variable
Definition: threads.c:852
run-barrier sync object with padding to prevent false sharing
Definition: threads.h:209
int devid
worker CPU/GPU device ID
Definition: threads.h:489
int rt_shared_iterator_destroy(rt_shared_iterator_t *it)
destroy a shared iterator
Definition: threads.c:1710
int rt_threadpool_sched_dynamic(rt_threadpool_t *thrpool, rt_tasktile_t *tile)
Set shared iterator state to half-open interval defined by tile.
Definition: threads.c:2034
#define CPU_ARM64_ASIMDFHM
Advanced SIMD FHM avail.
Definition: threads.h:81
int threadcount
number of workers
Definition: threads.h:587
int phase
Flag to separate waiters from fast workers.
Definition: threads.h:214
int rt_thread_create(rt_thread_t *thr, void *fctn(void *), void *arg)
create a new child thread
Definition: threads.c:645
int rt_threadpool_wait(rt_threadpool_t *thrpool)
wait for all worker threads to complete their work
Definition: threads.c:1900
int rt_tilestack_init(rt_tilestack_t *s, int size)
initialize task tile stack (to empty)
Definition: threads.c:1552
persistent thread pool
Definition: threads.h:498
int rt_threadpool_poll(rt_threadpool_t *thrpool)
Definition: threads.c:1908