"""Spark Python API examples: a collection of small PySpark RDD snippets.

Each section exercises one RDD method and is followed by the console output
that was recorded with the example (as comments).  The snippets assume that
``sc`` is an existing SparkContext, as provided by the pyspark shell or a
notebook; it is not created here.

The recorded outputs come from pyspark 1.2.2 (see the first section).
"""
# Originally this import appeared inside the foreach / foreachPartition
# snippets; __future__ imports must be the first statement of a module, so it
# is placed once at the top.  It is needed for ``print(..., file=...)``.
from __future__ import print_function


# ---------------------------------------------------------------------------
# pyspark version
# ---------------------------------------------------------------------------
print("pyspark version:" + str(sc.version))

# Output:
#   pyspark version:1.2.2


# ---------------------------------------------------------------------------
# filter
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3])
y = x.filter(lambda x: x % 2 == 1)  # filters out even elements
print(x.collect())
print(y.collect())

# Output:
#   [1, 2, 3]
#   [1, 3]


# ---------------------------------------------------------------------------
# map
# ---------------------------------------------------------------------------
# sc = spark context, parallelize creates an RDD from the passed object
x = sc.parallelize([1, 2, 3])
y = x.map(lambda x: (x, x ** 2))

# collect copies RDD elements to a list on the driver
print(x.collect())
print(y.collect())

# Output:
#   [1, 2, 3]
#   [(1, 1), (2, 4), (3, 9)]


# ---------------------------------------------------------------------------
# flatMap
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda x: (x, 100 * x, x ** 2))
print(x.collect())
print(y.collect())

# Output:
#   [1, 2, 3]
#   [1, 100, 1, 2, 200, 4, 3, 300, 9]


# ---------------------------------------------------------------------------
# mapPartitions
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3], 2)


def f(iterator):
    """Yield the sum of all elements in one partition."""
    yield sum(iterator)


y = x.mapPartitions(f)
# glom() gathers the elements of each partition into one list, which makes
# the partition boundaries visible in the printed result
print(x.glom().collect())
print(y.glom().collect())

# Output:
#   [[1], [2, 3]]
#   [[1], [5]]


# ---------------------------------------------------------------------------
# mapPartitionsWithIndex
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3], 2)


def f(partitionIndex, iterator):
    """Yield a (partition index, sum of the partition's elements) pair."""
    yield (partitionIndex, sum(iterator))


y = x.mapPartitionsWithIndex(f)

# glom() gathers the elements of each partition into one list
print(x.glom().collect())
print(y.glom().collect())

# Output:
#   [[1], [2, 3]]
#   [[(0, 1)], [(1, 5)]]


# ---------------------------------------------------------------------------
# getNumPartitions
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

# Output:
#   [[1], [2, 3]]
#   2


# ---------------------------------------------------------------------------
# distinct
# ---------------------------------------------------------------------------
x = sc.parallelize(['A', 'A', 'B'])
y = x.distinct()
print(x.collect())
print(y.collect())

# Output:
#   ['A', 'A', 'B']
#   ['A', 'B']


# ---------------------------------------------------------------------------
# sample
# ---------------------------------------------------------------------------
x = sc.parallelize(range(7))
# call 'sample' 5 times
ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in enumerate(ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

# Output (the recorded run; each call returned a different subset):
#   x = [0, 1, 2, 3, 4, 5, 6]
#   sample:0 y = [0, 2, 5, 6]
#   sample:1 y = [2, 6]
#   sample:2 y = [0, 4, 5, 6]
#   sample:3 y = [0, 2, 6]
#   sample:4 y = [0, 3, 4]


# ---------------------------------------------------------------------------
# takeSample
# ---------------------------------------------------------------------------
x = sc.parallelize(range(7))
# call 'takeSample' 5 times
ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in enumerate(ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y))  # no collect on y

# Output (the recorded run; each call returned a different subset):
#   x = [0, 1, 2, 3, 4, 5, 6]
#   sample:0 y = [0, 2, 6]
#   sample:1 y = [6, 4, 2]
#   sample:2 y = [2, 0, 4]
#   sample:3 y = [5, 4, 1]
#   sample:4 y = [3, 1, 4]


# ---------------------------------------------------------------------------
# union
# ---------------------------------------------------------------------------
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['D', 'C', 'A'])
z = x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

# Output:
#   ['A', 'A', 'B']
#   ['D', 'C', 'A']
#   ['A', 'A', 'B', 'D', 'C', 'A']


# ---------------------------------------------------------------------------
# intersection
# ---------------------------------------------------------------------------
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['A', 'C', 'D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

# Output:
#   ['A', 'A', 'B']
#   ['A', 'C', 'D']
#   ['A']


# ---------------------------------------------------------------------------
# sortByKey
# ---------------------------------------------------------------------------
x = sc.parallelize([('B', 1), ('A', 2), ('C', 3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())

# Output:
#   [('B', 1), ('A', 2), ('C', 3)]
#   [('A', 2), ('B', 1), ('C', 3)]


# ---------------------------------------------------------------------------
# sortBy
# ---------------------------------------------------------------------------
x = sc.parallelize(['Cat', 'Apple', 'Bat'])


def keyGen(val):
    """Return the sort key for *val*: its first character."""
    return val[0]


y = x.sortBy(keyGen)
print(y.collect())

# Output:
#   ['Apple', 'Bat', 'Cat']


# ---------------------------------------------------------------------------
# glom
# ---------------------------------------------------------------------------
x = sc.parallelize(['C', 'B', 'A'], 2)
y = x.glom()
print(x.collect())
print(y.collect())

# Output:
#   ['C', 'B', 'A']
#   [['C'], ['B', 'A']]


# ---------------------------------------------------------------------------
# cartesian
# ---------------------------------------------------------------------------
x = sc.parallelize(['A', 'B'])
y = sc.parallelize(['C', 'D'])
z = x.cartesian(y)
print(x.collect())
print(y.collect())
print(z.collect())

# Output:
#   ['A', 'B']
#   ['C', 'D']
#   [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]


# ---------------------------------------------------------------------------
# groupBy
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
print(x.collect())
# y is nested, this iterates through it
print([(j[0], [i for i in j[1]]) for j in y.collect()])

# Output:
#   [1, 2, 3]
#   [('A', [1, 3]), ('B', [2])]


# ---------------------------------------------------------------------------
# pipe
# ---------------------------------------------------------------------------
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
y = x.pipe('grep -i "A"')  # calls out to grep, may fail under Windows
print(x.collect())
print(y.collect())

# Output:
#   ['A', 'Ba', 'C', 'AD']
#   ['A', 'Ba', 'AD']


# ---------------------------------------------------------------------------
# foreach
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3])


def f(el):
    '''side effect: append the current RDD element to a file'''
    # "with" closes the handle after every element instead of leaking it
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)


# first clear the file contents
open('./foreachExample.txt', 'w').close()

y = x.foreach(f)  # writes into foreachExample.txt

print(x.collect())
print(y)  # foreach returns 'None'
# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
    print(foreachExample.read())

# Output (in the recorded run the file lines came out as 3, 1, 2):
#   [1, 2, 3]
#   None
#   3
#   1
#   2


# ---------------------------------------------------------------------------
# foreachPartition
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3], 5)


def f(partition):
    '''side effect: append the current RDD partition contents to a file'''
    # "with" closes the handle after every partition instead of leaking it
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        print([el for el in partition], file=f1)


# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()

y = x.foreachPartition(f)  # writes into foreachPartitionExample.txt

print(x.glom().collect())
print(y)  # foreachPartition returns 'None'
# print the contents of foreachPartitionExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print(foreachExample.read())

# Output:
#   [[], [1], [], [2], [3]]
#   None
#   []
#   []
#   [1]
#   [2]
#   [3]


# ---------------------------------------------------------------------------
# collect
# ---------------------------------------------------------------------------
x = sc.parallelize([1, 2, 3])
y = x.collect()
print(x)  # distributed
print(y)  # not distributed

# Output (the recorded output is cut off in the source after this point):
#   ParallelCollectionRDD[87] at parallelize at ...