comparison mercurial/wireprotov2server.py @ 39559:07b58266bce3

wireprotov2: implement commands as a generator of objects Previously, wire protocol version 2 inherited version 1's model of having separate types to represent the results of different wire protocol commands. As I implemented more powerful commands in future commits, I found I was using a common pattern of returning a special type to hold a generator. This meant the command function required a closure to do most of the work. That made logic flow more difficult to follow. I also noticed that many commands were effectively a sequence of objects to be CBOR encoded. I think it makes sense to define version 2 commands as generators. This way, commands can simply emit the data structures they wish to send to the client. This eliminates the need for a closure in command functions and removes encoding from the bodies of commands. As part of this commit, the handling of response objects has been moved into the serverreactor class. This puts the reactor in the driver's seat with regards to CBOR encoding and error handling. Having error handling in the function that emits frames is particularly important because exceptions in that function can lead to things getting in a bad state: I'm fairly certain that uncaught exceptions in the frame generator were causing deadlocks. I also introduced a dedicated error type for explicit error reporting in command handlers. This will be used in subsequent commits. There's still a bit of work to be done here, especially around formalizing the error handling "protocol." I've added yet another TODO to track this so we don't forget. Test output changed because we're using generators and no longer know we are at the end of the data until we hit the end of the generator. This means we can't emit the end-of-stream flag until we've exhausted the generator. Hence the introduction of 0-sized end-of-stream frames. Differential Revision: https://phab.mercurial-scm.org/D4472
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 05 Sep 2018 09:06:40 -0700
parents 660879e49b46
children 9c2c77c73f23
comparison
equal deleted inserted replaced
39558:b0e0db1565d1 39559:07b58266bce3
17 util, 17 util,
18 wireprotoframing, 18 wireprotoframing,
19 wireprototypes, 19 wireprototypes,
20 ) 20 )
21 from .utils import ( 21 from .utils import (
22 cborutil,
23 interfaceutil, 22 interfaceutil,
24 ) 23 )
25 24
26 FRAMINGTYPE = b'application/mercurial-exp-framing-0005' 25 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
27 26
293 res.status = b'200 OK' 292 res.status = b'200 OK'
294 res.headers[b'Content-Type'] = b'text/plain' 293 res.headers[b'Content-Type'] = b'text/plain'
295 res.setbodybytes(_('command in frame must match command in URL')) 294 res.setbodybytes(_('command in frame must match command in URL'))
296 return True 295 return True
297 296
298 rsp = dispatch(repo, proto, command['command'])
299
300 res.status = b'200 OK' 297 res.status = b'200 OK'
301 res.headers[b'Content-Type'] = FRAMINGTYPE 298 res.headers[b'Content-Type'] = FRAMINGTYPE
302 299
303 # TODO consider adding a type to represent an iterable of values to 300 try:
304 # be CBOR encoded. 301 objs = dispatch(repo, proto, command['command'])
305 if isinstance(rsp, wireprototypes.cborresponse): 302
306 # TODO consider calling oncommandresponsereadygen(). 303 action, meta = reactor.oncommandresponsereadyobjects(
307 encoded = b''.join(cborutil.streamencode(rsp.value)) 304 outstream, command['requestid'], objs)
308 action, meta = reactor.oncommandresponseready(outstream, 305
309 command['requestid'], 306 except Exception as e:
310 encoded)
311 elif isinstance(rsp, wireprototypes.v2streamingresponse):
312 action, meta = reactor.oncommandresponsereadygen(outstream,
313 command['requestid'],
314 rsp.gen)
315 elif isinstance(rsp, wireprototypes.v2errorresponse):
316 action, meta = reactor.oncommanderror(outstream,
317 command['requestid'],
318 rsp.message,
319 rsp.args)
320 else:
321 action, meta = reactor.onservererror( 307 action, meta = reactor.onservererror(
322 _('unhandled response type from wire proto command')) 308 outstream, command['requestid'],
309 _('exception when invoking command: %s') % e)
323 310
324 if action == 'sendframes': 311 if action == 'sendframes':
325 res.setbodygen(meta['framegen']) 312 res.setbodygen(meta['framegen'])
326 return True 313 return True
327 elif action == 'noop': 314 elif action == 'noop':
428 ``permission`` defines the permission type needed to run this command. 415 ``permission`` defines the permission type needed to run this command.
429 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, 416 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
430 respectively. Default is to assume command requires ``push`` permissions 417 respectively. Default is to assume command requires ``push`` permissions
431 because otherwise commands not declaring their permissions could modify 418 because otherwise commands not declaring their permissions could modify
432 a repository that is supposed to be read-only. 419 a repository that is supposed to be read-only.
420
421 Wire protocol commands are generators of objects to be serialized and
422 sent to the client.
423
424 If a command raises an uncaught exception, this will be translated into
425 a command error.
433 """ 426 """
434 transports = {k for k, v in wireprototypes.TRANSPORTS.items() 427 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
435 if v['version'] == 2} 428 if v['version'] == 2}
436 429
437 if permission not in ('push', 'pull'): 430 if permission not in ('push', 'pull'):
458 451
459 return register 452 return register
460 453
461 @wireprotocommand('branchmap', permission='pull') 454 @wireprotocommand('branchmap', permission='pull')
462 def branchmapv2(repo, proto): 455 def branchmapv2(repo, proto):
463 branchmap = {encoding.fromlocal(k): v 456 yield {encoding.fromlocal(k): v
464 for k, v in repo.branchmap().iteritems()} 457 for k, v in repo.branchmap().iteritems()}
465
466 return wireprototypes.cborresponse(branchmap)
467 458
468 @wireprotocommand('capabilities', permission='pull') 459 @wireprotocommand('capabilities', permission='pull')
469 def capabilitiesv2(repo, proto): 460 def capabilitiesv2(repo, proto):
470 caps = _capabilitiesv2(repo, proto) 461 yield _capabilitiesv2(repo, proto)
471
472 return wireprototypes.cborresponse(caps)
473 462
474 @wireprotocommand('heads', 463 @wireprotocommand('heads',
475 args={ 464 args={
476 'publiconly': False, 465 'publiconly': False,
477 }, 466 },
478 permission='pull') 467 permission='pull')
479 def headsv2(repo, proto, publiconly=False): 468 def headsv2(repo, proto, publiconly=False):
480 if publiconly: 469 if publiconly:
481 repo = repo.filtered('immutable') 470 repo = repo.filtered('immutable')
482 471
483 return wireprototypes.cborresponse(repo.heads()) 472 yield repo.heads()
484 473
485 @wireprotocommand('known', 474 @wireprotocommand('known',
486 args={ 475 args={
487 'nodes': [b'deadbeef'], 476 'nodes': [b'deadbeef'],
488 }, 477 },
489 permission='pull') 478 permission='pull')
490 def knownv2(repo, proto, nodes=None): 479 def knownv2(repo, proto, nodes=None):
491 nodes = nodes or [] 480 nodes = nodes or []
492 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes)) 481 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
493 return wireprototypes.cborresponse(result) 482 yield result
494 483
495 @wireprotocommand('listkeys', 484 @wireprotocommand('listkeys',
496 args={ 485 args={
497 'namespace': b'ns', 486 'namespace': b'ns',
498 }, 487 },
500 def listkeysv2(repo, proto, namespace=None): 489 def listkeysv2(repo, proto, namespace=None):
501 keys = repo.listkeys(encoding.tolocal(namespace)) 490 keys = repo.listkeys(encoding.tolocal(namespace))
502 keys = {encoding.fromlocal(k): encoding.fromlocal(v) 491 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
503 for k, v in keys.iteritems()} 492 for k, v in keys.iteritems()}
504 493
505 return wireprototypes.cborresponse(keys) 494 yield keys
506 495
507 @wireprotocommand('lookup', 496 @wireprotocommand('lookup',
508 args={ 497 args={
509 'key': b'foo', 498 'key': b'foo',
510 }, 499 },
513 key = encoding.tolocal(key) 502 key = encoding.tolocal(key)
514 503
515 # TODO handle exception. 504 # TODO handle exception.
516 node = repo.lookup(key) 505 node = repo.lookup(key)
517 506
518 return wireprototypes.cborresponse(node) 507 yield node
519 508
520 @wireprotocommand('pushkey', 509 @wireprotocommand('pushkey',
521 args={ 510 args={
522 'namespace': b'ns', 511 'namespace': b'ns',
523 'key': b'key', 512 'key': b'key',
525 'new': b'new', 514 'new': b'new',
526 }, 515 },
527 permission='push') 516 permission='push')
528 def pushkeyv2(repo, proto, namespace, key, old, new): 517 def pushkeyv2(repo, proto, namespace, key, old, new):
529 # TODO handle ui output redirection 518 # TODO handle ui output redirection
530 r = repo.pushkey(encoding.tolocal(namespace), 519 yield repo.pushkey(encoding.tolocal(namespace),
531 encoding.tolocal(key), 520 encoding.tolocal(key),
532 encoding.tolocal(old), 521 encoding.tolocal(old),
533 encoding.tolocal(new)) 522 encoding.tolocal(new))
534
535 return wireprototypes.cborresponse(r)