如何在Oracle中集成Hadoop文:SRC张旭东许多垂直行业都在关注文件系统中庞大的数据。这些数据中通常包含大量无关的明细信息,以及部分可用于趋势分析或丰富其他数据的精华信息。尽管这些数据存储在数据库之外,但一些客户仍然希望将其与数据库中的数据整合在一起以提取对业务用户有价值的信息。本文详细介绍了如何从Oracle数据库访问存储在Hadoop集群里的数据。请注意,本文选择了Hadoop和HDFS作为示例,但这里的策略同样适用于其他分布式存储机制。本文中介绍了各种访问方法,还通过一个具体示例说明了其中一种访问方法的实现。要从Oracle数据库里访问某个文件系统中的外部文件或外部数据,最简单的方法莫过于使用外部表。外部表以表的形式展示存储在文件系统中的数据,并且可在SQL查询中完全透明地使用。因此,可以考虑用外部表从Oracle数据库中直接访问HDFS(Hadoop文件系统)中存储的数据。遗憾的是,常规的操作系统无法调用外部表驱动直接访问HDFS文件。FUSE(FileSysteminUserspace)项目针对这种情况提供了解决方法。有多种FUSE驱动程序支持用户挂载HDFS存储,并将其作为常规文件系统处理。通过使用一个此类驱动程序,并在数据库实例上挂载HDFS(如果是RAC数据库,则在其所有实例上挂载HDFS),即可使用外部表基础架构轻松访问HDFS文件。图1.用数据库内置的MapReduce通过外部表进行访问在图1中,我们利用OracleDatabase11g实现本文所述的数据库内的map-reduce。通常情况下,OracleDatabase11g中的并行执行框架足以满足针对外部表大多数的并行操作。在有些情况下(例如,如果FUSE不可用),外部表方法可能不适用。Oracle表函数提供了从Hadoop中获取数据的替代方法。本文附带的示例展示了一种这样的方法。更深入地来讲,我们用一个表函数来实现,这个表函数使用DBMS_SCHEDULER框架异步调用外部shell脚本,然后由这个shell脚本提交一个HadoopMap-Reduce作业。该表函数与映射器(mapper)之间使用Oracle高级队列特性进行通信。Hadoopmapper将数据排入一个公共队列,而表函数则从该队列中取出数据。由于该表函数能够并行运行,因此使用额外的逻辑来确保仅有一个服务进程提交外部作业。图2.利用表函数进行并行处理由于表函数可以并行运行,Hadoop流作业也可以不同程度地并行运行,并且后者不受Oracle查询协调器的控制,这种情况下,队列能提供负载平衡。下面我们将以一个实际示例展示图2的架构。请注意,我们的示例仅展示了使用表函数访问Hadoop中存储的数据的一个模板实现。显然可能存在其他的甚至可能更好的实现。下图是图2中原始示意图在技术上更准确、更具体的展示,解释了我们要在何处、如何使用后文给出的部分实际代码:图3.启动Mapper作业并检索数据第1步是确定由谁作为查询协调器。对此我们采用一种将具有相同键值的记录写入表的简单机制。首个插入胜出,作为此进程的查询协调器(QC)。请注意,QC表函数调用同时也承担着处理角色。在第2步中,该表函数调用(QC)使用dbms_scheduler(图3中的作业控制器)启动一个异步作业,该作业接着在Hadoop集群上运行同步bash脚本。这个bash脚本就是图3中的启动程序(launcher),它在Hadoop集群上启动mapper进程(第3步)。mapper进程处理数据,并在第5步写入一个队列。在本文的示例中,我们选择了一个在集群范围内可用的队列。现在,我们只是单纯地将任何输出直接写入到队列里。您可以通过批量处理输出并将其移入队列来提高性能。显然,您也可以选择管道和关系表等其他各种机制。随后的第6步是出队过程,这是通过数据库中的表函数并行调用来实现的。这些并行调用处理得到的数据将会提供给查询请求来使用。表函数同时处理Oracle数据库的数据和来自队列中的数据,并将来自两个来源的数据整合为单一结果集提供给最终用户。图4.监控进程Hadoop的进程(mapper)启动之后,作业监控器进程将监视启动程序脚本。一旦mapper完成Hadoop集群中数据的处理之后,bash脚本即完成,如图4所示。作业监控器将监视数据库调度程序队列,并在shell脚本完成时发出通知(第7步)。作业监控器检查数据队列中的剩余数据元素(第8步)。只要队列中存在数据,表函数调用就会继续处理数据(第6步)。图5.关闭处理当表函数并行调用取出队列中的全部数据之后,作业监控器将终止队列(图5所示的第9步)以确保Oracle中的表函数调用停止。此时,所有数据均已交付给请求这些数据的查询。本文中的示例表明,将Hadoop系统与OracleDatabase11g集成是非常容易的。本文中讨论的方法允许客户将Hadoop中的数据直接传递到Oracle查询中。这避免了将数据获取到本地文件系统并物化到Oracle表中,之后才能在SQL查询中访问这些数据的过程。示例代码图3至图5实现的解决方案使用以下代码。Oracle官方称:以下示例的所有代码均在OracleDatabase11g和5个节点的Hadoop集群上进行过测试。处理数据的表函数该脚本中包含某些设置组件。例如,脚本开始的部分创建了图3中第1步所展示的仲裁表。本例中使用的是一直广受欢迎的OE模式。connectoe/oe--Tabletouseaslockingmechanisimforthehdfsreaderas--leveragedinFigure3step1DROPTABLErun_hdfs_read;CREATETABLErun_hdfs_read(pk_idNUMBER,statusVARCHAR2(100),PRIMARYKEY(pk_id));--ObjecttypeusedforAQthatreceivesthedataCREATEORREPLACETYPEhadoop_row_objASOBJECT(aNUMBER,bNUMBER);/connect/assysdba--systemjobtolaunchexternalscript--thisjobisusedtoeventuallyrunthebashscript--describedinFigure3step3CREATEORREPLACEPROCEDURElaunch_hadoop_job_async(in_directoryINVARCHAR2,idNUMBER)IScntNUMBER;BEGINBEGINDBMS_SCHEDULER.DROP_JOB('ExtScript'||id,TRUE);EXCEPTIONWHENOTHERSTHENNULL;END;--RunascriptDBMS_SCHEDULER.CREATE_JOB(job_name='ExtScript'||id,job_type='EXECUTABLE',job_action='/bin/bash',number_of_arguments=1);DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE('ExtScript'||id,1,in_directory);DBMS_SCHEDULER.ENABLE('ExtScript'||id);--Waittillthejobisdone.ThisensuresthehadoopjobiscompletedLOOPSELECTCOUNT(*)INTOcntFROMDBA_SCHEDULER_JOBSWHEREjob_name='EXTSCRIPT'||id;DBMS_OUTPUT.put_line('SchedulerCountis'||cnt);IF(cnt=0)THENEXIT;ELSEDBMS_LOCK.sleep(5);ENDIF;ENDLOOP;--Waittillthequeueisemptyandthendropit--asshowninFigure5--TheTFwillgetanexceptionanditwillfinishquietlyLOOPSELECTSUM(c)INTOcntFROM(SELECTenqueued_msgs-dequeued_msgscFROMgv$persistent_queuesWHEREqueue_name='HADOOP_MR_QUEUE'UNIONALLSELECTnum_msgs+spill_msgscFROMgv$buffered_queuesWHEREqueue_name='HADOOP_MR_QUEUE'UNIONALLSELECT0cFROMDUAL);IF(cnt=0)THEN--Queueisdone.stopit.DBMS_AQADM.STOP_QUEUE('HADOOP_MR_QUEUE');DBMS_AQADM.DROP_QUEUE('HADOOP_MR_QUEUE');RETURN;ELSE--WaitforawhileDBMS_LOCK.sleep(5);ENDIF;ENDLOOP;END;/--Grantsneededtomakehadoopreaderpackageworkgrantexecuteonlaunch_hadoop_job_asynctooe;GRANTSELECTONv_$sessionTOoe;GRANTSELECTONv_$instanceTOoe;GRANTSELECTONv_$px_processTOoe;GRANTEXECUTEONDBMS_AQADMTOoe;GRANTEXECUTEONDBMS_AQTOoe;connectoe/oe--SimplereaderpackagetoreadafilecontainingtwonumbersCREATEORREPLACEPACKAGEhdfs_readerIS--Returntypeofpl/sqltablefunctionTYPEreturn_rows_tISTABLEOFhadoop_row_obj;--ChecksifcurrentinvocationisserialFUNCTIONis_serialRETURNBOOLEAN;--FunctiontoactuallylaunchaHadoopjobFUNCTIONlaunch_hadoop_job(in_directoryINVARCHAR2,idINOUTNUMBER)RETURNBOOLEAN;--TftoreadfromHadoop--Thisisthemainprocessingcodereadingfromthequeuein--Figure3step6.Italsocontainsthecodetoinsertinto--thetableinFigure3step1FUNCTIONread_from_hdfs_file(pcurINSYS_REFCURSOR,in_directoryINVARCHAR2)RETURNreturn_rows_tPIPELINEDPARALLEL_ENABLE(PARTITIONpcurBYANY);END;/CREATEORREPLACEPACKAGEBODYhdfs_readerIS--Checksifcurrentprocessisapx_processFUNCTIONis_serialRETURNBOOLEANIScNUMBER;BEGINSELECTCOUNT(*)INTOcFROMv$px_processWHEREsid=SYS_CONTEXT('USERENV','SESSIONID');IFc0THENRETURNFALSE;ELSERETURNTRUE;ENDIF;EXCEPTIONWHENOTHERSTHENRAISE;END;FUNCTIONlaunch_hadoop_job(in_direc