之前看Jedis源码觉得很简单,于是没有打算写总结。前几天听人说到
”pipeline就是 Socket 一次性 把整个命令包 写出去“
。怎么跟我理解的不一样呢?
PipeLine
我们从这个Test说起(建议使用p.sync();),如需源码可以翻看我前两篇文章,里面有下载链接。
public void pipelineResponse() {
Pipeline p = jedis.pipelined();
Response<String> string = p.get("string");
// System.out.println(string.get());无法获取,string为null,直到sync()调用,为什么?接着看吧
p.sync();
System.out.println(string.get()) //sync()调用后,string.get()返回命令执行完后的结果信息
Pipeline p = jedis.pipelined();
每个jedis中都持有一个 PipeLine pipeline对象,每次调用 jedis.pipelined(),都会产生一个新的PipeLine对象:
................
protected Pipeline pipeline = null;
................
public Pipeline pipelined() {
pipeline = new Pipeline();
pipeline.setClient(client);//每个pipeline 都一个 Client client对象,对于Client对象,你可以简单的理解为一个Socket对象
return pipeline;
}
Response string = p.get(“string”);
查看p.get(“..”)实现:
public Response<String> get(String key) {
getClient(key).get(key);//1 调用用这个pipeline的Client对象的get()
return getResponse(BuilderFactory.STRING);
}
查看 getClient(key).get(key)实现:
看到最底层,其实就是sendCommand方法的调用,(详见之前文章)其实就是初始化一个Socket做写操作:
protected Connection sendCommand(final ProtocolCommand cmd,final byte[]... args) {
..............
// 1.建立Socket连接
connect();
// 2.按照协议完成IO操作,也就是命令的执行
Protocol.sendCommand(outputStream,cmd,args);
..............
p.sync();
/** * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to * get return values from pipelined commands,capture the different Response<?> of the * commands you execute. */
public void sync() {
if (getPipelinedResponseLength() > 0) {
List<Object> unformatted = client.getMany(getPipelinedResponseLength());
for (Object o : unformatted) {
generateResponse(o);
}
}
}
client.getMany()
public List<Object> getMany(final int count) {
flush();
final List<Object> responses = new ArrayList<Object>(count);
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
responses.add(e);
}
}
return responses;
}
public List<Object> getMany(final int count) {
flush();
final List<Object> responses = new ArrayList<Object>(count);
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
responses.add(e);
}
}
return responses;
}
protected Object readProtocolWithCheckingBroken() {
try {
return Protocol.read(inputStream);//这个inputStream就是上边pipeline中Connection建立的Socket对象的Input流。
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}
总结:
同一个jedis返回的pipeline对象( Pipeline p = jedis.pipelined();)会有唯一一个Connection(理解为Socket就行了),无论你调用几次jedis.pipelined(),一个Jedis有且仅有一个Socket.
然后你用这个pipeline去执行一系列命令:
Response<String> hash = p.hget("hash","foo");
Response<Set<String>> zset = p.zrange("zset",0,-1);
Response<String> set = p.spop("set");
Response<Boolean> blist = p.exists("list");
当你执行上边的代码,这个建立了的Connection ,通过唯一的Socket对象去写入命令(执行redis命令),这个socket使用的是阻塞的IO流(详见源码)。也就是你每次调用一个命令(如 p.hget、 p.zrange等等),这个阻塞IO流就会写入一次命令。最终:
调用p.sync();一次性的从流中读取所有内容(之前所有redis命令的返回结果)
为什么使用Pipeline
使用pipeline的优势在于,我们不用等每一次redis命令返回,可以异步(当然这里的异步不是站在Socket IO的角度)的执行每一个命令,不去等待这个命令的返回。然而jedis.doXXX()(如,jedis.set(“string”,“foo”);)都是需要获取命令执行的返回结果之后,才去执行下一个jedis.doXXX()
看来同事说错了!并不是一次写入,而是一次又一次(也就是源码里所谓的异常方式)。读取是一次性读取
夜深了,碎吧,夜猫子们