Mercurial > public > mercurial-scm > hg-stable
diff mercurial/exchange.py @ 26443:d947086d8973
streamclone: move code out of exchange.py
We bulk move functions from exchange.py related to streaming clones.
Function names were renamed slightly to drop a component redundant with
the module name. Docstrings and comments referencing old names and
locations were updated accordingly.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 02 Oct 2015 16:05:52 -0700 |
parents | 85b992177d2a |
children | e05fd574c922 |
line wrap: on
line diff
--- a/mercurial/exchange.py Fri Oct 02 15:58:24 2015 -0700 +++ b/mercurial/exchange.py Fri Oct 02 16:05:52 2015 -0700 @@ -5,11 +5,10 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. -import time from i18n import _ from node import hex, nullid import errno, urllib -import util, scmutil, changegroup, base85, error, store +import util, scmutil, changegroup, base85, error import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey import lock as lockmod import tags @@ -1468,131 +1467,3 @@ if recordout is not None: recordout(repo.ui.popbuffer()) return r - -# This is it's own function so extensions can override it. -def _walkstreamfiles(repo): - return repo.store.walk() - -def generatestreamclone(repo): - """Emit content for a streaming clone. - - This is a generator of raw chunks that constitute a streaming clone. - - The stream begins with a line of 2 space-delimited integers containing the - number of entries and total bytes size. - - Next, are N entries for each file being transferred. Each file entry starts - as a line with the file name and integer size delimited by a null byte. - The raw file data follows. Following the raw file data is the next file - entry, or EOF. - - When used on the wire protocol, an additional line indicating protocol - success will be prepended to the stream. This function is not responsible - for adding it. - - This function will obtain a repository lock to ensure a consistent view of - the store is captured. It therefore may raise LockError. - """ - entries = [] - total_bytes = 0 - # Get consistent snapshot of repo, lock during scan. - lock = repo.lock() - try: - repo.ui.debug('scanning\n') - for name, ename, size in _walkstreamfiles(repo): - if size: - entries.append((name, size)) - total_bytes += size - finally: - lock.release() - - repo.ui.debug('%d files, %d bytes to transfer\n' % - (len(entries), total_bytes)) - yield '%d %d\n' % (len(entries), total_bytes) - - svfs = repo.svfs - oldaudit = svfs.mustaudit - debugflag = repo.ui.debugflag - svfs.mustaudit = False - - try: - for name, size in entries: - if debugflag: - repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) - # partially encode name over the wire for backwards compat - yield '%s\0%d\n' % (store.encodedir(name), size) - if size <= 65536: - fp = svfs(name) - try: - data = fp.read(size) - finally: - fp.close() - yield data - else: - for chunk in util.filechunkiter(svfs(name), limit=size): - yield chunk - finally: - svfs.mustaudit = oldaudit - -def consumestreamclone(repo, fp): - """Apply the contents from a streaming clone file. - - This takes the output from "streamout" and applies it to the specified - repository. - - Like "streamout," the status line added by the wire protocol is not handled - by this function. - """ - lock = repo.lock() - try: - repo.ui.status(_('streaming all changes\n')) - l = fp.readline() - try: - total_files, total_bytes = map(int, l.split(' ', 1)) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - repo.ui.status(_('%d files to transfer, %s of data\n') % - (total_files, util.bytecount(total_bytes))) - handled_bytes = 0 - repo.ui.progress(_('clone'), 0, total=total_bytes) - start = time.time() - - tr = repo.transaction(_('clone')) - try: - for i in xrange(total_files): - # XXX doesn't support '\n' or '\r' in filenames - l = fp.readline() - try: - name, size = l.split('\0', 1) - size = int(size) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - if repo.ui.debugflag: - repo.ui.debug('adding %s (%s)\n' % - (name, util.bytecount(size))) - # for backwards compat, name was partially encoded - ofp = repo.svfs(store.decodedir(name), 'w') - for chunk in util.filechunkiter(fp, limit=size): - handled_bytes += len(chunk) - repo.ui.progress(_('clone'), handled_bytes, - total=total_bytes) - ofp.write(chunk) - ofp.close() - tr.close() - finally: - tr.release() - - # Writing straight to files circumvented the inmemory caches - repo.invalidate() - - elapsed = time.time() - start - if elapsed <= 0: - elapsed = 0.001 - repo.ui.progress(_('clone'), None) - repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % - (util.bytecount(total_bytes), elapsed, - util.bytecount(total_bytes / elapsed))) - finally: - lock.release()