@@ -37,11 +37,14 @@
Blob line; /* The current line of input */
Blob aToken[5]; /* Tokenized version of line */
Blob err; /* Error message text */
int nToken; /* Number of tokens in line */
- int nIGot; /* Number of "igot" messages sent */
- int nFile; /* Number of files sent or received */
- int nDelta; /* Number of deltas sent or received */
+ int nIGotSent; /* Number of "igot" messages sent */
+ int nGimmeSent; /* Number of gimme messages sent */
+ int nFileSent; /* Number of files sent */
+ int nDeltaSent; /* Number of deltas sent */
+ int nFileRcvd; /* Number of files received */
+ int nDeltaRcvd; /* Number of deltas received */
int nDanglingFile; /* Number of dangling deltas received */
int mxSend; /* Stop sending "file" with pOut reaches this size */
};
@@ -101,18 +104,19 @@
if( pXfer->nToken==4 ){
Blob src;
int srcid = rid_from_uuid(&pXfer->aToken[2], 1);
if( content_get(srcid, &src)==0 ){
- content_put(&content, blob_str(&hash), srcid);
+ content_put(&content, blob_str(&pXfer->aToken[1]), srcid);
blob_appendf(pXfer->pOut, "gimme %b\n", &pXfer->aToken[2]);
+ pXfer->nGimmeSent++;
pXfer->nDanglingFile++;
return;
}
- pXfer->nDelta++;
+ pXfer->nDeltaRcvd++;
blob_delta_apply(&src, &content, &content);
blob_reset(&src);
}else{
- pXfer->nFile++;
+ pXfer->nFileRcvd++;
}
sha1sum_blob(&content, &hash);
if( !blob_eq_str(&pXfer->aToken[1], blob_str(&hash), -1) ){
blob_appendf(&pXfer->err, "content does not match sha1 hash");
@@ -140,31 +144,15 @@
Blob *pUuid, /* The UUID of the file to send */
int srcId /* Send as a delta against this record */
){
static const char *azQuery[] = {
- "SELECT srcid FROM delta JOIN pending ON pending.rid=delta.srcid"
- " WHERE delta.rid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=srcid)",
-
- "SELECT delta.rid FROM delta JOIN pending ON pending.rid=delta.rid"
- " WHERE srcid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=delta.rid)",
-
- "SELECT pid FROM plink JOIN pending ON rid=pid"
+ "SELECT pid FROM plink"
" WHERE cid=%d"
" AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
- "SELECT cid FROM plink JOIN pending ON rid=cid"
- " WHERE pid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=cid)",
-
- "SELECT pid FROM mlink JOIN pending ON rid=pid"
+ "SELECT pid FROM mlink"
" WHERE fid=%d"
" AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=pid)",
-
- "SELECT fid FROM mlink JOIN pending ON rid=fid"
- " WHERE pid=%d"
- " AND NOT EXISTS(SELECT 1 FROM phantom WHERE rid=fid)",
};
int i;
Blob src, delta;
int size = 0;
@@ -171,9 +159,9 @@
for(i=0; srcId==0 && i<count(azQuery); i++){
srcId = db_int(0, azQuery[i], rid);
}
- if( srcId && content_get(srcId, &src) ){
+ if( srcId>0 && content_get(srcId, &src) ){
char *zUuid = db_text(0, "SELECT uuid FROM blob WHERE rid=%d", srcId);
blob_delta_create(&src, pContent, &delta);
size = blob_size(&delta);
if( size>=blob_size(pContent)-50 ){
@@ -180,9 +168,9 @@
size = 0;
}else{
blob_appendf(pXfer->pOut, "file %b %s %d\n", pUuid, zUuid, size);
blob_append(pXfer->pOut, blob_buffer(&delta), size);
- blob_appendf(pXfer->pOut, "\n", 1);
+ /* blob_appendf(pXfer->pOut, "\n", 1); */
}
blob_reset(&delta);
free(zUuid);
blob_reset(&src);
@@ -191,8 +179,16 @@
}
/*
** Send the file identified by rid.
+**
+** The pUuid can be NULL in which case the correct UUID is computed
+** from the rid.
+**
+** If srcId is positive, then a delta is sent against that srcId.
+** If srcId is zero, then an attempt is made to find an appropriate
+** file to delta against. If srcId is negative, the file is sent
+** without deltaing.
*/
static void send_file(Xfer *pXfer, int rid, Blob *pUuid, int srcId){
Blob content, uuid;
int size = 0;
@@ -209,9 +205,9 @@
pUuid = &uuid;
}
if( pXfer->mxSend<=blob_size(pXfer->pOut) ){
blob_appendf(pXfer->pOut, "igot %b\n", pUuid);
- pXfer->nIGot++;
+ pXfer->nIGotSent++;
blob_reset(&uuid);
return;
}
content_get(rid, &content);
@@ -222,11 +218,11 @@
if( size==0 ){
int size = blob_size(&content);
blob_appendf(pXfer->pOut, "file %b %d\n", pUuid, size);
blob_append(pXfer->pOut, blob_buffer(&content), size);
- pXfer->nFile++;
+ pXfer->nFileSent++;
}else{
- pXfer->nDelta++;
+ pXfer->nDeltaSent++;
}
db_multi_exec("INSERT INTO sent VALUES(%d)", rid);
blob_reset(&uuid);
}
@@ -294,8 +290,9 @@
db_prepare(&q, "SELECT uuid FROM phantom JOIN blob USING(rid)");
while( db_step(&q)==SQLITE_ROW ){
const char *zUuid = db_column_text(&q, 0);
blob_appendf(pXfer->pOut, "gimme %s\n", zUuid);
+ pXfer->nGimmeSent++;
}
db_finalize(&q);
}
@@ -368,9 +365,8 @@
** message has been uncompressed and placed in the g.cgiIn blob.
** Process this message and form an appropriate reply.
*/
void page_xfer(void){
- int nToken;
int isPull = 0;
int isPush = 0;
int nErr = 0;
Xfer xfer;
@@ -380,8 +376,9 @@
cgi_set_content_type(g.zContentType);
blob_zero(&xfer.err);
xfer.pIn = &g.cgiIn;
xfer.pOut = cgi_output_blob();
+ xfer.mxSend = db_get_int("max-download", 1000000);
db_begin_transaction();
db_multi_exec(
"CREATE TEMP TABLE sent(rid INTEGER PRIMARY KEY);"
@@ -394,9 +391,9 @@
**
** Accept a file from the client.
*/
if( blob_eq(&xfer.aToken[0], "file") ){
- if( !isPush ){
+ if( !g.okWrite ){
cgi_reset_content();
@ error not\sauthorized\sto\swrite
nErr++;
break;
@@ -417,9 +414,9 @@
if( blob_eq(&xfer.aToken[0], "gimme")
&& xfer.nToken==2
&& blob_is_uuid(&xfer.aToken[1])
){
- if( isPull ){
+ if( g.okRead ){
int rid = rid_from_uuid(&xfer.aToken[1], 0);
if( rid ){
send_file(&xfer, rid, &xfer.aToken[1], 0);
}
@@ -433,9 +430,9 @@
if( xfer.nToken==2
&& blob_eq(&xfer.aToken[0], "igot")
&& blob_is_uuid(&xfer.aToken[1])
){
- if( isPush ){
+ if( g.okWrite ){
rid_from_uuid(&xfer.aToken[1], 1);
}
}else
@@ -447,9 +444,9 @@
if( xfer.nToken==2
&& blob_eq(&xfer.aToken[0], "leaf")
&& blob_is_uuid(&xfer.aToken[1])
){
- if( isPull ){
+ if( g.okRead ){
int rid = rid_from_uuid(&xfer.aToken[1], 0);
leaf_response(&xfer, rid);
}
}else
@@ -458,9 +455,9 @@
** push SERVERCODE PROJECTCODE
**
** The client wants either send or receive
*/
- if( nToken==3
+ if( xfer.nToken==3
&& (blob_eq(&xfer.aToken[0], "pull") || blob_eq(&xfer.aToken[0], "push"))
&& blob_is_uuid(&xfer.aToken[1])
&& blob_is_uuid(&xfer.aToken[2])
){
@@ -512,8 +509,9 @@
**
** The client knows nothing. Tell all.
*/
if( blob_eq(&xfer.aToken[0], "clone") ){
+ int rootid;
login_check_credentials();
if( !g.okRead || !g.okHistory ){
cgi_reset_content();
@ error not\sauthorized\sto\sclone
@@ -521,17 +519,24 @@
break;
}
isPull = 1;
@ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
- send_leaves(&xfer);
+ rootid = db_int(0,
+ "SELECT pid FROM plink AS a"
+ " WHERE NOT EXISTS(SELECT 1 FROM plink WHERE cid=a.pid)"
+ );
+ if( rootid ){
+ send_file(&xfer, rootid, 0, -1);
+ leaf_response(&xfer, rootid);
+ }
}else
/* login USER NONCE SIGNATURE
**
** Check for a valid login. This has to happen before anything else.
*/
if( blob_eq(&xfer.aToken[0], "login")
- && nToken==4
+ && xfer.nToken==4
){
if( disableLogin ){
g.okRead = g.okWrite = 1;
}else{
@@ -598,19 +603,19 @@
void client_sync(int pushFlag, int pullFlag, int cloneFlag){
int go = 1; /* Loop until zero */
const char *zSCode = db_get("server-code", "x");
const char *zPCode = db_get("project-code", 0);
- int nMsg = 0;
- int nReq = 0;
- int nFileSend = 0;
- int nNoFileCycle = 0;
+ int nMsg = 0; /* Number of messages sent or received */
+ int nCycle = 0; /* Number of round trips to the server */
+ int nFileSend = 0;
Blob send; /* Text we are sending to the server */
Blob recv; /* Reply we got back from the server */
Xfer xfer; /* Transfer data */
memset(&xfer, 0, sizeof(xfer));
xfer.pIn = &recv;
xfer.pOut = &send;
+ xfer.mxSend = db_get_int("max-upload", 250000);
assert( pushFlag || pullFlag || cloneFlag );
assert( !g.urlIsFile ); /* This only works for networking */
@@ -621,13 +626,12 @@
blobarray_zero(xfer.aToken, count(xfer.aToken));
blob_zero(&send);
blob_zero(&recv);
blob_zero(&xfer.err);
-
-
- while( go ){
- go = 0;
- nReq = nMsg = 0;
+ blob_zero(&xfer.line);
+
+
+ while( go ){
/* Generate a request to be sent to the server.
** Always begin with a clone, pull, or push message
*/
@@ -638,30 +642,34 @@
pullFlag = 0;
nMsg++;
}else if( pullFlag ){
blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
- send_leaves(&xfer);
+ nMsg++;
request_phantoms(&xfer);
- nMsg++;
+ send_leaves(&xfer);
}
if( pushFlag ){
blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
nMsg++;
}
/* Exchange messages with the server */
- nFileSend = xfer.nFile + xfer.nDelta + xfer.nDanglingFile;
- printf("Send: %3d files, %3d requests, %3d other msgs, %8d bytes\n",
- nFileSend, nReq, nMsg, blob_size(&send));
- xfer.nFile = 0;
- xfer.nDelta = 0;
- xfer.nDanglingFile = 0;
- nReq = nMsg = 0;
+ nFileSend = xfer.nFileSent + xfer.nDeltaSent;
+ printf("Send: %10d bytes, %3d messages, %3d files (%d+%d)\n",
+ blob_size(&send), nMsg+xfer.nGimmeSent+xfer.nIGotSent,
+ nFileSend, xfer.nFileSent, xfer.nDeltaSent);
+ nMsg = 0;
+ xfer.nFileSent = 0;
+ xfer.nDeltaSent = 0;
+ xfer.nGimmeSent = 0;
http_exchange(&send, &recv);
blob_reset(&send);
/* Process the reply that came back from the server */
while( blob_line(&recv, &xfer.line) ){
+ if( blob_buffer(&xfer.line)[0]=='#' ){
+ continue;
+ }
xfer.nToken = blob_tokenize(&xfer.line, xfer.aToken, count(xfer.aToken));
/* file UUID SIZE \n CONTENT
** file UUID DELTASRC SIZE \n CONTENT
@@ -679,8 +687,9 @@
if( blob_eq(&xfer.aToken[0], "gimme")
&& xfer.nToken==2
&& blob_is_uuid(&xfer.aToken[1])
){
+ nMsg++;
if( pushFlag ){
int rid = rid_from_uuid(&xfer.aToken[1], 0);
send_file(&xfer, rid, &xfer.aToken[1], 0);
}
@@ -693,10 +702,14 @@
if( xfer.nToken==2
&& blob_eq(&xfer.aToken[0], "igot")
&& blob_is_uuid(&xfer.aToken[1])
){
+ nMsg++;
if( pullFlag ){
- rid_from_uuid(&xfer.aToken[1], 1);
+ if( !db_exists("SELECT 1 FROM blob WHERE uuid='%b' AND size>=0",
+ &xfer.aToken[1]) ){
+ content_put(0, blob_str(&xfer.aToken[1]), 0);
+ }
}
}else
@@ -707,8 +720,9 @@
if( xfer.nToken==2
&& blob_eq(&xfer.aToken[0], "leaf")
&& blob_is_uuid(&xfer.aToken[1])
){
+ nMsg++;
if( pushFlag ){
int rid = rid_from_uuid(&xfer.aToken[1], 0);
leaf_response(&xfer, rid);
}
@@ -755,25 +769,38 @@
if( blob_size(&xfer.err) ){
fossil_fatal("%b", &xfer.err);
}
blobarray_reset(xfer.aToken, xfer.nToken);
+ blob_reset(&xfer.line);
}
- printf("Received: %3d files, %3d requests, %3d other msgs, %8d bytes\n",
- xfer.nFile + xfer.nDelta + xfer.nDanglingFile,
- nReq, nMsg, blob_size(&recv));
+ printf("Received: %10d bytes, %3d messages, %3d files (%d+%d+%d)\n",
+ blob_size(&recv), nMsg,
+ xfer.nFileRcvd + xfer.nDeltaRcvd + xfer.nDanglingFile,
+ xfer.nFileRcvd, xfer.nDeltaRcvd, xfer.nDanglingFile);
+
blob_reset(&recv);
- if( nFileSend + xfer.nFile + xfer.nDelta + xfer.nDanglingFile==0 ){
- nNoFileCycle++;
- if( nNoFileCycle>1 ){
- go = 0;
- }
- }else{
- nNoFileCycle = 0;
+ nMsg = 0;
+ xfer.nFileRcvd = 0;
+ xfer.nDeltaRcvd = 0;
+ xfer.nDanglingFile = 0;
+ nCycle++;
+ go = 0;
+
+ /* If we have received one or more files on this cycle and
+ ** we have one or more phantoms, then go for another round
+ */
+ if(xfer.nFileRcvd+xfer.nDeltaRcvd+xfer.nDanglingFile>0
+ && db_exists("SELECT 1 FROM phantom")
+ ){
+ go = 1;
}
- nReq = nMsg = 0;
- xfer.nFile = 0;
- xfer.nDelta = 0;
- xfer.nDanglingFile = 0;
+
+ /* If we have one or more files queued to send, then go
+ ** another round
+ */
+ if( xfer.nFileSent+xfer.nDeltaSent>0 ){
+ go = 1;
+ }
};
http_close();
db_end_transaction(0);
}