Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotov2server.py @ 39559:07b58266bce3
wireprotov2: implement commands as a generator of objects
Previously, wire protocol version 2 inherited version 1's model of
having separate types to represent the results of different wire
protocol commands.
As I implemented more powerful commands in future commits, I found
I was using a common pattern of returning a special type to hold a
generator. This meant the command function required a closure to
do most of the work. That made logic flow more difficult to follow.
I also noticed that many commands were effectively a sequence of
objects to be CBOR encoded.
I think it makes sense to define version 2 commands as generators.
This way, commands can simply emit the data structures they wish to
send to the client. This eliminates the need for a closure in
command functions and removes encoding from the bodies of commands.
As part of this commit, the handling of response objects has been
moved into the serverreactor class. This puts the reactor in the
driver's seat with regards to CBOR encoding and error handling.
Having error handling in the function that emits frames is
particularly important because exceptions in that function can lead
to things getting in a bad state: I'm fairly certain that uncaught
exceptions in the frame generator were causing deadlocks.
I also introduced a dedicated error type for explicit error reporting
in command handlers. This will be used in subsequent commits.
There's still a bit of work to be done here, especially around
formalizing the error handling "protocol." I've added yet another
TODO to track this so we don't forget.
Test output changed because we're using generators and no longer know
we are at the end of the data until we hit the end of the generator.
This means we can't emit the end-of-stream flag until we've exhausted
the generator. Hence the introduction of 0-sized end-of-stream frames.
Differential Revision: https://phab.mercurial-scm.org/D4472
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 05 Sep 2018 09:06:40 -0700 |
parents | 660879e49b46 |
children | 9c2c77c73f23 |
comparison
equal
deleted
inserted
replaced
39558:b0e0db1565d1 | 39559:07b58266bce3 |
---|---|
17 util, | 17 util, |
18 wireprotoframing, | 18 wireprotoframing, |
19 wireprototypes, | 19 wireprototypes, |
20 ) | 20 ) |
21 from .utils import ( | 21 from .utils import ( |
22 cborutil, | |
23 interfaceutil, | 22 interfaceutil, |
24 ) | 23 ) |
25 | 24 |
26 FRAMINGTYPE = b'application/mercurial-exp-framing-0005' | 25 FRAMINGTYPE = b'application/mercurial-exp-framing-0005' |
27 | 26 |
293 res.status = b'200 OK' | 292 res.status = b'200 OK' |
294 res.headers[b'Content-Type'] = b'text/plain' | 293 res.headers[b'Content-Type'] = b'text/plain' |
295 res.setbodybytes(_('command in frame must match command in URL')) | 294 res.setbodybytes(_('command in frame must match command in URL')) |
296 return True | 295 return True |
297 | 296 |
298 rsp = dispatch(repo, proto, command['command']) | |
299 | |
300 res.status = b'200 OK' | 297 res.status = b'200 OK' |
301 res.headers[b'Content-Type'] = FRAMINGTYPE | 298 res.headers[b'Content-Type'] = FRAMINGTYPE |
302 | 299 |
303 # TODO consider adding a type to represent an iterable of values to | 300 try: |
304 # be CBOR encoded. | 301 objs = dispatch(repo, proto, command['command']) |
305 if isinstance(rsp, wireprototypes.cborresponse): | 302 |
306 # TODO consider calling oncommandresponsereadygen(). | 303 action, meta = reactor.oncommandresponsereadyobjects( |
307 encoded = b''.join(cborutil.streamencode(rsp.value)) | 304 outstream, command['requestid'], objs) |
308 action, meta = reactor.oncommandresponseready(outstream, | 305 |
309 command['requestid'], | 306 except Exception as e: |
310 encoded) | |
311 elif isinstance(rsp, wireprototypes.v2streamingresponse): | |
312 action, meta = reactor.oncommandresponsereadygen(outstream, | |
313 command['requestid'], | |
314 rsp.gen) | |
315 elif isinstance(rsp, wireprototypes.v2errorresponse): | |
316 action, meta = reactor.oncommanderror(outstream, | |
317 command['requestid'], | |
318 rsp.message, | |
319 rsp.args) | |
320 else: | |
321 action, meta = reactor.onservererror( | 307 action, meta = reactor.onservererror( |
322 _('unhandled response type from wire proto command')) | 308 outstream, command['requestid'], |
309 _('exception when invoking command: %s') % e) | |
323 | 310 |
324 if action == 'sendframes': | 311 if action == 'sendframes': |
325 res.setbodygen(meta['framegen']) | 312 res.setbodygen(meta['framegen']) |
326 return True | 313 return True |
327 elif action == 'noop': | 314 elif action == 'noop': |
428 ``permission`` defines the permission type needed to run this command. | 415 ``permission`` defines the permission type needed to run this command. |
429 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, | 416 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, |
430 respectively. Default is to assume command requires ``push`` permissions | 417 respectively. Default is to assume command requires ``push`` permissions |
431 because otherwise commands not declaring their permissions could modify | 418 because otherwise commands not declaring their permissions could modify |
432 a repository that is supposed to be read-only. | 419 a repository that is supposed to be read-only. |
420 | |
421 Wire protocol commands are generators of objects to be serialized and | |
422 sent to the client. | |
423 | |
424 If a command raises an uncaught exception, this will be translated into | |
425 a command error. | |
433 """ | 426 """ |
434 transports = {k for k, v in wireprototypes.TRANSPORTS.items() | 427 transports = {k for k, v in wireprototypes.TRANSPORTS.items() |
435 if v['version'] == 2} | 428 if v['version'] == 2} |
436 | 429 |
437 if permission not in ('push', 'pull'): | 430 if permission not in ('push', 'pull'): |
458 | 451 |
459 return register | 452 return register |
460 | 453 |
461 @wireprotocommand('branchmap', permission='pull') | 454 @wireprotocommand('branchmap', permission='pull') |
462 def branchmapv2(repo, proto): | 455 def branchmapv2(repo, proto): |
463 branchmap = {encoding.fromlocal(k): v | 456 yield {encoding.fromlocal(k): v |
464 for k, v in repo.branchmap().iteritems()} | 457 for k, v in repo.branchmap().iteritems()} |
465 | |
466 return wireprototypes.cborresponse(branchmap) | |
467 | 458 |
468 @wireprotocommand('capabilities', permission='pull') | 459 @wireprotocommand('capabilities', permission='pull') |
469 def capabilitiesv2(repo, proto): | 460 def capabilitiesv2(repo, proto): |
470 caps = _capabilitiesv2(repo, proto) | 461 yield _capabilitiesv2(repo, proto) |
471 | |
472 return wireprototypes.cborresponse(caps) | |
473 | 462 |
474 @wireprotocommand('heads', | 463 @wireprotocommand('heads', |
475 args={ | 464 args={ |
476 'publiconly': False, | 465 'publiconly': False, |
477 }, | 466 }, |
478 permission='pull') | 467 permission='pull') |
479 def headsv2(repo, proto, publiconly=False): | 468 def headsv2(repo, proto, publiconly=False): |
480 if publiconly: | 469 if publiconly: |
481 repo = repo.filtered('immutable') | 470 repo = repo.filtered('immutable') |
482 | 471 |
483 return wireprototypes.cborresponse(repo.heads()) | 472 yield repo.heads() |
484 | 473 |
485 @wireprotocommand('known', | 474 @wireprotocommand('known', |
486 args={ | 475 args={ |
487 'nodes': [b'deadbeef'], | 476 'nodes': [b'deadbeef'], |
488 }, | 477 }, |
489 permission='pull') | 478 permission='pull') |
490 def knownv2(repo, proto, nodes=None): | 479 def knownv2(repo, proto, nodes=None): |
491 nodes = nodes or [] | 480 nodes = nodes or [] |
492 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes)) | 481 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes)) |
493 return wireprototypes.cborresponse(result) | 482 yield result |
494 | 483 |
495 @wireprotocommand('listkeys', | 484 @wireprotocommand('listkeys', |
496 args={ | 485 args={ |
497 'namespace': b'ns', | 486 'namespace': b'ns', |
498 }, | 487 }, |
500 def listkeysv2(repo, proto, namespace=None): | 489 def listkeysv2(repo, proto, namespace=None): |
501 keys = repo.listkeys(encoding.tolocal(namespace)) | 490 keys = repo.listkeys(encoding.tolocal(namespace)) |
502 keys = {encoding.fromlocal(k): encoding.fromlocal(v) | 491 keys = {encoding.fromlocal(k): encoding.fromlocal(v) |
503 for k, v in keys.iteritems()} | 492 for k, v in keys.iteritems()} |
504 | 493 |
505 return wireprototypes.cborresponse(keys) | 494 yield keys |
506 | 495 |
507 @wireprotocommand('lookup', | 496 @wireprotocommand('lookup', |
508 args={ | 497 args={ |
509 'key': b'foo', | 498 'key': b'foo', |
510 }, | 499 }, |
513 key = encoding.tolocal(key) | 502 key = encoding.tolocal(key) |
514 | 503 |
515 # TODO handle exception. | 504 # TODO handle exception. |
516 node = repo.lookup(key) | 505 node = repo.lookup(key) |
517 | 506 |
518 return wireprototypes.cborresponse(node) | 507 yield node |
519 | 508 |
520 @wireprotocommand('pushkey', | 509 @wireprotocommand('pushkey', |
521 args={ | 510 args={ |
522 'namespace': b'ns', | 511 'namespace': b'ns', |
523 'key': b'key', | 512 'key': b'key', |
525 'new': b'new', | 514 'new': b'new', |
526 }, | 515 }, |
527 permission='push') | 516 permission='push') |
528 def pushkeyv2(repo, proto, namespace, key, old, new): | 517 def pushkeyv2(repo, proto, namespace, key, old, new): |
529 # TODO handle ui output redirection | 518 # TODO handle ui output redirection |
530 r = repo.pushkey(encoding.tolocal(namespace), | 519 yield repo.pushkey(encoding.tolocal(namespace), |
531 encoding.tolocal(key), | 520 encoding.tolocal(key), |
532 encoding.tolocal(old), | 521 encoding.tolocal(old), |
533 encoding.tolocal(new)) | 522 encoding.tolocal(new)) |
534 | |
535 return wireprototypes.cborresponse(r) |