许多流式计算应用离不开存储,也就是把数据存在硬盘上,例如历史数据的保存。毕竟硬盘适合长期地存储大量数据。在介绍具体方法之前,先讲一个实际项目里经常要用到的原则,那就是:存储读写速度要和内存计算速度匹配。
怎么理解呢?例如内存计算速度是10万events/s,存储读写速度是1万events/s,那么很容易引起数据不能及时写到存储中,而导致OOM等各种问题。还有,硬盘读写的速度、网络传输的速度经常受到外部因素的干扰,通常没有内存计算速度稳定,这点也要考虑到。
所以流式计算中使用存储,通常
(1)要设计缓存队列,CEP引擎中一般有自带的缓存队列,实现IO和计算的异步。当然,SODBASE CEP中进一步增强了避免OOM和预警的机制。
(2)尽可能的将内存计算和存储操作分离。原因:
一来是为了更好地使用CEP引擎自带的缓存队列。
二来是为了更好的管理。这样,在存储操作中可以更方便地使用批量写入、通过计算减少写入数据量,过滤掉不需要写入的数据,从而提高存储读写效率。
使用存储的方法一般有两类
第一类:在EPL中直接用。例如,在EPL中用Java函数,而Java函数又负责存储读写。
第二类:使用输入输出适配器。
本文主要介绍使用输入输出适配器。最后介绍一下,为什么输入适配器、CEP引擎、输出适配器恰好也能作很好地应用在Extract Transformation Load (ETL) 操作中。
1. 关系型数据库输入输出适配器
MysqL、Oracle、sql server、postgresdb等都是关系型数据库。不要小瞧关系数据库,大部分数据存储的企业应用可以用关系数据库解决。MysqL集群可以支撑亿级用户的特大型的互联网应用。以MysqL为例,常用的3个输出适配器:
com.sodbase.outputadaptor.database.MysqLQueryAdaptor
作用:对于每个结果事件,查询MysqL数据库,并将查询的结果插入到新的流中
0: stream name 查询的结果插入到新的流的名称
1: database name
2: user name
3: password
4: host name
5: port
6: sql 查询sql,允许使用?{...}变量,通过输出的事件属性值,如:?{name}
7: columnnames e.g. "name:string,age:double,ishappy:boolean" Three types "string,double,boolean" are supported
(2)数据库更新
com.sodbase.outputadaptor.MysqL.MysqLsqlExecutionAdaptor
作用:对于每个结果事件,执行sql语句,如DML、数据更新等
参数:
0 databasename
1 dbusername
2 dbpassword
3 hostname
4 dbport
5 sql
inputStreamConnected = params[6];//sql执行结束后,发事件给流inputStreamConnected,这个事件只有时间戳属性
(3)数据库备份
com.sodbase.outputadaptor.MysqL.MysqLBackupOutputAdaptor
作用:锁表执行sql语句,类似于上面的adatpor,通常用于执行备份语句。
0 databasename
1 dbusername
2 dbpassword
3 hostname
4 dbport
5 sql
6 tablename //用于锁定所需备份的表
7 inputStreamConnected //sql执行结束后,发事件给流inputStreamConnected,这个事件只有时间戳属性
其它Oracle、sql server、postgresdb适配器类似,也都支持。
另外,还有一个输入适配器com.sodbase.inputadaptor.database.MysqLInputAdaptor,EPL启动时查询数据库的数据插入到流中。
应用场景,例如:定时、超时任务防止系统宕机后丢失,将没有执行过的任务存在数据库中,重启服务器时,就会把这些任务再加到事件流,也就是任务队列里。
2. Nosql适配器和分布式缓存适配器
支持cassandra、hbase、monogodb,巨杉等nosql数据库,支持redis等分布式缓存数据库,用户也可以方便地自定义适配器。
3. 数据库分批写入
如果大家还记得kleen closure操作符,用它可以方便地做数据库分批写入,解决存储读写瓶颈问题
CREATE QUERY tensecondsdata SELECT tostring(T2.price) AS pricebatch,tostring(T2.name) AS namebatch FROM T1:timer,T2:stock,T3:timer PATTERN T1;T2^+;T3 WHERE T1._start_time_=T3._start_time_-10000 WITHIN 10000
这个EPL将数据分成10s为一批。timer是定时输入适配器,周期10s。如果要精确,可以用两个定时输入timer1和timer2,周期都是10s,但起点相差1个单位时间(一般单位是ms)。这是因为模式中T1._end_time_<T2._end_time_<T3._end_time_是一个开集合,我们用两个timer1,可以做成一个半开半闭集合,不漏掉恰好在timer时间点上的数据。
CREATE QUERY tensecondsdata SELECT tostring(T2.price) AS pricebatch,tostring(T2.name) AS namebatch FROM T1:timer1,T3:timer2 PATTERN T1;T2^+;T3 WHERE T1._start_time_=T3._start_time_-10001 WITHIN 10001
4. 应用示例
4.1 电压监测
4.1.1保存实时测量数据
注意这里是为了让大家更好了解细节,所以用给出EPL和XML。建议大家用SODBASE Studio建模。
CREATE QUERY VD0002 SELECT T1.lineid AS lineid,T1.voltagevalue AS voltagevalue FROM T1:VD0000_output PATTERN T1 WITHIN 0VD0000_output是电压测量数据流
<outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.MysqL.MysqLsqlExecutionAdaptor</outputAdaptorClassName> <adaptorParams>voltage</adaptorParams> <adaptorParams>user</adaptorParams> <adaptorParams>password</adaptorParams> <adaptorParams>192.168.1.3</adaptorParams> <adaptorParams>3306</adaptorParams> <adaptorParams>insert into historicalvoltage(lineid,voltagevalue,timestamp) values('?{lineid}',?{voltagevalue},'?{_end_time_}')</adaptorParams> <adaptorParams>operationcompletestream</adaptorParams> <isExternal>false</isExternal> <queryName>VD0002</queryName> </outputAdaptors>?{}是在sql语句中使用事件字段的值
4.1.2定时备份
有时用户需要将历史数据进行备份
CREATE QUERY VD0003 SELECT JAVA:com.example.voltage.Voltage:getDate() AS date,JAVA:com.example.voltage.Voltage:getDayStarttime() AS starttime,JAVA:com.example.voltage.Voltage:getDayEndtime() AS endtime FROM T1:timer PATTERN T1 WITHIN 0timer数据流是有定时触发输入适配器生成的,用法见前面介绍EPL的文章
<outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.MysqL.MysqLBackupOutputAdaptor</outputAdaptorClassName> <adaptorParams>voltage</adaptorParams> <adaptorParams>username</adaptorParams> <adaptorParams>password</adaptorParams> <adaptorParams>192.168.1.3</adaptorParams> <adaptorParams>3306</adaptorParams> <adaptorParams>select * into outfile 'D:/?{date}.txt' from historicalvoltage where timestamp>=?{starttime} and timestamp<?{endtime}</adaptorParams> <adaptorParams>historicalvoltage</adaptorParams> <adaptorParams>endbackupmessage</adaptorParams> <isExternal>false</isExternal> <queryName>VD0003</queryName> </outputAdaptors>
4.1.3 做每日统计
假设每天定时备份完,需要做日统计
CREATE QUERY VD0003_2 SELECT JAVA:com.example.voltage.Voltage:getYesterdayDate() AS date FROM T1:endbackupmessage PATTERN T1 WITHIN 0
输出适配器
<outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.MysqL.MysqLsqlExecutionAdaptor</outputAdaptorClassName> <adaptorParams>voltage</adaptorParams> <adaptorParams>user</adaptorParams> <adaptorParams>password</adaptorParams> <adaptorParams>192.168.1.3</adaptorParams> <adaptorParams>3306</adaptorParams> <adaptorParams>insert into daystatistic(lineid,date,upbound,bottombound) select lineid as lineid,?{date} as date,max(voltagevalue) as upbound,min(voltagevalue) as bottombound from historicalvoltage group by lineid</adaptorParams> <adaptorParams>endstatisticmessage</adaptorParams> <isExternal>false</isExternal> <queryName>VD0003_2</queryName> </outputAdaptors>
4.2 船舶活动区域监测
本节介绍GIS实时监测领域的应用示例,即监测船舶是否偏离了规定航线或规定活动区域。原理也可以应用到其他GIS实时监测应用中。示例场景为作业的船只通常有一定的活动范围,如果船只出现在异常的海域,应该向海岸管理人员预警提示。
具体EPL就不给出了,参考附件中的“时间处理拓扑图”。
filter1负责当船舶实时位置上报时,查询正常区域模型数据库。
filter2负责根据船舶实时位置更新正常区域模型,即此船只正常活动的区域(四个坐标点形成的矩形区域)。
ARA负责判断船舶是否偏离了正常航行区域,并进行预警。
整个拓扑图中的事件驱动架构则是由级联输入输出适配器完成的。级联输入适配器com.sodbase.inputadaptor.StubInputAdaptor级联输出适配器com.sodbase.outputadaptor.connection.ConnectToSodInputOutputAdaptor前文有介绍。
4.3 ETL、数据库监测、文件监测
ETL说通俗一些,其实都是是定时监测数据库(关系型或nosql数据库),然后根据EPL规则将数据进行转化。结果输出到另一个数据库中,一般是数据仓库中,那就是ETL。
如果结果输出到实时图表显示,那就是一种数据监测。有很多场合,数据库作为了中间媒介来观察数据的变化,也是一种数据监控的方法。
4.3.1 ETL示例
例:在一些项目中,需要把数据放到关系型数据库更好地做OLAP分析,如将数据从Nosql数据库同步到关系型数据库集群
例:数据质量管理、ETL、经营分析系统中将两个数据库表中的数据导入到DataWarehouse,统一gender字段的编码。
CREATE QUERY DQ0002 SELECT * FROM T1:MysqL数据输入,T2:MysqL数据输入2 PATTERN T1|T2 WITHIN 0DQ0002的输出适配器用 级联节点(no watermark),输出到流DQ0002_output
MysqL数据输入、MysqL数据输入2用的是MysqL输入适配器。
CREATE QUERY DQ0002_output SELECT T1.name AS name,JAVA:dq.Code:standardizeGender(T1.gender) AS gender,T1.age AS age FROM T1:DQ0002_output PATTERN T1 WITHIN 0DQ0002_output可以接数据存储写入的适配器。
4.3.2 数据库、文件监测
有些情况,也会定时监测是其他数据源,比如文件、文件夹、各类型URI,和监测数据库数据的原理是一样的,在经营分析系统、数据质量管理等项目中也经常会用到。
SODBASE CEP用于轻松、高效实施数据监测、监控类项目。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE Studio。