--- a/mercurial/wireprotov2peer.py Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/wireprotov2peer.py Wed Sep 26 18:08:08 2018 -0700
@@ -13,8 +13,12 @@
from . import (
encoding,
error,
+ pycompat,
sslutil,
+ url as urlmod,
+ util,
wireprotoframing,
+ wireprototypes,
)
from .utils import (
cborutil,
@@ -112,9 +116,10 @@
events occur.
"""
- def __init__(self, requestid, command):
+ def __init__(self, requestid, command, fromredirect=False):
self.requestid = requestid
self.command = command
+ self.fromredirect = fromredirect
# Whether all remote input related to this command has been
# received.
@@ -132,6 +137,7 @@
self._pendingevents = []
self._decoder = cborutil.bufferingdecoder()
self._seeninitial = False
+ self._redirect = None
def _oninputcomplete(self):
with self._lock:
@@ -146,10 +152,19 @@
with self._lock:
for o in self._decoder.getavailable():
- if not self._seeninitial:
+ if not self._seeninitial and not self.fromredirect:
self._handleinitial(o)
continue
+ # We should never see an object after a content redirect,
+ # as the spec says the main status object containing the
+ # content redirect is the only object in the stream. Fail
+ # if we see a misbehaving server.
+ if self._redirect:
+ raise error.Abort(_('received unexpected response data '
+ 'after content redirect; the remote is '
+ 'buggy'))
+
self._pendingevents.append(o)
self._serviceable.set()
@@ -160,7 +175,16 @@
return
elif o[b'status'] == b'redirect':
- raise error.Abort(_('redirect responses not yet supported'))
+ l = o[b'location']
+ self._redirect = wireprototypes.alternatelocationresponse(
+ url=l[b'url'],
+ mediatype=l[b'mediatype'],
+ size=l.get(b'size'),
+ fullhashes=l.get(b'fullhashes'),
+ fullhashseed=l.get(b'fullhashseed'),
+ serverdercerts=l.get(b'serverdercerts'),
+ servercadercerts=l.get(b'servercadercerts'))
+ return
atoms = [{'msg': o[b'error'][b'message']}]
if b'args' in o[b'error']:
@@ -214,13 +238,17 @@
with the higher-level peer API.
"""
- def __init__(self, ui, clientreactor):
+ def __init__(self, ui, clientreactor, opener=None,
+ requestbuilder=util.urlreq.request):
self._ui = ui
self._reactor = clientreactor
self._requests = {}
self._futures = {}
self._responses = {}
+ self._redirects = []
self._frameseof = False
+ self._opener = opener or urlmod.opener(ui)
+ self._requestbuilder = requestbuilder
def callcommand(self, command, args, f, redirect=None):
"""Register a request to call a command.
@@ -269,7 +297,12 @@
self._ui.note(_('received %r\n') % frame)
self._processframe(frame)
- if self._frameseof:
+ # Also try to read the first redirect.
+ if self._redirects:
+ if not self._processredirect(*self._redirects[0]):
+ self._redirects.pop(0)
+
+ if self._frameseof and not self._redirects:
return None
return True
@@ -318,10 +351,27 @@
# This can raise. The caller can handle it.
response._onresponsedata(meta['data'])
+ # If we got a content redirect response, we want to fetch it and
+ # expose the data as if we received it inline. But we also want to
+ # keep our internal request accounting in order. Our strategy is to
+ # basically put meaningful response handling on pause until EOS occurs
+ # and the stream accounting is in a good state. At that point, we follow
+ # the redirect and replace the response object with its data.
+
+ redirect = response._redirect
+ handlefuture = False if redirect else True
+
if meta['eos']:
response._oninputcomplete()
del self._requests[frame.requestid]
+ if redirect:
+ self._followredirect(frame.requestid, redirect)
+ return
+
+ if not handlefuture:
+ return
+
# If the command has a decoder, we wait until all input has been
# received before resolving the future. Otherwise we resolve the
# future immediately.
@@ -336,6 +386,82 @@
self._futures[frame.requestid].set_result(decoded)
del self._futures[frame.requestid]
+ def _followredirect(self, requestid, redirect):
+ """Called to initiate redirect following for a request."""
+ self._ui.note(_('(following redirect to %s)\n') % redirect.url)
+
+ # TODO handle framed responses.
+ if redirect.mediatype != b'application/mercurial-cbor':
+ raise error.Abort(_('cannot handle redirects for the %s media type')
+ % redirect.mediatype)
+
+ if redirect.fullhashes:
+ self._ui.warn(_('(support for validating hashes on content '
+ 'redirects not supported)\n'))
+
+ if redirect.serverdercerts or redirect.servercadercerts:
+ self._ui.warn(_('(support for pinning server certificates on '
+ 'content redirects not supported)\n'))
+
+ headers = {
+ r'Accept': redirect.mediatype,
+ }
+
+ req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
+
+ try:
+ res = self._opener.open(req)
+ except util.urlerr.httperror as e:
+ if e.code == 401:
+ raise error.Abort(_('authorization failed'))
+ raise
+ except util.httplib.HTTPException as e:
+ self._ui.debug('http error requesting %s\n' % req.get_full_url())
+ self._ui.traceback()
+ raise IOError(None, e)
+
+ urlmod.wrapresponse(res)
+
+ # The existing response object is associated with frame data. Rather
+ # than try to normalize its state, just create a new object.
+ oldresponse = self._responses[requestid]
+ self._responses[requestid] = commandresponse(requestid,
+ oldresponse.command,
+ fromredirect=True)
+
+ self._redirects.append((requestid, res))
+
+ def _processredirect(self, rid, res):
+ """Called to continue processing a response from a redirect."""
+ response = self._responses[rid]
+
+ try:
+ data = res.read(32768)
+ response._onresponsedata(data)
+
+ # We're at end of stream.
+ if not data:
+ response._oninputcomplete()
+
+ if rid not in self._futures:
+ return
+
+ if response.command not in COMMAND_DECODERS:
+ self._futures[rid].set_result(response.objects())
+ del self._futures[rid]
+ elif response._inputcomplete:
+ decoded = COMMAND_DECODERS[response.command](response.objects())
+ self._futures[rid].set_result(decoded)
+ del self._futures[rid]
+
+ return bool(data)
+
+ except BaseException as e:
+ self._futures[rid].set_exception(e)
+ del self._futures[rid]
+ response._oninputcomplete()
+ return False
+
def decodebranchmap(objs):
# Response should be a single CBOR map of branch name to array of nodes.
bm = next(objs)