1.BlockIo狀態(tài)遷移
RecordFile是很重要的一個(gè)類,幾個(gè)重要的變量:
final TransactionManager txnMgr;
private final LinkedList free = new LinkedList();
private final HashMap inUse = new HashMap();
private final HashMap dirty = new HashMap();
private final HashMap inTxn = new HashMap();
private RandomAccessFile file;
free是一個(gè)LinkedList,F(xiàn)IFO.其他幾個(gè)維護(hù)著BlockIo對象的狀態(tài),如果BlockIo對象
從free里面取出來了,那么它的狀態(tài)就是inUse了,所以RecordFile類get方法最后
會將BlockIo對象放到inUse的Map中:
BlockIo node;
......
inUse.put(key, node);
node.setClean(); //注意此時(shí)node被設(shè)置為dirty=false,也就是說BlockIo也有dirty這個(gè)指標(biāo)
return node;
inUse的BlockIo對象如果被修改了,那么它的狀態(tài)就變成dirty了。由于從inUse中取出的對象是否發(fā)生
了改變RecordFile對象不知道,需要調(diào)用者調(diào)用一個(gè)方法release:
/**
* Releases a block.
*
* @param blockid The record number to release.
* @param isDirty If true, the block was modified since the get().
*/
void release(long blockid, boolean isDirty)
throws IOException {
BlockIo node = (BlockIo) inUse.get(new Long(blockid));
if (node == null)
throw new IOException("bad blockid " + blockid + " on release");
if (!node.isDirty() && isDirty)
node.setDirty();
release(node);
}
/**
* Releases a block.
*
* @param block The block to release.
*/
void release(BlockIo block) {
Long key = new Long(block.getBlockId());
inUse.remove(key);
if (block.isDirty()) {
// System.out.println( "Dirty: " + key + block );
dirty.put(key, block);
} else {
if (!transactionsDisabled && block.isInTransaction()) {
inTxn.put(key, block);
} else {
free.add(block);
}
}
}
release方法輸入兩個(gè)參數(shù),其中一個(gè)是isDirty,如果取出來的BlockIo對象修改了
就應(yīng)該將isDirty置為true,否則修改不會被保存。也就是說修改后,BlockIo對象
被放入map dirty中。
release以后,BlockIo對象可能有三個(gè)狀態(tài),首先它會從inUse map里面刪除。如果
BlockIo對象被修改,則被放入dirty map中。如果沒有修改,就有可能放入free map中。
做完修改后,可以關(guān)閉這個(gè)RecordFile對象。
RecordFile file = new RecordFile( testFileName );
byte[] data = file.get( 0 ).getData();
data[ 14 ] = (byte) 'b';
file.release( 0, true );
file.close();
close方法的工作如下:
void close() throws IOException {
if (!dirty.isEmpty()) {
commit();
}
txnMgr.shutdown();
......
file.close(); //此時(shí)RandomAccessFile對象close
file = null;
}
可以看到close方法里面主要就是commit這個(gè)事務(wù):
for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
BlockIo node = (BlockIo) i.next();
i.remove();
// System.out.println("node " + node + " map size now " + dirty.size());
if (transactionsDisabled) {
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
}
else {
txnMgr.add(node);
inTxn.put(new Long(node.getBlockId()), node);
}
}
如果transaction被disable,那么每一個(gè)節(jié)點(diǎn)進(jìn)行更新,如下:
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
從最后一行,BlockIo對象node的狀態(tài)重新變?yōu)閒ree。以上狀態(tài)的變化是如下一個(gè)循環(huán):
free -> inUse -> dirty -> inTxn -> free
2.BlockIo的獲取
RecordFile對應(yīng)兩個(gè)文件,一個(gè)是.db文件,另一個(gè)是.lg文件。緩存對象即BlockIo是操作
的最小單元,在get方法中,如果指定的blockid不在inTxn,dirty和free中,那么通過
getNewNode(blockid)得到一個(gè)新的BlockIo對象,如果blockid在有效范圍內(nèi)(這個(gè)判斷是通過
計(jì)算offset得到的,offset=blockid*BLOCK_SIZE,offset小于.db文件的大小,那說明blockid對應(yīng)
的數(shù)據(jù)塊在文件內(nèi)),那么就從.db文件中讀取去blockid對應(yīng)的數(shù)據(jù)庫。如果不在有效范圍內(nèi),那么
數(shù)據(jù)塊就是cleanData的copy,即A block of clean data。
// get a new node and read it from the file
node = getNewNode(blockid);
long offset = blockid * BLOCK_SIZE;
if (file.length() > 0 && offset <= file.length()) {
read(file, offset, node.getData(), BLOCK_SIZE);
} else {
System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
}
inUse.put(key, node);
node.setClean();
return node;
這里getNewNode()方法如下:
private BlockIo getNewNode(long blockid)
throws IOException
{
BlockIo retval = null;
if (!free.isEmpty()) {
retval = (BlockIo) free.removeFirst();
}
if (retval == null)
retval = new BlockIo(0, new byte[BLOCK_SIZE]);
retval.setBlockId(blockid);
retval.setView(null);
return retval;
}
getNewNode()不是直接new BlockIo(),而是從free中取,free中的
BlockIo對象沒有被使用,則直接利用,采取的方式是Least Recently Used策略。BlockIo實(shí)現(xiàn)了自定義的Externalizable
序列化:
// implement externalizable interface
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException {
blockId = in.readLong();
int length = in.readInt();
data = new byte[length];
in.readFully(data);
}
// implement externalizable interface
public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(blockId);
out.writeInt(data.length);
out.write(data);
}
3.TransactionManager的事務(wù)處理
序列化用在事務(wù)這塊,如果沒有啟動(dòng)事務(wù),RecordFile直接寫到.db文件中,不會進(jìn)行序列化操作。
RecordFile演示了事務(wù)的操作:
final TransactionManager txnMgr;
......
txnMgr = new TransactionManager(this);
......
if (!transactionsDisabled) {
txnMgr.start();
}
......
if (!transactionsDisabled)
txnMgr.add(node);
......
if (!transactionsDisabled) {
txnMgr.commit();
}
......
txnMgr.shutdown();
以上是一個(gè)完整的事務(wù)過程,下面對以上過程發(fā)生的操作深入闡述:
TransactionManager(this)以RecordFile作為參數(shù)進(jìn)行構(gòu)造:
TransactionManager(RecordFile owner) throws IOException {
this.owner = owner;
recover();
open();
}
TransactionManager持有RecordFile的引用,然后進(jìn)行recover和open操作。recover主要是
對log file進(jìn)行操作,如果有事務(wù)沒有執(zhí)行,那么執(zhí)行事務(wù)將log file中的數(shù)據(jù)寫到.db文件中
并且對RecordFile進(jìn)行sync()操作,最后把log file刪除。
private void recover() throws IOException {
String logName = makeLogName();
File logFile = new File(logName);
if (!logFile.exists())
return;
if (logFile.length() == 0) {
logFile.delete();
return;
}
FileInputStream fis = new FileInputStream(logFile);
ObjectInputStream ois = new ObjectInputStream(fis);
try {
if (ois.readShort() != Magic.LOGFILE_HEADER)
throw new Error("Bad magic on log file");
} catch (IOException e) {
// corrupted/empty logfile
logFile.delete();
return;
}
while (true) {
ArrayList blocks = null;
try {
blocks = (ArrayList) ois.readObject();
} catch (ClassNotFoundException e) {
throw new Error("Unexcepted exception: " + e);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
synchronizeBlocks(blocks.iterator(), false);
// ObjectInputStream must match exactly each
// ObjectOutputStream created during writes
try {
ois = new ObjectInputStream(fis);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
}
owner.sync();
logFile.delete();
}
open的操作相對簡單很多,只是進(jìn)行一些初始化賦值工作:
/** Opens the log file */
private void open() throws IOException {
fos = new FileOutputStream(makeLogName());
oos = new ObjectOutputStream(fos);
oos.writeShort(Magic.LOGFILE_HEADER);
oos.flush();
curTxn = -1;
}
下一步就是start這個(gè)txnMgr了:
void start() throws IOException {
curTxn++;
if (curTxn == _maxTxns) {
synchronizeLogFromMemory();
curTxn = 0;
}
txns[curTxn] = new ArrayList();
}
start的時(shí)候就將當(dāng)前事務(wù)數(shù)增加1,如果當(dāng)前事務(wù)數(shù)等于設(shè)置的最大事務(wù)數(shù),就進(jìn)行sync處理。
sync處理的代碼如下:
/** Synchs in-core transactions to data file and opens a fresh log */
private void synchronizeLogFromMemory() throws IOException {
close();
TreeSet blockList = new TreeSet( new BlockIoComparator() );
int numBlocks = 0;
int writtenBlocks = 0;
for (int i = 0; i < _maxTxns; i++) {
if (txns[i] == null)
continue;
// Add each block to the blockList, replacing the old copy of this
// block if necessary, thus avoiding writing the same block twice
for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
BlockIo block = (BlockIo)k.next();
if ( blockList.contains( block ) ) {
block.decrementTransactionCount();
}
else {
writtenBlocks++;
boolean result = blockList.add( block );
}
numBlocks++;
}
txns[i] = null;
}
// Write the blocks from the blockList to disk
synchronizeBlocks(blockList.iterator(), true);
owner.sync();
open();
}
sync的主要操作就是synchronizeBlocks操作:
private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
throws IOException
{
// write block vector elements to the data file.
while ( blockIterator.hasNext() ) {
BlockIo cur = (BlockIo)blockIterator.next();
owner.synch(cur);
if (fromCore) {
cur.decrementTransactionCount();
if (!cur.isInTransaction()) {
owner.releaseFromTransaction(cur, true);
}
}
}
}
接下來的操作txnMgr.add(node):
/**
* Indicates the block is part of the transaction.
*/
void add(BlockIo block) throws IOException {
block.incrementTransactionCount();
txns[curTxn].add(block);
}
這個(gè)操作很簡單,就是將BlockIo對象放入到當(dāng)前事務(wù)的ArrayList當(dāng)中。之后commit操作:
/**
* Commits the transaction to the log file.
*/
void commit() throws IOException {
oos.writeObject(txns[curTxn]);
sync();
// set clean flag to indicate blocks have been written to log
setClean(txns[curTxn]);
// open a new ObjectOutputStream in order to store
// newer states of BlockIo
oos = new ObjectOutputStream(fos);
}
最后的操作就是txnMgr.shutdown():
/**
* Shutdowns the transaction manager. Resynchronizes outstanding
* logs.
*/
void shutdown() throws IOException {
synchronizeLogFromMemory();
close();
}