comparison mercurial/localrepo.py @ 25237:7504a7325e4c

localrepo: extract stream clone application into reusable function

The existing stream_in method assumes a streaming clone is applied via the wire protocol. Previous patches have enabled streaming clone data to be produced and consumed outside the context of the wire protocol. However, the consuming part was incomplete because it didn't deal with things like updating the branch caches or writing out a requirements file. This patch finishes the separation of stream clone handling from the wire protocol. After this patch, it is possible to consume stream clones from arbitrary sources, including files. Mozilla plans to leverage this to serve pre-generated stream clone files to consumers, drastically reducing the wall and CPU time required to clone large repositories. This will enable clones to be nearly as fast as `tar`.
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 21 May 2015 10:41:06 -0700
parents 5095059340dc
children 61b3529e2377
comparison
equal deleted inserted replaced
25236:5095059340dc 25237:7504a7325e4c
1753 functions, which are called before pushing changesets. 1753 functions, which are called before pushing changesets.
1754 """ 1754 """
1755 return util.hooks() 1755 return util.hooks()
1756 1756
1757 def stream_in(self, remote, remotereqs): 1757 def stream_in(self, remote, remotereqs):
1758 # Save remote branchmap. We will use it later
1759 # to speed up branchcache creation
1760 rbranchmap = None
1761 if remote.capable("branchmap"):
1762 rbranchmap = remote.branchmap()
1763
1764 fp = remote.stream_out()
1765 l = fp.readline()
1766 try:
1767 resp = int(l)
1768 except ValueError:
1769 raise error.ResponseError(
1770 _('unexpected response from remote server:'), l)
1771 if resp == 1:
1772 raise util.Abort(_('operation forbidden by server'))
1773 elif resp == 2:
1774 raise util.Abort(_('locking the remote repository failed'))
1775 elif resp != 0:
1776 raise util.Abort(_('the server sent an unknown error code'))
1777
1778 self.applystreamclone(remotereqs, rbranchmap, fp)
1779 return len(self.heads()) + 1
1780
1781 def applystreamclone(self, remotereqs, remotebranchmap, fp):
1782 """Apply stream clone data to this repository.
1783
1784 "remotereqs" is a set of requirements to handle the incoming data.
1785 "remotebranchmap" is the result of a branchmap lookup on the remote. It
1786 can be None.
1787 "fp" is a file object containing the raw stream data, suitable for
1788 feeding into exchange.consumestreamclone.
1789 """
1758 lock = self.lock() 1790 lock = self.lock()
1759 try: 1791 try:
1760 # Save remote branchmap. We will use it later
1761 # to speed up branchcache creation
1762 rbranchmap = None
1763 if remote.capable("branchmap"):
1764 rbranchmap = remote.branchmap()
1765
1766 fp = remote.stream_out()
1767 l = fp.readline()
1768 try:
1769 resp = int(l)
1770 except ValueError:
1771 raise error.ResponseError(
1772 _('unexpected response from remote server:'), l)
1773 if resp == 1:
1774 raise util.Abort(_('operation forbidden by server'))
1775 elif resp == 2:
1776 raise util.Abort(_('locking the remote repository failed'))
1777 elif resp != 0:
1778 raise util.Abort(_('the server sent an unknown error code'))
1779
1780 exchange.consumestreamclone(self, fp) 1792 exchange.consumestreamclone(self, fp)
1781 1793
1782 # new requirements = old non-format requirements + 1794 # new requirements = old non-format requirements +
1783 # new format-related remote requirements 1795 # new format-related remote requirements
1784 # requirements from the streamed-in repository 1796 # requirements from the streamed-in repository
1785 self.requirements = remotereqs | ( 1797 self.requirements = remotereqs | (
1786 self.requirements - self.supportedformats) 1798 self.requirements - self.supportedformats)
1787 self._applyopenerreqs() 1799 self._applyopenerreqs()
1788 self._writerequirements() 1800 self._writerequirements()
1789 1801
1790 if rbranchmap: 1802 if remotebranchmap:
1791 rbheads = [] 1803 rbheads = []
1792 closed = [] 1804 closed = []
1793 for bheads in rbranchmap.itervalues(): 1805 for bheads in remotebranchmap.itervalues():
1794 rbheads.extend(bheads) 1806 rbheads.extend(bheads)
1795 for h in bheads: 1807 for h in bheads:
1796 r = self.changelog.rev(h) 1808 r = self.changelog.rev(h)
1797 b, c = self.changelog.branchinfo(r) 1809 b, c = self.changelog.branchinfo(r)
1798 if c: 1810 if c:
1799 closed.append(h) 1811 closed.append(h)
1800 1812
1801 if rbheads: 1813 if rbheads:
1802 rtiprev = max((int(self.changelog.rev(node)) 1814 rtiprev = max((int(self.changelog.rev(node))
1803 for node in rbheads)) 1815 for node in rbheads))
1804 cache = branchmap.branchcache(rbranchmap, 1816 cache = branchmap.branchcache(remotebranchmap,
1805 self[rtiprev].node(), 1817 self[rtiprev].node(),
1806 rtiprev, 1818 rtiprev,
1807 closednodes=closed) 1819 closednodes=closed)
1808 # Try to stick it as low as possible 1820 # Try to stick it as low as possible
1809 # filter above served are unlikely to be fetch from a clone 1821 # filter above served are unlikely to be fetch from a clone
1812 if cache.validfor(rview): 1824 if cache.validfor(rview):
1813 self._branchcaches[candidate] = cache 1825 self._branchcaches[candidate] = cache
1814 cache.write(rview) 1826 cache.write(rview)
1815 break 1827 break
1816 self.invalidate() 1828 self.invalidate()
1817 return len(self.heads()) + 1
1818 finally: 1829 finally:
1819 lock.release() 1830 lock.release()
1820 1831
1821 def clone(self, remote, heads=[], stream=None): 1832 def clone(self, remote, heads=[], stream=None):
1822 '''clone remote repository. 1833 '''clone remote repository.