Mercurial > public > mercurial-scm > hg
comparison mercurial/hg.py @ 635:85e2209d401c
Protocol switch from using generators to stream-like objects.
This allows the the pull side to precisely control how much data is
read so that another encapsulation layer is not needed.
An http client gets a response with a finite size. Because ssh clients
need to keep the stream open, we must not read more data than is sent
in a response. But due to the streaming nature of the changegroup
scheme, only the piece that's parsing the data knows how far it's
allowed to read.
This means the generator scheme isn't fine-grained enough. Instead we
need file-like objects with a read(x) method. This switches everything
for push/pull over to using file-like objects rather than generators.
author | Matt Mackall <mpm@selenic.com> |
---|---|
date | Wed, 06 Jul 2005 22:20:12 -0800 |
parents | da5378d39269 |
children | ac0ec421e3a5 |
comparison
equal
deleted
inserted
replaced
634:da5378d39269 | 635:85e2209d401c |
---|---|
1023 | 1023 |
1024 cg = self.changegroup(update) | 1024 cg = self.changegroup(update) |
1025 return remote.addchangegroup(cg) | 1025 return remote.addchangegroup(cg) |
1026 | 1026 |
1027 def changegroup(self, basenodes): | 1027 def changegroup(self, basenodes): |
1028 nodes = self.newer(basenodes) | |
1029 | |
1030 # construct the link map | |
1031 linkmap = {} | |
1032 for n in nodes: | |
1033 linkmap[self.changelog.rev(n)] = n | |
1034 | |
1035 # construct a list of all changed files | |
1036 changed = {} | |
1037 for n in nodes: | |
1038 c = self.changelog.read(n) | |
1039 for f in c[3]: | |
1040 changed[f] = 1 | |
1041 changed = changed.keys() | |
1042 changed.sort() | |
1043 | |
1044 # the changegroup is changesets + manifests + all file revs | |
1045 revs = [ self.changelog.rev(n) for n in nodes ] | |
1046 | |
1047 for y in self.changelog.group(linkmap): yield y | |
1048 for y in self.manifest.group(linkmap): yield y | |
1049 for f in changed: | |
1050 yield struct.pack(">l", len(f) + 4) + f | |
1051 g = self.file(f).group(linkmap) | |
1052 for y in g: | |
1053 yield y | |
1054 | |
1055 def addchangegroup(self, generator): | |
1056 | |
1057 class genread: | 1028 class genread: |
1058 def __init__(self, generator): | 1029 def __init__(self, generator): |
1059 self.g = generator | 1030 self.g = generator |
1060 self.buf = "" | 1031 self.buf = "" |
1061 def read(self, l): | 1032 def read(self, l): |
1065 except StopIteration: | 1036 except StopIteration: |
1066 break | 1037 break |
1067 d, self.buf = self.buf[:l], self.buf[l:] | 1038 d, self.buf = self.buf[:l], self.buf[l:] |
1068 return d | 1039 return d |
1069 | 1040 |
1041 def gengroup(): | |
1042 nodes = self.newer(basenodes) | |
1043 | |
1044 # construct the link map | |
1045 linkmap = {} | |
1046 for n in nodes: | |
1047 linkmap[self.changelog.rev(n)] = n | |
1048 | |
1049 # construct a list of all changed files | |
1050 changed = {} | |
1051 for n in nodes: | |
1052 c = self.changelog.read(n) | |
1053 for f in c[3]: | |
1054 changed[f] = 1 | |
1055 changed = changed.keys() | |
1056 changed.sort() | |
1057 | |
1058 # the changegroup is changesets + manifests + all file revs | |
1059 revs = [ self.changelog.rev(n) for n in nodes ] | |
1060 | |
1061 for y in self.changelog.group(linkmap): yield y | |
1062 for y in self.manifest.group(linkmap): yield y | |
1063 for f in changed: | |
1064 yield struct.pack(">l", len(f) + 4) + f | |
1065 g = self.file(f).group(linkmap) | |
1066 for y in g: | |
1067 yield y | |
1068 | |
1069 yield struct.pack(">l", 0) | |
1070 | |
1071 return genread(gengroup()) | |
1072 | |
1073 def addchangegroup(self, source): | |
1074 | |
1070 def getchunk(): | 1075 def getchunk(): |
1071 d = source.read(4) | 1076 d = source.read(4) |
1072 if not d: return "" | 1077 if not d: return "" |
1073 l = struct.unpack(">l", d)[0] | 1078 l = struct.unpack(">l", d)[0] |
1074 if l <= 4: return "" | 1079 if l <= 4: return "" |
1085 return self.changelog.count() | 1090 return self.changelog.count() |
1086 | 1091 |
1087 def revmap(x): | 1092 def revmap(x): |
1088 return self.changelog.rev(x) | 1093 return self.changelog.rev(x) |
1089 | 1094 |
1090 if not generator: return | 1095 if not source: return |
1091 changesets = files = revisions = 0 | 1096 changesets = files = revisions = 0 |
1092 | 1097 |
1093 source = genread(generator) | |
1094 tr = self.transaction() | 1098 tr = self.transaction() |
1095 | 1099 |
1096 # pull off the changeset group | 1100 # pull off the changeset group |
1097 self.ui.status("adding changesets\n") | 1101 self.ui.status("adding changesets\n") |
1098 co = self.changelog.tip() | 1102 co = self.changelog.tip() |
1590 self.ui.warn("unexpected response:\n" + d[:400] + "\n...\n") | 1594 self.ui.warn("unexpected response:\n" + d[:400] + "\n...\n") |
1591 raise | 1595 raise |
1592 | 1596 |
1593 def changegroup(self, nodes): | 1597 def changegroup(self, nodes): |
1594 n = " ".join(map(hex, nodes)) | 1598 n = " ".join(map(hex, nodes)) |
1595 zd = zlib.decompressobj() | |
1596 f = self.do_cmd("changegroup", roots=n) | 1599 f = self.do_cmd("changegroup", roots=n) |
1597 bytes = 0 | 1600 bytes = 0 |
1598 while 1: | 1601 |
1599 d = f.read(4096) | 1602 class zread: |
1600 bytes += len(d) | 1603 def __init__(self, f): |
1601 if not d: | 1604 self.zd = zlib.decompressobj() |
1602 yield zd.flush() | 1605 self.f = f |
1603 break | 1606 self.buf = "" |
1604 yield zd.decompress(d) | 1607 def read(self, l): |
1605 self.ui.note("%d bytes of data transfered\n" % bytes) | 1608 while l > len(self.buf): |
1609 r = f.read(4096) | |
1610 if r: | |
1611 self.buf += self.zd.decompress(r) | |
1612 else: | |
1613 self.buf += self.zd.flush() | |
1614 break | |
1615 d, self.buf = self.buf[:l], self.buf[l:] | |
1616 return d | |
1617 | |
1618 return zread(f) | |
1619 | |
1606 | 1620 |
1607 class sshrepository: | 1621 class sshrepository: |
1608 def __init__(self, ui, path): | 1622 def __init__(self, ui, path): |
1609 self.url = path | 1623 self.url = path |
1610 self.ui = ui | 1624 self.ui = ui |
1678 raise | 1692 raise |
1679 | 1693 |
1680 def changegroup(self, nodes): | 1694 def changegroup(self, nodes): |
1681 n = " ".join(map(hex, nodes)) | 1695 n = " ".join(map(hex, nodes)) |
1682 f = self.do_cmd("changegroup", roots=n) | 1696 f = self.do_cmd("changegroup", roots=n) |
1683 bytes = 0 | 1697 return self.pipei |
1684 while 1: | |
1685 l = struct.unpack(">l", f.read(4))[0] | |
1686 if l == -1: break | |
1687 d = f.read(l) | |
1688 bytes += len(d) | |
1689 yield d | |
1690 self.ui.note("%d bytes of data transfered\n" % bytes) | |
1691 | 1698 |
1692 def repository(ui, path=None, create=0): | 1699 def repository(ui, path=None, create=0): |
1693 if path: | 1700 if path: |
1694 if path.startswith("http://"): | 1701 if path.startswith("http://"): |
1695 return httprepository(ui, path) | 1702 return httprepository(ui, path) |