Mercurial > public > mercurial-scm > hg
comparison mercurial/wireproto.py @ 30466:2add671bf55b
wireproto: perform chunking and compression at protocol layer (API)
Currently, the "streamres" response type is populated with a generator
of chunks with compression possibly already applied. This puts the onus
on commands to perform chunking and compression. Architecturally, I
think this is the wrong place to perform this work. I think commands
should say "here is the data" and the protocol layer should take care
of encoding the final bytes to put on the wire.
Additionally, upcoming commits will improve wire protocol support for
compression. Having a central place for performing compression in the
protocol transport layer will be easier than having to deal with
compression at the commands layer.
This commit refactors the "streamres" response type to accept either
a generator or an object with a "read" method. Additionally, the type now
accepts a flag indicating whether the response is a "version 1
compressible" response. This basically identifies all commands
currently performing compression. I could have used a special type
for this, but a flag works just as well. The argument name
foreshadows the introduction of wire protocol changes, hence the "v1" prefix.
The code for chunking and compressing has been moved to the output
generation function for each protocol transport. Some code has been
inlined, resulting in the deletion of now unused methods.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sun, 20 Nov 2016 13:50:45 -0800 |
parents | d105195436c0 |
children | 39d13b8c101d |
comparison
equal
deleted
inserted
replaced
30465:40a1871eea5e | 30466:2add671bf55b |
---|---|
75 # | 75 # |
76 #def restore(self): | 76 #def restore(self): |
77 # """reinstall previous stdout and stderr and return intercepted stdout | 77 # """reinstall previous stdout and stderr and return intercepted stdout |
78 # """ | 78 # """ |
79 # raise NotImplementedError() | 79 # raise NotImplementedError() |
80 | |
81 def groupchunks(self, fh): | |
82 """Generator of chunks to send to the client. | |
83 | |
84 Some protocols may have compressed the contents. | |
85 """ | |
86 raise NotImplementedError() | |
87 | |
88 def compresschunks(self, chunks): | |
89 """Generator of possible compressed chunks to send to the client. | |
90 | |
91 This is like ``groupchunks()`` except it accepts a generator as | |
92 its argument. | |
93 """ | |
94 raise NotImplementedError() | |
95 | 80 |
96 class remotebatch(peer.batcher): | 81 class remotebatch(peer.batcher): |
97 '''batches the queued calls; uses as few roundtrips as possible''' | 82 '''batches the queued calls; uses as few roundtrips as possible''' |
98 def __init__(self, remote): | 83 def __init__(self, remote): |
99 '''remote must support _submitbatch(encbatch) and | 84 '''remote must support _submitbatch(encbatch) and |
527 # wire protocol command can either return a string or one of these classes. | 512 # wire protocol command can either return a string or one of these classes. |
528 class streamres(object): | 513 class streamres(object): |
529 """wireproto reply: binary stream | 514 """wireproto reply: binary stream |
530 | 515 |
531 The call was successful and the result is a stream. | 516 The call was successful and the result is a stream. |
532 Iterate on the `self.gen` attribute to retrieve chunks. | 517 |
518 Accepts either a generator or an object with a ``read(size)`` method. | |
519 | |
520 ``v1compressible`` indicates whether this data can be compressed to | |
521 "version 1" clients (technically: HTTP peers using | |
522 application/mercurial-0.1 media type). This flag should NOT be used on | |
523 new commands because new clients should support a more modern compression | |
524 mechanism. | |
533 """ | 525 """ |
534 def __init__(self, gen): | 526 def __init__(self, gen=None, reader=None, v1compressible=False): |
535 self.gen = gen | 527 self.gen = gen |
528 self.reader = reader | |
529 self.v1compressible = v1compressible | |
536 | 530 |
537 class pushres(object): | 531 class pushres(object): |
538 """wireproto reply: success with simple integer return | 532 """wireproto reply: success with simple integer return |
539 | 533 |
540 The call was successful and returned an integer contained in `self.res`. | 534 The call was successful and returned an integer contained in `self.res`. |
737 | 731 |
738 @wireprotocommand('changegroup', 'roots') | 732 @wireprotocommand('changegroup', 'roots') |
739 def changegroup(repo, proto, roots): | 733 def changegroup(repo, proto, roots): |
740 nodes = decodelist(roots) | 734 nodes = decodelist(roots) |
741 cg = changegroupmod.changegroup(repo, nodes, 'serve') | 735 cg = changegroupmod.changegroup(repo, nodes, 'serve') |
742 return streamres(proto.groupchunks(cg)) | 736 return streamres(reader=cg, v1compressible=True) |
743 | 737 |
744 @wireprotocommand('changegroupsubset', 'bases heads') | 738 @wireprotocommand('changegroupsubset', 'bases heads') |
745 def changegroupsubset(repo, proto, bases, heads): | 739 def changegroupsubset(repo, proto, bases, heads): |
746 bases = decodelist(bases) | 740 bases = decodelist(bases) |
747 heads = decodelist(heads) | 741 heads = decodelist(heads) |
748 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') | 742 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') |
749 return streamres(proto.groupchunks(cg)) | 743 return streamres(reader=cg, v1compressible=True) |
750 | 744 |
751 @wireprotocommand('debugwireargs', 'one two *') | 745 @wireprotocommand('debugwireargs', 'one two *') |
752 def debugwireargs(repo, proto, one, two, others): | 746 def debugwireargs(repo, proto, one, two, others): |
753 # only accept optional args from the known set | 747 # only accept optional args from the known set |
754 opts = options('debugwireargs', ['three', 'four'], others) | 748 opts = options('debugwireargs', ['three', 'four'], others) |
779 if not bundle1allowed(repo, 'pull'): | 773 if not bundle1allowed(repo, 'pull'): |
780 if not exchange.bundle2requested(opts.get('bundlecaps')): | 774 if not exchange.bundle2requested(opts.get('bundlecaps')): |
781 return ooberror(bundle2required) | 775 return ooberror(bundle2required) |
782 | 776 |
783 chunks = exchange.getbundlechunks(repo, 'serve', **opts) | 777 chunks = exchange.getbundlechunks(repo, 'serve', **opts) |
784 return streamres(proto.compresschunks(chunks)) | 778 return streamres(gen=chunks, v1compressible=True) |
785 | 779 |
786 @wireprotocommand('heads') | 780 @wireprotocommand('heads') |
787 def heads(repo, proto): | 781 def heads(repo, proto): |
788 h = repo.heads() | 782 h = repo.heads() |
789 return encodelist(h) + "\n" | 783 return encodelist(h) + "\n" |
868 | 862 |
869 try: | 863 try: |
870 # LockError may be raised before the first result is yielded. Don't | 864 # LockError may be raised before the first result is yielded. Don't |
871 # emit output until we're sure we got the lock successfully. | 865 # emit output until we're sure we got the lock successfully. |
872 it = streamclone.generatev1wireproto(repo) | 866 it = streamclone.generatev1wireproto(repo) |
873 return streamres(getstream(it)) | 867 return streamres(gen=getstream(it)) |
874 except error.LockError: | 868 except error.LockError: |
875 return '2\n' | 869 return '2\n' |
876 | 870 |
877 @wireprotocommand('unbundle', 'heads') | 871 @wireprotocommand('unbundle', 'heads') |
878 def unbundle(repo, proto, heads): | 872 def unbundle(repo, proto, heads): |
898 r = exchange.unbundle(repo, gen, their_heads, 'serve', | 892 r = exchange.unbundle(repo, gen, their_heads, 'serve', |
899 proto._client()) | 893 proto._client()) |
900 if util.safehasattr(r, 'addpart'): | 894 if util.safehasattr(r, 'addpart'): |
901 # The return looks streamable, we are in the bundle2 case and | 895 # The return looks streamable, we are in the bundle2 case and |
902 # should return a stream. | 896 # should return a stream. |
903 return streamres(r.getchunks()) | 897 return streamres(gen=r.getchunks()) |
904 return pushres(r) | 898 return pushres(r) |
905 | 899 |
906 finally: | 900 finally: |
907 fp.close() | 901 fp.close() |
908 os.unlink(tempname) | 902 os.unlink(tempname) |
960 advargs.append(('hint', exc.hint)) | 954 advargs.append(('hint', exc.hint)) |
961 bundler.addpart(bundle2.bundlepart('error:abort', | 955 bundler.addpart(bundle2.bundlepart('error:abort', |
962 manargs, advargs)) | 956 manargs, advargs)) |
963 except error.PushRaced as exc: | 957 except error.PushRaced as exc: |
964 bundler.newpart('error:pushraced', [('message', str(exc))]) | 958 bundler.newpart('error:pushraced', [('message', str(exc))]) |
965 return streamres(bundler.getchunks()) | 959 return streamres(gen=bundler.getchunks()) |