Overview
SHA1 Hash: | 573a464cb7be4d45ee10e206ec6be775f47b2323 |
---|---|
Date: | 2007-08-10 00:08:25 |
User: | drh |
Comment: | Complete rework of the xfer mechanism. Compiles but not yet working. |
Timelines: | ancestors | descendants | both | trunk |
Other Links: | files | ZIP archive | manifest |
Tags And Properties
- branch=trunk inherited from [a28c83647d]
- sym-trunk inherited from [a28c83647d]
Changes
[hide diffs]Modified src/checkin.c from [bad8f191d4] to [2d5771221e].
@@ -350,11 +350,11 @@ zFullname = db_column_text(&q, 1); rid = db_column_int(&q, 2); blob_zero(&content); blob_read_from_file(&content, zFullname); - nrid = content_put(&content, 0); + nrid = content_put(&content, 0, 0); if( rid>0 ){ content_deltify(rid, nrid, 0); } db_multi_exec("UPDATE vfile SET mrid=%d, rid=%d WHERE id=%d", nrid,nrid,id); } @@ -408,11 +408,11 @@ } blob_write_to_file(&manifest, zManifestFile); blob_reset(&manifest); blob_read_from_file(&manifest, zManifestFile); free(zManifestFile); - nvid = content_put(&manifest, 0); + nvid = content_put(&manifest, 0, 0); if( nvid==0 ){ fossil_panic("trouble committing manifest: %s", g.zErrMsg); } manifest_crosslink(nvid, &manifest); content_deltify(vid, nvid, 0);
Modified src/content.c from [9b11b55b6f] to [896054a3e4].
@@ -29,73 +29,54 @@ /* ** Return the srcid associated with rid. Or return 0 if rid is ** original content and not a delta. */ -static int findSrcid(int rid, const char *zDb){ - Stmt qsrc; - int srcid; - if( zDb ){ - db_prepare(&qsrc, "SELECT srcid FROM %s.delta WHERE rid=%d", zDb, rid); - }else{ - db_prepare(&qsrc, "SELECT srcid FROM delta WHERE rid=%d", rid); - } - if( db_step(&qsrc)==SQLITE_ROW ){ - srcid = db_column_int(&qsrc, 0); - }else{ - srcid = 0; - } - db_finalize(&qsrc); +static int findSrcid(int rid){ + int srcid = db_int(0, "SELECT srcid FROM delta WHERE rid=%d", rid); return srcid; } /* ** Extract the content for ID rid and put it into the -** uninitialized blob. +** uninitialized blob. Return 1 on success. If the record +** is a phantom, zero pBlob and return 0. */ -void content_get_from_db(int rid, Blob *pBlob, const char *zDb){ +int content_get(int rid, Blob *pBlob){ Stmt q; + Blob src; int srcid; + int rc = 0; + assert( g.repositoryOpen ); - srcid = findSrcid(rid, zDb); - if( zDb ){ - db_prepare(&q, - "SELECT content FROM %s.blob WHERE rid=%d AND size>=0", - zDb, rid - ); - }else{ - db_prepare(&q, - "SELECT content FROM blob WHERE rid=%d AND size>=0", - rid - ); - } + srcid = findSrcid(rid); + blob_zero(pBlob); if( srcid ){ - Blob src; - content_get_from_db(srcid, &src, zDb); - if( db_step(&q)==SQLITE_ROW ){ - Blob delta; - db_ephemeral_blob(&q, 0, &delta); - blob_uncompress(&delta, &delta); - blob_init(pBlob,0,0); - blob_delta_apply(&src, &delta, pBlob); - blob_reset(&delta); - }else{ - blob_init(pBlob, 0, 0); + if( content_get(srcid, &src) ){ + db_prepare(&q, "SELECT content FROM blob WHERE rid=%d AND size>=0", rid); + if( db_step(&q)==SQLITE_ROW ){ + Blob delta; + db_ephemeral_blob(&q, 0, &delta); + blob_uncompress(&delta, &delta); + blob_init(pBlob,0,0); + blob_delta_apply(&src, &delta, pBlob); + blob_reset(&delta); + rc = 1; + } + db_finalize(&q); + blob_reset(&src); } - blob_reset(&src); }else{ + db_prepare(&q, "SELECT content FROM blob WHERE rid=%d AND size>=0", rid); if( db_step(&q)==SQLITE_ROW ){ db_ephemeral_blob(&q, 0, pBlob); blob_uncompress(pBlob, pBlob); - }else{ - blob_init(pBlob,0, 0); + rc = 1; } - } - db_finalize(&q); -} -void content_get(int rid, Blob *pBlob){ - content_get_from_db(rid, pBlob, 0); + db_finalize(&q); + } + return rc; } /* ** COMMAND: test-content-get ** @@ -132,31 +113,60 @@ blob_uncompress(&content, &content); blob_write_to_file(&content, zFile); } /* +** When a record is converted from a phantom to a real record, +** if that record has other records that are derived by delta, +** then call manifest_crosslink() on those other records. +*/ +void after_dephantomize(int rid, int linkFlag){ + Stmt q; + db_prepare(&q, "SELECT rid FROM delta WHERE srcid=%d", rid); + while( db_step(&q)==SQLITE_ROW ){ + int tid = db_column_int(&q, 0); + after_dephantomize(tid, 1); + } + db_finalize(&q); + if( linkFlag ){ + Blob content; + content_get(rid, &content); + manifest_crosslink(rid, &content); + blob_reset(&content); + } +} + +/* ** Write content into the database. Return the record ID. If the ** content is already in the database, just return the record ID. ** -** A phantom is written if pBlob==0. If pBlob==0 then the UUID is set -** to zUuid. Otherwise zUuid is ignored. +** If srcId is specified, then pBlob is delta content from +** the srcId record. srcId might be a phantom. +** +** A phantom is written if pBlob==0. If pBlob==0 or if srcId is +** specified then the UUID is set to zUuid. Otherwise zUuid is +** ignored. In the future this might change such that the content +** hash is checked against zUuid to make sure it is correct. ** ** If the record already exists but is a phantom, the pBlob content ** is inserted and the phatom becomes a real record. */ -int content_put(Blob *pBlob, const char *zUuid){ +int content_put(Blob *pBlob, const char *zUuid, int srcId){ int size; int rid; Stmt s1; Blob cmpr; Blob hash; assert( g.repositoryOpen ); - if( pBlob==0 ){ + if( pBlob && srcId==0 ){ + sha1sum_blob(pBlob, &hash); + }else{ blob_init(&hash, zUuid, -1); + } + if( pBlob==0 ){ size = -1; }else{ - sha1sum_blob(pBlob, &hash); size = blob_size(pBlob); } db_begin_transaction(); /* Check to see if the entry already exists and if it does whether @@ -197,16 +207,19 @@ ); blob_compress(pBlob, &cmpr); db_bind_blob(&s1, ":data", &cmpr); db_exec(&s1); db_multi_exec("DELETE FROM phantom WHERE rid=%d", rid); + if( srcId==0 || db_int(0, "SELECT size FROM blob WHERE rid=%d", srcId)>0 ){ + after_dephantomize(rid, 0); + } }else{ /* We are creating a new entry */ db_prepare(&s1, "INSERT INTO blob(rcvid,size,uuid,content)" - "VALUES(%d,%d,'%s',:data)", - g.rcvid, size, blob_str(&hash) + "VALUES(%d,%d,'%b',:data)", + g.rcvid, size, &hash ); if( pBlob ){ blob_compress(pBlob, &cmpr); db_bind_blob(&s1, ":data", &cmpr); } @@ -215,10 +228,16 @@ if( !pBlob ){ db_multi_exec("INSERT OR IGNORE INTO phantom VALUES(%d)", rid); } } + /* If the srcId is specified, then the data we just added is + ** really a delta. Record this fact in the delta table. + */ + if( srcId ){ + db_multi_exec("REPLACE INTO delta(rid,srcid) VALUES(%d,%d)", rid, srcId); + } /* Finish the transaction and cleanup */ db_finalize(&s1); db_end_transaction(0); blob_reset(&hash); @@ -242,30 +261,32 @@ Blob content; if( g.argc!=3 ) usage("FILENAME"); db_must_be_within_tree(); user_select(); blob_read_from_file(&content, g.argv[2]); - rid = content_put(&content, 0); + rid = content_put(&content, 0, 0); printf("inserted as record %d\n", rid); } /* ** Make sure the content at rid is the original content and is not a ** delta. */ void content_undelta(int rid){ - if( findSrcid(rid, 0)>0 ){ + if( findSrcid(rid)>0 ){ Blob x; - Stmt s; - content_get(rid, &x); - db_prepare(&s, "UPDATE blob SET content=:c WHERE rid=%d", rid); - blob_compress(&x, &x); - db_bind_blob(&s, ":c", &x); - db_exec(&s); - db_finalize(&s); - blob_reset(&x); - db_multi_exec("DELETE FROM delta WHERE rid=%d", rid); + if( content_get(rid, &x) ){ + Stmt s; + db_prepare(&s, "UPDATE blob SET content=:c, size=%d WHERE rid=%d", + blob_size(&x), rid); + blob_compress(&x, &x); + db_bind_blob(&s, ":c", &x); + db_exec(&s); + db_finalize(&s); + blob_reset(&x); + db_multi_exec("DELETE FROM delta WHERE rid=%d", rid); + } } } /* ** COMMAND: test-content-undelta @@ -292,13 +313,13 @@ void content_deltify(int rid, int srcid, int force){ int s; Blob data, src, delta; Stmt s1, s2; if( srcid==rid ) return; - if( !force && findSrcid(rid, 0)>0 ) return; + if( !force && findSrcid(rid)>0 ) return; s = srcid; - while( (s = findSrcid(s, 0))>0 ){ + while( (s = findSrcid(s))>0 ){ if( s==rid ){ content_undelta(srcid); break; } }
Modified src/db.c from [98de757440] to [fafbfe094b].
@@ -699,11 +699,11 @@ blob_appendf(&manifest, "R %s\n", md5sum_finish(0)); blob_appendf(&manifest, "U %F\n", g.zLogin); md5sum_blob(&manifest, &hash); blob_appendf(&manifest, "Z %b\n", &hash); blob_reset(&hash); - content_put(&manifest, 0); + content_put(&manifest, 0, 0); db_end_transaction(0); printf("project-id: %s\n", db_get("project-code", 0)); printf("server-id: %s\n", db_get("server-code", 0)); printf("admin-user: %s (no password set yet!)\n", g.zLogin); printf("baseline: %s\n", db_text(0, "SELECT uuid FROM blob")); @@ -817,41 +817,10 @@ return db_int(dflt, "SELECT value FROM vvar WHERE name=%Q", zName); } void db_lset_int(const char *zName, int value){ db_multi_exec("REPLACE INTO vvar(name,value) VALUES(%Q,%d)", zName, value); } - -int db_row_to_table(const char *zFormat, ...){ - Stmt q; - va_list ap; - int rc; - - va_start(ap, zFormat); - rc = db_vprepare(&q, zFormat, ap); - va_end(ap); - if( rc!=SQLITE_OK ){ - return rc; - } - - @ <table border="0" cellpadding="0" cellspacing="0"> - if( db_step(&q)==SQLITE_ROW ){ - int ii; - for(ii=0; ii<sqlite3_column_count(q.pStmt); ii++){ - char *zCol = htmlize(sqlite3_column_name(q.pStmt, ii), -1); - char *zVal = htmlize(sqlite3_column_text(q.pStmt, ii), -1); - - @ <tr><td align=right>%s(zCol):<td width=10><td>%s(zVal) - - free(zVal); - free(zCol); - } - } - @ </table> - - return db_finalize(&q); -} - /* ** COMMAND: open ** ** Create a new local repository.
Modified src/manifest.c from [29068b9f31] to [02dabff247].
@@ -289,11 +289,11 @@ manifest_clear(&other); } /* ** Scan record rid/pContent to see if it is a manifest. If -** it is a manifest, then populate tables the mlink, plink, +** it is a manifest, then populate the mlink, plink, ** filename, and event tables with cross-reference information. */ int manifest_crosslink(int rid, Blob *pContent){ int i; Manifest m;
Modified src/vfile.c from [2bf3326797] to [fdfe66efd6].
@@ -49,11 +49,11 @@ } strcpy(z, zUuid); canonical16(z, sz); rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%Q", z); if( rid==0 && phantomize ){ - rid = content_put(0, zUuid); + rid = content_put(0, zUuid, 0); } return rid; } /*
Modified src/xfer.c from [0239f4e4fe] to [1398d9818f].
@@ -25,109 +25,43 @@ */ #include "config.h" #include "xfer.h" /* -** Try to locate a record that is similar to rid and is a likely -** candidate for delta against rid. The similar record must be -** referenced in the onremote table. -** -** Return the integer record ID of the similar record. Or return -** 0 if none is found. +** This structure holds information about the current state of either +** a client or a server that is participating in xfer. +*/ +typedef struct Xfer Xfer; +struct Xfer { + Blob *pIn; /* Input text from the other side */ + Blob *pOut; /* Compose our reply here */ + Blob line; /* The current line of input */ + Blob aToken[5]; /* Tokenized version of line */ + Blob err; /* Error message text */ + int nToken; /* Number of tokens in line */ + int nIGot; /* Number of "igot" messages sent */ + int nFile; /* Number of files sent or received */ + int nDelta; /* Number of deltas sent or received */ + int nDanglingFile; /* Number of dangling deltas received */ + int mxSend; /* Stop sending "file" with pOut reaches this size */ +}; + + +/* +** The input blob contains a UUID. Convert it into a record ID. +** Create a phantom record if no prior record exists and +** phantomize is true. +** +** Compare to uuid_to_rid(). This routine takes a blob argument +** and does less error checking. */ -static int similar_record(int rid, int traceFlag){ - int inCnt, outCnt; - int i; - Stmt q; - int queue[100]; - static const char *azQuery[] = { - /* Scan the delta table first */ - "SELECT srcid, EXISTS(SELECT 1 FROM onremote WHERE rid=srcid)" - " FROM delta" - " WHERE rid=:x" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)" - " UNION ALL " - "SELECT rid, EXISTS(SELECT 1 FROM onremote WHERE rid=delta.rid)" - " FROM delta" - " WHERE srcid=:x" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)", - - /* Then the plink table */ - "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)" - " FROM plink" - " WHERE cid=:x" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)" - " UNION ALL " - "SELECT cid, EXISTS(SELECT 1 FROM onremote WHERE rid=cid)" - " FROM plink" - " WHERE pid=:x" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)", - - /* Finally the mlink table */ - "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)" - " FROM mlink" - " WHERE fid=:x AND pid>0" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)" - " UNION ALL " - "SELECT fid, EXISTS(SELECT 1 FROM onremote WHERE rid=fid)" - " FROM mlink" - " WHERE pid=:x AND fid>0" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)", - }; - - for(i=0; i<sizeof(azQuery)/sizeof(azQuery[0]); i++){ - db_prepare(&q, azQuery[i]); - queue[0] = rid; - inCnt = 1; - outCnt = 0; - if( traceFlag ) printf("PASS %d\n", i+1); - while( outCnt<inCnt ){ - int xid = queue[outCnt%64]; - outCnt++; - db_bind_int(&q, ":x", xid); - if( traceFlag ) printf("xid=%d\n", xid); - while( db_step(&q)==SQLITE_ROW ){ - int nid = db_column_int(&q, 0); - int hit = db_column_int(&q, 1); - if( traceFlag ) printf("nid=%d hit=%d\n", nid, hit); - if( hit ){ - db_finalize(&q); - return nid; - } - if( inCnt<sizeof(queue)/sizeof(queue[0]) ){ - int i; - for(i=0; i<inCnt && queue[i]!=nid; i++){} - if( i>=inCnt ){ - queue[inCnt++] = nid; - } - } - } - db_reset(&q); - } - db_finalize(&q); - } - return 0; -} - -/* -** COMMAND: test-similar-record -*/ -void test_similar_record(void){ - int i; - if( g.argc<4 ){ - usage("SRC ONREMOTE..."); - } - db_must_be_within_tree(); - db_multi_exec( - "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" - ); - for(i=3; i<g.argc; i++){ - int rid = name_to_rid(g.argv[i]); - printf("%s -> %d\n", g.argv[i], rid); - db_multi_exec("INSERT INTO onremote VALUES(%d)", rid); - } - printf("similar: %d\n", similar_record(name_to_rid(g.argv[2]), 1)); +static int rid_from_uuid(Blob *pUuid, int phantomize){ + int rid = db_int(0, "SELECT rid FROM blob WHERE uuid='%b'", pUuid); + if( rid==0 && phantomize ){ + rid = content_put(0, blob_str(pUuid), 0); + } + return rid; } /* ** The aToken[0..nToken-1] blob array is a parse of a "file" line @@ -144,162 +78,227 @@ ** content of DELTASRC. ** ** If any error occurs, write a message into pErr which has already ** be initialized to an empty string. */ -static void xfer_accept_file(Blob *pIn, Blob *aToken, int nToken, Blob *pErr){ +static void xfer_accept_file(Xfer *pXfer){ int n; int rid; Blob content, hash; - if( nToken<3 || nToken>4 || !blob_is_uuid(&aToken[1]) - || !blob_is_int(&aToken[nToken-1], &n) || n<=0 - || (nToken==4 && !blob_is_uuid(&aToken[2])) ){ - blob_appendf(pErr, "malformed file line"); + if( pXfer->nToken<3 + || pXfer->nToken>4 + || !blob_is_uuid(&pXfer->aToken[1]) + || !blob_is_int(&pXfer->aToken[pXfer->nToken-1], &n) + || n<=0 + || (pXfer->nToken==4 && !blob_is_uuid(&pXfer->aToken[2])) + ){ + blob_appendf(&pXfer->err, "malformed file line"); return; } blob_zero(&content); blob_zero(&hash); - blob_extract(pIn, n, &content); - if( nToken==4 ){ + blob_extract(pXfer->pIn, n, &content); + if( pXfer->nToken==4 ){ Blob src; - int srcid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[2]); - if( srcid==0 ){ - blob_appendf(pErr, "unknown delta source: %b", &aToken[2]); + int srcid = rid_from_uuid(&pXfer->aToken[2], 1); + if( content_get(srcid, &src)==0 ){ + content_put(&content, blob_str(&hash), srcid); + blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]); + pXfer->nDanglingFile++; return; } - content_get(srcid, &src); + pXfer->nDelta++; blob_delta_apply(&src, &content, &content); blob_reset(&src); + }else{ + pXfer->nFile++; } sha1sum_blob(&content, &hash); - if( !blob_eq_str(&aToken[1], blob_str(&hash), -1) ){ - blob_appendf(pErr, "content does not match sha1 hash"); + if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){ + blob_appendf(&pXfer->err, "content does not match sha1 hash"); } blob_reset(&hash); - rid = content_put(&content, 0); - manifest_crosslink(rid, &content); + rid = content_put(&content, 0, 0); if( rid==0 ){ - blob_appendf(pErr, "%s", g.zErrMsg); + blob_appendf(&pXfer->err, "%s", g.zErrMsg); + }else{ + manifest_crosslink(rid, &content); } } /* -** Send the file identified by rid. -** -** If pOut is not NULL, then append the text of the send message -** to pOut. Otherwise, append the text to the CGI output. +** Try to send a file as a delta. If successful, return the number +** of bytes in the delta. If not, return zero. +** +** If srcId is specified, use it. If not, try to figure out a +** reasonable srcId. */ -static int send_file(int rid, Blob *pOut){ - Blob content, uuid; - int size; - int srcid; - - - blob_zero(&uuid); - db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid); - if( blob_size(&uuid)==0 ){ - return 0; - } - content_get(rid, &content); - - if( blob_size(&content)>100 ){ - srcid = similar_record(rid, 0); - if( srcid ){ - Blob src; - content_get(srcid, &src); - if( blob_size(&src)>100 ){ - Blob delta; - blob_delta_create(&src, &content, &delta); - blob_reset(&content); - content = delta; - blob_append(&uuid, " ", 1); - blob_append(&content, "\n", 1); - db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d", srcid); - } - blob_reset(&src); +static int send_as_delta( + Xfer *pXfer, /* The transfer context */ + int rid, /* record id of the file to send */ + Blob *pContent, /* The content of the file to send */ + Blob *pUuid, /* The UUID of the file to send */ + int srcId /* Send as a delta against this record */ +){ + static const char *azQuery[] = { + "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid" + " WHERE delta.rid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)", + + "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid" + " WHERE srcid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)", + + "SELECT pid FROM plink JOIN pending ON rid=pid" + " WHERE cid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", + + "SELECT cid FROM plink JOIN pending ON rid=cid" + " WHERE pid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)", + + "SELECT pid FROM mlink JOIN pending ON rid=pid" + " WHERE fid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", + + "SELECT fid FROM mlink JOIN pending ON rid=fid" + " WHERE pid=%d" + " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)", + }; + int i; + Blob src, delta; + int size = 0; + + for(i=0; srcId==0 && i<count(azQuery); i++){ + srcId = db_int(0, azQuery[i], rid); + } + if( srcId && content_get(srcId, &src) ){ + char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId); + blob_delta_create(&src, pContent, &delta); + size = blob_size(&delta); + if( size>=blob_size(pContent)-50 ){ + size = 0; + }else{ + blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size); + blob_append(pXfer->pOut, blob_buffer(&delta), size); + blob_appendf(pXfer->pOut, "\n", 1); } - } - size = blob_size(&content); - if( pOut ){ - blob_appendf(pOut, "file %b %d\n", &uuid, size); - blob_append(pOut, blob_buffer(&content), size); - }else{ - cgi_printf("file %b %d\n", &uuid, size); - cgi_append_content(blob_buffer(&content), size); - } - blob_reset(&content); - blob_reset(&uuid); - db_multi_exec("INSERT OR IGNORE INTO onremote VALUES(%d)", rid); + blob_reset(&delta); + free(zUuid); + blob_reset(&src); + } return size; } +/* +** Send the file identified by rid. +*/ +static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){ + Blob content, uuid; + int size = 0; + + if( db_exists("SELECT 1 FROM sent WHERE rid=%d", rid) ){ + return; + } + blob_zero(&uuid); + if( pUuid==0 ){ + db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid); + if( blob_size(&uuid)==0 ){ + return; + } + pUuid = &uuid; + } + if( pXfer->mxSend<=blob_size(pXfer->pOut) ){ + blob_appendf(pXfer->pOut, "igot %b\n", pUuid); + pXfer->nIGot++; + blob_reset(&uuid); + return; + } + content_get(rid, &content); + + if( blob_size(&content)>100 ){ + size = send_as_delta(pXfer, rid, &content, pUuid, srcId); + } + if( size==0 ){ + int size = blob_size(&content); + blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, size); + blob_append(pXfer->pOut, blob_buffer(&content), size); + pXfer->nFile++; + }else{ + pXfer->nDelta++; + } + db_multi_exec("INSERT INTO sent VALUES(%d)", rid); + blob_reset(&uuid); +} /* -** Send all pending files. +** This routine runs when either client or server is notified that +** the other side things rid is a leaf manifest. If we hold +** children of rid, then send them over to the other side. */ -static int send_all_pending(Blob *pOut){ - int rid, xid, i; - int nIgot = 0; - int sent = 0; - int nSent = 0; - int maxSize = db_get_int("http-msg-size", 500000); - static const char *azQuery[] = { - "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid" - " WHERE delta.rid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)", - - "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid" - " WHERE srcid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)", - - "SELECT pid FROM plink JOIN pending ON rid=pid" - " WHERE cid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", - - "SELECT cid FROM plink JOIN pending ON rid=cid" - " WHERE pid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)", - - "SELECT pid FROM mlink JOIN pending ON rid=pid" - " WHERE fid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)", - - "SELECT fid FROM mlink JOIN pending ON rid=fid" - " WHERE pid=%d" - " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)", - }; - - rid = db_int(0, "SELECT rid FROM pending"); - while( rid && nIgot<200 ){ - db_multi_exec("DELETE FROM pending WHERE rid=%d", rid); - if( sent<maxSize ){ - sent += send_file(rid, pOut); - nSent++; - }else{ - char *zUuid = db_text(0, - "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid); - if( zUuid ){ - if( pOut ){ - blob_appendf(pOut, "igot %s\n", zUuid); - }else{ - cgi_printf("igot %s\n", zUuid); - } - free(zUuid); - nIgot++; - } +static void leaf_response(Xfer *pXfer, int rid){ + Stmt q1, q2; + db_prepare(&q1, + "SELECT cid, uuid FROM plink, blob" + " WHERE blob.rid=plink.cid" + " AND plink.pid=%d", + rid + ); + while( db_step(&q1)==SQLITE_ROW ){ + Blob uuid; + int cid; + + cid = db_column_int(&q1, 0); + db_ephemeral_blob(&q1, 1, &uuid); + send_file(pXfer, cid, &uuid, rid); + db_prepare(&q2, + "SELECT pid, uuid, fid FROM mlink, blob" + " WHERE rid=fid AND mid=%d", + cid + ); + while( db_step(&q2)==SQLITE_ROW ){ + int pid, fid; + pid = db_column_int(&q2, 0); + db_ephemeral_blob(&q2, 1, &uuid); + fid = db_column_int(&q2, 2); + send_file(pXfer, fid, &uuid, pid); } - xid = 0; - for(i=0; xid==0 && i<sizeof(azQuery)/sizeof(azQuery[0]); i++){ - xid = db_int(0, azQuery[i], rid); - } - rid = xid; - if( rid==0 ){ - rid = db_int(0, "SELECT rid FROM pending"); + db_finalize(&q2); + if( blob_size(pXfer->pOut)<pXfer->mxSend ){ + leaf_response(pXfer, cid); } } - return nSent; +} + +/* +** Sent a leaf message for every leaf. +*/ +static void send_leaves(Xfer *pXfer){ + Stmt q; + db_prepare(&q, + "SELECT uuid FROM blob WHERE rid IN" + " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)" + ); + while( db_step(&q)==SQLITE_ROW ){ + const char *zUuid = db_column_text(&q, 0); + blob_appendf(pXfer->pOut, "leaf %s\n", zUuid); + } + db_finalize(&q); +} + +/* +** Sen a gimme message for every phantom. +*/ +static void request_phantoms(Xfer *pXfer){ + Stmt q; + db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)"); + while( db_step(&q)==SQLITE_ROW ){ + const char *zUuid = db_column_text(&q, 0); + blob_appendf(pXfer->pOut, "gimme %s\n", zUuid); + } + db_finalize(&q); } /* ** Check the signature on an application/x-fossil payload received by @@ -372,116 +371,126 @@ void page_xfer(void){ int nToken; int isPull = 0; int isPush = 0; int nErr = 0; - Blob line, errmsg, aToken[5]; + Xfer xfer; + + memset(&xfer, 0, sizeof(xfer)); + blobarray_zero(xfer.aToken, count(xfer.aToken)); + cgi_set_content_type(g.zContentType); + blob_zero(&xfer.err); + xfer.pIn = &g.cgiIn; + xfer.pOut = cgi_output_blob(); db_begin_transaction(); - blobarray_zero(aToken, count(aToken)); - cgi_set_content_type(g.zContentType); - blob_zero(&errmsg); db_multi_exec( - "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" /* Client has */ - "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);" /* Client needs */ + "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);" ); - while( blob_line(&g.cgiIn, &line) ){ - nToken = blob_tokenize(&line, aToken, count(aToken)); + while( blob_line(xfer.pIn, &xfer.line) ){ + xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken)); /* file UUID SIZE \n CONTENT ** file UUID DELTASRC SIZE \n CONTENT ** ** Accept a file from the client. */ - if( blob_eq(&aToken[0], "file") && nToken>=3 && nToken<=4 ){ + if( blob_eq(&xfer.aToken[0], "file") ){ if( !isPush ){ cgi_reset_content(); @ error not\sauthorized\sto\swrite nErr++; break; } - xfer_accept_file(&g.cgiIn, aToken, nToken, &errmsg); - if( blob_size(&errmsg) ){ + xfer_accept_file(&xfer); + if( blob_size(&xfer.err) ){ cgi_reset_content(); - @ error %T(blob_str(&errmsg)) + @ error %T(blob_str(&xfer.err)) nErr++; break; } }else /* gimme UUID ** ** Client is requesting a file */ - if( blob_eq(&aToken[0], "gimme") && nToken==2 && blob_is_uuid(&aToken[1]) ){ + if( blob_eq(&xfer.aToken[0], "gimme") + && xfer.nToken==2 + && blob_is_uuid(&xfer.aToken[1]) + ){ if( isPull ){ - db_multi_exec( - "INSERT OR IGNORE INTO pending(rid) " - "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1] - ); + int rid = rid_from_uuid(&xfer.aToken[1], 0); + if( rid ){ + send_file(&xfer, rid, &xfer.aToken[1], 0); + } } }else /* igot UUID - ** leaf UUID ** ** Client announces that it has a particular file */ - if( nToken==2 - && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0],"leaf")) - && blob_is_uuid(&aToken[1]) ){ - if( isPull || isPush ){ - int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]); - if( rid>0 ){ - db_multi_exec( - "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid - ); - if( isPull && blob_eq(&aToken[0], "leaf") ){ - db_multi_exec( - "INSERT OR IGNORE INTO pending(rid) " - "SELECT cid FROM plink WHERE pid=%d", rid - ); - } - }else if( isPush ){ - content_put(0, blob_str(&aToken[1])); - } + if( xfer.nToken==2 + && blob_eq(&xfer.aToken[0], "igot") + && blob_is_uuid(&xfer.aToken[1]) + ){ + if( isPush ){ + rid_from_uuid(&xfer.aToken[1], 1); + } + }else + + + /* leaf UUID + ** + ** Client announces that it has a particular manifest + */ + if( xfer.nToken==2 + && blob_eq(&xfer.aToken[0], "leaf") + && blob_is_uuid(&xfer.aToken[1]) + ){ + if( isPull ){ + int rid = rid_from_uuid(&xfer.aToken[1], 0); + leaf_response(&xfer, rid); } }else /* pull SERVERCODE PROJECTCODE ** push SERVERCODE PROJECTCODE ** ** The client wants either send or receive */ if( nToken==3 - && (blob_eq(&aToken[0], "pull") || blob_eq(&aToken[0], "push")) - && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){ + && (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push")) + && blob_is_uuid(&xfer.aToken[1]) + && blob_is_uuid(&xfer.aToken[2]) + ){ const char *zSCode; const char *zPCode; zSCode = db_get("server-code", 0); if( zSCode==0 ){ fossil_panic("missing server code"); } - if( blob_eq_str(&aToken[1], zSCode, -1) ){ + if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){ cgi_reset_content(); @ error server\sloop nErr++; break; } zPCode = db_get("project-code", 0); if( zPCode==0 ){ fossil_panic("missing project code"); } - if( !blob_eq_str(&aToken[2], zPCode, -1) ){ + if( !blob_eq_str(&xfer.aToken[2], zPCode, -1) ){ cgi_reset_content(); @ error wrong\sproject nErr++; break; } login_check_credentials(); - if( blob_eq(&aToken[0], "pull") ){ + if( blob_eq(&xfer.aToken[0], "pull") ){ if( !g.okRead ){ cgi_reset_content(); @ error not\sauthorized\sto\sread nErr++; break; @@ -492,91 +501,57 @@ cgi_reset_content(); @ error not\sauthorized\sto\swrite nErr++; break; } - isPush = 1; - + send_leaves(&xfer); + isPush = 1; } }else /* clone ** ** The client knows nothing. Tell all. */ - if( blob_eq(&aToken[0], "clone") ){ + if( blob_eq(&xfer.aToken[0], "clone") ){ login_check_credentials(); if( !g.okRead || !g.okHistory ){ cgi_reset_content(); @ error not\sauthorized\sto\sclone nErr++; break; } isPull = 1; @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x")) - db_multi_exec( - "INSERT OR IGNORE INTO pending(rid) " - "SELECT mid FROM mlink JOIN blob ON mid=rid" - ); + send_leaves(&xfer); }else /* login USER NONCE SIGNATURE ** ** Check for a valid login. This has to happen before anything else. */ - if( blob_eq(&aToken[0], "login") && nToken==4 ){ + if( blob_eq(&xfer.aToken[0], "login") + && nToken==4 + ){ if( disableLogin ){ g.okRead = g.okWrite = 1; }else{ - check_login(&aToken[1], &aToken[2], &aToken[3]); + check_login(&xfer.aToken[1], &xfer.aToken[2], &xfer.aToken[3]); } }else /* Unknown message */ { cgi_reset_content(); - @ error bad\scommand:\s%F(blob_str(&line)) + @ error bad\scommand:\s%F(blob_str(&xfer.line)) } - blobarray_reset(aToken, nToken); - } - - /* The input message has now been processed. Generate a reply. */ + blobarray_reset(xfer.aToken, xfer.nToken); + } if( isPush ){ - Stmt q; - int nReq = 0; - db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)"); - while( db_step(&q)==SQLITE_ROW && nReq++ < 200 ){ - const char *zUuid = db_column_text(&q, 0); - int rid = db_column_int(&q, 1); - int xid = similar_record(rid, 0); - @ gimme %s(zUuid) - if( xid ){ - char *zXUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid); - @ igot %s(zXUuid); - free(zXUuid); - } - } - db_finalize(&q); - } - if( isPull ){ - send_all_pending(0); - } - if( isPush || isPull ){ - /* Always send our leaves */ - Stmt q; - db_prepare(&q, - "SELECT uuid FROM blob WHERE rid IN" - " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)" - ); - while( db_step(&q)==SQLITE_ROW ){ - const char *zUuid = db_column_text(&q, 0); - @ leaf %s(zUuid) - } - db_finalize(&q); - } - + request_phantoms(&xfer); + } db_end_transaction(0); } /* ** COMMAND: test-xfer @@ -620,43 +595,40 @@ ** are pulled if pullFlag is true. A full sync occurs if both are ** true. */ void client_sync(int pushFlag, int pullFlag, int cloneFlag){ int go = 1; /* Loop until zero */ - int nToken; const char *zSCode = db_get("server-code", "x"); const char *zPCode = db_get("project-code", 0); - int nFile = 0; int nMsg = 0; int nReq = 0; - int nFileSend; + int nFileSend = 0; int nNoFileCycle = 0; Blob send; /* Text we are sending to the server */ Blob recv; /* Reply we got back from the server */ - Blob line; /* A single line of the reply */ - Blob aToken[5]; /* A tokenization of line */ - Blob errmsg; /* Error message */ + Xfer xfer; /* Transfer data */ + + memset(&xfer, 0, sizeof(xfer)); + xfer.pIn = &recv; + xfer.pOut = &send; assert( pushFlag || pullFlag || cloneFlag ); assert( !g.urlIsFile ); /* This only works for networking */ db_begin_transaction(); db_multi_exec( - /* Records which we know the other side also has */ - "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" - /* Records we know the other side needs */ - "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);" + "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);" ); - blobarray_zero(aToken, count(aToken)); + blobarray_zero(xfer.aToken, count(xfer.aToken)); blob_zero(&send); blob_zero(&recv); - blob_zero(&errmsg); + blob_zero(&xfer.err); while( go ){ go = 0; - nFile = nReq = nMsg = 0; + nReq = nMsg = 0; /* Generate a request to be sent to the server. ** Always begin with a clone, pull, or push message */ @@ -665,143 +637,102 @@ pushFlag = 0; pullFlag = 0; nMsg++; }else if( pullFlag ){ blob_appendf(&send, "pull %s %s\n", zSCode, zPCode); + send_leaves(&xfer); + request_phantoms(&xfer); nMsg++; } if( pushFlag ){ blob_appendf(&send, "push %s %s\n", zSCode, zPCode); nMsg++; } - if( pullFlag ){ - /* Send gimme message for every phantom that we hold. - */ - Stmt q; - db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)"); - while( db_step(&q)==SQLITE_ROW && nReq<200 ){ - const char *zUuid = db_column_text(&q, 0); - int rid = db_column_int(&q, 1); - int xid = similar_record(rid, 0); - blob_appendf(&send,"gimme %s\n", zUuid); - nReq++; - if( xid ){ - blob_appendf(&send, "igot %z\n", - db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid)); - } - } - db_finalize(&q); - } - - if( pushFlag ){ - /* Send the server any files that the server has requested */ - nFile += send_all_pending(&send); - } - - if( pullFlag || pushFlag ){ - /* Always send our leaves */ - Stmt q; - db_prepare(&q, - "SELECT uuid FROM blob WHERE rid IN" - " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)" - ); - while( db_step(&q)==SQLITE_ROW ){ - const char *zUuid = db_column_text(&q, 0); - blob_appendf(&send, "leaf %s\n", zUuid); - nMsg++; - } - db_finalize(&q); - } - /* Exchange messages with the server */ + nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile; printf("Send: %3d files, %3d requests, %3d other msgs, %8d bytes\n", - nFile, nReq, nMsg, blob_size(&send)); - nFileSend = nFile; - nFile = nReq = nMsg = 0; + nFileSend, nReq, nMsg, blob_size(&send)); + xfer.nFile = 0; + xfer.nDelta = 0; + xfer.nDanglingFile = 0; + nReq = nMsg = 0; http_exchange(&send, &recv); blob_reset(&send); /* Process the reply that came back from the server */ - while( blob_line(&recv, &line) ){ - nToken = blob_tokenize(&line, aToken, count(aToken)); + while( blob_line(&recv, &xfer.line) ){ + xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken)); /* file UUID SIZE \n CONTENT ** file UUID DELTASRC SIZE \n CONTENT ** ** Receive a file transmitted from the other side */ - if( blob_eq(&aToken[0],"file") ){ - xfer_accept_file(&recv, aToken, nToken, &errmsg); - nFile++; - go = 1; + if( blob_eq(&xfer.aToken[0],"file") ){ + xfer_accept_file(&xfer); }else /* gimme UUID ** ** Server is requesting a file */ - if( blob_eq(&aToken[0], "gimme") && nToken==2 - && blob_is_uuid(&aToken[1]) ){ - nReq++; + if( blob_eq(&xfer.aToken[0], "gimme") + && xfer.nToken==2 + && blob_is_uuid(&xfer.aToken[1]) + ){ if( pushFlag ){ - db_multi_exec( - "INSERT OR IGNORE INTO pending(rid) " - "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1] - ); - go = 1; + int rid = rid_from_uuid(&xfer.aToken[1], 0); + send_file(&xfer, rid, &xfer.aToken[1], 0); } }else /* igot UUID - ** leaf UUID + ** + ** Server announces that it has a particular file + */ + if( xfer.nToken==2 + && blob_eq(&xfer.aToken[0], "igot") + && blob_is_uuid(&xfer.aToken[1]) + ){ + if( pullFlag ){ + rid_from_uuid(&xfer.aToken[1], 1); + } + }else + + + /* leaf UUID ** - ** Server proclaims that it has a particular file. A leaf message - ** means that the file is a leaf manifest on the server. + ** Server announces that it has a particular manifest */ - if( nToken==2 - && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0], "leaf")) - && blob_is_uuid(&aToken[1]) ){ - int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]); - nMsg++; - if( rid>0 ){ - db_multi_exec( - "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid - ); - /* Add to the pending set all children of the server's leaves */ - if( pushFlag && blob_eq(&aToken[0], "leaf") ){ - db_multi_exec( - "INSERT OR IGNORE INTO pending(rid) " - "SELECT cid FROM plink WHERE pid=%d", rid - ); - if( db_changes()>0 ){ - go = 1; - } - } - if( pullFlag && !go && - db_exists("SELECT 1 FROM phantom WHERE rid=%d", rid) ){ - go = 1; - } - }else if( pullFlag ){ - go = 1; - content_put(0, blob_str(&aToken[1])); + if( xfer.nToken==2 + && blob_eq(&xfer.aToken[0], "leaf") + && blob_is_uuid(&xfer.aToken[1]) + ){ + if( pushFlag ){ + int rid = rid_from_uuid(&xfer.aToken[1], 0); + leaf_response(&xfer, rid); } }else + /* push SERVERCODE PRODUCTCODE ** ** Should only happen in response to a clone. */ - if( blob_eq(&aToken[0],"push") && nToken==3 && cloneFlag - && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){ - - if( blob_eq_str(&aToken[1], zSCode, -1) ){ + if( blob_eq(&xfer.aToken[0],"push") + && xfer.nToken==3 + && cloneFlag + && blob_is_uuid(&xfer.aToken[1]) + && blob_is_uuid(&xfer.aToken[2]) + ){ + if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){ fossil_fatal("server loop"); } nMsg++; if( zPCode==0 ){ - zPCode = mprintf("%b", &aToken[2]); + zPCode = mprintf("%b", &xfer.aToken[2]); db_set("project-code", zPCode); } cloneFlag = 0; pullFlag = 1; }else @@ -808,41 +739,41 @@ /* error MESSAGE ** ** Report an error */ - if( blob_eq(&aToken[0],"error") && nToken==2 ){ - char *zMsg = blob_terminate(&aToken[1]); + if( blob_eq(&xfer.aToken[0],"error") && xfer.nToken==2 ){ + char *zMsg = blob_terminate(&xfer.aToken[1]); defossilize(zMsg); - blob_appendf(&errmsg, "server says: %s", zMsg); + blob_appendf(&xfer.err, "server says: %s", zMsg); }else /* Unknown message */ { - blob_appendf(&errmsg, "unknown command: %b", &aToken[0]); + blob_appendf(&xfer.err, "unknown command: %b", &xfer.aToken[0]); } - if( blob_size(&errmsg) ){ - fossil_fatal("%b", &errmsg); + if( blob_size(&xfer.err) ){ + fossil_fatal("%b", &xfer.err); } - blobarray_reset(aToken, nToken); + blobarray_reset(xfer.aToken, xfer.nToken); } printf("Received: %3d files, %3d requests, %3d other msgs, %8d bytes\n", - nFile, nReq, nMsg, blob_size(&recv)); + xfer.nFile + xfer.nDelta + xfer.nDanglingFile, + nReq, nMsg, blob_size(&recv)); blob_reset(&recv); - if( nFileSend + nFile==0 ){ + if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){ nNoFileCycle++; if( nNoFileCycle>1 ){ go = 0; } }else{ nNoFileCycle = 0; } - nFile = nReq = nMsg = 0; + nReq = nMsg = 0; + xfer.nFile = 0; + xfer.nDelta = 0; + xfer.nDanglingFile = 0; }; http_close(); db_end_transaction(0); - db_multi_exec( - "DROP TABLE onremote;" - "DROP TABLE pending;" - ); }