Preliminary Python memcache client

Evan Martin martine@danga.com
Thu, 7 Aug 2003 15:04:23 -0700


--FCuugMFkClbJLl1L
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline

Attached is a patch against memcached CVS that adds a Python client.
It currently lacks some features, but it should provide the basic
functionality you'd need to try using memcached with Python.

It also lacks docs, but it pretty much follows the Perl API; there's
also a simple example included at the end of the module itself
('python memcache.py').  You'll need to be running a memcached process
before the test does anything useful.

Bug reports / feature requests / enhancements are welcome.

Finally, it's been a while since I've used Python, so I'd appreciate 
stylistic feedback:  am I using the proper idioms, am I
naming/capitalizing things the way you'd expect, etc.
If anyone here is knowledgeable about those sorts of things, please
respond to me off-list, as I have a lot of questions (like: which of the
doc tools should I use?).

-- 
Evan Martin
martine@danga.com
http://neugierig.org

--FCuugMFkClbJLl1L
Content-Type: text/plain; charset=us-ascii
Content-Disposition: attachment; filename="memcached-python.patch"

diff -ruN memcache-orig/api/python/ChangeLog memcache-python/api/python/ChangeLog
--- memcache-orig/api/python/ChangeLog	1969-12-31 16:00:00.000000000 -0800
+++ memcache-python/api/python/ChangeLog	2003-08-07 14:24:20.000000000 -0700
@@ -0,0 +1,4 @@
+Thu, 07 Aug 2003 14:20:27 -0700  Evan Martin  <martine@danga.com>
+
+	* Initial prerelease.
+
diff -ruN memcache-orig/api/python/MANIFEST memcache-python/api/python/MANIFEST
--- memcache-orig/api/python/MANIFEST	1969-12-31 16:00:00.000000000 -0800
+++ memcache-python/api/python/MANIFEST	2003-08-07 14:21:38.000000000 -0700
@@ -0,0 +1,3 @@
+memcache.py
+setup.py
+ChangeLog
diff -ruN memcache-orig/api/python/memcache.py memcache-python/api/python/memcache.py
--- memcache-orig/api/python/memcache.py	1969-12-31 16:00:00.000000000 -0800
+++ memcache-python/api/python/memcache.py	2003-08-07 15:02:21.000000000 -0700
@@ -0,0 +1,367 @@
+#!/usr/bin/env python
+
+"""MemCachedClient: a client for memcached.
+
+http://www.danga.com/memcached"""
+
+import sys
+import socket
+import pickle
+import time
+import re
+import string
+import types
+
+__author__    = "Evan Martin <martine@danga.com>"
+__version__   = "1.0"
+__copyright__ = "Copyright (C) 2003 Danga Interactive"
+__license__   = "Python"
+
+class Error(Exception):
+    """A malformed or truncated reply from a memcached server."""
+
+class Client:
+    """A client for a pool of memcached servers (follows the Perl API)."""
+    _valuere = re.compile(r'^VALUE (\S+) (\d+) (\d+)')
+    _FLAG_PICKLE  = 1<<0  # type tags stored in the memcached "flags" field
+    _FLAG_INTEGER = 1<<1
+    _FLAG_LONG    = 1<<2
+    _SERVER_RETRIES = 10  # how many times to try finding a free server.
+
+    def __init__(self, servers, debug=0):
+        self.debug = debug  # set first: set_servers hands debuglog to each Host
+        self.stats = {}
+        self.set_servers(servers)
+
+    def set_servers(self, servers):
+        """Replace the pool; entries are "ip[:port]" or ("ip[:port]", weight)."""
+        self.servers = [Host(s, self.debuglog) for s in servers]
+        self.init_buckets()
+
+    def debuglog(self, str):
+        if self.debug:
+            sys.stderr.write("MemCached: %s\n" % str)
+
+    def statlog(self, func):
+        """Count one call of the named operation in self.stats."""
+        self.stats[func] = self.stats.get(func, 0) + 1
+
+    def forget_dead_hosts(self):
+        """Clear every server's dead mark so it is retried immediately."""
+        for s in self.servers:
+            s.deaduntil = 0  # the attribute Host checks (not "dead_until")
+
+    def init_buckets(self):
+        """Rebuild the bucket list: each server appears `weight` times."""
+        self.buckets = []
+        for server in self.servers:
+            for i in range(server.weight):
+                self.buckets.append(server)
+
+    def get_server(self, key):
+        """Return a connected Host for key (str or (serverhash, key)), or None."""
+        if type(key) == types.TupleType:
+            serverhash = key[0]
+            key = key[1]
+        else:
+            serverhash = hash(key)
+        if not self.buckets:
+            return None  # empty pool: nothing to pick, and no modulo by zero
+
+        for i in range(Client._SERVER_RETRIES):
+            server = self.buckets[serverhash % len(self.buckets)]
+            if server.connect():
+                return server
+            serverhash = hash(str(serverhash) + str(i))  # probe another bucket
+        return None
+
+    def disconnect_all(self):
+        for s in self.servers:
+            s.close_socket()
+
+    def delete(self, key, time):
+        '''XXX not implemented yet: does nothing and returns None.'''
+    def incr(self, key, time):
+        '''XXX not implemented yet: does nothing and returns None.'''
+    def decr(self, key, time):
+        '''XXX not implemented yet: does nothing and returns None.'''
+
+    def add(self, key, val, time=0):
+        '''Add {key: val}, expiring at time.'''
+        return self._set("add", key, val, time)
+    def replace(self, key, val, time=0):
+        '''Replace existing key with {key: val}, expiring at time.'''
+        return self._set("replace", key, val, time)
+    def set(self, key, val, time=0):
+        '''Set {key: val}, expiring at time.'''
+        return self._set("set", key, val, time)
+
+    def _set(self, cmd, key, val, time):
+        """Run a storage command; return 1 if the server replied STORED, else 0."""
+        server = self.get_server(key)
+        if not server:
+            return 0
+        if type(key) == types.TupleType:
+            key = key[1]  # only the key itself goes on the wire
+        self.statlog(cmd)
+
+        flags = 0  # records val's type so _recv_value can decode it
+        if isinstance(val, types.StringTypes):
+            pass
+        elif isinstance(val, int):
+            flags |= Client._FLAG_INTEGER
+            val = "%d" % val
+        elif isinstance(val, long):
+            flags |= Client._FLAG_LONG
+            val = "%d" % val
+        else:
+            flags |= Client._FLAG_PICKLE
+            val = pickle.dumps(val, 2)
+
+        fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, flags, time, len(val), val)
+        try:
+            server.send_cmd(fullcmd)
+            line = server.expect("STORED")
+        except socket.error, msg:
+            server.mark_dead(msg[1])
+            return 0
+        return int(line == "STORED")  # e.g. "add" of an existing key: NOT_STORED
+
+    def get(self, key):
+        """Return the value stored under key, or None on a miss or error."""
+        server = self.get_server(key)
+        if not server:
+            return None
+        if type(key) == types.TupleType:
+            key = key[1]  # only the key itself goes on the wire
+        self.statlog('get')
+
+        try:
+            server.send_cmd("get %s" % key)
+            rkey, flags, rlen = self._expectvalue(server)
+            if not rkey:
+                return None
+            value = self._recv_value(server, flags, rlen)
+            server.expect("END")
+        except (Error, socket.error), msg:
+            server.mark_dead(msg)
+            return None
+        return value
+
+    def get_multi(self, *keys):
+        """Fetch keys with one request per server; return {key: value} for hits."""
+        self.statlog('get_multi')
+
+        server_keys = {}
+
+        # build up a list for each server of all the keys we want.
+        for key in keys:
+            server = self.get_server(key)
+            if not server:
+                continue
+            if type(key) == types.TupleType:
+                key = key[1]
+            if not server_keys.has_key(server):
+                server_keys[server] = []
+            server_keys[server].append(key)
+
+        # send out all requests on each server before reading anything
+        dead_servers = []
+        for server in server_keys.keys():
+            try:
+                server.send_cmd("get %s" % string.join(server_keys[server]))
+            except socket.error, msg:
+                server.mark_dead(msg[1])
+                dead_servers.append(server)
+
+        # if any servers died on the way, don't expect them to respond.
+        for server in dead_servers:
+            del server_keys[server]
+
+        retvals = {}
+        for server in server_keys.keys():
+            try:
+                line = server.readline()
+                while line and line != 'END':
+                    rkey, flags, rlen = self._expectvalue(server, line)
+                    if not rkey:
+                        raise Error("unexpected response '%s'" % line)
+                    val = self._recv_value(server, flags, rlen)
+                    retvals[rkey] = val
+                    line = server.readline()
+            except (Error, socket.error), msg:
+                server.mark_dead(msg)
+        return retvals
+
+    def _expectvalue(self, server, line=None):
+        """Parse a "VALUE key flags bytes" header (read from server if no line)."""
+        if not line:
+            line = server.readline()
+        match = self._valuere.match(line)
+        if not match:
+            return (None, None, None)
+        rkey = match.group(1)
+        flags = int(match.group(2))
+        rlen = int(match.group(3))
+        return (rkey, flags, rlen)
+
+    def _recv_value(self, server, flags, rlen):
+        """Read rlen bytes plus the trailing CRLF and decode them per flags."""
+        rlen += 2 # include \r\n
+        buf = server.recv(rlen)
+        if len(buf) != rlen:
+            raise Error("received %d bytes when expecting %d" % (len(buf), rlen))
+        buf = buf[:-2]  # strip \r\n
+        val = buf  # flags == 0 is a plain string; unknown flags fall back to it
+        if flags & Client._FLAG_INTEGER:
+            val = int(buf)
+        elif flags & Client._FLAG_LONG:
+            val = long(buf)
+        elif flags & Client._FLAG_PICKLE:
+            val = pickle.loads(buf)  # NOTE: unpickling trusts the server's bytes
+        elif flags:
+            self.debuglog("unknown flags on get: %x" % flags)
+        return val
+
+    
+class Host:
+    """A single server in the memcache pool that Client chooses from."""
+    def __init__(self, host, debugfunc=None):
+        if isinstance(host, types.TupleType):
+            self.weight = host[1]  # read the weight before host is rebound
+            host = host[0]
+        else:
+            self.weight = 1
+
+        if string.find(host, ":") > 0:
+            self.ip, self.port = string.split(host, ":")
+            self.port = int(self.port)
+        else:
+            self.ip, self.port = host, 11211
+
+        self.debuglog = debugfunc or (lambda x: x)  # no-op logger by default
+        self.deaduntil = 0
+        self.socket = None
+
+    def _check_dead(self):
+        if self.deaduntil and self.deaduntil > time.time():
+            return 1
+        self.deaduntil = 0
+        return 0
+
+    def connect(self):
+        """Return true if a socket to this server is (or can now be) open."""
+        return self._get_socket() is not None
+
+    def mark_dead(self, reason):
+        print "MemCache: %s: %s.  Marking dead." % (self, reason)
+        self.deaduntil = time.time()+5 #XXX magic
+        self.close_socket()
+
+    def _get_socket(self):
+        if self._check_dead():
+            return None
+        if self.socket:
+            return self.socket
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # XXX no per-socket timeout is set, so a hung server can block calls.
+        try:
+            s.connect((self.ip, self.port))
+        except socket.error, msg:
+            s.close()  # don't leak the half-opened socket
+            self.mark_dead("connect: %s" % msg[1])
+            return None
+        self.socket = s
+        return s
+
+    def close_socket(self):
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+
+    def send_cmd(self, cmd):
+        self.socket.sendall(cmd + "\r\n")
+
+    def readline(self):
+        """Read one CRLF-terminated line; return it without the terminator."""
+        newlines = 0
+        buf = ''
+        while newlines < 2:
+            char = self.socket.recv(1) # XXX does this buffer or is this slow?
+            if len(char) == 0:
+                self.mark_dead("connection closed while reading")
+                return buf
+            if char == '\r' and newlines == 0:
+                newlines = 1
+            elif char == '\n' and newlines == 1:
+                newlines = 2
+            else:
+                newlines = 0
+                buf = buf + char
+        return buf
+
+    def expect(self, text):
+        line = self.readline()
+        if line != text:
+            self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, line))
+        return line
+
+    def recv(self, rlen):
+        """Read rlen bytes; return fewer only if the connection closes first."""
+        buf = ''
+        while len(buf) < rlen:
+            chunk = self.socket.recv(rlen - len(buf))
+            if not chunk:
+                break  # peer closed; without this the loop never ends
+            buf = buf + chunk
+        return buf
+
+    def __str__(self):
+        d = ''
+        if self.deaduntil:
+            d = " (dead until %d)" % self.deaduntil
+        return "%s:%d%s" % (self.ip, self.port, d)
+
+if __name__ == "__main__":
+    print "Running tests:"
+    print
+    # Add "127.0.0.1:11212" to this list to exercise a two-server pool.
+    servers = ["127.0.0.1:11211"]
+    mc = Client(servers, debug=1)
+
+    def to_s(val):
+        """Describe val, tagging non-strings with their type."""
+        if isinstance(val, types.StringTypes):
+            return "%s" % val
+        return "%s (%s)" % (val, type(val))
+
+    def test_setget(key, val):
+        """Store val under key, read it back and report OK or FAIL."""
+        print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
+        mc.set(key, val)
+        verdict = "FAIL"
+        if mc.get(key) == val:
+            verdict = "OK"
+        print verdict
+
+    class FooStruct:
+        """A non-string, non-int value, so set() must pickle it."""
+        def __init__(self):
+            self.bar = "baz"
+        def __str__(self):
+            return "A FooStruct"
+        def __eq__(self, other):
+            if not isinstance(other, FooStruct):
+                return 0
+            return self.bar == other.bar
+
+    test_setget("a_string", "some random string")
+    test_setget("an_integer", 42)
+    print "Testing get_multi ...",
+    print mc.get_multi("a_string", "an_integer")
+    test_setget("long", long(1<<30))
+    print "Testing get(unknown value) ...",
+    print to_s(mc.get("unknown_value"))
+    test_setget("foostruct", FooStruct())
+
+# vim: ts=4 sw=4 et :
diff -ruN memcache-orig/api/python/setup.py memcache-python/api/python/setup.py
--- memcache-orig/api/python/setup.py	1969-12-31 16:00:00.000000000 -0800
+++ memcache-python/api/python/setup.py	2003-08-07 15:02:34.000000000 -0700
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+
+from distutils.core import setup
+import memcache
+
+setup(name="memcache",
+      version=memcache.__version__,
+      description="A Python client for memcached",
+      author="Evan Martin", author_email="martine@danga.com",
+      url="http://www.danga.com/memcached", license=memcache.__license__,
+      py_modules=["memcache"])
+
diff -ruN memcache-orig/api/python/tests/pooltest.py memcache-python/api/python/tests/pooltest.py
--- memcache-orig/api/python/tests/pooltest.py	1969-12-31 16:00:00.000000000 -0800
+++ memcache-python/api/python/tests/pooltest.py	2003-08-07 14:26:38.000000000 -0700
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+"""pooltest
+
+Bring up two memcaches on :11211 and :11212.  Try killing one or both.
+If this code raises any exceptions, it's a bug."""
+
+import memcache
+import time
+
+mc = memcache.Client(["127.0.0.1:11211", "127.0.0.1:11212"], debug=1)
+
+def test_setget(key, val):
+    """Round-trip one key through the pool and print OK or FAIL."""
+    print "Testing set/get {'%s': %s} ..." % (key, val),
+    mc.set(key, val)
+    verdict = "FAIL"
+    if mc.get(key) == val:
+        verdict = "OK"
+    print verdict
+
+counter = 0
+while True:
+    test_setget("foo%d" % counter, "bar%d" % counter)
+    counter += 1
+    time.sleep(1)
diff -ruN memcache-orig/CONTRIBUTORS memcache-python/CONTRIBUTORS
--- memcache-orig/CONTRIBUTORS	2003-07-28 01:40:02.000000000 -0700
+++ memcache-python/CONTRIBUTORS	2003-08-07 15:02:10.000000000 -0700
@@ -10,8 +10,9 @@
   -- C server
   -- memory allocator design
 
-Evan Martine <evan@danga.com>
+Evan Martin <martine@danga.com>
   -- automake/autoconf support
+  -- Python client
 
 Ryan <hotrodder@rocketmail.com>
   -- PHP client

--FCuugMFkClbJLl1L--