Mercurial > public > mercurial-scm > hg-stable
comparison contrib/python-zstandard/tests/test_buffer_util.py @ 31799:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and an optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | |
children | b1fb341d8a61 |
comparison
equal
deleted
inserted
replaced
31798:2b130e26c3a4 | 31799:e0dc40530c5a |
---|---|
1 import struct | |
2 | |
3 try: | |
4 import unittest2 as unittest | |
5 except ImportError: | |
6 import unittest | |
7 | |
8 import zstd | |
9 | |
# Packer for one segment descriptor: two unsigned 64-bit integers in native
# byte order with standard sizes and no padding ('=QQ'), i.e. 16 bytes per
# segment. The tests below pack each descriptor as (offset, length).
ss = struct.Struct('=QQ')
11 | |
12 | |
class TestBufferWithSegments(unittest.TestCase):
    """Exercise construction and indexing of ``zstd.BufferWithSegments``.

    Segment descriptors are built with the file-level ``ss`` struct and are
    packed as ``(offset, length)`` pairs.
    """

    def test_arguments(self):
        """Constructor rejects missing arguments and malformed segment data."""
        with self.assertRaises(TypeError):
            zstd.BufferWithSegments()

        with self.assertRaises(TypeError):
            zstd.BufferWithSegments(b'foo')

        # Segments data should be a multiple of 16.
        with self.assertRaisesRegexp(
                ValueError, 'segments array size is not a multiple of 16'):
            zstd.BufferWithSegments(b'foo', b'\x00\x00')

    def test_invalid_offset(self):
        """A segment reaching past the end of the data buffer is rejected."""
        # The data is 3 bytes long but the segment claims 4 bytes at offset 0.
        with self.assertRaisesRegexp(
                ValueError, 'offset within segments array references memory'):
            zstd.BufferWithSegments(b'foo', ss.pack(0, 4))

    def test_invalid_getitem(self):
        """Indexing outside the single available segment raises IndexError."""
        b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))

        # The index expressions are evaluated only for the exception they
        # raise, so their results are intentionally not bound to a name.
        with self.assertRaisesRegexp(IndexError, 'offset must be non-negative'):
            b[-10]

        with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'):
            b[1]

        with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'):
            b[2]

    def test_single(self):
        """A buffer with one segment exposes its length, size and contents."""
        b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
        self.assertEqual(len(b), 1)
        self.assertEqual(b.size, 3)
        self.assertEqual(b.tobytes(), b'foo')

        self.assertEqual(len(b[0]), 3)
        self.assertEqual(b[0].offset, 0)
        self.assertEqual(b[0].tobytes(), b'foo')

    def test_multiple(self):
        """Each of several adjacent segments yields its own slice of the data."""
        b = zstd.BufferWithSegments(b'foofooxfooxy',
                                    b''.join([ss.pack(0, 3),
                                              ss.pack(3, 4),
                                              ss.pack(7, 5)]))
        self.assertEqual(len(b), 3)
        self.assertEqual(b.size, 12)
        self.assertEqual(b.tobytes(), b'foofooxfooxy')

        self.assertEqual(b[0].tobytes(), b'foo')
        self.assertEqual(b[1].tobytes(), b'foox')
        self.assertEqual(b[2].tobytes(), b'fooxy')
62 | |
63 | |
class TestBufferWithSegmentsCollection(unittest.TestCase):
    """Exercise ``zstd.BufferWithSegmentsCollection`` over one or more buffers."""

    def test_empty_constructor(self):
        """Calling the constructor without arguments raises ValueError."""
        expected = 'must pass at least 1 argument'
        with self.assertRaisesRegexp(ValueError, expected):
            zstd.BufferWithSegmentsCollection()

    def test_argument_validation(self):
        """Non-buffer arguments and empty buffers are rejected."""
        type_message = 'arguments must be BufferWithSegments'

        with self.assertRaisesRegexp(TypeError, type_message):
            zstd.BufferWithSegmentsCollection(None)

        with self.assertRaisesRegexp(TypeError, type_message):
            valid = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
            zstd.BufferWithSegmentsCollection(valid, None)

        # The empty buffer is built inside the context manager so that the
        # expected ValueError is caught regardless of which call raises it.
        with self.assertRaisesRegexp(ValueError,
                                     'ZstdBufferWithSegments cannot be empty'):
            empty = zstd.BufferWithSegments(b'', b'')
            zstd.BufferWithSegmentsCollection(empty)

    def test_length(self):
        """``len()`` counts segments and ``size()`` sums bytes across buffers."""
        single = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
        double = zstd.BufferWithSegments(
            b'barbaz', b''.join([ss.pack(0, 3), ss.pack(3, 3)]))

        # (constructor arguments, expected segment count, expected byte size)
        cases = (
            ((single,), 1, 3),
            ((double,), 2, 6),
            ((single, double), 3, 9),
        )
        for buffers, segment_count, byte_size in cases:
            collection = zstd.BufferWithSegmentsCollection(*buffers)
            self.assertEqual(len(collection), segment_count)
            self.assertEqual(collection.size(), byte_size)

    def test_getitem(self):
        """Indexing spans all member buffers; out-of-range raises IndexError."""
        single = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
        double = zstd.BufferWithSegments(
            b'barbaz', b''.join([ss.pack(0, 3), ss.pack(3, 3)]))

        collection = zstd.BufferWithSegmentsCollection(single, double)

        # The collection holds three segments, so indexes 3 and 4 are invalid.
        for bad_index in (3, 4):
            with self.assertRaisesRegexp(IndexError,
                                         'offset must be less than 3'):
                collection[bad_index]

        for index, payload in enumerate((b'foo', b'bar', b'baz')):
            self.assertEqual(collection[index].tobytes(), payload)