@@ -26,107 +26,41 @@
#include "config.h"
#include "xfer.h"
/*
-** Try to locate a record that is similar to rid and is a likely
-** candidate for delta against rid. The similar record must be
-** referenced in the onremote table.
-**
-** Return the integer record ID of the similar record. Or return
-** 0 if none is found.
+** This structure holds information about the current state of either
+** a client or a server that is participating in xfer.
+*/
+typedef struct Xfer Xfer;
+struct Xfer {
+ Blob *pIn; /* Input text from the other side */
+ Blob *pOut; /* Compose our reply here */
+ Blob line; /* The current line of input */
+ Blob aToken[5]; /* Tokenized version of line */
+ Blob err; /* Error message text; empty while no error has been reported */
+ int nToken; /* Number of tokens in line */
+ int nIGot; /* Number of "igot" messages sent */
+ int nFile; /* Number of whole (non-delta) files sent or received */
+ int nDelta; /* Number of deltas sent or received */
+ int nDanglingFile; /* Number of deltas received whose source is not yet available */
+ int mxSend; /* Stop sending "file" when pOut reaches this size; send "igot" instead */
+};
+
+
+/*
+** The input blob contains a UUID. Convert it into a record ID.
+** Create a phantom record if no prior record exists and
+** phantomize is true.
+**
+** Compare to uuid_to_rid(). This routine takes a blob argument
+** and does less error checking.
*/
-static int similar_record(int rid, int traceFlag){
- int inCnt, outCnt;
- int i;
- Stmt q;
- int queue[100];
- static const char *azQuery[] = {
- /* Scan the delta table first */
- "SELECT srcid, EXISTS(SELECT 1 FROM onremote WHERE rid=srcid)"
- " FROM delta"
- " WHERE rid=:x"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)"
- " UNION ALL "
- "SELECT rid, EXISTS(SELECT 1 FROM onremote WHERE rid=delta.rid)"
- " FROM delta"
- " WHERE srcid=:x"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
- /* Then the plink table */
- "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)"
- " FROM plink"
- " WHERE cid=:x"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)"
- " UNION ALL "
- "SELECT cid, EXISTS(SELECT 1 FROM onremote WHERE rid=cid)"
- " FROM plink"
- " WHERE pid=:x"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
- /* Finally the mlink table */
- "SELECT pid, EXISTS(SELECT 1 FROM onremote WHERE rid=pid)"
- " FROM mlink"
- " WHERE fid=:x AND pid>0"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)"
- " UNION ALL "
- "SELECT fid, EXISTS(SELECT 1 FROM onremote WHERE rid=fid)"
- " FROM mlink"
- " WHERE pid=:x AND fid>0"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
- };
-
- for(i=0; i<sizeof(azQuery)/sizeof(azQuery[0]); i++){
- db_prepare(&q, azQuery[i]);
- queue[0] = rid;
- inCnt = 1;
- outCnt = 0;
- if( traceFlag ) printf("PASS %d\n", i+1);
- while( outCnt<inCnt ){
- int xid = queue[outCnt%64];
- outCnt++;
- db_bind_int(&q, ":x", xid);
- if( traceFlag ) printf("xid=%d\n", xid);
- while( db_step(&q)==SQLITE_ROW ){
- int nid = db_column_int(&q, 0);
- int hit = db_column_int(&q, 1);
- if( traceFlag ) printf("nid=%d hit=%d\n", nid, hit);
- if( hit ){
- db_finalize(&q);
- return nid;
- }
- if( inCnt<sizeof(queue)/sizeof(queue[0]) ){
- int i;
- for(i=0; i<inCnt && queue[i]!=nid; i++){}
- if( i>=inCnt ){
- queue[inCnt++] = nid;
- }
- }
- }
- db_reset(&q);
- }
- db_finalize(&q);
- }
- return 0;
-}
-
-/*
-** COMMAND: test-similar-record
-*/
-void test_similar_record(void){
- int i;
- if( g.argc<4 ){
- usage("SRC ONREMOTE...");
- }
- db_must_be_within_tree();
- db_multi_exec(
- "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
- );
- for(i=3; i<g.argc; i++){
- int rid = name_to_rid(g.argv[i]);
- printf("%s -> %d\n", g.argv[i], rid);
- db_multi_exec("INSERT INTO onremote VALUES(%d)", rid);
- }
- printf("similar: %d\n", similar_record(name_to_rid(g.argv[2]), 1));
+static int rid_from_uuid(Blob *pUuid, int phantomize){
+  int found = db_int(0, "SELECT rid FROM blob WHERE uuid='%b'", pUuid);
+  if( found!=0 || !phantomize ){
+    /* Either the record is already known, or the caller does not want
+    ** a phantom created for an unknown UUID. */
+    return found;
+  }
+  /* Unknown UUID and phantomize is set: register it with no content. */
+  return content_put(0, blob_str(pUuid), 0);
}
/*
@@ -145,160 +79,225 @@
**
** If any error occurs, write a message into pErr which has already
** be initialized to an empty string.
*/
-static void xfer_accept_file(Blob *pIn, Blob *aToken, int nToken, Blob *pErr){
+static void xfer_accept_file(Xfer *pXfer){
int n;
int rid;
Blob content, hash;
- if( nToken<3 || nToken>4 || !blob_is_uuid(&aToken[1])
- || !blob_is_int(&aToken[nToken-1], &n) || n<=0
- || (nToken==4 && !blob_is_uuid(&aToken[2])) ){
- blob_appendf(pErr, "malformed file line");
+ if( pXfer->nToken<3
+ || pXfer->nToken>4
+ || !blob_is_uuid(&pXfer->aToken[1])
+ || !blob_is_int(&pXfer->aToken[pXfer->nToken-1], &n)
+ || n<=0
+ || (pXfer->nToken==4 && !blob_is_uuid(&pXfer->aToken[2]))
+ ){
+ blob_appendf(&pXfer->err, "malformed file line");
return;
}
blob_zero(&content);
blob_zero(&hash);
- blob_extract(pIn, n, &content);
- if( nToken==4 ){
+ blob_extract(pXfer->pIn, n, &content);
+ if( pXfer->nToken==4 ){
Blob src;
- int srcid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[2]);
- if( srcid==0 ){
- blob_appendf(pErr, "unknown delta source: %b", &aToken[2]);
+ int srcid = rid_from_uuid(&pXfer->aToken[2], 1);
+ if( content_get(srcid, &src)==0 ){
+      /* "hash" is still empty here (it is only computed after the delta has
+      ** been applied), so store the dangling delta under the UUID that was
+      ** announced on the "file" line. */
+      content_put(&content, blob_str(&pXfer->aToken[1]), srcid);
+ blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]);
+ pXfer->nDanglingFile++;
return;
}
- content_get(srcid, &src);
+ pXfer->nDelta++;
blob_delta_apply(&src, &content, &content);
blob_reset(&src);
+ }else{
+ pXfer->nFile++;
}
sha1sum_blob(&content, &hash);
- if( !blob_eq_str(&aToken[1], blob_str(&hash), -1) ){
- blob_appendf(pErr, "content does not match sha1 hash");
+ if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){
+ blob_appendf(&pXfer->err, "content does not match sha1 hash");
}
blob_reset(&hash);
- rid = content_put(&content, 0);
- manifest_crosslink(rid, &content);
+ rid = content_put(&content, 0, 0);
if( rid==0 ){
- blob_appendf(pErr, "%s", g.zErrMsg);
+ blob_appendf(&pXfer->err, "%s", g.zErrMsg);
+ }else{
+ manifest_crosslink(rid, &content);
}
}
/*
-** Send the file identified by rid.
-**
-** If pOut is not NULL, then append the text of the send message
-** to pOut. Otherwise, append the text to the CGI output.
+** Try to send a file as a delta. If successful, return the number
+** of bytes in the delta. If not, return zero.
+**
+** If srcId is specified, use it. If not, try to figure out a
+** reasonable srcId.
*/
-static int send_file(int rid, Blob *pOut){
- Blob content, uuid;
- int size;
- int srcid;
-
-
- blob_zero(&uuid);
- db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
- if( blob_size(&uuid)==0 ){
- return 0;
- }
- content_get(rid, &content);
-
- if( blob_size(&content)>100 ){
- srcid = similar_record(rid, 0);
- if( srcid ){
- Blob src;
- content_get(srcid, &src);
- if( blob_size(&src)>100 ){
- Blob delta;
- blob_delta_create(&src, &content, &delta);
- blob_reset(&content);
- content = delta;
- blob_append(&uuid, " ", 1);
- blob_append(&content, "\n", 1);
- db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d", srcid);
- }
- blob_reset(&src);
+static int send_as_delta(
+ Xfer *pXfer, /* The transfer context */
+ int rid, /* record id of the file to send */
+ Blob *pContent, /* The content of the file to send */
+ Blob *pUuid, /* The UUID of the file to send */
+ int srcId /* Send as a delta against this record */
+){
+ static const char *azQuery[] = {
+ "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
+ " WHERE delta.rid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
+
+ "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
+ " WHERE srcid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
+
+ "SELECT pid FROM plink JOIN pending ON rid=pid"
+ " WHERE cid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
+
+ "SELECT cid FROM plink JOIN pending ON rid=cid"
+ " WHERE pid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
+
+ "SELECT pid FROM mlink JOIN pending ON rid=pid"
+ " WHERE fid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
+
+ "SELECT fid FROM mlink JOIN pending ON rid=fid"
+ " WHERE pid=%d"
+ " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
+ };
+ int i;
+ Blob src, delta;
+ int size = 0;
+
+ for(i=0; srcId==0 && i<count(azQuery); i++){
+ srcId = db_int(0, azQuery[i], rid);
+ }
+ if( srcId && content_get(srcId, &src) ){
+ char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId);
+ blob_delta_create(&src, pContent, &delta);
+ size = blob_size(&delta);
+ if( size>=blob_size(pContent)-50 ){
+ size = 0;
+ }else{
+      blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size);
+      blob_append(pXfer->pOut, blob_buffer(&delta), size);
+      /* Terminate the delta payload with a newline.  This is a raw append
+      ** of one byte, not a format call, so use blob_append(). */
+      blob_append(pXfer->pOut, "\n", 1);
}
- }
- size = blob_size(&content);
- if( pOut ){
- blob_appendf(pOut, "file %b %d\n", &uuid, size);
- blob_append(pOut, blob_buffer(&content), size);
- }else{
- cgi_printf("file %b %d\n", &uuid, size);
- cgi_append_content(blob_buffer(&content), size);
- }
- blob_reset(&content);
- blob_reset(&uuid);
- db_multi_exec("INSERT OR IGNORE INTO onremote VALUES(%d)", rid);
+ blob_reset(&delta);
+ free(zUuid);
+ blob_reset(&src);
+ }
return size;
}
+/*
+** Send the file identified by rid by appending a "file" message to
+** pXfer->pOut.
+**
+**   pUuid   UUID of the record, or NULL to look it up.  When looked up
+**           here, records with size<0 are skipped.
+**   srcId   If nonzero, the record to try as the delta source.  If zero,
+**           send_as_delta() searches for a source on its own.
+**
+** A record listed in the "sent" table is never sent again.  Once pOut has
+** reached pXfer->mxSend bytes only an "igot" message is emitted.  Content
+** larger than 100 bytes is first offered to send_as_delta(); if no delta
+** is emitted the complete content is sent.  pXfer->nIGot, nFile and nDelta
+** are updated to match what was written.
+*/
+static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){
+  Blob content, uuid;
+  int size = 0;      /* Bytes of delta emitted, or 0 if no delta was sent */
+
+  if( db_exists("SELECT 1 FROM sent WHERE rid=%d", rid) ){
+    return;
+  }
+  blob_zero(&uuid);
+  if( pUuid==0 ){
+    db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
+    if( blob_size(&uuid)==0 ){
+      return;
+    }
+    pUuid = &uuid;
+  }
+  if( pXfer->mxSend<=blob_size(pXfer->pOut) ){
+    blob_appendf(pXfer->pOut, "igot %b\n", pUuid);
+    pXfer->nIGot++;
+    blob_reset(&uuid);
+    return;
+  }
+  /* A caller-supplied pUuid bypasses the size>=0 test above, so rid may
+  ** have no content available.  Do not emit anything in that case. */
+  if( content_get(rid, &content)==0 ){
+    blob_reset(&uuid);
+    return;
+  }
+
+  if( blob_size(&content)>100 ){
+    size = send_as_delta(pXfer, rid, &content, pUuid, srcId);
+  }
+  if( size==0 ){
+    int nByte = blob_size(&content);
+    blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, nByte);
+    blob_append(pXfer->pOut, blob_buffer(&content), nByte);
+    pXfer->nFile++;
+  }else{
+    pXfer->nDelta++;
+  }
+  db_multi_exec("INSERT INTO sent VALUES(%d)", rid);
+  blob_reset(&content);   /* release the content loaded by content_get() */
+  blob_reset(&uuid);
+}
/*
-** Send all pending files.
+** This routine runs when either client or server is notified that
+** the other side thinks rid is a leaf manifest. If we hold
+** children of rid, then send them over to the other side.
*/
-static int send_all_pending(Blob *pOut){
- int rid, xid, i;
- int nIgot = 0;
- int sent = 0;
- int nSent = 0;
- int maxSize = db_get_int("http-msg-size", 500000);
- static const char *azQuery[] = {
- "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
- " WHERE delta.rid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
-
- "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
- " WHERE srcid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
- "SELECT pid FROM plink JOIN pending ON rid=pid"
- " WHERE cid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
- "SELECT cid FROM plink JOIN pending ON rid=cid"
- " WHERE pid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
- "SELECT pid FROM mlink JOIN pending ON rid=pid"
- " WHERE fid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
- "SELECT fid FROM mlink JOIN pending ON rid=fid"
- " WHERE pid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
- };
-
- rid = db_int(0, "SELECT rid FROM pending");
- while( rid && nIgot<200 ){
- db_multi_exec("DELETE FROM pending WHERE rid=%d", rid);
- if( sent<maxSize ){
- sent += send_file(rid, pOut);
- nSent++;
- }else{
- char *zUuid = db_text(0,
- "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
- if( zUuid ){
- if( pOut ){
- blob_appendf(pOut, "igot %s\n", zUuid);
- }else{
- cgi_printf("igot %s\n", zUuid);
- }
- free(zUuid);
- nIgot++;
- }
+static void leaf_response(Xfer *pXfer, int rid){
+ Stmt q1, q2;
+ db_prepare(&q1,
+ "SELECT cid, uuid FROM plink, blob"
+ " WHERE blob.rid=plink.cid"
+ " AND plink.pid=%d",
+ rid
+ );
+ while( db_step(&q1)==SQLITE_ROW ){
+ Blob uuid;
+ int cid;
+
+ cid = db_column_int(&q1, 0);
+ db_ephemeral_blob(&q1, 1, &uuid);
+ send_file(pXfer, cid, &uuid, rid);
+ db_prepare(&q2,
+ "SELECT pid, uuid, fid FROM mlink, blob"
+ " WHERE rid=fid AND mid=%d",
+ cid
+ );
+ while( db_step(&q2)==SQLITE_ROW ){
+ int pid, fid;
+ pid = db_column_int(&q2, 0);
+ db_ephemeral_blob(&q2, 1, &uuid);
+ fid = db_column_int(&q2, 2);
+ send_file(pXfer, fid, &uuid, pid);
}
- xid = 0;
- for(i=0; xid==0 && i<sizeof(azQuery)/sizeof(azQuery[0]); i++){
- xid = db_int(0, azQuery[i], rid);
- }
- rid = xid;
- if( rid==0 ){
- rid = db_int(0, "SELECT rid FROM pending");
+ db_finalize(&q2);
+ if( blob_size(pXfer->pOut)<pXfer->mxSend ){
+ leaf_response(pXfer, cid);
}
}
- return nSent;
+  /* q1 is prepared afresh on every (possibly recursive) call, so it must
+  ** be released before returning. */
+  db_finalize(&q1);
+}
+
+/*
+** Send a "leaf" message for every leaf: append one "leaf UUID" line to
+** pXfer->pOut for each blob whose rid occurs in plink as a child (cid)
+** but never as a parent (pid).
+*/
+static void send_leaves(Xfer *pXfer){
+  Stmt leafQuery;
+  db_prepare(&leafQuery,
+    "SELECT uuid FROM blob WHERE rid IN"
+    " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
+  );
+  for(;;){
+    if( db_step(&leafQuery)!=SQLITE_ROW ) break;
+    blob_appendf(pXfer->pOut, "leaf %s\n", db_column_text(&leafQuery, 0));
+  }
+  db_finalize(&leafQuery);
+}
+
+/*
+** Send a "gimme" message for every phantom: append one "gimme UUID" line
+** to pXfer->pOut for each row of the phantom table, taking the UUID from
+** the matching blob row.
+*/
+static void request_phantoms(Xfer *pXfer){
+  Stmt phantomQuery;
+  db_prepare(&phantomQuery, "SELECT uuid FROM phantom JOIN blob USING(rid)");
+  for(;;){
+    if( db_step(&phantomQuery)!=SQLITE_ROW ) break;
+    blob_appendf(pXfer->pOut, "gimme %s\n", db_column_text(&phantomQuery, 0));
+  }
+  db_finalize(&phantomQuery);
}
/*
@@ -373,37 +372,40 @@
int nToken;
int isPull = 0;
int isPush = 0;
int nErr = 0;
- Blob line, errmsg, aToken[5];
+ Xfer xfer;
+
+ memset(&xfer, 0, sizeof(xfer));
+ blobarray_zero(xfer.aToken, count(xfer.aToken));
+ cgi_set_content_type(g.zContentType);
+ blob_zero(&xfer.err);
+ xfer.pIn = &g.cgiIn;
+  xfer.pOut = cgi_output_blob();
db_begin_transaction();
+  /* memset() left mxSend at 0.  With a zero limit send_file() always takes
+  ** its "pOut is full" branch and answers with "igot" instead of content,
+  ** so give the reply a real size budget from the http-msg-size setting. */
+  xfer.mxSend = db_get_int("http-msg-size", 500000);
- blobarray_zero(aToken, count(aToken));
- cgi_set_content_type(g.zContentType);
- blob_zero(&errmsg);
db_multi_exec(
- "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" /* Client has */
- "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);" /* Client needs */
+ "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
);
- while( blob_line(&g.cgiIn, &line) ){
- nToken = blob_tokenize(&line, aToken, count(aToken));
+  while( blob_line(xfer.pIn, &xfer.line) ){
+    /* The "pull"/"push" and "login" tests further down still read the local
+    ** nToken, so keep it in step with xfer.nToken rather than leaving it
+    ** uninitialized. */
+    nToken = xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
/* file UUID SIZE \n CONTENT
** file UUID DELTASRC SIZE \n CONTENT
**
** Accept a file from the client.
*/
- if( blob_eq(&aToken[0], "file") && nToken>=3 && nToken<=4 ){
+ if( blob_eq(&xfer.aToken[0], "file") ){
if( !isPush ){
cgi_reset_content();
@ error not\sauthorized\sto\swrite
nErr++;
break;
}
- xfer_accept_file(&g.cgiIn, aToken, nToken, &errmsg);
- if( blob_size(&errmsg) ){
+ xfer_accept_file(&xfer);
+ if( blob_size(&xfer.err) ){
cgi_reset_content();
- @ error %T(blob_str(&errmsg))
+ @ error %T(blob_str(&xfer.err))
nErr++;
break;
}
}else
@@ -411,40 +413,45 @@
/* gimme UUID
**
** Client is requesting a file
*/
- if( blob_eq(&aToken[0], "gimme") && nToken==2 && blob_is_uuid(&aToken[1]) ){
+ if( blob_eq(&xfer.aToken[0], "gimme")
+ && xfer.nToken==2
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
if( isPull ){
- db_multi_exec(
- "INSERT OR IGNORE INTO pending(rid) "
- "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
- );
+ int rid = rid_from_uuid(&xfer.aToken[1], 0);
+ if( rid ){
+ send_file(&xfer, rid, &xfer.aToken[1], 0);
+ }
}
}else
/* igot UUID
- ** leaf UUID
**
** Client announces that it has a particular file
*/
- if( nToken==2
- && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0],"leaf"))
- && blob_is_uuid(&aToken[1]) ){
- if( isPull || isPush ){
- int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
- if( rid>0 ){
- db_multi_exec(
- "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
- );
- if( isPull && blob_eq(&aToken[0], "leaf") ){
- db_multi_exec(
- "INSERT OR IGNORE INTO pending(rid) "
- "SELECT cid FROM plink WHERE pid=%d", rid
- );
- }
- }else if( isPush ){
- content_put(0, blob_str(&aToken[1]));
- }
+ if( xfer.nToken==2
+ && blob_eq(&xfer.aToken[0], "igot")
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
+ if( isPush ){
+ rid_from_uuid(&xfer.aToken[1], 1);
+ }
+ }else
+
+
+ /* leaf UUID
+ **
+ ** Client announces that it has a particular manifest
+ */
+ if( xfer.nToken==2
+ && blob_eq(&xfer.aToken[0], "leaf")
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
+ if( isPull ){
+ int rid = rid_from_uuid(&xfer.aToken[1], 0);
+ leaf_response(&xfer, rid);
}
}else
/* pull SERVERCODE PROJECTCODE
@@ -452,18 +459,20 @@
**
** The client wants either send or receive
*/
if( nToken==3
- && (blob_eq(&aToken[0], "pull") || blob_eq(&aToken[0], "push"))
- && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){
+ && (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push"))
+ && blob_is_uuid(&xfer.aToken[1])
+ && blob_is_uuid(&xfer.aToken[2])
+ ){
const char *zSCode;
const char *zPCode;
zSCode = db_get("server-code", 0);
if( zSCode==0 ){
fossil_panic("missing server code");
}
- if( blob_eq_str(&aToken[1], zSCode, -1) ){
+ if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){
cgi_reset_content();
@ error server\sloop
nErr++;
break;
@@ -471,16 +480,16 @@
zPCode = db_get("project-code", 0);
if( zPCode==0 ){
fossil_panic("missing project code");
}
- if( !blob_eq_str(&aToken[2], zPCode, -1) ){
+ if( !blob_eq_str(&xfer.aToken[2], zPCode, -1) ){
cgi_reset_content();
@ error wrong\sproject
nErr++;
break;
}
login_check_credentials();
- if( blob_eq(&aToken[0], "pull") ){
+ if( blob_eq(&xfer.aToken[0], "pull") ){
if( !g.okRead ){
cgi_reset_content();
@ error not\sauthorized\sto\sread
nErr++;
@@ -493,18 +502,18 @@
@ error not\sauthorized\sto\swrite
nErr++;
break;
}
- isPush = 1;
-
+ send_leaves(&xfer);
+ isPush = 1;
}
}else
/* clone
**
** The client knows nothing. Tell all.
*/
- if( blob_eq(&aToken[0], "clone") ){
+ if( blob_eq(&xfer.aToken[0], "clone") ){
login_check_credentials();
if( !g.okRead || !g.okHistory ){
cgi_reset_content();
@ error not\sauthorized\sto\sclone
@@ -512,70 +521,36 @@
break;
}
isPull = 1;
@ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
- db_multi_exec(
- "INSERT OR IGNORE INTO pending(rid) "
- "SELECT mid FROM mlink JOIN blob ON mid=rid"
- );
+ send_leaves(&xfer);
}else
/* login USER NONCE SIGNATURE
**
** Check for a valid login. This has to happen before anything else.
*/
- if( blob_eq(&aToken[0], "login") && nToken==4 ){
+ if( blob_eq(&xfer.aToken[0], "login")
+ && nToken==4
+ ){
if( disableLogin ){
g.okRead = g.okWrite = 1;
}else{
- check_login(&aToken[1], &aToken[2], &aToken[3]);
+ check_login(&xfer.aToken[1], &xfer.aToken[2], &xfer.aToken[3]);
}
}else
/* Unknown message
*/
{
cgi_reset_content();
- @ error bad\scommand:\s%F(blob_str(&line))
+ @ error bad\scommand:\s%F(blob_str(&xfer.line))
}
- blobarray_reset(aToken, nToken);
- }
-
- /* The input message has now been processed. Generate a reply. */
+ blobarray_reset(xfer.aToken, xfer.nToken);
+ }
if( isPush ){
- Stmt q;
- int nReq = 0;
- db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)");
- while( db_step(&q)==SQLITE_ROW && nReq++ < 200 ){
- const char *zUuid = db_column_text(&q, 0);
- int rid = db_column_int(&q, 1);
- int xid = similar_record(rid, 0);
- @ gimme %s(zUuid)
- if( xid ){
- char *zXUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid);
- @ igot %s(zXUuid);
- free(zXUuid);
- }
- }
- db_finalize(&q);
- }
- if( isPull ){
- send_all_pending(0);
- }
- if( isPush || isPull ){
- /* Always send our leaves */
- Stmt q;
- db_prepare(&q,
- "SELECT uuid FROM blob WHERE rid IN"
- " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
- );
- while( db_step(&q)==SQLITE_ROW ){
- const char *zUuid = db_column_text(&q, 0);
- @ leaf %s(zUuid)
- }
- db_finalize(&q);
- }
-
+ request_phantoms(&xfer);
+ }
db_end_transaction(0);
}
/*
@@ -621,41 +596,38 @@
** true.
*/
void client_sync(int pushFlag, int pullFlag, int cloneFlag){
int go = 1; /* Loop until zero */
- int nToken;
const char *zSCode = db_get("server-code", "x");
const char *zPCode = db_get("project-code", 0);
- int nFile = 0;
int nMsg = 0;
int nReq = 0;
- int nFileSend;
+ int nFileSend = 0;
int nNoFileCycle = 0;
Blob send; /* Text we are sending to the server */
Blob recv; /* Reply we got back from the server */
- Blob line; /* A single line of the reply */
- Blob aToken[5]; /* A tokenization of line */
- Blob errmsg; /* Error message */
+ Xfer xfer; /* Transfer data */
+
+  memset(&xfer, 0, sizeof(xfer));
+  xfer.pIn = &recv;
+  xfer.pOut = &send;
+  /* memset() left mxSend at 0.  With a zero limit send_file() always takes
+  ** its "pOut is full" branch and emits "igot" instead of content, so give
+  ** each request a real size budget from the http-msg-size setting. */
+  xfer.mxSend = db_get_int("http-msg-size", 500000);
assert( pushFlag || pullFlag || cloneFlag );
assert( !g.urlIsFile ); /* This only works for networking */
db_begin_transaction();
db_multi_exec(
- /* Records which we know the other side also has */
- "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
- /* Records we know the other side needs */
- "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);"
+ "CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
);
- blobarray_zero(aToken, count(aToken));
+ blobarray_zero(xfer.aToken, count(xfer.aToken));
blob_zero(&send);
blob_zero(&recv);
- blob_zero(&errmsg);
+ blob_zero(&xfer.err);
while( go ){
go = 0;
- nFile = nReq = nMsg = 0;
+ nReq = nMsg = 0;
/* Generate a request to be sent to the server.
** Always begin with a clone, pull, or push message
*/
@@ -666,141 +638,100 @@
pullFlag = 0;
nMsg++;
}else if( pullFlag ){
blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
+ send_leaves(&xfer);
+ request_phantoms(&xfer);
nMsg++;
}
if( pushFlag ){
blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
nMsg++;
}
- if( pullFlag ){
- /* Send gimme message for every phantom that we hold.
- */
- Stmt q;
- db_prepare(&q, "SELECT uuid, rid FROM phantom JOIN blob USING (rid)");
- while( db_step(&q)==SQLITE_ROW && nReq<200 ){
- const char *zUuid = db_column_text(&q, 0);
- int rid = db_column_int(&q, 1);
- int xid = similar_record(rid, 0);
- blob_appendf(&send,"gimme %s\n", zUuid);
- nReq++;
- if( xid ){
- blob_appendf(&send, "igot %z\n",
- db_text(0, "SELECT uuid FROM blob WHERE rid=%d", xid));
- }
- }
- db_finalize(&q);
- }
-
- if( pushFlag ){
- /* Send the server any files that the server has requested */
- nFile += send_all_pending(&send);
- }
-
- if( pullFlag || pushFlag ){
- /* Always send our leaves */
- Stmt q;
- db_prepare(&q,
- "SELECT uuid FROM blob WHERE rid IN"
- " (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
- );
- while( db_step(&q)==SQLITE_ROW ){
- const char *zUuid = db_column_text(&q, 0);
- blob_appendf(&send, "leaf %s\n", zUuid);
- nMsg++;
- }
- db_finalize(&q);
- }
-
/* Exchange messages with the server */
+ nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile;
printf("Send: %3d files, %3d requests, %3d other msgs, %8d bytes\n",
- nFile, nReq, nMsg, blob_size(&send));
- nFileSend = nFile;
- nFile = nReq = nMsg = 0;
+ nFileSend, nReq, nMsg, blob_size(&send));
+ xfer.nFile = 0;
+ xfer.nDelta = 0;
+ xfer.nDanglingFile = 0;
+ nReq = nMsg = 0;
http_exchange(&send, &recv);
blob_reset(&send);
/* Process the reply that came back from the server */
- while( blob_line(&recv, &line) ){
- nToken = blob_tokenize(&line, aToken, count(aToken));
+ while( blob_line(&recv, &xfer.line) ){
+ xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
/* file UUID SIZE \n CONTENT
** file UUID DELTASRC SIZE \n CONTENT
**
** Receive a file transmitted from the other side
*/
- if( blob_eq(&aToken[0],"file") ){
- xfer_accept_file(&recv, aToken, nToken, &errmsg);
- nFile++;
- go = 1;
+ if( blob_eq(&xfer.aToken[0],"file") ){
+ xfer_accept_file(&xfer);
}else
/* gimme UUID
**
** Server is requesting a file
*/
- if( blob_eq(&aToken[0], "gimme") && nToken==2
- && blob_is_uuid(&aToken[1]) ){
- nReq++;
+ if( blob_eq(&xfer.aToken[0], "gimme")
+ && xfer.nToken==2
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
if( pushFlag ){
- db_multi_exec(
- "INSERT OR IGNORE INTO pending(rid) "
- "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
- );
- go = 1;
+          int rid = rid_from_uuid(&xfer.aToken[1], 0);
+          /* rid is 0 when we do not hold the requested UUID; skip it, as the
+          ** server-side "gimme" handler does, instead of sending record 0. */
+          if( rid ){
+            send_file(&xfer, rid, &xfer.aToken[1], 0);
+          }
}
}else
/* igot UUID
- ** leaf UUID
+ **
+ ** Server announces that it has a particular file
+ */
+ if( xfer.nToken==2
+ && blob_eq(&xfer.aToken[0], "igot")
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
+ if( pullFlag ){
+ rid_from_uuid(&xfer.aToken[1], 1);
+ }
+ }else
+
+
+ /* leaf UUID
**
- ** Server proclaims that it has a particular file. A leaf message
- ** means that the file is a leaf manifest on the server.
+ ** Server announces that it has a particular manifest
*/
- if( nToken==2
- && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0], "leaf"))
- && blob_is_uuid(&aToken[1]) ){
- int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
- nMsg++;
- if( rid>0 ){
- db_multi_exec(
- "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
- );
- /* Add to the pending set all children of the server's leaves */
- if( pushFlag && blob_eq(&aToken[0], "leaf") ){
- db_multi_exec(
- "INSERT OR IGNORE INTO pending(rid) "
- "SELECT cid FROM plink WHERE pid=%d", rid
- );
- if( db_changes()>0 ){
- go = 1;
- }
- }
- if( pullFlag && !go &&
- db_exists("SELECT 1 FROM phantom WHERE rid=%d", rid) ){
- go = 1;
- }
- }else if( pullFlag ){
- go = 1;
- content_put(0, blob_str(&aToken[1]));
+ if( xfer.nToken==2
+ && blob_eq(&xfer.aToken[0], "leaf")
+ && blob_is_uuid(&xfer.aToken[1])
+ ){
+ if( pushFlag ){
+ int rid = rid_from_uuid(&xfer.aToken[1], 0);
+ leaf_response(&xfer, rid);
}
}else
+
/* push SERVERCODE PRODUCTCODE
**
** Should only happen in response to a clone.
*/
- if( blob_eq(&aToken[0],"push") && nToken==3 && cloneFlag
- && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){
-
- if( blob_eq_str(&aToken[1], zSCode, -1) ){
+ if( blob_eq(&xfer.aToken[0],"push")
+ && xfer.nToken==3
+ && cloneFlag
+ && blob_is_uuid(&xfer.aToken[1])
+ && blob_is_uuid(&xfer.aToken[2])
+ ){
+ if( blob_eq_str(&xfer.aToken[1], zSCode, -1) ){
fossil_fatal("server loop");
}
nMsg++;
if( zPCode==0 ){
- zPCode = mprintf("%b", &aToken[2]);
+ zPCode = mprintf("%b", &xfer.aToken[2]);
db_set("project-code", zPCode);
}
cloneFlag = 0;
pullFlag = 1;
@@ -809,40 +740,40 @@
/* error MESSAGE
**
** Report an error
*/
- if( blob_eq(&aToken[0],"error") && nToken==2 ){
- char *zMsg = blob_terminate(&aToken[1]);
+ if( blob_eq(&xfer.aToken[0],"error") && xfer.nToken==2 ){
+ char *zMsg = blob_terminate(&xfer.aToken[1]);
defossilize(zMsg);
- blob_appendf(&errmsg, "server says: %s", zMsg);
+ blob_appendf(&xfer.err, "server says: %s", zMsg);
}else
/* Unknown message */
{
- blob_appendf(&errmsg, "unknown command: %b", &aToken[0]);
+ blob_appendf(&xfer.err, "unknown command: %b", &xfer.aToken[0]);
}
- if( blob_size(&errmsg) ){
- fossil_fatal("%b", &errmsg);
+ if( blob_size(&xfer.err) ){
+ fossil_fatal("%b", &xfer.err);
}
- blobarray_reset(aToken, nToken);
+ blobarray_reset(xfer.aToken, xfer.nToken);
}
printf("Received: %3d files, %3d requests, %3d other msgs, %8d bytes\n",
- nFile, nReq, nMsg, blob_size(&recv));
+ xfer.nFile + xfer.nDelta + xfer.nDanglingFile,
+ nReq, nMsg, blob_size(&recv));
blob_reset(&recv);
- if( nFileSend + nFile==0 ){
+ if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){
nNoFileCycle++;
if( nNoFileCycle>1 ){
go = 0;
}
}else{
nNoFileCycle = 0;
}
- nFile = nReq = nMsg = 0;
+ nReq = nMsg = 0;
+ xfer.nFile = 0;
+ xfer.nDelta = 0;
+ xfer.nDanglingFile = 0;
};
http_close();
db_end_transaction(0);
- db_multi_exec(
- "DROP TABLE onremote;"
- "DROP TABLE pending;"
- );
}