Artifact Content
Not logged in

Artifact 04b08e89d93300592abad44257337daf2901c9fc

File src/xfer.c part of check-in [4c072728b4] - Add a quick hack that disables delta compression on xfer. This gives people a chance to synchronize the previous fix. We will eventually remove the hack. by drh on 2007-08-09 03:47:28.

/*
** Copyright (c) 2007 D. Richard Hipp
**
** This program is free software; you can redistribute it and/or
** modify it under the terms of the GNU General Public
** License version 2 as published by the Free Software Foundation.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
** General Public License for more details.
** 
** You should have received a copy of the GNU General Public
** License along with this library; if not, write to the
** Free Software Foundation, Inc., 59 Temple Place - Suite 330,
** Boston, MA  02111-1307, USA.
**
** Author contact information:
**   drh@hwaci.com
**   http://www.hwaci.com/drh/
**
*******************************************************************************
**
** This file contains code to implement the file transfer protocol.
*/
#include "config.h"
#include "xfer.h"

/*
** Try to locate a record that is similar to rid and is a likely
** candidate for delta against rid.  The similar record must be
** referenced in the onremote table.
**
** The search is a breadth-first walk of the delta table starting at
** rid.  Links are followed in both directions: from a record to its
** delta source (srcid) and from a record to the records that use it
** as their delta source.  At most sizeof(queue)/sizeof(queue[0])
** distinct records are queued for examination.
**
** If traceFlag is true, every record dequeued and every neighbor
** examined is printed on standard output.
**
** Return the integer record ID of the similar record.  Or return
** 0 if none is found.
**
** NOTE: delta compression on xfer is temporarily disabled by the
** unconditional "return 0" below, so the search code is currently
** unreachable.  Remove that early return to re-enable it.
*/
static int similar_record(int rid, int traceFlag){
  int inCnt, outCnt;
  Stmt q;
  int queue[100];

  /* Quick hack: disable delta compression on xfer.  To be removed. */
  return 0;

  db_prepare(&q,
      "SELECT srcid, EXISTS(SELECT 1 FROM onremote WHERE rid=srcid)"
      "  FROM delta"
      " WHERE rid=:x"
      " UNION ALL "
      "SELECT rid, EXISTS(SELECT 1 FROM onremote WHERE rid=delta.rid)"
      "  FROM delta"
      " WHERE srcid=:x"
  );
  queue[0] = rid;
  inCnt = 1;
  outCnt = 0;
  while( outCnt<inCnt ){
    /* inCnt never exceeds the queue capacity (see the guard below), so
    ** outCnt is always a valid index.  The queue is linear, not circular;
    ** indexing it modulo a smaller number would skip the later entries. */
    int xid = queue[outCnt];
    outCnt++;
    db_bind_int(&q, ":x", xid);
    if( traceFlag ) printf("xid=%d\n", xid);
    while( db_step(&q)==SQLITE_ROW ){
      int nid = db_column_int(&q, 0);
      int hit = db_column_int(&q, 1);
      if( traceFlag ) printf("nid=%d hit=%d\n", nid, hit);
      if( hit  ){
        /* nid is on the remote side: use it as the delta source */
        db_finalize(&q);
        return nid;
      }
      if( inCnt<(int)(sizeof(queue)/sizeof(queue[0])) ){
        /* Enqueue nid unless it has already been queued */
        int i;
        for(i=0; i<inCnt && queue[i]!=nid; i++){}
        if( i>=inCnt ){
          queue[inCnt++] = nid;
        }
      }
    }
    db_reset(&q);
  }
  db_finalize(&q);
  return 0;
}

/*
** COMMAND: test-similar-record
**
** Arguments: SRC ONREMOTE...
**
** Create a temporary onremote table holding the record IDs of every
** ONREMOTE name, printing each name-to-rid mapping along the way, then
** print the result of similar_record() for SRC with tracing enabled.
*/
void test_similar_record(void){
  int iArg = 3;     /* Index of the next ONREMOTE argument */
  int srcRid;       /* Record ID of the SRC argument */

  if( g.argc<4 ){
    usage("SRC ONREMOTE...");
  }
  db_must_be_within_tree();
  db_multi_exec(
    "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
  );
  while( iArg<g.argc ){
    int rid = name_to_rid(g.argv[iArg]);
    printf("%s -> %d\n", g.argv[iArg], rid);
    db_multi_exec("INSERT INTO onremote VALUES(%d)", rid);
    iArg++;
  }
  srcRid = name_to_rid(g.argv[2]);
  printf("similar: %d\n", similar_record(srcRid, 1));
}


/*
** The aToken[0..nToken-1] blob array is a parse of a "file" line
** message.  This routine finishes parsing that message and does
** a record insert of the file.
**
** The file line is in one of the following two forms:
**
**      file UUID SIZE \n CONTENT
**      file UUID DELTASRC SIZE \n CONTENT
**
** The content is SIZE bytes immediately following the newline.
** If DELTASRC exists, then the CONTENT is a delta against the
** content of DELTASRC.
**
** Parameters:
**   pIn     Input stream positioned just after the "file" line.  SIZE
**           bytes are extracted from it once the line has been validated.
**   aToken  Tokens of the "file" line; aToken[0] is the word "file".
**   nToken  Number of valid entries in aToken.  Must be 3 or 4.
**   pErr    Receives an error message on failure.
**
** If any error occurs, write a message into pErr which has already
** been initialized to an empty string.  Nothing is stored in the
** repository when the line is malformed, when the delta source is
** unknown, or when the content does not match the UUID on the line.
*/
static void xfer_accept_file(Blob *pIn, Blob *aToken, int nToken, Blob *pErr){
  int n;
  int rid;
  Blob content, hash;

  if( nToken<3 || nToken>4 || !blob_is_uuid(&aToken[1])
       || !blob_is_int(&aToken[nToken-1], &n) || n<=0
       || (nToken==4 && !blob_is_uuid(&aToken[2])) ){
    blob_appendf(pErr, "malformed file line");
    return;
  }
  blob_zero(&content);
  blob_zero(&hash);
  blob_extract(pIn, n, &content);
  if( nToken==4 ){
    /* CONTENT is a delta.  Expand it against the DELTASRC record. */
    Blob src;
    int srcid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[2]);
    if( srcid==0 ){
      blob_appendf(pErr, "unknown delta source: %b", &aToken[2]);
      blob_reset(&content);   /* Do not leak the extracted payload */
      return;
    }
    content_get(srcid, &src);
    blob_delta_apply(&src, &content, &content);
    blob_reset(&src);
  }

  /* Verify that the (expanded) content really has the advertised UUID.
  ** Content that fails the check must not be stored. */
  sha1sum_blob(&content, &hash);
  if( !blob_eq_str(&aToken[1], blob_str(&hash), -1) ){
    blob_appendf(pErr, "content does not match sha1 hash");
    blob_reset(&hash);
    blob_reset(&content);
    return;
  }
  blob_reset(&hash);

  rid = content_put(&content, 0);
  if( rid==0 ){
    /* The insert failed, so there is no record to crosslink. */
    blob_appendf(pErr, "%s", g.zErrMsg);
    return;
  }
  manifest_crosslink(rid, &content);
}

/*
** Send the file identified by rid.
**
** If pOut is not NULL, then append the text of the send message
** to pOut.  Otherwise, append the text to the CGI output.
**
** The message is "file UUID SIZE\n" followed by the full content, or
** "file UUID DELTASRC SIZE\n" followed by a delta when similar_record()
** finds a suitable source.  After sending, rid is recorded in the
** onremote table.
**
** Return SIZE, the number of payload bytes written.  Return 0 without
** sending anything if no blob row with size>=0 exists for rid.
*/
static int send_file(int rid, Blob *pOut){
  Blob uuid;        /* UUID of the record being sent */
  Blob content;     /* Full content of the record */
  int nByte;        /* Size of the payload that was written */
  int deltaSrc;     /* Record to delta against, or 0 for none */

  blob_zero(&uuid);
  db_blob(&uuid, "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
  if( blob_size(&uuid)==0 ){
    return 0;
  }
  content_get(rid, &content);

  deltaSrc = similar_record(rid, 0);
  if( deltaSrc==0 ){
    /* No delta source available: send the complete content */
    nByte = blob_size(&content);
    if( pOut==0 ){
      cgi_printf("file %b %d\n", &uuid, nByte);
      cgi_append_content(blob_buffer(&content), nByte);
    }else{
      blob_appendf(pOut, "file %b %d\n", &uuid, nByte);
      blob_append(pOut, blob_buffer(&content), nByte);
    }
    blob_reset(&content);
    blob_reset(&uuid);
  }else{
    /* Send a delta against deltaSrc in place of the complete content */
    Blob base;
    Blob diff;
    Blob baseUuid;

    content_get(deltaSrc, &base);
    blob_delta_create(&base, &content, &diff);
    blob_reset(&base);
    blob_reset(&content);
    blob_zero(&baseUuid);
    db_blob(&baseUuid, "SELECT uuid FROM blob WHERE rid=%d", deltaSrc);
    nByte = blob_size(&diff);
    if( pOut==0 ){
      cgi_printf("file %b %b %d\n", &uuid, &baseUuid, nByte);
      cgi_append_content(blob_buffer(&diff), nByte);
    }else{
      blob_appendf(pOut, "file %b %b %d\n", &uuid, &baseUuid, nByte);
      blob_append(pOut, blob_buffer(&diff), nByte);
    }
    blob_reset(&diff);
    blob_reset(&baseUuid);
    blob_reset(&uuid);
  }

  /* The other side has this record now */
  db_multi_exec("INSERT OR IGNORE INTO onremote VALUES(%d)", rid);
  return nByte;
}


/*
** Send all pending files.
**
** Records in the pending table are visited in increasing rid order and
** passed to send_file() until the running byte total reaches the
** "http-msg-size" setting (default 500000).  For each remaining record
** only an "igot UUID" line is emitted.  Output goes to pOut if it is
** not NULL and to the CGI output otherwise.
**
** Return the number of records that were handed to send_file().
*/
static int send_all_pending(Blob *pOut){
  Stmt q;
  int nFile = 0;        /* Number of send_file() calls made */
  int nByte = 0;        /* Sum of the values returned by send_file() */
  int lastRid = 0;      /* Most recent rid passed to send_file() */
  const int mxByte = db_get_int("http-msg-size", 500000);

  db_prepare(&q, "SELECT rid FROM pending ORDER BY rid");
  while( db_step(&q)==SQLITE_ROW ){
    char *zUuid;
    int rid = db_column_int(&q, 0);

    if( nByte<mxByte ){
      /* Still under the size budget: send the whole file */
      nByte += send_file(rid, pOut);
      nFile++;
      lastRid = rid;
      continue;
    }

    /* Over budget: just announce that we hold this record */
    zUuid = db_text(0,
               "SELECT uuid FROM blob WHERE rid=%d AND size>=0", rid);
    if( zUuid==0 ){
      continue;
    }
    if( pOut ){
      blob_appendf(pOut, "igot %s\n", zUuid);
    }else{
      cgi_printf("igot %s\n", zUuid);
    }
    free(zUuid);
  }
  db_finalize(&q);

  /* Remove the 'pending' entries for everything sent above so that the
  ** same files are not transmitted again on a later call.
  */
  if( nFile>0 ){
    db_multi_exec("DELETE FROM pending WHERE rid <= %d", lastRid);
  }

  return nFile;
}


/*
** Check the signature on an application/x-fossil payload received by
** the HTTP server.  The signature is a line of the following form:
**
**        login LOGIN NONCE SIGNATURE
**
** The NONCE is a random string.  The server will never accept a
** repeat NONCE.  SIGNATURE is the SHA1 checksum of the NONCE
** concatenated with the user's password.
**
** The parameters to this routine are ephemeral blobs holding the
** LOGIN, NONCE and SIGNATURE.
**
** This routine attempts to locate the user and verify the signature.
** If everything checks out, the USER.CAP column for the USER table
** is consulted to set privileges in the global g variable, and
** g.userUid, g.zLogin and g.zNonce are filled in.
**
** If anything fails to check out, no changes are made to privileges.
**
** Signature generation on the client side is handled by the
** http_exchange() routine.
*/
void check_login(Blob *pLogin, Blob *pNonce, Blob *pSig){
  Stmt q;
  int rc;

  if( db_exists("SELECT 1 FROM rcvfrom WHERE nonce=%B", pNonce) ){
    return;  /* Never accept a repeated nonce */
  }
  db_prepare(&q, "SELECT pw, cap, uid FROM user WHERE login=%B", pLogin);
  if( db_step(&q)==SQLITE_ROW ){
    Blob pw, combined, hash;
    blob_zero(&pw);
    db_ephemeral_blob(&q, 0, &pw);

    /* Recompute the expected signature: SHA1(NONCE + password) */
    blob_zero(&combined);
    blob_copy(&combined, pNonce);
    blob_append(&combined, blob_buffer(&pw), blob_size(&pw));
    /* CGIDEBUG(("presig=[%s]\n", blob_str(&combined))); */
    sha1sum_blob(&combined, &hash);
    rc = blob_compare(&hash, pSig);
    blob_reset(&hash);
    blob_reset(&combined);
    if( rc==0 ){
      /* Signature matches: adopt this user's capabilities */
      const char *zCap;
      zCap = db_column_text(&q, 1);
      login_set_capabilities(zCap);
      g.userUid = db_column_int(&q, 2);
      g.zLogin = mprintf("%b", pLogin);
      g.zNonce = mprintf("%b", pNonce);
    }
  }
  /* The statement is not reused, so release it rather than merely
  ** resetting it; otherwise the prepared statement is leaked. */
  db_finalize(&q);
}


/*
** If this variable is set, disable login checks.  Used for debugging
** only.
**
** When non-zero, page_xfer() responds to a "login" line by setting
** g.okRead and g.okWrite directly instead of calling check_login().
** It is set to 1 by cmd_test_xfer().
*/
static int disableLogin = 0;

/*
** WEBPAGE: xfer
**
** This is the transfer handler on the server side.  The transfer
** message has been uncompressed and placed in the g.cgiIn blob.
** Process this message and form an appropriate reply.
**
** The message is processed one line at a time.  The recognized lines
** are:
**
**      file UUID SIZE \n CONTENT
**      file UUID DELTASRC SIZE \n CONTENT
**      gimme UUID
**      igot UUID
**      leaf UUID
**      pull SERVERCODE PROJECTCODE
**      push SERVERCODE PROJECTCODE
**      clone
**      login USER NONCE SIGNATURE
**
** "file" lines are only accepted after a successful "push".  "gimme"
** lines are only honored after a successful "pull" or "clone".
**
** After the whole message has been read, the reply consists of a
** "gimme" line for every blob with size<0 (if pushing), the pending
** files (if pulling), and a "leaf" line for every leaf (if either).
**
** NOTE(review): lines beginning with "@" are not plain C; presumably a
** build-time preprocessor turns them into CGI output calls - confirm.
*/
void page_xfer(void){
  int nToken;       /* Number of tokens on the current line */
  int isPull = 0;   /* True once a "pull" or "clone" has been authorized */
  int isPush = 0;   /* True once a "push" has been authorized */
  int nErr = 0;     /* Error counter.  Incremented below but never read */
  Blob line, errmsg, aToken[5];

  db_begin_transaction();
  blobarray_zero(aToken, count(aToken));
  cgi_set_content_type(g.zContentType);
  blob_zero(&errmsg);
  db_multi_exec(
     "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);" /* Client has */
     "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);"  /* Client needs */
  );
  /* Each "break" inside this loop skips the blobarray_reset() call at
  ** the bottom of the loop body. */
  while( blob_line(&g.cgiIn, &line) ){
    nToken = blob_tokenize(&line, aToken, count(aToken));

    /*   file UUID SIZE \n CONTENT
    **   file UUID DELTASRC SIZE \n CONTENT
    **
    ** Accept a file from the client.
    */
    if( blob_eq(&aToken[0], "file") && nToken>=3 && nToken<=4 ){
      if( !isPush ){
        cgi_reset_content();
        @ error not\sauthorized\sto\swrite
        nErr++;
        break;
      }
      xfer_accept_file(&g.cgiIn, aToken, nToken, &errmsg);
      if( blob_size(&errmsg) ){
        cgi_reset_content();
        @ error %T(blob_str(&errmsg))
        nErr++;
        break;
      }
    }else

    /*   gimme UUID
    **
    ** Client is requesting a file
    */
    if( blob_eq(&aToken[0], "gimme") && nToken==2 && blob_is_uuid(&aToken[1]) ){
      if( isPull ){
        /* Queue the file for sending, provided we hold its content */
        db_multi_exec(
          "INSERT OR IGNORE INTO pending(rid) "
          "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
        );
      }
    }else

    /*   igot UUID
    **   leaf UUID
    **
    ** Client announces that it has a particular file
    */
    if( nToken==2
          && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0],"leaf"))
          && blob_is_uuid(&aToken[1]) ){
      if( isPull || isPush ){
        int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
        if( rid>0 ){
          db_multi_exec(
            "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
          );
          if( isPull && blob_eq(&aToken[0], "leaf") ){
            /* The client needs the children of its own leaves */
            db_multi_exec(
              "INSERT OR IGNORE INTO pending(rid) "
              "SELECT cid FROM plink WHERE pid=%d", rid
            );
          }
        }else if( isPush ){
          /* UUID is not in the blob table.  Called with no content and
          ** only the UUID. */
          content_put(0, blob_str(&aToken[1]));
        }
      }
    }else

    /*    pull  SERVERCODE  PROJECTCODE
    **    push  SERVERCODE  PROJECTCODE
    **
    ** The client wants either send or receive
    */
    if( nToken==3
               && (blob_eq(&aToken[0], "pull") || blob_eq(&aToken[0], "push"))
               && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){
      const char *zSCode;
      const char *zPCode;

      zSCode = db_get("server-code", 0);
      if( zSCode==0 ){
        fossil_panic("missing server code");
      }
      /* A client presenting our own server code is rejected */
      if( blob_eq_str(&aToken[1], zSCode, -1) ){
        cgi_reset_content();
        @ error server\sloop
        nErr++;
        break;
      }
      zPCode = db_get("project-code", 0);
      if( zPCode==0 ){
        fossil_panic("missing project code");
      }
      /* The client must be working on the same project */
      if( !blob_eq_str(&aToken[2], zPCode, -1) ){
        cgi_reset_content();
        @ error wrong\sproject
        nErr++;
        break;
      }
      login_check_credentials();
      if( blob_eq(&aToken[0], "pull") ){
        if( !g.okRead ){
          cgi_reset_content();
          @ error not\sauthorized\sto\sread
          nErr++;
          break;
        }
        isPull = 1;
      }else{
        if( !g.okWrite ){
          cgi_reset_content();
          @ error not\sauthorized\sto\swrite
          nErr++;
          break;
        }
        isPush = 1;

      }
    }else

    /*    clone
    **
    ** The client knows nothing.  Tell all.
    */
    if( blob_eq(&aToken[0], "clone") ){
      login_check_credentials();
      if( !g.okRead ){
        cgi_reset_content();
        @ error not\sauthorized\sto\sread
        nErr++;
        break;
      }
      isPull = 1;
      @ push %s(db_get("server-code", "x")) %s(db_get("project-code", "x"))
      /* Every blob with content becomes pending for this client */
      db_multi_exec(
        "INSERT OR IGNORE INTO pending(rid) "
        "SELECT rid FROM blob WHERE size>=0"
      );
    }else

    /*    login  USER  NONCE  SIGNATURE
    **
    ** Check for a valid login.  This has to happen before anything else.
    */
    if( blob_eq(&aToken[0], "login") && nToken==4 ){
      if( disableLogin ){
        g.okRead = g.okWrite = 1;
      }else{
        check_login(&aToken[1], &aToken[2], &aToken[3]);
      }
    }else

    /* Unknown message
    **
    ** NOTE(review): unlike the other error replies above, this branch
    ** neither increments nErr nor breaks out of the loop, so processing
    ** continues with the next line.
    */
    {
      cgi_reset_content();
      @ error bad\scommand:\s%F(blob_str(&line))
    }
    blobarray_reset(aToken, nToken);
  }

  /* The input message has now been processed.  Generate a reply. */
  if( isPush ){
    /* Ask the client for every blob whose size is negative */
    Stmt q;
    db_prepare(&q, "SELECT uuid FROM blob WHERE size<0");
    while( db_step(&q)==SQLITE_ROW ){
      const char *zUuid = db_column_text(&q, 0);
      @ gimme %s(zUuid)
    }
    db_finalize(&q);
  }
  if( isPull ){
    /* A NULL argument sends the files to the CGI output */
    send_all_pending(0);
  }
  if( isPush || isPull ){
    /* Always send our leaves */
    Stmt q;
    db_prepare(&q, 
       "SELECT uuid FROM blob WHERE rid IN"
       "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
    );
    while( db_step(&q)==SQLITE_ROW ){
      const char *zUuid = db_column_text(&q, 0);
      @ leaf %s(zUuid)
    }
    db_finalize(&q);
  }

  /* Reached on the normal path and after any "break" above; nErr is
  ** not consulted. */
  db_end_transaction(0);
}

/*
** COMMAND: test-xfer
**
** This command is used for debugging the server.  There is a single
** optional argument which names a file holding the uncompressed
** content of an "xfer" message from client to server.  When the
** argument is omitted the name "-" is used instead.  This command
** interprets that message as if it had been received by the server,
** with login checks disabled, and prints the reply on standard output.
**
** On the client side, run:
**
**      fossil push http://bogus/ --httptrace
**
** Or a similar command to provide the output.  The content of the
** message will appear on standard output.  Capture this message
** into a file named (for example) out.txt.  Then run the
** server in gdb:
**
**     gdb fossil
**     r test-xfer out.txt
*/
void cmd_test_xfer(void){
  int nReply;     /* Size reported by cgi_extract_content().  Not used */

  if( g.argc<2 || g.argc>3 ){
    usage("?MESSAGEFILE?");
  }
  db_must_be_within_tree();

  /* Load the message into the place where page_xfer() expects it */
  blob_zero(&g.cgiIn);
  if( g.argc==2 ){
    blob_read_from_file(&g.cgiIn, "-");
  }else{
    blob_read_from_file(&g.cgiIn, g.argv[2]);
  }

  disableLogin = 1;
  page_xfer();
  printf("%s\n", cgi_extract_content(&nReply));
}


/*
** Sync to the host identified in g.urlName and g.urlPath.  This
** routine is called by the client.
**
** Records are pushed to the server if pushFlag is true.  Records
** are pulled if pullFlag is true.  A full sync occurs if both are
** true.
**
** If cloneFlag is true, a "clone" message is sent and pushFlag and
** pullFlag are cleared.  When the server answers with a "push" line,
** cloneFlag is cleared and pullFlag is set, so that later round trips
** behave as a pull.
**
** The routine exchanges messages with the server in a loop.  The loop
** repeats for as long as the reply contained something that requires
** another round trip, but stops after two consecutive round trips in
** which no files were sent or received.
*/
void client_sync(int pushFlag, int pullFlag, int cloneFlag){
  int go = 1;        /* Loop until zero */
  int nToken;
  const char *zSCode = db_get("server-code", "x");
  const char *zPCode = db_get("project-code", 0);
  int nFile = 0;          /* Files sent or received in the current phase */
  int nMsg = 0;           /* Other messages in the current phase */
  int nReq = 0;           /* "gimme" requests in the current phase */
  int nFileSend;          /* Files sent in the current round trip */
  int nNoFileCycle = 0;   /* Consecutive round trips that moved no files */
  Blob send;        /* Text we are sending to the server */
  Blob recv;        /* Reply we got back from the server */
  Blob line;        /* A single line of the reply */
  Blob aToken[5];   /* A tokenization of line */
  Blob errmsg;      /* Error message */

  assert( pushFlag || pullFlag || cloneFlag );
  assert( !g.urlIsFile );          /* This only works for networking */

  db_begin_transaction();
  db_multi_exec(
    /* Records which we know the other side also has */
    "CREATE TEMP TABLE onremote(rid INTEGER PRIMARY KEY);"
    /* Records we know the other side needs */
    "CREATE TEMP TABLE pending(rid INTEGER PRIMARY KEY);"
  );
  blobarray_zero(aToken, count(aToken));
  blob_zero(&send);
  blob_zero(&recv);
  blob_zero(&errmsg);


  /* Each iteration is one request/reply round trip with the server */
  while( go ){
    go = 0;
    nFile = nReq = nMsg = 0;

    /* Generate a request to be sent to the server.
    ** Always begin with a clone, pull, or push message
    */
    
    if( cloneFlag ){
      blob_appendf(&send, "clone\n");
      pushFlag = 0;
      pullFlag = 0;
      nMsg++;
    }else if( pullFlag ){
      blob_appendf(&send, "pull %s %s\n", zSCode, zPCode);
      nMsg++;
    }
    if( pushFlag ){
      blob_appendf(&send, "push %s %s\n", zSCode, zPCode);
      nMsg++;
    }

    if( pullFlag ){
      /* Send gimme message for every phantom that we hold.
      */
      Stmt q;
      db_prepare(&q, "SELECT uuid FROM blob WHERE size<0");
      while( db_step(&q)==SQLITE_ROW ){
        const char *zUuid = db_column_text(&q, 0);
        blob_appendf(&send,"gimme %s\n", zUuid);
        nReq++;
      }
      db_finalize(&q);
    }

    if( pushFlag ){
      /* Send the server any files that the server has requested */
      nFile += send_all_pending(&send);
    }

    if( pullFlag || pushFlag ){
      /* Always send our leaves */
      Stmt q;
      db_prepare(&q, 
         "SELECT uuid FROM blob WHERE rid IN"
         "  (SELECT cid FROM plink EXCEPT SELECT pid FROM plink)"
      );
      while( db_step(&q)==SQLITE_ROW ){
        const char *zUuid = db_column_text(&q, 0);
        blob_appendf(&send, "leaf %s\n", zUuid);
        nMsg++;
      }
      db_finalize(&q);
    }

    /* Exchange messages with the server */
    printf("Send:      %3d files, %3d requests, %3d other msgs, %8d bytes\n",
            nFile, nReq, nMsg, blob_size(&send));
    /* Remember how many files went out; the counters are reused below
    ** for the reply. */
    nFileSend = nFile;
    nFile = nReq = nMsg = 0;
    http_exchange(&send, &recv);
    blob_reset(&send);

    /* Process the reply that came back from the server */
    while( blob_line(&recv, &line) ){
      nToken = blob_tokenize(&line, aToken, count(aToken));

      /*   file UUID SIZE \n CONTENT
      **   file UUID DELTASRC SIZE \n CONTENT
      **
      ** Receive a file transmitted from the other side
      */
      if( blob_eq(&aToken[0],"file") ){
        xfer_accept_file(&recv, aToken, nToken, &errmsg);
        nFile++;
        go = 1;
      }else

      /*   gimme UUID
      **
      ** Server is requesting a file
      */
      if( blob_eq(&aToken[0], "gimme") && nToken==2
               && blob_is_uuid(&aToken[1]) ){
        nReq++;
        if( pushFlag ){
          db_multi_exec(
            "INSERT OR IGNORE INTO pending(rid) "
            "SELECT rid FROM blob WHERE uuid=%B AND size>=0", &aToken[1]
          );
          go = 1;
        }
      }else
  
      /*   igot UUID
      **   leaf UUID
      **
      ** Server proclaims that it has a particular file.  A leaf message
      ** means that the file is a leaf manifest on the server.
      */
      if( nToken==2
              && (blob_eq(&aToken[0], "igot") || blob_eq(&aToken[0], "leaf"))
              && blob_is_uuid(&aToken[1]) ){
        int rid = db_int(0, "SELECT rid FROM blob WHERE uuid=%B", &aToken[1]);
        nMsg++;
        if( rid>0 ){
          db_multi_exec(
            "INSERT OR IGNORE INTO onremote(rid) VALUES(%d)", rid
          );
          /* Add to the pending set all children of the server's leaves */
          if( pushFlag && blob_eq(&aToken[0], "leaf") ){
            db_multi_exec(
              "INSERT OR IGNORE INTO pending(rid) "
              "SELECT cid FROM plink WHERE pid=%d", rid
            );
            if( db_changes()>0 ){
              go = 1;
            }
          }
          /* We hold only a phantom of this record: go around again so
          ** that a gimme is sent for it. */
          if( pullFlag && !go && 
              db_exists("SELECT 1 FROM blob WHERE rid=%d AND size<0", rid) ){
            go = 1;
          }
        }else if( pullFlag ){
          /* UUID is not in our blob table.  Called with no content and
          ** only the UUID, then go around again. */
          go = 1;
          content_put(0, blob_str(&aToken[1]));
        }
      }else
  
      /*   push  SERVERCODE  PRODUCTCODE
      **
      ** Should only happen in response to a clone.
      */
      if( blob_eq(&aToken[0],"push") && nToken==3 && cloneFlag
              && blob_is_uuid(&aToken[1]) && blob_is_uuid(&aToken[2]) ){

        if( blob_eq_str(&aToken[1], zSCode, -1) ){
          fossil_fatal("server loop");
        }
        nMsg++;
        if( zPCode==0 ){
          /* Adopt the server's project code as our own */
          zPCode = mprintf("%b", &aToken[2]);
          db_set("project-code", zPCode);
        }
        /* From here on the clone proceeds as a pull */
        cloneFlag = 0;
        pullFlag = 1;
      }else

      /*   error MESSAGE
      **
      ** Report an error
      */        
      if( blob_eq(&aToken[0],"error") && nToken==2 ){
        char *zMsg = blob_terminate(&aToken[1]);
        defossilize(zMsg);
        blob_appendf(&errmsg, "server says: %s", zMsg);
      }else

      /* Unknown message */
      {
        blob_appendf(&errmsg, "unknown command: %b", &aToken[0]);
      }

      /* Any error recorded while handling this line is fatal */
      if( blob_size(&errmsg) ){
        fossil_fatal("%b", &errmsg);
      }
      blobarray_reset(aToken, nToken);
    }
    printf("Received:  %3d files, %3d requests, %3d other msgs, %8d bytes\n",
            nFile, nReq, nMsg, blob_size(&recv));
    blob_reset(&recv);
    /* Stop after two consecutive round trips in which no files were
    ** sent or received, even if go was set above. */
    if( nFileSend + nFile==0 ){
      nNoFileCycle++;
      if( nNoFileCycle>1 ){
        go = 0;
      }
    }else{
      nNoFileCycle = 0;
    }
    nFile = nReq = nMsg = 0;
  };
  http_close();
  db_end_transaction(0);
  db_multi_exec(
    "DROP TABLE onremote;"
    "DROP TABLE pending;"
  );
}