Hybrid sync & async Python request/response protocol client implementations
Posted: December 13, 2011
Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.
Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.
In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous and asynchronous transports can be implemented, and new request/response commands can be added easily, in a single place in the source code.
Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:
    import socket
    import struct
    import threading

    HOST = 'localhost'
    PORT = 8080

    COMMAND_STRUCT = struct.Struct('<I')

    SUCCESS_CODE = struct.pack('<I', 0)
    ERROR_CODE = struct.pack('<I', 1)
    ERROR_MESSAGE = 'Invalid request'
    ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
        len(ERROR_MESSAGE), ERROR_MESSAGE)

    LEN_STRUCT = struct.Struct('<I')

    def handle(conn):
        while True:
            # Read the 32-bit command identifier
            command_data = ''
            while len(command_data) < COMMAND_STRUCT.size:
                data = conn.recv(COMMAND_STRUCT.size - len(command_data))
                if not data:
                    return
                command_data += data

            command, = COMMAND_STRUCT.unpack(command_data)

            if command == 1:
                # 'reverse': read the length prefix, then the string itself
                len_data = ''
                while len(len_data) < LEN_STRUCT.size:
                    data = conn.recv(LEN_STRUCT.size - len(len_data))
                    if not data:
                        # Connection closed in the middle of a request
                        return
                    len_data += data

                len_, = LEN_STRUCT.unpack(len_data)

                data = ''
                while len(data) < len_:
                    chunk = conn.recv(len_ - len(data))
                    if not chunk:
                        return
                    data += chunk

                conn.sendall(SUCCESS_CODE)
                conn.sendall(struct.pack('<I%ds' % len(data), len(data), data[::-1]))
            else:
                conn.sendall(ERROR_CODE)
                conn.sendall(ERROR_MESSAGE_DATA)

    def main():
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((HOST, PORT))
        sock.listen(1)

        while True:
            conn, addr = sock.accept()
            print 'Connect: %r' % (addr, )
            # Pass 'conn' as an argument so every thread gets its own connection
            threading.Thread(target=handle, args=(conn, )).start()

    if __name__ == '__main__':
        main()
Every message sent by a client starts with a 32-bit (unsigned, little-endian) integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. This takes a single string as argument. Strings are encoded in two parts: first, a 32-bit integer containing the length of the string, followed by the actual string data as characters.
When the server receives a command, it sends a 32-bit integer denoting success (0x00) or failure (0x01). On success, the ‘reverse’ command returns the input string, reversed, using the same string encoding as described before. On failure, the status code is followed by an error message, encoded as a string as well. Once this cycle is complete, a new command can be sent by a client.
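To make the wire format concrete, here is a minimal sketch that builds a ‘reverse’ request and decodes a response by hand. It is written in Python 3 (so strings on the wire are ‘bytes’), and the helper names ‘encode_string’, ‘encode_reverse’ and ‘decode_response’ are made up for this illustration; they are not part of the client developed below.

```python
import struct

# Little-endian unsigned 32-bit integer, the same layout the server uses
UINT32 = struct.Struct('<I')

def encode_string(data):
    """Length-prefixed string: 32-bit length, then the raw bytes."""
    return UINT32.pack(len(data)) + data

def encode_reverse(text):
    """A 'reverse' request: command ID 1 followed by one encoded string."""
    return UINT32.pack(1) + encode_string(text)

def decode_response(payload):
    """Split a complete response into (status code, string body)."""
    code, length = struct.unpack_from('<II', payload, 0)
    body = payload[8:8 + length]
    return code, body

request = encode_reverse(b'abc')
print(request.hex())  # 0100000003000000616263

# What the server would send back for the request above
response = UINT32.pack(0) + encode_string(b'cba')
print(decode_response(response))  # (0, b'cba')
```

Note that ‘decode_response’ assumes the whole response is already in memory. A real transport cannot assume this, which is exactly the problem the rest of the post deals with.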
Now, on to the client side. We’ll need some imports:
    import socket
    import struct
    import logging
    import functools
    import collections

    from twisted.python import log
    from twisted.internet import defer, protocol, reactor
    from twisted.protocols import basic, stateful

    import utils
The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.
We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes, even though it might not have the required amount of data available immediately and, in the case of asynchronous networking, may want to yield execution in the meantime. To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state whenever possible, whilst maintaining a simple API.
Here’s how the API works: a single protocol action (e.g. reading the response of a request) is backed by a coroutine. This coroutine yields one or more ‘Request’ objects, each encapsulating the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, followed by a ‘Result’ object which encapsulates the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally a ‘Result’.
The definitions are very simple:
    class Request(object):
        def __init__(self, count):
            self.count = count

    class Result(object):
        def __init__(self, value):
            self.value = value
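The interplay between both sides can be sketched in a few lines. The example below is Python 3 and self-contained, so it repeats the two class definitions. ‘receive_uint32’ and ‘drive’ are names invented for this illustration, and an in-memory ‘io.BytesIO’ stands in for a real transport.

```python
import io
import struct

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

def receive_uint32():
    """Protocol side: ask for 4 bytes, then produce the decoded integer."""
    data = yield Request(4)
    value, = struct.unpack('<I', data)
    yield Result(value)

def drive(receiver, stream):
    """Transport side: satisfy each Request from 'stream' until a Result."""
    step = next(receiver)
    while isinstance(step, Request):
        # Read exactly what the protocol asked for and hand it back
        step = receiver.send(stream.read(step.count))
    return step.value

print(drive(receive_uint32(), io.BytesIO(struct.pack('<I', 42))))  # 42
```

The coroutine never touches the stream, and ‘drive’ never interprets the bytes. Swapping the stream for a socket, or for an event-driven transport, only changes ‘drive’.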
Next, the protocol uses a couple of different types of values: 32-bit (unsigned) integers, and strings. The latter uses the former in its internal encoding.
Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.
‘check’ performs input validation for values of the given type (type check, boundary check,…). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yields ‘Request’ and ‘Result’ objects to receive a value of the type.
For basic types (e.g. integers), values packed using the ‘struct’ module can be used, so the base implementation provides the required functionality for this. Here’s the base type, and implementations for both 32-bit unsigned integers and strings:
    class Type(object):
        PACKER = None

        def check(self, value):
            raise NotImplementedError

        def serialize(self, value):
            if not self.PACKER:
                raise NotImplementedError

            yield self.PACKER.pack(value)

        def receive(self):
            if not self.PACKER:
                raise NotImplementedError

            data = yield Request(self.PACKER.size)
            result, = self.PACKER.unpack(data)

            yield Result(result)


    class UnsignedInt32(Type):
        PACKER = struct.Struct('<I')
        MAX_INT = (2 ** 32) - 1

        def check(self, value):
            if not isinstance(value, (int, long)):
                raise TypeError

            if value < 0:
                raise ValueError('Unsigned integer expected')

            if value > self.MAX_INT:
                raise ValueError('Integer overflow')

    UNSIGNED_INT32 = UnsignedInt32()


    class String(Type):
        def check(self, value):
            if not isinstance(value, str):
                raise TypeError

        def serialize(self, value):
            length = len(value)

            for bytes_ in UNSIGNED_INT32.serialize(length):
                yield bytes_

            yield struct.pack('<%ds' % length, value)

        def receive(self):
            # Delegate to the integer receiver to read the length prefix
            length_receiver = UNSIGNED_INT32.receive()
            request = length_receiver.next()

            while isinstance(request, Request):
                value = yield request
                request = length_receiver.send(value)

            if not isinstance(request, Result):
                raise TypeError

            length = request.value

            if length == 0:
                result = ''
            else:
                data = yield Request(length)
                result, = struct.unpack('<%ds' % length, data)

            yield Result(result)

    STRING = String()
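A round trip through the string encoding shows which requests such a ‘receive’ coroutine makes. This is a simplified Python 3 sketch: ‘serialize_string’, ‘receive_string’ and ‘trace’ are stand-alone stand-ins written for this illustration, not the ‘String’ class itself.

```python
import struct

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

def serialize_string(value):
    """Yield the length prefix, then the string data."""
    yield struct.pack('<I', len(value))
    yield value

def receive_string():
    """Request the 4-byte length first, then as many bytes as it announces."""
    data = yield Request(4)
    length, = struct.unpack('<I', data)
    if length == 0:
        yield Result(b'')
    else:
        data = yield Request(length)
        yield Result(data)

def trace(receiver, payload):
    """Feed 'payload' to the receiver, recording each requested byte count."""
    counts, offset = [], 0
    step = next(receiver)
    while isinstance(step, Request):
        counts.append(step.count)
        chunk = payload[offset:offset + step.count]
        offset += step.count
        step = receiver.send(chunk)
    return counts, step.value

wire = b''.join(serialize_string(b'hello'))
print(trace(receive_string(), wire))  # ([4, 5], b'hello')
```

The second request depends on the data received for the first one, which is why the protocol side has to be the one deciding how many bytes to ask for.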
Now that the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received similarly to the corresponding methods on ‘Type’. Most, if not all, of the necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!
Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.
    class Message(object):
        TAG = None
        ARGS = None
        RETURN_TYPE = None

        def serialize(self):
            for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
                yield bytes_

            for arg in self.ARGS:
                name, type_ = arg

                for bytes_ in type_.serialize(getattr(self, name)):
                    yield bytes_

        def receive(self):
            # Every response starts with a status code
            code_receiver = UNSIGNED_INT32.receive()
            request = code_receiver.next()

            while isinstance(request, Request):
                value = yield request
                request = code_receiver.send(value)

            if not isinstance(request, Result):
                raise TypeError

            code = request.value

            # On success the return value follows, otherwise an error message
            if code == 0x00:
                result_receiver = self.RETURN_TYPE.receive()
            else:
                result_receiver = STRING.receive()

            request = result_receiver.next()

            while isinstance(request, Request):
                value = yield request
                request = result_receiver.send(value)

            if not isinstance(request, Result):
                raise TypeError

            result = request.value

            if code == 0x00:
                yield Result(result)
            else:
                raise Exception('Error %d: %s' % (code, result))


    class Reverse(Message):
        TAG = 0x01
        ARGS = ('text', STRING),
        RETURN_TYPE = STRING

        def __init__(self, text):
            super(Reverse, self).__init__()

            self.text = text
Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used on the go, based on the following 2 utility functions:
    def validate_types(specs, args):
        for spec, arg in zip(specs, args):
            name, type_ = spec[:2]

            try:
                type_.check(arg)
            except TypeError:
                raise TypeError('Invalid type of argument "%s"' % name)
            except ValueError:
                raise ValueError('Invalid value of argument "%s"' % name)


    def call(message_type):
        def wrapper(fun):
            # Entries of ARGS are (name, type) pairs; only the name is needed here
            argspec = ['self']
            for arg in message_type.ARGS:
                argspec.append(arg[0])

            @utils.update_argspec(*argspec)
            @functools.wraps(fun)
            def wrapped(**kwargs):
                self = kwargs['self']

                if not self.connected:
                    raise RuntimeError('Not connected')

                args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
                validate_types(message_type.ARGS, args)

                message = message_type(*args)

                return self._process(message)

            return wrapped

        return wrapper
The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is obvious.
    class Client(object):
        connected = False

        @call(Reverse)
        def reverse(self):
            assert False

        def _process(self, message):
            raise NotImplementedError
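The effect of the decorator can be seen in isolation with a stripped-down variant. The sketch below is Python 3 and deliberately simplified: it takes positional arguments instead of rewriting the method signature with ‘utils.update_argspec’, it checks arguments with plain ‘isinstance’ instead of ‘Type.check’, and ‘Echo’ and ‘LoopbackClient’ are hypothetical names that exist only in this example.

```python
import functools

class Echo(object):
    """Stand-in message type: ARGS holds (name, type) pairs."""
    ARGS = ('text', str),

    def __init__(self, text):
        self.text = text

def call(message_type):
    """Replace the decorated stub by a method that builds 'message_type'."""
    names = [name for name, _ in message_type.ARGS]

    def wrapper(fun):
        @functools.wraps(fun)
        def wrapped(self, *args):
            if len(args) != len(names):
                raise TypeError('%s() takes %d argument(s)'
                                % (fun.__name__, len(names)))
            if not self.connected:
                raise RuntimeError('Not connected')
            for (name, type_), arg in zip(message_type.ARGS, args):
                if not isinstance(arg, type_):
                    raise TypeError('Invalid type of argument "%s"' % name)
            # Hand the message to whatever transport the subclass provides
            return self._process(message_type(*args))
        return wrapped
    return wrapper

class LoopbackClient(object):
    connected = True

    @call(Echo)
    def echo(self):
        assert False  # never executed: the decorator replaces the body

    def _process(self, message):
        # A real client would serialize the message and read the response
        return message.text

print(LoopbackClient().echo('hi'))  # hi
```

The stub body is never run. All the method contributes is its name and docstring, while the argument list comes from the message definition.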
That’s about all there is. What’s left is transport-specific client implementations.
Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and an implementation of ‘Client._process’ to handle interaction between the protocol and the transport. The implementation is pretty straightforward:
    class SyncClient(Client):
        def __init__(self):
            self._socket = None

        def connect(self, addr, port):
            self._socket = socket.create_connection((addr, port))

        @property
        def connected(self):
            return self._socket is not None

        def _process(self, message):
            try:
                for part in message.serialize():
                    self._socket.sendall(part)

                receiver = message.receive()
                request = receiver.next()

                while isinstance(request, Request):
                    data = ''

                    while len(data) < request.count:
                        d = self._socket.recv(request.count - len(data))
                        if not d:
                            raise Exception
                        data += d

                    request = receiver.send(data)

                if not isinstance(request, Result):
                    raise TypeError

                utils.kill_coroutine(receiver, logging.exception)

                return request.value
            except Exception:
                try:
                    self._socket.close()
                finally:
                    self._socket = None

                raise
The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you have ever written a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocols.stateful.StatefulProtocol’, which simplifies a lot.
    class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
        _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

        def __init__(self):
            Client.__init__(self)

            self._handlers = collections.deque()
            self._currentHandler = None
            self._connected = False
            self._deferredLock = defer.DeferredLock()

        # Expose the connection state checked by the 'call' decorator
        @property
        def connected(self):
            return self._connected

        def connectionMade(self):
            self._connected = True
            return stateful.StatefulProtocol.connectionMade(self)

        def _process(self, message):
            deferred = defer.Deferred()
            self._handlers.append((message.receive(), deferred))

            def process(_):
                try:
                    for data in message.serialize():
                        self.transport.write(data)
                finally:
                    self._deferredLock.release()

            self._deferredLock.acquire().addCallback(process)

            return deferred

        def getInitialState(self):
            self._currentHandler = None

            return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

        def _responseCodeReceived(self, data):
            self._currentHandler = None

            try:
                # Responses arrive in request order, so take the oldest handler
                self._currentHandler = handler = self._handlers.popleft()
            except IndexError:
                log.msg('Request data received but no handler registered')
                self.transport.loseConnection()

                return None

            receiver, deferred = handler
            request = receiver.next()

            if isinstance(request, Result):
                return self._handleResult(request)
            elif isinstance(request, Request):
                if request.count != self._INITIAL_REQUEST_SIZE:
                    deferred.errback(ValueError('Unexpected request count'))
                    self.transport.loseConnection()

                    return None

                return self._handleRequest(data)
            else:
                log.err(TypeError,
                    'Received unknown type from message parsing coroutine')
                deferred.errback(TypeError)
                self.transport.loseConnection()

                return None

        def _handleRequest(self, data):
            if not self._currentHandler:
                log.msg('Request data received but no handler registered')
                self.transport.loseConnection()

                return None

            receiver, deferred = self._currentHandler

            try:
                request = receiver.send(data)
            except Exception as exc:  # pylint: disable-msg=W0703
                log.err(exc, 'Exception raised by message receive loop')
                deferred.errback(exc)

                return self.getInitialState()

            if isinstance(request, Result):
                return self._handleResult(request)
            elif isinstance(request, Request):
                return self._handleRequest, request.count
            else:
                log.err(TypeError,
                    'Received unknown type from message parsing coroutine')
                deferred.errback(TypeError)
                self.transport.loseConnection()

                return None

        def _handleResult(self, result):
            receiver, deferred = self._currentHandler
            self._currentHandler = None

            # To be on the safe side...
            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.callback(result.value)

            return self.getInitialState()

        def connectionLost(self, reason=protocol.connectionDone):
            self._connected = False

            self._cancelHandlers(reason)

            return stateful.StatefulProtocol.connectionLost(self, reason)

        def _cancelHandlers(self, reason):
            while self._handlers:
                receiver, deferred = self._handlers.popleft()

                utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

                deferred.errback(reason)
That’s it! Finally, we can test our clients against the server:
    HOST = 'localhost'
    PORT = 8080

    def test_sync():
        client = SyncClient()
        client.connect(HOST, PORT)
        r = client.reverse('sync')
        print 'sync =>', r
        print r, '=>', client.reverse(r)

    def test_twisted():
        def create_client(host, port):
            client = protocol.ClientCreator(reactor, TwistedProtocol)
            return client.connectTCP(host, port)

        @defer.inlineCallbacks
        def run(proto):
            result = yield proto.reverse('twisted')
            print 'twisted =>', result
            result2 = yield proto.reverse(result)
            print result, '=>', result2
            proto.transport.loseConnection()

        deferred = create_client(HOST, PORT)
        deferred.addCallback(run)

        deferred.addBoth(lambda _: reactor.stop())

        reactor.run()

    if __name__ == '__main__':
        test_sync()
        test_twisted()
If for example an ‘add’ method is added to the server, which returns the sum of two given 32bit unsigned integers, we could define a new command like this:
    class Add(Message):
        TAG = 0x02
        ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
        RETURN_TYPE = UNSIGNED_INT32

        def __init__(self, a, b):
            super(Add, self).__init__()

            self.a = a
            self.b = b
Next, add it to the ‘Client’ class like this:
    @call(Add)
    def add(self):
        assert False
Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!
This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists. See the Pyrakoon source code to see how these are handled. Only a type definition needs to be added; multiple commands can then use it as-is.
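To give an idea of what such composite types could look like, here is a Python 3 sketch of an option type and a list type built on the same ‘serialize’/‘receive’ interface. The wire layout used here (a one-byte presence flag for options, a 32-bit count for lists) is an assumption made for this example, not a description of the actual Arakoon encoding, and the ‘delegate’ helper as well as the class names are invented for it. ‘delegate’ relies on ‘yield from’, which is not available in the Python 2 code used in the rest of this post.

```python
import struct

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

def delegate(receiver):
    """Forward a sub-receiver's Requests upward; return its final value."""
    step = next(receiver)
    while isinstance(step, Request):
        data = yield step
        step = receiver.send(data)
    return step.value

class UInt32(object):
    def serialize(self, value):
        yield struct.pack('<I', value)

    def receive(self):
        data = yield Request(4)
        yield Result(struct.unpack('<I', data)[0])

class Option(object):
    """Assumed layout: flag byte 0 (absent) or 1 (present), then the value."""
    def __init__(self, inner):
        self.inner = inner

    def serialize(self, value):
        if value is None:
            yield b'\x00'
        else:
            yield b'\x01'
            yield from self.inner.serialize(value)

    def receive(self):
        flag = yield Request(1)
        if flag == b'\x00':
            yield Result(None)
        else:
            value = yield from delegate(self.inner.receive())
            yield Result(value)

class List(object):
    """Assumed layout: 32-bit item count, then each item in order."""
    def __init__(self, inner):
        self.inner = inner

    def serialize(self, values):
        yield struct.pack('<I', len(values))
        for value in values:
            yield from self.inner.serialize(value)

    def receive(self):
        count = yield from delegate(UInt32().receive())
        items = []
        for _ in range(count):
            item = yield from delegate(self.inner.receive())
            items.append(item)
        yield Result(items)

def roundtrip(type_, value):
    """Serialize 'value', then feed the bytes back through 'receive'."""
    wire, offset = b''.join(type_.serialize(value)), 0
    receiver = type_.receive()
    step = next(receiver)
    while isinstance(step, Request):
        chunk = wire[offset:offset + step.count]
        offset += step.count
        step = receiver.send(chunk)
    return step.value

print(roundtrip(List(Option(UInt32())), [1, None, 3]))  # [1, None, 3]
```

Because both types only rely on the interface of their inner type, they nest freely, and any message can use them in its ‘ARGS’ or ‘RETURN_TYPE’.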
Using the approach described in this post, it becomes easy to provide client implementations using several different backends (blocking, non-blocking, sockets or anything else as transport,…), and to add new commands/calls to all clients at once (keeping them in sync). This simplifies client maintenance a lot.
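As an illustration of how little a new backend needs, the following Python 3 sketch drives the same kind of receive coroutine from an ‘asyncio’ stream. ‘asyncio’ is not used anywhere in Pyrakoon as described in this post, so this is only an assumption about how a third transport might look. ‘receive_reply’ is a condensed stand-in for ‘Message.receive’ with a string return type, and a pre-filled ‘asyncio.StreamReader’ replaces a real connection.

```python
import asyncio
import struct

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

def receive_reply():
    """Status code, then a length-prefixed string."""
    data = yield Request(4)
    code, = struct.unpack('<I', data)
    data = yield Request(4)
    length, = struct.unpack('<I', data)
    body = b''
    if length:
        body = yield Request(length)
    if code != 0:
        raise Exception('Error %d: %s' % (code, body.decode()))
    yield Result(body)

async def process(reader, receiver):
    """Asynchronous counterpart of a blocking '_process' receive loop."""
    step = next(receiver)
    while isinstance(step, Request):
        # Suspends this task, not the whole program, until the bytes arrive
        data = await reader.readexactly(step.count)
        step = receiver.send(data)
    return step.value

async def demo():
    # Stand-in for a connection: a reader that already holds a full response
    reader = asyncio.StreamReader()
    reader.feed_data(struct.pack('<II', 0, 3) + b'cba')
    reader.feed_eof()
    return await process(reader, receive_reply())

print(asyncio.run(demo()))  # b'cba'
```

The protocol coroutine is identical for every backend. Only the few lines that fetch the requested bytes differ.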