串行归并与并行归并排序算法一、串行归并排序算法归并排序是一种很容易进行并行化的算法,因为归并的各个数据区间都是独立的,没有依赖关系。并且归并排序是一种速度比较快的排序,且是一种稳定的排序算法,排序速度与关键词初始排列无关。串行归并排序的算法大体的可以描述为:首先将要排序的表分成两个节点个数基本相等的子表,然后对每个子表进行排序,最后将排好序的两个子表合并成一个子表。在对子表进行排序时可以将子表再分解成两个节点数量基本相同的子表,当子表足够小时,也可以采用其他排序方法对子表进行排序,然后再对排好序的子表进行归并操作,最后将整个表排好序。1、1算法流程图并行归并排序算法的流程图:开始将数组从中间分成两段,成为两个数组结束将两个数组分别进行归并排序串行归并排序算发流程图合并两个数组递归11、2代码分析#includeiostreamusingnamespacestd;#defineN11intarray[N]={4,67,456,23,1,78,26,222,34,432,12};//待排序数组intother[N];//辅助空间,用以暂存已经排序的数组元素voidSwap(int&a,int&b){inttmp=a;a=b;b=tmp;}/*array待排序数组*begin数组开始的下标*end数组最后一个元素的下标*/voidMergeSort(int*array,intbegin,intend){if(end-begin+12){MergeSort(array,begin,(end+begin)/2);MergeSort(array,(end+begin)/2+1,end);inti=begin,j=(end+begin)/2+1,k=begin;while(i=(begin+end)/2&&j=end){if(array[i]array[j])other[k++]=array[i++];elseother[k++]=array[j++];}while(i=(begin+end)/2)other[k++]=array[i++];while(j=end)other[k++]=array[j++];for(k=begin;k=end;++k)array[k]=other[k];}else2if(array[end]array[begin])Swap(array[end],array[begin]);}voidOutput(int*array,intn){for(inti=0;in;++i)coutarray[i];coutendl;}intmain(){cout串行归并排序算法endlthenumbersare:;Output(array,N);MergeSort(array,0,N-1);coutthesortedresult:;Output(array,N);inti;cini;return0;}1、3运行结果1、4复杂度分析通过算法分析很容易发现串行归并排序算法的时间复杂度地推公式为:(1)(1)()(1)2()()2ifnTnnifnTn这是一个时间复杂度的递推公式,我们需要消去等号右侧的T(n),把T(n)写成n的函数。其实符合一定条件的Recurrence的展开有数学公式可以套,这里我们略去严格的数学证明,只是从直观上看一下这个递推公式的结果。当n=1时可以设1(1)Tc,当n1时可以设2()2()2nTnTcn,我们取c1和c2中较大的一个设为c,把原来的公式改为:3(1)()(1)2()2cifnTnnifnTcn参照主定理公式()()()nTnaTfnb,可以知道当n1时,有以下等式成立:22()abfncn同时又有下面的等式成立:log()()abfncnn则有T(n)满足主定理公式的第二种情况,也即是T(n)的时间复杂度为:log()(lg)(lg)abTnnnnn二、并行归并排序算法由串行归并排序算法可知,归并的各个数据区间都是独立的,没有依赖关系。因此归并排序是一种很容易进行并行化的算法。其方案是先将待排序区间划分成若干个相等的小区间,然后并行地对这些小区间进行排序,最后再通过归并的方法将所有排好序的小区间归并成一个有序系列。由于归并时是一层层向上进行的,因此需要将区间划分成2k个小区间,这样第1轮归并时,可以将2k个小区间归并成12k个区间。这样过k轮归并操作后就归并成一个有序的大区间。这也正是并行归并排序的算法思想。2、1算法流程图并行归并排序算法的思想可以通过下面的流程图表示:4开始结束讲数组分为2的k次方个小区间用多线程对这些小区间排序使用线性归并排序算法的归并函数将小区间合并并行归并排序算发流程图2、2代码分析功能函数头文件:MergeSort.h#defineMAX_MERGE_TURN24#defineMIN_PARALLEL_SORT_COUNT3#defineMIN_MERGE_COUNT2#defineCACHE_LINE_SIZE64typedefunsignedintUINT;#includeomp.h#includewindows.h#includeiostream#includecstdlib#includectimeusingnamespacestd;intcompare(int*one,int*two){if(*one*two)return1;elseif(*two*one)return-1;5elsereturn0;}void*GetCacheAlignedAddr(void*pAddr){intm=CACHE_LINE_SIZE;void*pRet=(void*)(((UINT)pAddr+m-1)&(-m));returnpRet;}/**串行归并函数归并好的数据存放在输出参数ppNewData中@paramvoid**ppData-待归并的数据@paramintnStart1-第个区间的开始位置(包括nStart1)@paramintnEnd1-第个区间的结束位置(包括nEnd1)@paramintnStart2-第个区间的开始位置(包括nStart2)@paramintnEnd2-第个区间的结束位置(包括nEnd2)@paramCOMPAREFUNCfunc-比较函数@paramvoid**ppNewData-存放归并后的数据@returnvoid**-返回归并后的数据指针(等于ppNewData)*/int**Serial_Merge(int**ppData,intnStart1,intnEnd1,intnStart2,intnEnd2,int(*func)(int*,int*),int**ppNewData){inti,j,k;i=nStart1;j=nStart2;k=nStart1;for(i=nStart1;i=nEnd1;){for(;j=nEnd2;j++){if((*func)(ppData[i],ppData[j])0){ppNewData[k]=ppData[i];++k;i++;6break;}else{ppNewData[k]=ppData[j];++k;}}//如果j已经到了尾部if(jnEnd2){for(;i=nEnd1;i++){ppNewData[k]=ppData[i];++k;}break;}}if(inEnd1){for(;j=nEnd2;j++){ppNewData[k]=ppData[j];++k;}}for(i=nStart1;i=nEnd2;i++){ppData[i]=ppNewData[i];}returnppData;}/**串行归并排序函数@paramvoid**ppData-待排序数据@paramintnBegin-排序区间的开始位置@paramintnEnd-排序区间的结束位置7@paramCOMPAREFUNCfunc-数据大小比较函数@returnvoid-无*/voidSerial_MergeSort(int**ppData,intnBegin,intnEnd,int(*func)(int*,int*)){if(nEnd-nBeginMIN_MERGE_COUNT){int*temp;if(*ppData[nBegin]*ppData[nEnd]){temp=ppData[nBegin];ppData[nBegin]=ppData[nEnd];ppData[nEnd]=temp;}return;}int**tempData=newint*[nEnd-nBegin+1];inti;inttmpInt=0;for(i=0;inEnd-nBegin+1;i++){tempData[i]=&tmpInt;}intnMid=(nBegin+nEnd)1;//除以Serial_MergeSort(ppData,nBegin,nMid,func);Serial_MergeSort(ppData,nMid+1,nEnd,func);Serial_Merge(ppData,nBegin,nMid,nMid+1,nEnd,func,tempData);}/**并行归并快速排序函数@paramvoid**ppData-待排序数据@paramintnLen-待排序数据长度@paramCOMPAREFUNCfunc-数据比较回调函数@returnvoid-无8*/voidParallel_MergeSort(int**ppData,intnLen,int(*func)(int*,int*)){inti,k;intnStep;intnLoopCount;intnCore;intnStart1,nEnd1;if(nLenMIN_PARALLEL_SORT_COUNT){Serial_MergeSort(ppData,0,nLen-1,func);return;}nCore=omp_get_num_procs();intnCount=nLen/MIN_PARALLEL_SORT_COUNT;intnTurn=MAX_MERGE_TURN;nLoopCount=1nTurn;//nLoopCount等于的nTurn次方while(nLoopCountnCount){nLoopCount=nLoopCount1;//除以--nTurn;}//判断nTurn是否为奇数if((nLoopCountnCore)&&(nTurn0x1)&&((nTurn&0x1)==0x1)){--nTurn;//把nTurn变成偶数,便于后面不拷贝数据nLoopCount=nLoopCount1;}nStep=nLen/nLoopCount;int*p=newint[nLoopCount*2+CACHE_LINE_SIZE];int*pnPos=(int*)GetCacheAlignedAddr(p);//将数据分成nLoopCount个小区间,并行地对每个区间进行串行排序#pragmaompparallelforprivate(nStart1,nEnd1)num_threads(nCore)9for(i=0;inLoopCount;i++){nStart1=i*nStep;nEnd1=nStart1+nStep-1;if(i==nLoopCount-1){nEnd1=nLen-1;}Serial_MergeSort(ppData,nStart1,nEnd1,func);pnPos[i+i]=nStart1;pnPos[i+i+1]=nEnd1;}//对排好序的各个相邻区间进行并行归并操作int**pp=newint*[nLen+CACHE_LINE_SIZE];int**ppOutData=(int**)GetCacheAlignedAd