#include "work_pool.h" #include #include #include #include #include struct some_stats { std::vector data; std::string string_rep; public: some_stats( std::istream& in ) { // take a csv input stream and convert it into a person_stat char comma; double d; while( in ) { in >> d; data.push_back(d); in >> comma; } } const char* csv() { string_rep=""; std::stringstream buf_str; for( size_t idx=0; idx != data.size(); ++idx ) { buf_str << data[idx]; if ( idx+1 != data.size() ) buf_str << ","; } string_rep = buf_str.str(); return string_rep.c_str(); } }; struct thread_specific_data { Tcl_Interp* interpreter; some_stats* record; Tcl_Obj* long_result; Tcl_Obj* double_result; }; pthread_key_t my_thread_key; int some_stats_tcl_interface( ClientData cd, struct Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ) { thread_specific_data* this_thread = static_cast(cd); some_stats* record = this_thread->record; if ( !record || objc < 2 ) { Tcl_AddErrorInfo( interp, "Unknown command or argument for person record Object" ); return TCL_ERROR; } const char* command = Tcl_GetStringFromObj( objv[1], NULL ); // Example of the indexed random access method if ( strcmp( command, "size" ) == 0 ) { size_t sz = record->data.size(); if (Tcl_IsShared(this_thread->long_result)) { // Handle the case where the user has retained a reference to the old result. Tcl_DecrRefCount(this_thread->long_result); this_thread->long_result=Tcl_NewLongObj(sz); Tcl_IncrRefCount(this_thread->long_result); } else { // Use existing copy since no references exist but our own. Tcl_SetLongObj(this_thread->long_result, sz); } Tcl_SetObjResult( interp, this_thread->long_result ); } else if ( strcmp( command, "value" ) == 0 ) { if ( objc != 3 ) { Tcl_AddErrorInfo( interp, "Unknown command or argument for person record Object" ); return TCL_ERROR; } long index_val; int rc = Tcl_GetLongFromObj( interp, objv[2], &index_val ); if ( rc != TCL_OK ) return rc; double val = record->data[index_val]; if (Tcl_IsShared(this_thread->double_result)) { // Handle the case where the user has retained a reference to the old result. Tcl_DecrRefCount(this_thread->double_result); this_thread->double_result=Tcl_NewDoubleObj(val); Tcl_IncrRefCount(this_thread->double_result); } else { // Use existing copy since no references exist but our own. Tcl_SetDoubleObj(this_thread->double_result, val); } Tcl_SetObjResult( interp, this_thread->double_result ); } else { Tcl_AddErrorInfo( interp, "Unknown command for person record Object" ); return TCL_ERROR; } return TCL_OK; } void cleanup_interpreter( ) { thread_specific_data* thread_data = (thread_specific_data*) pthread_getspecific(my_thread_key); if ( thread_data ) { Tcl_DeleteInterp( thread_data->interpreter ); thread_data->interpreter = NULL; free( thread_data ); } } const char* tcl_proc = "# Tcl based calculation returning a number\n" "set tcl_precision 17\n" "proc calc_abmi { record } {\n" " set wval 0.0\n" " set vlen [ $record size ]\n" " set old_val 0.0\n" " for { set idx 0 } { $idx < $vlen } { incr idx } {\n" " set wval [ expr { pow(1.009 + sqrt($old_val)*(1/(abs($wval)+0.001) + [ $record value $idx ]), 1.0002) } ]\n" " set old_val [ expr { $wval/2.355832 } ]\n" " }\n" " return [ expr { $wval/$vlen } ]\n" "}\n" "proc calc_abmi_string { record } {\n" " set wval 0.0\n" " set vlen [ $record size ]\n" " set old_val 0.0\n" " set result \"$vlen\"\n" " for { set idx 0 } { $idx < $vlen } { incr idx } {\n" " append result [format \",\\$i=%d\\{\%.3lg\\}\" $idx [ $record value $idx ]]\n" " append result \",the,quick,brown,fox,\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " append result \"jumped,over,the,lazy,dog\"\n" " set wval [ expr { pow(1.009 + sqrt($old_val)*(1/(abs($wval)+0.001) + [ $record value $idx ]), 1.0002) } ]\n" " set old_val [ expr { $wval/2.355832 } ]\n" " append result \",$wval\"\n" " append result \",$old_val\"\n" " }\n" " return $result\n" "}\n"; // C++ based calculation returning a number void do_cpp_calc_abmi( some_stats &p, double* result ) { double wval=0.0; size_t vlen = p.data.size(); double old_val=0.0; for( size_t idx=0; idx != vlen; ++idx ) { wval = pow(1.009 + sqrt(old_val)*(1/fabs(wval+0.001) + p.data[idx]),1.0002); old_val = wval/2.355832; } *result = wval/vlen; } void do_cpp_calc_abmi_string( some_stats &p, std::vector& result ) { double wval=0.0; size_t vlen = p.data.size(); double old_val=0.0; //std::stringstream result; size_t curr_str_len = 0; result.clear(); char buf[256]; size_t print_length = snprintf(buf, 256, "%ld", vlen); result.insert(result.end(),&buf[0],&buf[0]+print_length); for( size_t idx=0; idx != vlen; ++idx ) { print_length = snprintf(buf, 256, ",$i=%ld{%.3lg}", idx, p.data[idx]); result.insert(result.end(),&buf[0],&buf[0]+print_length); curr_str_len+=print_length; const char *gratuitous_extra_string =",the,quick,brown,fox,jumped,over,the,lazy,dog"; result.insert(result.end(),gratuitous_extra_string,gratuitous_extra_string+21); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); result.insert(result.end(),gratuitous_extra_string+21,gratuitous_extra_string+45); wval = pow(1.009 + sqrt(old_val)*(1/fabs(wval+0.001) + p.data[idx]),1.0002); print_length = snprintf(buf, 256, ",%.17lg", wval); result.insert(result.end(),&buf[0],&buf[0]+print_length); old_val = wval/2.355832; print_length = snprintf(buf, 256, ",%.17lg", old_val); result.insert(result.end(),&buf[0],&buf[0]+print_length); } result.insert(result.end(),'\0'); } bool setup_interpreter( ) { // Lazy creation of the interpreters inside the thread that is using it // follows Tcls restriction that interpreters are created, used and destroyed on the same thread. // set up thread specific data thread_specific_data* thread_data = (thread_specific_data*) malloc(sizeof(thread_specific_data)); pthread_setspecific(my_thread_key,thread_data); // Fill in Tcl specific contents for the thread data Tcl_Interp *interp = thread_data->interpreter = Tcl_CreateInterp(); thread_data->long_result=Tcl_NewLongObj(0); Tcl_IncrRefCount(thread_data->long_result); thread_data->double_result=Tcl_NewDoubleObj(0.0); Tcl_IncrRefCount(thread_data->double_result); int rc=0; // Initialize the interpreter with the function we are calling Tcl_CreateObjCommand( interp, "arg1", some_stats_tcl_interface, thread_data, NULL ); rc = Tcl_Eval( interp, tcl_proc ); if ( rc != TCL_OK ) { std::cout << "An error was encountered during evaluation of TCL code."; const char * errorInfoStr = Tcl_GetVar(interp, "errorInfo", TCL_GLOBAL_ONLY); if ( errorInfoStr ) std::cout << errorInfoStr << std::endl; return false; } return true; } // dummy task data const char* dummy_record="2.3,482.5,5.8,9825.23,2853.209834,43049.8,42,55,88,203984,2039.20,20346.0239,3849.2309,20389.22,4e-07,0.123,0.1351324"; int iteration_count=5000; void do_tcl_strings( unsigned long task_index ) { std::stringstream linestream( dummy_record ); some_stats next( linestream ); #ifdef DUMP_OUTPUT std::stringstream filename; filename << "tcl.s.out." << task_index; std::ofstream outfile( filename.str().c_str() ); #endif thread_specific_data* thread_data = (thread_specific_data*) pthread_getspecific(my_thread_key); if (!thread_data) { // Set up an interpreter on this thread. if (!setup_interpreter()) return; thread_data = (thread_specific_data*) pthread_getspecific(my_thread_key); if (!thread_data) { std::cerr << "Unable to initialize thread data." << std::endl; return; } } for( int i=0; irecord = &next; const char *s = "calc_abmi_string arg1"; // Now call the proc Tcl_Interp *interp = thread_data->interpreter; int rc = Tcl_Eval( interp, s ); if ( rc != TCL_OK ) { std::cout << "An error was encountered during evaluation of TCL code."; const char * errorInfoStr = Tcl_GetVar(interp, "errorInfo", TCL_GLOBAL_ONLY); if ( errorInfoStr ) std::cout << errorInfoStr << std::endl; } Tcl_Obj *res = Tcl_GetObjResult( interp ); char* result = Tcl_GetStringFromObj(res, NULL ); #ifdef DUMP_OUTPUT outfile << result << std::endl; #endif thread_data->record = NULL; } } void do_tcl_numbers( unsigned long task_index ) { std::stringstream linestream( dummy_record ); some_stats next( linestream ); thread_specific_data* thread_data = (thread_specific_data*) pthread_getspecific(my_thread_key); if (!thread_data) { // Set up an interpreter on this thread. if (!setup_interpreter()) return; thread_data = (thread_specific_data*) pthread_getspecific(my_thread_key); if (!thread_data) { std::cerr << "Unable to initialize thread data." << std::endl; return; } } #ifdef DUMP_OUTPUT std::stringstream filename; filename << "tcl.n.out." << task_index; std::ofstream outfile( filename.str().c_str() ); #endif for( int i=0; irecord = &next; double ret_val; const char *s = "calc_abmi arg1"; // Now call the proc Tcl_Interp *interp = thread_data->interpreter; int rc = Tcl_Eval( interp, s ); if ( rc != TCL_OK ) { std::cout << "An error was encountered during evaluation of TCL code."; const char * errorInfoStr = Tcl_GetVar(interp, "errorInfo", TCL_GLOBAL_ONLY); if ( errorInfoStr ) std::cout << errorInfoStr << std::endl; } Tcl_Obj *res = Tcl_GetObjResult( interp ); rc = Tcl_GetDoubleFromObj(interp, res, &ret_val ); #ifdef DUMP_OUTPUT outfile << ret_val << std::endl; #endif thread_data->record = NULL; } } void do_c_strings( unsigned long task_index ) { std::stringstream linestream( dummy_record ); some_stats next( linestream ); std::vector outbuf; #ifdef DUMP_OUTPUT std::stringstream filename; filename << "c.s.out." << task_index; std::ofstream outfile( filename.str().c_str() ); #endif for( int i=0; icurr_arg ) { if ( strcmp( argv[curr_arg], "-tcl" ) == 0 ) { run_tcl=true; curr_arg++; continue; } if ( strcmp( argv[curr_arg], "-c" ) == 0 ) { run_c=true; curr_arg++; continue; } if ( strcmp( argv[curr_arg], "-s" ) == 0 ) { run_strings=true; curr_arg++; continue; } if ( strcmp( argv[curr_arg], "-n" ) == 0 ) { run_numbers=true; curr_arg++; continue; } thread_count=atoi(argv[curr_arg]); if ( thread_count < 2 ) { std::cerr << usage; return 1; } curr_arg++; } if ( (run_strings == run_numbers) || (run_c == run_tcl) ) { std::cout << usage; exit(1); } // Set up our worker pool WorkerPoolManager task_manager( thread_count ); // Create a number of tasks that will run multi-threaded unsigned long num_tasks=256; if ( run_tcl ) { int rc = pthread_key_create( &my_thread_key, NULL ); if ( run_strings ) { for( int curr_task=0; curr_task < num_tasks; ++curr_task ) { task_manager.add_task( curr_task, do_tcl_strings ); } } else { for( int curr_task=0; curr_task < num_tasks; ++curr_task ) { task_manager.add_task( curr_task, do_tcl_numbers); } } } else { // run C++ based tasks if ( run_strings ) { for( int curr_task=0; curr_task < num_tasks; ++curr_task ) { task_manager.add_task( curr_task, do_c_strings); } } else { for( int curr_task=0; curr_task < num_tasks; ++curr_task ) { task_manager.add_task( curr_task, do_c_numbers); } } } if ( run_strings ) std::cout << "Doing string calculations in "; else std::cout << "Doing number calculations in "; if ( run_tcl ) std::cout << "tcl. "; else std::cout << "C++. "; WallClock WC; CpuUserClock UC; CpuSystemClock SC; std::cout << "Running " << thread_count << " threads. " << std::endl; task_manager.run_threads(); if ( run_tcl ) { pthread_key_delete( my_thread_key ); } std::cout << std::setprecision(1) << WC.read_timer() << " " << UC.read_timer() << " " << SC.read_timer() << std::endl; }