Jedis源码分析(二)-Jedis的内部实现(Client,Pipeline,Transaction)

前端之家收集整理的这篇文章主要介绍了Jedis源码分析(二)-Jedis的内部实现(Client,Pipeline,Transaction)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

Jedis源码分析共有四个章节,以下为各章链接

  1. Jedis源码分析(一)-Jedis介绍
  2. Jedis源码分析(二)-Jedis类结构及实现
  3. Jedis源码分析(三)- JedisCluster类结构及实现
  4. Jedis源码分析(四)-JedisSentinel与ShardedJedis介绍

1 Jedis的类结构

​ 首先看Jedis的内部结构,图2-1中用橘色框标出了主要支架,为突出主要架构,或有稍许内容没有标出。

图1-1 Jedis的类结构

​ Jedis以输入的命令参数是否为二进制,将处理请求的具体实现部署在两个类中,例如JedisBinaryJedisClientBinaryClient。与Redis服务器的连接信息(Socket,host,port……)封装在Client的基类Connection中。BinaryJedis类中提供了Client,Pipeline和Transcation变量,对应3种请求模式。

2 Jedis的初始化流程

Jedis jedis = new Jedis("localhost",6379,15000);
Transaction t = jedis.multi();
Pipeline pipeline = jedis.pipelined();

​ Jedis通过传入Redis服务器地址(host,port)开始初始化,然后在BinaryJedis里实例化Client。Client通过Socket维持客户端与Redis服务器的连接与沟通。
前文提到Transaction和Pipeline很相似,它们继承同一个基类MultiKeyPipelineBase。区别在于Transaction在实例化的时候,就自动发送MULTI命令,开启事务模式,而Pipeline则按情况手动开启,它们均依靠Client发送命令。以下是Transaction和Pipeline初始化的具体实现:

//BinaryJedis类
public Transaction multi() {
client.multi();
transaction = new Transaction(client);
return transaction;
}
public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);
return pipeline;
}

​ 下面通过发送一个get key 的命令,看看,这三种模式是如何运转的。

3 Jedis工作模式的调用流程

3.1 client请求模式

​ 以get key 为例,为突出主要步骤,部分代码略有缩减。
用法

jedis.get("foo");


图3-1 Clinet模式的时序图


图3-2 Client模式的调用流程

​ 图3-1和3-2是client模式下,发送请求,读取回复的具体实现。可以清楚看到,在每次发送命令前,会先通过connect()方法判断是否已经连接,若未连接则进行如下操作:

  1. 实例化Socket,并配置,
  2. 连接Socket,获取OutputStream和InputStream
  3. 如果是SSL连接,则会通过SSLSocketFactory创建socket连接

Protocol是一个通讯工具类,将Redis的各类执行关键字存储为静态变量,可以直观调用命令,例如Protocol.Command.GET。同时,将命令包装成符合Redis的统一请求协议回复消息的处理也是在这个类进行,先通过通讯协提取出当次请求的回复消息,将Object类型的消息,格式化为String,List等具体类型,如果回复消息有Error则以异常的形式抛出。

3.2 Pipeline和Transaction模式


图3-3 Transaction和Pipeline的类结构

​ 图3-3 是Transaction和Pipeline两个类的的类结构。可以看到Pipeline和Transaction都继承自MultiKeyPipelineBase,其中,MultiKeyPipelineBasePipelineBase的区别在于处理的命令不同,内部均调用Client发送命令。从以下用例也可以看出两者的操作也十分类似。Pipeline有一个内部类对象MultiResponseBuilder,前文提到,当Redis事务结束时,会以List的形式,一次性返回所有命令的执行结果。MultiResponseBuilder对象就是用于,当Pipeline开始其实模式后,在事务结束时,存储所有返回结果。
Queable用一个LinkedList装入每个命令的返回结果,Response<T>是一个泛型,set(Object data)方法传入格式化之前的结果,get()方法返回格式化之后的结果。
Pipeline的使用方法

Pipeline p = jedis.pipelined();
//只发送命令,不读取结果,此时的Response<T>没有数据
Response<String> string = p.get("string");
Response<String> list = p.lpop("list");
Response<String> hash = p.hget("hash","foo");
Response<Set<String>> zset = p.zrange("zset",-1);
Response<String> set = p.spop("set");
//一次读取所有response.此时的Response<T>有数据
p.sync();

assertEquals("foo",string.get());
assertEquals("foo",list.get());
assertEquals("bar",hash.get());
assertEquals("foo",zset.get().iterator().next());
assertEquals("foo",set.get());

Transactions使用方法:
//开启事务
Transaction t = jedis.multi();
//命令进入服务端的待执行队列
Response<String> string = t.get("string");
Response<String> list = t.lpop("list");
Response<String> hash = t.hget("hash","foo");
Response<Set<String>> zset = t.zrange("zset",-1);
Response<String> set = t.spop("set");
//发送EXEC指令,执行所有命令,并返回结果
t.exec();

assertEquals("foo",set.get());


图3-4 Pipeline的调用时序图

图3-5 Pipeline的调用流程

​ 图3-4,3-5 显示了Pipeline从发送请求到读取回复的具体实现,Transaction的实现与其类似,因而没有另外做图说明。由上图可见,Pipeline通过Client发送命令,Client在sendCommand时,会同时执行pipelinedCommands++,记录发送命令的条数(参见图3-5)。之后,返回一个Response<T>实例,并将这个实例塞入了pipelinedResponses队列中。Response<T>主要有3个属性

  1. 格式化前的回复消息data,
  2. 格式化后的回复消息response,
  3. 格式化方式builder。

​ 刚发送消息后,Response<T>里面的dataresponse是空值,只有格式化的方式builderSync()用于一次性读取所有回复,首先调用client的getAll()方法getAll()方法根据之前记录的pipelinedCommands和Redis通讯协议,读取相同条数的回复消息到一个List,并返回给Pipeline。随后遍历这个List,逐个将回复消息赋给pipelinedResponses中每个Response<T>data

​ 在执行Response<T>.get()命令时,Response<T>里面data已经有值了,但是是Object类型的,因而还要调用build()方法,做一次数据转换,返回格式化之后的数据。

以上就是Pipeline的主要工作流程。Transaction的exec()方法sync()很相似,下文为exec()的具体实现。

public List<Object> exec() {
  // 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
  client.getMany(getPipelinedResponseLength());
  //发送EXEC指令,让服务端执行所有命令
  client.exec();
  //事务结束
  inTransaction = false;
  //从inputStream中读取所有回复
  List<Object> unformatted = client.getObjectMultiBulkReply();
  if (unformatted == null) {
    return null;
  }
  //和sync()一样
  List<Object> formatted = new ArrayList<Object>();
  for (Object o : unformatted) {
    try {
     formatted.add(generateResponse(o).get());
    } catch (JedisDataException e) {
    formatted.add(e);
    }
  }
  return formatted;
}

本节虽未将Pipeline和Transaction的方法实现尽述,但也大同小异。关键点在于理解第一章介绍的3类请求逻辑。

猜你在找的设计模式相关文章