Cloud Search Using Suffix Arrays ? Well, … maybe.

Suffix Arrays are arrays that allow you to find exact substring matches. The core idea is that you generate a sorted array of positions using a comparison function that compares the suffixes starting at the respective positions.

Constructing a suffix array

It’s one of the cases where a few lines of code are more clarifying than the explanation itself.

def make_suffix_array(text):
    size = len(text)
    def compare_suffix(i,j):
        return cmp(text[i:],text[j:])
    
    indexes = range(size)
    indexes.sort(cmp = compare_suffix)
    return indexes

If you try this on the string “Suffix Arrays Rock?” and print out the sorted array’s indexes along with the suffixes that start there, you start to see both the potential and the weaknesses.

pos suffix
06 ‘ Arrays Rock?’
13 ‘ Rock?’
18 ‘?’
07 ‘Arrays Rock?’
14 ‘Rock?’
00 ‘Suffix Arrays Rock?’
10 ‘ays Rock?’
16 ‘ck?’
02 ‘ffix Arrays Rock?’
03 ‘fix Arrays Rock?’
04 ‘ix Arrays Rock?’
17 ‘k?’
15 ‘ock?’
09 ‘rays Rock?’
08 ‘rrays Rock?’
12 ‘s Rock?’
01 ‘uffix Arrays Rock?’
05 ‘x Arrays Rock?’
11 ‘ys Rock?’

Note: This method of creating suffix arrays is criminally inefficient, but it suffices for explaining the concept. There are algorithms to do this in linear time.
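As a middle ground between the naive sort above and the full linear-time algorithms, here's a sketch of the classic prefix-doubling construction, which runs in O(n log² n). It's written in Python 3, since the `cmp`-based sorting used above is gone there anyway:

```python
def make_suffix_array_doubling(text):
    """Prefix doubling: sort by the first k characters, then 2k, 4k, ...

    Each round reuses the previous round's ranks, so a round only needs
    an O(n log n) sort of (rank, rank-at-distance-k) pairs.
    """
    n = len(text)
    rank = [ord(c) for c in text]
    sa = list(range(n))
    k = 1
    while k < n:
        key = lambda i: (rank[i], rank[i + k] if i + k < n else -1)
        sa.sort(key=key)
        # Reassign ranks: equal keys keep equal ranks, so ties are
        # resolved in a later round with a larger k.
        new_rank = [0] * n
        for idx in range(1, n):
            prev, cur = sa[idx - 1], sa[idx]
            new_rank[cur] = new_rank[prev] + (0 if key(cur) == key(prev) else 1)
        rank = new_rank
        k *= 2
    return sa
```

The result is identical to the naive construction; only the amount of comparison work per suffix changes.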

You can use the suffix array to search for a string in the text using binary search. Something like this:

def find(sa, text, q):
    size = len(sa)
    qsize = len(q)
    hi = size -1
    lo = 0
    while hi >= lo:
        mid = (hi + lo) / 2
        begin = sa[mid]
        end = min(size, begin + qsize)
        test = text[begin: end]
        if test > q:
            hi = mid -1
        elif test < q:
            lo = mid + 1
        else:
            return begin
    return None

You have a fast search that allows you to return the exact position of the substring.
Even better: all matches are clustered together in the suffix array. Perfect locality.

Multiple documents

The concept extends easily to multiple documents.

def make_multi(texts):
    indexes = []
    nt = len(texts)
    for d in xrange(nt): 
        size = len(texts[d])
        for i in xrange(size):
            e = d,i
            indexes.append(e)

    def compare(e0,e1):
        d0, i0 = e0
        d1, i1 = e1
        s0 = texts[d0][i0:]
        s1 = texts[d1][i1:]
        return cmp(s0,s1)

    indexes.sort(cmp = compare)
    return indexes

Here’s a minimal example with 3 very small texts, and a bit of code to dump the suffix array together with the suffixes.

def print_multi(sam, texts):
    for e in sam:
        d, i = e
        suffix = texts[d][i:]
        print "(%2i,%2i)\t%s" % (d,i,suffix)

texts = ["Suffix Arrays Rock?",
         "Redundant Array of Inexpensive Disks",
         "Metal is not Rock!"]
r0 = make_multi(texts)
print_multi(r0, texts)

yields:

( 1, 9)	 Array of Inexpensive Disks
( 0, 6)	 Arrays Rock?
( 1,30)	 Disks
( 1,18)	 Inexpensive Disks
( 2,12)	 Rock!
( 0,13)	 Rock?
( 2, 5)	 is not Rock!
( 2, 8)	 not Rock!
( 1,15)	 of Inexpensive Disks
...
( 1,28)	ve Disks
( 0, 5)	x Arrays Rock?
( 1,22)	xpensive Disks
( 1,14)	y of Inexpensive Disks
( 0,11)	ys Rock?

Ok, this seems to have almost perfect scalability properties. What’s the catch?

The catch

There are some severe downsides to Suffix Arrays.

  • Text needed during search
    A major disadvantage to suffix arrays is that you need access to the text while you do a search. It means that the suffix array and the text need to be kept close to each other.
  • Storage Needs
    If you think about it, a text is an array of chars (ASCII, not Unicode). Its suffix array is an array of positions. Suppose you store each position and document id as a 32-bit word. Remember you need to store the text as well as the suffix array, so if the text size is n, the total storage need is 9n. To make matters worse: since you’ll be jumping around in the suffix array as well as the text, compression schemes are not really feasible.
  • Updates?
    Even the slightest update to the text will invalidate the suffix array, and you need to reconstruct it. Even worse, if you have an array over multiple documents, an update in one of the documents will invalidate the entire structure.
  • Exact Matches
    You get what you asked for. Searching the example text for ‘suffix’ will not yield a result.
  • Overkill
    You can search for an exact match on a string of arbitrary length, but nobody needs to find exact matches on large strings. There is one notable exception: If you’re trying to look up DNA fragments in a large DNA string, this is exactly what you need. For everybody else, it’s overkill.

Getting better all the time

Well, it couldn’t get much worse now could it?
It doesn’t need to be this bad, and you can actually pick how much pain you want to inflict on yourself.

  • search starts at start of words
    Most of the time, you don’t need to search for substrings of words. For example, you probably don’t need to find the ‘rays’ in ‘arrays’. Indexing only word starts divides the number of positions to sort by the average word length.
  • remove language noise
    Not all words need to be considered. In fact, the most frequent words are noise.

  • old versions can be blacklisted
    If all versions of all your documents have a unique id, you can blacklist older unwanted document ids, and filter them out before you return the results. This means you don’t have to do updates immediately, but you can schedule them when it’s cheapest for you.
  • text size is not document size
    Suppose your documents are PDFs. The actual size of the text is small compared to all the other information in the file (markup, images, fonts, …). Moreover, storing plain text will not eat your storage. For example, a heavyweight (in every way) novel like War And Peace takes about 3.2 MB. The entire corpus of English Wikipedia articles, stored as XML, is just over 30 GB. Peanuts.
  • multiple suffix arrays
    You don’t need to add all your documents to one big suffix array. You can have many of them, which improves the situation in more than one way. Search can be parallel, and adding a new document is cheaper as well.
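The first mitigation from the list above, indexing only word starts, is a one-line change to the construction. A Python 3 sketch (using `\w+` as the word definition, which is an assumption; a real tokenizer would differ):

```python
import re

def make_word_suffix_array(text):
    # Only suffixes that begin at a word start enter the array, which
    # shrinks it by roughly the average word length.
    starts = [m.start() for m in re.finditer(r'\w+', text)]
    return sorted(starts, key=lambda i: text[i:])
```

For ‘Suffix Arrays Rock?’ this keeps only positions 0, 7 and 14, instead of all 19.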

What does this have to do with ‘The Cloud’ ?

Since there is a network between the client issuing the request and the cloud infrastructure offering the search service, you have non-negligible latency.
This means the latency added by the service just needs to be small compared to the overall latency, which makes our life a bit easier.

Also, the nature of ‘The Cloud’ pushes people to store consolidated instead of live data, which is exactly what is best suited for suffix arrays.
Another prerequisite for suffix arrays is the availability of the text during search, which is exactly what happens in a cloud setup.

Closing words

Suffix arrays are an idea that was conceived back when ‘online’ still had other semantics. At the time (1991) they were seen as too expensive for general search, but that was back when ‘big hard disks’ had less than 100MB of capacity. Maybe Suffix Arrays deserve a second look.

I can feel the urge to whip up an email search/indexing service in a few hundred lines of code, or a private wikipedia search service.
Just one question remains: what shall it be? ocaml or haskell? hm, tough one.

have fun,

Romain.


Hybrid sync & async Python request/response protocol client implementations

Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.

Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.

In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous as well as asynchronous transports can be implemented, and new request/response commands can be added easily, at a single place in the source code.

Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:

import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')

SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
    len(ERROR_MESSAGE), ERROR_MESSAGE)

LEN_STRUCT = struct.Struct('<I')

def handle(conn):
    while True:
        command_data = ''
        while len(command_data) < COMMAND_STRUCT.size:
            data = conn.recv(COMMAND_STRUCT.size - len(command_data))

            if not data:
                return

            command_data += data

        command, = COMMAND_STRUCT.unpack(command_data)

        if command == 1:
            len_data = ''

            while len(len_data) < LEN_STRUCT.size:
                data = conn.recv(LEN_STRUCT.size - len(len_data))
                len_data += data

            len_, = LEN_STRUCT.unpack(len_data)

            data = ''
            while len(data) < len_:
                data += conn.recv(len_ - len(data))

            conn.send(SUCCESS_CODE)
            conn.send(struct.pack('<I%ds' % len(data), len(data), data[::-1]))
        else:
            conn.send(ERROR_CODE)
            conn.send(ERROR_MESSAGE_DATA)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)

    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )

        threading.Thread(target=lambda: handle(conn)).start()

if __name__ == '__main__':
    main()

Every message sent by a client starts with a 32bit integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. This takes a single string as argument. Strings are encoded in two parts: first, a 32bit integer containing the length of the string, followed by the actual string data as characters.

When the server receives a command, it sends a 32bit integer denoting success (0x00) or failure (0x01). The ‘reverse’ command simply returns the input string, reversed, using the same string encoding as described before. Once this cycle is complete, a new command can be sent by a client.
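In other words, a complete ‘reverse’ exchange for the string ‘hi’ looks like this on the wire. A Python 3 sketch (hence the explicit byte strings):

```python
import struct

def encode_reverse_request(text):
    # 32-bit little-endian command id (1 = 'reverse'),
    # then the length-prefixed string argument.
    data = text.encode('ascii')
    return struct.pack('<I', 1) + struct.pack('<I', len(data)) + data

def encode_success_response(text):
    # 32-bit status code 0 (success), then the length-prefixed result.
    data = text.encode('ascii')
    return struct.pack('<I', 0) + struct.pack('<I', len(data)) + data
```

So the request is b'\x01\x00\x00\x00\x02\x00\x00\x00hi', and the success response carries the reversed payload in the same length-prefixed shape.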

Now, on to the client side. We’ll need some imports:

import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils

The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.

We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes, even though it might not have the required amount of data available immediately and may want to yield execution (in the case of asynchronous networking). To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state wherever possible, whilst maintaining a simple API.

Here’s how the API works: a single protocol action (e.g. reading the response of a request) is backed by a coroutine, which yields one or more ‘Request’ objects, each encapsulating the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, or a ‘Result’ object which encapsulates the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally a ‘Result’.

The definitions are very simple:

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

Next, the protocol uses a couple of different types of values: 32bit (unsigned) integers, and strings. The latter uses the former in its internal encoding.

Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.

‘check’ performs input validation for values of the given type (type check, boundary check,…). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yields ‘Request’ and ‘Result’ objects to receive a value of the type.

For basic types (e.g. integers), values can be packed using the ‘struct’ module, so the base implementation provides the required functionality for this. Here’s the base type, and implementations for both 32bit unsigned integers and strings:

class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError

        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError

        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)

        yield Result(result)

class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError

        if value < 0:
            raise ValueError('Unsigned integer expected')

        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()

class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)

        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_

        yield struct.pack('<%ds' % length, value)

    def receive(self):
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        length = request.value

        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)

        yield Result(result)

STRING = String()
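To make the coroutine contract concrete before moving on to messages, here's a self-contained Python 3 sketch of both sides: a simplified string receiver, and the driver loop a transport would run. ‘Request’ and ‘Result’ mirror the classes defined earlier:

```python
import io
import struct

class Request:
    def __init__(self, count):
        self.count = count

class Result:
    def __init__(self, value):
        self.value = value

def receive_string():
    # Simplified Python 3 take on String.receive: length first, then data.
    data = yield Request(4)
    length, = struct.unpack('<I', data)
    if length == 0:
        yield Result('')
        return
    payload = yield Request(length)
    yield Result(payload.decode('ascii'))

def drive(receiver, stream):
    # The transport side: satisfy every Request from the byte stream
    # until the coroutine yields a Result.
    step = next(receiver)
    while isinstance(step, Request):
        step = receiver.send(stream.read(step.count))
    return step.value
```

Driving it over an in-memory stream, drive(receive_string(), io.BytesIO(struct.pack('<I', 5) + b'hello')) returns 'hello'. A real transport would do exactly this, except that an asynchronous one suspends between reads instead of blocking.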

Now that the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received similarly to the corresponding methods on ‘Type’. Most, if not all, necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!

Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.

class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_

        for arg in self.ARGS:
            name, type_ = arg

            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        code = request.value

        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()

        request = result_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        result = request.value

        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))

class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()

        self.text = text

Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used on the go, based on the following 2 utility functions:

def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]

        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)

def call(message_type):
    def wrapper(fun):
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']

            if not self.connected:
                raise RuntimeError('Not connected')

            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)

            message = message_type(*args)

            return self._process(message)

        return wrapped

    return wrapper

The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is trivial.

class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        assert False

    def _process(self, message):
        raise NotImplementedError

That’s about all there is. What’s left is transport-specific client implementations.

Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and an implementation of ‘Client._process’ to handle the interaction between the protocol and the transport. The implementation is pretty straightforward:

class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)

            receiver = message.receive()
            request = receiver.next()

            while isinstance(request, Request):
                data = ''

                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception

                    data += d

                request = receiver.send(data)

            if not isinstance(request, Result):
                raise TypeError

            utils.kill_coroutine(receiver, logging.exception)

            return request.value
        except Exception:
            try:
                self._socket.close()
            finally:
                self._socket = None

            raise

The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you ever wrote a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocol.stateful.StatefulProtocol’, which simplifies a lot.

class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)

        self._handlers = collections.deque()
        self._currentHandler = None

        self._connected = False
        self._deferredLock = defer.DeferredLock()

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)

        return deferred

    def getInitialState(self):
        self._currentHandler = None

        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None

        try:
            # Handlers are queued in request order, and responses arrive
            # in the same order, so take the oldest one
            self._currentHandler = handler = self._handlers.popleft()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        request = handler[0].next()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()

                return None

            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        receiver, deferred = self._currentHandler

        try:
            request = receiver.send(data)
        except Exception, exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)

            return self.getInitialState()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None

        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

        deferred.callback(result.value)

        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False

        self._cancelHandlers(reason)

        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()

            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.errback(reason)


That’s it! Finally, we can test our clients against the server:

HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result, '=>', result2
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())

    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()


If for example an ‘add’ method is added to the server, which returns the sum of two given 32bit unsigned integers, we could define a new command like this:

class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()

        self.a = a
        self.b = b

Next, add it to the ‘Client’ class like this:

@call(Add)
def add(self):
    assert False

Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!


This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists. See the Pyrakoon source code for how these are handled: only a type definition needs to be added, and multiple commands can then use it as-is.

Using the approach described in this post, it becomes easy to provide client implementations using several different backends (blocking, non-blocking, sockets or anything else as transport,…), and to add new commands/calls to all clients at once, keeping them in sync. This simplifies client maintenance a lot.