comparison mercurial/sshpeer.py @ 17192:1ac628cd7113

peer: introduce real peer classes. This change separates peer implementations from the repository implementation. localpeer currently is a simple pass-through to localrepository, except for legacy calls, which have already been removed from localpeer. This ensures that the local client code only uses the most modern peer API when talking to local repos. Peers have a .local() method which returns either None or the underlying localrepository (or descendant thereof). Repos have a .peer() method to return a freshly constructed localpeer. The latter is used by hg.peer(), and also to allow folks to pass either a peer or a repo to some generic helper methods. We might want to get rid of .peer() eventually. The only user of locallegacypeer is debugdiscovery, which uses it to pose as a pre-setdiscovery client. But we decided to leave the old API defined in locallegacypeer for clarity and maybe for other uses in the future. It might be nice to actually define the peer API directly in peer.py as stub methods. One problem there is, however, that localpeer implements lock/addchangegroup, whereas the true remote peers implement unbundle. It might be desirable to get rid of this distinction eventually.
author Peter Arrenbrecht <peter.arrenbrecht@gmail.com>
date Fri, 13 Jul 2012 21:47:06 +0200
parents mercurial/sshrepo.py@cfb6682961b8
children 9baf4330d88f
comparison
equal deleted inserted replaced
17191:5884812686f7 17192:1ac628cd7113
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 import re
9 from i18n import _
10 import util, error, wireproto
11
class remotelock(object):
    '''lock handle handed out by sshpeer.lock()

    Releasing the handle calls unlock() on the peer it was created for.
    '''
    def __init__(self, repo):
        # peer to unlock; reset to None once the lock has been released
        self.repo = repo
    def release(self):
        '''unlock the peer; calling release() again is a no-op'''
        if self.repo is None:
            # already released: without this guard a second release()
            # fails with AttributeError on None
            return
        self.repo.unlock()
        self.repo = None
    def __del__(self):
        # release on garbage collection if the owner never did
        if self.repo:
            self.release()
21
22 def _serverquote(s):
23 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
27
class sshpeer(wireproto.wirepeer):
    '''wire-protocol peer that drives "<remotecmd> -R <path> serve --stdio"
    through the pipes of a spawned ssh command

    Requests are written to self.pipeo, replies are read from self.pipei
    and self.pipee is the error stream. All three are None until
    validate_repo() has started the ssh process.
    '''
    def __init__(self, ui, path, create=False):
        '''parse the ssh URL in path and connect to the remote side

        With create=True, "<remotecmd> init <path>" is run over ssh first.

        Raises error.RepoError if the URL cannot be parsed, if it carries
        a password, or if the remote init command exits non-zero.
        '''
        self._url = path
        self.ui = ui
        # set before anything can call _abort(), since cleanup() checks pipeo
        self.pipeo = self.pipei = self.pipee = None

        u = util.url(path, parsequery=False, parsefragment=False)
        if u.scheme != 'ssh' or not u.host or u.path is None:
            self._abort(error.RepoError(_("couldn't parse location %s") % path))

        self.user = u.user
        if u.passwd is not None:
            self._abort(error.RepoError(_("password in URL not supported")))
        self.host = u.host
        self.port = u.port
        self.path = u.path or "."

        sshcmd = self.ui.config("ui", "ssh", "ssh")
        remotecmd = self.ui.config("ui", "remotecmd", "hg")

        args = util.sshargs(sshcmd, self.host, self.user, self.port)

        if create:
            cmd = '%s %s %s' % (sshcmd, args,
                util.shellquote("%s init %s" %
                    (_serverquote(remotecmd), _serverquote(self.path))))
            ui.note(_('running %s\n') % cmd)
            res = util.system(cmd)
            if res != 0:
                self._abort(error.RepoError(_("could not create remote repo")))

        self.validate_repo(ui, sshcmd, args, remotecmd)

    def url(self):
        '''return the path this peer was constructed with'''
        return self._url

    def validate_repo(self, ui, sshcmd, args, remotecmd):
        '''(re)start the remote server process and read its greeting

        Sends "hello" followed by a dummy "between" request, then reads
        reply lines until the sequence "1\\n", "\\n" is seen (presumably
        the reply to the dummy "between" request). At most 500 lines are
        read. The capabilities found on a "capabilities:" line are stored
        in self._caps.

        Raises error.RepoError if no suitable response is received.
        '''
        # clean up any previous run
        self.cleanup()

        cmd = '%s %s %s' % (sshcmd, args,
            util.shellquote("%s -R %s serve --stdio" %
                (_serverquote(remotecmd), _serverquote(self.path))))
        ui.note(_('running %s\n') % cmd)
        cmd = util.quotecommand(cmd)
        self.pipeo, self.pipei, self.pipee = util.popen3(cmd)

        # skip any noise generated by remote shell
        self._callstream("hello")
        r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
        # seeded with a non-empty last element so the loop is entered
        lines = ["", "dummy"]
        max_noise = 500
        while lines[-1] and max_noise:
            l = r.readline()
            self.readerr()
            if lines[-1] == "1\n" and l == "\n":
                break
            if l:
                ui.debug("remote: ", l)
            lines.append(l)
            max_noise -= 1
        else:
            # loop ended without break: an empty read or the noise limit
            self._abort(error.RepoError(_('no suitable response from '
                                          'remote hg')))

        self._caps = set()
        # only the last "capabilities:" line is used
        for l in reversed(lines):
            if l.startswith("capabilities:"):
                self._caps.update(l[:-1].split(":")[1].split())
                break

    def _capabilities(self):
        '''return the capability set gathered by validate_repo()'''
        return self._caps

    def readerr(self):
        '''pass pending data on the error stream to ui.status

        Each line is prefixed with "remote: ". The amount to read is the
        size reported by util.fstat() for the error pipe; the loop stops
        once that size is zero or a read returns nothing.
        '''
        while True:
            size = util.fstat(self.pipee).st_size
            if size == 0:
                break
            s = self.pipee.read(size)
            if not s:
                break
            for l in s.splitlines():
                self.ui.status(_("remote: "), l, '\n')

    def _abort(self, exception):
        '''clean up the connection, then raise exception'''
        self.cleanup()
        raise exception

    def cleanup(self):
        '''close the pipes, reporting what is left on the error stream

        Does nothing if no connection was ever started. Also installed as
        __del__ below.
        '''
        if self.pipeo is None:
            return
        self.pipeo.close()
        self.pipei.close()
        try:
            # read the error descriptor until EOF
            for l in self.pipee:
                self.ui.status(_("remote: "), l)
        except (IOError, ValueError):
            # deliberate best effort: a failure while draining the error
            # stream must not prevent it from being closed
            pass
        self.pipee.close()

    __del__ = cleanup

    def _callstream(self, cmd, **args):
        '''send cmd with its arguments and return the reply pipe

        The argument names come from wireproto.commands[cmd]. A '*' name
        collects all keyword arguments not consumed so far into one dict.
        Every argument is written as "<name> <length>\\n" followed by its
        value; for the dict, the length is the number of entries and each
        entry is then written the same way.

        Raises KeyError if a named argument is missing from args.
        '''
        self.ui.debug("sending %s command\n" % cmd)
        self.pipeo.write("%s\n" % cmd)
        _func, names = wireproto.commands[cmd]
        keys = names.split()
        wireargs = {}
        for k in keys:
            if k == '*':
                wireargs['*'] = args
                break
            else:
                wireargs[k] = args[k]
                del args[k]
        for k, v in sorted(wireargs.iteritems()):
            self.pipeo.write("%s %d\n" % (k, len(v)))
            if isinstance(v, dict):
                for dk, dv in v.iteritems():
                    self.pipeo.write("%s %d\n" % (dk, len(dv)))
                    self.pipeo.write(dv)
            else:
                self.pipeo.write(v)
        self.pipeo.flush()

        return self.pipei

    def _call(self, cmd, **args):
        '''send cmd and return the reply read by _recv()'''
        self._callstream(cmd, **args)
        return self._recv()

    def _callpush(self, cmd, fp, **args):
        '''send cmd, then stream the contents of fp to the remote side

        Returns ('', reply) if the reply to the command or the reply after
        the data is non-empty, otherwise (next reply, '').
        '''
        r = self._call(cmd, **args)
        if r:
            return '', r
        while True:
            d = fp.read(4096)
            if not d:
                break
            self._send(d)
        # a zero-length chunk ends the data
        self._send("", flush=True)
        r = self._recv()
        if r:
            return '', r
        return self._recv(), ''

    def _decompress(self, stream):
        '''return stream unchanged'''
        return stream

    def _recv(self):
        '''read one length-prefixed reply from the reply pipe

        A bare newline in place of the length line switches to error
        handling: lines are read from the error stream up to a "-\\n"
        terminator (or EOF) and raised as error.OutOfBandError.

        Raises error.ResponseError if the length line is not an integer.
        '''
        l = self.pipei.readline()
        if l == '\n':
            err = []
            while True:
                line = self.pipee.readline()
                # '' means EOF on the error stream: stop here instead of
                # looping forever on a terminator that will never arrive
                if not line or line == '-\n':
                    break
                err.append(line)
            if err and err[-1].endswith('\n'):
                # strip the trailing newline added to the last line server-side
                err[-1] = err[-1][:-1]
            self._abort(error.OutOfBandError(*err))
        self.readerr()
        try:
            l = int(l)
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), l))
        return self.pipei.read(l)

    def _send(self, data, flush=False):
        '''write data as one chunk: "<length>\\n" followed by the data'''
        self.pipeo.write("%d\n" % len(data))
        if data:
            self.pipeo.write(data)
        if flush:
            self.pipeo.flush()
        self.readerr()

    def lock(self):
        '''issue the "lock" command and return a remotelock for this peer'''
        self._call("lock")
        return remotelock(self)

    def unlock(self):
        '''issue the "unlock" command'''
        self._call("unlock")

    def addchangegroup(self, cg, source, url, lock=None):
        '''Send a changegroup to the remote server.  Return an integer
        similar to unbundle(). DEPRECATED, since it requires locking the
        remote.

        Raises error.RepoError if the remote answers the command with a
        non-empty reply, and error.ResponseError if the final reply is
        not an integer. An empty final reply is returned as 1.
        '''
        d = self._call("addchangegroup")
        if d:
            self._abort(error.RepoError(_("push refused: %s") % d))
        while True:
            d = cg.read(4096)
            if not d:
                break
            # unlike _callpush(), the data is written without chunk framing
            self.pipeo.write(d)
            self.readerr()

        self.pipeo.flush()

        self.readerr()
        r = self._recv()
        if not r:
            return 1
        try:
            return int(r)
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), r))
238
# Module-level alias for the sshpeer class.
# NOTE(review): presumably the name the scheme dispatcher looks up to build a
# peer for ssh:// URLs - confirm against the caller.
239 instance = sshpeer