mercurial/httpclient/tests/test_readers.py
changeset 16643 24dbef11f477
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/httpclient/tests/test_readers.py	Fri May 04 16:00:33 2012 -0500
@@ -0,0 +1,70 @@
+# Copyright 2010, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import unittest
+
+from httpplus import _readers
+
+def chunkedblock(x, eol='\r\n'):
+    r"""Make a chunked transfer-encoding block.
+
+    >>> chunkedblock('hi')
+    '2\r\nhi\r\n'
+    >>> chunkedblock('hi' * 10)
+    '14\r\nhihihihihihihihihihi\r\n'
+    >>> chunkedblock('hi', eol='\n')
+    '2\nhi\n'
+    """
+    return ''.join((hex(len(x))[2:], eol, x, eol))
+
+corpus = 'foo\r\nbar\r\nbaz\r\n'
+
+
+class ChunkedReaderTest(unittest.TestCase):
+    def test_many_block_boundaries(self):
+        for step in xrange(1, len(corpus)):
+            data = ''.join(chunkedblock(corpus[start:start+step]) for
+                           start in xrange(0, len(corpus), step))
+            for istep in xrange(1, len(data)):
+                rdr = _readers.ChunkedReader('\r\n')
+                print 'step', step, 'load', istep
+                for start in xrange(0, len(data), istep):
+                    rdr._load(data[start:start+istep])
+                rdr._load(chunkedblock(''))
+                self.assertEqual(corpus, rdr.read(len(corpus) + 1))
+
+    def test_small_chunk_blocks_large_wire_blocks(self):
+        data = ''.join(map(chunkedblock, corpus)) + chunkedblock('')
+        rdr = _readers.ChunkedReader('\r\n')
+        for start in xrange(0, len(data), 4):
+            d = data[start:start + 4]
+            if d:
+                rdr._load(d)
+        self.assertEqual(corpus, rdr.read(len(corpus)+100))
+# no-check-code