Mercurial > public > mercurial-scm > hg
comparison mercurial/wireproto.py @ 35750:a39a9df7ecca
wireproto: split streamres into legacy and modern case
A couple of commands currently require transmission of uncompressed
frames with the old MIME type. Split this case from streamres into
a new streamres_legacy class. Streamline the remaining code accordingly.
Add a new flag to streamres to request uncompressed streams. This is
useful for sending data that is already compressed like a pre-built
bundle. Expect clients to support uncompressed data. For older clients,
zlib will still be used.
Differential Revision: https://phab.mercurial-scm.org/D1862
author | Joerg Sonnenberger <joerg@bec.de> |
---|---|
date | Fri, 12 Jan 2018 10:59:58 +0100 |
parents | 8cdb671dbd0b |
children | c24dad55ac19 |
comparison
equal
deleted
inserted
replaced
35749:3a3b59bbe7ce | 35750:a39a9df7ecca |
---|---|
520 | 520 |
521 The call was successful and the result is a stream. | 521 The call was successful and the result is a stream. |
522 | 522 |
523 Accepts a generator containing chunks of data to be sent to the client. | 523 Accepts a generator containing chunks of data to be sent to the client. |
524 | 524 |
525 ``v1compressible`` indicates whether this data can be compressed to | 525 ``prefer_uncompressed`` indicates that the data is expected to be |
526 "version 1" clients (technically: HTTP peers using | 526 uncompressable and that the stream should therefore use the ``none`` |
527 application/mercurial-0.1 media type). This flag should NOT be used on | 527 engine. |
528 new commands because new clients should support a more modern compression | |
529 mechanism. | |
530 """ | 528 """ |
531 def __init__(self, gen=None, v1compressible=False): | 529 def __init__(self, gen=None, prefer_uncompressed=False): |
532 self.gen = gen | 530 self.gen = gen |
533 self.v1compressible = v1compressible | 531 self.prefer_uncompressed = prefer_uncompressed |
532 | |
533 class streamres_legacy(object): | |
534 """wireproto reply: uncompressed binary stream | |
535 | |
536 The call was successful and the result is a stream. | |
537 | |
538 Accepts a generator containing chunks of data to be sent to the client. | |
539 | |
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients | |
541 using the application/mercurial-0.1 media type. | |
542 """ | |
543 def __init__(self, gen=None): | |
544 self.gen = gen | |
534 | 545 |
535 class pushres(object): | 546 class pushres(object): |
536 """wireproto reply: success with simple integer return | 547 """wireproto reply: success with simple integer return |
537 | 548 |
538 The call was successful and returned an integer contained in `self.res`. | 549 The call was successful and returned an integer contained in `self.res`. |
800 nodes = decodelist(roots) | 811 nodes = decodelist(roots) |
801 outgoing = discovery.outgoing(repo, missingroots=nodes, | 812 outgoing = discovery.outgoing(repo, missingroots=nodes, |
802 missingheads=repo.heads()) | 813 missingheads=repo.heads()) |
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') | 814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') |
804 gen = iter(lambda: cg.read(32768), '') | 815 gen = iter(lambda: cg.read(32768), '') |
805 return streamres(gen=gen, v1compressible=True) | 816 return streamres(gen=gen) |
806 | 817 |
807 @wireprotocommand('changegroupsubset', 'bases heads') | 818 @wireprotocommand('changegroupsubset', 'bases heads') |
808 def changegroupsubset(repo, proto, bases, heads): | 819 def changegroupsubset(repo, proto, bases, heads): |
809 bases = decodelist(bases) | 820 bases = decodelist(bases) |
810 heads = decodelist(heads) | 821 heads = decodelist(heads) |
811 outgoing = discovery.outgoing(repo, missingroots=bases, | 822 outgoing = discovery.outgoing(repo, missingroots=bases, |
812 missingheads=heads) | 823 missingheads=heads) |
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') | 824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') |
814 gen = iter(lambda: cg.read(32768), '') | 825 gen = iter(lambda: cg.read(32768), '') |
815 return streamres(gen=gen, v1compressible=True) | 826 return streamres(gen=gen) |
816 | 827 |
817 @wireprotocommand('debugwireargs', 'one two *') | 828 @wireprotocommand('debugwireargs', 'one two *') |
818 def debugwireargs(repo, proto, one, two, others): | 829 def debugwireargs(repo, proto, one, two, others): |
819 # only accept optional args from the known set | 830 # only accept optional args from the known set |
820 opts = options('debugwireargs', ['three', 'four'], others) | 831 opts = options('debugwireargs', ['three', 'four'], others) |
875 advargs = [] | 886 advargs = [] |
876 if exc.hint is not None: | 887 if exc.hint is not None: |
877 advargs.append(('hint', exc.hint)) | 888 advargs.append(('hint', exc.hint)) |
878 bundler.addpart(bundle2.bundlepart('error:abort', | 889 bundler.addpart(bundle2.bundlepart('error:abort', |
879 manargs, advargs)) | 890 manargs, advargs)) |
880 return streamres(gen=bundler.getchunks(), v1compressible=True) | 891 return streamres(gen=bundler.getchunks()) |
881 return streamres(gen=chunks, v1compressible=True) | 892 return streamres(gen=chunks) |
882 | 893 |
883 @wireprotocommand('heads') | 894 @wireprotocommand('heads') |
884 def heads(repo, proto): | 895 def heads(repo, proto): |
885 h = repo.heads() | 896 h = repo.heads() |
886 return encodelist(h) + "\n" | 897 return encodelist(h) + "\n" |
953 def stream(repo, proto): | 964 def stream(repo, proto): |
954 '''If the server supports streaming clone, it advertises the "stream" | 965 '''If the server supports streaming clone, it advertises the "stream" |
955 capability with a value representing the version and flags of the repo | 966 capability with a value representing the version and flags of the repo |
956 it is serving. Client checks to see if it understands the format. | 967 it is serving. Client checks to see if it understands the format. |
957 ''' | 968 ''' |
958 return streamres(streamclone.generatev1wireproto(repo)) | 969 return streamres_legacy(streamclone.generatev1wireproto(repo)) |
959 | 970 |
960 @wireprotocommand('unbundle', 'heads') | 971 @wireprotocommand('unbundle', 'heads') |
961 def unbundle(repo, proto, heads): | 972 def unbundle(repo, proto, heads): |
962 their_heads = decodelist(heads) | 973 their_heads = decodelist(heads) |
963 | 974 |
988 r = exchange.unbundle(repo, gen, their_heads, 'serve', | 999 r = exchange.unbundle(repo, gen, their_heads, 'serve', |
989 proto._client()) | 1000 proto._client()) |
990 if util.safehasattr(r, 'addpart'): | 1001 if util.safehasattr(r, 'addpart'): |
991 # The return looks streamable, we are in the bundle2 case and | 1002 # The return looks streamable, we are in the bundle2 case and |
992 # should return a stream. | 1003 # should return a stream. |
993 return streamres(gen=r.getchunks()) | 1004 return streamres_legacy(gen=r.getchunks()) |
994 return pushres(r) | 1005 return pushres(r) |
995 | 1006 |
996 finally: | 1007 finally: |
997 fp.close() | 1008 fp.close() |
998 os.unlink(tempname) | 1009 os.unlink(tempname) |
1052 advargs.append(('hint', exc.hint)) | 1063 advargs.append(('hint', exc.hint)) |
1053 bundler.addpart(bundle2.bundlepart('error:abort', | 1064 bundler.addpart(bundle2.bundlepart('error:abort', |
1054 manargs, advargs)) | 1065 manargs, advargs)) |
1055 except error.PushRaced as exc: | 1066 except error.PushRaced as exc: |
1056 bundler.newpart('error:pushraced', [('message', str(exc))]) | 1067 bundler.newpart('error:pushraced', [('message', str(exc))]) |
1057 return streamres(gen=bundler.getchunks()) | 1068 return streamres_legacy(gen=bundler.getchunks()) |