REDOLOG文件里的用户数据和数据文件里的用户数据存储结构相同
几个月前同事给台湾一家公司培训《pg9 ad admin》时,有个学员提及WAL里记录的内容为Query时的sql语句(比如insert等),同事告知WAL里记录的tuple信息,而非sql,该学员坚持里面是sql或sql+tuple,并说oracle的redo日志里记录的是sql(不知到这个从哪里知道的,也许是日志挖掘出来sql的缘由吧)。便看了一下源码(还是开源的好)。
前面我写过一篇文章《Postgresql的存储系统二:REDOLOG文件存储结构》,见地址http://beigang.iteye.com/blog/1565121或http://blog.csdn.net/beiigang/article/details/7680905,其中提到Pg XLOG文件的存储格式大致如下:
<PageHeaderData>
<XLogRecord>
<rmgr-specific data>
<BkpBlock>
<XLogRecData>里面包括<CheckPoint>等
<BkpBlock>
<XLogRecData>
<BkpBlock>
<XLogRecData>
……
用户相关的数据写在XLogRecData结构(定义见下面)的buffer成员里,但具体写成什么样子没有提及,正好这儿再深入讨论一下。
typedefstruct XLogRecData
{
char *data; /* 资源管理器包含数据的开始 */
uint32 len; /* 资源管理器包含数据的长度 */
Buffer buffer; /* 有相应数据的buffer,如果有的话 */
bool buffer_std; /* buffer是否有标准pd_lower/pd_upper头 */
struct XLogRecData *next; /* 链里的下一个结构 */
} XLogRecData;
为了说清楚这个问题,跑了个例子如下:
INSERT INTO TABLE1(ID,GNAME) VALUES(18,’GangBei’);
看这个例子涉及的调用流程前,先回顾一下pg服务进程的调用流程,一切就绪后进入无限循环,等候客户端指令,
Postgres服务进程调用流程图
这个例子的调用流程和《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》系列博文中的流程大致相同,也是调用exec_simple_query方法,和前面《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》中select例子不同的是,本节中insert的例子在portalrun方法里调用了执行器的ExecInsert方法,最终调用了heap_insert方法,在这个方法里完成了记录写入数据文件,并调用了XLogInsert方法,完成了XLOG的WAL日志写入。更具体的方法调用流程参见下面的调用流程图,其他和《Postgresql服务过程中的那些事二:Pg服务进程处理简单查询》基本相同的部分略去。
在heap_insert方法里,组装好tuple,调用RelationGetBufferForTuple方法找到shmem里缓存数据文件块的buffer,调用RelationPutHeapTuple方法,把组装好的元组放到合适的buffer中合适的位置;然后组装XLogRecData类型变量rdata,把buffer赋给XLogRecData的成员buffer,接着调用XLogInsert方法,并传入rdata,在XLogInsert方法里,用memcpy方法把rdata写入shmem对应的cache里,最后pg都是通过操作系统接口I/O接口把WAL日志和数据写入对应的文件。
既然XLOG里写的 Insert的wal日志里的用户数据和数据文件中的一样,那我们简单看一下pg中数据文件里的tuple,tuple存放在堆中,一个tuple就是一行表记录,在数据文件的页里存放的结构如下图:
元组结构图
元组头结构和其字段表示意义见下面:
typedefstruct HeapTupleHeaderData
{
union
{
HeapTupleFieldst_heap;
DatumTupleFieldst_datum;
} t_choice;
ItemPointerDatat_ctid; /* current TID of this or newer tuple */
/* Fields below here must match MinimalTupleData! */
uint16 t_infomask2; /* number of attributes + varIoUs flags */
uint16 t_infomask; /* varIoUs flag bits,see below */
uint8 t_hoff; /* sizeof header incl. bitmap,padding */
/* ^ - 23 bytes - ^ */
bits8 t_bits[1]; /* bitmap of NULLs -- VARIABLE LENGTH */
/* MORE DATA FOLLOWS AT END OF STRUCT */
} HeapTupleHeaderData;
typedefstruct HeapTupleFields
{
TransactionIdt_xmin; /* inserting xact ID */
TransactionIdt_xmax; /* deleting or locking xact ID */
union
{
CommandId t_cid; /* inserting or deleting command ID,or both */
TransactionIdt_xvac; /* old-style VACUUM FULL xact ID */
} t_field3;
} HeapTupleFields;
typedefstruct DatumTupleFields
{
int32 datum_len_; /* varlena header (do not touch directly!) */
int32 datum_typmod; /* -1,or identifier of a record type */
Oid datum_typeid; /* composite type OID,or RECORDOID */
/*
* Note: field ordering is chosen with thought that Oid might someday
* widen to 64 bits.
*/
} DatumTupleFields;
typedefstruct ItemPointerData
{
BlockIdDataip_blkid;
OffsetNumberip_posid;
}
Postgresql的元组头结构是MVCC算法的基础。这个以后再说吧。
下面把heap_insert方法和XLogInsert方法贴到了下面,为了突显主题,删掉了其余代码,并把XLOG内容相关变量和方法置为红色,方便串读。
Oid
heap_insert(Relation relation,HeapTuple tup,CommandId cid,
int options,BulkInsertState bistate)
{
TransactionId xid = GetCurrentTransactionId();
HeapTuple heaptup;
Buffer buffer;
bool all_visible_cleared = false;
/*1 组装元组头信息 */
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
HeapTupleHeaderSetXmin(tup->t_data,xid);
HeapTupleHeaderSetCmin(tup->t_data,cid);
HeapTupleHeaderSetXmax(tup->t_data,0);
tup->t_tableOid = RelationGetRelid(relation);
heaptup = tup;
/*2 Find buffer to insert this tuple into */
buffer = RelationGetBufferForTuple(relation,heaptup->t_len,
InvalidBuffer,options,bistate);
/*3
* We're about to do the actual insert -- check for conflict at the
* relation or buffer level first,to avoid possibly having to roll back
* work we've just done.
*/
CheckForSerializableConflictIn(relation,NULL,buffer);
/*4 NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
RelationPutHeapTuple(relation,buffer,heaptup)
MarkBufferDirty(buffer);
/* XLOG stuff */
if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
{
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
XLogRecData rdata[3];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapInsert;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
xlhdr.t_infomask = heaptup->t_data->t_infomask;
xlhdr.t_hoff = heaptup->t_data->t_hoff;
/*
* note we mark rdata[1] as belonging to buffer; if XLogInsert decides
* to write the whole page to the xlog,we don't need to store
* xl_heap_header in the xlog.
*/
rdata[1].data = (char *) &xlhdr;
rdata[1].len = SizeOfHeapHeader;
rdata[1].buffer = buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData,t_bits);
rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData,t_bits);
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
recptr = XLogInsert(RM_HEAP_ID,info,rdata);
PageSetLSN(page,recptr);
PageSetTLI(page,ThisTimeLineID);
}
END_CRIT_SECTION();
UnlockReleaseBuffer(buffer);
pgstat_count_heap_insert(relation);
return HeapTupleGetOid(tup);
}
XLogRecPtr
XLogInsert(RmgrId rmid,uint8 info,XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecord *record;
XLogContRecord *contrecord;
XLogRecPtr RecPtr;
XLogRecPtr WriteRqst;
uint32 freespace;
int curridx;
XLogRecData *rdt;
Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
pg_crc32 rdata_crc;
uint32 len,
write_len;
unsigned i;
TRACE_POSTGREsql_XLOG_INSERT(rmid,info);
/*
* Here we scan the rdata chain,determine which buffers must be backed
* up,and compute the CRC values for the data.
*/
START_CRIT_SECTION();
/* Now wait to get insert lock */
LWLockAcquire(WALInsertLock,LW_EXCLUSIVE);
/* Compute record's XLOG location */
curridx = Insert->curridx;
INSERT_RECPTR(RecPtr,Insert,curridx);
/*
* Append the data,including backup blocks if any
*/
/* 把rdata中的数据写入XLOG */
while (write_len)
{
while (rdata->data == NULL)
rdata = rdata->next;
if (freespace > 0)
{
if (rdata->len > freespace)
{
memcpy(Insert->currpos,rdata->data,freespace);
rdata->data += freespace;
rdata->len -= freespace;
write_len -= freespace;
}
else
{
memcpy(Insert->currpos,rdata->len);
freespace -= rdata->len;
write_len -= rdata->len;
Insert->currpos += rdata->len;
rdata = rdata->next;
continue;
}
}
/* Use next buffer */
updrqst = AdvanceXLInsertBuffer(false);
curridx = Insert->curridx;
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
contrecord = (XLogContRecord *) Insert->currpos;
contrecord->xl_rem_len = write_len;
Insert->currpos += SizeOfXLogContRecord;
freespace = INSERT_FREESPACE(Insert);
}
LWLockRelease(WALInsertLock);
XactLastRecEnd = RecPtr;
END_CRIT_SECTION();
return RecPtr;
}
下面这个图是WAL日志中存放的有关的INSERT、UPDATE、DELETE操作的内容,该图引自《Internals Of Postgresql Wal》
就到这儿吧。
-----------------
转载请著明出处: blog.csdn.net/beiigang beigang.iteye.com