Mercurial > public > mercurial-scm > hg-stable
comparison mercurial/util.py @ 38713:27391d74aaa2
ssh: avoid reading beyond the end of stream when using compression
Compressed streams can be used as part of getbundle. The normal read()
operation of bufferedinputpipe will try to fulfill the request exactly
and can deadlock if the server sends less because it is done. At the same
time, the bundle2 logic will stop reading when it believes it has gotten
all parts of the bundle, which can leave behind end of stream markers as
used by bzip2 and zstd.
To solve this, introduce a new optional unbufferedread interface and
provide it in bufferedinputpipe and doublepipe. If there is buffered
data left, it will be returned, otherwise it will issue a single read
request and return whatever it obtains.
Reorganize the decompression handlers to try harder to read until the
end of stream, especially if the requested read can already be
fulfilled. Checking for end of stream is messy with Python 2; none of the
standard compression modules properly exposes it. At least with zstd and
bzip2, decompressing will remember EOS and fail for empty input after
the EOS has been seen. For zlib, the only way to detect it with Python 2
is to duplicate the decompressobj and force some additional data into
it. The common handler can be further optimized, but works as PoC.
Differential Revision: https://phab.mercurial-scm.org/D3937
author | Joerg Sonnenberger <joerg@bec.de> |
---|---|
date | Thu, 12 Jul 2018 18:46:10 +0200 |
parents | 152f4822d210 |
children | 8751d1e2a7ff |
comparison
equal
deleted
inserted
replaced
38712:70a4289896b0 | 38713:27391d74aaa2 |
---|---|
def read(self, size):
    # Keep filling the internal buffer until we either reach EOF or hold
    # at least `size` bytes, then hand back exactly what was requested.
    while not self._eof and self._lenbuf < size:
        self._fillbuffer()
    return self._frombuffer(size)
324 | 324 |
def unbufferedread(self, size):
    # Serve from the buffer when possible; only touch the underlying pipe
    # when the buffer is empty, and then with a single read request of at
    # least _chunksize so we never block waiting for an exact amount.
    if self._lenbuf == 0 and not self._eof:
        self._fillbuffer(max(size, _chunksize))
    return self._frombuffer(min(size, self._lenbuf))
329 | |
325 def readline(self, *args, **kwargs): | 330 def readline(self, *args, **kwargs): |
326 if 1 < len(self._buffer): | 331 if 1 < len(self._buffer): |
327 # this should not happen because both read and readline end with a | 332 # this should not happen because both read and readline end with a |
328 # _frombuffer call that collapse it. | 333 # _frombuffer call that collapse it. |
329 self._buffer = [''.join(self._buffer)] | 334 self._buffer = [''.join(self._buffer)] |
361 else: | 366 else: |
362 self._buffer = [] | 367 self._buffer = [] |
363 self._lenbuf = 0 | 368 self._lenbuf = 0 |
364 return data | 369 return data |
365 | 370 |
def _fillbuffer(self, size=_chunksize):
    """Issue one read of up to *size* bytes into the internal buffer.

    An empty result from os.read means the writer closed its end, which
    is recorded as EOF.
    """
    chunk = os.read(self._input.fileno(), size)
    if chunk:
        self._buffer.append(chunk)
        self._lenbuf += len(chunk)
    else:
        self._eof = True
3300 | 3305 |
3301 The object is reusable but is not thread safe. | 3306 The object is reusable but is not thread safe. |
3302 """ | 3307 """ |
3303 raise NotImplementedError() | 3308 raise NotImplementedError() |
3304 | 3309 |
3310 class _CompressedStreamReader(object): | |
3311 def __init__(self, fh): | |
3312 if safehasattr(fh, 'unbufferedread'): | |
3313 self._reader = fh.unbufferedread | |
3314 else: | |
3315 self._reader = fh.read | |
3316 self._pending = [] | |
3317 self._pos = 0 | |
3318 self._eof = False | |
3319 | |
3320 def _decompress(self, chunk): | |
3321 raise NotImplementedError() | |
3322 | |
3323 def read(self, l): | |
3324 buf = [] | |
3325 while True: | |
3326 while self._pending: | |
3327 if len(self._pending[0]) > l + self._pos: | |
3328 newbuf = self._pending[0] | |
3329 buf.append(newbuf[self._pos:self._pos + l]) | |
3330 self._pos += l | |
3331 return ''.join(buf) | |
3332 | |
3333 newbuf = self._pending.pop(0) | |
3334 if self._pos: | |
3335 buf.append(newbuf[self._pos:]) | |
3336 l -= len(newbuf) - self._pos | |
3337 else: | |
3338 buf.append(newbuf) | |
3339 l -= len(newbuf) | |
3340 self._pos = 0 | |
3341 | |
3342 if self._eof: | |
3343 return ''.join(buf) | |
3344 chunk = self._reader(65536) | |
3345 self._decompress(chunk) | |
3346 | |
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing zlib-deflated data."""
    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()
    def _decompress(self, chunk):
        # Decompress the new input and queue any output for read().
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        # Python 2's zlib exposes no EOF flag, so probe a *copy* of the
        # decompressor with one junk byte: once end-of-stream has been
        # reached, the byte comes back untouched in unused_data.
        d = self._decompobj.copy()
        try:
            d.decompress('x')
            d.flush()
            if d.unused_data == 'x':
                self._eof = True
        except zlib.error:
            # Mid-stream the probe byte is merely invalid input, not EOF.
            pass
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing bzip2 data."""
    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()
    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        try:
            # Drain any output still buffered inside the decompressor.
            while True:
                newbuf = self._decompobj.decompress('')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except EOFError:
            # BZ2Decompressor raises EOFError for input after the end of
            # stream marker — that is how we learn the stream is done.
            self._eof = True
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """Bzip2 stream reader for streams stored without the 'BZ' magic."""
    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # The input stream doesn't have the 'BZ' header, so add it back
        # before any real data is fed in.
        newbuf = self._decompobj.decompress('BZ')
        if newbuf:
            self._pending.append(newbuf)
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing zstandard data."""
    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        # Keep the module handle: ZstdError is needed in _decompress().
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()
    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        try:
            # Drain any output still buffered inside the decompressor.
            while True:
                newbuf = self._decompobj.decompress('')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except self._zstd.ZstdError:
            # Decompressing empty input after end-of-stream raises; that
            # is our EOF signal.
            self._eof = True
3305 class _zlibengine(compressionengine): | 3408 class _zlibengine(compressionengine): |
3306 def name(self): | 3409 def name(self): |
3307 return 'zlib' | 3410 return 'zlib' |
3308 | 3411 |
3309 def bundletype(self): | 3412 def bundletype(self): |
3333 yield data | 3436 yield data |
3334 | 3437 |
3335 yield z.flush() | 3438 yield z.flush() |
3336 | 3439 |
def decompressorreader(self, fh):
    """Wrap *fh* in a file-like reader that inflates zlib data lazily."""
    reader = _GzipCompressedStreamReader(fh)
    return reader
3347 | 3442 |
3348 class zlibrevlogcompressor(object): | 3443 class zlibrevlogcompressor(object): |
3349 def compress(self, data): | 3444 def compress(self, data): |
3350 insize = len(data) | 3445 insize = len(data) |
3351 # Caller handles empty input case. | 3446 # Caller handles empty input case. |
3421 yield data | 3516 yield data |
3422 | 3517 |
3423 yield z.flush() | 3518 yield z.flush() |
3424 | 3519 |
def decompressorreader(self, fh):
    """Wrap *fh* in a file-like reader that decompresses bzip2 lazily."""
    reader = _BZ2CompressedStreamReader(fh)
    return reader
3432 | 3522 |
3433 compengines.register(_bz2engine()) | 3523 compengines.register(_bz2engine()) |
3434 | 3524 |
3435 class _truncatedbz2engine(compressionengine): | 3525 class _truncatedbz2engine(compressionengine): |
3436 def name(self): | 3526 def name(self): |
3440 return None, '_truncatedBZ' | 3530 return None, '_truncatedBZ' |
3441 | 3531 |
3442 # We don't implement compressstream because it is hackily handled elsewhere. | 3532 # We don't implement compressstream because it is hackily handled elsewhere. |
3443 | 3533 |
def decompressorreader(self, fh):
    """Wrap *fh* in a bzip2 reader for streams lacking the 'BZ' magic.

    The reader re-injects the missing header before decompressing.
    """
    reader = _TruncatedBZ2CompressedStreamReader(fh)
    return reader
3453 | 3536 |
3454 compengines.register(_truncatedbz2engine()) | 3537 compengines.register(_truncatedbz2engine()) |
3455 | 3538 |
3456 class _noopengine(compressionengine): | 3539 class _noopengine(compressionengine): |
3457 def name(self): | 3540 def name(self): |
3542 yield data | 3625 yield data |
3543 | 3626 |
3544 yield z.flush() | 3627 yield z.flush() |
3545 | 3628 |
def decompressorreader(self, fh):
    """Wrap *fh* in a file-like reader that decompresses zstd lazily."""
    reader = _ZstdCompressedStreamReader(fh, self._module)
    return reader
3550 | 3631 |
3551 class zstdrevlogcompressor(object): | 3632 class zstdrevlogcompressor(object): |
3552 def __init__(self, zstd, level=3): | 3633 def __init__(self, zstd, level=3): |
3553 # TODO consider omitting frame magic to save 4 bytes. | 3634 # TODO consider omitting frame magic to save 4 bytes. |
3554 # This writes content sizes into the frame header. That is | 3635 # This writes content sizes into the frame header. That is |