diff --git a/memcache.py b/memcache.py index 80770ed..068e4ab 100644 --- a/memcache.py +++ b/memcache.py @@ -1412,6 +1412,146 @@ def __str__(self): return "unix:%s%s" % (self.address, d) +class KetamaClient(Client): + """ Memcach client with Consistent hashing support. + + Ketama is an implementation of a consistent hashing algorithm, meaning you + can add or remove servers from the memcached pool without causing a + complete remap of all keys. It was designed by Richard Jones. + + How Ketama Works: + 1. Hash each server to several unsigned integer values. + 2. Conceptually, these numbers are placed on a ring. + 3. Each number links to the server it was hashed from, so servers + appear at several points on the ring. + 4. To map a key->server, hash the key to an unsigned integer and find + the next biggest number on the ring. That's your server. If + the number is too big, roll over to the first server in the ring + When a server is added or removed, only some keys will be remapped to + different servers. With the original modula algorithm, all keys + would have been remapped. + + TODO: Improve the documentation, add test cases. + """ + # For this Consistent hashing client, the weight of the server is the + # number of entries it will have in the ring. This will make sure + # each server has well normalized key distribution. + DEFAULT_SERVER_WEIGHT = 200 + + # Total number of slots on the ring. + RING_SIZE = 2**16 + + def __init__(self, *args, **kwargs): + # Mapping between ring slot -> server. + self._ketama_server_ring = {} + + # Sorted server slots on top of the virtual ring. + self._ketama_server_slots = [] + + super(KetamaClient, self).__init__(*args, **kwargs) + + def _get_server(self, key): + """ + Get the memcache server corresponding to the given key. + + @param key: key, or (server_hash, key) tuple if you want to specify + a hash to determine which server is selected + + @return A tuple with (server_obj, key), or (None, None) if no servers + were available. + """ + # map the key on to the ring slot space. + h_key = self._generate_ring_slot(key) + + if isinstance(key, tuple): + serverhash, key = key + + for slot in self._ketama_server_slots: + if h_key <= slot: + server = self._ketama_server_ring[slot] + if server.connect(): + return (server, key) + + # Roll over to the first available server + for server in self._ketama_server_ring.values(): + if server and server.connect(): + return (server, key) + + return (None, None) + + def set_servers(self, servers): + """ + Set servers for this client. + + @param servers: List of server hosts in : format. + or + List of tuples with each tuple of the format + (:, weight) + """ + # Set the default weight if weight isn't passed. + self.servers = [_Host( + s if isinstance(s, tuple) else (s, self.DEFAULT_SERVER_WEIGHT), + self.debug, dead_retry=self.dead_retry, + socket_timeout=self.socket_timeout, + flush_on_reconnect=self.flush_on_reconnect) for s in servers] + + # Place all the servers on rings based on the slot allocation + # specifications. + map(self._place_server_on_ring, self.servers) + + def _place_server_on_ring(self, server): + """ + Based on the weight of the server, generate multiple slots for + each server. This ensures when a server is added/remove keys won't all + remap to the same new server + + @param server: An instance of :class:~`memcache._Host`. + """ + server_slots = self._get_server_slots_on_ring(server) + for slot in server_slots: + if slot not in self._ketama_server_ring: + self._ketama_server_ring[slot] = server + self._ketama_server_slots.append(slot) + else: + # TODO: Handle collisions + pass + + # Sort the server slot keys to make it a ring. + self._ketama_server_slots.sort() + + def _get_server_slots_on_ring(self, server): + """ + Returns list of slot on the ring for given server. + + This make sure that the slots won't collide with others server. + + @param: server An object of :class:~`memcache._Host`. + @return: list of slots on the ring. + """ + server_slots = [] + + for i in range(0, server.weight): + server_key = "%s:%d_%d" % (server.ip, server.port, i) + server_slots.append(self._generate_ring_slot(server_key)) + + return server_slots + + def _generate_ring_slot(self, key): + """ + Returns a slot in the ring for the given key. + + @param key: Key which needs to be mapped to the ring. + @type key: str + + @return: hash value corresponding to the `key` + """ + + if isinstance(key, tuple): + serverhash, key = key + else: + serverhash = binascii.crc32(key.encode('ascii')) & 0xffffffff + return serverhash % self.RING_SIZE + def _doctest(): import doctest import memcache diff --git a/tests/test_memcache.py b/tests/test_memcache.py index c12c528..9e233a2 100644 --- a/tests/test_memcache.py +++ b/tests/test_memcache.py @@ -2,7 +2,7 @@ from unittest import TestCase -from memcache import Client, SERVER_MAX_KEY_LENGTH +from memcache import Client, KetamaClient, SERVER_MAX_KEY_LENGTH try: _str_cls = basestring @@ -31,10 +31,10 @@ def __eq__(self, other): class TestMemcache(TestCase): - def setUp(self): + def setUp(self, client_class=Client): # TODO: unix socket server stuff servers = ["127.0.0.1:11211"] - self.mc = Client(servers, debug=1) + self.mc = client_class(servers, debug=1) pass def check_setget(self, key, val, noreply=False): @@ -119,6 +119,12 @@ def test_sending_key_too_long(self): self.mc.set('a' * SERVER_MAX_KEY_LENGTH, 1, noreply=True) +class TestMemcacheKetama(TestMemcache): + def setUp(self): + # Run all the tests again using the KetamaClient + super(TestMemcacheKetama, self).setUp(KetamaClient) + + if __name__ == "__main__": # failures = 0 # print("Testing docstrings...")