comparison mercurial/wireprotoframing.py @ 37060:2ec1fb9de638

wireproto: add request IDs to frames One of my primary goals with the new wire protocol is to make operations faster and enable both client and server-side operations to scale to multiple CPU cores. One of the ways we make server interactions faster is by reducing the number of round trips to that server. With the existing wire protocol, the "batch" command facilitates executing multiple commands from a single request payload. The way it works is the requests for multiple commands are serialized. The server executes those commands sequentially then serializes all their results. As an optimization for reducing round trips, this is very effective. The technical implementation, however, is pretty bad and suffers from a number of deficiencies. For example, it creates a new place where authorization to run a command must be checked. (The lack of this checking in older Mercurial releases was CVE-2018-1000132.) The principles behind the "batch" command are sound. However, the execution is not. Therefore, I want to ditch "batch" in the new wire protocol and have protocol level support for issuing multiple requests in a single round trip. This commit introduces support in the frame-based wire protocol to facilitate this. We do this by adding a "request ID" to each frame. If a server sees frames associated with different "request IDs," it handles them as separate requests. All of this may happen as part of the same message from client to server (the same request body in the case of HTTP). We /could/ model the exchange the way pipelined HTTP requests do, where the server processes requests in the order they are issued and received. But this artificially constrains scalability. A better model is to allow multi-requests to be executed concurrently and for responses to be sent and handled concurrently. So the specification explicitly allows this. There is some work to be done around specifying dependencies between multi-requests. 
We take the easy road for now and punt on this problem, declaring that if order is important, clients must not issue the request until responses to dependent requests have been received. This commit focuses on the boilerplate of implementing the request ID. The server reactor still can't manage multiple, in-flight request IDs. This will be addressed in a subsequent commit. Because the wire semantics have changed, we bump the version of the media type. Differential Revision: https://phab.mercurial-scm.org/D2869
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 14 Mar 2018 16:51:34 -0700
parents 861e9d37e56e
children c5e9c3b47366
comparison
equal deleted inserted replaced
37059:861e9d37e56e 37060:2ec1fb9de638
17 from . import ( 17 from . import (
18 error, 18 error,
19 util, 19 util,
20 ) 20 )
21 21
22 FRAME_HEADER_SIZE = 4 22 FRAME_HEADER_SIZE = 6
23 DEFAULT_MAX_FRAME_SIZE = 32768 23 DEFAULT_MAX_FRAME_SIZE = 32768
24 24
25 FRAME_TYPE_COMMAND_NAME = 0x01 25 FRAME_TYPE_COMMAND_NAME = 0x01
26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02 26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
27 FRAME_TYPE_COMMAND_DATA = 0x03 27 FRAME_TYPE_COMMAND_DATA = 0x03
87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, 87 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
88 } 88 }
89 89
90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') 90 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
91 91
92 def makeframe(frametype, frameflags, payload): 92 def makeframe(requestid, frametype, frameflags, payload):
93 """Assemble a frame into a byte array.""" 93 """Assemble a frame into a byte array."""
94 # TODO assert size of payload. 94 # TODO assert size of payload.
95 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) 95 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
96 96
97 # 24 bits length
98 # 16 bits request id
99 # 4 bits type
100 # 4 bits flags
101
97 l = struct.pack(r'<I', len(payload)) 102 l = struct.pack(r'<I', len(payload))
98 frame[0:3] = l[0:3] 103 frame[0:3] = l[0:3]
99 frame[3] = (frametype << 4) | frameflags 104 struct.pack_into(r'<H', frame, 3, requestid)
100 frame[4:] = payload 105 frame[5] = (frametype << 4) | frameflags
106 frame[6:] = payload
101 107
102 return frame 108 return frame
103 109
104 def makeframefromhumanstring(s): 110 def makeframefromhumanstring(s):
105 """Given a string of the form: <type> <flags> <payload>, creates a frame. 111 """Create a frame from a human readable string
112
113 Strings have the form:
114
115 <request-id> <type> <flags> <payload>
106 116
107 This can be used by user-facing applications and tests for creating 117 This can be used by user-facing applications and tests for creating
108 frames easily without having to type out a bunch of constants. 118 frames easily without having to type out a bunch of constants.
109 119
120 Request ID is an integer.
121
110 Frame type and flags can be specified by integer or named constant. 122 Frame type and flags can be specified by integer or named constant.
123
111 Flags can be delimited by `|` to bitwise OR them together. 124 Flags can be delimited by `|` to bitwise OR them together.
112 """ 125 """
113 frametype, frameflags, payload = s.split(b' ', 2) 126 requestid, frametype, frameflags, payload = s.split(b' ', 3)
127
128 requestid = int(requestid)
114 129
115 if frametype in FRAME_TYPES: 130 if frametype in FRAME_TYPES:
116 frametype = FRAME_TYPES[frametype] 131 frametype = FRAME_TYPES[frametype]
117 else: 132 else:
118 frametype = int(frametype) 133 frametype = int(frametype)
125 else: 140 else:
126 finalflags |= int(flag) 141 finalflags |= int(flag)
127 142
128 payload = util.unescapestr(payload) 143 payload = util.unescapestr(payload)
129 144
130 return makeframe(frametype, finalflags, payload) 145 return makeframe(requestid, frametype, finalflags, payload)
131 146
132 def parseheader(data): 147 def parseheader(data):
133 """Parse a unified framing protocol frame header from a buffer. 148 """Parse a unified framing protocol frame header from a buffer.
134 149
135 The header is expected to be in the buffer at offset 0 and the 150 The header is expected to be in the buffer at offset 0 and the
138 # 24 bits payload length (little endian) 153 # 24 bits payload length (little endian)
139 # 4 bits frame type 154 # 4 bits frame type
140 # 4 bits frame flags 155 # 4 bits frame flags
141 # ... payload 156 # ... payload
142 framelength = data[0] + 256 * data[1] + 16384 * data[2] 157 framelength = data[0] + 256 * data[1] + 16384 * data[2]
143 typeflags = data[3] 158 requestid = struct.unpack_from(r'<H', data, 3)[0]
159 typeflags = data[5]
144 160
145 frametype = (typeflags & 0xf0) >> 4 161 frametype = (typeflags & 0xf0) >> 4
146 frameflags = typeflags & 0x0f 162 frameflags = typeflags & 0x0f
147 163
148 return frametype, frameflags, framelength 164 return requestid, frametype, frameflags, framelength
149 165
150 def readframe(fh): 166 def readframe(fh):
151 """Read a unified framing protocol frame from a file object. 167 """Read a unified framing protocol frame from a file object.
152 168
153 Returns a 3-tuple of (type, flags, payload) for the decoded frame or 169 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
163 179
164 if readcount != FRAME_HEADER_SIZE: 180 if readcount != FRAME_HEADER_SIZE:
165 raise error.Abort(_('received incomplete frame: got %d bytes: %s') % 181 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
166 (readcount, header)) 182 (readcount, header))
167 183
168 frametype, frameflags, framelength = parseheader(header) 184 requestid, frametype, frameflags, framelength = parseheader(header)
169 185
170 payload = fh.read(framelength) 186 payload = fh.read(framelength)
171 if len(payload) != framelength: 187 if len(payload) != framelength:
172 raise error.Abort(_('frame length error: expected %d; got %d') % 188 raise error.Abort(_('frame length error: expected %d; got %d') %
173 (framelength, len(payload))) 189 (framelength, len(payload)))
174 190
175 return frametype, frameflags, payload 191 return requestid, frametype, frameflags, payload
176 192
177 def createcommandframes(cmd, args, datafh=None): 193 def createcommandframes(requestid, cmd, args, datafh=None):
178 """Create frames necessary to transmit a request to run a command. 194 """Create frames necessary to transmit a request to run a command.
179 195
180 This is a generator of bytearrays. Each item represents a frame 196 This is a generator of bytearrays. Each item represents a frame
181 ready to be sent over the wire to a peer. 197 ready to be sent over the wire to a peer.
182 """ 198 """
187 flags |= FLAG_COMMAND_NAME_HAVE_DATA 203 flags |= FLAG_COMMAND_NAME_HAVE_DATA
188 204
189 if not flags: 205 if not flags:
190 flags |= FLAG_COMMAND_NAME_EOS 206 flags |= FLAG_COMMAND_NAME_EOS
191 207
192 yield makeframe(FRAME_TYPE_COMMAND_NAME, flags, cmd) 208 yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
193 209
194 for i, k in enumerate(sorted(args)): 210 for i, k in enumerate(sorted(args)):
195 v = args[k] 211 v = args[k]
196 last = i == len(args) - 1 212 last = i == len(args) - 1
197 213
203 payload[offset:offset + len(k)] = k 219 payload[offset:offset + len(k)] = k
204 offset += len(k) 220 offset += len(k)
205 payload[offset:offset + len(v)] = v 221 payload[offset:offset + len(v)] = v
206 222
207 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0 223 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
208 yield makeframe(FRAME_TYPE_COMMAND_ARGUMENT, flags, payload) 224 yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
209 225
210 if datafh: 226 if datafh:
211 while True: 227 while True:
212 data = datafh.read(DEFAULT_MAX_FRAME_SIZE) 228 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
213 229
217 else: 233 else:
218 flags = FLAG_COMMAND_DATA_EOS 234 flags = FLAG_COMMAND_DATA_EOS
219 assert datafh.read(1) == b'' 235 assert datafh.read(1) == b''
220 done = True 236 done = True
221 237
222 yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data) 238 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
223 239
224 if done: 240 if done:
225 break 241 break
226 242
227 def createbytesresponseframesfrombytes(data, 243 def createbytesresponseframesfrombytes(requestid, data,
228 maxframesize=DEFAULT_MAX_FRAME_SIZE): 244 maxframesize=DEFAULT_MAX_FRAME_SIZE):
229 """Create a raw frame to send a bytes response from static bytes input. 245 """Create a raw frame to send a bytes response from static bytes input.
230 246
231 Returns a generator of bytearrays. 247 Returns a generator of bytearrays.
232 """ 248 """
233 249
234 # Simple case of a single frame. 250 # Simple case of a single frame.
235 if len(data) <= maxframesize: 251 if len(data) <= maxframesize:
236 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, 252 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
237 FLAG_BYTES_RESPONSE_EOS, data) 253 FLAG_BYTES_RESPONSE_EOS, data)
238 return 254 return
239 255
240 offset = 0 256 offset = 0
241 while True: 257 while True:
246 if done: 262 if done:
247 flags = FLAG_BYTES_RESPONSE_EOS 263 flags = FLAG_BYTES_RESPONSE_EOS
248 else: 264 else:
249 flags = FLAG_BYTES_RESPONSE_CONTINUATION 265 flags = FLAG_BYTES_RESPONSE_CONTINUATION
250 266
251 yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk) 267 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
252 268
253 if done: 269 if done:
254 break 270 break
255 271
256 def createerrorframe(msg, protocol=False, application=False): 272 def createerrorframe(requestid, msg, protocol=False, application=False):
257 # TODO properly handle frame size limits. 273 # TODO properly handle frame size limits.
258 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE 274 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
259 275
260 flags = 0 276 flags = 0
261 if protocol: 277 if protocol:
262 flags |= FLAG_ERROR_RESPONSE_PROTOCOL 278 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
263 if application: 279 if application:
264 flags |= FLAG_ERROR_RESPONSE_APPLICATION 280 flags |= FLAG_ERROR_RESPONSE_APPLICATION
265 281
266 yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg) 282 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
267 283
268 class serverreactor(object): 284 class serverreactor(object):
269 """Holds state of a server handling frame-based protocol requests. 285 """Holds state of a server handling frame-based protocol requests.
270 286
271 This class is the "brain" of the unified frame-based protocol server 287 This class is the "brain" of the unified frame-based protocol server
324 sender cannot receive until all data has been transmitted. 340 sender cannot receive until all data has been transmitted.
325 """ 341 """
326 self._deferoutput = deferoutput 342 self._deferoutput = deferoutput
327 self._state = 'idle' 343 self._state = 'idle'
328 self._bufferedframegens = [] 344 self._bufferedframegens = []
345 self._activerequestid = None
329 self._activecommand = None 346 self._activecommand = None
330 self._activeargs = None 347 self._activeargs = None
331 self._activedata = None 348 self._activedata = None
332 self._expectingargs = None 349 self._expectingargs = None
333 self._expectingdata = None 350 self._expectingdata = None
334 self._activeargname = None 351 self._activeargname = None
335 self._activeargchunks = None 352 self._activeargchunks = None
336 353
337 def onframerecv(self, frametype, frameflags, payload): 354 def onframerecv(self, requestid, frametype, frameflags, payload):
338 """Process a frame that has been received off the wire. 355 """Process a frame that has been received off the wire.
339 356
340 Returns a dict with an ``action`` key that details what action, 357 Returns a dict with an ``action`` key that details what action,
341 if any, the consumer should take next. 358 if any, the consumer should take next.
342 """ 359 """
349 366
350 meth = handlers.get(self._state) 367 meth = handlers.get(self._state)
351 if not meth: 368 if not meth:
352 raise error.ProgrammingError('unhandled state: %s' % self._state) 369 raise error.ProgrammingError('unhandled state: %s' % self._state)
353 370
354 return meth(frametype, frameflags, payload) 371 return meth(requestid, frametype, frameflags, payload)
355 372
356 def onbytesresponseready(self, data): 373 def onbytesresponseready(self, requestid, data):
357 """Signal that a bytes response is ready to be sent to the client. 374 """Signal that a bytes response is ready to be sent to the client.
358 375
359 The raw bytes response is passed as an argument. 376 The raw bytes response is passed as an argument.
360 """ 377 """
361 framegen = createbytesresponseframesfrombytes(data) 378 framegen = createbytesresponseframesfrombytes(requestid, data)
362 379
363 if self._deferoutput: 380 if self._deferoutput:
364 self._bufferedframegens.append(framegen) 381 self._bufferedframegens.append(framegen)
365 return 'noop', {} 382 return 'noop', {}
366 else: 383 else:
385 402
386 return 'sendframes', { 403 return 'sendframes', {
387 'framegen': makegen(), 404 'framegen': makegen(),
388 } 405 }
389 406
390 def onapplicationerror(self, msg): 407 def onapplicationerror(self, requestid, msg):
391 return 'sendframes', { 408 return 'sendframes', {
392 'framegen': createerrorframe(msg, application=True), 409 'framegen': createerrorframe(requestid, msg, application=True),
393 } 410 }
394 411
395 def _makeerrorresult(self, msg): 412 def _makeerrorresult(self, msg):
396 return 'error', { 413 return 'error', {
397 'message': msg, 414 'message': msg,
398 } 415 }
399 416
400 def _makeruncommandresult(self): 417 def _makeruncommandresult(self):
401 return 'runcommand', { 418 return 'runcommand', {
419 'requestid': self._activerequestid,
402 'command': self._activecommand, 420 'command': self._activecommand,
403 'args': self._activeargs, 421 'args': self._activeargs,
404 'data': self._activedata.getvalue() if self._activedata else None, 422 'data': self._activedata.getvalue() if self._activedata else None,
405 } 423 }
406 424
407 def _makewantframeresult(self): 425 def _makewantframeresult(self):
408 return 'wantframe', { 426 return 'wantframe', {
409 'state': self._state, 427 'state': self._state,
410 } 428 }
411 429
412 def _onframeidle(self, frametype, frameflags, payload): 430 def _onframeidle(self, requestid, frametype, frameflags, payload):
413 # The only frame type that should be received in this state is a 431 # The only frame type that should be received in this state is a
414 # command request. 432 # command request.
415 if frametype != FRAME_TYPE_COMMAND_NAME: 433 if frametype != FRAME_TYPE_COMMAND_NAME:
416 self._state = 'errored' 434 self._state = 'errored'
417 return self._makeerrorresult( 435 return self._makeerrorresult(
418 _('expected command frame; got %d') % frametype) 436 _('expected command frame; got %d') % frametype)
419 437
438 self._activerequestid = requestid
420 self._activecommand = payload 439 self._activecommand = payload
421 self._activeargs = {} 440 self._activeargs = {}
422 self._activedata = None 441 self._activedata = None
423 442
424 if frameflags & FLAG_COMMAND_NAME_EOS: 443 if frameflags & FLAG_COMMAND_NAME_EOS:
437 else: 456 else:
438 self._state = 'errored' 457 self._state = 'errored'
439 return self._makeerrorresult(_('missing frame flags on ' 458 return self._makeerrorresult(_('missing frame flags on '
440 'command frame')) 459 'command frame'))
441 460
442 def _onframereceivingargs(self, frametype, frameflags, payload): 461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
443 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: 462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
444 self._state = 'errored' 463 self._state = 'errored'
445 return self._makeerrorresult(_('expected command argument ' 464 return self._makeerrorresult(_('expected command argument '
446 'frame; got %d') % frametype) 465 'frame; got %d') % frametype)
447 466
490 self._state = 'waiting' 509 self._state = 'waiting'
491 return self._makeruncommandresult() 510 return self._makeruncommandresult()
492 else: 511 else:
493 return self._makewantframeresult() 512 return self._makewantframeresult()
494 513
495 def _onframereceivingdata(self, frametype, frameflags, payload): 514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
496 if frametype != FRAME_TYPE_COMMAND_DATA: 515 if frametype != FRAME_TYPE_COMMAND_DATA:
497 self._state = 'errored' 516 self._state = 'errored'
498 return self._makeerrorresult(_('expected command data frame; ' 517 return self._makeerrorresult(_('expected command data frame; '
499 'got %d') % frametype) 518 'got %d') % frametype)
500 519
510 else: 529 else:
511 self._state = 'errored' 530 self._state = 'errored'
512 return self._makeerrorresult(_('command data frame without ' 531 return self._makeerrorresult(_('command data frame without '
513 'flags')) 532 'flags'))
514 533
515 def _onframeerrored(self, frametype, frameflags, payload): 534 def _onframeerrored(self, requestid, frametype, frameflags, payload):
516 return self._makeerrorresult(_('server already errored')) 535 return self._makeerrorresult(_('server already errored'))