Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/sshpeer.py @ 17192:1ac628cd7113
peer: introduce real peer classes
This change separates peer implementations from the repository implementation.
localpeer currently is a simple pass-through to localrepository, except for
legacy calls, which have already been removed from localpeer. This ensures that
the local client code only uses the most modern peer API when talking to local
repos.
Peers have a .local() method which returns either None or the underlying
localrepository (or descendant thereof). Repos have a .peer() method to return
a freshly constructed localpeer. The latter is used by hg.peer(), and also to
allow folks to pass either a peer or a repo to some generic helper methods.
We might want to get rid of .peer() eventually.
The only user of locallegacypeer is debugdiscovery, which uses it to pose as a
pre-setdiscovery client. But we decided to leave the old API defined in
locallegacypeer for clarity and maybe for other uses in the future.
It might be nice to actually define the peer API directly in peer.py as stub
methods. One problem, however, is that localpeer implements
lock/addchangegroup, whereas the true remote peers implement unbundle.
It might be desirable to get rid of this distinction eventually.
author | Peter Arrenbrecht <peter.arrenbrecht@gmail.com> |
---|---|
date | Fri, 13 Jul 2012 21:47:06 +0200 |
parents | mercurial/sshrepo.py@cfb6682961b8 |
children | 9baf4330d88f |
comparison
equal
deleted
inserted
replaced
17191:5884812686f7 | 17192:1ac628cd7113 |
---|---|
1 # sshpeer.py - ssh repository proxy class for mercurial | |
2 # | |
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com> | |
4 # | |
5 # This software may be used and distributed according to the terms of the | |
6 # GNU General Public License version 2 or any later version. | |
7 | |
8 import re | |
9 from i18n import _ | |
10 import util, error, wireproto | |
11 | |
class remotelock(object):
    '''Handle for a lock held through an object exposing unlock().

    The lock is released at most once: explicitly through release(), or
    as a fallback when the handle is garbage collected.
    '''

    def __init__(self, repo):
        # repo: object whose unlock() method is called on release
        self.repo = repo

    def release(self):
        '''Call unlock() on the wrapped object; later calls are no-ops.'''
        # Drop our reference before unlocking so that neither a repeated
        # release() nor __del__ can issue a second unlock(), even when the
        # first unlock() raises.
        repo, self.repo = self.repo, None
        if repo is not None:
            repo.unlock()

    def __del__(self):
        # fallback for handles that were never released explicitly
        if self.repo:
            self.release()
21 | |
22 def _serverquote(s): | |
23 '''quote a string for the remote shell ... which we assume is sh''' | |
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s): | |
25 return s | |
26 return "'%s'" % s.replace("'", "'\\''") | |
27 | |
class sshpeer(wireproto.wirepeer):
    '''Peer that talks to a repository through a spawned ssh command.

    The remote side is started as "<remotecmd> -R <path> serve --stdio" and
    driven over three pipes obtained from util.popen3: pipeo (commands are
    written to it), pipei (responses are read from it) and pipee (the error
    descriptor, echoed to the user with a "remote: " prefix).
    '''

    def __init__(self, ui, path, create=False):
        '''Parse the ssh URL in path and connect to the remote repository.

        ui is used for configuration lookups and for output.  When create
        is true, "<remotecmd> init <path>" is first run on the remote
        through util.system.

        Raises error.RepoError when path is not a usable ssh URL, when it
        contains a password, or when the remote "init" command fails.
        '''
        self._url = path
        self.ui = ui
        # set before anything can abort: cleanup() (also used as __del__)
        # checks pipeo to decide whether there is anything to close
        self.pipeo = self.pipei = self.pipee = None

        u = util.url(path, parsequery=False, parsefragment=False)
        if u.scheme != 'ssh' or not u.host or u.path is None:
            self._abort(error.RepoError(_("couldn't parse location %s") % path))

        self.user = u.user
        if u.passwd is not None:
            self._abort(error.RepoError(_("password in URL not supported")))
        self.host = u.host
        self.port = u.port
        self.path = u.path or "."

        sshcmd = self.ui.config("ui", "ssh", "ssh")
        remotecmd = self.ui.config("ui", "remotecmd", "hg")

        args = util.sshargs(sshcmd, self.host, self.user, self.port)

        if create:
            cmd = '%s %s %s' % (sshcmd, args,
                util.shellquote("%s init %s" %
                    (_serverquote(remotecmd), _serverquote(self.path))))
            ui.note(_('running %s\n') % cmd)
            res = util.system(cmd)
            if res != 0:
                self._abort(error.RepoError(_("could not create remote repo")))

        self.validate_repo(ui, sshcmd, args, remotecmd)

    def url(self):
        '''Return the location string this peer was created with.'''
        return self._url

    def validate_repo(self, ui, sshcmd, args, remotecmd):
        '''Start the remote server process and read its capabilities.

        Sends "hello" followed by a "between" probe, then reads response
        lines until the probe's answer ("1\\n" followed by "\\n") shows up.
        Raises error.RepoError if the stream ends or 500 lines have been
        read without seeing that answer.
        '''
        # clean up previous run
        self.cleanup()

        cmd = '%s %s %s' % (sshcmd, args,
            util.shellquote("%s -R %s serve --stdio" %
                (_serverquote(remotecmd), _serverquote(self.path))))
        ui.note(_('running %s\n') % cmd)
        cmd = util.quotecommand(cmd)
        self.pipeo, self.pipei, self.pipee = util.popen3(cmd)

        # skip any noise generated by remote shell
        self._callstream("hello")
        r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
        lines = ["", "dummy"]
        max_noise = 500
        while lines[-1] and max_noise:
            l = r.readline()
            self.readerr()
            if lines[-1] == "1\n" and l == "\n":
                break
            if l:
                ui.debug("remote: ", l)
            lines.append(l)
            max_noise -= 1
        else:
            # the loop ended without break: either an empty read (EOF) or
            # the noise budget ran out before the expected answer arrived
            self._abort(error.RepoError(_('no suitable response from '
                                          'remote hg')))

        self._caps = set()
        # use the most recently received "capabilities:" line
        for l in reversed(lines):
            if l.startswith("capabilities:"):
                self._caps.update(l[:-1].split(":")[1].split())
                break

    def _capabilities(self):
        '''Return the set of capabilities collected by validate_repo().'''
        return self._caps

    def readerr(self):
        '''Forward pending data on pipee to ui.status, line by line.

        Stops as soon as util.fstat reports a size of 0 for pipee or a
        read returns nothing.
        '''
        while True:
            size = util.fstat(self.pipee).st_size
            if size == 0:
                break
            s = self.pipee.read(size)
            if not s:
                break
            for l in s.splitlines():
                self.ui.status(_("remote: "), l, '\n')

    def _abort(self, exception):
        '''Close the pipes, then raise exception.'''
        self.cleanup()
        raise exception

    def cleanup(self):
        '''Close all three pipes, echoing what is left on pipee.

        Does nothing when no pipes have been opened yet.
        '''
        if self.pipeo is None:
            return
        self.pipeo.close()
        self.pipei.close()
        try:
            # read the error descriptor until EOF
            for l in self.pipee:
                self.ui.status(_("remote: "), l)
        except (IOError, ValueError):
            # deliberate best effort: draining must not prevent the close
            pass
        self.pipee.close()

    __del__ = cleanup

    def _callstream(self, cmd, **args):
        '''Send cmd with its arguments and return the response stream.

        The argument names come from the wireproto.commands entry for cmd.
        Each argument is written as "<name> <length>\\n" followed by its
        value.  A '*' entry takes all remaining keyword arguments as a
        dict, written as "* <count>\\n" followed by each item in the same
        name/length/value form.

        Returns pipei; the caller is responsible for reading the reply.
        '''
        self.ui.debug("sending %s command\n" % cmd)
        self.pipeo.write("%s\n" % cmd)
        _func, names = wireproto.commands[cmd]
        keys = names.split()
        wireargs = {}
        for k in keys:
            if k == '*':
                wireargs['*'] = args
                break
            else:
                wireargs[k] = args[k]
                del args[k]
        for k, v in sorted(wireargs.iteritems()):
            self.pipeo.write("%s %d\n" % (k, len(v)))
            if isinstance(v, dict):
                for dk, dv in v.iteritems():
                    self.pipeo.write("%s %d\n" % (dk, len(dv)))
                    self.pipeo.write(dv)
            else:
                self.pipeo.write(v)
        self.pipeo.flush()

        return self.pipei

    def _call(self, cmd, **args):
        '''Send cmd and return the body of its response.'''
        self._callstream(cmd, **args)
        return self._recv()

    def _callpush(self, cmd, fp, **args):
        '''Send cmd, then upload the contents of fp in 4096-byte chunks.

        Returns a 2-tuple.  If the remote answers with non-empty text
        either to the command itself or after the upload, the result is
        ('', text).  Otherwise it is (response, '') where response is the
        next message read from the remote.
        '''
        r = self._call(cmd, **args)
        if r:
            return '', r
        while True:
            d = fp.read(4096)
            if not d:
                break
            self._send(d)
        # a zero-length chunk terminates the upload
        self._send("", flush=True)
        r = self._recv()
        if r:
            return '', r
        return self._recv(), ''

    def _decompress(self, stream):
        '''Return stream unchanged.'''
        return stream

    def _recv(self):
        '''Read one length-prefixed response from pipei and return its body.

        A bare newline in place of the length signals an out-of-band
        error: lines are collected from pipee up to a "-" line (or EOF)
        and raised as error.OutOfBandError.  A length that is not an
        integer raises error.ResponseError.
        '''
        l = self.pipei.readline()
        if l == '\n':
            err = []
            while True:
                line = self.pipee.readline()
                # An empty read means pipee hit EOF.  Without this check a
                # remote that went away before sending the "-\n"
                # terminator would keep us looping forever.
                if not line or line == '-\n':
                    break
                err.append(line)
            if err and err[-1].endswith('\n'):
                # strip the trailing newline added to the last line server-side
                err[-1] = err[-1][:-1]
            self._abort(error.OutOfBandError(*err))
        self.readerr()
        try:
            l = int(l)
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), l))
        return self.pipei.read(l)

    def _send(self, data, flush=False):
        '''Write data to pipeo as one chunk prefixed by "<length>\\n".

        The stream is flushed only when flush is true; pending remote
        error output is forwarded afterwards in either case.
        '''
        self.pipeo.write("%d\n" % len(data))
        if data:
            self.pipeo.write(data)
        if flush:
            self.pipeo.flush()
        self.readerr()

    def lock(self):
        '''Send "lock" to the remote and return a remotelock for this peer.'''
        self._call("lock")
        return remotelock(self)

    def unlock(self):
        '''Send "unlock" to the remote.'''
        self._call("unlock")

    def addchangegroup(self, cg, source, url, lock=None):
        '''Send a changegroup to the remote server.  Return an integer
        similar to unbundle(). DEPRECATED, since it requires locking the
        remote.

        cg is read in 4096-byte chunks and written to the remote as is.
        source, url and lock are not used by this implementation.

        Raises error.RepoError if the remote refuses the push and
        error.ResponseError if its final answer is not an integer.
        '''
        d = self._call("addchangegroup")
        if d:
            self._abort(error.RepoError(_("push refused: %s") % d))
        while True:
            d = cg.read(4096)
            if not d:
                break
            self.pipeo.write(d)
            self.readerr()

        self.pipeo.flush()

        self.readerr()
        r = self._recv()
        if not r:
            # an empty final response is reported as 1
            return 1
        try:
            return int(r)
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), r))
238 | |
# Module-level alias for the sshpeer class.
# NOTE(review): presumably looked up by code that resolves a peer class
# from this module by a fixed name -- confirm against the callers.
instance = sshpeer