comparison mercurial/wireproto.py @ 35750:a39a9df7ecca

wireproto: split streamres into legacy and modern case. A couple of commands currently require transmission of uncompressed frames with the old MIME type. Split this case from streamres into a new streamres_legacy class. Streamline the remaining code accordingly. Add a new flag to streamres to request uncompressed streams. This is useful for sending data that is already compressed like a pre-built bundle. Expect clients to support uncompressed data. For older clients, zlib will still be used. Differential Revision: https://phab.mercurial-scm.org/D1862
author Joerg Sonnenberger <joerg@bec.de>
date Fri, 12 Jan 2018 10:59:58 +0100
parents 8cdb671dbd0b
children c24dad55ac19
comparison
equal deleted inserted replaced
35749:3a3b59bbe7ce 35750:a39a9df7ecca
520 520
521 The call was successful and the result is a stream. 521 The call was successful and the result is a stream.
522 522
523 Accepts a generator containing chunks of data to be sent to the client. 523 Accepts a generator containing chunks of data to be sent to the client.
524 524
525 ``v1compressible`` indicates whether this data can be compressed to 525 ``prefer_uncompressed`` indicates that the data is expected to be
526 "version 1" clients (technically: HTTP peers using 526 uncompressable and that the stream should therefore use the ``none``
527 application/mercurial-0.1 media type). This flag should NOT be used on 527 engine.
528 new commands because new clients should support a more modern compression
529 mechanism.
530 """ 528 """
531 def __init__(self, gen=None, v1compressible=False): 529 def __init__(self, gen=None, prefer_uncompressed=False):
532 self.gen = gen 530 self.gen = gen
533 self.v1compressible = v1compressible 531 self.prefer_uncompressed = prefer_uncompressed
532
533 class streamres_legacy(object):
534 """wireproto reply: uncompressed binary stream
535
536 The call was successful and the result is a stream.
537
538 Accepts a generator containing chunks of data to be sent to the client.
539
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients
541 using the application/mercurial-0.1 media type.
542 """
543 def __init__(self, gen=None):
544 self.gen = gen
534 545
535 class pushres(object): 546 class pushres(object):
536 """wireproto reply: success with simple integer return 547 """wireproto reply: success with simple integer return
537 548
538 The call was successful and returned an integer contained in `self.res`. 549 The call was successful and returned an integer contained in `self.res`.
800 nodes = decodelist(roots) 811 nodes = decodelist(roots)
801 outgoing = discovery.outgoing(repo, missingroots=nodes, 812 outgoing = discovery.outgoing(repo, missingroots=nodes,
802 missingheads=repo.heads()) 813 missingheads=repo.heads())
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') 814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 gen = iter(lambda: cg.read(32768), '') 815 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True) 816 return streamres(gen=gen)
806 817
807 @wireprotocommand('changegroupsubset', 'bases heads') 818 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads): 819 def changegroupsubset(repo, proto, bases, heads):
809 bases = decodelist(bases) 820 bases = decodelist(bases)
810 heads = decodelist(heads) 821 heads = decodelist(heads)
811 outgoing = discovery.outgoing(repo, missingroots=bases, 822 outgoing = discovery.outgoing(repo, missingroots=bases,
812 missingheads=heads) 823 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') 824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 gen = iter(lambda: cg.read(32768), '') 825 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True) 826 return streamres(gen=gen)
816 827
817 @wireprotocommand('debugwireargs', 'one two *') 828 @wireprotocommand('debugwireargs', 'one two *')
818 def debugwireargs(repo, proto, one, two, others): 829 def debugwireargs(repo, proto, one, two, others):
819 # only accept optional args from the known set 830 # only accept optional args from the known set
820 opts = options('debugwireargs', ['three', 'four'], others) 831 opts = options('debugwireargs', ['three', 'four'], others)
875 advargs = [] 886 advargs = []
876 if exc.hint is not None: 887 if exc.hint is not None:
877 advargs.append(('hint', exc.hint)) 888 advargs.append(('hint', exc.hint))
878 bundler.addpart(bundle2.bundlepart('error:abort', 889 bundler.addpart(bundle2.bundlepart('error:abort',
879 manargs, advargs)) 890 manargs, advargs))
880 return streamres(gen=bundler.getchunks(), v1compressible=True) 891 return streamres(gen=bundler.getchunks())
881 return streamres(gen=chunks, v1compressible=True) 892 return streamres(gen=chunks)
882 893
883 @wireprotocommand('heads') 894 @wireprotocommand('heads')
884 def heads(repo, proto): 895 def heads(repo, proto):
885 h = repo.heads() 896 h = repo.heads()
886 return encodelist(h) + "\n" 897 return encodelist(h) + "\n"
953 def stream(repo, proto): 964 def stream(repo, proto):
954 '''If the server supports streaming clone, it advertises the "stream" 965 '''If the server supports streaming clone, it advertises the "stream"
955 capability with a value representing the version and flags of the repo 966 capability with a value representing the version and flags of the repo
956 it is serving. Client checks to see if it understands the format. 967 it is serving. Client checks to see if it understands the format.
957 ''' 968 '''
958 return streamres(streamclone.generatev1wireproto(repo)) 969 return streamres_legacy(streamclone.generatev1wireproto(repo))
959 970
960 @wireprotocommand('unbundle', 'heads') 971 @wireprotocommand('unbundle', 'heads')
961 def unbundle(repo, proto, heads): 972 def unbundle(repo, proto, heads):
962 their_heads = decodelist(heads) 973 their_heads = decodelist(heads)
963 974
988 r = exchange.unbundle(repo, gen, their_heads, 'serve', 999 r = exchange.unbundle(repo, gen, their_heads, 'serve',
989 proto._client()) 1000 proto._client())
990 if util.safehasattr(r, 'addpart'): 1001 if util.safehasattr(r, 'addpart'):
991 # The return looks streamable, we are in the bundle2 case and 1002 # The return looks streamable, we are in the bundle2 case and
992 # should return a stream. 1003 # should return a stream.
993 return streamres(gen=r.getchunks()) 1004 return streamres_legacy(gen=r.getchunks())
994 return pushres(r) 1005 return pushres(r)
995 1006
996 finally: 1007 finally:
997 fp.close() 1008 fp.close()
998 os.unlink(tempname) 1009 os.unlink(tempname)
1052 advargs.append(('hint', exc.hint)) 1063 advargs.append(('hint', exc.hint))
1053 bundler.addpart(bundle2.bundlepart('error:abort', 1064 bundler.addpart(bundle2.bundlepart('error:abort',
1054 manargs, advargs)) 1065 manargs, advargs))
1055 except error.PushRaced as exc: 1066 except error.PushRaced as exc:
1056 bundler.newpart('error:pushraced', [('message', str(exc))]) 1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1057 return streamres(gen=bundler.getchunks()) 1068 return streamres_legacy(gen=bundler.getchunks())