comparison mercurial/sshpeer.py @ 33789:82d564d5ac4f

sshpeer: make instance attributes and methods internal Peer types are supposed to conform to a formal interface defined by peer.peerrepository and wireproto.wirepeer. Every "public" attribute on *peer types makes it harder to understand what attributes are part of the interface and what are instance specific. This commit converts a number of "public" instance attributes and methods on sshpeer to internal so they can't be confused to be part of the peer API. The URL-related instance attributes were introduced in 876333a295ff in 2005. AFAICT most of them aren't used and could potentially be removed. But I kept them around anyway. I also reorded some code to make things slightly easier to read. .. api:: Rename attributes on sshpeer to reflect peer API Differential Revision: https://phab.mercurial-scm.org/D331
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 10 Aug 2017 20:55:28 -0700
parents 02a745c20121
children 1f8460b55986
comparison
equal deleted inserted replaced
33788:b47fe9733d76 33789:82d564d5ac4f
116 116
117 class sshpeer(wireproto.wirepeer): 117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False): 118 def __init__(self, ui, path, create=False):
119 self._url = path 119 self._url = path
120 self.ui = ui 120 self.ui = ui
121 self.pipeo = self.pipei = self.pipee = None 121 self._pipeo = self._pipei = self._pipee = None
122 122
123 u = util.url(path, parsequery=False, parsefragment=False) 123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None: 124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path)) 125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 126
127 util.checksafessh(path) 127 util.checksafessh(path)
128 128
129 self.user = u.user
130 if u.passwd is not None: 129 if u.passwd is not None:
131 self._abort(error.RepoError(_("password in URL not supported"))) 130 self._abort(error.RepoError(_("password in URL not supported")))
132 self.host = u.host 131
133 self.port = u.port 132 self._user = u.user
134 self.path = u.path or "." 133 self._host = u.host
134 self._port = u.port
135 self._path = u.path or '.'
135 136
136 sshcmd = self.ui.config("ui", "ssh") 137 sshcmd = self.ui.config("ui", "ssh")
137 remotecmd = self.ui.config("ui", "remotecmd") 138 remotecmd = self.ui.config("ui", "remotecmd")
138 139
139 args = util.sshargs(sshcmd, self.host, self.user, self.port) 140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
140 141
141 if create: 142 if create:
142 cmd = '%s %s %s' % (sshcmd, args, 143 cmd = '%s %s %s' % (sshcmd, args,
143 util.shellquote("%s init %s" % 144 util.shellquote("%s init %s" %
144 (_serverquote(remotecmd), _serverquote(self.path)))) 145 (_serverquote(remotecmd), _serverquote(self._path))))
145 ui.debug('running %s\n' % cmd) 146 ui.debug('running %s\n' % cmd)
146 res = ui.system(cmd, blockedtag='sshpeer') 147 res = ui.system(cmd, blockedtag='sshpeer')
147 if res != 0: 148 if res != 0:
148 self._abort(error.RepoError(_("could not create remote repo"))) 149 self._abort(error.RepoError(_("could not create remote repo")))
149 150
152 def url(self): 153 def url(self):
153 return self._url 154 return self._url
154 155
155 def _validaterepo(self, sshcmd, args, remotecmd): 156 def _validaterepo(self, sshcmd, args, remotecmd):
156 # cleanup up previous run 157 # cleanup up previous run
157 self.cleanup() 158 self._cleanup()
158 159
159 cmd = '%s %s %s' % (sshcmd, args, 160 cmd = '%s %s %s' % (sshcmd, args,
160 util.shellquote("%s -R %s serve --stdio" % 161 util.shellquote("%s -R %s serve --stdio" %
161 (_serverquote(remotecmd), _serverquote(self.path)))) 162 (_serverquote(remotecmd), _serverquote(self._path))))
162 self.ui.debug('running %s\n' % cmd) 163 self.ui.debug('running %s\n' % cmd)
163 cmd = util.quotecommand(cmd) 164 cmd = util.quotecommand(cmd)
164 165
165 # while self.subprocess isn't used, having it allows the subprocess to 166 # while self._subprocess isn't used, having it allows the subprocess to
166 # to clean up correctly later 167 # to clean up correctly later
167 # 168 #
168 # no buffer allow the use of 'select' 169 # no buffer allow the use of 'select'
169 # feel free to remove buffering and select usage when we ultimately 170 # feel free to remove buffering and select usage when we ultimately
170 # move to threading. 171 # move to threading.
171 sub = util.popen4(cmd, bufsize=0) 172 sub = util.popen4(cmd, bufsize=0)
172 self.pipeo, self.pipei, self.pipee, self.subprocess = sub 173 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
173 174
174 self.pipei = util.bufferedinputpipe(self.pipei) 175 self._pipei = util.bufferedinputpipe(self._pipei)
175 self.pipei = doublepipe(self.ui, self.pipei, self.pipee) 176 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
176 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee) 177 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
177 178
178 # skip any noise generated by remote shell 179 # skip any noise generated by remote shell
179 self._callstream("hello") 180 self._callstream("hello")
180 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40))) 181 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
181 lines = ["", "dummy"] 182 lines = ["", "dummy"]
182 max_noise = 500 183 max_noise = 500
183 while lines[-1] and max_noise: 184 while lines[-1] and max_noise:
184 l = r.readline() 185 l = r.readline()
185 self.readerr() 186 self._readerr()
186 if lines[-1] == "1\n" and l == "\n": 187 if lines[-1] == "1\n" and l == "\n":
187 break 188 break
188 if l: 189 if l:
189 self.ui.debug("remote: ", l) 190 self.ui.debug("remote: ", l)
190 lines.append(l) 191 lines.append(l)
200 break 201 break
201 202
202 def _capabilities(self): 203 def _capabilities(self):
203 return self._caps 204 return self._caps
204 205
205 def readerr(self): 206 def _readerr(self):
206 _forwardoutput(self.ui, self.pipee) 207 _forwardoutput(self.ui, self._pipee)
207 208
208 def _abort(self, exception): 209 def _abort(self, exception):
209 self.cleanup() 210 self._cleanup()
210 raise exception 211 raise exception
211 212
212 def cleanup(self): 213 def _cleanup(self):
213 if self.pipeo is None: 214 if self._pipeo is None:
214 return 215 return
215 self.pipeo.close() 216 self._pipeo.close()
216 self.pipei.close() 217 self._pipei.close()
217 try: 218 try:
218 # read the error descriptor until EOF 219 # read the error descriptor until EOF
219 for l in self.pipee: 220 for l in self._pipee:
220 self.ui.status(_("remote: "), l) 221 self.ui.status(_("remote: "), l)
221 except (IOError, ValueError): 222 except (IOError, ValueError):
222 pass 223 pass
223 self.pipee.close() 224 self._pipee.close()
224 225
225 __del__ = cleanup 226 __del__ = _cleanup
226 227
227 def _submitbatch(self, req): 228 def _submitbatch(self, req):
228 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req)) 229 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
229 available = self._getamount() 230 available = self._getamount()
230 # TODO this response parsing is probably suboptimal for large 231 # TODO this response parsing is probably suboptimal for large
244 yield wireproto.unescapearg(work) 245 yield wireproto.unescapearg(work)
245 246
246 def _callstream(self, cmd, **args): 247 def _callstream(self, cmd, **args):
247 args = pycompat.byteskwargs(args) 248 args = pycompat.byteskwargs(args)
248 self.ui.debug("sending %s command\n" % cmd) 249 self.ui.debug("sending %s command\n" % cmd)
249 self.pipeo.write("%s\n" % cmd) 250 self._pipeo.write("%s\n" % cmd)
250 _func, names = wireproto.commands[cmd] 251 _func, names = wireproto.commands[cmd]
251 keys = names.split() 252 keys = names.split()
252 wireargs = {} 253 wireargs = {}
253 for k in keys: 254 for k in keys:
254 if k == '*': 255 if k == '*':
256 break 257 break
257 else: 258 else:
258 wireargs[k] = args[k] 259 wireargs[k] = args[k]
259 del args[k] 260 del args[k]
260 for k, v in sorted(wireargs.iteritems()): 261 for k, v in sorted(wireargs.iteritems()):
261 self.pipeo.write("%s %d\n" % (k, len(v))) 262 self._pipeo.write("%s %d\n" % (k, len(v)))
262 if isinstance(v, dict): 263 if isinstance(v, dict):
263 for dk, dv in v.iteritems(): 264 for dk, dv in v.iteritems():
264 self.pipeo.write("%s %d\n" % (dk, len(dv))) 265 self._pipeo.write("%s %d\n" % (dk, len(dv)))
265 self.pipeo.write(dv) 266 self._pipeo.write(dv)
266 else: 267 else:
267 self.pipeo.write(v) 268 self._pipeo.write(v)
268 self.pipeo.flush() 269 self._pipeo.flush()
269 270
270 return self.pipei 271 return self._pipei
271 272
272 def _callcompressable(self, cmd, **args): 273 def _callcompressable(self, cmd, **args):
273 return self._callstream(cmd, **args) 274 return self._callstream(cmd, **args)
274 275
275 def _call(self, cmd, **args): 276 def _call(self, cmd, **args):
294 # XXX needs to be made better 295 # XXX needs to be made better
295 raise error.Abort(_('unexpected remote reply: %s') % r) 296 raise error.Abort(_('unexpected remote reply: %s') % r)
296 for d in iter(lambda: fp.read(4096), ''): 297 for d in iter(lambda: fp.read(4096), ''):
297 self._send(d) 298 self._send(d)
298 self._send("", flush=True) 299 self._send("", flush=True)
299 return self.pipei 300 return self._pipei
300 301
301 def _getamount(self): 302 def _getamount(self):
302 l = self.pipei.readline() 303 l = self._pipei.readline()
303 if l == '\n': 304 if l == '\n':
304 self.readerr() 305 self._readerr()
305 msg = _('check previous remote output') 306 msg = _('check previous remote output')
306 self._abort(error.OutOfBandError(hint=msg)) 307 self._abort(error.OutOfBandError(hint=msg))
307 self.readerr() 308 self._readerr()
308 try: 309 try:
309 return int(l) 310 return int(l)
310 except ValueError: 311 except ValueError:
311 self._abort(error.ResponseError(_("unexpected response:"), l)) 312 self._abort(error.ResponseError(_("unexpected response:"), l))
312 313
313 def _recv(self): 314 def _recv(self):
314 return self.pipei.read(self._getamount()) 315 return self._pipei.read(self._getamount())
315 316
316 def _send(self, data, flush=False): 317 def _send(self, data, flush=False):
317 self.pipeo.write("%d\n" % len(data)) 318 self._pipeo.write("%d\n" % len(data))
318 if data: 319 if data:
319 self.pipeo.write(data) 320 self._pipeo.write(data)
320 if flush: 321 if flush:
321 self.pipeo.flush() 322 self._pipeo.flush()
322 self.readerr() 323 self._readerr()
323 324
324 instance = sshpeer 325 instance = sshpeer