事件处理中间件原理和应用Event-DrivenApplicationServer李志强(li@talkweb..com.cn)湖南拓维信息系统股份有限公司研发中心2008/07/22主要内容和目的学习、研究事件流(EventStream)和负责事件处理(ESP/CEP)的基本概念理解事件驱动应用服务器的角色和基本原理基于Esper的应用和开发目录事件和事件处理EsperDEMO总结Q&A历史背景SOA&EDA集成项目发展趋势主要问题:网络不可靠网络带宽不够应用差异大变更不可避免EAI解决方案文件传输共享数据库远程过程调用面向消息从体系结构的角度SOA服务本质上是远程调用:点对点通讯增加复杂度,难以管理服务正交相关监控EDA基于消息传递,松耦合EDAVSSOAEDA“”:发布/订阅通过特定模式来对业务事件作出响应通常耦合度比较低SOA:请求/响应允许传递粗力度的事件结合SOA和EDASOA垂直(Vertical)系统的请求/响应处理EDA:横向(Horizontal)系统通讯EDA问题场景需要实时、连续地分析数据,并根据历史数据处理模式来自动检测、发现问题高实时性:低的处理、分析、响应延迟大数据量:巨量数据:每秒超100000个请求(事件)超过通常使用的OLTP系统的处理能力高事务数:事件处理实现方案对比数据库轮询数据库浪费大量数据空间实现复杂,不能简洁的实现临时逻辑和关联事件的因果关系等。分布式缓存或JINI网络空间非标准的event-processinglanguage规则引擎(Ruleengines)+JMS未对临时数据流的优化不能连续处理事件流什么是事件事件(Event)是有意义的状态变化:asignificantchangeinstate股票价格的变化密码变更最后一次服务的响应时间事件在系统中的表述XMLPOJOKey-value对事件的基本特征“”不只是发生什么事情意发生事件的不可变记录事件要素:标识、发生时间,有意义的属性事件间可能有某种关联:时间顺序、因果关系事件例子RFID设备跟踪位置信息设备标识XY基站信息变更Use-case根据用户的位置的变更,定向推送所在区域的服务信息:商场、电影院、公交站.统计/分析用户的日常行为规律。EDA基本原理EDA(Even-DrivenArchitect)松耦合基于事件基于消息排队的架构异步通讯事件处理模型EDA需求事件流:高吞吐量高可靠性低延迟事件关联提供有力的建模语言事件处理简单事件处理(SEP):SimpleEentProcessing事件流处理(ESP):StreamEentProcessing复杂事件流处理(CEP):ComplexEentProcessing简单事件处理(SEP)基于单个事件单个事件触发响应通常采用:JMSQueue:点对点Topic:发布/订阅EAI模式:channel,pipe,routeretc.事件流处理(ESP)“”基于流的处理“”单个时间不会触发反应需要分析事件流各种滑动窗口:基于时间基于事件数量复杂事件处理(CEP)复杂事件:大量其他事件触发的事件“”基于事件流的处理需要对多个事件(流)作复杂的分析分析、发生模式:过滤集合相关事件关联:时间关联、因果关系、空间etc.ESP 适用场景各类大数据量、高实时性系统金融分析RFID事件处理流程监控位置服务欺诈检测基于JAVA的EDASEPJMSESBESP&CEP无相关标准有产品:IBM、Weblogic,EsperJMS点对点JMS发布订阅ESB(EnterpriseServiceBus)目录事件和事件处理EsperDEMO总结Q&AEsper简介基于JAVA的ESP/CEP容器轻量级、可嵌入开源包括ESP和CEP项目背景商业支持被广泛集成到商业产品:weblogiceventserverEsper架构引擎(Engine):独立单元(时间、线程、事件流)基本语句(Statements):EventprocessingLanguage(EPL)事件处理器(Listener):简单javainterfaceEpser产生事件事件发送importnet.esper.client.*;//GetthesameengineinstanceEPServiceProviderengine=EPServiceProviderManager.getDefaultProvider();EPRuntimeruntimeEngine=engine.getRuntime();...LocationReportevent=newLocationReport(assetId,x,y,zone);runtimeEngine.sendEvent(event);EspersampleESP/CEPStatement(EPL)/AstatementcanproduceimpliciteventsinsertintoCountZoneselectzone,count(*)ascntfromLocationReport.std:unique('assetId')whereassetIdin(1,2,3)groupbyzoneEspersampleListenerandEngineimportnet.esper.client.*;//GetengineinstanceandregisterstatementEPServiceProviderengine=EPServiceProviderManager.getDefaultProvider();EPStatementstatement=engine.getEPAdministrator().createEQL(...);//Attachalistenerstatement.addListener(newUpdateListener(){publicvoidupdate(EventBean[]newEvents,EventBean[]oldEvents){//Handlecomplexevent...}});API概述EPServiceProvider引擎线程时间流EPStatement:Statment/Queries事件查询语言:EQLUpdateListener:ListenerPOJI事件事件可以是:POJOKey-value对(java.util.map)XML(org.w3c.dom.Node)处理模式连续处理结果集发生改变时通知Listerners新的事件到达旧事件超出结果集/范围内建数据库EQL(EPL)类SQL语法流streams:表事件Event:记录事件属性EventAttributes:记录字段查询Query:ESP查询CEP查询ESP查询单个事件:select*fromWithdrawal时间窗口(范围)内的事件:selectcount(*)fromWithdrawal(zone=10).win:time(30sec)批量处理:在通知Listener前累积事件,一次通知基于时间累积:select*fromWithdrawal.win:time_batch(4sec)基于数量累积:select*fromWithdrawal.win:length_batch(5)指定数量范围内的事件Select*fromWithdrawal.win:length(5)CEP查询定义事件匹配模式标识复杂事件流模式关键词模式周期性/重复性:every逻辑操作:and、or、not跟着发生:-子查询条件表达式:timer:within5秒内的所有A或B事件(AorB)wheretimer:within(5sec)定义时钟来监控事件的发生:timer:interval:A事件后等待10秒:A-timer:interval(10seconds)timer:at每5分钟everytimer:at(5,*,*,*,*)Events:–A1B1C1B2A2D1A3B3E1A4F1B4•pattern[everyA-B]–{A1,B1},{A2,B3},{A3,B3},{A4,B4}•pattern[every(A-B)]:„“子表达式–{A1,B1},{A2,B3},{A4,B4}Events:–A1B1C1B2A2D1A3B3E1A4F1B4•pattern[A-everyB]–{A1,B1},{A1,B2},{A1,B3},{A1,B4},pattern[everyA-everyB]–{A1,B1},{A1,B2},–{A1,B3},{A2,B3},{A3,B3},–{A1,B4},{A2,B4},{A3,B4}and{A4,B4}例子:温度检测温度传感器探测:采样样本Sample:温度,传感器标识需求:探测温度,如果温度过高就告警告警条件:同一个温度传感器在90秒内连续检测到3次温度超过50度CEPeverysample=Sample(temp50)-((Sample(sensor=sample.sensor,temp50)andnotSample(sensor=sample.sensor,temp=50))-(Sample(sensor=sample.sensor,temp50)andnotSample(sensor=sample.sensor,temp=50)))wheretimer:within(90seconds)EsperEQL/EPL事件过滤//Filterforlocationreportbylocationrectangleselect*fromLR(xin[4:10],yin[6:12])滑动窗口和聚合//Countallassetsreportingzone10inlast30secselectcount(*)fromLR(zone=10).win:time(30sec)分组窗口和输出速率限制//Getacountperzoneofthelast10minutesperzone//every1minuteselectzone,count(*)ascntfromLR.std:groupby('zone').win:time(10min)outputevery1minEsperEQL/EPLJoinsandouterJoins//Firewhenanyassetenterszone2beforezone1selectZone2.assetIdfromLR(zone=2).win:time(1day)Zone2leftouterjoinLR(zone=1).win:time(1day)Zone1onZone1.assetId=Zone2.assetIdwhereZone1.assetIdisnullEpserEQL/EPL历史数据和引用//Alertwhenwehittheminimuminventory//foragivenzoneselectzone,count(*)fromLR.std:unique(assetId)aslr,sql:db[selectminifromMinimumwherezone=${lr.zone}]havingminicount(*)epser提供基于LRU算法的历史数据超时控制Esper性能商业ESB:2600msg/son4*2.8GHZEsperRFID设备跟踪应用:1000groups,3000assets,20zo