Overview
SHA1 Hash: | edbb332d548cc19d0f884d4c9277cbc81d3feb32 |
---|---|
Date: | 2007-08-10 02:59:52 |
User: | drh |
Comment: | The xfer mechanism has been completely reworked to better support delta compression and to require fewer round-trips. The wire protocol is roughly the same but is different enough that you will need to recompile before sync will work. |
Timelines: | ancestors | descendants | both | trunk |
Other Links: | files | ZIP archive | manifest |
Tags And Properties
- branch=trunk inherited from [a28c83647d]
- sym-trunk inherited from [a28c83647d]
Changes
[hide diffs]Modified src/verify.c from [2055907abb] to [a3038ce20f].
@@ -43,19 +43,20 @@ blob_zero(&uuid); db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d", rid); if( blob_size(&uuid)!=UUID_SIZE ){ fossil_panic("not a valid rid: %d", rid); } - content_get(rid, &content); - sha1sum_blob(&content, &hash); - blob_reset(&content); - if( blob_compare(&uuid, &hash) ){ - fossil_fatal("hash of rid %d (%b) does not match its uuid (%b)", - rid, &hash, &uuid); + if( content_get(rid, &content) ){ + sha1sum_blob(&content, &hash); + blob_reset(&content); + if( blob_compare(&uuid, &hash) ){ + fossil_fatal("hash of rid %d (%b) does not match its uuid (%b)", + rid, &hash, &uuid); + } + blob_reset(&hash); } blob_reset(&uuid); - blob_reset(&hash); } /* ** */
Modified src/xfer.c from [1398d9818f] to [e782ae3d33].
@@ -36,13 +36,16 @@ Blob *pOut; /* Compose our reply here */ Blob line; /* The current line of input */ Blob aToken[5]; /* Tokenized version of line */ Blob err; /* Error message text */ int nToken; /* Number of tokens in line */ - int nIGot; /* Number of "igot" messages sent */ - int nFile; /* Number of files sent or received */ - int nDelta; /* Number of deltas sent or received */ + int nIGotSent; /* Number of "igot" messages sent */ + int nGimmeSent; /* Number of gimme messages sent */ + int nFileSent; /* Number of files sent */ + int nDeltaSent; /* Number of deltas sent */ + int nFileRcvd; /* Number of files received */ + int nDeltaRcvd; /* Number of deltas received */ int nDanglingFile; /* Number of dangling deltas received */ int mxSend; /* Stop sending "file" with pOut reaches this size */ }; @@ -100,20 +103,21 @@ blob_extract(pXfer->pIn, n, &content); if( pXfer->nToken==4 ){ Blob src; int srcid = rid_from_uuid(&pXfer->aToken[2], 1); if( content_get(srcid, &src)==0 ){ - content_put(&content, blob_str(&hash), srcid); + content_put(&content, blob_str(&pXfer->aToken[1]), srcid); blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]); + pXfer->nGimmeSent++; pXfer->nDanglingFile++; return; } - pXfer->nDelta++; + pXfer->nDeltaRcvd++; blob_delta_apply(&src, &content, &content); blob_reset(&src); }else{ - pXfer->nFile++; + pXfer->nFileRcvd++; } sha1sum_blob(&content, &hash); if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){ blob_appendf(&pXfer->err, "content does not match sha1 hash"); } @@ -139,51 +143,35 @@ Blob *pContent, /* The content of the file to send */ Blob *pUuid, /* The UUID of the file to send */ int srcId /* Send as a delta against this record */ ){ static const char *azQuery[] = { - "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid" - " WHERE delta.rid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)", - - "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid" - " WHERE srcid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)", - - "SELECT pid FROM plink JOIN pending ON rid=pid" + "SELECT pid FROM plink" " WHERE cid=%d" " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", - "SELECT cid FROM plink JOIN pending ON rid=cid" - " WHERE pid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)", - - "SELECT pid FROM mlink JOIN pending ON rid=pid" + "SELECT pid FROM mlink" " WHERE fid=%d" " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", - - "SELECT fid FROM mlink JOIN pending ON rid=fid" - " WHERE pid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)", }; int i; Blob src, delta; int size = 0; for(i=0; srcId==0 && i<count(azQuery); i++){ srcId = db_int(0, azQuery[i], rid); } - if( srcId && content_get(srcId, &src) ){ + if( srcId>0 && content_get(srcId, &src) ){ char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId); blob_delta_create(&src, pContent, &delta); size = blob_size(&delta); if( size>=blob_size(pContent)-50 ){ size = 0; }else{ blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size); blob_append(pXfer->pOut, blob_buffer(&delta), size); - blob_appendf(pXfer->pOut, "\n", 1); + /* blob_appendf(pXfer->pOut, "\n", 1); */ } blob_reset(&delta); free(zUuid); blob_reset(&src); } @@ -190,10 +178,18 @@ return size; } /* ** Send the file identified by rid. +** +** The pUuid can be NULL in which case the correct UUID is computed +** from the rid. +** +** If srcId is positive, then a delta is sent against that srcId. +** If srcId is zero, then an attempt is made to find an appropriate +** file to delta against. If srcId is negative, the file is sent +** without deltaing. */ static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){ Blob content, uuid; int size = 0; @@ -208,11 +204,11 @@ } pUuid = &uuid; } if( pXfer->mxSend<=blob_size(pXfer->pOut) ){ blob_appendf(pXfer->pOut, "igot %b\n", pUuid); - pXfer->nIGot++; + pXfer->nIGotSent++; blob_reset(&uuid); return; } content_get(rid, &content); @@ -221,13 +217,13 @@ } if( size==0 ){ int size = blob_size(&content); blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, size); blob_append(pXfer->pOut, blob_buffer(&content), size); - pXfer->nFile++; + pXfer->nFileSent++; }else{ - pXfer->nDelta++; + pXfer->nDeltaSent++; } db_multi_exec("INSERT INTO sent VALUES(%d)", rid); blob_reset(&uuid); } @@ -293,10 +289,11 @@ Stmt q; db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)"); while( db_step(&q)==SQLITE_ROW ){ const char *zUuid = db_column_text(&q, 0); blob_appendf(pXfer->pOut, "gimme %s\n", zUuid); + pXfer->nGimmeSent++; } db_finalize(&q); } @@ -367,11 +364,10 @@ ** This is the transfer handler on the server side. The transfer ** message has been uncompressed and placed in the g.cgiIn blob. ** Process this message and form an appropriate reply. */ void page_xfer(void){ - int nToken; int isPull = 0; int isPush = 0; int nErr = 0; Xfer xfer; @@ -379,10 +375,11 @@ blobarray_zero(xfer.aToken, count(xfer.aToken)); cgi_set_content_type(g.zContentType); blob_zero(&xfer.err); xfer.pIn = &g.cgiIn; xfer.pOut = cgi_output_blob(); + xfer.mxSend = db_get_int("max-download", 1000000); db_begin_transaction(); db_multi_exec( "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);" ); @@ -393,11 +390,11 @@ ** file UUID DELTASRC SIZE \n CONTENT ** ** Accept a file from the client. */ if( blob_eq(&xfer.aToken[0], "file") ){ - if( !isPush ){ + if( !g.okWrite ){ cgi_reset_content(); @ error not\sauthorized\sto\swrite nErr++; break; } @@ -416,11 +413,11 @@ */ if( blob_eq(&xfer.aToken[0], "gimme") && xfer.nToken==2 && blob_is_uuid(&xfer.aToken[1]) ){ - if( isPull ){ + if( g.okRead ){ int rid = rid_from_uuid(&xfer.aToken[1], 0); if( rid ){ send_file(&xfer, rid, &xfer.aToken[1], 0); } } @@ -432,11 +429,11 @@ */ if( xfer.nToken==2 && blob_eq(&xfer.aToken[0], "igot") && blob_is_uuid(&xfer.aToken[1]) ){ - if( isPush ){ + if( g.okWrite ){ rid_from_uuid(&xfer.aToken[1], 1); } }else @@ -446,11 +443,11 @@ */ if( xfer.nToken==2 && blob_eq(&xfer.aToken[0], "leaf") && blob_is_uuid(&xfer.aToken[1]) ){ - if( isPull ){ + if( g.okRead ){ int rid = rid_from_uuid(&xfer.aToken[1], 0); leaf_response(&xfer, rid); } }else @@ -457,11 +454,11 @@ /* pull SERVERCODE PROJECTCODE ** push SERVERCODE PROJECTCODE ** ** The client wants either send or receive */ - if( nToken==3 + if( xfer.nToken==3 && (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push")) && blob_is_uuid(&xfer.aToken[1]) && blob_is_uuid(&xfer.aToken[2]) ){ const char *zSCode; @@ -511,28 +508,36 @@ /* clone ** ** The client knows nothing. Tell all. */ if( blob_eq(&xfer.aToken[0], "clone") ){ + int rootid; login_check_credentials(); if( !g.okRead || !g.okHistory ){ cgi_reset_content(); @ error not\sauthorized\sto\sclone nErr++; break; } isPull = 1; @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x")) - send_leaves(&xfer); + rootid = db_int(0, + "SELECT pid FROM plink AS a" + " WHERE NOT EXISTS(SELECT 1 FROM plink WHERE cid=a.pid)" + ); + if( rootid ){ + send_file(&xfer, rootid, 0, -1); + leaf_response(&xfer, rootid); + } }else /* login USER NONCE SIGNATURE ** ** Check for a valid login. This has to happen before anything else. */ if( blob_eq(&xfer.aToken[0], "login") - && nToken==4 + && xfer.nToken==4 ){ if( disableLogin ){ g.okRead = g.okWrite = 1; }else{ check_login(&xfer.aToken[1], &xfer.aToken[2], &xfer.aToken[3]); @@ -597,21 +602,21 @@ */ void client_sync(int pushFlag, int pullFlag, int cloneFlag){ int go = 1; /* Loop until zero */ const char *zSCode = db_get("server-code", "x"); const char *zPCode = db_get("project-code", 0); - int nMsg = 0; - int nReq = 0; - int nFileSend = 0; - int nNoFileCycle = 0; + int nMsg = 0; /* Number of messages sent or received */ + int nCycle = 0; /* Number of round trips to the server */ + int nFileSend = 0; Blob send; /* Text we are sending to the server */ Blob recv; /* Reply we got back from the server */ Xfer xfer; /* Transfer data */ memset(&xfer, 0, sizeof(xfer)); xfer.pIn = &recv; xfer.pOut = &send; + xfer.mxSend = db_get_int("max-upload", 250000); assert( pushFlag || pullFlag || cloneFlag ); assert( !g.urlIsFile ); /* This only works for networking */ db_begin_transaction(); @@ -620,15 +625,14 @@ ); blobarray_zero(xfer.aToken, count(xfer.aToken)); blob_zero(&send); blob_zero(&recv); blob_zero(&xfer.err); - - - while( go ){ - go = 0; - nReq = nMsg = 0; + blob_zero(&xfer.line); + + + while( go ){ /* Generate a request to be sent to the server. ** Always begin with a clone, pull, or push message */ @@ -637,32 +641,36 @@ pushFlag = 0; pullFlag = 0; nMsg++; }else if( pullFlag ){ blob_appendf(&send, "pull %s %s\n", zSCode, zPCode); - send_leaves(&xfer); + nMsg++; request_phantoms(&xfer); - nMsg++; + send_leaves(&xfer); } if( pushFlag ){ blob_appendf(&send, "push %s %s\n", zSCode, zPCode); nMsg++; } /* Exchange messages with the server */ - nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile; - printf("Send: %3d files, %3d requests, %3d other msgs, %8d bytes\n", - nFileSend, nReq, nMsg, blob_size(&send)); - xfer.nFile = 0; - xfer.nDelta = 0; - xfer.nDanglingFile = 0; - nReq = nMsg = 0; + nFileSend = xfer.nFileSent + xfer.nDeltaSent; + printf("Send: %10d bytes, %3d messages, %3d files (%d+%d)\n", + blob_size(&send), nMsg+xfer.nGimmeSent+xfer.nIGotSent, + nFileSend, xfer.nFileSent, xfer.nDeltaSent); + nMsg = 0; + xfer.nFileSent = 0; + xfer.nDeltaSent = 0; + xfer.nGimmeSent = 0; http_exchange(&send, &recv); blob_reset(&send); /* Process the reply that came back from the server */ while( blob_line(&recv, &xfer.line) ){ + if( blob_buffer(&xfer.line)[0]=='#' ){ + continue; + } xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken)); /* file UUID SIZE \n CONTENT ** file UUID DELTASRC SIZE \n CONTENT ** @@ -678,10 +686,11 @@ */ if( blob_eq(&xfer.aToken[0], "gimme") && xfer.nToken==2 && blob_is_uuid(&xfer.aToken[1]) ){ + nMsg++; if( pushFlag ){ int rid = rid_from_uuid(&xfer.aToken[1], 0); send_file(&xfer, rid, &xfer.aToken[1], 0); } }else @@ -692,12 +701,16 @@ */ if( xfer.nToken==2 && blob_eq(&xfer.aToken[0], "igot") && blob_is_uuid(&xfer.aToken[1]) ){ + nMsg++; if( pullFlag ){ - rid_from_uuid(&xfer.aToken[1], 1); + if( !db_exists("SELECT 1 FROM blob WHERE uuid='%b' AND size>=0", + &xfer.aToken[1]) ){ + content_put(0, blob_str(&xfer.aToken[1]), 0); + } } }else /* leaf UUID @@ -706,10 +719,11 @@ */ if( xfer.nToken==2 && blob_eq(&xfer.aToken[0], "leaf") && blob_is_uuid(&xfer.aToken[1]) ){ + nMsg++; if( pushFlag ){ int rid = rid_from_uuid(&xfer.aToken[1], 0); leaf_response(&xfer, rid); } }else @@ -754,26 +768,39 @@ if( blob_size(&xfer.err) ){ fossil_fatal("%b", &xfer.err); } blobarray_reset(xfer.aToken, xfer.nToken); + blob_reset(&xfer.line); } - printf("Received: %3d files, %3d requests, %3d other msgs, %8d bytes\n", - xfer.nFile + xfer.nDelta + xfer.nDanglingFile, - nReq, nMsg, blob_size(&recv)); + printf("Received: %10d bytes, %3d messages, %3d files (%d+%d+%d)\n", + blob_size(&recv), nMsg, + xfer.nFileRcvd + xfer.nDeltaRcvd + xfer.nDanglingFile, + xfer.nFileRcvd, xfer.nDeltaRcvd, xfer.nDanglingFile); + blob_reset(&recv); - if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){ - nNoFileCycle++; - if( nNoFileCycle>1 ){ - go = 0; - } - }else{ - nNoFileCycle = 0; + nMsg = 0; + xfer.nFileRcvd = 0; + xfer.nDeltaRcvd = 0; + xfer.nDanglingFile = 0; + nCycle++; + go = 0; + + /* If we have received one or more files on this cycle and + ** we have one or more phantoms, then go for another round + */ + if(xfer.nFileRcvd+xfer.nDeltaRcvd+xfer.nDanglingFile>0 + && db_exists("SELECT 1 FROM phantom") + ){ + go = 1; } - nReq = nMsg = 0; - xfer.nFile = 0; - xfer.nDelta = 0; - xfer.nDanglingFile = 0; + + /* If we have one or more files queued to send, then go + ** another round + */ + if( xfer.nFileSent+xfer.nDeltaSent>0 ){ + go = 1; + } }; http_close(); db_end_transaction(0); }