引言
我们在这个系列第一篇文章中提到过,如果需要消息落地而对存储子系统的选择上,从速度上来说 文件系统>分布式KV(持久化)>分布式文件系统>数据库
。而NSQ选择了文件系统作为存储子系统。这篇文章将重点介绍nsq 对于文件的操作。
何时写入文件?
在内存的msg chan buffer 已满的时候,会将msg 写入文件,代码如下:
func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b,m,c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR,"CHANNEL(%s): Failed to write message to backend - %s",c.name,err) return err } } return nil }
写一条message
func (d *diskQueue) writeOne(data []byte) error {
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { ... writePos int64 ... writeFile *os.File ... }
if d.writePos > 0 { _,err = d.writeFile.Seek(d.writePos,0)
然后以二进制的方式写入data的size:
dataLen := int32(len(data)) d.writeBuf.Reset() err = binary.Write(&d.writeBuf,binary.BigEndian,dataLen)
此处的巧妙在于binary.Write
会根据写入数据的类型写入一段固定大小的数据。此处dataLen 是int32,所以会写入一段4个byte的数据来表示data的size。读取的时候先读一段4个byte的数据就知道了data的size。
之后写入data:
_,err = d.writeBuf.Write(data) _,err = d.writeFile.Write(d.writeBuf.Bytes())
读一条message
readOne
函数以byte 数组的形式读一条message 出来
func (d *diskQueue) readOne() ([]byte,error) {
diskQueue
维护了当前读取的文件和文件的offset
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { readPos int64 ... readFile *os.File
if d.readPos > 0 { _,err = d.readFile.Seek(d.readPos,0) if err != nil { d.readFile.Close() d.readFile = nil return nil,err } }
先把一个message的大小读出来:
err = binary.Read(d.reader,&msgSize)
msgSize
和写文件时候的dataLen
都是int32
类型
有了msgSize
,定义一段msgSize
大小的buffer,从文件里读一段数据来填满这个buffer,buffer里面的数据就是一条message
readBuf := make([]byte,msgSize) _,err = io.ReadFull(d.reader,readBuf)