Introduced new OpenThreads::Affinity class to wrap up specification of thread affinity.

Simplified the OpenThreads::SetProcessorAffinityOfCurrentThread/Thread::SetProcessorAffinity() to utilize the new Affinity class
This commit is contained in:
Robert Osfield 2016-09-27 10:50:38 +01:00
parent 0f8a5a86e2
commit bc44da49e6
5 changed files with 108 additions and 83 deletions

View File

@ -0,0 +1,65 @@
/* -*-c++-*- OpenThreads library, Copyright (C) 2016 Robert Osfield
*
* This library is open source and may be redistributed and/or modified under
* the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
* (at your option) any later version. The full license is in LICENSE file
* included with this distribution, and on the openscenegraph.org website.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* OpenSceneGraph Public License for more details.
*/
//
// Affinity - C++ Affinity class
// ~~~~~~~~
//
#ifndef _OPENTHREADS_AFFINITY_
#define _OPENTHREADS_AFFINITY_
#include <set>
#include <OpenThreads/Mutex>
namespace OpenThreads {
/**
* @class Affinity
* @brief Simple container for specifying which CPU a thread should have affinity with.
* An empty Affinity.activeCPUs/default constructed Affinity signifies that a thread should not have any specific affinity and be able to run on all available CPUs.
*/
class Affinity
{
public:
Affinity() {}
Affinity(unsigned int cpuNumber) { activeCPUs.insert(cpuNumber); }
Affinity(unsigned int cpuNumber, unsigned int cpuCount) { while(cpuCount>0) { activeCPUs.insert(cpuNumber++); --cpuCount; } }
Affinity(const Affinity& rhs) : activeCPUs(rhs.activeCPUs) {}
Affinity& operator = (const Affinity& rhs) { if (&rhs!=this) { activeCPUs = rhs.activeCPUs; } return *this; }
/** add a specfied cpu core from the list to have affinity to. */
void add(unsigned int cpuNmber) { activeCPUs.insert(cpuNmber); }
/** remove a specfied cpu core from the list to have affinity to. */
void remove(unsigned int cpuNmber) { activeCPUs.erase(cpuNmber); }
/** return true if affinity has been provided for specific CPU cores.*/
operator bool () const { return !activeCPUs.empty(); }
typedef std::set<unsigned int> ActiveCPUs;
/** Set of CPUs that a thread should have affinity to.*/
ActiveCPUs activeCPUs;
};
}
#endif // !_OPENTHREADS_THREAD_

View File

@ -23,6 +23,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <OpenThreads/Mutex> #include <OpenThreads/Mutex>
#include <OpenThreads/Affinity>
namespace OpenThreads { namespace OpenThreads {
@ -34,19 +35,11 @@ namespace OpenThreads {
*/ */
extern OPENTHREAD_EXPORT_DIRECTIVE int GetNumberOfProcessors(); extern OPENTHREAD_EXPORT_DIRECTIVE int GetNumberOfProcessors();
/**
* Set the processor affinity mask of current thread. If you want to allow thread to run on any processor core use ~0ul for the cpumask
*/
extern OPENTHREAD_EXPORT_DIRECTIVE int SetProcessorAffinityMaskOfCurrentThread(unsigned long cpumask);
/** /**
* Set the processor affinity of current thread. * Set the processor affinity of current thread.
*/ */
inline int SetProcessorAffinityOfCurrentThread(unsigned int cpunum) int SetProcessorAffinityOfCurrentThread(const Affinity& affinity);
{
unsigned long cpumask = 1ul << cpunum;
return SetProcessorAffinityMaskOfCurrentThread(cpumask);
}
/** /**
* @class Thread * @class Thread
@ -338,27 +331,13 @@ public:
void* getImplementation(){ return _prvData; }; void* getImplementation(){ return _prvData; };
/** Set the Thread's processor affinity to a single CPU. /** Set the Thread's processor affinity to all, a single CPU or multiple CPU's
* This call must be made before * This call must be made before
* start() or startThread() and has no effect after the thread * start() or startThread() and has no effect after the thread
* has been running. Returns 0 on success, implementation's * has been running. Returns 0 on success, implementation's
* error on failure, or -1 if ignored. * error on failure, or -1 if ignored.
*/ */
int setProcessorAffinity( unsigned int cpunum ) int setProcessorAffinity( const Affinity& affinity);
{
unsigned long cpumask = 1ul << cpunum;
return setProcessorAffinityMask(cpumask);
}
/** Set the Thread's processor affinity to a one or more CPU's using mask.
* If you want this threadd to run on any processor core then use a cpumask of ~0ul
* This call must be made before
* start() or startThread() and has no effect after the thread
* has been running. Returns 0 on success, implementation's
* error on failure, or -1 if ignored.
*/
int setProcessorAffinityMask( unsigned long cpumask);
/** microSleep method, equivalent to the posix usleep(microsec). /** microSleep method, equivalent to the posix usleep(microsec).

View File

@ -59,6 +59,7 @@ ENDIF()
SET(HEADER_PATH ${OpenThreads_SOURCE_DIR}/include/OpenThreads) SET(HEADER_PATH ${OpenThreads_SOURCE_DIR}/include/OpenThreads)
SET(OpenThreads_PUBLIC_HEADERS SET(OpenThreads_PUBLIC_HEADERS
${HEADER_PATH}/Atomic ${HEADER_PATH}/Atomic
${HEADER_PATH}/Affinity
${HEADER_PATH}/Barrier ${HEADER_PATH}/Barrier
${HEADER_PATH}/Block ${HEADER_PATH}/Block
${HEADER_PATH}/Condition ${HEADER_PATH}/Condition

View File

@ -108,19 +108,35 @@ void thread_cleanup_handler(void *arg)
namespace OpenThreads namespace OpenThreads
{ {
static void setCPUMask(cpu_set_t* cpumask, unsigned long in_cpumask) static void setCPUMask(cpu_set_t* cpumask, const Affinity& affinity)
{ {
std::cout<<"setCPUMask : "<< affinity.activeCPUs.size() <<std::endl;
CPU_ZERO( cpumask ); CPU_ZERO( cpumask );
unsigned int numprocessors = OpenThreads::GetNumberOfProcessors(); unsigned int numprocessors = OpenThreads::GetNumberOfProcessors();
unsigned int cpunum=0; if (affinity)
while(in_cpumask!=0 && cpunum<numprocessors)
{ {
if ((in_cpumask&1)!=0) for(Affinity::ActiveCPUs::const_iterator itr = affinity.activeCPUs.begin();
itr != affinity.activeCPUs.end();
++itr)
{ {
CPU_SET( cpunum, cpumask ); if (*itr<numprocessors)
{
std::cout<<" setting CPU : "<< *itr<<std::endl;
CPU_SET( *itr, cpumask );
}
}
}
else
{
// BUG-fix for linux:
// Each thread inherits the processor affinity mask from its parent thread.
// We need to explicitly set it to all CPUs, if no affinity was specified.
for (unsigned int i = 0; i < numprocessors; ++i)
{
std::cout<<" Fallback setting CPU : "<< i<<std::endl;
CPU_SET( i, cpumask );
} }
in_cpumask>>=1;
++cpunum;
} }
} }
@ -146,11 +162,9 @@ private:
PThreadPrivateData *pd = PThreadPrivateData *pd =
static_cast<PThreadPrivateData *>(thread->_prvData); static_cast<PThreadPrivateData *>(thread->_prvData);
if (pd->cpumask>=0) // set up processor affinity
{
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) || defined(HAVE_THREE_PARAM_SCHED_SETAFFINITY) || defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
cpu_set_t cpumask; cpu_set_t cpumask;
setCPUMask( &cpumask, pd->cpumask ); setCPUMask( &cpumask, pd->affinity );
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) #if defined(HAVE_PTHREAD_SETAFFINITY_NP)
pthread_setaffinity_np( pthread_self(), sizeof(cpumask), &cpumask); pthread_setaffinity_np( pthread_self(), sizeof(cpumask), &cpumask);
@ -159,32 +173,7 @@ private:
#elif defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY) #elif defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
sched_setaffinity( 0, &cpumask ); sched_setaffinity( 0, &cpumask );
#endif #endif
#endif
}
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) || defined(HAVE_THREE_PARAM_SCHED_SETAFFINITY) || defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
else
{
// BUG-fix for linux:
// Each thread inherits the processor affinity mask from its parent thread.
// We need to explicitly set it to all CPUs, if no affinity was specified.
cpu_set_t cpumask;
CPU_ZERO( &cpumask );
for (int i = 0; i < OpenThreads::GetNumberOfProcessors(); ++i)
{
CPU_SET( i, &cpumask );
}
#if defined(HAVE_PTHREAD_SETAFFINITY_NP)
pthread_setaffinity_np( pthread_self(), sizeof(cpumask), &cpumask);
#elif defined(HAVE_THREE_PARAM_SCHED_SETAFFINITY)
sched_setaffinity( 0, sizeof(cpumask), &cpumask );
#elif defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
sched_setaffinity( 0, &cpumask );
#endif
}
#endif
ThreadCleanupStruct tcs; ThreadCleanupStruct tcs;
tcs.thread = thread; tcs.thread = thread;
@ -548,20 +537,20 @@ size_t Thread::getProcessId()
return (size_t)(pd->tid); return (size_t)(pd->tid);
} }
int OpenThreads::SetProcessorAffinityMaskOfCurrentThread(unsigned long in_cpumask) int OpenThreads::SetProcessorAffinityOfCurrentThread(const Affinity& affinity)
{ {
Thread::Init(); Thread::Init();
Thread* thread = Thread::CurrentThread(); Thread* thread = Thread::CurrentThread();
if (thread) if (thread)
{ {
return thread->setProcessorAffinityMask(in_cpumask); return thread->setProcessorAffinity(affinity);
} }
else else
{ {
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) || defined(HAVE_THREE_PARAM_SCHED_SETAFFINITY) || defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY) // set up processor affinity
cpu_set_t cpumask; cpu_set_t cpumask;
setCPUMask(&cpumask, in_cpumask); setCPUMask( &cpumask, affinity );
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) #if defined(HAVE_PTHREAD_SETAFFINITY_NP)
pthread_setaffinity_np( pthread_self(), sizeof(cpumask), &cpumask); pthread_setaffinity_np( pthread_self(), sizeof(cpumask), &cpumask);
@ -572,7 +561,6 @@ int OpenThreads::SetProcessorAffinityMaskOfCurrentThread(unsigned long in_cpumas
#elif defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY) #elif defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
sched_setaffinity( 0, &cpumask ); sched_setaffinity( 0, &cpumask );
return 0; return 0;
#endif
#endif #endif
} }
@ -585,18 +573,15 @@ int OpenThreads::SetProcessorAffinityMaskOfCurrentThread(unsigned long in_cpumas
// //
// Use: public // Use: public
// //
int Thread::setProcessorAffinityMask(unsigned long in_cpumask) int Thread::setProcessorAffinity(const Affinity& affinity)
{ {
PThreadPrivateData *pd = static_cast<PThreadPrivateData *> (_prvData); PThreadPrivateData *pd = static_cast<PThreadPrivateData *> (_prvData);
pd->cpumask = in_cpumask; pd->affinity = affinity;
if (pd->cpumask==0) return -1;
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) || defined(HAVE_THREE_PARAM_SCHED_SETAFFINITY) || defined(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
if (pd->isRunning() && Thread::CurrentThread()==this) if (pd->isRunning() && Thread::CurrentThread()==this)
{ {
cpu_set_t cpumask; cpu_set_t cpumask;
setCPUMask(&cpumask, in_cpumask); setCPUMask(&cpumask, affinity);
#if defined(HAVE_PTHREAD_SETAFFINITY_NP) #if defined(HAVE_PTHREAD_SETAFFINITY_NP)
return pthread_setaffinity_np (pthread_self(), sizeof(cpumask), &cpumask); return pthread_setaffinity_np (pthread_self(), sizeof(cpumask), &cpumask);
@ -608,9 +593,6 @@ int Thread::setProcessorAffinityMask(unsigned long in_cpumask)
} }
return -1; return -1;
#else
return -1;
#endif
} }

View File

@ -53,7 +53,6 @@ private:
nextId++; nextId++;
threadPriority = Thread::THREAD_PRIORITY_DEFAULT; threadPriority = Thread::THREAD_PRIORITY_DEFAULT;
threadPolicy = Thread::THREAD_SCHEDULE_DEFAULT; threadPolicy = Thread::THREAD_SCHEDULE_DEFAULT;
cpumask = ~0ul;
}; };
virtual ~PThreadPrivateData() {}; virtual ~PThreadPrivateData() {};
@ -81,8 +80,7 @@ private:
volatile int uniqueId; volatile int uniqueId;
volatile unsigned long cpumask; Affinity affinity;
static int nextId; static int nextId;