diff mercurial/streamclone.py @ 26443:d947086d8973

streamclone: move code out of exchange.py. We bulk move functions from exchange.py related to streaming clones. Function names were renamed slightly to drop a component redundant with the module name. Docstrings and comments referencing old names and locations were updated accordingly.
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 02 Oct 2015 16:05:52 -0700
parents ef8d27f53204
children 623743010133
line wrap: on
line diff
--- a/mercurial/streamclone.py	Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/streamclone.py	Fri Oct 02 16:05:52 2015 -0700
@@ -7,14 +7,144 @@
 
 from __future__ import absolute_import
 
+import time
+
 from .i18n import _
 from . import (
     branchmap,
     error,
-    exchange,
+    store,
     util,
 )
 
+# This is its own function so extensions can override it.
+def _walkstreamfiles(repo):
+    return repo.store.walk()
+
+def generatev1(repo):
+    """Emit content for version 1 of a streaming clone.
+
+    This is a generator of raw chunks that constitute a streaming clone.
+
+    The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and the total byte size.
+
+    Next are N entries, one for each file being transferred. Each file entry
+    starts as a line with the file name and integer size delimited by a null
+    byte. The raw file data follows. Following the raw file data is the next
+    file entry, or EOF.
+
+    When used on the wire protocol, an additional line indicating protocol
+    success will be prepended to the stream. This function is not responsible
+    for adding it.
+
+    This function will obtain a repository lock to ensure a consistent view of
+    the store is captured. It therefore may raise LockError.
+    """
+    entries = []
+    total_bytes = 0
+    # Get consistent snapshot of repo, lock during scan.
+    lock = repo.lock()
+    try:
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                total_bytes += size
+    finally:
+            lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    yield '%d %d\n' % (len(entries), total_bytes)
+
+    svfs = repo.svfs
+    oldaudit = svfs.mustaudit
+    debugflag = repo.ui.debugflag
+    svfs.mustaudit = False
+
+    try:
+        for name, size in entries:
+            if debugflag:
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+            # partially encode name over the wire for backwards compat
+            yield '%s\0%d\n' % (store.encodedir(name), size)
+            if size <= 65536:
+                fp = svfs(name)
+                try:
+                    data = fp.read(size)
+                finally:
+                    fp.close()
+                yield data
+            else:
+                for chunk in util.filechunkiter(svfs(name), limit=size):
+                    yield chunk
+    finally:
+        svfs.mustaudit = oldaudit
+
+def consumev1(repo, fp):
+    """Apply the contents from version 1 of a streaming clone file handle.
+
+    This takes the output from generatev1() and applies it to the specified
+    repository.
+
+    As with generatev1(), the status line added by the wire protocol is not
+    handled by this function.
+    """
+    lock = repo.lock()
+    try:
+        repo.ui.status(_('streaming all changes\n'))
+        l = fp.readline()
+        try:
+            total_files, total_bytes = map(int, l.split(' ', 1))
+        except (ValueError, TypeError):
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        handled_bytes = 0
+        repo.ui.progress(_('clone'), 0, total=total_bytes)
+        start = time.time()
+
+        tr = repo.transaction(_('clone'))
+        try:
+            for i in xrange(total_files):
+                # XXX doesn't support '\n' or '\r' in filenames
+                l = fp.readline()
+                try:
+                    name, size = l.split('\0', 1)
+                    size = int(size)
+                except (ValueError, TypeError):
+                    raise error.ResponseError(
+                        _('unexpected response from remote server:'), l)
+                if repo.ui.debugflag:
+                    repo.ui.debug('adding %s (%s)\n' %
+                                  (name, util.bytecount(size)))
+                # for backwards compat, name was partially encoded
+                ofp = repo.svfs(store.decodedir(name), 'w')
+                for chunk in util.filechunkiter(fp, limit=size):
+                    handled_bytes += len(chunk)
+                    repo.ui.progress(_('clone'), handled_bytes,
+                                     total=total_bytes)
+                    ofp.write(chunk)
+                ofp.close()
+            tr.close()
+        finally:
+            tr.release()
+
+        # Writing straight to files circumvented the in-memory caches
+        repo.invalidate()
+
+        elapsed = time.time() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        repo.ui.progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+    finally:
+        lock.release()
+
 def streamin(repo, remote, remotereqs):
     # Save remote branchmap. We will use it later
     # to speed up branchcache creation
@@ -46,11 +176,11 @@
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
-    feeding into exchange.consumestreamclone.
+    feeding into consumev1().
     """
     lock = repo.lock()
     try:
-        exchange.consumestreamclone(repo, fp)
+        consumev1(repo, fp)
 
         # new requirements = old non-format requirements +
         #                    new format-related remote requirements