Diff
Not logged in

Differences From:

File src/xfer.c part of check-in [bd3c1d0023] - Additional work on the xfer mechanism, trying to increase the use of delta compression. by drh on 2007-08-09 19:07:28. [view]

To:

File src/xfer.c part of check-in [573a464cb7] - Complete rework of the xfer mechanism. Compiles but not yet working. by drh on 2007-08-10 00:08:25. [view]

@@ -26,107 +26,41 @@
 #include "config.h"
 #include "xfer.h"
 
 /*
-** Try to locate a record that is similar to rid and is a likely
-** candidate for delta against rid.  The similar record must be
-** referenced in the onremote table.
-**
-** Return the integer record ID of the similar record.  Or return
-** 0 if none is found.
+** This structure holds information about the current state of either
+** a client or a server that is participating in xfer.
+*/
+typedef struct Xfer Xfer;
+struct Xfer {
+  Blob *pIn;          /* Input text from the other side */
+  Blob *pOut;         /* Compose our reply here */
+  Blob line;          /* The current line of input */
+  Blob aToken[5];     /* Tokenized version of line */
+  Blob err;           /* Error message text */
+  int nToken;         /* Number of tokens in line */
+  int nIGot;          /* Number of "igot" messages sent */
+  int nFile;          /* Number of files sent or received */
+  int nDelta;         /* Number of deltas sent or received */
+  int nDanglingFile;  /* Number of deltas received whose source was not available */
+  int mxSend;         /* Stop sending "file" messages once pOut reaches this size */
+};
+
+
+/*
+** The input blob contains a UUID.  Convert it into a record ID.
+** Create a phantom record if no prior record exists and
+** phantomize is true.
+**
+** Compare to uuid_to_rid().  This routine takes a blob argument
+** and does less error checking.
 */
-static int similar_record(int rid, int traceFlag){
-  int inCnt, outCnt;
-  int i;
-  Stmt q;
-  int queue[100];
-  static const char *azQuery[] = {
-      /* Scan the delta table first */
-      "SELECT srcid, EXISTS(SELECT 1 FROM onremote WHERE rid=srcid)"
-      "  FROM delta"
-      " WHERE rid=:x"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)"
-      " UNION ALL "
-      "SELECT rid, EXISTS(SELECT 1 FROM onremote WHERE rid=delta.rid)"
-      "  FROM delta"
-      " WHERE srcid=:x"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
-      /* Then the plink table */
-      "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)"
-      "  FROM plink"
-      " WHERE cid=:x"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)"
-      " UNION ALL "
-      "SELECT cid, EXISTS(SELECT 1 FROM onremote WHERE rid=cid)"
-      "  FROM plink"
-      " WHERE pid=:x"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
-      /* Finally the mlink table */
-      "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)"
-      "  FROM mlink"
-      " WHERE fid=:x AND pid>0"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)"
-      " UNION ALL "
-      "SELECT fid, EXISTS(SELECT 1 FROM onremote WHERE rid=fid)"
-      "  FROM mlink"
-      " WHERE pid=:x AND fid>0"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
-  };
-
-  for(i=0; i<sizeof(azQuery)/sizeof(azQuery[0]); i++){
-    db_prepare(&q, azQuery[i]);
-    queue[0] = rid;
-    inCnt = 1;
-    outCnt = 0;
-    if( traceFlag ) printf("PASS %d\n", i+1);
-    while( outCnt<inCnt ){
-      int xid = queue[outCnt%64];
-      outCnt++;
-      db_bind_int(&q, ":x", xid);
-      if( traceFlag ) printf("xid=%d\n", xid);
-      while( db_step(&q)==SQLITE_ROW ){
-        int nid = db_column_int(&q, 0);
-        int hit = db_column_int(&q, 1);
-        if( traceFlag ) printf("nid=%d hit=%d\n", nid, hit);
-        if( hit  ){
-          db_finalize(&q);
-          return nid;
-        }
-        if( inCnt<sizeof(queue)/sizeof(queue[0]) ){
-          int i;
-          for(i=0; i<inCnt && queue[i]!=nid; i++){}
-          if( i>=inCnt ){
-            queue[inCnt++] = nid;
-          }
-        }
-      }
-      db_reset(&q);
-    }
-    db_finalize(&q);
-  }
-  return 0;
-}
-
-/*
-** COMMAND: test-similar-record
-*/
-void test_similar_record(void){
-  int i;
-  if( g.argc<4 ){
-    usage("SRC ONREMOTE...");
-  }
-  db_must_be_within_tree();
-  db_multi_exec(
-    "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
-  );
-  for(i=3; i<g.argc; i++){
-    int rid = name_to_rid(g.argv[i]);
-    printf("%s -> %d\n", g.argv[i], rid);
-    db_multi_exec("INSERT INTO onremote VALUES(%d)", rid);
-  }
-  printf("similar: %d\n", similar_record(name_to_rid(g.argv[2]), 1));
+static int rid_from_uuid(Blob *pUuid, int phantomize){
+  int rid = db_int(0, "SELECT rid FROM blob WHERE uuid='%b'", pUuid);  /* stays 0 when no row matches - presumably; confirm db_int() default semantics */
+  if( rid==0 && phantomize ){
+    rid = content_put(0, blob_str(pUuid), 0);  /* no content given, only the UUID: this is the phantom creation path */
+  }
+  return rid;  /* may still be 0 when the UUID is unknown and phantomize is false */
 }
 
 
 /*
@@ -145,160 +79,225 @@
 **
 ** If any error occurs, write a message into pErr which has already
 ** been initialized to an empty string.
 */
-static void xfer_accept_file(Blob *pIn, Blob *aToken, int nToken, Blob *pErr){
+static void xfer_accept_file(Xfer *pXfer){
   int n;
   int rid;
   Blob content, hash;
 
-  if( nToken<3 || nToken>4 || !blob_is_uuid(&aToken[1])
-       || !blob_is_int(&aToken[nToken-1], &n) || n<=0
-       || (nToken==4 && !blob_is_uuid(&aToken[2])) ){
-    blob_appendf(pErr, "malformed file line");
+  if( pXfer->nToken<3
+   || pXfer->nToken>4
+   || !blob_is_uuid(&pXfer->aToken[1])
+   || !blob_is_int(&pXfer->aToken[pXfer->nToken-1], &n)  /* the last token is the byte count of the payload */
+   || n<=0
+   || (pXfer->nToken==4 && !blob_is_uuid(&pXfer->aToken[2]))  /* with four tokens, aToken[2] names the delta source */
+  ){
+    blob_appendf(&pXfer->err, "malformed file line");
     return;
   }
   blob_zero(&content);
   blob_zero(&hash);
-  blob_extract(pIn, n, &content);
-  if( nToken==4 ){
+  blob_extract(pXfer->pIn, n, &content);  /* pull the n payload bytes that follow the "file" line */
+  if( pXfer->nToken==4 ){
     Blob src;
-    int srcid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[2]);
-    if( srcid==0 ){
-      blob_appendf(pErr, "unknown delta source: %b", &aToken[2]);
+    int srcid = rid_from_uuid(&pXfer->aToken[2], 1);  /* phantomize=1: an unknown source gets a phantom record */
+    if( content_get(srcid, &src)==0 ){  /* presumably 0 means the source content is not available locally - confirm */
+      content_put(&content, blob_str(&hash), srcid);  /* NOTE(review): hash has only been blob_zero()'d at this point, so this is not the file's UUID; aToken[1] looks intended - confirm */
+      blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]);  /* ask the other side for the missing delta source */
+      pXfer->nDanglingFile++;
       return;  /* NOTE(review): content is not blob_reset() on this path - confirm whether content_put() takes ownership */
     }
-    content_get(srcid, &src);
+    pXfer->nDelta++;
     blob_delta_apply(&src, &content, &content);
     blob_reset(&src);
+  }else{
+    pXfer->nFile++;
   }
   sha1sum_blob(&content, &hash);
-  if( !blob_eq_str(&aToken[1], blob_str(&hash), -1) ){
-    blob_appendf(pErr, "content does not match sha1 hash");
+  if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){
+    blob_appendf(&pXfer->err, "content does not match sha1 hash");  /* NOTE(review): no return follows, so mismatching content still reaches content_put() below - confirm intended */
   }
   blob_reset(&hash);
-  rid = content_put(&content, 0);
-  manifest_crosslink(rid, &content);
+  rid = content_put(&content, 0, 0);
   if( rid==0 ){
-    blob_appendf(pErr, "%s", g.zErrMsg);
+    blob_appendf(&pXfer->err, "%s", g.zErrMsg);
+  }else{
+    manifest_crosslink(rid, &content);  /* crosslink only when the content was actually stored */
   }
 }
 
 /*
-** Send the file identified by rid.
-**
-** If pOut is not NULL, then append the text of the send message
-** to pOut.  Otherwise, append the text to the CGI output.
+** Try to send a file as a delta.  If successful, return the number
+** of bytes in the delta.  If not, return zero.
+**
+** If srcId is specified, use it.  If not, try to figure out a
+** reasonable srcId.
+**
+** When srcId is zero, the azQuery statements are tried in order and
+** the first nonzero result becomes the delta source.  The delta is
+** used only if it is more than 50 bytes smaller than the full content;
+** otherwise nothing is written to pXfer->pOut and zero is returned.
 */
-static int send_file(int rid, Blob *pOut){
-  Blob content, uuid;
-  int size;
-  int srcid;
-
-
-  blob_zero(&uuid);
-  db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
-  if( blob_size(&uuid)==0 ){
-    return 0;
-  }
-  content_get(rid, &content);
-
-  if( blob_size(&content)>100 ){
-    srcid = similar_record(rid, 0);
-    if( srcid ){
-      Blob src;
-      content_get(srcid, &src);
-      if( blob_size(&src)>100 ){
-        Blob delta;
-        blob_delta_create(&src, &content, &delta);
-        blob_reset(&content);
-        content = delta;
-        blob_append(&uuid, " ", 1);
-        blob_append(&content, "\n", 1);
-        db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d", srcid);
-      }
-      blob_reset(&src);
+static int send_as_delta(
+  Xfer *pXfer,            /* The transfer context */
+  int rid,                /* record id of the file to send */
+  Blob *pContent,         /* The content of the file to send */
+  Blob *pUuid,            /* The UUID of the file to send */
+  int srcId               /* Send as a delta against this record */
+){
+  static const char *azQuery[] = {
+    "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
+    " WHERE delta.rid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
+
+    "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
+    " WHERE srcid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
+
+    "SELECT pid FROM plink JOIN pending ON rid=pid"
+    " WHERE cid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
+
+    "SELECT cid FROM plink JOIN pending ON rid=cid"
+    " WHERE pid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
+
+    "SELECT pid FROM mlink JOIN pending ON rid=pid"
+    " WHERE fid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
+
+    "SELECT fid FROM mlink JOIN pending ON rid=fid"
+    " WHERE pid=%d"
+    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
+  };  /* NOTE(review): every query joins "pending", but the visible hunks of this diff now create only a "sent" temp table - confirm "pending" still exists */
+  int i;
+  Blob src, delta;
+  int size = 0;
+
+  for(i=0; srcId==0 && i<count(azQuery); i++){  /* skipped entirely when the caller supplied a srcId */
+    srcId = db_int(0, azQuery[i], rid);
+  }
+  if( srcId && content_get(srcId, &src) ){
+    char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId);
+    blob_delta_create(&src, pContent, &delta);
+    size = blob_size(&delta);
+    if( size>=blob_size(pContent)-50 ){
+      size = 0;  /* delta saves too little: report failure so the caller sends the full file */
+    }else{
+      blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size);
+      blob_append(pXfer->pOut, blob_buffer(&delta), size);
+      blob_appendf(pXfer->pOut, "\n", 1);  /* NOTE(review): the format has no conversion, so the trailing 1 is unused; blob_append() was presumably intended - confirm */
     }
-  }
-  size = blob_size(&content);
-  if( pOut ){
-    blob_appendf(pOut, "file %b %d\n", &uuid, size);
-    blob_append(pOut, blob_buffer(&content), size);
-  }else{
-    cgi_printf("file %b %d\n", &uuid, size);
-    cgi_append_content(blob_buffer(&content), size);
-  }
-  blob_reset(&content);
-  blob_reset(&uuid);
-  db_multi_exec("INSERT OR IGNORE INTO onremote VALUES(%d)", rid);
+    blob_reset(&delta);
+    free(zUuid);
+    blob_reset(&src);
+  }
   return size;
 }
 
+/*
+** Send the file identified by rid.
+**
+** Nothing is done when rid is already recorded in the "sent" table.
+** If pUuid is NULL the UUID is looked up from rid.  Once pOut has
+** reached pXfer->mxSend bytes, only an "igot" line is emitted in
+** place of the content.  Otherwise the file goes out as a delta when
+** send_as_delta() succeeds, or in full, and rid is then recorded
+** in "sent".
+*/
+static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){
+  Blob content, uuid;
+  int size = 0;
+
+  if( db_exists("SELECT 1 FROM sent WHERE rid=%d", rid) ){
+     return;
+  }
+  blob_zero(&uuid);
+  if( pUuid==0 ){
+    db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
+    if( blob_size(&uuid)==0 ){
+      return;  /* no row with size>=0 for this rid, so there is nothing to send */
+    }
+    pUuid = &uuid;
+  }
+  if( pXfer->mxSend<=blob_size(pXfer->pOut) ){
+    blob_appendf(pXfer->pOut, "igot %b\n", pUuid);
+    pXfer->nIGot++;
+    blob_reset(&uuid);
+    return;  /* rid is not added to "sent" on this path */
+  }
+  content_get(rid, &content);  /* NOTE(review): content is never blob_reset() in this function - possible leak; confirm */
+
+  if( blob_size(&content)>100 ){  /* delta encoding is only attempted for content larger than 100 bytes */
+    size = send_as_delta(pXfer, rid, &content, pUuid, srcId);
+  }
+  if( size==0 ){
+    int size = blob_size(&content);  /* NOTE(review): shadows the outer size */
+    blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, size);
+    blob_append(pXfer->pOut, blob_buffer(&content), size);
+    pXfer->nFile++;
+  }else{
+    pXfer->nDelta++;
+  }
+  db_multi_exec("INSERT INTO sent VALUES(%d)", rid);
+  blob_reset(&uuid);
+}
 
 /*
-** Send all pending files.
+** This routine runs when either client or server is notified that
+** the other side thinks rid is a leaf manifest.  If we hold
+** children of rid, then send them over to the other side.
+**
+** For each child manifest cid of rid (per plink), the manifest is sent
+** as a delta against rid, followed by every file listed for cid in
+** mlink, each offered as a delta against its mlink.pid.  The routine
+** then recurses into cid while pOut is still smaller than mxSend.
 */
-static int send_all_pending(Blob *pOut){
-  int rid, xid, i;
-  int nIgot = 0;
-  int sent = 0;
-  int nSent = 0;
-  int maxSize = db_get_int("http-msg-size", 500000);
-  static const char *azQuery[] = {
-      "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
-      " WHERE delta.rid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
-
-      "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
-      " WHERE srcid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
-      "SELECT pid FROM plink JOIN pending ON rid=pid"
-      " WHERE cid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
-      "SELECT cid FROM plink JOIN pending ON rid=cid"
-      " WHERE pid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
-      "SELECT pid FROM mlink JOIN pending ON rid=pid"
-      " WHERE fid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
-      "SELECT fid FROM mlink JOIN pending ON rid=fid"
-      " WHERE pid=%d"
-      "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
-  };
-
-  rid = db_int(0, "SELECT rid FROM pending");
-  while( rid && nIgot<200 ){
-    db_multi_exec("DELETE FROM pending WHERE rid=%d", rid);
-    if( sent<maxSize ){
-      sent += send_file(rid, pOut);
-      nSent++;
-    }else{
-      char *zUuid = db_text(0,
-                      "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
-      if( zUuid ){
-        if( pOut ){
-          blob_appendf(pOut, "igot %s\n", zUuid);
-        }else{
-          cgi_printf("igot %s\n", zUuid);
-        }
-        free(zUuid);
-        nIgot++;
-      }
+static void leaf_response(Xfer *pXfer, int rid){
+  Stmt q1, q2;
+  db_prepare(&q1,  /* NOTE(review): no matching db_finalize(&q1) appears anywhere in this function - confirm */
+      "SELECT cid, uuid FROM plink, blob"
+      " WHERE blob.rid=plink.cid"
+      "   AND plink.pid=%d",
+      rid
+  );
+  while( db_step(&q1)==SQLITE_ROW ){
+    Blob uuid;
+    int cid;
+
+    cid = db_column_int(&q1, 0);
+    db_ephemeral_blob(&q1, 1, &uuid);
+    send_file(pXfer, cid, &uuid, rid);  /* child manifest, offered as a delta against its parent rid */
+    db_prepare(&q2,
+       "SELECT pid, uuid, fid FROM mlink, blob"
+       " WHERE rid=fid AND mid=%d",
+       cid
+    );
+    while( db_step(&q2)==SQLITE_ROW ){
+      int pid, fid;
+      pid = db_column_int(&q2, 0);
+      db_ephemeral_blob(&q2, 1, &uuid);  /* the same uuid blob is reused for each q2 row */
+      fid = db_column_int(&q2, 2);
+      send_file(pXfer, fid, &uuid, pid);  /* file of manifest cid, offered as a delta against mlink.pid */
     }
-    xid = 0;
-    for(i=0; xid==0 && i<sizeof(azQuery)/sizeof(azQuery[0]); i++){
-      xid = db_int(0, azQuery[i], rid);
-    }
-    rid = xid;
-    if( rid==0 ){
-      rid = db_int(0, "SELECT rid FROM pending");
+    db_finalize(&q2);
+    if( blob_size(pXfer->pOut)<pXfer->mxSend ){
+      leaf_response(pXfer, cid);  /* descend to grandchildren only while the reply is under the size limit */
     }
   }
-  return nSent;
+}
+
+/*
+** Send a leaf message for every leaf.
+**
+** A leaf here is any record that appears as a child (cid) in plink
+** but never as a parent (pid).  One "leaf UUID" line per such record
+** is appended to pXfer->pOut.
+*/
+static void send_leaves(Xfer *pXfer){
+  Stmt q;
+  db_prepare(&q,
+    "SELECT uuid FROM blob WHERE rid IN"
+    "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
+  );
+  while( db_step(&q)==SQLITE_ROW ){
+    const char *zUuid = db_column_text(&q, 0);
+    blob_appendf(pXfer->pOut, "leaf %s\n", zUuid);
+  }
+  db_finalize(&q);
+}
+
+/*
+** Send a gimme message for every phantom.
+**
+** One "gimme UUID" line is appended to pXfer->pOut for each row of
+** the phantom table, with the UUID taken from the matching blob row.
+*/
+static void request_phantoms(Xfer *pXfer){
+  Stmt q;
+  db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)");
+  while( db_step(&q)==SQLITE_ROW ){
+    const char *zUuid = db_column_text(&q, 0);
+    blob_appendf(pXfer->pOut, "gimme %s\n", zUuid);
+  }
+  db_finalize(&q);
 }
 
 
 /*
@@ -373,37 +372,40 @@
   int nToken;
   int isPull = 0;
   int isPush = 0;
   int nErr = 0;
-  Blob line, errmsg, aToken[5];
+  Xfer xfer;
+
+  memset(&xfer, 0, sizeof(xfer));
+  blobarray_zero(xfer.aToken, count(xfer.aToken));
+  cgi_set_content_type(g.zContentType);
+  blob_zero(&xfer.err);
+  xfer.pIn = &g.cgiIn;
+  xfer.pOut = cgi_output_blob();
 
   db_begin_transaction();
-  blobarray_zero(aToken, count(aToken));
-  cgi_set_content_type(g.zContentType);
-  blob_zero(&errmsg);
   db_multi_exec(
-     "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" /* Client has */
-     "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);"  /* Client needs */
+     "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
   );
-  while( blob_line(&g.cgiIn, &line) ){
-    nToken = blob_tokenize(&line, aToken, count(aToken));
+  while( blob_line(xfer.pIn, &xfer.line) ){
+    xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
 
     /*   file UUID SIZE \n CONTENT
     **   file UUID DELTASRC SIZE \n CONTENT
     **
     ** Accept a file from the client.
     */
-    if( blob_eq(&aToken[0], "file") && nToken>=3 && nToken<=4 ){
+    if( blob_eq(&xfer.aToken[0], "file") ){
       if( !isPush ){
         cgi_reset_content();
         @ error not\sauthorized\sto\swrite
         nErr++;
         break;
       }
-      xfer_accept_file(&g.cgiIn, aToken, nToken, &errmsg);
-      if( blob_size(&errmsg) ){
+      xfer_accept_file(&xfer);
+      if( blob_size(&xfer.err) ){
         cgi_reset_content();
-        @ error %T(blob_str(&errmsg))
+        @ error %T(blob_str(&xfer.err))
         nErr++;
         break;
       }
     }else
@@ -411,40 +413,45 @@
     /*   gimme UUID
     **
     ** Client is requesting a file
     */
-    if( blob_eq(&aToken[0], "gimme") && nToken==2 && blob_is_uuid(&aToken[1]) ){
+    if( blob_eq(&xfer.aToken[0], "gimme")
+     && xfer.nToken==2
+     && blob_is_uuid(&xfer.aToken[1])
+    ){
       if( isPull ){
-        db_multi_exec(
-          "INSERT OR IGNORE INTO pending(rid) "
-          "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
-        );
+        int rid = rid_from_uuid(&xfer.aToken[1], 0);
+        if( rid ){
+          send_file(&xfer, rid, &xfer.aToken[1], 0);
+        }
       }
     }else
 
     /*   igot UUID
-    **   leaf UUID
     **
     ** Client announces that it has a particular file
     */
-    if( nToken==2
-          && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0],"leaf"))
-          && blob_is_uuid(&aToken[1]) ){
-      if( isPull || isPush ){
-        int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
-        if( rid>0 ){
-          db_multi_exec(
-            "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
-          );
-          if( isPull && blob_eq(&aToken[0], "leaf") ){
-            db_multi_exec(
-              "INSERT OR IGNORE INTO pending(rid) "
-              "SELECT cid FROM plink WHERE pid=%d", rid
-            );
-          }
-        }else if( isPush ){
-          content_put(0, blob_str(&aToken[1]));
-        }
+    if( xfer.nToken==2
+     && blob_eq(&xfer.aToken[0], "igot")
+     && blob_is_uuid(&xfer.aToken[1])
+    ){
+      if( isPush ){
+        rid_from_uuid(&xfer.aToken[1], 1);
+      }
+    }else
+
+
+    /*   leaf UUID
+    **
+    ** Client announces that it has a particular manifest
+    */
+    if( xfer.nToken==2
+     && blob_eq(&xfer.aToken[0], "leaf")
+     && blob_is_uuid(&xfer.aToken[1])
+    ){
+      if( isPull ){
+        int rid = rid_from_uuid(&xfer.aToken[1], 0);
+        leaf_response(&xfer, rid);
       }
     }else
 
     /*    pull  SERVERCODE  PROJECTCODE
@@ -452,18 +459,20 @@
     **
     ** The client wants either send or receive
     */
     if( nToken==3
-               && (blob_eq(&aToken[0], "pull") || blob_eq(&aToken[0], "push"))
-               && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){
+     && (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push"))
+     && blob_is_uuid(&xfer.aToken[1])
+     && blob_is_uuid(&xfer.aToken[2])
+    ){
       const char *zSCode;
       const char *zPCode;
 
       zSCode = db_get("server-code", 0);
       if( zSCode==0 ){
         fossil_panic("missing server code");
       }
-      if( blob_eq_str(&aToken[1], zSCode, -1) ){
+      if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){
         cgi_reset_content();
         @ error server\sloop
         nErr++;
         break;
@@ -471,16 +480,16 @@
       zPCode = db_get("project-code", 0);
       if( zPCode==0 ){
         fossil_panic("missing project code");
       }
-      if( !blob_eq_str(&aToken[2], zPCode, -1) ){
+      if( !blob_eq_str(&xfer.aToken[2], zPCode, -1) ){
         cgi_reset_content();
         @ error wrong\sproject
         nErr++;
         break;
       }
       login_check_credentials();
-      if( blob_eq(&aToken[0], "pull") ){
+      if( blob_eq(&xfer.aToken[0], "pull") ){
         if( !g.okRead ){
           cgi_reset_content();
           @ error not\sauthorized\sto\sread
           nErr++;
@@ -493,18 +502,18 @@
           @ error not\sauthorized\sto\swrite
           nErr++;
           break;
         }
-        isPush = 1;
-
+        send_leaves(&xfer);
+        isPush = 1;
       }
     }else
 
     /*    clone
     **
     ** The client knows nothing.  Tell all.
     */
-    if( blob_eq(&aToken[0], "clone") ){
+    if( blob_eq(&xfer.aToken[0], "clone") ){
       login_check_credentials();
       if( !g.okRead || !g.okHistory ){
         cgi_reset_content();
         @ error not\sauthorized\sto\sclone
@@ -512,70 +521,36 @@
         break;
       }
       isPull = 1;
       @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
-      db_multi_exec(
-        "INSERT OR IGNORE INTO pending(rid) "
-        "SELECT mid FROM mlink JOIN blob ON mid=rid"
-      );
+      send_leaves(&xfer);
     }else
 
     /*    login  USER  NONCE  SIGNATURE
     **
     ** Check for a valid login.  This has to happen before anything else.
     */
-    if( blob_eq(&aToken[0], "login") && nToken==4 ){
+    if( blob_eq(&xfer.aToken[0], "login")
+     && nToken==4
+    ){
       if( disableLogin ){
         g.okRead = g.okWrite = 1;
       }else{
-        check_login(&aToken[1], &aToken[2], &aToken[3]);
+        check_login(&xfer.aToken[1], &xfer.aToken[2], &xfer.aToken[3]);
       }
     }else
 
     /* Unknown message
     */
     {
       cgi_reset_content();
-      @ error bad\scommand:\s%F(blob_str(&line))
+      @ error bad\scommand:\s%F(blob_str(&xfer.line))
     }
-    blobarray_reset(aToken, nToken);
-  }
-
-  /* The input message has now been processed.  Generate a reply. */
+    blobarray_reset(xfer.aToken, xfer.nToken);
+  }
   if( isPush ){
-    Stmt q;
-    int nReq = 0;
-    db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)");
-    while( db_step(&q)==SQLITE_ROW && nReq++ < 200 ){
-      const char *zUuid = db_column_text(&q, 0);
-      int rid = db_column_int(&q, 1);
-      int xid = similar_record(rid, 0);
-      @ gimme %s(zUuid)
-      if( xid ){
-        char *zXUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid);
-        @ igot %s(zXUuid);
-        free(zXUuid);
-      }
-    }
-    db_finalize(&q);
-  }
-  if( isPull ){
-    send_all_pending(0);
-  }
-  if( isPush || isPull ){
-    /* Always send our leaves */
-    Stmt q;
-    db_prepare(&q,
-       "SELECT uuid FROM blob WHERE rid IN"
-       "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
-    );
-    while( db_step(&q)==SQLITE_ROW ){
-      const char *zUuid = db_column_text(&q, 0);
-      @ leaf %s(zUuid)
-    }
-    db_finalize(&q);
-  }
-
+    request_phantoms(&xfer);
+  }
   db_end_transaction(0);
 }
 
 /*
@@ -621,41 +596,38 @@
 ** true.
 */
 void client_sync(int pushFlag, int pullFlag, int cloneFlag){
   int go = 1;        /* Loop until zero */
-  int nToken;
   const char *zSCode = db_get("server-code", "x");
   const char *zPCode = db_get("project-code", 0);
-  int nFile = 0;
   int nMsg = 0;
   int nReq = 0;
-  int nFileSend;
+  int nFileSend = 0;
   int nNoFileCycle = 0;
   Blob send;        /* Text we are sending to the server */
   Blob recv;        /* Reply we got back from the server */
-  Blob line;        /* A single line of the reply */
-  Blob aToken[5];   /* A tokenization of line */
-  Blob errmsg;      /* Error message */
+  Xfer xfer;        /* Transfer data */
+
+  memset(&xfer, 0, sizeof(xfer));
+  xfer.pIn = &recv;
+  xfer.pOut = &send;
 
   assert( pushFlag || pullFlag || cloneFlag );
   assert( !g.urlIsFile );          /* This only works for networking */
 
   db_begin_transaction();
   db_multi_exec(
-    /* Records which we know the other side also has */
-    "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
-    /* Records we know the other side needs */
-    "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);"
+    "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
   );
-  blobarray_zero(aToken, count(aToken));
+  blobarray_zero(xfer.aToken, count(xfer.aToken));
   blob_zero(&send);
   blob_zero(&recv);
-  blob_zero(&errmsg);
+  blob_zero(&xfer.err);
 
 
   while( go ){
     go = 0;
-    nFile = nReq = nMsg = 0;
+    nReq = nMsg = 0;
 
     /* Generate a request to be sent to the server.
     ** Always begin with a clone, pull, or push message
     */
@@ -666,141 +638,100 @@
       pullFlag = 0;
       nMsg++;
     }else if( pullFlag ){
       blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
+      send_leaves(&xfer);
+      request_phantoms(&xfer);
       nMsg++;
     }
     if( pushFlag ){
       blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
       nMsg++;
     }
 
-    if( pullFlag ){
-      /* Send gimme message for every phantom that we hold.
-      */
-      Stmt q;
-      db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)");
-      while( db_step(&q)==SQLITE_ROW && nReq<200 ){
-        const char *zUuid = db_column_text(&q, 0);
-        int rid = db_column_int(&q, 1);
-        int xid = similar_record(rid, 0);
-        blob_appendf(&send,"gimme %s\n", zUuid);
-        nReq++;
-        if( xid ){
-          blob_appendf(&send, "igot %z\n",
-             db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid));
-        }
-      }
-      db_finalize(&q);
-    }
-
-    if( pushFlag ){
-      /* Send the server any files that the server has requested */
-      nFile += send_all_pending(&send);
-    }
-
-    if( pullFlag || pushFlag ){
-      /* Always send our leaves */
-      Stmt q;
-      db_prepare(&q,
-         "SELECT uuid FROM blob WHERE rid IN"
-         "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
-      );
-      while( db_step(&q)==SQLITE_ROW ){
-        const char *zUuid = db_column_text(&q, 0);
-        blob_appendf(&send, "leaf %s\n", zUuid);
-        nMsg++;
-      }
-      db_finalize(&q);
-    }
-
     /* Exchange messages with the server */
+    nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile;
     printf("Send:      %3d files, %3d requests, %3d other msgs, %8d bytes\n",
-            nFile, nReq, nMsg, blob_size(&send));
-    nFileSend = nFile;
-    nFile = nReq = nMsg = 0;
+            nFileSend, nReq, nMsg, blob_size(&send));
+    xfer.nFile = 0;
+    xfer.nDelta = 0;
+    xfer.nDanglingFile = 0;
+    nReq = nMsg = 0;
     http_exchange(&send, &recv);
     blob_reset(&send);
 
     /* Process the reply that came back from the server */
-    while( blob_line(&recv, &line) ){
-      nToken = blob_tokenize(&line, aToken, count(aToken));
+    while( blob_line(&recv, &xfer.line) ){
+      xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
 
       /*   file UUID SIZE \n CONTENT
       **   file UUID DELTASRC SIZE \n CONTENT
       **
       ** Receive a file transmitted from the other side
       */
-      if( blob_eq(&aToken[0],"file") ){
-        xfer_accept_file(&recv, aToken, nToken, &errmsg);
-        nFile++;
-        go = 1;
+      if( blob_eq(&xfer.aToken[0],"file") ){
+        xfer_accept_file(&xfer);
       }else
 
       /*   gimme UUID
       **
       ** Server is requesting a file
       */
-      if( blob_eq(&aToken[0], "gimme") && nToken==2
-               && blob_is_uuid(&aToken[1]) ){
-        nReq++;
+      if( blob_eq(&xfer.aToken[0], "gimme")
+       && xfer.nToken==2
+       && blob_is_uuid(&xfer.aToken[1])
+      ){
         if( pushFlag ){
-          db_multi_exec(
-            "INSERT OR IGNORE INTO pending(rid) "
-            "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
-          );
-          go = 1;
+          int rid = rid_from_uuid(&xfer.aToken[1], 0);
+          send_file(&xfer, rid, &xfer.aToken[1], 0);
         }
       }else
 
       /*   igot UUID
-      **   leaf UUID
+      **
+      ** Server announces that it has a particular file
+      */
+      if( xfer.nToken==2
+       && blob_eq(&xfer.aToken[0], "igot")
+       && blob_is_uuid(&xfer.aToken[1])
+      ){
+        if( pullFlag ){
+          rid_from_uuid(&xfer.aToken[1], 1);
+        }
+      }else
+
+
+      /*   leaf UUID
       **
-      ** Server proclaims that it has a particular file.  A leaf message
-      ** means that the file is a leaf manifest on the server.
+      ** Server announces that it has a particular manifest
       */
-      if( nToken==2
-              && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0], "leaf"))
-              && blob_is_uuid(&aToken[1]) ){
-        int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
-        nMsg++;
-        if( rid>0 ){
-          db_multi_exec(
-            "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
-          );
-          /* Add to the pending set all children of the server's leaves */
-          if( pushFlag && blob_eq(&aToken[0], "leaf") ){
-            db_multi_exec(
-              "INSERT OR IGNORE INTO pending(rid) "
-              "SELECT cid FROM plink WHERE pid=%d", rid
-            );
-            if( db_changes()>0 ){
-              go = 1;
-            }
-          }
-          if( pullFlag && !go &&
-              db_exists("SELECT 1 FROM phantom WHERE rid=%d", rid) ){
-            go = 1;
-          }
-        }else if( pullFlag ){
-          go = 1;
-          content_put(0, blob_str(&aToken[1]));
+      if( xfer.nToken==2
+       && blob_eq(&xfer.aToken[0], "leaf")
+       && blob_is_uuid(&xfer.aToken[1])
+      ){
+        if( pushFlag ){
+          int rid = rid_from_uuid(&xfer.aToken[1], 0);
+          leaf_response(&xfer, rid);
         }
       }else
+
 
       /*   push  SERVERCODE  PROJECTCODE
       **
       ** Should only happen in response to a clone.
       */
-      if( blob_eq(&aToken[0],"push") && nToken==3 && cloneFlag
-              && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){
-
-        if( blob_eq_str(&aToken[1], zSCode, -1) ){
+      if( blob_eq(&xfer.aToken[0],"push")
+       && xfer.nToken==3
+       && cloneFlag
+       && blob_is_uuid(&xfer.aToken[1])
+       && blob_is_uuid(&xfer.aToken[2])
+      ){
+        if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){
           fossil_fatal("server loop");
         }
         nMsg++;
         if( zPCode==0 ){
-          zPCode = mprintf("%b", &aToken[2]);
+          zPCode = mprintf("%b", &xfer.aToken[2]);
           db_set("project-code", zPCode);
         }
         cloneFlag = 0;
         pullFlag = 1;
@@ -809,40 +740,40 @@
       /*   error MESSAGE
       **
       ** Report an error
       */
-      if( blob_eq(&aToken[0],"error") && nToken==2 ){
-        char *zMsg = blob_terminate(&aToken[1]);
+      if( blob_eq(&xfer.aToken[0],"error") && xfer.nToken==2 ){
+        char *zMsg = blob_terminate(&xfer.aToken[1]);
         defossilize(zMsg);
-        blob_appendf(&errmsg, "server says: %s", zMsg);
+        blob_appendf(&xfer.err, "server says: %s", zMsg);
       }else
 
       /* Unknown message */
       {
-        blob_appendf(&errmsg, "unknown command: %b", &aToken[0]);
+        blob_appendf(&xfer.err, "unknown command: %b", &xfer.aToken[0]);
       }
 
-      if( blob_size(&errmsg) ){
-        fossil_fatal("%b", &errmsg);
+      if( blob_size(&xfer.err) ){
+        fossil_fatal("%b", &xfer.err);
       }
-      blobarray_reset(aToken, nToken);
+      blobarray_reset(xfer.aToken, xfer.nToken);
     }
     printf("Received:  %3d files, %3d requests, %3d other msgs, %8d bytes\n",
-            nFile, nReq, nMsg, blob_size(&recv));
+            xfer.nFile + xfer.nDelta + xfer.nDanglingFile,
+            nReq, nMsg, blob_size(&recv));
     blob_reset(&recv);
-    if( nFileSend + nFile==0 ){
+    if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){
       nNoFileCycle++;
       if( nNoFileCycle>1 ){
         go = 0;
       }
     }else{
       nNoFileCycle = 0;
     }
-    nFile = nReq = nMsg = 0;
+    nReq = nMsg = 0;
+    xfer.nFile = 0;
+    xfer.nDelta = 0;
+    xfer.nDanglingFile = 0;
   };
   http_close();
   db_end_transaction(0);
-  db_multi_exec(
-    "DROP TABLE onremote;"
-    "DROP TABLE pending;"
-  );
 }