Diff
Not logged in

Differences From:

File src/xfer.c part of check-in [573a464cb7] - Complete rework of the xfer mechanism. Compiles but not yet working. by drh on 2007-08-10 00:08:25. [view]

To:

File src/xfer.c part of check-in [edbb332d54] - The xfer mechanism has been completely reworked to better support delta compression and to require fewer round-trips. The wire protocol is roughly the same but is different enough that you will need to recompile before sync will work. by drh on 2007-08-10 02:59:52. [view]

@@ -37,11 +37,14 @@
   Blob line;          /* The current line of input */
   Blob aToken[5];     /* Tokenized version of line */
   Blob err;           /* Error message text */
   int nToken;         /* Number of tokens in line */
-  int nIGot;          /* Number of "igot" messages sent */
-  int nFile;          /* Number of files sent or received */
-  int nDelta;         /* Number of deltas sent or received */
+  int nIGotSent;      /* Number of "igot" messages sent */
+  int nGimmeSent;     /* Number of gimme messages sent */
+  int nFileSent;      /* Number of files sent */
+  int nDeltaSent;     /* Number of deltas sent */
+  int nFileRcvd;      /* Number of files received */
+  int nDeltaRcvd;     /* Number of deltas received */
   int nDanglingFile;  /* Number of dangling deltas received */
   int mxSend;         /* Stop sending "file" when pOut reaches this size */
 };
 
@@ -101,18 +104,19 @@
   if( pXfer->nToken==4 ){
     Blob src;
     int srcid = rid_from_uuid(&pXfer->aToken[2], 1);
     if( content_get(srcid, &src)==0 ){
-      content_put(&content, blob_str(&hash), srcid);
+      content_put(&content, blob_str(&pXfer->aToken[1]), srcid);
       blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]);
+      pXfer->nGimmeSent++;
       pXfer->nDanglingFile++;
       return;
     }
-    pXfer->nDelta++;
+    pXfer->nDeltaRcvd++;
     blob_delta_apply(&src, &content, &content);
     blob_reset(&src);
   }else{
-    pXfer->nFile++;
+    pXfer->nFileRcvd++;
   }
   sha1sum_blob(&content, &hash);
   if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){
     blob_appendf(&pXfer->err, "content does not match sha1 hash");
@@ -140,31 +144,15 @@
   Blob *pUuid,            /* The UUID of the file to send */
   int srcId               /* Send as a delta against this record */
 ){
   static const char *azQuery[] = {
-    "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
-    " WHERE delta.rid=%d"
-    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
-
-    "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
-    " WHERE srcid=%d"
-    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
-    "SELECT pid FROM plink JOIN pending ON rid=pid"
+    "SELECT pid FROM plink"
     " WHERE cid=%d"
     "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
 
-    "SELECT cid FROM plink JOIN pending ON rid=cid"
-    " WHERE pid=%d"
-    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
-    "SELECT pid FROM mlink JOIN pending ON rid=pid"
+    "SELECT pid FROM mlink"
     " WHERE fid=%d"
     "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
-    "SELECT fid FROM mlink JOIN pending ON rid=fid"
-    " WHERE pid=%d"
-    "   AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
   };
   int i;
   Blob src, delta;
   int size = 0;
@@ -171,9 +159,9 @@
 
   for(i=0; srcId==0 && i<count(azQuery); i++){
     srcId = db_int(0, azQuery[i], rid);
   }
-  if( srcId && content_get(srcId, &src) ){
+  if( srcId>0 && content_get(srcId, &src) ){
     char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId);
     blob_delta_create(&src, pContent, &delta);
     size = blob_size(&delta);
     if( size>=blob_size(pContent)-50 ){
@@ -180,9 +168,9 @@
       size = 0;
     }else{
       blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size);
       blob_append(pXfer->pOut, blob_buffer(&delta), size);
-      blob_appendf(pXfer->pOut, "\n", 1);
+      /* blob_appendf(pXfer->pOut, "\n", 1); */
     }
     blob_reset(&delta);
     free(zUuid);
     blob_reset(&src);
@@ -191,8 +179,16 @@
 }
 
 /*
 ** Send the file identified by rid.
+**
+** The pUuid can be NULL in which case the correct UUID is computed
+** from the rid.
+**
+** If srcId is positive, then a delta is sent against that srcId.
+** If srcId is zero, then an attempt is made to find an appropriate
+** file to delta against.   If srcId is negative, the file is sent
+** without deltaing.
 */
 static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){
   Blob content, uuid;
   int size = 0;
@@ -209,9 +205,9 @@
     pUuid = &uuid;
   }
   if( pXfer->mxSend<=blob_size(pXfer->pOut) ){
     blob_appendf(pXfer->pOut, "igot %b\n", pUuid);
-    pXfer->nIGot++;
+    pXfer->nIGotSent++;
     blob_reset(&uuid);
     return;
   }
   content_get(rid, &content);
@@ -222,11 +218,11 @@
   if( size==0 ){
     int size = blob_size(&content);
     blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, size);
     blob_append(pXfer->pOut, blob_buffer(&content), size);
-    pXfer->nFile++;
+    pXfer->nFileSent++;
   }else{
-    pXfer->nDelta++;
+    pXfer->nDeltaSent++;
   }
   db_multi_exec("INSERT INTO sent VALUES(%d)", rid);
   blob_reset(&uuid);
 }
@@ -294,8 +290,9 @@
   db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)");
   while( db_step(&q)==SQLITE_ROW ){
     const char *zUuid = db_column_text(&q, 0);
     blob_appendf(pXfer->pOut, "gimme %s\n", zUuid);
+    pXfer->nGimmeSent++;
   }
   db_finalize(&q);
 }
 
@@ -368,9 +365,8 @@
 ** message has been uncompressed and placed in the g.cgiIn blob.
 ** Process this message and form an appropriate reply.
 */
 void page_xfer(void){
-  int nToken;
   int isPull = 0;
   int isPush = 0;
   int nErr = 0;
   Xfer xfer;
@@ -380,8 +376,9 @@
   cgi_set_content_type(g.zContentType);
   blob_zero(&xfer.err);
   xfer.pIn = &g.cgiIn;
   xfer.pOut = cgi_output_blob();
+  xfer.mxSend = db_get_int("max-download", 1000000);
 
   db_begin_transaction();
   db_multi_exec(
      "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
@@ -394,9 +391,9 @@
     **
     ** Accept a file from the client.
     */
     if( blob_eq(&xfer.aToken[0], "file") ){
-      if( !isPush ){
+      if( !g.okWrite ){
         cgi_reset_content();
         @ error not\sauthorized\sto\swrite
         nErr++;
         break;
@@ -417,9 +414,9 @@
     if( blob_eq(&xfer.aToken[0], "gimme")
      && xfer.nToken==2
      && blob_is_uuid(&xfer.aToken[1])
     ){
-      if( isPull ){
+      if( g.okRead ){
         int rid = rid_from_uuid(&xfer.aToken[1], 0);
         if( rid ){
           send_file(&xfer, rid, &xfer.aToken[1], 0);
         }
@@ -433,9 +430,9 @@
     if( xfer.nToken==2
      && blob_eq(&xfer.aToken[0], "igot")
      && blob_is_uuid(&xfer.aToken[1])
     ){
-      if( isPush ){
+      if( g.okWrite ){
         rid_from_uuid(&xfer.aToken[1], 1);
       }
     }else
 
@@ -447,9 +444,9 @@
     if( xfer.nToken==2
      && blob_eq(&xfer.aToken[0], "leaf")
      && blob_is_uuid(&xfer.aToken[1])
     ){
-      if( isPull ){
+      if( g.okRead ){
         int rid = rid_from_uuid(&xfer.aToken[1], 0);
         leaf_response(&xfer, rid);
       }
     }else
@@ -458,9 +455,9 @@
     **    push  SERVERCODE  PROJECTCODE
     **
     ** The client wants to either send or receive
     */
-    if( nToken==3
+    if( xfer.nToken==3
      && (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push"))
      && blob_is_uuid(&xfer.aToken[1])
      && blob_is_uuid(&xfer.aToken[2])
     ){
@@ -512,8 +509,9 @@
     **
     ** The client knows nothing.  Tell all.
     */
     if( blob_eq(&xfer.aToken[0], "clone") ){
+      int rootid;
       login_check_credentials();
       if( !g.okRead || !g.okHistory ){
         cgi_reset_content();
         @ error not\sauthorized\sto\sclone
@@ -521,17 +519,24 @@
         break;
       }
       isPull = 1;
       @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
-      send_leaves(&xfer);
+      rootid = db_int(0,
+          "SELECT pid FROM plink AS a"
+          " WHERE NOT EXISTS(SELECT 1 FROM plink WHERE cid=a.pid)"
+      );
+      if( rootid ){
+        send_file(&xfer, rootid, 0, -1);
+        leaf_response(&xfer, rootid);
+      }
     }else
 
     /*    login  USER  NONCE  SIGNATURE
     **
     ** Check for a valid login.  This has to happen before anything else.
     */
     if( blob_eq(&xfer.aToken[0], "login")
-     && nToken==4
+     && xfer.nToken==4
     ){
       if( disableLogin ){
         g.okRead = g.okWrite = 1;
       }else{
@@ -598,19 +603,19 @@
 void client_sync(int pushFlag, int pullFlag, int cloneFlag){
   int go = 1;        /* Loop until zero */
   const char *zSCode = db_get("server-code", "x");
   const char *zPCode = db_get("project-code", 0);
-  int nMsg = 0;
-  int nReq = 0;
-  int nFileSend = 0;
-  int nNoFileCycle = 0;
+  int nMsg = 0;          /* Number of messages sent or received */
+  int nCycle = 0;        /* Number of round trips to the server */
+  int nFileSend = 0;
   Blob send;        /* Text we are sending to the server */
   Blob recv;        /* Reply we got back from the server */
   Xfer xfer;        /* Transfer data */
 
   memset(&xfer, 0, sizeof(xfer));
   xfer.pIn = &recv;
   xfer.pOut = &send;
+  xfer.mxSend = db_get_int("max-upload", 250000);
 
   assert( pushFlag || pullFlag || cloneFlag );
   assert( !g.urlIsFile );          /* This only works for networking */
 
@@ -621,13 +626,12 @@
   blobarray_zero(xfer.aToken, count(xfer.aToken));
   blob_zero(&send);
   blob_zero(&recv);
   blob_zero(&xfer.err);
-
-
-  while( go ){
-    go = 0;
-    nReq = nMsg = 0;
+  blob_zero(&xfer.line);
+
+
+  while( go ){
 
     /* Generate a request to be sent to the server.
     ** Always begin with a clone, pull, or push message
     */
@@ -638,30 +642,34 @@
       pullFlag = 0;
       nMsg++;
     }else if( pullFlag ){
       blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
-      send_leaves(&xfer);
+      nMsg++;
       request_phantoms(&xfer);
-      nMsg++;
+      send_leaves(&xfer);
     }
     if( pushFlag ){
       blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
       nMsg++;
     }
 
     /* Exchange messages with the server */
-    nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile;
-    printf("Send:      %3d files, %3d requests, %3d other msgs, %8d bytes\n",
-            nFileSend, nReq, nMsg, blob_size(&send));
-    xfer.nFile = 0;
-    xfer.nDelta = 0;
-    xfer.nDanglingFile = 0;
-    nReq = nMsg = 0;
+    nFileSend = xfer.nFileSent + xfer.nDeltaSent;
+    printf("Send:      %10d bytes, %3d messages, %3d files (%d+%d)\n",
+            blob_size(&send), nMsg+xfer.nGimmeSent+xfer.nIGotSent,
+            nFileSend, xfer.nFileSent, xfer.nDeltaSent);
+    nMsg = 0;
+    xfer.nFileSent = 0;
+    xfer.nDeltaSent = 0;
+    xfer.nGimmeSent = 0;
     http_exchange(&send, &recv);
     blob_reset(&send);
 
     /* Process the reply that came back from the server */
     while( blob_line(&recv, &xfer.line) ){
+      if( blob_buffer(&xfer.line)[0]=='#' ){
+        continue;
+      }
       xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
 
       /*   file UUID SIZE \n CONTENT
       **   file UUID DELTASRC SIZE \n CONTENT
@@ -679,8 +687,9 @@
       if( blob_eq(&xfer.aToken[0], "gimme")
        && xfer.nToken==2
        && blob_is_uuid(&xfer.aToken[1])
       ){
+        nMsg++;
         if( pushFlag ){
           int rid = rid_from_uuid(&xfer.aToken[1], 0);
           send_file(&xfer, rid, &xfer.aToken[1], 0);
         }
@@ -693,10 +702,14 @@
       if( xfer.nToken==2
        && blob_eq(&xfer.aToken[0], "igot")
        && blob_is_uuid(&xfer.aToken[1])
       ){
+        nMsg++;
         if( pullFlag ){
-          rid_from_uuid(&xfer.aToken[1], 1);
+          if( !db_exists("SELECT 1 FROM blob WHERE uuid='%b' AND size>=0",
+                &xfer.aToken[1]) ){
+            content_put(0, blob_str(&xfer.aToken[1]), 0);
+          }
         }
       }else
 
 
@@ -707,8 +720,9 @@
       if( xfer.nToken==2
        && blob_eq(&xfer.aToken[0], "leaf")
        && blob_is_uuid(&xfer.aToken[1])
       ){
+        nMsg++;
         if( pushFlag ){
           int rid = rid_from_uuid(&xfer.aToken[1], 0);
           leaf_response(&xfer, rid);
         }
@@ -755,25 +769,38 @@
       if( blob_size(&xfer.err) ){
         fossil_fatal("%b", &xfer.err);
       }
       blobarray_reset(xfer.aToken, xfer.nToken);
+      blob_reset(&xfer.line);
     }
-    printf("Received:  %3d files, %3d requests, %3d other msgs, %8d bytes\n",
-            xfer.nFile + xfer.nDelta + xfer.nDanglingFile,
-            nReq, nMsg, blob_size(&recv));
+    printf("Received:  %10d bytes, %3d messages, %3d files (%d+%d+%d)\n",
+            blob_size(&recv), nMsg,
+            xfer.nFileRcvd + xfer.nDeltaRcvd + xfer.nDanglingFile,
+            xfer.nFileRcvd, xfer.nDeltaRcvd, xfer.nDanglingFile);
+
     blob_reset(&recv);
-    if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){
-      nNoFileCycle++;
-      if( nNoFileCycle>1 ){
-        go = 0;
-      }
-    }else{
-      nNoFileCycle = 0;
+    nMsg = 0;
+    xfer.nFileRcvd = 0;
+    xfer.nDeltaRcvd = 0;
+    xfer.nDanglingFile = 0;
+    nCycle++;
+    go = 0;
+
+    /* If we have received one or more files on this cycle and
+    ** we have one or more phantoms, then go for another round
+    */
+    if(xfer.nFileRcvd+xfer.nDeltaRcvd+xfer.nDanglingFile>0
+     && db_exists("SELECT 1 FROM phantom")
+    ){
+      go = 1;
     }
-    nReq = nMsg = 0;
-    xfer.nFile = 0;
-    xfer.nDelta = 0;
-    xfer.nDanglingFile = 0;
+
+    /* If we have one or more files queued to send, then go
+    ** another round
+    */
+    if( xfer.nFileSent+xfer.nDeltaSent>0 ){
+      go = 1;
+    }
   };
   http_close();
   db_end_transaction(0);
 }