Cloud Search Using Suffix Arrays ? Well, … maybe.
Posted: February 3, 2012 | Filed under: algorithms, Python, Uncategorized | Tags: cloud, python, suffix arrays, text search

Suffix arrays are arrays that allow you to find exact substring matches. The core idea is that you generate a sorted array of positions, using a comparison function that compares the suffixes starting at the respective positions.
Constructing a suffix array
It’s one of those cases where a few lines of code clarify more than the explanation itself.
def make_suffix_array(text):
    size = len(text)
    def compare_suffix(i, j):
        return cmp(text[i:], text[j:])
    indexes = range(size)
    indexes.sort(cmp=compare_suffix)
    return indexes
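Note that the `cmp`-based sort above is Python 2 only. On Python 3, where `cmp` is gone, the same idea can be written with a key function. A minimal sketch (slow on purpose, for clarity):

```python
def make_suffix_array_py3(text):
    # Sort the positions by the suffix that starts at each of them.
    # The key builds each suffix explicitly, so this is far from linear
    # time -- fine for illustration, not for production.
    return sorted(range(len(text)), key=lambda i: text[i:])

print(make_suffix_array_py3("banana"))  # [5, 3, 1, 0, 4, 2]
```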
If you try this on the string “Suffix Arrays Rock?” and print out the sorted array’s indexes along with the suffixes that start there, you start to see both its potential and its weaknesses.
pos | suffix |
06 | ‘ Arrays Rock?’ |
13 | ‘ Rock?’ |
18 | ‘?’ |
07 | ‘Arrays Rock?’ |
14 | ‘Rock?’ |
00 | ‘Suffix Arrays Rock?’ |
10 | ‘ays Rock?’ |
16 | ‘ck?’ |
02 | ‘ffix Arrays Rock?’ |
03 | ‘fix Arrays Rock?’ |
04 | ‘ix Arrays Rock?’ |
17 | ‘k?’ |
15 | ‘ock?’ |
09 | ‘rays Rock?’ |
08 | ‘rrays Rock?’ |
12 | ‘s Rock?’ |
01 | ‘uffix Arrays Rock?’ |
05 | ‘x Arrays Rock?’ |
11 | ‘ys Rock?’ |
Note: this method of creating suffix arrays is criminally inefficient, but it suffices for explaining the concept. There are algorithms (for example the DC3/skew algorithm or SA-IS) that do this in linear time.
You can use the suffix array to search for a string in the text using binary search. Something like this:
def find(sa, text, q):
    size = len(sa)
    qsize = len(q)
    hi = size - 1
    lo = 0
    while hi >= lo:
        mid = (hi + lo) / 2
        begin = sa[mid]
        end = min(size, begin + qsize)
        test = text[begin:end]
        if test > q:
            hi = mid - 1
        elif test < q:
            lo = mid + 1
        else:
            return begin
    return None
You now have a fast search (O(m log n), for a query of length m over a text of length n) that returns the exact position of the substring.
Even better: all matches are clustered together in the suffix array. Perfect locality.
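Because of that clustering, a pair of binary searches bounds the whole run of matches at once. A sketch of this (the helper `find_all` is mine, not from the post; Python 3):

```python
import bisect

def make_suffix_array(text):
    return sorted(range(len(text)), key=lambda i: text[i:])

def find_all(sa, text, q):
    # All suffixes starting with q sit next to each other in the sorted
    # array, so two binary searches bound the whole cluster. Building the
    # prefix list costs O(n*m); a real implementation would compare lazily.
    prefixes = [text[i:i + len(q)] for i in sa]
    lo = bisect.bisect_left(prefixes, q)
    hi = bisect.bisect_right(prefixes, q)
    return sorted(sa[lo:hi])

text = "Suffix Arrays Rock?"
sa = make_suffix_array(text)
print(find_all(sa, text, "r"))  # [8, 9]: both 'r's in 'Arrays'
```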
Multiple documents
The concept extends easily to multiple documents.
def make_multi(texts):
    indexes = []
    nt = len(texts)
    for d in xrange(nt):
        size = len(texts[d])
        for i in xrange(size):
            e = d, i
            indexes.append(e)
    def compare(e0, e1):
        d0, i0 = e0
        d1, i1 = e1
        s0 = texts[d0][i0:]
        s1 = texts[d1][i1:]
        return cmp(s0, s1)
    indexes.sort(cmp=compare)
    return indexes
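Again, `cmp`-based sorting is Python 2 only; a Python 3 equivalent using a key function could look like this:

```python
def make_multi_py3(texts):
    # One (document, position) pair per suffix, sorted by the suffix text.
    indexes = [(d, i) for d, text in enumerate(texts)
                      for i in range(len(text))]
    indexes.sort(key=lambda e: texts[e[0]][e[1]:])
    return indexes

print(make_multi_py3(["ab", "ba"]))
# [(1, 1), (0, 0), (0, 1), (1, 0)] -- suffixes 'a', 'ab', 'b', 'ba'
```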
A minimal example with 3 very small texts, and a bit of code to dump the suffix array, together with the suffixes.
def print_multi(sam, texts):
    for e in sam:
        d, i = e
        suffix = texts[d][i:]
        print "(%2i,%2i)\t%s" % (d, i, suffix)

texts = ["Suffix Arrays Rock?",
         "Redundant Array of Inexpensive Disks",
         "Metal is not Rock!"]
r0 = make_multi(texts)
print_multi(r0, texts)
yields:
( 1, 9)    Array of Inexpensive Disks
( 0, 6)    Arrays Rock?
( 1,30)    Disks
( 1,18)    Inexpensive Disks
( 2,12)    Rock!
( 0,13)    Rock?
( 2, 5)    is not Rock!
( 2, 8)    not Rock!
( 1,15)    of Inexpensive Disks
...
( 1,28)    ve Disks
( 0, 5)    x Arrays Rock?
( 1,22)    xpensive Disks
( 1,14)    y of Inexpensive Disks
( 0,11)    ys Rock?
Ok, this seems to have almost perfect scalability properties. What’s the catch?
The catch
There are some severe downsides to Suffix Arrays.
- Text needed during search
A major disadvantage of suffix arrays is that you need access to the text while you do a search. This means the suffix array and the text need to be kept close to each other.
- Storage needs
If you think about it, a text is an array of chars (ascii, not unicode), and its suffix array is an array of positions. Suppose you store each position and document id as a 32-bit word. Since you need to store the text as well as the suffix array, a text of size n needs 9n bytes in total: n for the text, 4n for the positions and 4n for the document ids. To make matters worse: since you’ll be jumping around in both the suffix array and the text, compression schemes are not really feasible.
- Updates?
Even the slightest update to the text invalidates the suffix array, and you need to reconstruct it. Even worse, if you have an array over multiple documents, an update in one of the documents invalidates the entire structure.
- Exact matches
You get what you asked for. Searching the example text for ‘suffix’ will not yield a result.
- Overkill
You can search for an exact match on a string of arbitrary length, but nobody needs exact matches on large strings. There is one notable exception: if you’re trying to look up DNA fragments in a large DNA string, this is exactly what you need. For everybody else, it’s overkill.
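The exact-match caveat is easy to demonstrate: lower-casing the query (but not the index) makes the lookup fail. A sketch, using a Python 3 key-based suffix array and the same binary search as above:

```python
def make_suffix_array(text):
    return sorted(range(len(text)), key=lambda i: text[i:])

def find(sa, text, q):
    # Plain binary search over the suffixes; exact matching only.
    lo, hi = 0, len(sa) - 1
    while hi >= lo:
        mid = (hi + lo) // 2
        begin = sa[mid]
        test = text[begin:begin + len(q)]
        if test > q:
            hi = mid - 1
        elif test < q:
            lo = mid + 1
        else:
            return begin
    return None

text = "Suffix Arrays Rock?"
sa = make_suffix_array(text)
print(find(sa, text, "Suffix"))  # 0: exact case matches
print(find(sa, text, "suffix"))  # None: 's' != 'S', no hit
```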
Getting better all the time
Well, it couldn’t get much worse now could it?
It doesn’t need to be this bad, and you can actually pick how much pain you want to inflict on yourself.
- Search starts at the start of words
Most of the time, you don’t need to search for substrings of words. For example, you probably don’t need to find the ‘rays’ in ‘arrays’. Indexing only word starts divides the number of positions to sort by the average word length.
- Remove language noise
Not all words need to be considered. In fact, the most frequent words are noise, and can be left out of the index.
- Old versions can be blacklisted
If all versions of all your documents have a unique id, you can blacklist older, unwanted document ids and filter them out before you return the results. This means you don’t have to apply updates immediately; you can schedule them when it’s cheapest for you.
- Text size is not document size
Suppose your documents are pdfs. The actual text is small compared to all the other information in the file (markup, images, fonts, …). Storing the text alone will not eat your storage: a heavyweight (in every way) novel like War and Peace takes about 3.2 MB, and the entire corpus of English Wikipedia articles, stored as xml, is just over 30 GB. Peanuts.
- Multiple suffix arrays
You don’t need to add all your documents to one big suffix array. You can have many of them, which improves the situation in more than one way: searches can run in parallel, and adding a new document is cheaper as well.
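Restricting the index to word starts, as suggested above, is nearly a one-liner with `re`; a sketch (Python 3, with `\w+` as a rough definition of a word):

```python
import re

def make_word_suffix_array(text):
    # Only index positions where a word begins, then sort them by suffix.
    # This shrinks the array by roughly the average word length.
    starts = [m.start() for m in re.finditer(r'\w+', text)]
    return sorted(starts, key=lambda i: text[i:])

text = "Suffix Arrays Rock?"
print(make_word_suffix_array(text))
# [7, 14, 0]: 'Arrays Rock?', 'Rock?', 'Suffix Arrays Rock?'
```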
What does this have to do with ‘The Cloud’ ?
Since there is a network between the client issuing the request and the cloud infrastructure offering the search service, you have a non-negligible latency to begin with.
This means the latency added by the service itself only needs to be small compared to the overall latency, which makes our life a bit easier.
Also, the nature of ‘The Cloud’ pushes people to store consolidated rather than live data, which is exactly what suffix arrays are best suited for.
Suffix arrays also require the text to be available during search, which is exactly what you get in a cloud setup.
Closing words
Suffix arrays are an idea that was conceived back when ‘online’ still had other semantics. At the time (1991) they were seen as too expensive for general search, but that was back when ‘big hard disks’ had less than 100MB of capacity. Maybe Suffix Arrays deserve a second look.
I can feel the urge to whip up an email search/indexing service in a few hundred lines of code, or a private wikipedia search service.
Just one question remains: what shall it be? ocaml or haskell? hm, tough one.
have fun,
Romain.
Hybrid sync & async Python request/response protocol client implementations
Posted: December 13, 2011 | Filed under: Programming, Python | Tags: python

Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.
Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.
In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous as well as asynchronous transports can be implemented, and new request/response commands can be added easily, at a single place in the source code.
Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:
import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')
SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
    len(ERROR_MESSAGE), ERROR_MESSAGE)
LEN_STRUCT = struct.Struct('<I')

def handle(conn):
    while True:
        command_data = ''
        while len(command_data) < COMMAND_STRUCT.size:
            data = conn.recv(COMMAND_STRUCT.size - len(command_data))
            if not data:
                return
            command_data += data
        command, = COMMAND_STRUCT.unpack(command_data)
        if command == 1:
            len_data = ''
            while len(len_data) < LEN_STRUCT.size:
                data = conn.recv(LEN_STRUCT.size - len(len_data))
                len_data += data
            len_, = LEN_STRUCT.unpack(len_data)
            data = ''
            while len(data) < len_:
                data += conn.recv(len_ - len(data))
            conn.send(SUCCESS_CODE)
            conn.send(struct.pack('<L%ds' % len(data), len(data), data[::-1]))
        else:
            conn.send(ERROR_CODE)
            conn.send(ERROR_MESSAGE_DATA)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)
    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )
        threading.Thread(target=lambda: handle(conn)).start()

if __name__ == '__main__':
    main()
Every message sent by a client starts with a 32bit integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. This takes a single string as argument. Strings are encoded in two parts: first, a 32bit integer containing the length of the string, followed by the actual string data as characters.
When the server receives a command, it sends back a 32bit integer denoting success (0x00) or failure (0x01). The ‘reverse’ command simply returns the input string, reversed, using the same string encoding as described before. Once this cycle is complete, the client can send a new command.
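The wire format can be sketched in a few lines of Python 3 (`encode_reverse` and `encode_string` are names I made up for illustration; the original code works on Python 2 byte strings, so the explicit utf-8 encode here is my assumption):

```python
import struct

def encode_string(s):
    # 32-bit little-endian length prefix, then the raw bytes.
    data = s.encode('utf-8')
    return struct.pack('<I', len(data)) + data

def encode_reverse(text):
    # Command id 1 ('reverse'), followed by one string argument.
    return struct.pack('<I', 1) + encode_string(text)

print(encode_reverse('abc'))  # command id, length, then b'abc'
```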
Now, on to the client side. We’ll need some imports:
import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils
The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.
We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes, even though it might not have the required amount of data available immediately, and may want to yield execution (in the case of asynchronous networking). To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state whenever possible, whilst maintaining a simple API.
Here’s how the API works: a single protocol action (e.g. reading the response of a request) is backed by a coroutine, which yields one or more ‘Request’ objects, each encapsulating the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, or a ‘Result’ object, which encapsulates the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally a ‘Result’.
The definitions are very simple:
class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value
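To see the contract in action, here is a small driver (my own sketch, not from Pyrakoon) that pumps bytes from an in-memory buffer into a receive coroutine until it yields a `Result`. It's written for Python 3, so it uses `next(coro)` rather than the `coro.next()` of the Python 2 code below:

```python
import struct

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

def receive_uint32():
    # Ask the transport for exactly 4 bytes, then produce the value.
    data = yield Request(4)
    yield Result(struct.unpack('<I', data)[0])

def drive(receiver, buf):
    # The transport side: honor each Request by slicing off that many bytes.
    pos = 0
    message = next(receiver)
    while isinstance(message, Request):
        chunk = buf[pos:pos + message.count]
        pos += message.count
        message = receiver.send(chunk)
    return message.value

print(drive(receive_uint32(), b'\x2a\x00\x00\x00'))  # 42
```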
Next, the protocol uses a couple of different types of values: 32bit (unsigned) integers, and strings. The latter uses the first in its internal encoding.
Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.
‘check’ performs input validation for values of the given type (type check, boundary check, …). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yields ‘Request’ and ‘Result’ objects to receive a value of the type.
For basic types (e.g. integers) packed values using the ‘struct’ module can be used, so the base implementations provides the required functionality for this. Here’s the base type, and implementations for both 32bit unsigned integers and strings:
class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError
        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError
        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)
        yield Result(result)

class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError
        if value < 0:
            raise ValueError('Unsigned integer expected')
        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()

class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)
        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_
        yield struct.pack('<%ds' % length, value)

    def receive(self):
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()
        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)
        if not isinstance(request, Result):
            raise TypeError
        length = request.value
        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)
        yield Result(result)

STRING = String()
Now that the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received similarly to the corresponding methods on ‘Type’. Most, if not all, necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!
Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.
class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_
        for arg in self.ARGS:
            name, type_ = arg
            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()
        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)
        if not isinstance(request, Result):
            raise TypeError
        code = request.value
        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()
        request = result_receiver.next()
        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)
        if not isinstance(request, Result):
            raise TypeError
        result = request.value
        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))

class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()
        self.text = text
Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used along the way, based on the following two utility functions:
def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]
        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)

def call(message_type):
    def wrapper(fun):
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']
            if not self.connected:
                raise RuntimeError('Not connected')
            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)
            message = message_type(*args)
            return self._process(message)
        return wrapped
    return wrapper
The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is obvious.
class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        assert False

    def _process(self, message):
        raise NotImplementedError
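The mechanism can be exercised without any networking by giving `_process` a toy implementation. A sketch in Python 3 (the `call` decorator below is a deliberately simplified stand-in for the one above, without the `utils.update_argspec` trick or validation; `EchoClient` is my own illustration, not from the post):

```python
def call(message_type):
    # Simplified: build the message from positional args and hand it
    # to the client's _process method.
    def wrapper(fun):
        def wrapped(self, *args):
            return self._process(message_type(*args))
        return wrapped
    return wrapper

class Reverse(object):
    def __init__(self, text):
        self.text = text

class EchoClient(object):
    @call(Reverse)
    def reverse(self):
        assert False  # never runs; the decorator replaces the body

    def _process(self, message):
        # A toy 'transport': handle the message locally instead of
        # sending it over a wire.
        return message.text[::-1]

print(EchoClient().reverse('sync'))  # cnys
```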
That’s about all there is. What’s left is transport-specific client implementations.
Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and an implementation of ‘Client._process’ to handle the interaction between the protocol and the transport. The implementation is pretty straightforward:
class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)
            receiver = message.receive()
            request = receiver.next()
            while isinstance(request, Request):
                data = ''
                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception
                    data += d
                request = receiver.send(data)
            if not isinstance(request, Result):
                raise TypeError
            utils.kill_coroutine(receiver, logging.exception)
            return request.value
        except Exception:
            try:
                self._socket.close()
            finally:
                self._socket = None
            raise
The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you ever wrote a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocol.stateful.StatefulProtocol’, which simplifies a lot.
class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)
        self._handlers = collections.deque()
        self._currentHandler = None
        self._connected = False
        self._deferredLock = defer.DeferredLock()

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)
        return deferred

    def getInitialState(self):
        self._currentHandler = None
        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None
        try:
            # Responses arrive in request order, so take the oldest handler
            self._currentHandler = handler = self._handlers.popleft()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()
            return None
        request = handler[0].next()
        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()
                return None
            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)
            self.transport.loseConnection()
            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()
            return None
        receiver, deferred = self._currentHandler
        try:
            request = receiver.send(data)
        except Exception, exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)
            return self.getInitialState()
        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)
            self.transport.loseConnection()
            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None
        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))
        deferred.callback(result.value)
        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False
        self._cancelHandlers(reason)
        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()
            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))
            deferred.errback(reason)
That’s it! Finally, we can test our clients against the server:
HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result2, '=>', result
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())
    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()
If for example an ‘add’ method is added to the server, which returns the sum of two given 32bit unsigned integers, we could define a new command like this:
class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()
        self.a = a
        self.b = b
Next, add it to the ‘Client’ class like this:
@call(Add)
def add(self):
    assert False
Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!
This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists. See the Pyrakoon source code for how these are handled: only a type definition needs to be added, and multiple commands can then use it as-is.
Using the approach described in this post, it becomes easy to provide client implementations over several different backends (blocking, non-blocking, sockets or anything else as transport, …), and to add new commands/calls to all clients at once, keeping them in sync. This simplifies client maintenance a lot.