/*
 * C-DAC Tech Workshop : hyPACK-2013
 * October 15-18, 2013
 *
 * Date   : August 2013
 * File   : pthread_prod_cons_large.cpp
 * Desc   : Producer-Consumer problem with large no. of threads and large no. of resources
 *          (as many as user specifies) using mutex
 * Input  : NumThreads, NumResources & ThreadAffMask (all 3, first 2 or first 1 only are
 *          also accepted)
 * Output : Thread-affinity set (if specified) , time taken to execute in sec & microsec.
 * E-mail : hpcfte@cdac.in
 *
 */

// NOTE(review): the header names of the seven original #include lines were lost
// when this file was extracted; the list below is what the code actually uses.
#include <iostream>
#include <vector>
#include <cstdlib>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <sched.h>      //for thread affinity

using namespace std;

#define NUM_LOOP 10000  //Number of loops for each thread.

int *ResourceQueue;     //queue for the resources produced.
int QueueIndex;         //index of the next free slot in ResourceQueue.
double ResourceCount;   //number of resources currently held in the queue.
int NumThreads;         //equal for producer/consumer
int NumResource;        //maximum num., of resource on shared memory.

pthread_mutex_t QueueMutex; //mutex object for accessing queue.

void *producer(void *data);
void *consumer(void *data);

/*
 * Entry point.
 *
 * Usage: prog [NumThreads [NumResources [ThreadAffMask]]]
 *   - 1 argument : NumThreads given, NumResource defaults to 1000.
 *   - 2 arguments: NumThreads and NumResource given.
 *   - 3 arguments: additionally binds the process to the CPUs whose bits are set
 *                  in the decimal ThreadAffMask.
 *   - otherwise  : prints the usage line and runs with 64 threads / 1000 resources.
 *
 * Starts NumThreads producers and NumThreads consumers, waits for all of them and
 * reports the elapsed wall-clock time. Returns 0 on success, 1 on invalid input or
 * when a thread cannot be created.
 */
int main(int argc, char *argv[])
{
    int rc; //return code
    struct timeval tv_start, tv_end; //for gettimeofday()
    pid_t ProgPid = getpid(); //PID of the program for thread affinity

    ResourceCount = 0; //initially no resources in the queue.
    QueueIndex = 0;

    if (argc == 2)
    {
        NumThreads = atoi(argv[1]);
        cout << "Number of resources not provided.\n"
                "Assuming 1000 resources in the shared queue."
                "\n";
        NumResource = 1000;
    }
    else if (argc == 3)
    {
        NumThreads = atoi(argv[1]);
        NumResource = atoi(argv[2]);
    }
    else if (argc == 4) //user provided NumThreads, NumResources & ThreadAffMask (i.e., no. of cores to bind)
    {
        NumThreads = atoi(argv[1]);
        NumResource = atoi(argv[2]);

        // Build a real cpu_set_t from the mask bits instead of casting an
        // unsigned long to cpu_set_t*, which relied on the two layouts matching.
        const unsigned long ThreadAffMask = strtoul(argv[3], NULL, 10);
        cpu_set_t AffinitySet;
        CPU_ZERO(&AffinitySet);
        for (unsigned int bit = 0; bit < sizeof(ThreadAffMask) * 8; ++bit)
        {
            if (ThreadAffMask & (1UL << bit))
                CPU_SET(bit, &AffinitySet);
        }

        rc = sched_setaffinity(ProgPid, sizeof(AffinitySet), &AffinitySet);
        if (rc == -1)
            cout << "ERROR: Couldn't set thread affinity mask\n";
        // NOTE(review): the file header says the affinity set is printed, but no
        // such output survived in the extracted source - confirm and restore.
    }
    else
    {
        // NOTE(review): the argument placeholders were stripped from this string
        // during extraction; restored from the "Input" line of the file header.
        cout << "Correct usage: " << argv[0] << " <NumThreads> <NumResources> <ThreadAffMask>\n"
                "Assuming NumThreads = 64 , NumResource = 1000 & default thread-affinity\n";
        NumThreads = 64;
        NumResource = 1000;
    }

    // atoi() yields 0 on garbage and may yield negatives; either would give a
    // bogus allocation size or producers that can never make progress.
    if (NumThreads <= 0 || NumResource <= 0)
    {
        cout << "ERROR: NumThreads and NumResource must be positive integers\n";
        return 1;
    }

    //initialize Resource Queue based on NumResource
    // The vector owns the storage; the global pointer is what the threads use.
    vector<int> QueueStorage(static_cast<size_t>(NumResource), -1);
    ResourceQueue = &QueueStorage[0];

    // Heap-backed arrays: a stack VLA is non-standard C++ and can overflow the
    // stack when a large number of threads is requested.
    const size_t TotalThreads = static_cast<size_t>(NumThreads) * 2;
    vector<long> ThreadIds(TotalThreads); //for passing unique ID nos. to each thread.
    vector<pthread_t> tid(TotalThreads);

    pthread_mutex_init(&QueueMutex, NULL);

    //start time
    gettimeofday(&tv_start, NULL);

    // NOTE(review): everything from here to the end of producer() and the first
    // lines of consumer() was missing from the extracted source and has been
    // reconstructed from the surviving declarations - verify against the original.

    //Thread Create operation - producer (slots 0 .. NumThreads-1)
    for (size_t i = 0; i < static_cast<size_t>(NumThreads); i++)
    {
        ThreadIds[i] = static_cast<long>(i);
        rc = pthread_create(&tid[i], NULL, producer, &ThreadIds[i]);
        if (rc != 0)
        {
            cout << "ERROR: Couldn't create producer thread " << i << "\n";
            exit(1); // already-running threads would block forever; bail out.
        }
    }

    //Thread Create operation - consumer (slots NumThreads .. 2*NumThreads-1)
    for (size_t i = static_cast<size_t>(NumThreads); i < TotalThreads; i++)
    {
        ThreadIds[i] = static_cast<long>(i);
        rc = pthread_create(&tid[i], NULL, consumer, &ThreadIds[i]);
        if (rc != 0)
        {
            cout << "ERROR: Couldn't create consumer thread " << i << "\n";
            exit(1);
        }
    }

    //wait for every producer and consumer to finish
    for (size_t i = 0; i < TotalThreads; i++)
        pthread_join(tid[i], NULL);

    //end time
    gettimeofday(&tv_end, NULL);

    long sec = static_cast<long>(tv_end.tv_sec - tv_start.tv_sec);
    long usec = static_cast<long>(tv_end.tv_usec - tv_start.tv_usec);
    if (usec < 0) // borrow one second when the microsecond part wrapped
    {
        sec--;
        usec += 1000000;
    }
    cout << "Time taken : " << sec << " sec " << usec << " microsec\n";

    pthread_mutex_destroy(&QueueMutex);
    ResourceQueue = NULL; // storage is released when QueueStorage goes out of scope
    return 0;
}

/*
 * Producer thread body.
 *
 * data points to a long holding this thread's ID. Adds NUM_LOOP resources to the
 * shared queue, retrying while the queue is full. Always returns NULL.
 *
 * NOTE(review): this whole function is a reconstruction mirroring consumer();
 * the stored value and the log message are assumptions - confirm.
 */
void *producer(void *data)
{
    long *id = static_cast<long *>(data);
    int loopCnt = NUM_LOOP;

    while (loopCnt != 0)
    {
        int produced = 0;
        while (!produced)
        {
            pthread_mutex_lock(&QueueMutex);
            if (ResourceCount < NumResource)
            {
                ResourceQueue[QueueIndex++] = static_cast<int>(*id); //add a resource
                ResourceCount++; //one more resource available.
                produced = 1;
                loopCnt--;
            }
            pthread_mutex_unlock(&QueueMutex);
        }
        cout << "Producer : " << *id << " produced.\n";
    }
    return NULL;
}

/*
 * Consumer thread body.
 *
 * data points to a long holding this thread's ID. Removes NUM_LOOP resources from
 * the shared queue, retrying while the queue is empty. Always returns NULL.
 *
 * NOTE(review): the lines before the "ResourceCount > 0" test were lost in
 * extraction and are reconstructed; the critical section itself is original.
 */
void *consumer(void *data)
{
    long *id = static_cast<long *>(data);
    int loopCnt = NUM_LOOP;

    while (loopCnt != 0)
    {
        int extracted = 0;
        while (!extracted)
        {
            pthread_mutex_lock(&QueueMutex);
            if (ResourceCount > 0)
            {
                ResourceQueue[--QueueIndex] = -1; //remove the resource
                ResourceCount--; //reduce no. of resources available by one.
                extracted = 1;
                loopCnt--;
            }
            pthread_mutex_unlock(&QueueMutex);
        }
        cout << "Consumer : " << *id << " extracted.\n";
        //random = rand_r((unsigned int*)&seed) % 3 ;
        //sleep(random+1);
    }
    // A void* thread function must return a value; flowing off the end is UB.
    return NULL;
}