为InfoSphereWarehouse提供实时数据的高效解决方案互联网刘艳字体:[大中小]阅读:1我要评论(0)数据整合是数据仓库中的关键概念,ETL(数据的提取、转换和加载)过程的设计和实现是数据仓库解决方案中极其重要的一部分。由于传统的ETL过程中数据抽取是需要加载所有源数据库中的数据,这样对于需要经常进行数据集中的案例,将带来无可忍受的低效率。我们将介绍通过结合InfoSphereReplicationServer和InfoSphereDataStage,实现数据仓库的实时更新,并且仅仅需要抽取更新了的数据。简介信息是现代企业的重要资源,是企业运用科学管理、决策分析的基础,于是企业如何通过各种技术手段,并把数据转换为信息、知识,已经成了提高其核心竞争力的主要瓶颈。而ETL则是一个主要的技术手段。ETL(数据的提取、转换和加载)过程的设计和实现是数据仓库解决方案中极其重要的一部分。由于传统的ETL过程中数据抽取是需要加载所有源数据库中的数据,这样对于需要经常进行数据集中的案例,将带来无可忍受的低效率。例如一个有50G数据量的数据库,如果只有0.01%(也就是大约50M)的数据较上次加载有更新,但是为了抽取这部分数据,仍然需要抽取所有50G的数据,这将是非常低效的。在这篇文章中,我们将介绍通过结合InfoSphereReplicationServer和InfoSphereDataStage,实现数据仓库的实时更新,并且仅仅需要抽取更新了的数据。ETL过程简介ETL过程就是数据流动的过程,从不同的数据源流向不同的目标数据集中地。它是构建数据仓库的重要一环,用户从数据源抽取出所需的数据,经过数据清洗,最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中。它包涵三个阶段:E(Extract),T(Transform)和L(Load)。提取(Extract):从不同的数据库(DB2,oracle,flatfile等)中读取源数据。通过接口提取源数据,例如ODBC、专用数据库接口和平面文件提取器,并参照元数据来决定数据的提取及其提取方式。转换(Transform):开发者将提取的数据,按照业务需要转换为目标数据结构,并实现汇总。装载(Load):加载经转换和汇总的数据到目标数据仓库中,可实现SQL或批量加载。InfoSphereReplicationServer简介IBMInfoSphereReplicationServer是一个高速移动大量数据的企业软件应用程序,用于帮助企业连接分布在全球的业务、对客户进行快速响应以及从影响关键数据库系统的问题中恢复。只所以能够高效的提取数据是因为它用可恢复日志来记录数据库里数据的变化,Capture程序负责连续读取数据库的恢复日志并捕获对源数据库更改(指对数据的插入、删除和更新操作),Apply程序负责把这些变化的数据写入到目标数据库中。利用ReplicationServer的这一功能就可从大量的数据量中只提取出较上次更新的数据。ReplicationServer和Eventpublisher的架构InfoSphereReplicationServer中提供了两种不同类型的复制:Q复制和SQL复制。InfoSphereDataEventPublisher捕获“更改的数据”事件并以WebSphereMQ消息的形式发布这些事件,其他应用程序可以使用这些消息来驱动后续处理。SQL复制Capture捕获数据变化后存储在一个临时中间表(stagingtables),apply程序把这些更新复制到相应的目标表。随着数据量的加大和客户对实时数据复制的要求,Q复制应运而生。它的架构如图1所示:图1.SQL复制架构图查看原图(大图)Q复制一个高吞吐量低延迟的方案,它不用中间表来存储已经提交的事务性数据,而是捕获对源表的更改并将已提交的数据转换为消息,即用WebShpereMQ消息队列在源和目标数据库间传送数据。它的架构如图2所示:图2.Q复制架构图查看原图(大图)Eventpublisher(EP)不同于Q复制,EP不需要启动apply程序,捕获对源表的更改并将已落实的事务性数据转换为“可扩展标记语言”(XML)格式或定界格式(CSV:comma-separatedvalue)的消息,以供用户直接从接受队列读取消息。在本文中,我们将利用EP的这个特点和DataStage整合为数据仓库提供实时高效的数据。它的构架如图3所示:图3.EP架构图查看原图(大图)IBMInfoSphereDataStage简介IBMInfoSphereDataStage是一款强大的基于图形化界面的ETL工具,它可以从多个不同的业务系统,多个平台的数据源中抽取数据、转换数据、装载数据到各种目标系统中。它有如下特点:基于图形化的开发环境,无需手工编码便可快速开发ETL作业,实现复杂的数据合并和转换逻辑。并且可以在开发新的作业时快捷的重用已有作业中的逻辑。支持广泛的数据源。DataStage几乎支持所有的主流的数据库、企业级应用程序、文件作为数据源进行读取或写入数据。例如:DB2、Oracle、SQLServer、UniData、Informix、PeopleSoft、SAP、Siebel、顺序文件(如CSV)、XML文件等等。它也支持以多种常用的方式进行数据读取和写入,例如FTP、SFTP、JMS等等。强大的并行处理能力,能够对数据通过分割、管道等方式进行处理,提高硬件的使用效率,从而提高作业的性能。支持对数据进行批量和实时处理操作。InfoSphereReplicationServer和InfoSphereDataStage的整合DataStage可以读取在不同数据库中数据,但是没有能力通过读取可恢复日志只捕获较上次更新的数据;另一方面,ReplicationServer有能力捕获更新的数据却没有类似DataStage转换数据的功能,并且不像DataStage,支持对如此多的数据库,企业级应用程序和文件进行读写。所以本文将结合两者的优势,为Warehouse提供实时高效的数据,整合原理首先,利用ReplicationServer的EventPublisher(EP),Qcapture从可恢复日志中捕获更新的数据,并且把数据变化写到MQ队列中;接着,MQ消息通过MQ触发器触发了DataStage作业;最后,DataStage的作业从MQ队列里直接读取数据进行处理。EP支持两种类型的MQ消息:XML和CSV,XML格式有好的移植性和灵活性而CSV有很好的性能,在这里我们将以CSV作为样例。DataStage可以通过使用MQConnectorstage读取队列中的消息,然后基于所选的消息格式来解析消息,最后完成必要的转换。具体的架构图如图4所示:图4.总体架构图查看原图(大图)下面将具体介绍其实现。具体实现所需软件:IBMInfoSphereReplicationServer9.7IBMInfoSphereInformationServer8.1EventPublisher的配置如果source是Oracle,需要通过ReplicationServerOraclecapturefeature来完成对变化数据的提取,请参考“参考资料”部分。在本文中,我们source以DB2为例:1.创建DB2对象在本文中创建数据库SOURCE,和表”DEMO”.”CUSTOMER”,并import数据清单1.创建表及导入数据CREATETABLEDEMO.CUSTOMER(CUSTOMER_IDINTEGERNOTNULL,SEXCHAR(1),BIRTHDAYTIMESTAMP,SSNVARCHAR(30),CITYVARCHAR(25),STATEVARCHAR(25),ZIPVARCHAR(15),PHONEVARCHAR(15),PRI_LANGUAGEVARCHAR(15),LAST_UPDATETIMESTAMP,FIRST_NAMEVARCHAR(20),MIDDLE_NAMEVARCHAR(10),LAST_NAMEVARCHAR(20));ALTERTABLEDEMO.CUSTOMERADDPRIMARYKEY(CUSTOMER_ID);DB2importfromcustomer.ixfofixfinsertinto”DEMO”.”CUSTOMER”2.创建MQ对象创建Qmanager:crtmqmQManager启动Qmanager:strmqmQManager创建队列:runmqscQMamangermq.in清单2.创建MQ对象defineqlocal(ADMINQ)defineqlocal(RESTARTQ)defineqlocal(q1)defineqmodel(IBMQREP.SPILL.MODELQ)DEFSOPT(shared)MAXDEPTH(500000)MSGDLVSQ(fifo)DEFTYPE(permdyn)definechannel(CHANNEL1)chltype(svrconn)trptype(tcp)mcauser('mqm')definelistener(listener1)trptype(tcp)control(qmgr)port(2264)startlistener(listener1)end3.setupEventPublisher3.1创建controltables:asnclp–fcncap.in清单3.创建控制表asnclpsessionsettoqreplication;setrunscriptnowstoponsqlerroron;setqmanagerQManagerforcaptureschema;setservercapturetodbsource;createcontroltablesforcaptureserverusingrestartqRESTARTQadminqADMINQ;quit;3.2创建pubqmap和pub:asnclp–fcrtqmappub.in清单4.创建pubqmap及pubasnclpsessionsettoQreplication;setoutputcapturescriptqpubmap.sql;setlogqpub.log;setservercapturetodbsource;setrunscriptnowstoponsqlerroron;setqmanagerQManagerforcaptureschema;createpubqmappubqmap1usingsendqq1messageformatdelimitedmessagecontenttypeT;createpubusingpubqmapPUBQMAP1(pubnameeventpub1DEMO.CUSTOMER);quit;4.启动capture:清单5.启动captureasnqcapcapture_server=sourcecapture_schema=ASN2010-04-22-15.48.28.549339ASN0600IQCapture::Initial:Programmqpub9.7.0isstarting.2010-04-22-15.48.38.537012ASN7010IQCapture:ASN:WorkerThread:TheprogramsuccessfullyactivatedpublicationorQsubscriptionEVENTPUB1(sendqueueq1,publishingorreplicationqueuemapPUBQMAP1)forsourcetableDEMO.CUSTOMER.2010-04-22-15.48.38.641830ASN7000IQCapture:ASN:WorkerThread:0subscriptionsareactive.0subscription