C++线程同步成产者消费者举例1.测试用例文件说明:本用例是在VS2010(中文版)环境下进行。项目创建流程(只供参考):文件新建VisualC++Win32项目输入项目名(ThreadPC)确定下一步选择控制台应用程序完成测试用例文件用于描述各线程的有关信息,该文件内容及格式如下(将下列内容复制到test.txt文件中):31P32P43C414P25C3124说明:第一行给出的是程序中设置的临界区个数;其余各行是各进程信息。每行中的数据之间用Tab键分隔。第一列(除第一行外):线程号(共5个线程)。第二列:P——生产者,C——消费者。第三列:线程在生产和消费前的休眠时间,单位为秒。第四及以后各列:消费的产品所对应的生产者线程号。2.数据结构(1)用整型数组Buffer_Critical表示缓冲区。//本程序允许的最大临界区数#defineMAX_BUFFER_NUM10intBuffer_Critical[MAX_BUFFER_NUM];//缓冲区声明,用于存放产品(2)用自定义结构ThreadInfo记录一条线程信息,多个线程对应一个ThreadInfo数组。structThreadInfo{intserial;//线程序列号charentity;//是P还是Cdoubledelay;//线程延迟intthread_request[MAX_THREAD_NUM];//线程请求队列intn_request;//请求个数};(3)通过如下同步对象实现互斥:设一个互斥量h_mutex,实现生产者在查询和保留缓冲区的下一个空位置时进行互斥。HANDLEh_mutex;//一个互斥量设置h_Semaphore[MAX_THREAD_NUM]信号量数组表示相应产品已经生产,实现生产者与消费者之间的同步。同时,用表示空缓冲区数目的信号量empty_semephore指示是否存在空位置,实现类似的同步,以便开始下一个产品的生产。//本程序允许的生产和消费线程的总数#defineMAX_THREAD_NUM64HANDLEh_Semaphore[MAX_THREAD_NUM];//生产者允许消费者开始消费的信号量设置临界区对象数组PC_Critical[MAX_BUFFER_NUM]实现每个缓冲区上消费者之间的互斥。CRITICAL_SECTIONPC_Critical[MAX_BUFFER_NUM];//临界区对象的声明,用于管理缓冲区的互斥访问3.程序流程为了方便,程序结构用如下的文字予以描述。(1)主函数(2)初始化缓冲区、消费请求队列及部分同步对象(3)提取线程信息(到test.txt文件中提取)(4)完成线程相关同步对象的初始化(5)创建线程,模拟生产者和消费者(6)等待所有线程结束(7)程序结束(8)消费者(9)有无消费请求?有,则继续(10);无,则转(16)(10)此请求可满足?可满足,转(11);否,则阻塞,再转(10)(11)确定产品位置(12)此产品正被消费?是,则阻塞,再转(12);否,则转(13)(13)进入临界区(请求同一产品的消费者之间互斥)(14)消费产品,并判断是否应该释放产品所占缓冲区(15)退出临界区,转(9)(16)结束消费者线程(17)生产者(18)存在空缓冲区?有,则继续(19);无,则阻塞,再转(18)(19)另一生产者在写?否,则转(20);是,则阻塞,再转(19)(20)进入临界区(请求同一产品的生产者之间互斥)(21)在缓冲区中为本线程产品分配空间(22)退出临界区(23)写入产品到分配的缓冲区空间中(24)结束生产者线程4.代码实现//ThreadPC.cpp:定义控制台应用程序的入口点#includestdafx.h#includewindows.h#includefstream#includestdio.h#includestring#includeconio.h#includeiostreamusingnamespacestd;//定义一些常量//本程序允许的最大临界区数#defineMAX_BUFFER_NUM10//秒到微秒的乘法因子#defineINTE_PER_SEC1000//本程序允许的生产和消费线程的总数#defineMAX_THREAD_NUM64//定义一个结构,记录在测试文件中指定的每一个线程的参数structThreadInfo{intserial;//线程序列号charentity;//是P还是Cdoubledelay;//线程延迟intthread_request[MAX_THREAD_NUM];//线程请求队列intn_request;//请求个数};//全局变量的定义//struct_RTL_CRITICAL_SECTION;//typedef_RTL_CRITICAL_SECTIONRTL_CRITICAL_SECTION;//typedefRTL_CRITICAL_SECTIONCRITICAL_SECTION;CRITICAL_SECTIONPC_Critical[MAX_BUFFER_NUM];//临界区对象的声明,用于管理缓冲区的互斥访问intBuffer_Critical[MAX_BUFFER_NUM];//缓冲区声明,用于存放产品HANDLEh_Thread[MAX_THREAD_NUM];//用于存储每个线程句柄的数组ThreadInfoThread_Info[MAX_THREAD_NUM];//线程信息数组HANDLEempty_semaphore;//一个信号量HANDLEh_mutex;//一个互斥量DWORDn_Thread=0;//实际的线程的数目DWORDn_Buffer_or_Critical;//实际的缓冲区或者临界区的数目HANDLEh_Semaphore[MAX_THREAD_NUM];//生产者允许消费者开始消费的信号量//生产消费及辅助函数的声明voidProduce(void*p);voidConsume(void*p);boolIfInOtherRequest(int);intFindProducePosition();intFindBufferPosition(int);intmain(void){//声明所需变量inti=0,j=0;DWORDwait_for_all;ifstreaminFile;//输入文件//初始化缓冲区,即产品缓冲区,存储生产的产品,产品为正整数,-1表示空for(inti=0;iMAX_BUFFER_NUM;i++)Buffer_Critical[i]=-1;//初始化每个线程的请求队列for(intj=0;jMAX_THREAD_NUM;j++){for(intk=0;kMAX_THREAD_NUM;k++)Thread_Info[j].thread_request[k]=-1;//-1表示无请求,正整数表示生产者请求生产的产品或消费者请求消费的产品Thread_Info[j].n_request=0;}//初始化临界区对象列表,使用临界区是必须初始化for(i=0;iMAX_BUFFER_NUM;i++)InitializeCriticalSection(&PC_Critical[i]);//打开输入文件,按照规定的格式提取线程等信息inFile.open(test.txt);//从文件中获得实际的缓冲区或临界区对象的数目inFilen_Buffer_or_Critical;//=3inFile.get();//读取\nprintf(输入文件是:\n);//回显获得的缓冲区的数目信息printf(缓冲区的数目是:%d\n,(int)n_Buffer_or_Critical);//提取每个线程的信息到相应数据结构中while(inFile){inFileThread_Info[n_Thread].serial;//线程号inFileThread_Info[n_Thread].entity;//P—生产者,C—消费者inFileThread_Info[n_Thread].delay;//线程在生产和消费前的休眠时间,单位为秒charc;inFile.get(c);while(c!='\n'&&!inFile.eof()){//c!='\n'说明delay列后面还有数据inFileThread_Info[n_Thread].thread_request[Thread_Info[n_Thread].n_request++];inFile.get(c);}n_Thread++;}//输出获得的线程信息,便于确认正确性cout从文件中获得的线程信息endl;coutserial\tentity\tdelay\trequest\t...endl;for(j=0;j(int)n_Thread;j++){intTemp_serial=Thread_Info[j].serial;charTemp_entity=Thread_Info[j].entity;doubleTemp_delay=Thread_Info[j].delay;printf(\n线程%2d\t%c\t%f\t,Temp_serial,Temp_entity,Temp_delay);intTemp_request=Thread_Info[j].n_request;for(intk=0;kTemp_request;k++)printf(%d\t,Thread_Info[j].thread_request[k]);coutendl;}printf(\n\n);//创建信号量empty_semaphore控制同一时刻访问空缓冲区域的线程数目empty_semaphore=CreateSemaphore(NULL,n_Buffer_or_Critical,n_Buffer_or_Critical,(LPCWSTR)semaphore_for_empty);//互斥量h_mutex,实现生产者在查询和保留缓冲区的下一个空位置时进行互斥h_mutex=CreateMutex(NULL,FALSE,(LPCWSTR)mutex_for_update);//下面这个循环用线程的ID号来为相应生产线程的产品读写时所使用的同步信号量命名for(j=0;j(int)n_Thread;j++){std::stringlp=semaphore_for_produce_;inttemp=j;while(temp){//将tmp转化为字符串charc=(char)(temp%10)+'0';lp+=c;temp/=10;}//信号量h_Semaphore[i]控制被生产者线程h_Thread[i]生产的线程可以被多少个消费线程使用h_Semaphore[j+1]=CreateSemaphore(NULL,0,(LONG)n_Thread,(LPCWSTR)lp.c_str());}//创建生产者和消费者线程for(i=0;i(int)n_Thread;i++){if(Thread_Info[i].entity=='P')h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Produce),&(Thread_Info[i]),0,NULL);elseh_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Consume),&(Thread_Info[i]),0,NULL);}//主程序等待各个线程的动作结束wait_for_all=WaitForMultipleObjects(n_Thread,h_Thread,TRUE,-1);printf(\n\n所有的生产者和消费者已经完成他们的工作.\n);printf(按任意键退出