事件驱动的应用架构和应用

整理文档很辛苦,赏杯茶钱您下走!

免费阅读已结束,点击下载阅读编辑剩下 ...

阅读已结束,您可以下载文档离线阅读编辑

资源描述

事件处理中间件原理和应用Event-DrivenApplicationServer李志强(li@talkweb..com.cn)湖南拓维信息系统股份有限公司研发中心2008/07/22主要内容和目的学习、研究事件流(EventStream)和负责事件处理(ESP/CEP)的基本概念理解事件驱动应用服务器的角色和基本原理基于Esper的应用和开发目录事件和事件处理EsperDEMO总结Q&A历史背景SOA&EDA集成项目发展趋势主要问题:网络不可靠网络带宽不够应用差异大变更不可避免EAI解决方案文件传输共享数据库远程过程调用面向消息从体系结构的角度SOA服务本质上是远程调用:点对点通讯增加复杂度,难以管理服务正交相关监控EDA基于消息传递,松耦合EDAVSSOAEDA“”:发布/订阅通过特定模式来对业务事件作出响应通常耦合度比较低SOA:请求/响应允许传递粗力度的事件结合SOA和EDASOA垂直(Vertical)系统的请求/响应处理EDA:横向(Horizontal)系统通讯EDA问题场景需要实时、连续地分析数据,并根据历史数据处理模式来自动检测、发现问题高实时性:低的处理、分析、响应延迟大数据量:巨量数据:每秒超100000个请求(事件)超过通常使用的OLTP系统的处理能力高事务数:事件处理实现方案对比数据库轮询数据库浪费大量数据空间实现复杂,不能简洁的实现临时逻辑和关联事件的因果关系等。分布式缓存或JINI网络空间非标准的event-processinglanguage规则引擎(Ruleengines)+JMS未对临时数据流的优化不能连续处理事件流什么是事件事件(Event)是有意义的状态变化:asignificantchangeinstate股票价格的变化密码变更最后一次服务的响应时间事件在系统中的表述XMLPOJOKey-value对事件的基本特征“”不只是发生什么事情意发生事件的不可变记录事件要素:标识、发生时间,有意义的属性事件间可能有某种关联:时间顺序、因果关系事件例子RFID设备跟踪位置信息设备标识XY基站信息变更Use-case根据用户的位置的变更,定向推送所在区域的服务信息:商场、电影院、公交站.统计/分析用户的日常行为规律。EDA基本原理EDA(Even-DrivenArchitect)松耦合基于事件基于消息排队的架构异步通讯事件处理模型EDA需求事件流:高吞吐量高可靠性低延迟事件关联提供有力的建模语言事件处理简单事件处理(SEP):SimpleEentProcessing事件流处理(ESP):StreamEentProcessing复杂事件流处理(CEP):ComplexEentProcessing简单事件处理(SEP)基于单个事件单个事件触发响应通常采用:JMSQueue:点对点Topic:发布/订阅EAI模式:channel,pipe,routeretc.事件流处理(ESP)“”基于流的处理“”单个时间不会触发反应需要分析事件流各种滑动窗口:基于时间基于事件数量复杂事件处理(CEP)复杂事件:大量其他事件触发的事件“”基于事件流的处理需要对多个事件(流)作复杂的分析分析、发生模式:过滤集合相关事件关联:时间关联、因果关系、空间etc.ESP 适用场景各类大数据量、高实时性系统金融分析RFID事件处理流程监控位置服务欺诈检测基于JAVA的EDASEPJMSESBESP&CEP无相关标准有产品:IBM、Weblogic,EsperJMS点对点JMS发布订阅ESB(EnterpriseServiceBus)目录事件和事件处理EsperDEMO总结Q&AEsper简介基于JAVA的ESP/CEP容器轻量级、可嵌入开源包括ESP和CEP项目背景商业支持被广泛集成到商业产品:weblogiceventserverEsper架构引擎(Engine):独立单元(时间、线程、事件流)基本语句(Statements):EventprocessingLanguage(EPL)事件处理器(Listener):简单javainterfaceEpser产生事件事件发送importnet.esper.client.*;//GetthesameengineinstanceEPServiceProviderengine=EPServiceProviderManager.getDefaultProvider();EPRuntimeruntimeEngine=engine.getRuntime();...LocationReportevent=newLocationReport(assetId,x,y,zone);runtimeEngine.sendEvent(event);EspersampleESP/CEPStatement(EPL)/AstatementcanproduceimpliciteventsinsertintoCountZoneselectzone,count(*)ascntfromLocationReport.std:unique('assetId')whereassetIdin(1,2,3)groupbyzoneEspersampleListenerandEngineimportnet.esper.client.*;//GetengineinstanceandregisterstatementEPServiceProviderengine=EPServiceProviderManager.getDefaultProvider();EPStatementstatement=engine.getEPAdministrator().createEQL(...);//Attachalistenerstatement.addListener(newUpdateListener(){publicvoidupdate(EventBean[]newEvents,EventBean[]oldEvents){//Handlecomplexevent...}});API概述EPServiceProvider引擎线程时间流EPStatement:Statment/Queries事件查询语言:EQLUpdateListener:ListenerPOJI事件事件可以是:POJOKey-value对(java.util.map)XML(org.w3c.dom.Node)处理模式连续处理结果集发生改变时通知Listerners新的事件到达旧事件超出结果集/范围内建数据库EQL(EPL)类SQL语法流streams:表事件Event:记录事件属性EventAttributes:记录字段查询Query:ESP查询CEP查询ESP查询单个事件:select*fromWithdrawal时间窗口(范围)内的事件:selectcount(*)fromWithdrawal(zone=10).win:time(30sec)批量处理:在通知Listener前累积事件,一次通知基于时间累积:select*fromWithdrawal.win:time_batch(4sec)基于数量累积:select*fromWithdrawal.win:length_batch(5)指定数量范围内的事件Select*fromWithdrawal.win:length(5)CEP查询定义事件匹配模式标识复杂事件流模式关键词模式周期性/重复性:every逻辑操作:and、or、not跟着发生:-子查询条件表达式:timer:within5秒内的所有A或B事件(AorB)wheretimer:within(5sec)定义时钟来监控事件的发生:timer:interval:A事件后等待10秒:A-timer:interval(10seconds)timer:at每5分钟everytimer:at(5,*,*,*,*)Events:–A1B1C1B2A2D1A3B3E1A4F1B4•pattern[everyA-B]–{A1,B1},{A2,B3},{A3,B3},{A4,B4}•pattern[every(A-B)]:„“子表达式–{A1,B1},{A2,B3},{A4,B4}Events:–A1B1C1B2A2D1A3B3E1A4F1B4•pattern[A-everyB]–{A1,B1},{A1,B2},{A1,B3},{A1,B4},pattern[everyA-everyB]–{A1,B1},{A1,B2},–{A1,B3},{A2,B3},{A3,B3},–{A1,B4},{A2,B4},{A3,B4}and{A4,B4}例子:温度检测温度传感器探测:采样样本Sample:温度,传感器标识需求:探测温度,如果温度过高就告警告警条件:同一个温度传感器在90秒内连续检测到3次温度超过50度CEPeverysample=Sample(temp50)-((Sample(sensor=sample.sensor,temp50)andnotSample(sensor=sample.sensor,temp=50))-(Sample(sensor=sample.sensor,temp50)andnotSample(sensor=sample.sensor,temp=50)))wheretimer:within(90seconds)EsperEQL/EPL事件过滤//Filterforlocationreportbylocationrectangleselect*fromLR(xin[4:10],yin[6:12])滑动窗口和聚合//Countallassetsreportingzone10inlast30secselectcount(*)fromLR(zone=10).win:time(30sec)分组窗口和输出速率限制//Getacountperzoneofthelast10minutesperzone//every1minuteselectzone,count(*)ascntfromLR.std:groupby('zone').win:time(10min)outputevery1minEsperEQL/EPLJoinsandouterJoins//Firewhenanyassetenterszone2beforezone1selectZone2.assetIdfromLR(zone=2).win:time(1day)Zone2leftouterjoinLR(zone=1).win:time(1day)Zone1onZone1.assetId=Zone2.assetIdwhereZone1.assetIdisnullEpserEQL/EPL历史数据和引用//Alertwhenwehittheminimuminventory//foragivenzoneselectzone,count(*)fromLR.std:unique(assetId)aslr,sql:db[selectminifromMinimumwherezone=${lr.zone}]havingminicount(*)epser提供基于LRU算法的历史数据超时控制Esper性能商业ESB:2600msg/son4*2.8GHZEsperRFID设备跟踪应用:1000groups,3000assets,20zo

1 / 63
下载文档,编辑使用

©2015-2020 m.777doc.com 三七文档.

备案号:鲁ICP备2024069028号-1 客服联系 QQ:2149211541

×
保存成功