mercurial/sshpeer.py
changeset 33768 82d564d5ac4f
parent 33737 02a745c20121
child 33804 1f8460b55986
equal deleted inserted replaced
33767:b47fe9733d76 33768:82d564d5ac4f
   116 
   116 
   117 class sshpeer(wireproto.wirepeer):
   117 class sshpeer(wireproto.wirepeer):
   118     def __init__(self, ui, path, create=False):
   118     def __init__(self, ui, path, create=False):
   119         self._url = path
   119         self._url = path
   120         self.ui = ui
   120         self.ui = ui
   121         self.pipeo = self.pipei = self.pipee = None
   121         self._pipeo = self._pipei = self._pipee = None
   122 
   122 
   123         u = util.url(path, parsequery=False, parsefragment=False)
   123         u = util.url(path, parsequery=False, parsefragment=False)
   124         if u.scheme != 'ssh' or not u.host or u.path is None:
   124         if u.scheme != 'ssh' or not u.host or u.path is None:
   125             self._abort(error.RepoError(_("couldn't parse location %s") % path))
   125             self._abort(error.RepoError(_("couldn't parse location %s") % path))
   126 
   126 
   127         util.checksafessh(path)
   127         util.checksafessh(path)
   128 
   128 
   129         self.user = u.user
       
   130         if u.passwd is not None:
   129         if u.passwd is not None:
   131             self._abort(error.RepoError(_("password in URL not supported")))
   130             self._abort(error.RepoError(_("password in URL not supported")))
   132         self.host = u.host
   131 
   133         self.port = u.port
   132         self._user = u.user
   134         self.path = u.path or "."
   133         self._host = u.host
       
   134         self._port = u.port
       
   135         self._path = u.path or '.'
   135 
   136 
   136         sshcmd = self.ui.config("ui", "ssh")
   137         sshcmd = self.ui.config("ui", "ssh")
   137         remotecmd = self.ui.config("ui", "remotecmd")
   138         remotecmd = self.ui.config("ui", "remotecmd")
   138 
   139 
   139         args = util.sshargs(sshcmd, self.host, self.user, self.port)
   140         args = util.sshargs(sshcmd, self._host, self._user, self._port)
   140 
   141 
   141         if create:
   142         if create:
   142             cmd = '%s %s %s' % (sshcmd, args,
   143             cmd = '%s %s %s' % (sshcmd, args,
   143                 util.shellquote("%s init %s" %
   144                 util.shellquote("%s init %s" %
   144                     (_serverquote(remotecmd), _serverquote(self.path))))
   145                     (_serverquote(remotecmd), _serverquote(self._path))))
   145             ui.debug('running %s\n' % cmd)
   146             ui.debug('running %s\n' % cmd)
   146             res = ui.system(cmd, blockedtag='sshpeer')
   147             res = ui.system(cmd, blockedtag='sshpeer')
   147             if res != 0:
   148             if res != 0:
   148                 self._abort(error.RepoError(_("could not create remote repo")))
   149                 self._abort(error.RepoError(_("could not create remote repo")))
   149 
   150 
   152     def url(self):
   153     def url(self):
   153         return self._url
   154         return self._url
   154 
   155 
   155     def _validaterepo(self, sshcmd, args, remotecmd):
   156     def _validaterepo(self, sshcmd, args, remotecmd):
   156         # clean up previous run
   157         # clean up previous run
   157         self.cleanup()
   158         self._cleanup()
   158 
   159 
   159         cmd = '%s %s %s' % (sshcmd, args,
   160         cmd = '%s %s %s' % (sshcmd, args,
   160             util.shellquote("%s -R %s serve --stdio" %
   161             util.shellquote("%s -R %s serve --stdio" %
   161                 (_serverquote(remotecmd), _serverquote(self.path))))
   162                 (_serverquote(remotecmd), _serverquote(self._path))))
   162         self.ui.debug('running %s\n' % cmd)
   163         self.ui.debug('running %s\n' % cmd)
   163         cmd = util.quotecommand(cmd)
   164         cmd = util.quotecommand(cmd)
   164 
   165 
   165         # while self.subprocess isn't used, having it allows the subprocess to
   166         # while self._subprocess isn't used, having it allows the subprocess to
   166         # clean up correctly later
   167         # clean up correctly later
   167         #
   168         #
   168         # unbuffered pipes allow the use of 'select'
   169         # unbuffered pipes allow the use of 'select'
   169         # feel free to remove buffering and select usage when we ultimately
   170         # feel free to remove buffering and select usage when we ultimately
   170         # move to threading.
   171         # move to threading.
   171         sub = util.popen4(cmd, bufsize=0)
   172         sub = util.popen4(cmd, bufsize=0)
   172         self.pipeo, self.pipei, self.pipee, self.subprocess = sub
   173         self._pipeo, self._pipei, self._pipee, self._subprocess = sub
   173 
   174 
   174         self.pipei = util.bufferedinputpipe(self.pipei)
   175         self._pipei = util.bufferedinputpipe(self._pipei)
   175         self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
   176         self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
   176         self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
   177         self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
   177 
   178 
   178         # skip any noise generated by remote shell
   179         # skip any noise generated by remote shell
   179         self._callstream("hello")
   180         self._callstream("hello")
   180         r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
   181         r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
   181         lines = ["", "dummy"]
   182         lines = ["", "dummy"]
   182         max_noise = 500
   183         max_noise = 500
   183         while lines[-1] and max_noise:
   184         while lines[-1] and max_noise:
   184             l = r.readline()
   185             l = r.readline()
   185             self.readerr()
   186             self._readerr()
   186             if lines[-1] == "1\n" and l == "\n":
   187             if lines[-1] == "1\n" and l == "\n":
   187                 break
   188                 break
   188             if l:
   189             if l:
   189                 self.ui.debug("remote: ", l)
   190                 self.ui.debug("remote: ", l)
   190             lines.append(l)
   191             lines.append(l)
   200                 break
   201                 break
   201 
   202 
   202     def _capabilities(self):
   203     def _capabilities(self):
   203         return self._caps
   204         return self._caps
   204 
   205 
   205     def readerr(self):
   206     def _readerr(self):
   206         _forwardoutput(self.ui, self.pipee)
   207         _forwardoutput(self.ui, self._pipee)
   207 
   208 
   208     def _abort(self, exception):
   209     def _abort(self, exception):
   209         self.cleanup()
   210         self._cleanup()
   210         raise exception
   211         raise exception
   211 
   212 
   212     def cleanup(self):
   213     def _cleanup(self):
   213         if self.pipeo is None:
   214         if self._pipeo is None:
   214             return
   215             return
   215         self.pipeo.close()
   216         self._pipeo.close()
   216         self.pipei.close()
   217         self._pipei.close()
   217         try:
   218         try:
   218             # read the error descriptor until EOF
   219             # read the error descriptor until EOF
   219             for l in self.pipee:
   220             for l in self._pipee:
   220                 self.ui.status(_("remote: "), l)
   221                 self.ui.status(_("remote: "), l)
   221         except (IOError, ValueError):
   222         except (IOError, ValueError):
   222             pass
   223             pass
   223         self.pipee.close()
   224         self._pipee.close()
   224 
   225 
   225     __del__ = cleanup
   226     __del__ = _cleanup
   226 
   227 
   227     def _submitbatch(self, req):
   228     def _submitbatch(self, req):
   228         rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
   229         rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
   229         available = self._getamount()
   230         available = self._getamount()
   230         # TODO this response parsing is probably suboptimal for large
   231         # TODO this response parsing is probably suboptimal for large
   244         yield wireproto.unescapearg(work)
   245         yield wireproto.unescapearg(work)
   245 
   246 
   246     def _callstream(self, cmd, **args):
   247     def _callstream(self, cmd, **args):
   247         args = pycompat.byteskwargs(args)
   248         args = pycompat.byteskwargs(args)
   248         self.ui.debug("sending %s command\n" % cmd)
   249         self.ui.debug("sending %s command\n" % cmd)
   249         self.pipeo.write("%s\n" % cmd)
   250         self._pipeo.write("%s\n" % cmd)
   250         _func, names = wireproto.commands[cmd]
   251         _func, names = wireproto.commands[cmd]
   251         keys = names.split()
   252         keys = names.split()
   252         wireargs = {}
   253         wireargs = {}
   253         for k in keys:
   254         for k in keys:
   254             if k == '*':
   255             if k == '*':
   256                 break
   257                 break
   257             else:
   258             else:
   258                 wireargs[k] = args[k]
   259                 wireargs[k] = args[k]
   259                 del args[k]
   260                 del args[k]
   260         for k, v in sorted(wireargs.iteritems()):
   261         for k, v in sorted(wireargs.iteritems()):
   261             self.pipeo.write("%s %d\n" % (k, len(v)))
   262             self._pipeo.write("%s %d\n" % (k, len(v)))
   262             if isinstance(v, dict):
   263             if isinstance(v, dict):
   263                 for dk, dv in v.iteritems():
   264                 for dk, dv in v.iteritems():
   264                     self.pipeo.write("%s %d\n" % (dk, len(dv)))
   265                     self._pipeo.write("%s %d\n" % (dk, len(dv)))
   265                     self.pipeo.write(dv)
   266                     self._pipeo.write(dv)
   266             else:
   267             else:
   267                 self.pipeo.write(v)
   268                 self._pipeo.write(v)
   268         self.pipeo.flush()
   269         self._pipeo.flush()
   269 
   270 
   270         return self.pipei
   271         return self._pipei
   271 
   272 
   272     def _callcompressable(self, cmd, **args):
   273     def _callcompressable(self, cmd, **args):
   273         return self._callstream(cmd, **args)
   274         return self._callstream(cmd, **args)
   274 
   275 
   275     def _call(self, cmd, **args):
   276     def _call(self, cmd, **args):
   294             # XXX needs to be made better
   295             # XXX needs to be made better
   295             raise error.Abort(_('unexpected remote reply: %s') % r)
   296             raise error.Abort(_('unexpected remote reply: %s') % r)
   296         for d in iter(lambda: fp.read(4096), ''):
   297         for d in iter(lambda: fp.read(4096), ''):
   297             self._send(d)
   298             self._send(d)
   298         self._send("", flush=True)
   299         self._send("", flush=True)
   299         return self.pipei
   300         return self._pipei
   300 
   301 
   301     def _getamount(self):
   302     def _getamount(self):
   302         l = self.pipei.readline()
   303         l = self._pipei.readline()
   303         if l == '\n':
   304         if l == '\n':
   304             self.readerr()
   305             self._readerr()
   305             msg = _('check previous remote output')
   306             msg = _('check previous remote output')
   306             self._abort(error.OutOfBandError(hint=msg))
   307             self._abort(error.OutOfBandError(hint=msg))
   307         self.readerr()
   308         self._readerr()
   308         try:
   309         try:
   309             return int(l)
   310             return int(l)
   310         except ValueError:
   311         except ValueError:
   311             self._abort(error.ResponseError(_("unexpected response:"), l))
   312             self._abort(error.ResponseError(_("unexpected response:"), l))
   312 
   313 
   313     def _recv(self):
   314     def _recv(self):
   314         return self.pipei.read(self._getamount())
   315         return self._pipei.read(self._getamount())
   315 
   316 
   316     def _send(self, data, flush=False):
   317     def _send(self, data, flush=False):
   317         self.pipeo.write("%d\n" % len(data))
   318         self._pipeo.write("%d\n" % len(data))
   318         if data:
   319         if data:
   319             self.pipeo.write(data)
   320             self._pipeo.write(data)
   320         if flush:
   321         if flush:
   321             self.pipeo.flush()
   322             self._pipeo.flush()
   322         self.readerr()
   323         self._readerr()
   323 
   324 
   324 instance = sshpeer
   325 instance = sshpeer