comparison mercurial/wireprotov2server.py @ 40021:c537144fdbef
wireprotov2: support response caching
One of the things I've learned from managing VCS servers over the
years is that they are hard to scale. It is well known that some
companies have very beefy (read: very expensive) servers to power
their VCS needs. It is also known that specialized servers exist for
various VCSes to make scaling easier. (Mercurial is in this boat.)
One of the aspects that makes a VCS server hard to scale is the
high CPU load incurred by constant client clone/pull operations.
To alleviate the scaling pain associated with data retrieval
operations, I want to integrate caching into the Mercurial wire
protocol server as robustly as possible, so that servers can
aggressively cache responses and avoid as much server-side work as
possible.
This commit represents the initial implementation of a general
caching layer in wire protocol version 2.
We define a new interface and behavior for a wire protocol cacher
in repository.py. (This is probably where a reviewer should look
first to understand what is going on.)
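repository.py is not part of this comparison, so the sketch below only reconstructs the likely shape of the cacher interface from how `dispatch()` uses it further down: as a context manager, plus `adjustcachekeystate()`, `setcachekey()`, `lookup()`, `onobject()` and `onfinished()`. The class names `CommandCacher` and `NullCacher` and the use of `abc` are assumptions for illustration, not the actual definition.

```python
import abc


class CommandCacher(abc.ABC):
    """Hypothetical sketch of a wire protocol command cacher.

    Method names follow the calls made by dispatch(); the semantics in
    the docstrings are inferred from that usage, not copied from
    repository.py.
    """

    def __enter__(self):
        # Entered before the cache key is derived.
        return self

    def __exit__(self, exctype, excvalue, exctb):
        # Release any resources; returning False propagates exceptions.
        return False

    def adjustcachekeystate(self, state):
        """Optionally add entries to the dict hashed into the cache key."""

    @abc.abstractmethod
    def setcachekey(self, key):
        """Store the key; return False to decline caching this request."""

    @abc.abstractmethod
    def lookup(self):
        """Return {'objs': iterable} on a hit, or None on a miss."""

    @abc.abstractmethod
    def onobject(self, obj):
        """Receive one emitted object; yield the objects to send now."""

    @abc.abstractmethod
    def onfinished(self):
        """Called once the command is done; yield any remaining objects."""


class NullCacher(CommandCacher):
    """Declines every key, so dispatch always runs the command itself."""

    def setcachekey(self, key):
        return False

    def lookup(self):
        return None

    def onobject(self, obj):
        yield obj

    def onfinished(self):
        return iter(())
```

Because cachers only store and replay objects, the interface stays small; the decisions about when to consult them live in the dispatch layer.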
The bulk of the added code is in wireprotov2server.py, where we
define how a command can opt in to being cached and integrate
caching into command dispatching.
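Opting in amounts to passing a key-derivation callable to the registration decorator. The sketch below is a hypothetical, simplified registry that mirrors the shape of `wireprotocommand()` and `makecommandcachekeyfn()` in the diff. `CommandEntry`, `argskeyfn()` and the two example commands are illustrative names; the key function drops the `repo` and `proto` arguments the real one receives and returns a plain tuple instead of a hash to keep things short.

```python
import collections

# Hypothetical, simplified stand-in for the COMMANDS registry.
CommandEntry = collections.namedtuple(
    'CommandEntry', ['func', 'args', 'permission', 'cachekeyfn'])
COMMANDS = {}


def wireprotocommand(name, args=None, permission='push', cachekeyfn=None):
    """Register a command; passing cachekeyfn opts it in to caching."""
    if permission not in ('push', 'pull'):
        raise ValueError('invalid permission: %s' % permission)

    def register(func):
        if name in COMMANDS:
            raise ValueError('%s command already registered' % name)
        COMMANDS[name] = CommandEntry(func, args or {}, permission, cachekeyfn)
        return func

    return register


def argskeyfn(command):
    """Build a key function that refuses to cache mutating commands."""
    def cachekeyfn(cacher, **args):
        # Like the diff: 'push' commands may modify the repo, so no key.
        if COMMANDS[command].permission == 'push':
            return None
        return (command, tuple(sorted(args.items())))
    return cachekeyfn


@wireprotocommand('filedata', args={'path': {}}, permission='pull',
                  cachekeyfn=argskeyfn('filedata'))
def filedata(path):
    yield {'path': path}


@wireprotocommand('pushkey')  # defaults to 'push' and is never cached
def pushkey():
    yield True
```

Commands that never pass `cachekeyfn` keep the old behavior, which is why the change can be rolled out one command at a time.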
At a very high level:
* A command can declare itself as cacheable by providing a callable
that can be used to derive a cache key.
* At dispatch time, if a command is cacheable, we attempt to
construct a cacher and use it to serve the response from the cache
and/or to cache the response.
* The dispatch layer handles the bulk of the business logic for
caching, making cachers mostly "dumb content stores."
* Cached entries are invalidated (one of the harder parts of
caching in general) by varying the cache key when state changes.
As such, cachers don't need to be concerned with cache
invalidation.
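The dispatch flow described in these bullets can be condensed into a single generator. This is a hedged, simplified sketch rather than the code in the diff: it uses Python 3 `yield from`, takes the cacher as a parameter instead of calling `makeresponsecacher()`, and passes only the cacher and the command arguments to the key function. Any object providing the cacher methods used in the diff can be passed in.

```python
def dispatch(func, args, cachekeyfn=None, cacher=None):
    """Simplified sketch of cache-aware dispatch (not Mercurial's code).

    func is a generator function producing response objects; cachekeyfn
    and cacher are optional, mirroring the fallbacks in dispatch().
    """
    def callcommand():
        return func(**args)

    # Command is not cacheable, or no caching backend is configured.
    if cachekeyfn is None or cacher is None:
        yield from callcommand()
        return

    with cacher:
        key = cachekeyfn(cacher, **args)

        # No key, or the cacher rejects it: just run the command.
        if key is None or not cacher.setcachekey(key):
            yield from callcommand()
            return

        # Cache hit: replay the stored objects without running the command.
        cached = cacher.lookup()
        if cached:
            yield from cached['objs']
            return

        # Cache miss: stream objects through the cacher so it can record them.
        for obj in callcommand():
            yield from cacher.onobject(obj)

        # Let the cacher flush buffered output and persist the entry.
        yield from cacher.onfinished()
```

If the command raises partway through, `onfinished()` is never reached, so a cacher that only persists entries in `onfinished()` cannot store a truncated response.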
Initially, we've hooked up support for caching "manifestdata" and
"filedata" commands. These are the simplest to cache, as they should
be immutable over time. Caching of commands related to changeset
data is a bit harder (because cache validity is impacted by
changes to bookmarks, phases, etc.). This will be implemented later.
(Strictly speaking, censoring a file should invalidate caches. I've
added an inline TODO to track this edge case.)
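Invalidation by key variation boils down to hashing every input that can affect a response, including version numbers that can be bumped. The sketch below illustrates the idea under stated assumptions: it hashes only a subset of the fields `makecommandcachekeyfn()` uses (omitting media type and protocol version), and it substitutes sorted-key JSON for CBOR because the Python standard library has no CBOR encoder. `cachekey()` and `naivekey()` are hypothetical helpers. Anything not captured in the key, such as the censoring case above, cannot invalidate an entry.

```python
import hashlib
import json

GLOBAL_CACHE_VERSION = 1


def cachekey(command, localversion, reporoot, args):
    """Derive a cache key from everything that can affect the response.

    Sketch only: the diff hashes a CBOR encoding of a similar dict with
    SHA-1. Sorted-key JSON is a stdlib stand-in that is also
    deterministic and keeps individual values delimited.
    """
    state = {
        'globalversion': GLOBAL_CACHE_VERSION,  # bump to invalidate all keys
        'localversion': localversion,           # bump to invalidate one command
        'command': command,                     # segment keys by command
        'repo': reporoot,                       # keep repositories separate
        'args': args,                           # already-normalized arguments
    }
    encoded = json.dumps(state, sort_keys=True, separators=(',', ':'))
    return hashlib.sha1(encoded.encode('utf-8')).hexdigest()


def naivekey(*parts):
    # Plain concatenation: "foo" + "bar" and "fo" + "obar" collide.
    return hashlib.sha1(''.join(parts).encode('utf-8')).hexdigest()


args = {'path': 'foo.txt', 'nodes': ['abc123'], 'haveparents': False}
v1 = cachekey('filedata', 1, '/srv/repo', args)
v2 = cachekey('filedata', 2, '/srv/repo', args)
# v1 != v2: bumping localversion makes every old entry unreachable.
```

Old entries are never deleted explicitly; they simply stop being looked up and age out of whatever eviction policy the cacher uses.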
To prove it works, this commit implements a test-only extension
providing in-memory caching backed by an lrucachedict. A new test
demonstrates this extension behaving properly. FWIW, the cacher is
~50 lines of code, demonstrating how easily a cache can be added to
a server.
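The test extension itself is not shown in this file. As a rough illustration of how small such a cacher can be, here is a hypothetical in-memory version: `collections.OrderedDict` stands in for Mercurial's `lrucachedict`, and a `types.SimpleNamespace` object stands in for the server module whose `makeresponsecacher()` hook an extension would monkeypatch. `LRUStore`, `MemoryCacher` and `_STORE` are illustrative names, not taken from the actual extension.

```python
import collections
import types


class LRUStore:
    """Bounded mapping that evicts the least recently used entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._data = collections.OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.capacity:
            self._data.popitem(last=False)  # drop least recently used

    def __len__(self):
        return len(self._data)


class MemoryCacher:
    """Buffers a response and stores it in a shared LRUStore when done."""

    def __init__(self, store):
        self.store = store
        self.key = None
        self.buffered = []

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        return False

    def adjustcachekeystate(self, state):
        pass  # nothing extra to mix into the key

    def setcachekey(self, key):
        self.key = key
        return True

    def lookup(self):
        objs = self.store.get(self.key)
        return None if objs is None else {'objs': objs}

    def onobject(self, obj):
        self.buffered.append(obj)
        yield obj  # pass objects through unchanged

    def onfinished(self):
        self.store.put(self.key, self.buffered)
        return iter(())


# Stand-in for the server module: the default hook returns None, which
# tells dispatch() to run the command without caching.
server = types.SimpleNamespace(
    makeresponsecacher=lambda repo, proto, command, args, objencoderfn: None)

# One store shared by every request in this process.
_STORE = LRUStore(capacity=100)


def makeresponsecacher(repo, proto, command, args, objencoderfn):
    # Hypothetical replacement an extension would install.
    return MemoryCacher(_STORE)


server.makeresponsecacher = makeresponsecacher
```

This keeps live Python objects in memory; a cacher backed by an external store would presumably serialize them first, for example with the `objencoderfn` encoder that `dispatch()` passes in.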
While the test cacher is not suitable for production workloads, just
for kicks I performed a clone of only the changeset and manifest data
for the mozilla-unified repository. With a fully warmed cache (of just
the manifest data, since changeset data is not cached), server-side
CPU usage dropped from ~73s to ~28s. That's pretty significant and
demonstrates the potential impact of response caching on server
scalability!
Differential Revision: https://phab.mercurial-scm.org/D4773
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 26 Sep 2018 17:16:56 -0700 |
parents | 582676acaf6d |
children | 10cf8b116dd8 |
40020:ed919b90acda | 40021:c537144fdbef |
---|---|
5 # GNU General Public License version 2 or any later version. | 5 # GNU General Public License version 2 or any later version. |
6 | 6 |
7 from __future__ import absolute_import | 7 from __future__ import absolute_import |
8 | 8 |
9 import contextlib | 9 import contextlib |
10 import hashlib | |
10 | 11 |
11 from .i18n import _ | 12 from .i18n import _ |
12 from .node import ( | 13 from .node import ( |
13 hex, | 14 hex, |
14 nullid, | 15 nullid, |
23 util, | 24 util, |
24 wireprotoframing, | 25 wireprotoframing, |
25 wireprototypes, | 26 wireprototypes, |
26 ) | 27 ) |
27 from .utils import ( | 28 from .utils import ( |
29 cborutil, | |
28 interfaceutil, | 30 interfaceutil, |
29 stringutil, | 31 stringutil, |
30 ) | 32 ) |
31 | 33 |
32 FRAMINGTYPE = b'application/mercurial-exp-framing-0005' | 34 FRAMINGTYPE = b'application/mercurial-exp-framing-0005' |
33 | 35 |
34 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2 | 36 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2 |
35 | 37 |
36 COMMANDS = wireprototypes.commanddict() | 38 COMMANDS = wireprototypes.commanddict() |
39 | |
40 # Value inserted into cache key computation function. Change the value to | |
41 # force new cache keys for every command request. This should be done when | |
42 # there is a change to how caching works, etc. | |
43 GLOBAL_CACHE_VERSION = 1 | |
37 | 44 |
38 def handlehttpv2request(rctx, req, res, checkperm, urlparts): | 45 def handlehttpv2request(rctx, req, res, checkperm, urlparts): |
39 from .hgweb import common as hgwebcommon | 46 from .hgweb import common as hgwebcommon |
40 | 47 |
41 # URL space looks like: <permissions>/<command>, where <permission> can | 48 # URL space looks like: <permissions>/<command>, where <permission> can |
331 | 338 |
332 def getdispatchrepo(repo, proto, command): | 339 def getdispatchrepo(repo, proto, command): |
333 return repo.filtered('served') | 340 return repo.filtered('served') |
334 | 341 |
335 def dispatch(repo, proto, command): | 342 def dispatch(repo, proto, command): |
343 """Run a wire protocol command. | |
344 | |
345 Returns an iterable of objects that will be sent to the client. | |
346 """ | |
336 repo = getdispatchrepo(repo, proto, command) | 347 repo = getdispatchrepo(repo, proto, command) |
337 | 348 |
338 func, spec = COMMANDS[command] | 349 entry = COMMANDS[command] |
350 func = entry.func | |
351 spec = entry.args | |
352 | |
339 args = proto.getargs(spec) | 353 args = proto.getargs(spec) |
340 | 354 |
341 return func(repo, proto, **pycompat.strkwargs(args)) | 355 # There is some duplicate boilerplate code here for calling the command and |
356 # emitting objects. It is either that or a lot of indented code that looks | |
357 # like a pyramid (since there are a lot of code paths that result in not | |
358 # using the cacher). | |
359 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args)) | |
360 | |
361 # Request is not cacheable. Don't bother instantiating a cacher. | |
362 if not entry.cachekeyfn: | |
363 for o in callcommand(): | |
364 yield o | |
365 return | |
366 | |
367 cacher = makeresponsecacher(repo, proto, command, args, | |
368 cborutil.streamencode) | |
369 | |
370 # But we have no cacher. Do default handling. | |
371 if not cacher: | |
372 for o in callcommand(): | |
373 yield o | |
374 return | |
375 | |
376 with cacher: | |
377 cachekey = entry.cachekeyfn(repo, proto, cacher, **args) | |
378 | |
379 # No cache key or the cacher doesn't like it. Do default handling. | |
380 if cachekey is None or not cacher.setcachekey(cachekey): | |
381 for o in callcommand(): | |
382 yield o | |
383 return | |
384 | |
385 # Serve it from the cache, if possible. | |
386 cached = cacher.lookup() | |
387 | |
388 if cached: | |
389 for o in cached['objs']: | |
390 yield o | |
391 return | |
392 | |
393 # Else call the command and feed its output into the cacher, allowing | |
394 # the cacher to buffer/mutate objects as it desires. | |
395 for o in callcommand(): | |
396 for o in cacher.onobject(o): | |
397 yield o | |
398 | |
399 for o in cacher.onfinished(): | |
400 yield o | |
342 | 401 |
343 @interfaceutil.implementer(wireprototypes.baseprotocolhandler) | 402 @interfaceutil.implementer(wireprototypes.baseprotocolhandler) |
344 class httpv2protocolhandler(object): | 403 class httpv2protocolhandler(object): |
345 def __init__(self, req, ui, args=None): | 404 def __init__(self, req, ui, args=None): |
346 self._req = req | 405 self._req = req |
458 caps['rawrepoformats'] = sorted(repo.requirements & | 517 caps['rawrepoformats'] = sorted(repo.requirements & |
459 repo.supportedformats) | 518 repo.supportedformats) |
460 | 519 |
461 return proto.addcapabilities(repo, caps) | 520 return proto.addcapabilities(repo, caps) |
462 | 521 |
463 def wireprotocommand(name, args=None, permission='push'): | 522 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None): |
464 """Decorator to declare a wire protocol command. | 523 """Decorator to declare a wire protocol command. |
465 | 524 |
466 ``name`` is the name of the wire protocol command being provided. | 525 ``name`` is the name of the wire protocol command being provided. |
467 | 526 |
468 ``args`` is a dict defining arguments accepted by the command. Keys are | 527 ``args`` is a dict defining arguments accepted by the command. Keys are |
487 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, | 546 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, |
488 respectively. Default is to assume command requires ``push`` permissions | 547 respectively. Default is to assume command requires ``push`` permissions |
489 because otherwise commands not declaring their permissions could modify | 548 because otherwise commands not declaring their permissions could modify |
490 a repository that is supposed to be read-only. | 549 a repository that is supposed to be read-only. |
491 | 550 |
551 ``cachekeyfn`` defines an optional callable that can derive the | |
552 cache key for this request. | |
553 | |
492 Wire protocol commands are generators of objects to be serialized and | 554 Wire protocol commands are generators of objects to be serialized and |
493 sent to the client. | 555 sent to the client. |
494 | 556 |
495 If a command raises an uncaught exception, this will be translated into | 557 If a command raises an uncaught exception, this will be translated into |
496 a command error. | 558 a command error. |
559 | |
560 All commands can opt in to being cacheable by defining a function | |
561 (``cachekeyfn``) that is called to derive a cache key. This function | |
562 receives the same arguments as the command itself plus a ``cacher`` | |
563 argument containing the active cacher for the request and returns a bytes | |
564 containing the key in a cache the response to this command may be cached | |
565 under. | |
497 """ | 566 """ |
498 transports = {k for k, v in wireprototypes.TRANSPORTS.items() | 567 transports = {k for k, v in wireprototypes.TRANSPORTS.items() |
499 if v['version'] == 2} | 568 if v['version'] == 2} |
500 | 569 |
501 if permission not in ('push', 'pull'): | 570 if permission not in ('push', 'pull'): |
541 if name in COMMANDS: | 610 if name in COMMANDS: |
542 raise error.ProgrammingError('%s command already registered ' | 611 raise error.ProgrammingError('%s command already registered ' |
543 'for version 2' % name) | 612 'for version 2' % name) |
544 | 613 |
545 COMMANDS[name] = wireprototypes.commandentry( | 614 COMMANDS[name] = wireprototypes.commandentry( |
546 func, args=args, transports=transports, permission=permission) | 615 func, args=args, transports=transports, permission=permission, |
616 cachekeyfn=cachekeyfn) | |
547 | 617 |
548 return func | 618 return func |
549 | 619 |
550 return register | 620 return register |
621 | |
622 def makecommandcachekeyfn(command, localversion=None, allargs=False): | |
623 """Construct a cache key derivation function with common features. | |
624 | |
625 By default, the cache key is a hash of: | |
626 | |
627 * The command name. | |
628 * A global cache version number. | |
629 * A local cache version number (passed via ``localversion``). | |
630 * All the arguments passed to the command. | |
631 * The media type used. | |
632 * Wire protocol version string. | |
633 * The repository path. | |
634 """ | |
635 if not allargs: | |
636 raise error.ProgrammingError('only allargs=True is currently supported') | |
637 | |
638 if localversion is None: | |
639 raise error.ProgrammingError('must set localversion argument value') | |
640 | |
641 def cachekeyfn(repo, proto, cacher, **args): | |
642 spec = COMMANDS[command] | |
643 | |
644 # Commands that mutate the repo can not be cached. | |
645 if spec.permission == 'push': | |
646 return None | |
647 | |
648 # TODO config option to disable caching. | |
649 | |
650 # Our key derivation strategy is to construct a data structure | |
651 # holding everything that could influence cacheability and to hash | |
652 # the CBOR representation of that. Using CBOR seems like it might | |
653 # be overkill. However, simpler hashing mechanisms are prone to | |
654 # duplicate input issues. e.g. if you just concatenate two values, | |
655 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides | |
656 # "padding" between values and prevents these problems. | |
657 | |
658 # Seed the hash with various data. | |
659 state = { | |
660 # To invalidate all cache keys. | |
661 b'globalversion': GLOBAL_CACHE_VERSION, | |
662 # More granular cache key invalidation. | |
663 b'localversion': localversion, | |
664 # Cache keys are segmented by command. | |
665 b'command': pycompat.sysbytes(command), | |
666 # Throw in the media type and API version strings so changes | |
667 # to exchange semantics invalidate the cache. | |
668 b'mediatype': FRAMINGTYPE, | |
669 b'version': HTTP_WIREPROTO_V2, | |
670 # So same requests for different repos don't share cache keys. | |
671 b'repo': repo.root, | |
672 } | |
673 | |
674 # The arguments passed to us will have already been normalized. | |
675 # Default values will be set, etc. This is important because it | |
676 # means that it doesn't matter if clients send an explicit argument | |
677 # or rely on the default value: it will all normalize to the same | |
678 # set of arguments on the server and therefore the same cache key. | |
679 # | |
680 # Arguments by their very nature must support being encoded to CBOR. | |
681 # And the CBOR encoder is deterministic. So we hash the arguments | |
682 # by feeding the CBOR of their representation into the hasher. | |
683 if allargs: | |
684 state[b'args'] = pycompat.byteskwargs(args) | |
685 | |
686 cacher.adjustcachekeystate(state) | |
687 | |
688 hasher = hashlib.sha1() | |
689 for chunk in cborutil.streamencode(state): | |
690 hasher.update(chunk) | |
691 | |
692 return pycompat.sysbytes(hasher.hexdigest()) | |
693 | |
694 return cachekeyfn | |
695 | |
696 def makeresponsecacher(repo, proto, command, args, objencoderfn): | |
697 """Construct a cacher for a cacheable command. | |
698 | |
699 Returns an ``iwireprotocolcommandcacher`` instance. | |
700 | |
701 Extensions can monkeypatch this function to provide custom caching | |
702 backends. | |
703 """ | |
704 return None | |
551 | 705 |
552 @wireprotocommand('branchmap', permission='pull') | 706 @wireprotocommand('branchmap', permission='pull') |
553 def branchmapv2(repo, proto): | 707 def branchmapv2(repo, proto): |
554 yield {encoding.fromlocal(k): v | 708 yield {encoding.fromlocal(k): v |
555 for k, v in repo.branchmap().iteritems()} | 709 for k, v in repo.branchmap().iteritems()} |
753 'path': { | 907 'path': { |
754 'type': 'bytes', | 908 'type': 'bytes', |
755 'example': b'foo.txt', | 909 'example': b'foo.txt', |
756 } | 910 } |
757 }, | 911 }, |
758 permission='pull') | 912 permission='pull', |
913 # TODO censoring a file revision won't invalidate the cache. | |
914 # Figure out a way to take censoring into account when deriving | |
915 # the cache key. | |
916 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True)) | |
759 def filedata(repo, proto, haveparents, nodes, fields, path): | 917 def filedata(repo, proto, haveparents, nodes, fields, path): |
760 try: | 918 try: |
761 # Extensions may wish to access the protocol handler. | 919 # Extensions may wish to access the protocol handler. |
762 store = getfilestore(repo, proto, path) | 920 store = getfilestore(repo, proto, path) |
763 except FileAccessError as e: | 921 except FileAccessError as e: |
891 'tree': { | 1049 'tree': { |
892 'type': 'bytes', | 1050 'type': 'bytes', |
893 'example': b'', | 1051 'example': b'', |
894 }, | 1052 }, |
895 }, | 1053 }, |
896 permission='pull') | 1054 permission='pull', |
1055 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True)) | |
897 def manifestdata(repo, proto, haveparents, nodes, fields, tree): | 1056 def manifestdata(repo, proto, haveparents, nodes, fields, tree): |
898 store = repo.manifestlog.getstorage(tree) | 1057 store = repo.manifestlog.getstorage(tree) |
899 | 1058 |
900 # Validate the node is known and abort on unknown revisions. | 1059 # Validate the node is known and abort on unknown revisions. |
901 for node in nodes: | 1060 for node in nodes: |