当你的项目数据量上去了之后,通常会遇到两种情况,第一种情况应是最大可能的使用cache来对抗上层的高并发,第二种情况同样也是需要使用分库
分表对抗上层的高并发。。。逼逼逼起来容易,做起来并不那么乐观,由此引入的问题,不见得你有好的解决方案,下面就具体分享下。
比如在我们的千人千面系统中,会针对商品,订单等维度为某一个商家店铺自动化建立大约400个数据模型,然后买家在淘宝下订单之后,淘宝会将订单推
送过来,订单会在400个模型中兜一圈,从而推送更贴切符合该买家行为习惯的短信和邮件,这是一个真实的业务场景,为了应对高并发,这些模型自然都是缓
存在Cache中,模型都是从db中灌到redis的,那如果有新的模型进来了,我如何通知redis进行缓存更新呢???通常的做法就是在添加模型的时候,顺便更新
redis。。。对吧,如下图:
说的简单,我把自己的手头代码写好就可以了,我要高内聚,所以你必须碰一鼻子灰。
除了一鼻子灰之后,也许你还会遇到更新database成功,再更新redis的时候失败,可人家不管,而且错误日志还是别人的日志系统里面,所以你很难甚至
无法保证这个db和cache的缓存一致性,那这个时候能不能换个思路,我直接写个程序订阅database的binlog,从binlog中分析出模型数据的CURD操作,根
据这些CURD的实际情况更新Redis的缓存数据,第一个可以实现和web的解耦,第二个实现了高度的缓存一致性,所以新的架构是这样的。
上面这张图,相信大家都能看得懂,重点就是这个处理binlog程序,从binlog中分析出CURD从而更新Redis,其实这个binlog程序就是本篇所说的canal。。。
一个伪装成MysqL的slave,不断的通过dump命令从MysqL中盗出binlog日志,从而完美的实现了这个需求。
本篇开头也说到了,数据量大了之后,必然会存在分库分表,甚至database都要分散到多台服务器上,现在的电商项目,都是业务赶着技术跑。。。
谁也不知道下一个业务会是一个怎样的奇葩,所以必然会导致你要做一些跨服务器join查询,查询的函数给你
不过如果你的业务真的很重要,可能DBA会给你做数据异构,所谓的数据异构,那就是
将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。。。。。
那如果用canal来订阅binlog,就可以改造成下面这种架构。
好了,canal的应用场景给大家也介绍到了,最主要是理解这种思想,人家搞不定的东西,你的价值就出来了。
MysqL的binlog功能
开启binlog,并且将binlog的格式改为Row,这样就可以获取到CURD的二进制内容,MysqL\MysqL Server 5.7\my.ini
使用命令验证,并且开启binlog的过期时间为30天,默认情况下binlog是不过期的,这就导致你的磁盘可能会爆满,直到挂掉。
MysqL的账号权限,方便canal去偷binlog日志。
show grants for 'canal'
github的地址: https://github.com/alibaba/canal/releases
canal
配置文件
canal的模式是这样的,一个canal里面可能会有多个instance,也就说一个instance可以监控一个MysqL实例,多个instance也就可以对应多台服务器
的MysqL实例。也就是一个canal就可以监控分库分表下的多机器MysqL。
它是全局性的canal服务器配置,具体如下,这里面的参数涉及到方方面面。
detecing config
canal.instance.detecting.enable = false
canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
support maximum transaction size,more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
MysqL fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
binlog filter config
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
binlog ddl isolation
canal.instance.get.ddl.isolation = false
#################################################
######### destinations #############
#################################################
<span style="color: #ff0000;">canal.destinations=
conf root dir
canal.conf.dir = ../conf
auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = 127.0.0.1:1099
canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#################################################
MysqL serverId
canal.instance.MysqL.slaveId = 1234
position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.standby.address =
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
username/password,需要改成自己的数据库信息
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName = datamip
canal.instance.connectionCharset = UTF-8
table regex
<span style="color: #ff0000;">canal.instance.filter.regex = .\..
#################################################
由于是全局性的配置,所以上面三处标红的地方要注意一下:
canal.port= 11111 当前canal的服务器端口号
canal.destinations= example 当前默认开启了一个名为example的instance实例,如果想开多个instance,用","逗号隔开就可以了。。。
canal.instance.filter.regex = .*\\..* MysqL实例下的所有db的所有表都在监控范围内。
这个就是具体的某个instances实例的配置,未涉及到的配置都会从canal.properties上继承。
position info
<span style="color: #ff0000;">canal.instance.master.address = 192.168.23.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.standby.address =
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
username/password
<span style="color: #ff0000;">canal.instance.dbUsername = canal
canal.instance.dbPassword =
canal.instance.defaultDatabaseName =<span style="color: #ff0000;">datamip
canal.instance.connectionCharset = UTF-8
table regex
<span style="color: #ff0000;">canal.instance.filter.regex = .\..
table black regex
canal.instance.filter.black.regex =
#################################################
上面标红的地方注意下就好了,去偷binlog的时候,需要知道的MysqL地址和用户名,密码。
大家要记得把/canal/bin 目录配置到 /etc/profile 的 Path中,方便快速开启,通过下图你会看到11111端口已经在centos上开启了。
代码
canal driver 需要在maven仓库中获取一下:http://www.mvnrepository.com/artifact/com.alibaba.otter/canal.client/1.0.24,不过依赖还是蛮多的。
代码进行验证
下面的代码对table的CURD都做了一个基本的判断,看看是不是能够智能感知,然后可以根据实际情况进行redis的更新操作。。。
<span style="color: #0000ff;">import<span style="color: #000000;"> java.net.InetSocketAddress;
<span style="color: #0000ff;">import<span style="color: #000000;"> java.util.List;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.client.CanalConnector;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.client.CanalConnectors;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.Column;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.Entry;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.EventType;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.Header;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.alibaba.otter.canal.protocol.Message;
<span style="color: #0000ff;">import<span style="color: #000000;"> com.google.protobuf.InvalidProtocolBufferException;
<span style="color: #0000ff;">public <span style="color: #0000ff;">class<span style="color: #000000;"> App {
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> main(String[] args) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> InterruptedException {
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第一步:与canal进行连接</span>
CanalConnector connector = CanalConnectors.newSingleConnector(<span style="color: #0000ff;">new</span> InetSocketAddress("192.168.23.170",11111<span style="color: #000000;">),</span>"example","",""<span style="color: #000000;">);
connector.connect();
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第二步:开启<a href="/tag/dingyue/" target="_blank" class="keywords">订阅</a></span>
<span style="color: #000000;"> connector.subscribe();
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第三步:循环<a href="/tag/dingyue/" target="_blank" class="keywords">订阅</a></span>
<span style="color: #0000ff;">while</span> (<span style="color: #0000ff;">true</span><span style="color: #000000;">) {
</span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 每次读取 1000 条</span>
Message message = connector.getWithoutAck(1000<span style="color: #000000;">);
</span><span style="color: #0000ff;">long</span> batchID =<span style="color: #000000;"> message.getId();
</span><span style="color: #0000ff;">int</span> size =<span style="color: #000000;"> message.getEntries().size();
</span><span style="color: #0000ff;">if</span> (batchID == -1 || size == 0<span style="color: #000000;">) {
System.out.println(</span>"当前暂时没有数据"<span style="color: #000000;">);
Thread.sleep(</span>1000); <span style="color: #008000;">//</span><span style="color: #008000;"> 没有数据</span>
} <span style="color: #0000ff;">else</span><span style="color: #000000;"> {
System.out.println(</span>"-------------------------- 有数据啦 -----------------------"<span style="color: #000000;">);
PrintEntry(message.getEntries());
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> position id ack (方便处理下一条)</span>
<span style="color: #000000;"> connector.ack(batchID);
} </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Exception e) {
</span><span style="color: #008000;">//</span><span style="color: #008000;"> TODO: handle exception</span>
<span style="color: #000000;">
} <span style="color: #0000ff;">finally<span style="color: #000000;"> {
Thread.sleep(1000<span style="color: #000000;">);
}
}
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> <a href="/tag/huoqu/" target="_blank" class="keywords">获取</a>每条打印的记录</span>
@SuppressWarnings("static-access"<span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> PrintEntry(List<Entry><span style="color: #000000;"> entrys) {
</span><span style="color: #0000ff;">for</span><span style="color: #000000;"> (Entry entry : entrys) {
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第一步:拆解entry 实体</span>
Header header =<span style="color: #000000;"> entry.getHeader();
EntryType entryType </span>=<span style="color: #000000;"> entry.getEntryType();
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第二步: 如果当前是RowData,那就是我需要的数据</span>
<span style="color: #0000ff;">if</span> (entryType ==<span style="color: #000000;"> EntryType.ROWDATA) {
String tableName </span>=<span style="color: #000000;"> header.getTableName();
String schemaName </span>=<span style="color: #000000;"> header.getSchemaName();
RowChange rowChange </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
</span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
rowChange </span>=<span style="color: #000000;"> RowChange.parseFrom(entry.getStoreValue());
} </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType </span>=<span style="color: #000000;"> rowChange.getEventType();
System.out.println(String.format(</span>"当前正在操作 %s.%s, Action= %s"<span style="color: #000000;">,schemaName,tableName,eventType));
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 如果是‘<a href="/tag/chaxun/" target="_blank" class="keywords">查询</a>’ 或者 是 ‘DDL’ 操作,那么<a href="/tag/sql/" target="_blank" class="keywords">sql</a>直接打出来</span>
<span style="color: #0000ff;">if</span> (eventType == EventType.QUERY ||<span style="color: #000000;"> rowChange.getIsDdl()) {
System.out.println(</span>"rowchange <a href="/tag/sql/" target="_blank" class="keywords">sql</a> ----->" +<span style="color: #000000;"> rowChange.get<a href="/tag/sql/" target="_blank" class="keywords">sql</a>());
</span><span style="color: #0000ff;">return</span><span style="color: #000000;">;
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 第三步:追踪到 columns 级别</span>
rowChange.getRowDatasList().forEach((rowData) -><span style="color: #000000;"> {
</span><span style="color: #008000;">//</span><span style="color: #008000;"> <a href="/tag/huoqu/" target="_blank" class="keywords">获取</a>更新之前的column情况</span>
List<Column> beforeColumns =<span style="color: #000000;"> rowData.getBeforeColumnsList();
</span><span style="color: #008000;">//</span><span style="color: #008000;"> <a href="/tag/huoqu/" target="_blank" class="keywords">获取</a>更新之后的 column 情况</span>
List<Column> afterColumns =<span style="color: #000000;"> rowData.getAfterColumnsList();
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 当前执行的是 <a href="/tag/shanchu/" target="_blank" class="keywords">删除</a>操作</span>
<span style="color: #0000ff;">if</span> (eventType ==<span style="color: #000000;"> EventType.DELETE) {
PrintColumn(beforeColumns);
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 当前执行的是 插入操作</span>
<span style="color: #0000ff;">if</span> (eventType ==<span style="color: #000000;"> eventType.INSERT) {
PrintColumn(afterColumns);
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 当前执行的是 更新操作</span>
<span style="color: #0000ff;">if</span> (eventType ==<span style="color: #000000;"> eventType.UPDATE) {
PrintColumn(afterColumns);
}
});
}
}
}
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 每个row上面的每一个column 的更改情况</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> PrintColumn(List<Column><span style="color: #000000;"> columns) {
columns.forEach((column) </span>-><span style="color: #000000;"> {
String columnName </span>=<span style="color: #000000;"> column.getName();
String columnValue </span>=<span style="color: #000000;"> column.getValue();
String columnType </span>=<span style="color: #000000;"> column.get<a href="/tag/MysqL/" target="_blank" class="keywords">MysqL</a>Type();
</span><span style="color: #0000ff;">boolean</span> isUpdated = column.getUpdated(); <span style="color: #008000;">//</span><span style="color: #008000;"> 判断 该字段是否更新</span>
<span style="color: #000000;">
System.out.println(String.format("columnName=%s,columnValue=%s,columnType=%s,isUpdated=%s"<span style="color: #000000;">,columnName,columnValue,columnType,isUpdated));
});
}
}
Update操作
Insert操作
Delete 操作
从结果中看,没毛病,有图有真相,好了,本篇就说到这里,对于开发的你,肯定是有帮助的~~~