jedis 源码阅读三——PipeLine

前端之家收集整理的这篇文章主要介绍了jedis 源码阅读三——PipeLine前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

之前看Jedis源码觉得很简单,于是没有打算写总结。前几天听人说到

”pipeline就是 Socket 一次性 把整个命令包 写出去“

。怎么跟我理解的不一样呢?

PipeLine

我们从这个Test说起(建议使用p.sync();),如需源码可以翻看我前两篇文章,里面有下载链接

public void pipelineResponse() {

        Pipeline p = jedis.pipelined();   
        Response<String> string = p.get("string");
        // System.out.println(string.get());无法获取,string为null,直到sync()调用,为什么?接着看吧
        p.sync();
        System.out.println(string.get()) //sync()调用后,string.get()返回命令执行完后的结果信息

Pipeline p = jedis.pipelined();

每个jedis中都持有一个 PipeLine pipeline对象,每次调用 jedis.pipelined(),都会产生一个新的PipeLine对象:

................
     protected Pipeline pipeline = null;
 ................
    public Pipeline pipelined() {
        pipeline = new Pipeline();
        pipeline.setClient(client);//每个pipeline 都一个 Client client对象,对于Client对象,你可以简单的理解为一个Socket对象
        return pipeline;
    }

Response string = p.get(“string”);

查看p.get(“..”)实现:

public Response<String> get(String key) {
    getClient(key).get(key);//1 调用用这个pipeline的Client对象的get()
    return getResponse(BuilderFactory.STRING);
  }

查看 getClient(key).get(key)实现:

看到最底层,其实就是sendCommand方法调用,(详见之前文章)其实就是初始化一个Socket做写操作:

protected Connection sendCommand(final ProtocolCommand cmd,final byte[]... args) {
 ..............
            // 1.建立Socket连接
            connect();
            // 2.按照协议完成IO操作,也就是命令的执行
            Protocol.sendCommand(outputStream,cmd,args);
  ..............

p.sync();

/** * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to * get return values from pipelined commands,capture the different Response&lt;?&gt; of the * commands you execute. */
  public void sync() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }

client.getMany()

public List<Object> getMany(final int count) {
        flush();
        final List<Object> responses = new ArrayList<Object>(count);
        for (int i = 0; i < count; i++) {
            try {
                responses.add(readProtocolWithCheckingBroken());
            } catch (JedisDataException e) {
                responses.add(e);
            }
        }
        return responses;
    }



    public List<Object> getMany(final int count) {
        flush();
        final List<Object> responses = new ArrayList<Object>(count);
        for (int i = 0; i < count; i++) {
            try {
                responses.add(readProtocolWithCheckingBroken());
            } catch (JedisDataException e) {
                responses.add(e);
            }
        }
        return responses;
    }
    protected Object readProtocolWithCheckingBroken() {
        try {
            return Protocol.read(inputStream);//这个inputStream就是上边pipeline中Connection建立的Socket对象的Input流。
        } catch (JedisConnectionException exc) {
            broken = true;
            throw exc;
        }
    }

总结:

同一个jedis返回的pipeline对象( Pipeline p = jedis.pipelined();)会有唯一一个Connection(理解为Socket就行了),无论你调用几次jedis.pipelined(),一个Jedis有且仅有一个Socket.

然后你用这个pipeline去执行一系列命令:

Response<String> hash = p.hget("hash","foo");
        Response<Set<String>> zset = p.zrange("zset",0,-1);
        Response<String> set = p.spop("set");
        Response<Boolean> blist = p.exists("list");

当你执行上边的代码,这个建立了的Connection ,通过唯一的Socket对象去写入命令(执行redis命令),这个socket使用的是阻塞的IO流(详见源码)。也就是你每次调用一个命令(如 p.hget、 p.zrange等等),这个阻塞IO流就会写入一次命令。最终:
调用p.sync();一次性的从流中读取所有内容(之前所有redis命令的返回结果)

为什么使用Pipeline

使用pipeline的优势在于,我们不用等每一次redis命令返回,可以异步(当然这里的异步不是站在Socket IO的角度)的执行每一个命令,不去等待这个命令的返回。然而jedis.doXXX()(如,jedis.set(“string”,“foo”);)都是需要获取命令执行的返回结果之后,才去执行下一个jedis.doXXX()

看来同事说错了!并不是一次写入,而是一次又一次(也就是源码里所谓的异常方式)。读取是一次性读取

夜深了,碎吧,夜猫子们

猜你在找的设计模式相关文章