PostgreSQL的存储系统二:REDOLOG文件存储结构二

前端之家收集整理的这篇文章主要介绍了PostgreSQL的存储系统二:REDOLOG文件存储结构二前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

REDOLOG文件里的用户数据和数据文件里的用户数据存储结构相同

几个月前同事给台湾一家公司培训《pg9 ad admin》时,有个学员提及WAL里记录的内容Query时的sql语句(比如insert等),同事告知WAL里记录的tuple信息,而非sql,该学员坚持里面是sqlsql+tuple,并说oracleredo日志里记录的是sql(不知到这个从哪里知道的,也许是日志挖掘出来sql的缘由吧)。便看了一下源码(还是开源的好)。

前面我写过一篇文章Postgresql的存储系统二:REDOLOG文件存储结构》,见地址http://beigang.iteye.com/blog/1565121http://blog.csdn.net/beiigang/article/details/7680905,其中提到Pg XLOG文件的存储格式大致如下:

<PageHeaderData>

<XLogRecord>

<rmgr-specific data>

<BkpBlock>

<XLogRecData>里面包括<CheckPoint>

<BkpBlock>

<XLogRecData>

<BkpBlock>

<XLogRecData>

……

用户相关的数据写在XLogRecData结构(定义见下面)的buffer成员里,但具体写成什么样子没有提及,正好这儿再深入讨论一下。

typedefstruct XLogRecData

{

char *data; /* 资源管理器包含数据的开始 */

uint32 len; /* 资源管理器包含数据的长度 */

Buffer buffer; /* 有相应数据的buffer,如果有的话 */

bool buffer_std; /* buffer是否有标准pd_lower/pd_upper */

struct XLogRecData *next; /* 链里的下一个结构 */

} XLogRecData;

为了说清楚这个问题,跑了个例子如下:

INSERT INTO TABLE1(ID,GNAME) VALUES(18,’GangBei’);

看这个例子涉及的调用流程前,先回顾一下pg服务进程的调用流程,一切就绪后进入无限循环,等候客户端指令,


Postgres服务进程调用流程图

这个例子的调用流程和《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》系列博文中的流程大致相同,也是调用exec_simple_query方法,和前面《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》中select例子不同的是,本节中insert的例子在portalrun方法调用了执行器的ExecInsert方法,最终调用了heap_insert方法,在这个方法里完成了记录写入数据文件,并调用了XLogInsert方法,完成了XLOG的WAL日志写入。更具体的方法调用流程参见下面的调用流程图,其他和《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》基本相同的部分略去。



Insert sql 语句调用流程

在heap_insert方法里,组装好tuple,调用RelationGetBufferForTuple方法找到shmem里缓存数据文件块的buffer,调用RelationPutHeapTuple方法,把组装好的元组放到合适的buffer中合适的位置;然后组装XLogRecData类型变量rdata,把buffer赋给XLogRecData的成员buffer,接着调用XLogInsert方法,并传入rdata,在XLogInsert方法里,用memcpy方法rdata写入shmem对应的cache里,最后pg都是通过操作系统接口I/O接口把WAL日志和数据写入对应的文件

既然XLOG里写的 Insertwal日志里的用户数据和数据文件中的一样,那我们简单看一下pg中数据文件里的tupletuple存放在堆中,一个tuple就是一行表记录,在数据文件的页里存放的结构如下图:



数据文件页面布局图




元组结构图

元组头结构和其字段表示意义见下面:

typedefstruct HeapTupleHeaderData

{

union

{

HeapTupleFieldst_heap;

DatumTupleFieldst_datum;

} t_choice;

ItemPointerDatat_ctid; /* current TID of this or newer tuple */

/* Fields below here must match MinimalTupleData! */

uint16 t_infomask2; /* number of attributes + varIoUs flags */

uint16 t_infomask; /* varIoUs flag bits,see below */

uint8 t_hoff; /* sizeof header incl. bitmap,padding */

/* ^ - 23 bytes - ^ */

bits8 t_bits[1]; /* bitmap of NULLs -- VARIABLE LENGTH */

/* MORE DATA FOLLOWS AT END OF STRUCT */

} HeapTupleHeaderData;

typedefstruct HeapTupleFields

{

TransactionIdt_xmin; /* inserting xact ID */

TransactionIdt_xmax; /* deleting or locking xact ID */

union

{

CommandId t_cid; /* inserting or deleting command ID,or both */

TransactionIdt_xvac; /* old-style VACUUM FULL xact ID */

} t_field3;

} HeapTupleFields;

typedefstruct DatumTupleFields

{

int32 datum_len_; /* varlena header (do not touch directly!) */

int32 datum_typmod; /* -1,or identifier of a record type */

Oid datum_typeid; /* composite type OID,or RECORDOID */

/*

* Note: field ordering is chosen with thought that Oid might someday

* widen to 64 bits.

*/

} DatumTupleFields;

typedefstruct ItemPointerData

{

BlockIdDataip_blkid;

OffsetNumberip_posid;

}

Postgresql的元组头结构是MVCC算法的基础。这个以后再说吧。

下面把heap_insert方法XLogInsert方法贴到了下面,为了突显主题,删掉了其余代码,并把XLOG内容相关变量和方法置为红色,方便串读。

Oid

heap_insert(Relation relation,HeapTuple tup,CommandId cid,

int options,BulkInsertState bistate)

{

TransactionId xid = GetCurrentTransactionId();

HeapTuple heaptup;

Buffer buffer;

bool all_visible_cleared = false;

/*1 组装元组头信息 */

tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);

tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);

tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

HeapTupleHeaderSetXmin(tup->t_data,xid);

HeapTupleHeaderSetCmin(tup->t_data,cid);

HeapTupleHeaderSetXmax(tup->t_data,0);

tup->t_tableOid = RelationGetRelid(relation);

heaptup = tup;

/*2 Find buffer to insert this tuple into */

buffer = RelationGetBufferForTuple(relation,heaptup->t_len,

InvalidBuffer,options,bistate);

/*3

* We're about to do the actual insert -- check for conflict at the

* relation or buffer level first,to avoid possibly having to roll back

* work we've just done.

*/

CheckForSerializableConflictIn(relation,NULL,buffer);

/*4 NO EREPORT(ERROR) from here till changes are logged */

START_CRIT_SECTION();

RelationPutHeapTuple(relation,buffer,heaptup)

MarkBufferDirty(buffer);

/* XLOG stuff */

if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))

{

xl_heap_insert xlrec;

xl_heap_header xlhdr;

XLogRecPtr recptr;

XLogRecData rdata[3];

Page page = BufferGetPage(buffer);

uint8 info = XLOG_HEAP_INSERT;

xlrec.all_visible_cleared = all_visible_cleared;

xlrec.target.node = relation->rd_node;

xlrec.target.tid = heaptup->t_self;

rdata[0].data = (char *) &xlrec;

rdata[0].len = SizeOfHeapInsert;

rdata[0].buffer = InvalidBuffer;

rdata[0].next = &(rdata[1]);

xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;

xlhdr.t_infomask = heaptup->t_data->t_infomask;

xlhdr.t_hoff = heaptup->t_data->t_hoff;

/*

* note we mark rdata[1] as belonging to buffer; if XLogInsert decides

* to write the whole page to the xlog,we don't need to store

* xl_heap_header in the xlog.

*/

rdata[1].data = (char *) &xlhdr;

rdata[1].len = SizeOfHeapHeader;

rdata[1].buffer = buffer;

rdata[1].buffer_std = true;

rdata[1].next = &(rdata[2]);

/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */

rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData,t_bits);

rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData,t_bits);

rdata[2].buffer = buffer;

rdata[2].buffer_std = true;

rdata[2].next = NULL;

recptr = XLogInsert(RM_HEAP_ID,info,rdata);

PageSetLSN(page,recptr);

PageSetTLI(page,ThisTimeLineID);

}

END_CRIT_SECTION();

UnlockReleaseBuffer(buffer);

pgstat_count_heap_insert(relation);

return HeapTupleGetOid(tup);

}

XLogRecPtr

XLogInsert(RmgrId rmid,uint8 info,XLogRecData *rdata)

{

XLogCtlInsert *Insert = &XLogCtl->Insert;

XLogRecord *record;

XLogContRecord *contrecord;

XLogRecPtr RecPtr;

XLogRecPtr WriteRqst;

uint32 freespace;

int curridx;

XLogRecData *rdt;

Buffer dtbuf[XLR_MAX_BKP_BLOCKS];

bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];

BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];

XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];

pg_crc32 rdata_crc;

uint32 len,

write_len;

unsigned i;

TRACE_POSTGREsql_XLOG_INSERT(rmid,info);

/*

* Here we scan the rdata chain,determine which buffers must be backed

* up,and compute the CRC values for the data.

*/

START_CRIT_SECTION();

/* Now wait to get insert lock */

LWLockAcquire(WALInsertLock,LW_EXCLUSIVE);

/* Compute record's XLOG location */

curridx = Insert->curridx;

INSERT_RECPTR(RecPtr,Insert,curridx);

/*

* Append the data,including backup blocks if any

*/

/* rdata中的数据写入XLOG */

while (write_len)

{

while (rdata->data == NULL)

rdata = rdata->next;

if (freespace > 0)

{

if (rdata->len > freespace)

{

memcpy(Insert->currpos,rdata->data,freespace);

rdata->data += freespace;

rdata->len -= freespace;

write_len -= freespace;

}

else

{

memcpy(Insert->currpos,rdata->len);

freespace -= rdata->len;

write_len -= rdata->len;

Insert->currpos += rdata->len;

rdata = rdata->next;

continue;

}

}

/* Use next buffer */

updrqst = AdvanceXLInsertBuffer(false);

curridx = Insert->curridx;

/* Insert cont-record header */

Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;

contrecord = (XLogContRecord *) Insert->currpos;

contrecord->xl_rem_len = write_len;

Insert->currpos += SizeOfXLogContRecord;

freespace = INSERT_FREESPACE(Insert);

}

LWLockRelease(WALInsertLock);

XactLastRecEnd = RecPtr;

END_CRIT_SECTION();

return RecPtr;

}

下面这个图是WAL日志中存放的有关的INSERT、UPDATE、DELETE操作的内容,该图引自《Internals Of Postgresql Wal》



就到这儿吧。

-----------------

转载请著明出处: blog.csdn.net/beiigang beigang.iteye.com

猜你在找的Postgre SQL相关文章