278 |
278 |
279 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, |
279 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, |
280 payload) |
280 payload) |
281 |
281 |
282 def createcommandframes(stream, requestid, cmd, args, datafh=None, |
282 def createcommandframes(stream, requestid, cmd, args, datafh=None, |
283 maxframesize=DEFAULT_MAX_FRAME_SIZE): |
283 maxframesize=DEFAULT_MAX_FRAME_SIZE, |
|
284 redirect=None): |
284 """Create frames necessary to transmit a request to run a command. |
285 """Create frames necessary to transmit a request to run a command. |
285 |
286 |
286 This is a generator of bytearrays. Each item represents a frame |
287 This is a generator of bytearrays. Each item represents a frame |
287 ready to be sent over the wire to a peer. |
288 ready to be sent over the wire to a peer. |
288 """ |
289 """ |
289 data = {b'name': cmd} |
290 data = {b'name': cmd} |
290 if args: |
291 if args: |
291 data[b'args'] = args |
292 data[b'args'] = args |
|
293 |
|
294 if redirect: |
|
295 data[b'redirect'] = redirect |
292 |
296 |
293 data = b''.join(cborutil.streamencode(data)) |
297 data = b''.join(cborutil.streamencode(data)) |
294 |
298 |
295 offset = 0 |
299 offset = 0 |
296 |
300 |
1133 return self._makeerrorresult(_('server already errored')) |
1137 return self._makeerrorresult(_('server already errored')) |
1134 |
1138 |
class commandrequest(object):
    """Represents a request to run a command.

    This is a plain record: the constructor stores its arguments unchanged
    as same-named attributes and marks the request as ``'pending'``.
    """

    def __init__(self, requestid, name, args, datafh=None, redirect=None):
        """Record the parameters of a command request.

        ``requestid`` identifies the request, ``name`` is the command name
        and ``args`` holds the command arguments. ``datafh`` and
        ``redirect`` are optional and default to ``None``; none of the
        values are validated or copied here.
        """
        self.requestid = requestid
        self.name = name
        self.args = args
        self.datafh = datafh
        # NOTE(review): presumably the content-redirect settings that end up
        # under the b'redirect' key of the encoded request -- confirm against
        # the code that turns this object into frames.
        self.redirect = redirect
        # Every request starts out pending; transitions to other states are
        # not performed by this class.
        self.state = 'pending'
1144 |
1149 |
1145 class clientreactor(object): |
1150 class clientreactor(object): |
1146 """Holds state of a client issuing frame-based protocol requests. |
1151 """Holds state of a client issuing frame-based protocol requests. |
1147 |
1152 |
1176 self._outgoingstream = stream(1) |
1181 self._outgoingstream = stream(1) |
1177 self._pendingrequests = collections.deque() |
1182 self._pendingrequests = collections.deque() |
1178 self._activerequests = {} |
1183 self._activerequests = {} |
1179 self._incomingstreams = {} |
1184 self._incomingstreams = {} |
1180 |
1185 |
1181 def callcommand(self, name, args, datafh=None): |
1186 def callcommand(self, name, args, datafh=None, redirect=None): |
1182 """Request that a command be executed. |
1187 """Request that a command be executed. |
1183 |
1188 |
1184 Receives the command name, a dict of arguments to pass to the command, |
1189 Receives the command name, a dict of arguments to pass to the command, |
1185 and an optional file object containing the raw data for the command. |
1190 and an optional file object containing the raw data for the command. |
1186 |
1191 |
1190 raise error.ProgrammingError('cannot issue new commands') |
1195 raise error.ProgrammingError('cannot issue new commands') |
1191 |
1196 |
1192 requestid = self._nextrequestid |
1197 requestid = self._nextrequestid |
1193 self._nextrequestid += 2 |
1198 self._nextrequestid += 2 |
1194 |
1199 |
1195 request = commandrequest(requestid, name, args, datafh=datafh) |
1200 request = commandrequest(requestid, name, args, datafh=datafh, |
|
1201 redirect=redirect) |
1196 |
1202 |
1197 if self._buffersends: |
1203 if self._buffersends: |
1198 self._pendingrequests.append(request) |
1204 self._pendingrequests.append(request) |
1199 return request, 'noop', {} |
1205 return request, 'noop', {} |
1200 else: |
1206 else: |