$treeview $search $mathjax $extrastylesheet
librsync
2.0.2
$projectbrief
|
$projectbrief
|
$searchbox |
00001 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- 00002 * 00003 * librsync -- library for network deltas 00004 * 00005 * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net> 00006 * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au> 00007 * 00008 * This program is free software; you can redistribute it and/or modify 00009 * it under the terms of the GNU Lesser General Public License as published by 00010 * the Free Software Foundation; either version 2.1 of the License, or 00011 * (at your option) any later version. 00012 * 00013 * This program is distributed in the hope that it will be useful, 00014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 * GNU Lesser General Public License for more details. 00017 * 00018 * You should have received a copy of the GNU Lesser General Public License 00019 * along with this program; if not, write to the Free Software 00020 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 00021 */ 00022 00023 /*= 00024 | Let's climb to the TOP of that 00025 | MOUNTAIN and think about STRIP 00026 | MINING!! 00027 */ 00028 00029 /** \file delta.c -- Generate in streaming mode an rsync delta given a set of 00030 * signatures, and a new file. 00031 * 00032 * The size of blocks for signature generation is determined by the block size 00033 * in the incoming signature. 00034 * 00035 * To calculate a signature, we need to be able to see at least one block of 00036 * the new file at a time. Once we have that, we calculate its weak signature, 00037 * and see if there is any block in the signature hash table that has the same 00038 * weak sum. If there is one, then we also compute the strong sum of the new 00039 * block, and cross check that. If they're the same, then we can assume we have 00040 * a match. 00041 * 00042 * The final block of the file has to be handled a little differently, because 00043 * it may be a short match. Short blocks in the signature don't include their 00044 * length -- we just allow for the final short block of the file to match any 00045 * block in the signature, and if they have the same checksum we assume they 00046 * must have the same length. Therefore, when we emit a COPY command, we have 00047 * to send it with a length that is the same as the block matched, and not the 00048 * block length from the signature. 00049 * 00050 * Profiling results as of v1.26, 2001-03-18: 00051 * 00052 * If everything matches, then we spend almost all our time in rs_mdfour64 and 00053 * rs_weak_sum, which is unavoidable and therefore a good profile. 00054 * 00055 * If nothing matches, it is not so good. 00056 * 00057 * 2002-06-26: Donovan Baarda 00058 * 00059 * The following is based entirely on pysync. It is much cleaner than the 00060 * previous incarnation of this code. It is slightly complicated because in 00061 * this case the output can block, so the main delta loop needs to stop when 00062 * this happens. 00063 * 00064 * In pysync a 'last' attribute is used to hold the last miss or match for 00065 * extending if possible. In this code, basis_len and scoop_pos are used 00066 * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0 00067 * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie, 00068 * nothing). 00069 * 00070 * Pysync is also slightly different in that a 'flush' method is available to 00071 * force output of accumulated data. This 'flush' is use to finalise delta 00072 * calculation. In librsync input is terminated with an eof flag on the input 00073 * stream. I have structured this code similar to pysync with a seperate flush 00074 * function that is used when eof is reached. This allows for a flush style API 00075 * if one is ever needed. Note that flush in pysync can be used for more than 00076 * just terminating delta calculation, so a flush based API can in some ways be 00077 * more flexible... 00078 * 00079 * The input data is first scanned, then processed. Scanning identifies input 00080 * data as misses or matches, and emits the instruction stream. Processing the 00081 * data consumes it off the input scoop and outputs the processed miss data 00082 * into the tube. 00083 * 00084 * The scoop contains all data yet to be processed. The scoop_pos is an index 00085 * into the scoop that indicates the point scanned to. As data is scanned, 00086 * scoop_pos is incremented. As data is processed, it is removed from the scoop 00087 * and scoop_pos adjusted. Everything gets complicated because the tube can 00088 * block. When the tube is blocked, no data can be processed. */ 00089 00090 #include "config.h" 00091 00092 #include <assert.h> 00093 #include <stdlib.h> 00094 #include <stdio.h> 00095 00096 #include "librsync.h" 00097 #include "emit.h" 00098 #include "stream.h" 00099 #include "util.h" 00100 #include "sumset.h" 00101 #include "job.h" 00102 #include "trace.h" 00103 #include "rollsum.h" 00104 00105 /* used by rdiff, but now redundant */ 00106 int rs_roll_paranoia = 0; 00107 00108 static rs_result rs_delta_s_scan(rs_job_t *job); 00109 static rs_result rs_delta_s_flush(rs_job_t *job); 00110 static rs_result rs_delta_s_end(rs_job_t *job); 00111 static inline void rs_getinput(rs_job_t *job); 00112 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, 00113 size_t *match_len); 00114 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, 00115 size_t match_len); 00116 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len); 00117 static inline rs_result rs_appendflush(rs_job_t *job); 00118 static inline rs_result rs_processmatch(rs_job_t *job); 00119 static inline rs_result rs_processmiss(rs_job_t *job); 00120 00121 /** Get a block of data if possible, and see if it matches. 00122 * 00123 * On each call, we try to process all of the input data available on the scoop 00124 * and input buffer. */ 00125 static rs_result rs_delta_s_scan(rs_job_t *job) 00126 { 00127 const size_t block_len = job->signature->block_len; 00128 rs_long_t match_pos; 00129 size_t match_len; 00130 rs_result result; 00131 Rollsum test; 00132 00133 rs_job_check(job); 00134 /* read the input into the scoop */ 00135 rs_getinput(job); 00136 /* output any pending output from the tube */ 00137 result = rs_tube_catchup(job); 00138 /* while output is not blocked and there is a block of data */ 00139 while ((result == RS_DONE) 00140 && ((job->scoop_pos + block_len) < job->scoop_avail)) { 00141 /* check if this block matches */ 00142 if (rs_findmatch(job, &match_pos, &match_len)) { 00143 /* append the match and reset the weak_sum */ 00144 result = rs_appendmatch(job, match_pos, match_len); 00145 RollsumInit(&job->weak_sum); 00146 } else { 00147 /* rotate the weak_sum and append the miss byte */ 00148 RollsumRotate(&job->weak_sum, job->scoop_next[job->scoop_pos], 00149 job->scoop_next[job->scoop_pos + block_len]); 00150 result = rs_appendmiss(job, 1); 00151 if (rs_roll_paranoia) { 00152 RollsumInit(&test); 00153 RollsumUpdate(&test, job->scoop_next + job->scoop_pos, 00154 block_len); 00155 if (RollsumDigest(&test) != RollsumDigest(&job->weak_sum)) { 00156 rs_fatal("mismatch between rolled sum " FMT_WEAKSUM 00157 " and check " FMT_WEAKSUM "", 00158 RollsumDigest(&job->weak_sum), 00159 RollsumDigest(&test)); 00160 } 00161 00162 } 00163 } 00164 } 00165 /* if we completed OK */ 00166 if (result == RS_DONE) { 00167 /* if we reached eof, we can flush the last fragment */ 00168 if (job->stream->eof_in) { 00169 job->statefn = rs_delta_s_flush; 00170 return RS_RUNNING; 00171 } else { 00172 /* we are blocked waiting for more data */ 00173 return RS_BLOCKED; 00174 } 00175 } 00176 return result; 00177 } 00178 00179 static rs_result rs_delta_s_flush(rs_job_t *job) 00180 { 00181 rs_long_t match_pos; 00182 size_t match_len; 00183 rs_result result; 00184 00185 rs_job_check(job); 00186 /* read the input into the scoop */ 00187 rs_getinput(job); 00188 /* output any pending output */ 00189 result = rs_tube_catchup(job); 00190 /* while output is not blocked and there is any remaining data */ 00191 while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) { 00192 /* check if this block matches */ 00193 if (rs_findmatch(job, &match_pos, &match_len)) { 00194 /* append the match and reset the weak_sum */ 00195 result = rs_appendmatch(job, match_pos, match_len); 00196 RollsumInit(&job->weak_sum); 00197 } else { 00198 /* rollout from weak_sum and append the miss byte */ 00199 RollsumRollout(&job->weak_sum, job->scoop_next[job->scoop_pos]); 00200 rs_trace("block reduced to " FMT_SIZE "", job->weak_sum.count); 00201 result = rs_appendmiss(job, 1); 00202 } 00203 } 00204 /* if we are not blocked, flush and set end statefn. */ 00205 if (result == RS_DONE) { 00206 result = rs_appendflush(job); 00207 job->statefn = rs_delta_s_end; 00208 } 00209 if (result == RS_DONE) { 00210 return RS_RUNNING; 00211 } 00212 return result; 00213 } 00214 00215 static rs_result rs_delta_s_end(rs_job_t *job) 00216 { 00217 rs_emit_end_cmd(job); 00218 return RS_DONE; 00219 } 00220 00221 static inline void rs_getinput(rs_job_t *job) 00222 { 00223 size_t len; 00224 00225 len = rs_scoop_total_avail(job); 00226 if (job->scoop_avail < len) { 00227 rs_scoop_input(job, len); 00228 } 00229 } 00230 00231 /** find a match at scoop_pos, returning the match_pos and match_len. 00232 * 00233 * Note that this will calculate weak_sum if required. It will also determine 00234 * the match_len. 00235 * 00236 * This routine could be modified to do xdelta style matches that would extend 00237 * matches past block boundaries by matching backwards and forwards beyond the 00238 * block boundaries. Extending backwards would require decrementing scoop_pos 00239 * as appropriate. */ 00240 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, 00241 size_t *match_len) 00242 { 00243 const size_t block_len = job->signature->block_len; 00244 00245 /* calculate the weak_sum if we don't have one */ 00246 if (job->weak_sum.count == 0) { 00247 /* set match_len to min(block_len, scan_avail) */ 00248 *match_len = job->scoop_avail - job->scoop_pos; 00249 if (*match_len > block_len) { 00250 *match_len = block_len; 00251 } 00252 /* Update the weak_sum */ 00253 RollsumUpdate(&job->weak_sum, job->scoop_next + job->scoop_pos, 00254 *match_len); 00255 rs_trace("calculate weak sum from scratch length " FMT_SIZE "", 00256 job->weak_sum.count); 00257 } else { 00258 /* set the match_len to the weak_sum count */ 00259 *match_len = job->weak_sum.count; 00260 } 00261 *match_pos = 00262 rs_signature_find_match(job->signature, RollsumDigest(&job->weak_sum), 00263 job->scoop_next + job->scoop_pos, *match_len); 00264 return *match_pos != -1; 00265 } 00266 00267 /** Append a match at match_pos of length match_len to the delta, extending a 00268 * previous match if possible, or flushing any previous miss/match. */ 00269 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, 00270 size_t match_len) 00271 { 00272 rs_result result = RS_DONE; 00273 00274 /* if last was a match that can be extended, extend it */ 00275 if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) { 00276 job->basis_len += match_len; 00277 } else { 00278 /* else appendflush the last value */ 00279 result = rs_appendflush(job); 00280 /* make this the new match value */ 00281 job->basis_pos = match_pos; 00282 job->basis_len = match_len; 00283 } 00284 /* increment scoop_pos to point at next unscanned data */ 00285 job->scoop_pos += match_len; 00286 /* we can only process from the scoop if output is not blocked */ 00287 if (result == RS_DONE) { 00288 /* process the match data off the scoop */ 00289 result = rs_processmatch(job); 00290 } 00291 return result; 00292 } 00293 00294 /** Append a miss of length miss_len to the delta, extending a previous miss 00295 * if possible, or flushing any previous match. 00296 * 00297 * This also breaks misses up into 32KB segments to avoid accumulating too much 00298 * in memory. */ 00299 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len) 00300 { 00301 const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */ 00302 rs_result result = RS_DONE; 00303 00304 /* If last was a match, or max_miss misses, appendflush it. */ 00305 if (job->basis_len || (job->scoop_pos >= max_miss)) { 00306 result = rs_appendflush(job); 00307 } 00308 /* increment scoop_pos */ 00309 job->scoop_pos += miss_len; 00310 return result; 00311 } 00312 00313 /** Flush any accumulating hit or miss, appending it to the delta. */ 00314 static inline rs_result rs_appendflush(rs_job_t *job) 00315 { 00316 /* if last is a match, emit it and reset last by resetting basis_len */ 00317 if (job->basis_len) { 00318 rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len, 00319 job->basis_pos); 00320 rs_emit_copy_cmd(job, job->basis_pos, job->basis_len); 00321 job->basis_len = 0; 00322 return rs_processmatch(job); 00323 /* else if last is a miss, emit and process it */ 00324 } else if (job->scoop_pos) { 00325 rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos); 00326 rs_emit_literal_cmd(job, job->scoop_pos); 00327 return rs_processmiss(job); 00328 } 00329 /* otherwise, nothing to flush so we are done */ 00330 return RS_DONE; 00331 } 00332 00333 /** Process matching data in the scoop. 00334 * 00335 * The scoop contains match data at scoop_next of length scoop_pos. This 00336 * function processes that match data, returning RS_DONE if it completes, or 00337 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to 00338 * still point at the next unscanned data. 00339 * 00340 * This function currently just removes data from the scoop and adjusts 00341 * scoop_pos appropriately. In the future this could be used for something like 00342 * context compressing of miss data. Note that it also calls rs_tube_catchup to 00343 * output any pending output. */ 00344 static inline rs_result rs_processmatch(rs_job_t *job) 00345 { 00346 job->scoop_avail -= job->scoop_pos; 00347 job->scoop_next += job->scoop_pos; 00348 job->scoop_pos = 0; 00349 return rs_tube_catchup(job); 00350 } 00351 00352 /** Process miss data in the scoop. 00353 * 00354 * The scoop contains miss data at scoop_next of length scoop_pos. This 00355 * function processes that miss data, returning RS_DONE if it completes, or 00356 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to 00357 * still point at the next unscanned data. 00358 * 00359 * This function uses rs_tube_copy to queue copying from the scoop into output. 00360 * and uses rs_tube_catchup to do the copying. This automaticly removes data 00361 * from the scoop, but this can block. While rs_tube_catchup is blocked, 00362 * scoop_pos does not point at legit data, so scanning can also not proceed. 00363 * 00364 * In the future this could do compression of miss data before outputing it. */ 00365 static inline rs_result rs_processmiss(rs_job_t *job) 00366 { 00367 rs_tube_copy(job, job->scoop_pos); 00368 job->scoop_pos = 0; 00369 return rs_tube_catchup(job); 00370 } 00371 00372 /** State function that does a slack delta containing only literal data to 00373 * recreate the input. */ 00374 static rs_result rs_delta_s_slack(rs_job_t *job) 00375 { 00376 rs_buffers_t *const stream = job->stream; 00377 size_t avail = stream->avail_in; 00378 00379 if (avail) { 00380 rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail); 00381 rs_emit_literal_cmd(job, avail); 00382 rs_tube_copy(job, avail); 00383 return RS_RUNNING; 00384 } else if (rs_job_input_is_ending(job)) { 00385 job->statefn = rs_delta_s_end; 00386 return RS_RUNNING; 00387 } 00388 return RS_BLOCKED; 00389 } 00390 00391 /** State function for writing out the header of the encoding job. */ 00392 static rs_result rs_delta_s_header(rs_job_t *job) 00393 { 00394 rs_emit_delta_header(job); 00395 if (job->signature) { 00396 job->statefn = rs_delta_s_scan; 00397 } else { 00398 rs_trace("no signature provided for delta, using slack deltas"); 00399 job->statefn = rs_delta_s_slack; 00400 } 00401 return RS_RUNNING; 00402 } 00403 00404 rs_job_t *rs_delta_begin(rs_signature_t *sig) 00405 { 00406 rs_job_t *job; 00407 00408 job = rs_job_new("delta", rs_delta_s_header); 00409 /* Caller can pass NULL sig or empty sig for "slack deltas". */ 00410 if (sig && sig->count > 0) { 00411 rs_signature_check(sig); 00412 /* Caller must have called rs_build_hash_table() by now. */ 00413 assert(sig->hashtable); 00414 job->signature = sig; 00415 RollsumInit(&job->weak_sum); 00416 } 00417 return job; 00418 }