116 |
116 |
117 class sshpeer(wireproto.wirepeer): |
117 class sshpeer(wireproto.wirepeer): |
118 def __init__(self, ui, path, create=False): |
118 def __init__(self, ui, path, create=False): |
119 self._url = path |
119 self._url = path |
120 self.ui = ui |
120 self.ui = ui |
121 self.pipeo = self.pipei = self.pipee = None |
121 self._pipeo = self._pipei = self._pipee = None |
122 |
122 |
123 u = util.url(path, parsequery=False, parsefragment=False) |
123 u = util.url(path, parsequery=False, parsefragment=False) |
124 if u.scheme != 'ssh' or not u.host or u.path is None: |
124 if u.scheme != 'ssh' or not u.host or u.path is None: |
125 self._abort(error.RepoError(_("couldn't parse location %s") % path)) |
125 self._abort(error.RepoError(_("couldn't parse location %s") % path)) |
126 |
126 |
127 util.checksafessh(path) |
127 util.checksafessh(path) |
128 |
128 |
129 self.user = u.user |
|
130 if u.passwd is not None: |
129 if u.passwd is not None: |
131 self._abort(error.RepoError(_("password in URL not supported"))) |
130 self._abort(error.RepoError(_("password in URL not supported"))) |
132 self.host = u.host |
131 |
133 self.port = u.port |
132 self._user = u.user |
134 self.path = u.path or "." |
133 self._host = u.host |
|
134 self._port = u.port |
|
135 self._path = u.path or '.' |
135 |
136 |
136 sshcmd = self.ui.config("ui", "ssh") |
137 sshcmd = self.ui.config("ui", "ssh") |
137 remotecmd = self.ui.config("ui", "remotecmd") |
138 remotecmd = self.ui.config("ui", "remotecmd") |
138 |
139 |
139 args = util.sshargs(sshcmd, self.host, self.user, self.port) |
140 args = util.sshargs(sshcmd, self._host, self._user, self._port) |
140 |
141 |
141 if create: |
142 if create: |
142 cmd = '%s %s %s' % (sshcmd, args, |
143 cmd = '%s %s %s' % (sshcmd, args, |
143 util.shellquote("%s init %s" % |
144 util.shellquote("%s init %s" % |
144 (_serverquote(remotecmd), _serverquote(self.path)))) |
145 (_serverquote(remotecmd), _serverquote(self._path)))) |
145 ui.debug('running %s\n' % cmd) |
146 ui.debug('running %s\n' % cmd) |
146 res = ui.system(cmd, blockedtag='sshpeer') |
147 res = ui.system(cmd, blockedtag='sshpeer') |
147 if res != 0: |
148 if res != 0: |
148 self._abort(error.RepoError(_("could not create remote repo"))) |
149 self._abort(error.RepoError(_("could not create remote repo"))) |
149 |
150 |
152 def url(self): |
153 def url(self): |
153 return self._url |
154 return self._url |
154 |
155 |
155 def _validaterepo(self, sshcmd, args, remotecmd): |
156 def _validaterepo(self, sshcmd, args, remotecmd): |
156 # cleanup up previous run |
157 # cleanup up previous run |
157 self.cleanup() |
158 self._cleanup() |
158 |
159 |
159 cmd = '%s %s %s' % (sshcmd, args, |
160 cmd = '%s %s %s' % (sshcmd, args, |
160 util.shellquote("%s -R %s serve --stdio" % |
161 util.shellquote("%s -R %s serve --stdio" % |
161 (_serverquote(remotecmd), _serverquote(self.path)))) |
162 (_serverquote(remotecmd), _serverquote(self._path)))) |
162 self.ui.debug('running %s\n' % cmd) |
163 self.ui.debug('running %s\n' % cmd) |
163 cmd = util.quotecommand(cmd) |
164 cmd = util.quotecommand(cmd) |
164 |
165 |
165 # while self.subprocess isn't used, having it allows the subprocess to |
166 # while self._subprocess isn't used, having it allows the subprocess to |
166 # to clean up correctly later |
167 # to clean up correctly later |
167 # |
168 # |
168 # no buffer allow the use of 'select' |
169 # no buffer allow the use of 'select' |
169 # feel free to remove buffering and select usage when we ultimately |
170 # feel free to remove buffering and select usage when we ultimately |
170 # move to threading. |
171 # move to threading. |
171 sub = util.popen4(cmd, bufsize=0) |
172 sub = util.popen4(cmd, bufsize=0) |
172 self.pipeo, self.pipei, self.pipee, self.subprocess = sub |
173 self._pipeo, self._pipei, self._pipee, self._subprocess = sub |
173 |
174 |
174 self.pipei = util.bufferedinputpipe(self.pipei) |
175 self._pipei = util.bufferedinputpipe(self._pipei) |
175 self.pipei = doublepipe(self.ui, self.pipei, self.pipee) |
176 self._pipei = doublepipe(self.ui, self._pipei, self._pipee) |
176 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee) |
177 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee) |
177 |
178 |
178 # skip any noise generated by remote shell |
179 # skip any noise generated by remote shell |
179 self._callstream("hello") |
180 self._callstream("hello") |
180 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40))) |
181 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40))) |
181 lines = ["", "dummy"] |
182 lines = ["", "dummy"] |
182 max_noise = 500 |
183 max_noise = 500 |
183 while lines[-1] and max_noise: |
184 while lines[-1] and max_noise: |
184 l = r.readline() |
185 l = r.readline() |
185 self.readerr() |
186 self._readerr() |
186 if lines[-1] == "1\n" and l == "\n": |
187 if lines[-1] == "1\n" and l == "\n": |
187 break |
188 break |
188 if l: |
189 if l: |
189 self.ui.debug("remote: ", l) |
190 self.ui.debug("remote: ", l) |
190 lines.append(l) |
191 lines.append(l) |
200 break |
201 break |
201 |
202 |
202 def _capabilities(self): |
203 def _capabilities(self): |
203 return self._caps |
204 return self._caps |
204 |
205 |
205 def readerr(self): |
206 def _readerr(self): |
206 _forwardoutput(self.ui, self.pipee) |
207 _forwardoutput(self.ui, self._pipee) |
207 |
208 |
208 def _abort(self, exception): |
209 def _abort(self, exception): |
209 self.cleanup() |
210 self._cleanup() |
210 raise exception |
211 raise exception |
211 |
212 |
212 def cleanup(self): |
213 def _cleanup(self): |
213 if self.pipeo is None: |
214 if self._pipeo is None: |
214 return |
215 return |
215 self.pipeo.close() |
216 self._pipeo.close() |
216 self.pipei.close() |
217 self._pipei.close() |
217 try: |
218 try: |
218 # read the error descriptor until EOF |
219 # read the error descriptor until EOF |
219 for l in self.pipee: |
220 for l in self._pipee: |
220 self.ui.status(_("remote: "), l) |
221 self.ui.status(_("remote: "), l) |
221 except (IOError, ValueError): |
222 except (IOError, ValueError): |
222 pass |
223 pass |
223 self.pipee.close() |
224 self._pipee.close() |
224 |
225 |
225 __del__ = cleanup |
226 __del__ = _cleanup |
226 |
227 |
227 def _submitbatch(self, req): |
228 def _submitbatch(self, req): |
228 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req)) |
229 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req)) |
229 available = self._getamount() |
230 available = self._getamount() |
230 # TODO this response parsing is probably suboptimal for large |
231 # TODO this response parsing is probably suboptimal for large |
244 yield wireproto.unescapearg(work) |
245 yield wireproto.unescapearg(work) |
245 |
246 |
246 def _callstream(self, cmd, **args): |
247 def _callstream(self, cmd, **args): |
247 args = pycompat.byteskwargs(args) |
248 args = pycompat.byteskwargs(args) |
248 self.ui.debug("sending %s command\n" % cmd) |
249 self.ui.debug("sending %s command\n" % cmd) |
249 self.pipeo.write("%s\n" % cmd) |
250 self._pipeo.write("%s\n" % cmd) |
250 _func, names = wireproto.commands[cmd] |
251 _func, names = wireproto.commands[cmd] |
251 keys = names.split() |
252 keys = names.split() |
252 wireargs = {} |
253 wireargs = {} |
253 for k in keys: |
254 for k in keys: |
254 if k == '*': |
255 if k == '*': |
256 break |
257 break |
257 else: |
258 else: |
258 wireargs[k] = args[k] |
259 wireargs[k] = args[k] |
259 del args[k] |
260 del args[k] |
260 for k, v in sorted(wireargs.iteritems()): |
261 for k, v in sorted(wireargs.iteritems()): |
261 self.pipeo.write("%s %d\n" % (k, len(v))) |
262 self._pipeo.write("%s %d\n" % (k, len(v))) |
262 if isinstance(v, dict): |
263 if isinstance(v, dict): |
263 for dk, dv in v.iteritems(): |
264 for dk, dv in v.iteritems(): |
264 self.pipeo.write("%s %d\n" % (dk, len(dv))) |
265 self._pipeo.write("%s %d\n" % (dk, len(dv))) |
265 self.pipeo.write(dv) |
266 self._pipeo.write(dv) |
266 else: |
267 else: |
267 self.pipeo.write(v) |
268 self._pipeo.write(v) |
268 self.pipeo.flush() |
269 self._pipeo.flush() |
269 |
270 |
270 return self.pipei |
271 return self._pipei |
271 |
272 |
272 def _callcompressable(self, cmd, **args): |
273 def _callcompressable(self, cmd, **args): |
273 return self._callstream(cmd, **args) |
274 return self._callstream(cmd, **args) |
274 |
275 |
275 def _call(self, cmd, **args): |
276 def _call(self, cmd, **args): |
294 # XXX needs to be made better |
295 # XXX needs to be made better |
295 raise error.Abort(_('unexpected remote reply: %s') % r) |
296 raise error.Abort(_('unexpected remote reply: %s') % r) |
296 for d in iter(lambda: fp.read(4096), ''): |
297 for d in iter(lambda: fp.read(4096), ''): |
297 self._send(d) |
298 self._send(d) |
298 self._send("", flush=True) |
299 self._send("", flush=True) |
299 return self.pipei |
300 return self._pipei |
300 |
301 |
301 def _getamount(self): |
302 def _getamount(self): |
302 l = self.pipei.readline() |
303 l = self._pipei.readline() |
303 if l == '\n': |
304 if l == '\n': |
304 self.readerr() |
305 self._readerr() |
305 msg = _('check previous remote output') |
306 msg = _('check previous remote output') |
306 self._abort(error.OutOfBandError(hint=msg)) |
307 self._abort(error.OutOfBandError(hint=msg)) |
307 self.readerr() |
308 self._readerr() |
308 try: |
309 try: |
309 return int(l) |
310 return int(l) |
310 except ValueError: |
311 except ValueError: |
311 self._abort(error.ResponseError(_("unexpected response:"), l)) |
312 self._abort(error.ResponseError(_("unexpected response:"), l)) |
312 |
313 |
313 def _recv(self): |
314 def _recv(self): |
314 return self.pipei.read(self._getamount()) |
315 return self._pipei.read(self._getamount()) |
315 |
316 |
316 def _send(self, data, flush=False): |
317 def _send(self, data, flush=False): |
317 self.pipeo.write("%d\n" % len(data)) |
318 self._pipeo.write("%d\n" % len(data)) |
318 if data: |
319 if data: |
319 self.pipeo.write(data) |
320 self._pipeo.write(data) |
320 if flush: |
321 if flush: |
321 self.pipeo.flush() |
322 self._pipeo.flush() |
322 self.readerr() |
323 self._readerr() |
323 |
324 |
324 instance = sshpeer |
325 instance = sshpeer |