Mercurial > public > mercurial-scm > hg
comparison mercurial/wireprotov2server.py @ 39864:7b752bf08435
wireprotov2server: port to emitrevisions()
We now have a proper storage API to request data on multiple
revisions. We can drop it into wire protocol version 2 with
minimal effort.
The new API handles pretty much everything we were doing manually to
build up the delta request. So we were able to delete a lot of code.
As a bonus, wireprotov2 code is no longer accessing some low-level
storage APIs. This includes the assumption that a node has an
associated numeric revision number! This should make it drastically
simpler to implement a server that doesn't have the concept of
revision numbers.
Differential Revision: https://phab.mercurial-scm.org/D4724
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 24 Sep 2018 09:48:02 -0700 |
parents | 69b4a5b89dc5 |
children | 582676acaf6d |
comparison
equal
deleted
inserted
replaced
39863:c73f9f345ec0 | 39864:7b752bf08435 |
---|---|
10 | 10 |
11 from .i18n import _ | 11 from .i18n import _ |
12 from .node import ( | 12 from .node import ( |
13 hex, | 13 hex, |
14 nullid, | 14 nullid, |
15 nullrev, | |
16 ) | 15 ) |
17 from . import ( | 16 from . import ( |
18 changegroup, | |
19 dagop, | |
20 discovery, | 17 discovery, |
21 encoding, | 18 encoding, |
22 error, | 19 error, |
23 narrowspec, | 20 narrowspec, |
24 pycompat, | 21 pycompat, |
461 caps['rawrepoformats'] = sorted(repo.requirements & | 458 caps['rawrepoformats'] = sorted(repo.requirements & |
462 repo.supportedformats) | 459 repo.supportedformats) |
463 | 460 |
464 return proto.addcapabilities(repo, caps) | 461 return proto.addcapabilities(repo, caps) |
465 | 462 |
466 def builddeltarequests(store, nodes, haveparents): | |
467 """Build a series of revision delta requests against a backend store. | |
468 | |
469 Returns a list of revision numbers in the order they should be sent | |
470 and a list of ``irevisiondeltarequest`` instances to be made against | |
471 the backend store. | |
472 """ | |
473 # We sort and send nodes in DAG order because this is optimal for | |
474 # storage emission. | |
475 # TODO we may want a better storage API here - one where we can throw | |
476 # a list of nodes and delta preconditions over a figurative wall and | |
477 # have the storage backend figure it out for us. | |
478 revs = dagop.linearize({store.rev(n) for n in nodes}, store.parentrevs) | |
479 | |
480 requests = [] | |
481 seenrevs = set() | |
482 | |
483 for rev in revs: | |
484 node = store.node(rev) | |
485 parentnodes = store.parents(node) | |
486 parentrevs = [store.rev(n) for n in parentnodes] | |
487 deltabaserev = store.deltaparent(rev) | |
488 deltabasenode = store.node(deltabaserev) | |
489 | |
490 # The choice of whether to send a fulltext revision or a delta and | |
491 # what delta to send is governed by a few factors. | |
492 # | |
493 # To send a delta, we need to ensure the receiver is capable of | |
494 # decoding it. And that requires the receiver to have the base | |
495 # revision the delta is against. | |
496 # | |
497 # We can only guarantee the receiver has the base revision if | |
498 # a) we've already sent the revision as part of this group | |
499 # b) the receiver has indicated they already have the revision. | |
500 # And the mechanism for "b" is the client indicating they have | |
501 # parent revisions. So this means we can only send the delta if | |
502 # it is sent before or it is against a delta and the receiver says | |
503 # they have a parent. | |
504 | |
505 # We can send storage delta if it is against a revision we've sent | |
506 # in this group. | |
507 if deltabaserev != nullrev and deltabaserev in seenrevs: | |
508 basenode = deltabasenode | |
509 | |
510 # We can send storage delta if it is against a parent revision and | |
511 # the receiver indicates they have the parents. | |
512 elif (deltabaserev != nullrev and deltabaserev in parentrevs | |
513 and haveparents): | |
514 basenode = deltabasenode | |
515 | |
516 # Otherwise the storage delta isn't appropriate. Fall back to | |
517 # using another delta, if possible. | |
518 | |
519 # Use p1 if we've emitted it or receiver says they have it. | |
520 elif parentrevs[0] != nullrev and ( | |
521 parentrevs[0] in seenrevs or haveparents): | |
522 basenode = parentnodes[0] | |
523 | |
524 # Use p2 if we've emitted it or receiver says they have it. | |
525 elif parentrevs[1] != nullrev and ( | |
526 parentrevs[1] in seenrevs or haveparents): | |
527 basenode = parentnodes[1] | |
528 | |
529 # Nothing appropriate to delta against. Send the full revision. | |
530 else: | |
531 basenode = nullid | |
532 | |
533 requests.append(changegroup.revisiondeltarequest( | |
534 node=node, | |
535 p1node=parentnodes[0], | |
536 p2node=parentnodes[1], | |
537 # Receiver deals with linknode resolution. | |
538 linknode=nullid, | |
539 basenode=basenode, | |
540 )) | |
541 | |
542 seenrevs.add(rev) | |
543 | |
544 return revs, requests | |
545 | |
546 def wireprotocommand(name, args=None, permission='push'): | 463 def wireprotocommand(name, args=None, permission='push'): |
547 """Decorator to declare a wire protocol command. | 464 """Decorator to declare a wire protocol command. |
548 | 465 |
549 ``name`` is the name of the wire protocol command being provided. | 466 ``name`` is the name of the wire protocol command being provided. |
550 | 467 |
858 store.rev(node) | 775 store.rev(node) |
859 except error.LookupError: | 776 except error.LookupError: |
860 raise error.WireprotoCommandError('unknown file node: %s', | 777 raise error.WireprotoCommandError('unknown file node: %s', |
861 (hex(node),)) | 778 (hex(node),)) |
862 | 779 |
863 revs, requests = builddeltarequests(store, nodes, haveparents) | 780 revisions = store.emitrevisions(nodes, |
781 revisiondata=b'revision' in fields, | |
782 assumehaveparentrevisions=haveparents) | |
864 | 783 |
865 yield { | 784 yield { |
866 b'totalitems': len(revs), | 785 b'totalitems': len(nodes), |
867 } | 786 } |
868 | 787 |
869 if b'revision' in fields: | 788 for revision in revisions: |
870 deltas = store.emitrevisiondeltas(requests) | |
871 else: | |
872 deltas = None | |
873 | |
874 for rev in revs: | |
875 node = store.node(rev) | |
876 | |
877 if deltas is not None: | |
878 delta = next(deltas) | |
879 else: | |
880 delta = None | |
881 | |
882 d = { | 789 d = { |
883 b'node': node, | 790 b'node': revision.node, |
884 } | 791 } |
885 | 792 |
886 if b'parents' in fields: | 793 if b'parents' in fields: |
887 d[b'parents'] = store.parents(node) | 794 d[b'parents'] = [revision.p1node, revision.p2node] |
888 | 795 |
889 followingmeta = [] | 796 followingmeta = [] |
890 followingdata = [] | 797 followingdata = [] |
891 | 798 |
892 if b'revision' in fields: | 799 if b'revision' in fields: |
893 assert delta is not None | 800 if revision.revision is not None: |
894 assert delta.flags == 0 | 801 followingmeta.append((b'revision', len(revision.revision))) |
895 assert d[b'node'] == delta.node | 802 followingdata.append(revision.revision) |
896 | |
897 if delta.revision is not None: | |
898 followingmeta.append((b'revision', len(delta.revision))) | |
899 followingdata.append(delta.revision) | |
900 else: | 803 else: |
901 d[b'deltabasenode'] = delta.basenode | 804 d[b'deltabasenode'] = revision.basenode |
902 followingmeta.append((b'delta', len(delta.delta))) | 805 followingmeta.append((b'delta', len(revision.delta))) |
903 followingdata.append(delta.delta) | 806 followingdata.append(revision.delta) |
904 | 807 |
905 if followingmeta: | 808 if followingmeta: |
906 d[b'fieldsfollowing'] = followingmeta | 809 d[b'fieldsfollowing'] = followingmeta |
907 | 810 |
908 yield d | 811 yield d |
909 | 812 |
910 for extra in followingdata: | 813 for extra in followingdata: |
911 yield extra | 814 yield extra |
912 | |
913 if deltas is not None: | |
914 try: | |
915 next(deltas) | |
916 raise error.ProgrammingError('should not have more deltas') | |
917 except GeneratorExit: | |
918 pass | |
919 | 815 |
920 @wireprotocommand( | 816 @wireprotocommand( |
921 'heads', | 817 'heads', |
922 args={ | 818 args={ |
923 'publiconly': { | 819 'publiconly': { |
1017 store.rev(node) | 913 store.rev(node) |
1018 except error.LookupError: | 914 except error.LookupError: |
1019 raise error.WireprotoCommandError( | 915 raise error.WireprotoCommandError( |
1020 'unknown node: %s', (node,)) | 916 'unknown node: %s', (node,)) |
1021 | 917 |
1022 revs, requests = builddeltarequests(store, nodes, haveparents) | 918 revisions = store.emitrevisions(nodes, |
919 revisiondata=b'revision' in fields, | |
920 assumehaveparentrevisions=haveparents) | |
1023 | 921 |
1024 yield { | 922 yield { |
1025 b'totalitems': len(revs), | 923 b'totalitems': len(nodes), |
1026 } | 924 } |
1027 | 925 |
1028 if b'revision' in fields: | 926 for revision in revisions: |
1029 deltas = store.emitrevisiondeltas(requests) | |
1030 else: | |
1031 deltas = None | |
1032 | |
1033 for rev in revs: | |
1034 node = store.node(rev) | |
1035 | |
1036 if deltas is not None: | |
1037 delta = next(deltas) | |
1038 else: | |
1039 delta = None | |
1040 | |
1041 d = { | 927 d = { |
1042 b'node': node, | 928 b'node': revision.node, |
1043 } | 929 } |
1044 | 930 |
1045 if b'parents' in fields: | 931 if b'parents' in fields: |
1046 d[b'parents'] = store.parents(node) | 932 d[b'parents'] = [revision.p1node, revision.p2node] |
1047 | 933 |
1048 followingmeta = [] | 934 followingmeta = [] |
1049 followingdata = [] | 935 followingdata = [] |
1050 | 936 |
1051 if b'revision' in fields: | 937 if b'revision' in fields: |
1052 assert delta is not None | 938 if revision.revision is not None: |
1053 assert delta.flags == 0 | 939 followingmeta.append((b'revision', len(revision.revision))) |
1054 assert d[b'node'] == delta.node | 940 followingdata.append(revision.revision) |
1055 | |
1056 if delta.revision is not None: | |
1057 followingmeta.append((b'revision', len(delta.revision))) | |
1058 followingdata.append(delta.revision) | |
1059 else: | 941 else: |
1060 d[b'deltabasenode'] = delta.basenode | 942 d[b'deltabasenode'] = revision.basenode |
1061 followingmeta.append((b'delta', len(delta.delta))) | 943 followingmeta.append((b'delta', len(revision.delta))) |
1062 followingdata.append(delta.delta) | 944 followingdata.append(revision.delta) |
1063 | 945 |
1064 if followingmeta: | 946 if followingmeta: |
1065 d[b'fieldsfollowing'] = followingmeta | 947 d[b'fieldsfollowing'] = followingmeta |
1066 | 948 |
1067 yield d | 949 yield d |
1068 | 950 |
1069 for extra in followingdata: | 951 for extra in followingdata: |
1070 yield extra | 952 yield extra |
1071 | |
1072 if deltas is not None: | |
1073 try: | |
1074 next(deltas) | |
1075 raise error.ProgrammingError('should not have more deltas') | |
1076 except GeneratorExit: | |
1077 pass | |
1078 | 953 |
1079 @wireprotocommand( | 954 @wireprotocommand( |
1080 'pushkey', | 955 'pushkey', |
1081 args={ | 956 args={ |
1082 'namespace': { | 957 'namespace': { |