Diff
Not logged in

Differences From:

File src/xfer.c part of check-in [8c828207a2] - Give an error if an attempt is made to merge, update, or checkout against an incomplete baseline - one that contains phantoms. Update the xfer protocol to converge on a stable synchronization faster and (hopeful) not quit until the sync is complete. by drh on 2007-08-27 00:04:32. Also file src/xfer.c part of check-in [15652ff081] - Merged drh's fixes new features (xfer, timeline handling, javascript based timeline highlighting) into my branch. by aku on 2007-08-29 02:55:33. [view]

To:

File src/xfer.c part of check-in [48c4e69d2b] - Cluster-based synchronization appears to be working. by drh on 2007-09-09 17:51:16. Also file src/xfer.c part of check-in [bbcb6326c9] - Pulled in the navbar and timeline changes. by aku on 2007-09-17 00:58:51. [view]

@@ -235,103 +235,9 @@
   blob_reset(&uuid);
 }
 
 /*
-** Send the file identified by mid and pUuid.  If that file happens
-** to be a manifest, then also send all of the associated content
-** files for that manifest.  If the file is not a manifest, then this
-** routine is the equivalent of send_file().
-*/
-static void send_manifest(Xfer *pXfer, int mid, Blob *pUuid, int srcId){
-  Stmt q2;
-  send_file(pXfer, mid, pUuid, srcId);
-  db_prepare(&q2,
-     "SELECT pid, uuid, fid FROM mlink, blob"
-     " WHERE rid=fid AND mid=%d",
-     mid
-  );
-  while( db_step(&q2)==SQLITE_ROW ){
-    int pid, fid;
-    Blob uuid;
-    pid = db_column_int(&q2, 0);
-    db_ephemeral_blob(&q2, 1, &uuid);
-    fid = db_column_int(&q2, 2);
-    send_file(pXfer, fid, &uuid, pid);
-  }
-  db_finalize(&q2);
-}
-
-/*
-** This routine runs when either client or server is notified that
-** the other side thinks rid is a leaf manifest.  If we hold
-** children of rid, then send them over to the other side.
-*/
-static void leaf_response(Xfer *pXfer, int rid){
-  Stmt q1;
-  db_prepare(&q1,
-      "SELECT cid, uuid FROM plink, blob"
-      " WHERE blob.rid=plink.cid"
-      "   AND plink.pid=%d",
-      rid
-  );
-  while( db_step(&q1)==SQLITE_ROW ){
-    Blob uuid;
-    int cid;
-
-    cid = db_column_int(&q1, 0);
-    db_ephemeral_blob(&q1, 1, &uuid);
-    send_manifest(pXfer, cid, &uuid, rid);
-    if( blob_size(pXfer->pOut)<pXfer->mxSend ){
-      leaf_response(pXfer, cid);
-    }
-  }
-}
-
-/*
-** Sent a leaf message for every leaf.
-*/
-static void send_leaves(Xfer *pXfer){
-  Stmt q;
-  db_prepare(&q,
-    "SELECT uuid FROM blob WHERE rid IN"
-    "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
-  );
-  while( db_step(&q)==SQLITE_ROW ){
-    const char *zUuid = db_column_text(&q, 0);
-    blob_appendf(pXfer->pOut, "leaf %s\n", zUuid);
-  }
-  db_finalize(&q);
-}
-
-/*
-** Sent leaf content for every leaf that is not found in the
-** onremote table.  This is intended to send leaf content for
-** every leaf that is unknown on the remote end.
-**
-** In addition, we might send "igot" messages for a few generations of
-** parents of the unknown leaves.  This will speed the transmission
-** of new branches.
-*/
-static void send_unknown_leaf_content(Xfer *pXfer){
-  Stmt q1;
-  db_prepare(&q1,
-    "SELECT rid, uuid FROM blob WHERE rid IN"
-    "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
-    "  AND NOT EXISTS(SELECT 1 FROM onremote WHERE rid=blob.rid)"
-  );
-  while( db_step(&q1)==SQLITE_ROW ){
-    Blob uuid;
-    int cid;
-
-    cid = db_column_int(&q1, 0);
-    db_ephemeral_blob(&q1, 1, &uuid);
-    send_manifest(pXfer, cid, &uuid, 0);
-  }
-  db_finalize(&q1);
-}
-
-/*
-** Sen a gimme message for every phantom.
+** Send a gimme message for every phantom.
 */
 static void request_phantoms(Xfer *pXfer){
   Stmt q;
   db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)");
@@ -397,8 +303,75 @@
   }
   db_reset(&q);
 }
 
+/*
+** Call send_file() for every rid listed in the unsent table and then
+** delete all rows from that table.
+**
+** The unsent table is purely an optimization.  Were it cleared, the same
+** files would still be transferred, perhaps after an extra round trip or two.
+*/
+static void send_unsent(Xfer *pXfer){
+  Stmt unsentQ;                /* Walks the rids stored in unsent */
+  db_prepare(&unsentQ, "SELECT rid FROM unsent");
+  for(;;){
+    if( db_step(&unsentQ)!=SQLITE_ROW ) break;
+    send_file(pXfer, db_column_int(&unsentQ, 0), 0, 0);
+  }
+  db_finalize(&unsentQ);
+  db_multi_exec("DELETE FROM unsent");
+}
+
+/*
+** If at least 100 non-phantom entries are in the unclustered table, build
+** a cluster: one "M UUID" line per non-phantom entry, sorted by uuid, then
+** a "Z" line holding the md5sum_blob() checksum.  The cluster is handed to
+** content_put() after unclustered is emptied.  Phantoms are never clustered.
+*/
+static void create_cluster(void){
+  Blob cluster, cksum;         /* Cluster text and the checksum of that text */
+  Stmt q;                      /* Yields the uuid of each non-phantom entry */
+  int nUncl;                   /* Count of unclustered non-phantom entries */
+  nUncl = db_int(0, "SELECT count(*) FROM unclustered"
+                    " WHERE NOT EXISTS(SELECT 1 FROM phantom"
+                                      " WHERE rid=unclustered.rid)");
+  if( nUncl<100 ) return;
+  blob_zero(&cluster);
+  /* Skip phantoms: a row is excluded only when its own rid is in phantom,
+  ** which is the same test used for the count above. */
+  db_prepare(&q, "SELECT uuid FROM unclustered, blob"
+                 " WHERE NOT EXISTS(SELECT 1 FROM phantom"
+                 "                   WHERE rid=unclustered.rid)"
+                 "   AND unclustered.rid=blob.rid"
+                 " ORDER BY 1");
+  while( db_step(&q)==SQLITE_ROW ){
+    blob_appendf(&cluster, "M %s\n", db_column_text(&q, 0));
+  }
+  db_finalize(&q);
+  md5sum_blob(&cluster, &cksum);
+  blob_appendf(&cluster, "Z %b\n", &cksum);
+  blob_reset(&cksum);
+  db_multi_exec("DELETE FROM unclustered");  /* NOTE(review): drops phantom rows too; confirm */
+  content_put(&cluster, 0, 0);
+  blob_reset(&cluster);
+}
+
+/*
+** Append one "igot UUID" line to pXfer->pOut for each unclustered entry
+** (joined to blob to obtain its uuid).  Return the number of lines appended.
+*/
+static int send_unclustered(Xfer *pXfer){
+  Stmt igotQ;                  /* Yields the uuid of each unclustered rid */
+  int nIgot;                   /* Number of igot messages emitted so far */
+  db_prepare(&igotQ, "SELECT uuid FROM unclustered JOIN blob USING(rid)");
+  for(nIgot=0; db_step(&igotQ)==SQLITE_ROW; nIgot++){
+    const char *zUuid = db_column_text(&igotQ, 0);
+    blob_appendf(pXfer->pOut, "igot %s\n", zUuid);
+  }
+  db_finalize(&igotQ);
+  return nIgot;
+}
 
 /*
 ** If this variable is set, disable login checks.  Used for debugging
 ** only.
@@ -455,11 +428,9 @@
     }else
 
     /*   gimme UUID
     **
-    ** Client is requesting a file.  If the file is a manifest,
-    ** the server can assume that the client also needs all content
-    ** files associated with that manifest.
+    ** Client is requesting a file.  Send it.
     */
     if( blob_eq(&xfer.aToken[0], "gimme")
      && xfer.nToken==2
      && blob_is_uuid(&xfer.aToken[1])
@@ -466,9 +437,9 @@
     ){
       if( isPull ){
         int rid = rid_from_uuid(&xfer.aToken[1], 0);
         if( rid ){
-          send_manifest(&xfer, rid, &xfer.aToken[1], 0);
+          send_file(&xfer, rid, &xfer.aToken[1], 0);
         }
       }
     }else
 
@@ -484,30 +455,8 @@
         rid_from_uuid(&xfer.aToken[1], 1);
       }
     }else
 
-
-    /*   leaf UUID
-    **
-    ** Client announces that it has a particular manifest.  If
-    ** the server has children of this leaf, then send those
-    ** children back to the client.  If the server lacks this
-    ** leaf, request it.
-    */
-    if( xfer.nToken==2
-     && blob_eq(&xfer.aToken[0], "leaf")
-     && blob_is_uuid(&xfer.aToken[1])
-    ){
-      int rid = rid_from_uuid(&xfer.aToken[1], 0);
-      if( rid ){
-        remote_has(rid);
-        if( isPull ){
-          leaf_response(&xfer, rid);
-        }
-      }else if( isPush ){
-        content_put(0, blob_str(&xfer.aToken[1]), 0);
-      }
-    }else
 
     /*    pull  SERVERCODE  PROJECTCODE
     **    push  SERVERCODE  PROJECTCODE
     **
@@ -558,9 +507,8 @@
           @ error not\sauthorized\sto\swrite
           nErr++;
           break;
         }
-        send_leaves(&xfer);
         isPush = 1;
       }
     }else
 
@@ -568,9 +516,8 @@
     **
     ** The client knows nothing.  Tell all.
     */
     if( blob_eq(&xfer.aToken[0], "clone") ){
-      int rootid;
       login_check_credentials();
       if( !g.okClone ){
         cgi_reset_content();
         @ error not\sauthorized\sto\sclone
@@ -578,16 +525,8 @@
         break;
       }
       isPull = 1;
       @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
-      rootid = db_int(0,
-          "SELECT pid FROM plink AS a"
-          " WHERE NOT EXISTS(SELECT 1 FROM plink WHERE cid=a.pid)"
-      );
-      if( rootid ){
-        send_file(&xfer, rootid, 0, -1);
-        leaf_response(&xfer, rootid);
-      }
     }else
 
     /*    login  USER  NONCE  SIGNATURE
     **
@@ -603,8 +542,29 @@
         check_login(&xfer.aToken[1], &xfer.aToken[2], &xfer.aToken[3]);
       }
     }else
 
+    /*    cookie TEXT
+    **
+    ** A cookie contains an arbitrary-length argument that is server-defined.
+    ** The argument must be encoded so as not to contain any whitespace.
+    ** The server can optionally send a cookie to the client.  The client
+    ** might then return the same cookie back to the server on its next
+    ** communication.  The cookie might record information that helps
+    ** the server optimize a push or pull.
+    **
+    ** The client is not required to return a cookie.  So the server
+    ** must not depend on the cookie.  The cookie should be an optimization
+    ** only.  The client might also send a cookie that came from a different
+    ** server.  So the server must be prepared to distinguish its own cookie
+    ** from cookies originating from other servers.  The client might send
+    ** back several different cookies to the server.  The server should be
+    ** prepared to sift through the cookies and pick the one that it wants.
+    */
+    if( blob_eq(&xfer.aToken[0], "cookie") && xfer.nToken==2 ){
+      /* Process the cookie */
+    }else
+
     /* Unknown message
     */
     {
       cgi_reset_content();
@@ -615,9 +575,10 @@
   if( isPush ){
     request_phantoms(&xfer);
   }
   if( isPull ){
-    send_unknown_leaf_content(&xfer);
+    create_cluster();
+    send_unclustered(&xfer);
   }
   db_end_transaction(0);
 }
 
@@ -669,8 +630,9 @@
   const char *zPCode = db_get("project-code", 0);
   int nMsg = 0;          /* Number of messages sent or received */
   int nCycle = 0;        /* Number of round trips to the server */
   int nFileSend = 0;
+  const char *zCookie;   /* Server cookie */
   Blob send;        /* Text we are sending to the server */
   Blob recv;        /* Reply we got back from the server */
   Xfer xfer;        /* Transfer data */
 
@@ -712,14 +674,25 @@
 
   while( go ){
     int newPhantom = 0;
 
+    /* Send the most recently received cookie.  Let the server
+    ** figure out if this is a cookie that it cares about.
+    */
+    zCookie = db_get("cookie", 0);
+    if( zCookie ){
+      blob_appendf(&send, "cookie %s\n", zCookie);
+    }
+
     /* Generate gimme messages for phantoms and leaf messages
     ** for all leaves.
     */
     if( pullFlag ){
       request_phantoms(&xfer);
-      send_leaves(&xfer);
+    }
+    if( pushFlag ){
+      send_unsent(&xfer);
+      nMsg += send_unclustered(&xfer);
     }
 
     /* Exchange messages with the server */
     nFileSend = xfer.nFileSent + xfer.nDeltaSent;
@@ -743,9 +716,8 @@
     if( pushFlag ){
       blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
       nMsg++;
     }
-
 
     /* Process the reply that came back from the server */
     while( blob_line(&recv, &xfer.line) ){
       if( blob_buffer(&xfer.line)[0]=='#' ){
@@ -774,9 +746,9 @@
       ){
         nMsg++;
         if( pushFlag ){
           int rid = rid_from_uuid(&xfer.aToken[1], 0);
-          send_manifest(&xfer, rid, &xfer.aToken[1], 0);
+          send_file(&xfer, rid, &xfer.aToken[1], 0);
         }
       }else
 
       /*   igot UUID
@@ -806,32 +778,8 @@
         remote_has(rid);
       }else
 
 
-      /*   leaf UUID
-      **
-      ** Server announces that it has a particular manifest.  Send
-      ** any children of this leaf that we have if we are pushing.
-      ** Make the leaf a phantom if we are pulling.  Remember that the
-      ** remote end has the specified UUID.
-      */
-      if( xfer.nToken==2
-       && blob_eq(&xfer.aToken[0], "leaf")
-       && blob_is_uuid(&xfer.aToken[1])
-      ){
-        int rid = rid_from_uuid(&xfer.aToken[1], 0);
-        nMsg++;
-        if( pushFlag && rid ){
-          leaf_response(&xfer, rid);
-        }
-        if( pullFlag && rid==0 ){
-          rid = content_put(0, blob_str(&xfer.aToken[1]), 0);
-          newPhantom = 1;
-        }
-        remote_has(rid);
-      }else
-
-
       /*   push  SERVERCODE  PRODUCTCODE
       **
       ** Should only happen in response to a clone.  This message tells
       ** the client what product to use for the new database.
@@ -853,8 +801,21 @@
         cloneFlag = 0;
         pullFlag = 1;
         blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
         nMsg++;
+      }else
+
+      /*    cookie TEXT
+      **
+      ** The server might include a cookie in its reply.  The client
+      ** should remember this cookie and send it back to the server
+      ** in its next query.
+      **
+      ** Each cookie received overwrites the prior cookie from the
+      ** same server.
+      */
+      if( blob_eq(&xfer.aToken[0], "cookie") && xfer.nToken==2 ){
+        db_set("cookie", blob_str(&xfer.aToken[1]));
       }else
 
       /*   error MESSAGE
       **
@@ -882,25 +843,23 @@
             xfer.nFileRcvd + xfer.nDeltaRcvd + xfer.nDanglingFile,
             xfer.nFileRcvd, xfer.nDeltaRcvd, xfer.nDanglingFile);
 
     blob_reset(&recv);
+    nCycle++;
+    go = 0;
+
+    /* If we received one or more files on the previous exchange but
+    ** there are still phantoms, then go another round.
+    */
+    if( (xfer.nFileRcvd+xfer.nDeltaRcvd+xfer.nDanglingFile>0 || newPhantom)
+         && db_exists("SELECT 1 FROM phantom")
+    ){
+      go = 1;
+    }
     nMsg = 0;
     xfer.nFileRcvd = 0;
     xfer.nDeltaRcvd = 0;
     xfer.nDanglingFile = 0;
-    nCycle++;
-    go = 0;
-
-    /* If we have received one or more files on this cycle or if
-    ** we have received information that has caused us to create
-    ** new phantoms and we have one or more phantoms, then go for
-    ** another round
-    */
-    if( (xfer.nFileRcvd+xfer.nDeltaRcvd+xfer.nDanglingFile>0 || newPhantom)
-     && db_exists("SELECT 1 FROM phantom")
-    ){
-      go = 1;
-    }
 
     /* If we have one or more files queued to send, then go
     ** another round
     */