对Riak Core的探索 (3) - 依赖注入 IoC

前端之家收集整理的这篇文章主要介绍了对Riak Core的探索 (3) - 依赖注入 IoC前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
二、 基于Riak Core分布式应用开发

开发一个基于Riak Core的分布式数据处理系统,意味着它要处理的数据将在hash后映射到一个环上,也就是说这些数据分布在各个数据节点的partition上,数据的处理也在各个partition上进行。

显然每个数据处理系统有它自己的数据处理逻辑。如果我们实现了这个处理逻辑然后以某种方式注入到Riak Core中,那么相应的系统也就开发成了。

基于Riak Core的开发本质上也是这样的:Riak Core屏蔽了所有和分布式有关的部分,开发者不用关心这些,它提供了某种接口,用户通过这些接口实现数据处理逻辑,然后将实现注入到Riak Core系统中。


1. riak_core_vnode behavIoUr —— 用户处理逻辑的实现接口

Riak Core为提供了一个riak_core_vnode回调模块作为用户实现业务逻辑的接口。源代码这里。riak_core_vnode有两个作用:
  1. 作为erlang behavIoUr提供回调函数实现接口,用户在回调函数中实现业务逻辑;
  2. 作为容器进程运行实现上述接口的用户模块,具体来讲它将作为一个基于gen_fsm的有限状态机进程运行。

也就是说,riak_core_vnode是作为一个容器以erlang进程的形式运行,而用户的业务逻辑栖身于容器中。(BTW:gen_server/gen_fsm都是这种设计和运行模式。)

我想这是一种典型的IoC(依赖注入)设计:系统将用户实现的功能注入到Riak Core系统中,由此构建出基于Riak Core的分布式应用系统。困难的dynamo分布式实现交给Riak Core完成,用户只关心业务逻辑的实现就行了。

2. 业务逻辑的实现,和其他

主要是实现riak_core_vnode behavIoUr的接口回调函数。除了业务逻辑的实现,还设计到数据的转移等。

Riak Core将partition上的数据处理操作抽象出来,由用户通过riak_core_vnode behavIoUr的回调函数实现,vnode的回调函数分为以下几类:
0) vnode进程生命周期管理,在vnode进程诞生和毁灭时将被调用的回调函数init/1,terminate/2
1) 对vnode进程负责的partition上数据的处理,当传来对该partition上的数据继续处理的命令时,由回调函数handle_command/3负责处理。例如,如果对数据的处理是存储,那么无非是get/put之类的命令,这些命令的格式(协议)都是由用户自己定义并实现的。
2) 当集群中增/减物理节点时,或者某个宕掉的物理节点恢复时,就需要对partition进行handoff的处理,其核心是对数据的搬运,它包括一组回调函数(is_empty/1,delete/1,handle_handoff_command/3,handoff_starting/2,handoff_cancelled/1,handoff_finished/2,handle_handoff_data/2,encode_handoff_item/2 )
3) 对进程错误的处理:与该vnode相关的进程如果出错会调用回调函数handle_exit
4) Covering:覆盖整个ring空间数据的特殊命令,例如对一段连续范围的搜索命令。某个连续的数据集在hash后有可能会落到所有的partition。

这些回调函数rzezeski的博文中有介绍

3. 业务逻辑的注入过程

我们的分布式数据处理系统作为一个OTP application发布。在application中实现了处理系统的启动,以及业务逻辑的注入。

application启动时会启动supervisor,后者监控着 riak_core_vnode_master子进程(riak_core_vnode_master是一个gen_server),参数配制成实现用户逻辑的定制vnode模块,从而实现业务逻辑注入。

riak_core_vnode_master可看成是运行着vnode的容器。

业务逻辑的注入

-module(rts_sup).

-behavIoUr(supervisor).

%% API 
-export([start_link/0]).      

%% Supervisor callbacks
-export([init/1]).

start_link() ->
    supervisor:start_link({local,?MODULE},?MODULE,[]).


init(_Args) ->
    VMaster = { rts_vnode_master,{riak_core_vnode_master,start_link,[rts_vnode]},permanent,5000,worker,[riak_core_vnode_master]},Entry = { rts_entry_vnode_master,[rts_entry_vnode]},Stat = { rts_stat_vnode_master,[rts_stat_vnode]},{ok,{{one_for_one,5,10},[VMaster,Entry,Stat]}}.


有几种执行特定分区上业务逻辑的API方法,都是riak_core_vnode_master模块提供的,这些函数的参数都是三个,分别是:IdxNode,Msg,VMasterMod

1. 同步发送
sync_command(IndexNode,VMaster)

2. 异步发送
riak_core_vnode_master:sync_spawn_command(IndexNode,ping,rts_vnode_master)

还有一个比较特殊command函数,它有两种

一种一上述两类相似,直接向vnode同步发送命令:
riak_core_vnode_master:command(IdxNode,
{entry,Client,Entry} = Msg,
rts_entry_vnode_master = VMaster).

一种向PreList里的所有节点发送command命令:
command(Preflist,VMaster)


IdxNode是{Index,Node}这样的tuple,其中Index是partition分区的id号,Node是分区所在的节点,知道了Node和rts_entry_vnode_master,就能得到vnode进程Pid,通过发送事件调用相关业务逻辑:
gen_fsm:send_event(Pid,make_request(Msg,Sender,Index));


IndexNode的计算:
1) 用riak_core_util:chash_key/1函数计算hash值,参数是一个含两个二进制数据的tuple
DocIdx = riak_core_util:chash_key({<<"ping">>,term_to_binary(now())}),
2) 通过哈希值得到Preflist,Preflist的长度可以指定:
PrefList = riak_core_apl:get_primary_apl(DocIdx,1,rts),

最后得到IndexNod
[{IndexNode,_Type}] = PrefList,





BTW:应用的启动过程
-module(rts_app).
-behavIoUr(application).

%% Application callbacks
-export([start/2,stop/1]).

start(_StartType,_StartArgs) ->
    case rts_sup:start_link() of
        {ok,Pid} ->
            ok = riak_core:register_vnode_module(rts_vnode),ok = riak_core_node_watcher:service_up(rts,self()),ok = riak_core:register_vnode_module(rts_entry_vnode),ok = riak_core_node_watcher:service_up(rts_entry,ok = riak_core:register_vnode_module(rts_stat_vnode),ok = riak_core_node_watcher:service_up(rts_stat,EntryRoute = {["rts","entry",client],rts_wm_entry,[]},webmachine_router:add_route(EntryRoute),Pid};
        {error,Reason} ->
            {error,Reason}
    end.

stop(_State) ->
    ok. 

猜你在找的设计模式相关文章