开发一个基于Riak Core的分布式数据处理系统,意味着它要处理的数据将在hash后映射到一个环上,也就是说这些数据分布在各个数据节点的partition上,数据的处理也在各个partition上进行。
显然每个数据处理系统有它自己的数据处理逻辑。如果我们实现了这个处理逻辑然后以某种方式注入到Riak Core中,那么相应的系统也就开发成了。
基于Riak Core的开发本质上也是这样的:Riak Core屏蔽了所有和分布式有关的部分,开发者不用关心这些,它提供了某种接口,用户通过这些接口实现数据处理逻辑,然后将实现注入到Riak Core系统中。
1. riak_core_vnode behavIoUr —— 用户处理逻辑的实现接口
Riak Core为提供了一个riak_core_vnode回调模块作为用户实现业务逻辑的接口。源代码看 这里。riak_core_vnode有两个作用:
也就是说,riak_core_vnode是作为一个容器以erlang进程的形式运行,而用户的业务逻辑栖身于容器中。(BTW:gen_server/gen_fsm都是这种设计和运行模式。)
我想这是一种典型的IoC(依赖注入)设计:系统将用户实现的功能注入到Riak Core系统中,由此构建出基于Riak Core的分布式应用系统。困难的dynamo分布式实现交给Riak Core完成,用户只关心业务逻辑的实现就行了。
2. 业务逻辑的实现,和其他
主要是实现riak_core_vnode behavIoUr的接口回调函数。除了业务逻辑的实现,还设计到数据的转移等。
Riak Core将partition上的数据处理操作抽象出来,由用户通过riak_core_vnode behavIoUr的回调函数实现,vnode的回调函数分为以下几类:
0) vnode进程生命周期管理,在vnode进程诞生和毁灭时将被调用的回调函数init/1,terminate/2
1) 对vnode进程负责的partition上数据的处理,当传来对该partition上的数据继续处理的命令时,由回调函数handle_command/3负责处理。例如,如果对数据的处理是存储,那么无非是get/put之类的命令,这些命令的格式(协议)都是由用户自己定义并实现的。
2) 当集群中增/减物理节点时,或者某个宕掉的物理节点恢复时,就需要对partition进行handoff的处理,其核心是对数据的搬运,它包括一组回调函数(is_empty/1,delete/1,handle_handoff_command/3,handoff_starting/2,handoff_cancelled/1,handoff_finished/2,handle_handoff_data/2,encode_handoff_item/2 )
3) 对进程错误的处理:与该vnode相关的进程如果出错会调用回调函数handle_exit
4) Covering:覆盖整个ring空间数据的特殊命令,例如对一段连续范围的搜索命令。某个连续的数据集在hash后有可能会落到所有的partition。
这些回调函数在 rzezeski的博文中有介绍
3. 业务逻辑的注入过程
我们的分布式数据处理系统作为一个OTP application发布。在application中实现了处理系统的启动,以及业务逻辑的注入。
application启动时会启动supervisor,后者监控着 riak_core_vnode_master子进程(riak_core_vnode_master是一个gen_server),参数配制成实现用户逻辑的定制vnode模块,从而实现业务逻辑注入。
riak_core_vnode_master可看成是运行着vnode的容器。
业务逻辑的注入
-module(rts_sup). -behavIoUr(supervisor). %% API -export([start_link/0]). %% Supervisor callbacks -export([init/1]). start_link() -> supervisor:start_link({local,?MODULE},?MODULE,[]). init(_Args) -> VMaster = { rts_vnode_master,{riak_core_vnode_master,start_link,[rts_vnode]},permanent,5000,worker,[riak_core_vnode_master]},Entry = { rts_entry_vnode_master,[rts_entry_vnode]},Stat = { rts_stat_vnode_master,[rts_stat_vnode]},{ok,{{one_for_one,5,10},[VMaster,Entry,Stat]}}.
有几种执行特定分区上业务逻辑的API方法,都是riak_core_vnode_master模块提供的,这些函数的参数都是三个,分别是:IdxNode,Msg,VMasterMod
1. 同步发送
sync_command(IndexNode,VMaster)
2. 异步发送
riak_core_vnode_master:sync_spawn_command(IndexNode,ping,rts_vnode_master)
还有一个比较特殊command函数,它有两种
一种一上述两类相似,直接向vnode同步发送命令:
riak_core_vnode_master:command(IdxNode,
{entry,Client,Entry} = Msg,
rts_entry_vnode_master = VMaster).
一种向PreList里的所有节点发送command命令:
command(Preflist,VMaster)
IdxNode是{Index,Node}这样的tuple,其中Index是partition分区的id号,Node是分区所在的节点,知道了Node和rts_entry_vnode_master,就能得到vnode进程Pid,通过发送事件调用相关业务逻辑:
gen_fsm:send_event(Pid,make_request(Msg,Sender,Index));
IndexNode的计算:
1) 用riak_core_util:chash_key/1函数计算hash值,参数是一个含两个二进制数据的tuple
DocIdx = riak_core_util:chash_key({<<"ping">>,term_to_binary(now())}),
2) 通过哈希值得到Preflist,Preflist的长度可以指定:
PrefList = riak_core_apl:get_primary_apl(DocIdx,1,rts),
最后得到IndexNod
[{IndexNode,_Type}] = PrefList,
BTW:应用的启动过程
-module(rts_app). -behavIoUr(application). %% Application callbacks -export([start/2,stop/1]). start(_StartType,_StartArgs) -> case rts_sup:start_link() of {ok,Pid} -> ok = riak_core:register_vnode_module(rts_vnode),ok = riak_core_node_watcher:service_up(rts,self()),ok = riak_core:register_vnode_module(rts_entry_vnode),ok = riak_core_node_watcher:service_up(rts_entry,ok = riak_core:register_vnode_module(rts_stat_vnode),ok = riak_core_node_watcher:service_up(rts_stat,EntryRoute = {["rts","entry",client],rts_wm_entry,[]},webmachine_router:add_route(EntryRoute),Pid}; {error,Reason} -> {error,Reason} end. stop(_State) -> ok.原文链接:https://www.f2er.com/javaschema/286882.html