// NOTE(review): the header names of the original include list were missing from
// the source as received; the list below was reconstructed from the names this
// file actually uses -- confirm against the build.
#include <math.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/times.h>
#include <time.h>
#include <unistd.h>

#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// some basic c++ threading utilities
//

// Holds a pthread mutex for the lifetime of the object: the constructor locks
// it and the destructor unlocks it. Any failure of either call prints a
// message to std::cout and terminates the process with exit(1).
class ScopedLock {
    pthread_mutex_t *lock;

    // Not copyable: a copy would unlock the same mutex a second time when it
    // is destroyed. Declared private and left undefined (pre-C++11 idiom).
    ScopedLock(const ScopedLock&);
    ScopedLock& operator=(const ScopedLock&);

public:
    // m: the mutex to hold; must stay valid until this object is destroyed.
    ScopedLock(pthread_mutex_t *m) : lock(m) {
        int rc = pthread_mutex_lock( lock );   // get the lock
        if ( rc == 0 ) return;
        std::cout << "Mutex lock failure." << std::endl;
        exit(1);
    }

    ~ScopedLock() {
        int rc = pthread_mutex_unlock( lock );
        if ( rc == 0 ) return;
        // Report the operation that actually failed (this used to say "lock").
        std::cout << "Mutex unlock failure." << std::endl;
        exit(1);
    }
};

// Stopwatch on top of clock_gettime(): records the clock value at
// construction and reports the time elapsed since then as labelled text.
class Clock {
    clockid_t   clock_type;
    timespec    start_time;
    const char* label;       // not copied; the caller's string must outlive the Clock

public:
    // clock: the clock id passed to clock_gettime().
    // l:     text placed in front of the elapsed time by read_timer().
    Clock( clockid_t clock, const char* l="" )
        : clock_type(clock), label(l)
    {
        clock_gettime(clock_type, &start_time);
    }

    // Returns the label followed by the seconds elapsed since construction.
    std::string read_timer() {
        struct timespec now;
        clock_gettime(clock_type, &now);
        // The nanosecond difference may be negative; adding it to the
        // whole-second difference still yields the right total.
        double elapsed = now.tv_sec - start_time.tv_sec
                       + 1e-9*(now.tv_nsec - start_time.tv_nsec);
        return format_time( label, elapsed );
    }

    // Formats `elapsed` (seconds) as "<label><whole seconds>.<tenths>".
    // Both parts are truncated, not rounded: 12.25 becomes "12.2".
    static std::string format_time( const char* label, double elapsed ) {
        double seconds;
        const int tenths = int( 10 * modf( elapsed, &seconds ) );

        std::stringstream rt;
        rt << label << (int(elapsed)) << "." << tenths;
        return rt.str();
    }
};

// Clock over CLOCK_MONOTONIC, labelled "REAL TIME=".
class WallClock : public Clock {
public:
    WallClock() : Clock( CLOCK_MONOTONIC, "REAL TIME=" ) { }
};

// Clock over CLOCK_PROCESS_CPUTIME_ID, labelled "CPU USER TIME=".
// NOTE(review): CLOCK_PROCESS_CPUTIME_ID is the process CPU-time clock; on
// Linux that appears to cover user and system time together, so the "USER"
// label may overstate what is measured -- confirm.
class CpuUserClock : public Clock {
public:
    CpuUserClock() : Clock( CLOCK_PROCESS_CPUTIME_ID, "CPU USER TIME=" ) { }
};

// Stopwatch for system CPU time, based on times() instead of clock_gettime().
class CpuSystemClock {
public:
    tms start_time;   // times() snapshot taken at construction

    CpuSystemClock() { times(&start_time); }

    // Returns "SYSTEM TIME=" followed by the system CPU seconds consumed since
    // construction (tms_stime ticks divided by the _SC_CLK_TCK tick rate).
    std::string read_timer() {
        tms now;
        times(&now);
        double elapsed = static_cast<double>(now.tms_stime - start_time.tms_stime)
                       / static_cast<double>(sysconf(_SC_CLK_TCK));
        return Clock::format_time( "SYSTEM TIME=", elapsed );
    }
};

// HACK ALERT - there isn't any good way to clean up Tcl interpreters in the
// worker-thread pool pattern!!
void cleanup_interpreter( ); class WorkerPoolManager { int thread_count; std::vector slots; std::vector threads; static std::vector > task_queue; static pthread_mutex_t in_data_lock; public: WorkerPoolManager( int tc ) : thread_count(tc), slots(tc), threads(tc) { pthread_mutex_init( &in_data_lock, NULL ); } ~WorkerPoolManager( ) { pthread_mutex_destroy( &in_data_lock ); } void add_task( unsigned long task_number, void(*task_func)(unsigned long) ) { task_queue.push_back(std::pair(task_number,task_func)); } void run_threads() { { // Grab a lock so that threads won't start working until they all have been created. ScopedLock L(&in_data_lock); for( unsigned long i=0; i != thread_count; ++i ) { slots[i] = i; pthread_create( &(threads[i]), NULL, worker_thread, &slots[i] ); } } // for( unsigned long i=0; i != thread_count; ++i ) { pthread_join( threads[i], NULL ); } } static void* worker_thread( void* slot_p ) { while( 1 ) { std::pair task; int task_number; { ScopedLock L(&in_data_lock); if (task_queue.size() == 0) break; task = task_queue.back(); task_queue.pop_back( ); } task.second( task.first ); } cleanup_interpreter( ); return NULL; } };