mercurial/wireprotov2peer.py
changeset 40026 7e807b8a9e56
parent 40024 86b22a4cfab1
child 40134 cfeba1aafb9d
--- a/mercurial/wireprotov2peer.py	Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/wireprotov2peer.py	Wed Sep 26 18:08:08 2018 -0700
@@ -13,8 +13,12 @@
 from . import (
     encoding,
     error,
+    pycompat,
     sslutil,
+    url as urlmod,
+    util,
     wireprotoframing,
+    wireprototypes,
 )
 from .utils import (
     cborutil,
@@ -112,9 +116,10 @@
     events occur.
     """
 
-    def __init__(self, requestid, command):
+    def __init__(self, requestid, command, fromredirect=False):
         self.requestid = requestid
         self.command = command
+        self.fromredirect = fromredirect
 
         # Whether all remote input related to this command has been
         # received.
@@ -132,6 +137,7 @@
         self._pendingevents = []
         self._decoder = cborutil.bufferingdecoder()
         self._seeninitial = False
+        self._redirect = None
 
     def _oninputcomplete(self):
         with self._lock:
@@ -146,10 +152,19 @@
 
         with self._lock:
             for o in self._decoder.getavailable():
-                if not self._seeninitial:
+                if not self._seeninitial and not self.fromredirect:
                     self._handleinitial(o)
                     continue
 
+                # We should never see an object after a content redirect,
+                # as the spec says the main status object containing the
+                # content redirect is the only object in the stream. Fail
+                # if we see a misbehaving server.
+                if self._redirect:
+                    raise error.Abort(_('received unexpected response data '
+                                        'after content redirect; the remote is '
+                                        'buggy'))
+
                 self._pendingevents.append(o)
 
             self._serviceable.set()
@@ -160,7 +175,16 @@
             return
 
         elif o[b'status'] == b'redirect':
-            raise error.Abort(_('redirect responses not yet supported'))
+            l = o[b'location']
+            self._redirect = wireprototypes.alternatelocationresponse(
+                url=l[b'url'],
+                mediatype=l[b'mediatype'],
+                size=l.get(b'size'),
+                fullhashes=l.get(b'fullhashes'),
+                fullhashseed=l.get(b'fullhashseed'),
+                serverdercerts=l.get(b'serverdercerts'),
+                servercadercerts=l.get(b'servercadercerts'))
+            return
 
         atoms = [{'msg': o[b'error'][b'message']}]
         if b'args' in o[b'error']:
@@ -214,13 +238,17 @@
     with the higher-level peer API.
     """
 
-    def __init__(self, ui, clientreactor):
+    def __init__(self, ui, clientreactor, opener=None,
+                 requestbuilder=util.urlreq.request):
         self._ui = ui
         self._reactor = clientreactor
         self._requests = {}
         self._futures = {}
         self._responses = {}
+        self._redirects = []
         self._frameseof = False
+        self._opener = opener or urlmod.opener(ui)
+        self._requestbuilder = requestbuilder
 
     def callcommand(self, command, args, f, redirect=None):
         """Register a request to call a command.
@@ -269,7 +297,12 @@
                 self._ui.note(_('received %r\n') % frame)
                 self._processframe(frame)
 
-        if self._frameseof:
+        # Also try to read the first redirect.
+        if self._redirects:
+            if not self._processredirect(*self._redirects[0]):
+                self._redirects.pop(0)
+
+        if self._frameseof and not self._redirects:
             return None
 
         return True
@@ -318,10 +351,27 @@
         # This can raise. The caller can handle it.
         response._onresponsedata(meta['data'])
 
+        # If we got a content redirect response, we want to fetch it and
+        # expose the data as if we received it inline. But we also want to
+        # keep our internal request accounting in order. Our strategy is to
+        # basically put meaningful response handling on pause until EOS occurs
+        # and the stream accounting is in a good state. At that point, we follow
+        # the redirect and replace the response object with its data.
+
+        redirect = response._redirect
+        handlefuture = False if redirect else True
+
         if meta['eos']:
             response._oninputcomplete()
             del self._requests[frame.requestid]
 
+            if redirect:
+                self._followredirect(frame.requestid, redirect)
+                return
+
+        if not handlefuture:
+            return
+
         # If the command has a decoder, we wait until all input has been
         # received before resolving the future. Otherwise we resolve the
         # future immediately.
@@ -336,6 +386,82 @@
             self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
+    def _followredirect(self, requestid, redirect):
+        """Called to initiate redirect following for a request."""
+        self._ui.note(_('(following redirect to %s)\n') % redirect.url)
+
+        # TODO handle framed responses.
+        if redirect.mediatype != b'application/mercurial-cbor':
+            raise error.Abort(_('cannot handle redirects for the %s media type')
+                              % redirect.mediatype)
+
+        if redirect.fullhashes:
+            self._ui.warn(_('(support for validating hashes on content '
+                            'redirects not supported)\n'))
+
+        if redirect.serverdercerts or redirect.servercadercerts:
+            self._ui.warn(_('(support for pinning server certificates on '
+                            'content redirects not supported)\n'))
+
+        headers = {
+            r'Accept': redirect.mediatype,
+        }
+
+        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
+
+        try:
+            res = self._opener.open(req)
+        except util.urlerr.httperror as e:
+            if e.code == 401:
+                raise error.Abort(_('authorization failed'))
+            raise
+        except util.httplib.HTTPException as e:
+            self._ui.debug('http error requesting %s\n' % req.get_full_url())
+            self._ui.traceback()
+            raise IOError(None, e)
+
+        urlmod.wrapresponse(res)
+
+        # The existing response object is associated with frame data. Rather
+        # than try to normalize its state, just create a new object.
+        oldresponse = self._responses[requestid]
+        self._responses[requestid] = commandresponse(requestid,
+                                                     oldresponse.command,
+                                                     fromredirect=True)
+
+        self._redirects.append((requestid, res))
+
+    def _processredirect(self, rid, res):
+        """Called to continue processing a response from a redirect."""
+        response = self._responses[rid]
+
+        try:
+            data = res.read(32768)
+            response._onresponsedata(data)
+
+            # We're at end of stream.
+            if not data:
+                response._oninputcomplete()
+
+            if rid not in self._futures:
+                return
+
+            if response.command not in COMMAND_DECODERS:
+                self._futures[rid].set_result(response.objects())
+                del self._futures[rid]
+            elif response._inputcomplete:
+                decoded = COMMAND_DECODERS[response.command](response.objects())
+                self._futures[rid].set_result(decoded)
+                del self._futures[rid]
+
+            return bool(data)
+
+        except BaseException as e:
+            self._futures[rid].set_exception(e)
+            del self._futures[rid]
+            response._oninputcomplete()
+            return False
+
 def decodebranchmap(objs):
     # Response should be a single CBOR map of branch name to array of nodes.
     bm = next(objs)