UNSAFE MODULE; IMPORT FileAttr, MD5, MD5Digest, OSError, Pathname, RsyncBlock, RsyncBlockArraySort, Thread, UnixMisc, Ustat, Word, Wr; REVEAL T = Public BRANDED OBJECT buf: ADDRESS := NIL; size: CARDINAL := 0; start: UNTRACED REF CHAR := NIL; limit: UNTRACED REF CHAR := NIL; md5: TEXT := NIL; END; CONST CharOffset = 3; RsyncFile
CharOffset is an arbitrary non-zero value which improves the quality
of the rolling checksum a little bit. It must be the same on the client
and the server. PROCEDUREChooseBlockSize (fileSize: CARDINAL): CARDINAL =
Choose a good block size for the given file size.
We choose a block size based on several considerations. First, we want to keep the network pipeline full as well as we can. The server cannot do anything with a file until it has all the block information from the client. We would like to have all the block information for a file sitting in the server's receive buffer before the server finishes with the previous file. Otherwise the server will have to stop sending until it has read all of the block information from the network. To avoid this, we try to choose a block size such that all of the block information will fit into the network receive buffer on the server machine.
We clamp the block size within a range that we consider
reasonable.
We would also like to have the final remainder block be as
small as possible, since it will always have to be sent verbatim
by the server. After we have chosen a target block size, we
search in its neighborhood for a block size that will minimize
the remainder.
CONST
MinBlockSize = 1024;
MaxBlockSize = 16 * 1024;
ReceiveBufferSize = 15 * 1024; (* A guess. *)
BlockInfoSize = 26; (* It's 42 chars, but we assume decent compression. *)
SearchRegion = 10; (* On each side of the first choice. *)
MaxBlocks = ReceiveBufferSize DIV BlockInfoSize;
VAR
bestRemainder: CARDINAL;
blockSize := fileSize DIV MaxBlocks;
loSearch := blockSize - SearchRegion;
hiSearch := blockSize + SearchRegion;
BEGIN
IF loSearch < MinBlockSize THEN
loSearch := MinBlockSize;
hiSearch := loSearch + 2*SearchRegion;
ELSIF hiSearch > MaxBlockSize THEN
hiSearch := MaxBlockSize;
loSearch := hiSearch - 2*SearchRegion;
END;
bestRemainder := MaxBlockSize; (* Effectively infinity. *)
FOR bs := loSearch TO hiSearch DO
WITH rem = fileSize MOD bs DO
IF rem < bestRemainder THEN
bestRemainder := rem;
blockSize := bs;
END;
END;
END;
RETURN blockSize;
END ChooseBlockSize;
PROCEDURE Close (rf: T)
RAISES {OSError.E} =
BEGIN
IF rf.buf # NIL THEN
UnixMisc.Unmap(rf.buf, rf.size);
rf.buf := NIL;
END;
rf.size := 0;
rf.start := NIL;
rf.limit := NIL;
END Close;
PROCEDURE GetMD5 (rf: T): TEXT =
VAR
md5 := MD5.New();
BEGIN
TRY
md5.updateRaw(rf.start, rf.size);
FINALLY
RETURN md5.finish();
END;
END GetMD5;
PROCEDURE Open (p: Pathname.T;
blockSize: CARDINAL := 0): T
RAISES {OSError.E} =
VAR
rf: T;
statbuf: Ustat.struct_stat;
BEGIN
rf := NEW(T);
rf.buf := UnixMisc.MapFile(p, statbuf);
rf.attr := FileAttr.FromStat(statbuf);
rf.size := ORD(statbuf.st_size);
rf.start := rf.buf;
rf.limit := rf.start + rf.size;
IF blockSize = 0 THEN
blockSize := ChooseBlockSize(rf.size);
END;
rf.blockSize := blockSize;
RETURN rf;
END Open;
***************************************************************************
TYPE
BlockIteratorRep = BlockIterator OBJECT
rf: T;
ptr: UNTRACED REF CHAR;
blockNum: CARDINAL := 0;
OVERRIDES
next := BlockIteratorNext;
END;
PROCEDURE IterateBlocks (rf: T): BlockIterator =
BEGIN
RETURN NEW(BlockIteratorRep, rf := rf, ptr := rf.start);
END IterateBlocks;
PROCEDURE BlockIteratorNext (bi: BlockIteratorRep;
VAR block: RsyncBlock.T): BOOLEAN =
VAR
blockSize: CARDINAL;
BEGIN
IF bi.ptr >= bi.rf.limit THEN (* All done. *)
RETURN FALSE;
END;
blockSize := MIN(bi.rf.limit - bi.ptr, bi.rf.blockSize);
block.num := bi.blockNum;
block.md5 := BlockMD5(bi.ptr, blockSize);
block.rsum := BlockRsum(bi.ptr, blockSize);
INC(bi.blockNum);
INC(bi.ptr, blockSize);
RETURN TRUE;
END BlockIteratorNext;
***************************************************************************
PROCEDURE***************************************************************************BlockMD5 (ptr: UNTRACED REF CHAR; count: CARDINAL): MD5Digest.T = VAR digest: MD5Digest.T; md5 := MD5.New(); BEGIN TRY md5.updateRaw(ptr, count); FINALLY md5.finishRaw(digest); RETURN digest; END; END BlockMD5; PROCEDUREBlockRsum (ptr: UNTRACED REF CHAR; count: CARDINAL): Word.T = VAR limit := ptr + count; a, b: Word.T := 0; BEGIN (* This first loop is just an unrolled version of the second loop. *) WHILE ptr <= limit - 4 DO b := Word.Plus( b, Word.Plus( Word.Times(4, Word.Plus(a, ORD(ptr^))), Word.Plus( Word.Times(3, ORD(LOOPHOLE(ptr+1, UNTRACED REF CHAR)^)), Word.Plus( Word.Times(2, ORD(LOOPHOLE(ptr+2, UNTRACED REF CHAR)^)), Word.Plus( ORD(LOOPHOLE(ptr+3, UNTRACED REF CHAR)^), 10*CharOffset))))); a := Word.Plus( a, Word.Plus( ORD(ptr^), Word.Plus( ORD(LOOPHOLE(ptr+1, UNTRACED REF CHAR)^), Word.Plus( ORD(LOOPHOLE(ptr+2, UNTRACED REF CHAR)^), Word.Plus( ORD(LOOPHOLE(ptr+3, UNTRACED REF CHAR)^), 4*CharOffset))))); INC(ptr, 4); END; WHILE ptr < limit DO a := Word.Plus(a, ORD(ptr^)+CharOffset); b := Word.Plus(b, a); INC(ptr); END; RETURN Word.Or(Word.Shift(Word.And(b, 16_ffff), 16), Word.And(a, 16_ffff)); END BlockRsum;
CONST HashBits = 14; HashMask = Word.Not(Word.Shift(Word.Not(0), HashBits)); HashShift = (Word.Size - HashBits) DIV 2;To hash a rolling checksum, we take its middle
HashBits bits.
TYPE
HashTab = ARRAY [0..HashMask] OF INTEGER;
DiffIteratorRep = DiffIterator OBJECT
rf: T;
ptr: UNTRACED REF CHAR; (* Start of checksummed block. *)
rsum: Word.T;
blocks: REF ARRAY OF RsyncBlock.T;
seqVec: REF ARRAY OF CARDINAL := NIL; (* Block # => "blocks" index. *)
hashTab: REF HashTab := NIL;
OVERRIDES
next := DiffIteratorNext;
END;
PROCEDURE DiffIteratorNext (di: DiffIteratorRep;
wr: Wr.T;
VAR blocks: BlockRange): BOOLEAN
RAISES {Thread.Alerted, Wr.Failure} =
VAR
hash: Word.T;
blockIndex := -1;
blockNum: CARDINAL;
a, b: Word.T;
buff: ARRAY [0..8191] OF CHAR;
buffCt: CARDINAL := 0;
BEGIN
IF di.ptr >= di.rf.limit THEN (* We have already sent everything. *)
(* NIL out some pointers in an attempt to get their large
amounts of memory collected earlier. *)
di.blocks := NIL;
di.seqVec := NIL;
di.hashTab := NIL;
RETURN FALSE;
END;
(* Scan forward, copying characters, until we find a matching block
or we reach the end of the file. *)
VAR
limit := di.rf.limit;
blockSize := di.rf.blockSize;
hashTab := di.hashTab;
rsum := di.rsum;
ptr := di.ptr;
BEGIN
REPEAT
(* See whether the client has a block matching our current one. *)
IF limit - ptr >= blockSize AND hashTab # NIL THEN
(* It's worth looking for a matching block. *)
hash := Word.Extract(rsum, HashShift, HashBits);
blockIndex := hashTab[hash];
IF blockIndex >= 0 THEN (* A block matches the hash. *)
di.ptr := ptr;
di.rsum := rsum;
blockIndex := FindMatch(di, hash, blockIndex);
IF blockIndex >= 0 THEN EXIT END; (* Found a match. *)
END;
END;
(* No matching block at this position; output the character. *)
IF buffCt >= NUMBER(buff) THEN (* Flush buffer. *)
Wr.PutString(wr, buff);
buffCt := 0;
END;
buff[buffCt] := ptr^;
INC(buffCt);
(* Update the checksum, if we're going to use it. *)
IF limit - (ptr+1) >= blockSize
AND hashTab # NIL THEN
a := Word.And(rsum, 16_ffff);
b := Word.And(Word.RightShift(rsum, 16), 16_ffff);
(* Unchecksum the first character of the block. *)
WITH ch = ORD(ptr^) + CharOffset DO
a := Word.Minus(a, ch);
b := Word.Minus(b, Word.Times(blockSize, ch));
END;
(* Checksum the character just entering the block. *)
WITH ch = ORD(LOOPHOLE(ptr+blockSize, UNTRACED REF CHAR)^)
+ CharOffset DO
a := Word.Plus(a, ch);
b := Word.Plus(b, a);
END;
rsum := Word.Or(Word.Shift(Word.And(b, 16_ffff), 16),
Word.And(a, 16_ffff));
END;
INC(ptr); (* Advance the block. *)
UNTIL ptr >= limit;
IF buffCt > 0 THEN (* Write out the remaining buffered characters. *)
Wr.PutString(wr, SUBARRAY(buff, 0, buffCt));
buffCt := 0;
END;
IF blockIndex >= 0 THEN (* Found a matching block. *)
blocks.start := di.blocks[blockIndex].num;
blockNum := blocks.start;
(* Scan forward through consecutive blocks until we find one that
doesn't match. *)
REPEAT
(* Advance past the matching block. *)
INC(blockNum);
INC(ptr, blockSize);
(* Bail out if there are no more complete blocks, or if we've
just matched the last block of the client's file. *)
IF limit - ptr < blockSize OR blockNum > LAST(di.seqVec^) THEN
EXIT;
END;
blockIndex := di.seqVec[blockNum];
UNTIL NOT MD5Digest.Equal(di.blocks[blockIndex].md5,
BlockMD5(ptr, blockSize));
IF limit - ptr >= blockSize THEN (* Compute rolling checksum. *)
rsum := BlockRsum(ptr, blockSize);
END;
blocks.count := blockNum - blocks.start;
ELSE
blocks.start := 0;
blocks.count := 0;
END;
di.ptr := ptr;
di.rsum := rsum;
RETURN TRUE;
END;
END DiffIteratorNext;
PROCEDURE FindMatch (di: DiffIteratorRep;
hash: Word.T;
blockIndex: INTEGER): INTEGER =
Returns index of matching block, or -1 if no match.
VAR
md5: MD5Digest.T;
needMD5 := TRUE;
BEGIN
IF blockIndex >= 0 THEN
REPEAT
IF di.blocks[blockIndex].rsum = di.rsum THEN
(* The full 32-bit checksums match too. Check the MD5s. *)
IF needMD5 THEN
md5 := BlockMD5(di.ptr, di.rf.blockSize);
needMD5 := FALSE;
END;
IF MD5Digest.Equal(di.blocks[blockIndex].md5, md5) THEN (* Found. *)
RETURN blockIndex;
END;
END;
INC(blockIndex);
UNTIL blockIndex > LAST(di.blocks^)
OR Word.Extract(di.blocks[blockIndex].rsum, HashShift, HashBits) # hash;
END;
RETURN -1;
END FindMatch;
PROCEDURE HashCompare (READONLY a, b: RsyncBlock.T): [-1..1] =
VAR
aHash := Word.Extract(a.rsum, HashShift, HashBits);
bHash := Word.Extract(b.rsum, HashShift, HashBits);
BEGIN
IF aHash < bHash THEN RETURN -1 END;
IF aHash > bHash THEN RETURN 1 END;
RETURN 0;
END HashCompare;
PROCEDURE IterateDiffs (rf: T;
blocks: REF ARRAY OF RsyncBlock.T): DiffIterator =
VAR
di: DiffIteratorRep;
hashTabIndex: CARDINAL;
hash: Word.T;
BEGIN
di := NEW(DiffIteratorRep,
rf := rf,
ptr := rf.start,
blocks := blocks);
IF NUMBER(blocks^) > 0 AND rf.size >= rf.blockSize THEN
RsyncBlockArraySort.Sort(di.blocks^, HashCompare);
di.seqVec := NEW(REF ARRAY OF CARDINAL, NUMBER(di.blocks^));
di.hashTab := NEW(REF HashTab);
hashTabIndex := 0;
FOR blockIndex := FIRST(di.blocks^) TO LAST(di.blocks^) DO
di.seqVec[di.blocks[blockIndex].num] := blockIndex;
hash := Word.Extract(di.blocks[blockIndex].rsum, HashShift, HashBits);
IF hash >= hashTabIndex THEN (* Not a repeat of previous hash *)
FOR hti := hashTabIndex TO hash-1 DO
di.hashTab[hti] := -1;
END;
di.hashTab[hash] := blockIndex;
hashTabIndex := hash + 1;
END;
END;
FOR hti := hashTabIndex TO LAST(di.hashTab^) DO
di.hashTab[hti] := -1;
END;
di.rsum := BlockRsum(di.ptr, rf.blockSize);
END;
RETURN di;
END IterateDiffs;
BEGIN
END RsyncFile.