comparison mercurial/wireprotoframing.py @ 37060:2ec1fb9de638
wireproto: add request IDs to frames
One of my primary goals with the new wire protocol is to make
operations faster and enable both client and server-side
operations to scale to multiple CPU cores.
One of the ways we make server interactions faster is by reducing
the number of round trips to that server.
With the existing wire protocol, the "batch" command facilitates
executing multiple commands from a single request payload. The way
it works is the requests for multiple commands are serialized. The
server executes those commands sequentially then serializes all
their results. As an optimization for reducing round trips, this
is very effective. The technical implementation, however, is pretty
bad and suffers from a number of deficiencies. For example, it
creates a new place where authorization to run a command must be
checked. (The lack of this checking in older Mercurial releases
was CVE-2018-1000132.)
The principles behind the "batch" command are sound. However, the
execution is not. Therefore, I want to ditch "batch" in the
new wire protocol and have protocol level support for issuing
multiple requests in a single round trip.
This commit introduces support in the frame-based wire protocol to
facilitate this. We do this by adding a "request ID" to each frame.
If a server sees frames associated with different "request IDs," it
handles them as separate requests. All of this can happen
as part of the same message from client to server (the same request
body in the case of HTTP).
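As a rough illustration of the idea (not Mercurial's reactor code), the sketch below groups already-decoded frames by request ID. The tuple shape (requestid, frametype, flags, payload) mirrors what readframe() returns after this commit; the helper name group_by_request, the sample commands, and the flag values are assumptions made for the example.

```python
from collections import OrderedDict

FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02


def group_by_request(frames):
    """Group decoded frames by request ID, keeping first-seen order.

    ``frames`` is an iterable of (requestid, frametype, flags, payload)
    tuples. Hypothetical helper, for illustration only.
    """
    requests = OrderedDict()
    for requestid, frametype, flags, payload in frames:
        requests.setdefault(requestid, []).append((frametype, flags, payload))
    return requests


# Frames for two requests interleaved in a single message body.
# The flag values here are placeholders, not the real constants.
frames = [
    (1, FRAME_TYPE_COMMAND_NAME, 0x02, b'heads'),
    (3, FRAME_TYPE_COMMAND_NAME, 0x01, b'capabilities'),
    (1, FRAME_TYPE_COMMAND_ARGUMENT, 0x02, b'arg'),
]
grouped = group_by_request(frames)
```

Each key of grouped is one logical request, even though all frames arrived in the same body.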
We /could/ model the exchange the way pipelined HTTP requests do,
where the server processes requests in the order they are issued and
received. But this artificially constrains scalability. A better
model is to allow multi-requests to be executed concurrently and
for responses to be sent and handled concurrently. So the
specification explicitly allows this. There is some work to be done
around specifying dependencies between multi-requests. We take
the easy road for now and punt on this problem, declaring that
if order is important, clients must not issue a request until
responses to the requests it depends on have been received.
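One way a client could honor that rule is sketched below. This is only an assumption about how such bookkeeping might look; the class RequestTracker and its methods are hypothetical and are not part of this commit or of Mercurial.

```python
class RequestTracker(object):
    """Hold back requests until the requests they depend on have completed.

    Hypothetical client-side helper, for illustration only.
    """

    def __init__(self):
        self._completed = set()
        self._deferred = []  # list of (requestid, set of dependencies)

    def submit(self, requestid, dependson=()):
        """Return True if the request may be sent now; otherwise defer it."""
        deps = set(dependson)
        if deps - self._completed:
            self._deferred.append((requestid, deps))
            return False
        return True

    def onresponse(self, requestid):
        """Record a finished response; return request IDs now safe to send."""
        self._completed.add(requestid)
        ready = [rid for rid, deps in self._deferred
                 if deps <= self._completed]
        self._deferred = [(rid, deps) for rid, deps in self._deferred
                          if not deps <= self._completed]
        return ready


tracker = RequestTracker()
# Requests 1 and 3 are independent; request 5 needs both of their results.
sent_now = [tracker.submit(1), tracker.submit(3),
            tracker.submit(5, dependson=[1, 3])]
first = tracker.onresponse(3)   # request 1 is still outstanding
second = tracker.onresponse(1)  # both dependencies are now complete
```

Responses may arrive in any order, so the tracker keys everything on the request ID rather than on position in the stream.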
This commit focuses on the boilerplate of implementing the request
ID. The server reactor still can't manage multiple, in-flight
request IDs. This will be addressed in a subsequent commit.
Because the wire semantics have changed, we bump the version of the
media type.
Differential Revision: https://phab.mercurial-scm.org/D2869
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 14 Mar 2018 16:51:34 -0700 |
parents | 861e9d37e56e |
children | c5e9c3b47366 |
comparison
37059:861e9d37e56e | 37060:2ec1fb9de638 |
---|---|
17 from . import ( | 17 from . import ( |
18 error, | 18 error, |
19 util, | 19 util, |
20 ) | 20 ) |
21 | 21 |
22 FRAME_HEADER_SIZE = 4 | 22 FRAME_HEADER_SIZE = 6 |
23 DEFAULT_MAX_FRAME_SIZE = 32768 | 23 DEFAULT_MAX_FRAME_SIZE = 32768 |
24 | 24 |
25 FRAME_TYPE_COMMAND_NAME = 0x01 | 25 FRAME_TYPE_COMMAND_NAME = 0x01 |
26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02 | 26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02 |
27 FRAME_TYPE_COMMAND_DATA = 0x03 | 27 FRAME_TYPE_COMMAND_DATA = 0x03 |
87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, | 87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, |
88 } | 88 } |
89 | 89 |
90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') | 90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') |
91 | 91 |
92 def makeframe(frametype, frameflags, payload): | 92 def makeframe(requestid, frametype, frameflags, payload): |
93 """Assemble a frame into a byte array.""" | 93 """Assemble a frame into a byte array.""" |
94 # TODO assert size of payload. | 94 # TODO assert size of payload. |
95 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) | 95 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) |
96 | 96 |
| 97 # 24 bits length |
| 98 # 16 bits request id |
| 99 # 4 bits type |
| 100 # 4 bits flags |
| 101 |
97 l = struct.pack(r'<I', len(payload)) | 102 l = struct.pack(r'<I', len(payload)) |
98 frame[0:3] = l[0:3] | 103 frame[0:3] = l[0:3] |
99 frame[3] = (frametype << 4) | frameflags | 104 struct.pack_into(r'<H', frame, 3, requestid) |
100 frame[4:] = payload | 105 frame[5] = (frametype << 4) | frameflags |
| 106 frame[6:] = payload |
101 | 107 |
102 return frame | 108 return frame |
103 | 109 |
104 def makeframefromhumanstring(s): | 110 def makeframefromhumanstring(s): |
105 """Given a string of the form: <type> <flags> <payload>, creates a frame. | 111 """Create a frame from a human readable string |
| 112 |
| 113 Strings have the form: |
| 114 |
| 115 <request-id> <type> <flags> <payload> |
106 | 116 |
107 This can be used by user-facing applications and tests for creating | 117 This can be used by user-facing applications and tests for creating |
108 frames easily without having to type out a bunch of constants. | 118 frames easily without having to type out a bunch of constants. |
109 | 119 |
| 120 Request ID is an integer. |
| 121 |
110 Frame type and flags can be specified by integer or named constant. | 122 Frame type and flags can be specified by integer or named constant. |
| 123 |
111 Flags can be delimited by `|` to bitwise OR them together. | 124 Flags can be delimited by `|` to bitwise OR them together. |
112 """ | 125 """ |
113 frametype, frameflags, payload = s.split(b' ', 2) | 126 requestid, frametype, frameflags, payload = s.split(b' ', 3) |
| 127 |
| 128 requestid = int(requestid) |
114 | 129 |
115 if frametype in FRAME_TYPES: | 130 if frametype in FRAME_TYPES: |
116 frametype = FRAME_TYPES[frametype] | 131 frametype = FRAME_TYPES[frametype] |
117 else: | 132 else: |
118 frametype = int(frametype) | 133 frametype = int(frametype) |
125 else: | 140 else: |
126 finalflags |= int(flag) | 141 finalflags |= int(flag) |
127 | 142 |
128 payload = util.unescapestr(payload) | 143 payload = util.unescapestr(payload) |
129 | 144 |
130 return makeframe(frametype, finalflags, payload) | 145 return makeframe(requestid, frametype, finalflags, payload) |
131 | 146 |
132 def parseheader(data): | 147 def parseheader(data): |
133 """Parse a unified framing protocol frame header from a buffer. | 148 """Parse a unified framing protocol frame header from a buffer. |
134 | 149 |
135 The header is expected to be in the buffer at offset 0 and the | 150 The header is expected to be in the buffer at offset 0 and the |
138 # 24 bits payload length (little endian) | 153 # 24 bits payload length (little endian) |
139 # 4 bits frame type | 154 # 4 bits frame type |
140 # 4 bits frame flags | 155 # 4 bits frame flags |
141 # ... payload | 156 # ... payload |
142 framelength = data[0] + 256 * data[1] + 16384 * data[2] | 157 framelength = data[0] + 256 * data[1] + 16384 * data[2] |
143 typeflags = data[3] | 158 requestid = struct.unpack_from(r'<H', data, 3)[0] |
| 159 typeflags = data[5] |
144 | 160 |
145 frametype = (typeflags & 0xf0) >> 4 | 161 frametype = (typeflags & 0xf0) >> 4 |
146 frameflags = typeflags & 0x0f | 162 frameflags = typeflags & 0x0f |
147 | 163 |
148 return frametype, frameflags, framelength | 164 return requestid, frametype, frameflags, framelength |
149 | 165 |
150 def readframe(fh): | 166 def readframe(fh): |
151 """Read a unified framing protocol frame from a file object. | 167 """Read a unified framing protocol frame from a file object. |
152 | 168 |
153 Returns a 3-tuple of (type, flags, payload) for the decoded frame or | 169 Returns a 3-tuple of (type, flags, payload) for the decoded frame or |
163 | 179 |
164 if readcount != FRAME_HEADER_SIZE: | 180 if readcount != FRAME_HEADER_SIZE: |
165 raise error.Abort(_('received incomplete frame: got %d bytes: %s') % | 181 raise error.Abort(_('received incomplete frame: got %d bytes: %s') % |
166 (readcount, header)) | 182 (readcount, header)) |
167 | 183 |
168 frametype, frameflags, framelength = parseheader(header) | 184 requestid, frametype, frameflags, framelength = parseheader(header) |
169 | 185 |
170 payload = fh.read(framelength) | 186 payload = fh.read(framelength) |
171 if len(payload) != framelength: | 187 if len(payload) != framelength: |
172 raise error.Abort(_('frame length error: expected %d; got %d') % | 188 raise error.Abort(_('frame length error: expected %d; got %d') % |
173 (framelength, len(payload))) | 189 (framelength, len(payload))) |
174 | 190 |
175 return frametype, frameflags, payload | 191 return requestid, frametype, frameflags, payload |
176 | 192 |
177 def createcommandframes(cmd, args, datafh=None): | 193 def createcommandframes(requestid, cmd, args, datafh=None): |
178 """Create frames necessary to transmit a request to run a command. | 194 """Create frames necessary to transmit a request to run a command. |
179 | 195 |
180 This is a generator of bytearrays. Each item represents a frame | 196 This is a generator of bytearrays. Each item represents a frame |
181 ready to be sent over the wire to a peer. | 197 ready to be sent over the wire to a peer. |
182 """ | 198 """ |
187 flags |= FLAG_COMMAND_NAME_HAVE_DATA | 203 flags |= FLAG_COMMAND_NAME_HAVE_DATA |
188 | 204 |
189 if not flags: | 205 if not flags: |
190 flags |= FLAG_COMMAND_NAME_EOS | 206 flags |= FLAG_COMMAND_NAME_EOS |
191 | 207 |
192 yield makeframe(FRAME_TYPE_COMMAND_NAME, flags, cmd) | 208 yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd) |
193 | 209 |
194 for i, k in enumerate(sorted(args)): | 210 for i, k in enumerate(sorted(args)): |
195 v = args[k] | 211 v = args[k] |
196 last = i == len(args) - 1 | 212 last = i == len(args) - 1 |
197 | 213 |
203 payload[offset:offset + len(k)] = k | 219 payload[offset:offset + len(k)] = k |
204 offset += len(k) | 220 offset += len(k) |
205 payload[offset:offset + len(v)] = v | 221 payload[offset:offset + len(v)] = v |
206 | 222 |
207 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0 | 223 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0 |
208 yield makeframe(FRAME_TYPE_COMMAND_ARGUMENT, flags, payload) | 224 yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload) |
209 | 225 |
210 if datafh: | 226 if datafh: |
211 while True: | 227 while True: |
212 data = datafh.read(DEFAULT_MAX_FRAME_SIZE) | 228 data = datafh.read(DEFAULT_MAX_FRAME_SIZE) |
213 | 229 |
217 else: | 233 else: |
218 flags = FLAG_COMMAND_DATA_EOS | 234 flags = FLAG_COMMAND_DATA_EOS |
219 assert datafh.read(1) == b'' | 235 assert datafh.read(1) == b'' |
220 done = True | 236 done = True |
221 | 237 |
222 yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data) | 238 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data) |
223 | 239 |
224 if done: | 240 if done: |
225 break | 241 break |
226 | 242 |
227 def createbytesresponseframesfrombytes(data, | 243 def createbytesresponseframesfrombytes(requestid, data, |
228 maxframesize=DEFAULT_MAX_FRAME_SIZE): | 244 maxframesize=DEFAULT_MAX_FRAME_SIZE): |
229 """Create a raw frame to send a bytes response from static bytes input. | 245 """Create a raw frame to send a bytes response from static bytes input. |
230 | 246 |
231 Returns a generator of bytearrays. | 247 Returns a generator of bytearrays. |
232 """ | 248 """ |
233 | 249 |
234 # Simple case of a single frame. | 250 # Simple case of a single frame. |
235 if len(data) <= maxframesize: | 251 if len(data) <= maxframesize: |
236 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, | 252 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, |
237 FLAG_BYTES_RESPONSE_EOS, data) | 253 FLAG_BYTES_RESPONSE_EOS, data) |
238 return | 254 return |
239 | 255 |
240 offset = 0 | 256 offset = 0 |
241 while True: | 257 while True: |
246 if done: | 262 if done: |
247 flags = FLAG_BYTES_RESPONSE_EOS | 263 flags = FLAG_BYTES_RESPONSE_EOS |
248 else: | 264 else: |
249 flags = FLAG_BYTES_RESPONSE_CONTINUATION | 265 flags = FLAG_BYTES_RESPONSE_CONTINUATION |
250 | 266 |
251 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk) | 267 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk) |
252 | 268 |
253 if done: | 269 if done: |
254 break | 270 break |
255 | 271 |
256 def createerrorframe(msg, protocol=False, application=False): | 272 def createerrorframe(requestid, msg, protocol=False, application=False): |
257 # TODO properly handle frame size limits. | 273 # TODO properly handle frame size limits. |
258 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE | 274 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
259 | 275 |
260 flags = 0 | 276 flags = 0 |
261 if protocol: | 277 if protocol: |
262 flags |= FLAG_ERROR_RESPONSE_PROTOCOL | 278 flags |= FLAG_ERROR_RESPONSE_PROTOCOL |
263 if application: | 279 if application: |
264 flags |= FLAG_ERROR_RESPONSE_APPLICATION | 280 flags |= FLAG_ERROR_RESPONSE_APPLICATION |
265 | 281 |
266 yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg) | 282 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg) |
267 | 283 |
268 class serverreactor(object): | 284 class serverreactor(object): |
269 """Holds state of a server handling frame-based protocol requests. | 285 """Holds state of a server handling frame-based protocol requests. |
270 | 286 |
271 This class is the "brain" of the unified frame-based protocol server | 287 This class is the "brain" of the unified frame-based protocol server |
324 sender cannot receive until all data has been transmitted. | 340 sender cannot receive until all data has been transmitted. |
325 """ | 341 """ |
326 self._deferoutput = deferoutput | 342 self._deferoutput = deferoutput |
327 self._state = 'idle' | 343 self._state = 'idle' |
328 self._bufferedframegens = [] | 344 self._bufferedframegens = [] |
| 345 self._activerequestid = None |
329 self._activecommand = None | 346 self._activecommand = None |
330 self._activeargs = None | 347 self._activeargs = None |
331 self._activedata = None | 348 self._activedata = None |
332 self._expectingargs = None | 349 self._expectingargs = None |
333 self._expectingdata = None | 350 self._expectingdata = None |
334 self._activeargname = None | 351 self._activeargname = None |
335 self._activeargchunks = None | 352 self._activeargchunks = None |
336 | 353 |
337 def onframerecv(self, frametype, frameflags, payload): | 354 def onframerecv(self, requestid, frametype, frameflags, payload): |
338 """Process a frame that has been received off the wire. | 355 """Process a frame that has been received off the wire. |
339 | 356 |
340 Returns a dict with an ``action`` key that details what action, | 357 Returns a dict with an ``action`` key that details what action, |
341 if any, the consumer should take next. | 358 if any, the consumer should take next. |
342 """ | 359 """ |
349 | 366 |
350 meth = handlers.get(self._state) | 367 meth = handlers.get(self._state) |
351 if not meth: | 368 if not meth: |
352 raise error.ProgrammingError('unhandled state: %s' % self._state) | 369 raise error.ProgrammingError('unhandled state: %s' % self._state) |
353 | 370 |
354 return meth(frametype, frameflags, payload) | 371 return meth(requestid, frametype, frameflags, payload) |
355 | 372 |
356 def onbytesresponseready(self, data): | 373 def onbytesresponseready(self, requestid, data): |
357 """Signal that a bytes response is ready to be sent to the client. | 374 """Signal that a bytes response is ready to be sent to the client. |
358 | 375 |
359 The raw bytes response is passed as an argument. | 376 The raw bytes response is passed as an argument. |
360 """ | 377 """ |
361 framegen = createbytesresponseframesfrombytes(data) | 378 framegen = createbytesresponseframesfrombytes(requestid, data) |
362 | 379 |
363 if self._deferoutput: | 380 if self._deferoutput: |
364 self._bufferedframegens.append(framegen) | 381 self._bufferedframegens.append(framegen) |
365 return 'noop', {} | 382 return 'noop', {} |
366 else: | 383 else: |
385 | 402 |
386 return 'sendframes', { | 403 return 'sendframes', { |
387 'framegen': makegen(), | 404 'framegen': makegen(), |
388 } | 405 } |
389 | 406 |
390 def onapplicationerror(self, msg): | 407 def onapplicationerror(self, requestid, msg): |
391 return 'sendframes', { | 408 return 'sendframes', { |
392 'framegen': createerrorframe(msg, application=True), | 409 'framegen': createerrorframe(requestid, msg, application=True), |
393 } | 410 } |
394 | 411 |
395 def _makeerrorresult(self, msg): | 412 def _makeerrorresult(self, msg): |
396 return 'error', { | 413 return 'error', { |
397 'message': msg, | 414 'message': msg, |
398 } | 415 } |
399 | 416 |
400 def _makeruncommandresult(self): | 417 def _makeruncommandresult(self): |
401 return 'runcommand', { | 418 return 'runcommand', { |
| 419 'requestid': self._activerequestid, |
402 'command': self._activecommand, | 420 'command': self._activecommand, |
403 'args': self._activeargs, | 421 'args': self._activeargs, |
404 'data': self._activedata.getvalue() if self._activedata else None, | 422 'data': self._activedata.getvalue() if self._activedata else None, |
405 } | 423 } |
406 | 424 |
407 def _makewantframeresult(self): | 425 def _makewantframeresult(self): |
408 return 'wantframe', { | 426 return 'wantframe', { |
409 'state': self._state, | 427 'state': self._state, |
410 } | 428 } |
411 | 429 |
412 def _onframeidle(self, frametype, frameflags, payload): | 430 def _onframeidle(self, requestid, frametype, frameflags, payload): |
413 # The only frame type that should be received in this state is a | 431 # The only frame type that should be received in this state is a |
414 # command request. | 432 # command request. |
415 if frametype != FRAME_TYPE_COMMAND_NAME: | 433 if frametype != FRAME_TYPE_COMMAND_NAME: |
416 self._state = 'errored' | 434 self._state = 'errored' |
417 return self._makeerrorresult( | 435 return self._makeerrorresult( |
418 _('expected command frame; got %d') % frametype) | 436 _('expected command frame; got %d') % frametype) |
419 | 437 |
| 438 self._activerequestid = requestid |
420 self._activecommand = payload | 439 self._activecommand = payload |
421 self._activeargs = {} | 440 self._activeargs = {} |
422 self._activedata = None | 441 self._activedata = None |
423 | 442 |
424 if frameflags & FLAG_COMMAND_NAME_EOS: | 443 if frameflags & FLAG_COMMAND_NAME_EOS: |
437 else: | 456 else: |
438 self._state = 'errored' | 457 self._state = 'errored' |
439 return self._makeerrorresult(_('missing frame flags on ' | 458 return self._makeerrorresult(_('missing frame flags on ' |
440 'command frame')) | 459 'command frame')) |
441 | 460 |
442 def _onframereceivingargs(self, frametype, frameflags, payload): | 461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload): |
443 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: | 462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: |
444 self._state = 'errored' | 463 self._state = 'errored' |
445 return self._makeerrorresult(_('expected command argument ' | 464 return self._makeerrorresult(_('expected command argument ' |
446 'frame; got %d') % frametype) | 465 'frame; got %d') % frametype) |
447 | 466 |
490 self._state = 'waiting' | 509 self._state = 'waiting' |
491 return self._makeruncommandresult() | 510 return self._makeruncommandresult() |
492 else: | 511 else: |
493 return self._makewantframeresult() | 512 return self._makewantframeresult() |
494 | 513 |
495 def _onframereceivingdata(self, frametype, frameflags, payload): | 514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload): |
496 if frametype != FRAME_TYPE_COMMAND_DATA: | 515 if frametype != FRAME_TYPE_COMMAND_DATA: |
497 self._state = 'errored' | 516 self._state = 'errored' |
498 return self._makeerrorresult(_('expected command data frame; ' | 517 return self._makeerrorresult(_('expected command data frame; ' |
499 'got %d') % frametype) | 518 'got %d') % frametype) |
500 | 519 |
510 else: | 529 else: |
511 self._state = 'errored' | 530 self._state = 'errored' |
512 return self._makeerrorresult(_('command data frame without ' | 531 return self._makeerrorresult(_('command data frame without ' |
513 'flags')) | 532 'flags')) |
514 | 533 |
515 def _onframeerrored(self, frametype, frameflags, payload): | 534 def _onframeerrored(self, requestid, frametype, frameflags, payload): |
516 return self._makeerrorresult(_('server already errored')) | 535 return self._makeerrorresult(_('server already errored')) |
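
The 6-byte header introduced by this change can be exercised in isolation. The following is a minimal standalone sketch of the layout described in the makeframe() comment (24 bits length, 16 bits request ID, 4 bits type, 4 bits flags); the names packframe and unpackheader are hypothetical stand-ins, not the functions in wireprotoframing.py. The sketch weights the third length byte by 65536 (1 << 16), which is what a 24-bit little-endian length requires; the diff above uses 16384 for that term, which only matters once a payload length reaches 65536.

```python
import struct

FRAME_HEADER_SIZE = 6


def packframe(requestid, frametype, frameflags, payload):
    """Assemble header plus payload into a bytearray (illustrative only)."""
    frame = bytearray(FRAME_HEADER_SIZE + len(payload))
    # Bytes 0-2: payload length, little endian, low 24 bits only.
    frame[0:3] = struct.pack('<I', len(payload))[0:3]
    # Bytes 3-4: request ID, little endian unsigned 16-bit.
    struct.pack_into('<H', frame, 3, requestid)
    # Byte 5: frame type in the high nibble, flags in the low nibble.
    frame[5] = (frametype << 4) | frameflags
    frame[6:] = payload
    return frame


def unpackheader(data):
    """Decode (requestid, frametype, frameflags, framelength) from a header."""
    framelength = data[0] + 256 * data[1] + 65536 * data[2]
    requestid = struct.unpack_from('<H', data, 3)[0]
    typeflags = data[5]
    return requestid, (typeflags & 0xf0) >> 4, typeflags & 0x0f, framelength


frame = packframe(1, 0x01, 0x01, b'heads')
header = unpackheader(frame)
```

Round-tripping a frame through both helpers is a quick way to check that the offsets for the request ID and the type/flags byte agree on both sides.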