comparison mercurial/wireprotov2server.py @ 39864:7b752bf08435

wireprotov2server: port to emitrevisions()

We now have a proper storage API to request data on multiple revisions. We can drop it into wire protocol version 2 with minimal effort.

The new API handles pretty much everything we were doing manually to build up the delta request. So we were able to delete a lot of code.

As a bonus, wireprotov2 code is no longer accessing some low-level storage APIs. This includes the assumption that a node has an associated numeric revision number! This should make it drastically simpler to implement a server that doesn't have the concept of revision numbers.

Differential Revision: https://phab.mercurial-scm.org/D4724
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 24 Sep 2018 09:48:02 -0700
parents 69b4a5b89dc5
children 582676acaf6d
comparison
equal deleted inserted replaced
39863:c73f9f345ec0 39864:7b752bf08435
10 10
11 from .i18n import _ 11 from .i18n import _
12 from .node import ( 12 from .node import (
13 hex, 13 hex,
14 nullid, 14 nullid,
15 nullrev,
16 ) 15 )
17 from . import ( 16 from . import (
18 changegroup,
19 dagop,
20 discovery, 17 discovery,
21 encoding, 18 encoding,
22 error, 19 error,
23 narrowspec, 20 narrowspec,
24 pycompat, 21 pycompat,
461 caps['rawrepoformats'] = sorted(repo.requirements & 458 caps['rawrepoformats'] = sorted(repo.requirements &
462 repo.supportedformats) 459 repo.supportedformats)
463 460
464 return proto.addcapabilities(repo, caps) 461 return proto.addcapabilities(repo, caps)
465 462
466 def builddeltarequests(store, nodes, haveparents):
467 """Build a series of revision delta requests against a backend store.
468
469 Returns a list of revision numbers in the order they should be sent
470 and a list of ``irevisiondeltarequest`` instances to be made against
471 the backend store.
472 """
473 # We sort and send nodes in DAG order because this is optimal for
474 # storage emission.
475 # TODO we may want a better storage API here - one where we can throw
476 # a list of nodes and delta preconditions over a figurative wall and
477 # have the storage backend figure it out for us.
478 revs = dagop.linearize({store.rev(n) for n in nodes}, store.parentrevs)
479
480 requests = []
481 seenrevs = set()
482
483 for rev in revs:
484 node = store.node(rev)
485 parentnodes = store.parents(node)
486 parentrevs = [store.rev(n) for n in parentnodes]
487 deltabaserev = store.deltaparent(rev)
488 deltabasenode = store.node(deltabaserev)
489
490 # The choice of whether to send a fulltext revision or a delta and
491 # what delta to send is governed by a few factors.
492 #
493 # To send a delta, we need to ensure the receiver is capable of
494 # decoding it. And that requires the receiver to have the base
495 # revision the delta is against.
496 #
497 # We can only guarantee the receiver has the base revision if
498 # a) we've already sent the revision as part of this group
499 # b) the receiver has indicated they already have the revision.
500 # And the mechanism for "b" is the client indicating they have
501 # parent revisions. So this means we can only send the delta if
502 # it is sent before or it is against a delta and the receiver says
503 # they have a parent.
504
505 # We can send storage delta if it is against a revision we've sent
506 # in this group.
507 if deltabaserev != nullrev and deltabaserev in seenrevs:
508 basenode = deltabasenode
509
510 # We can send storage delta if it is against a parent revision and
511 # the receiver indicates they have the parents.
512 elif (deltabaserev != nullrev and deltabaserev in parentrevs
513 and haveparents):
514 basenode = deltabasenode
515
516 # Otherwise the storage delta isn't appropriate. Fall back to
517 # using another delta, if possible.
518
519 # Use p1 if we've emitted it or receiver says they have it.
520 elif parentrevs[0] != nullrev and (
521 parentrevs[0] in seenrevs or haveparents):
522 basenode = parentnodes[0]
523
524 # Use p2 if we've emitted it or receiver says they have it.
525 elif parentrevs[1] != nullrev and (
526 parentrevs[1] in seenrevs or haveparents):
527 basenode = parentnodes[1]
528
529 # Nothing appropriate to delta against. Send the full revision.
530 else:
531 basenode = nullid
532
533 requests.append(changegroup.revisiondeltarequest(
534 node=node,
535 p1node=parentnodes[0],
536 p2node=parentnodes[1],
537 # Receiver deals with linknode resolution.
538 linknode=nullid,
539 basenode=basenode,
540 ))
541
542 seenrevs.add(rev)
543
544 return revs, requests
545
546 def wireprotocommand(name, args=None, permission='push'): 463 def wireprotocommand(name, args=None, permission='push'):
547 """Decorator to declare a wire protocol command. 464 """Decorator to declare a wire protocol command.
548 465
549 ``name`` is the name of the wire protocol command being provided. 466 ``name`` is the name of the wire protocol command being provided.
550 467
858 store.rev(node) 775 store.rev(node)
859 except error.LookupError: 776 except error.LookupError:
860 raise error.WireprotoCommandError('unknown file node: %s', 777 raise error.WireprotoCommandError('unknown file node: %s',
861 (hex(node),)) 778 (hex(node),))
862 779
863 revs, requests = builddeltarequests(store, nodes, haveparents) 780 revisions = store.emitrevisions(nodes,
781 revisiondata=b'revision' in fields,
782 assumehaveparentrevisions=haveparents)
864 783
865 yield { 784 yield {
866 b'totalitems': len(revs), 785 b'totalitems': len(nodes),
867 } 786 }
868 787
869 if b'revision' in fields: 788 for revision in revisions:
870 deltas = store.emitrevisiondeltas(requests)
871 else:
872 deltas = None
873
874 for rev in revs:
875 node = store.node(rev)
876
877 if deltas is not None:
878 delta = next(deltas)
879 else:
880 delta = None
881
882 d = { 789 d = {
883 b'node': node, 790 b'node': revision.node,
884 } 791 }
885 792
886 if b'parents' in fields: 793 if b'parents' in fields:
887 d[b'parents'] = store.parents(node) 794 d[b'parents'] = [revision.p1node, revision.p2node]
888 795
889 followingmeta = [] 796 followingmeta = []
890 followingdata = [] 797 followingdata = []
891 798
892 if b'revision' in fields: 799 if b'revision' in fields:
893 assert delta is not None 800 if revision.revision is not None:
894 assert delta.flags == 0 801 followingmeta.append((b'revision', len(revision.revision)))
895 assert d[b'node'] == delta.node 802 followingdata.append(revision.revision)
896
897 if delta.revision is not None:
898 followingmeta.append((b'revision', len(delta.revision)))
899 followingdata.append(delta.revision)
900 else: 803 else:
901 d[b'deltabasenode'] = delta.basenode 804 d[b'deltabasenode'] = revision.basenode
902 followingmeta.append((b'delta', len(delta.delta))) 805 followingmeta.append((b'delta', len(revision.delta)))
903 followingdata.append(delta.delta) 806 followingdata.append(revision.delta)
904 807
905 if followingmeta: 808 if followingmeta:
906 d[b'fieldsfollowing'] = followingmeta 809 d[b'fieldsfollowing'] = followingmeta
907 810
908 yield d 811 yield d
909 812
910 for extra in followingdata: 813 for extra in followingdata:
911 yield extra 814 yield extra
912
913 if deltas is not None:
914 try:
915 next(deltas)
916 raise error.ProgrammingError('should not have more deltas')
917 except GeneratorExit:
918 pass
919 815
920 @wireprotocommand( 816 @wireprotocommand(
921 'heads', 817 'heads',
922 args={ 818 args={
923 'publiconly': { 819 'publiconly': {
1017 store.rev(node) 913 store.rev(node)
1018 except error.LookupError: 914 except error.LookupError:
1019 raise error.WireprotoCommandError( 915 raise error.WireprotoCommandError(
1020 'unknown node: %s', (node,)) 916 'unknown node: %s', (node,))
1021 917
1022 revs, requests = builddeltarequests(store, nodes, haveparents) 918 revisions = store.emitrevisions(nodes,
919 revisiondata=b'revision' in fields,
920 assumehaveparentrevisions=haveparents)
1023 921
1024 yield { 922 yield {
1025 b'totalitems': len(revs), 923 b'totalitems': len(nodes),
1026 } 924 }
1027 925
1028 if b'revision' in fields: 926 for revision in revisions:
1029 deltas = store.emitrevisiondeltas(requests)
1030 else:
1031 deltas = None
1032
1033 for rev in revs:
1034 node = store.node(rev)
1035
1036 if deltas is not None:
1037 delta = next(deltas)
1038 else:
1039 delta = None
1040
1041 d = { 927 d = {
1042 b'node': node, 928 b'node': revision.node,
1043 } 929 }
1044 930
1045 if b'parents' in fields: 931 if b'parents' in fields:
1046 d[b'parents'] = store.parents(node) 932 d[b'parents'] = [revision.p1node, revision.p2node]
1047 933
1048 followingmeta = [] 934 followingmeta = []
1049 followingdata = [] 935 followingdata = []
1050 936
1051 if b'revision' in fields: 937 if b'revision' in fields:
1052 assert delta is not None 938 if revision.revision is not None:
1053 assert delta.flags == 0 939 followingmeta.append((b'revision', len(revision.revision)))
1054 assert d[b'node'] == delta.node 940 followingdata.append(revision.revision)
1055
1056 if delta.revision is not None:
1057 followingmeta.append((b'revision', len(delta.revision)))
1058 followingdata.append(delta.revision)
1059 else: 941 else:
1060 d[b'deltabasenode'] = delta.basenode 942 d[b'deltabasenode'] = revision.basenode
1061 followingmeta.append((b'delta', len(delta.delta))) 943 followingmeta.append((b'delta', len(revision.delta)))
1062 followingdata.append(delta.delta) 944 followingdata.append(revision.delta)
1063 945
1064 if followingmeta: 946 if followingmeta:
1065 d[b'fieldsfollowing'] = followingmeta 947 d[b'fieldsfollowing'] = followingmeta
1066 948
1067 yield d 949 yield d
1068 950
1069 for extra in followingdata: 951 for extra in followingdata:
1070 yield extra 952 yield extra
1071
1072 if deltas is not None:
1073 try:
1074 next(deltas)
1075 raise error.ProgrammingError('should not have more deltas')
1076 except GeneratorExit:
1077 pass
1078 953
1079 @wireprotocommand( 954 @wireprotocommand(
1080 'pushkey', 955 'pushkey',
1081 args={ 956 args={
1082 'namespace': { 957 'namespace': {