[memcached] bradfitz, r320: merge end of facebook branch into trunk, ...

commits at code.sixapart.com
Sun Sep 3 03:18:27 UTC 2006


merge end of facebook branch into trunk, after copying the old trunk to the memcached-1.1.x branch



U   trunk/server/ChangeLog
U   trunk/server/TODO
U   trunk/server/configure.ac
U   trunk/server/doc/memcached.1
U   trunk/server/doc/protocol.txt
U   trunk/server/items.c
U   trunk/server/memcached.c
U   trunk/server/memcached.h
U   trunk/server/slabs.c


Modified: trunk/server/ChangeLog
===================================================================
--- trunk/server/ChangeLog	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/ChangeLog	2006-09-03 03:18:26 UTC (rev 320)
@@ -1,3 +1,18 @@
+2006-08-21
+	* Nathan Neulinger <nneul at umr.edu>: fix incompatibilities with
+	  unix domain socket support and the UDP code and clean up stale
+	  sockets
+
+2006-08-20
+	* Nathan Neulinger <nneul at umr.edu>: unix domain socket support
+
+2006-05-03
+	* Steven Grimm <sgrimm at facebook.com>:  big bunch of changes:
+	  big CPU reduction work, UDP-based interface, increased memory
+	  efficiency.  (intertwined patch, committed all together)
+	  <http://lists.danga.com/pipermail/memcached/2006-May/002164.html>
+	  or see svn commit logs
+
 2006-04-30
 	* River Tarnell:  autoconf work for Solaris 10.  Brad:
 	merge and verify it works on Nexenta.

Modified: trunk/server/TODO
===================================================================
--- trunk/server/TODO	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/TODO	2006-09-03 03:18:26 UTC (rev 320)
@@ -1,3 +1,20 @@
+* bug as shown with netcat (w/ small 16 byte object reproduces)
+
+>>I've done the following script to check that memcached has a key in it's
+>>inside, and thus know that it's working correctly:
+>>echo -e "get is_ok\r\nquit\r\n" | netcat $host $ip
+>>
+>>and I find that sometimes it returns the VALUE in it's inside, but other
+>>not.
+
+* namespaces
+
+* binary get protocol
+
+* refresh/touch command.
+
+* finer granularity of time for flush_all/delete, or generation number.
+
 * slab class reassignment still buggy and can crash.  once that's
   stable, server should re-assign pages every 60 seconds or so
   to keep all classes roughly equal.  [Update: fixed now?, but 

Modified: trunk/server/configure.ac
===================================================================
--- trunk/server/configure.ac	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/configure.ac	2006-09-03 03:18:26 UTC (rev 320)
@@ -1,5 +1,5 @@
 AC_PREREQ(2.52)
-AC_INIT(memcached, 1.1.13-pre2, brad at danga.com)
+AC_INIT(memcached, 1.2.0-rc1, brad at danga.com)
 AC_CANONICAL_SYSTEM
 AC_CONFIG_SRCDIR(memcached.c)
 AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION)

Modified: trunk/server/doc/memcached.1
===================================================================
--- trunk/server/doc/memcached.1	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/doc/memcached.1	2006-09-03 03:18:26 UTC (rev 320)
@@ -43,8 +43,11 @@
 suggestions.
 .TP
 .B \-p <num> 
-Listen on port <num>, the default is port 11211.
+Listen on TCP port <num>, the default is port 11211.
 .TP
+.B \-U <num> 
+Listen on UDP port <num>, the default is port 11211.
+.TP
 .B \-M
 Disable automatic removal of items from the cache when out of memory.
 Additions will not be possible until adequate space is freed up.
@@ -52,6 +55,20 @@
 .B \-r
 Raise the core file size limit to the maximum allowable.
 .TP
+.B \-f <factor>
+Use <factor> as the multiplier for computing the sizes of memory chunks that
+items are stored in. A lower value may result in less wasted memory depending
+on the total amount of memory available and the distribution of item sizes.
+The default is 1.25.
+.TP
+.B \-s <size>
+Allocate a minimum of <size> bytes for the item key, value, and flags. The
+default is 48. If you have a lot of small keys and values, you can get a
+significant memory efficiency gain with a lower value. If you use a high
+chunk growth factor (-f option), on the other hand, you may want to increase
+the size to allow a bigger percentage of your items to fit in the most densely
+packed (smallest) chunks.
+.TP
 .B \-h
 Show the version of memcached and a summary of options.
 .TP
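
The -f and -s descriptions above can be illustrated with a small sketch. It assumes each chunk size is simply the previous one multiplied by the growth factor, starting from the -s minimum; the real computation lives in slabs.c (not shown in this part of the diff) and may add item header overhead or alignment. The function name, its parameters, and the size limit are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper, not part of memcached: fills sizes[] with chunk
 * sizes that start at min_size and grow by `factor` each step, stopping
 * once a size would exceed `limit` or max_classes entries are written.
 * Returns the number of entries written (0 for unusable arguments).
 */
size_t sketch_chunk_sizes(size_t min_size, double factor, size_t limit,
                          size_t *sizes, size_t max_classes)
{
    size_t n = 0;
    size_t size = min_size;

    assert(sizes != NULL);
    if (min_size == 0 || factor <= 1.0)
        return 0;

    while (n < max_classes && size <= limit) {
        /* Fractional results are truncated toward zero. */
        size_t next = (size_t) ((double) size * factor);

        sizes[n++] = size;
        if (next <= size)
            next = size + 1;    /* guarantee progress for tiny factors */
        size = next;
    }
    return n;
}
```

Under these assumptions, a lower factor yields more, closer-spaced chunk sizes, which is why the man page text says it may waste less memory per item.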

Modified: trunk/server/doc/protocol.txt
===================================================================
--- trunk/server/doc/protocol.txt	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/doc/protocol.txt	2006-09-03 03:18:26 UTC (rev 320)
@@ -1,8 +1,9 @@
 Protocol
 --------
 
-Clients of memcached communicate with server through TCP
-connections. A given running memcached server listens on some
+Clients of memcached communicate with server through TCP connections.
+(A UDP interface is also available; details are below under "UDP
+protocol.") A given running memcached server listens on some
 (configurable) port; clients connect to that port, send commands to
 the server, read responses, and eventually close the connection.
 
@@ -388,3 +389,45 @@
 Upon receiving this command, the server closes the
 connection. However, the client may also simply close the connection
 when it no longer needs it, without issuing this command.
+
+
+UDP protocol
+------------
+
+For very large installations where the number of clients is high enough
+that the number of TCP connections causes scaling difficulties, there is
+also a UDP-based interface. The UDP interface does not provide guaranteed
+delivery, so should only be used for operations that aren't required to
+succeed; typically it is used for "get" requests where a missing or
+incomplete response can simply be treated as a cache miss.
+
+Each UDP datagram contains a simple frame header, followed by data in the
+same format as the TCP protocol described above. In the current
+implementation, requests must be contained in a single UDP datagram, but
+responses may span several datagrams. (The only common requests that would
+span multiple datagrams are huge multi-key "get" requests and "set"
+requests, both of which are more suitable to TCP transport for reliability
+reasons anyway.)
+
+The frame header is 8 bytes long, as follows (all values are 16-bit integers
+in network byte order, high byte first):
+
+0-1 Request ID
+2-3 Sequence number
+4-5 Total number of datagrams in this message
+6-7 Reserved for future use; must be 0
+
+The request ID is supplied by the client. Typically it will be a
+monotonically increasing value starting from a random seed, but the client
+is free to use whatever request IDs it likes. The server's response will
+contain the same ID as the incoming request. The client uses the request ID
+to differentiate between responses to outstanding requests if there are
+several pending from the same server; any datagrams with an unknown request
+ID are probably delayed responses to an earlier request and should be
+discarded.
+
+The sequence number ranges from 0 to n-1, where n is the total number of
+datagrams in the message. The client should concatenate the payloads of the
+datagrams for a given response in sequence number order; the resulting byte
+stream will contain a complete response in the same format as the TCP
+protocol (including terminating \r\n sequences).
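
The 8-byte frame header described above can be sketched from the client's point of view as follows. This is a minimal illustration based only on the protocol.txt text in this commit; the struct, function, and macro names are hypothetical and are not taken from the memcached source.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_UDP_HEADER_SIZE 8    /* hypothetical name; 4 x 16-bit fields */

struct sketch_udp_header {
    uint16_t request_id;    /* bytes 0-1: chosen by the client */
    uint16_t seq;           /* bytes 2-3: 0 .. total-1 */
    uint16_t total;         /* bytes 4-5: datagrams in this message */
    uint16_t reserved;      /* bytes 6-7: must be 0 */
};

/* Store a 16-bit value in network byte order (high byte first). */
static void put_be16(unsigned char *p, uint16_t v)
{
    p[0] = (unsigned char) (v >> 8);
    p[1] = (unsigned char) (v & 0xff);
}

/* Load a 16-bit value stored in network byte order. */
static uint16_t get_be16(const unsigned char *p)
{
    return (uint16_t) ((p[0] << 8) | p[1]);
}

/* Write the header into out[0..7]; the reserved field is always sent as 0. */
void sketch_udp_pack(const struct sketch_udp_header *h, unsigned char *out)
{
    assert(h != NULL && out != NULL);
    put_be16(out + 0, h->request_id);
    put_be16(out + 2, h->seq);
    put_be16(out + 4, h->total);
    put_be16(out + 6, 0);
}

/*
 * Parse a header from the start of a datagram. Returns 0 on success, or
 * -1 if the datagram is too short or the sequence number is outside the
 * range 0 .. total-1.
 */
int sketch_udp_unpack(const unsigned char *buf, size_t len,
                      struct sketch_udp_header *h)
{
    if (buf == NULL || h == NULL || len < SKETCH_UDP_HEADER_SIZE)
        return -1;
    h->request_id = get_be16(buf + 0);
    h->seq        = get_be16(buf + 2);
    h->total      = get_be16(buf + 4);
    h->reserved   = get_be16(buf + 6);
    if (h->seq >= h->total)
        return -1;
    return 0;
}
```

A client would typically discard any datagram whose request ID does not match an outstanding request, then concatenate the payloads (the bytes after the header) in sequence-number order once all `total` datagrams have arrived.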

Modified: trunk/server/items.c
===================================================================
--- trunk/server/items.c	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/items.c	2006-09-03 03:18:26 UTC (rev 320)
@@ -21,12 +21,7 @@
 #include "memcached.h"
 
 
-/* 
- * NOTE: we assume here for simplicity that slab ids are <=32. That's true in 
- * the powers-of-2 implementation, but if that changes this should be changed too
- */
-
-#define LARGEST_ID 32
+#define LARGEST_ID 255
 static item *heads[LARGEST_ID];
 static item *tails[LARGEST_ID];
 unsigned int sizes[LARGEST_ID];
@@ -41,14 +36,31 @@
 }
 
 
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes) {
-    int ntotal, len;
+/*
+ * Generates the variable-sized part of the header for an object.
+ *
+ * suffix  - Buffer for the "VALUE" line suffix (flags, size).
+ * nsuffix - The length of the suffix is stored here.
+ * keylen  - The length of the key plus any padding required to word-align the
+ *           "VALUE" suffix (which is done to speed up copying.)
+ *
+ * Returns the total size of the header.
+ */
+int item_make_header(char *key, int flags, int nbytes,
+                     char *suffix, int *nsuffix, int *keylen) {
+    *keylen = strlen(key) + 1; if(*keylen % 4) *keylen += 4 - (*keylen % 4);
+    *nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2);
+    return sizeof(item) + *keylen + *nsuffix + nbytes;
+}
+ 
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
+    int nsuffix, ntotal, len;
     item *it;
     unsigned int id;
+    char suffix[40];
 
-    len = strlen(key) + 1; if(len % 4) len += 4 - (len % 4);
-    ntotal = sizeof(item) + len + nbytes;
-    
+    ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len);
+ 
     id = slabs_clsid(ntotal);
     if (id == 0)
         return 0;
@@ -97,7 +109,8 @@
     it->nbytes = nbytes;
     strcpy(ITEM_key(it), key);
     it->exptime = exptime;
-    it->flags = flags;
+    memcpy(ITEM_suffix(it), suffix, nsuffix);
+    it->nsuffix = nsuffix;
     return it;
 }
 
@@ -114,6 +127,18 @@
     slabs_free(it, ntotal);
 }
 
+/*
+ * Returns true if an item will fit in the cache (its size does not exceed
+ * the maximum for a cache entry.)
+ */
+int item_size_ok(char *key, int flags, int nbytes) {
+    char prefix[40];
+    int keylen, nsuffix;
+
+    return slabs_clsid(item_make_header(key, flags, nbytes,
+                                        prefix, &nsuffix, &keylen)) != 0;
+}
+
 void item_link_q(item *it) { /* item is the new head */
     item **head, **tail;
     assert(it->slabs_clsid <= LARGEST_ID);
@@ -159,7 +184,7 @@
     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
     assert(it->nbytes < 1048576);
     it->it_flags |= ITEM_LINKED;
-    it->time = time(0);
+    it->time = current_time;
     assoc_insert(ITEM_key(it), it);
 
     stats.curr_bytes += ITEM_ntotal(it);
@@ -195,7 +220,7 @@
     assert((it->it_flags & ITEM_SLABBED) == 0);
 
     item_unlink_q(it);
-    it->time = time(0);
+    it->time = current_time;
     item_link_q(it);
 }
 
@@ -224,7 +249,7 @@
     bufcurr = 0;
 
     while (it && (!limit || shown < limit)) {
-        len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time);
+        len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time + stats.started);
         if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
             break;
         strcpy(buffer + bufcurr, temp);
@@ -243,7 +268,7 @@
 void item_stats(char *buffer, int buflen) {
     int i;
     char *bufcurr = buffer;
-    time_t now = time(0);
+    rel_time_t now = current_time;
 
     if (buflen < 4096) {
         strcpy(buffer, "SERVER_ERROR out of memory");
@@ -252,7 +277,7 @@
 
     for (i=0; i<LARGEST_ID; i++) {
         if (tails[i])
-            bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %lu\r\n", 
+            bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %u\r\n", 
                                i, sizes[i], i, now - tails[i]->time);
     }
     strcpy(bufcurr, "END");
@@ -262,7 +287,7 @@
 /* dumps out a list of objects of each size, with granularity of 32 bytes */
 char* item_stats_sizes(int *bytes) {
     int num_buckets = 32768;   /* max 1MB object, divided into 32 bytes size buckets */
-    unsigned int *histogram = (int*) malloc(num_buckets * sizeof(int));
+    unsigned int *histogram = (unsigned int*) malloc(num_buckets * sizeof(int));
     char *buf = (char*) malloc(1024*1024*2*sizeof(char));
     int i;
 
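
The key-length rounding performed in item_make_header() above can be shown in isolation. This is a sketch of that one arithmetic step only; the function name is hypothetical, and the 4-byte alignment is taken from the `% 4` expression in the diff.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical standalone version of the padding step: returns the key
 * length including its terminating NUL, rounded up to a multiple of 4 so
 * that whatever follows the key starts on a 4-byte boundary.
 */
size_t sketch_padded_keylen(const char *key)
{
    size_t len;

    assert(key != NULL);
    len = strlen(key) + 1;          /* count the terminating NUL */
    if (len % 4)
        len += 4 - (len % 4);       /* round up to the next multiple of 4 */
    return len;
}
```

A 3-character key therefore occupies 4 bytes, while a 4-character key occupies 8, because the NUL terminator pushes it past the first boundary.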

Modified: trunk/server/memcached.c
===================================================================
--- trunk/server/memcached.c	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/memcached.c	2006-09-03 03:18:26 UTC (rev 320)
@@ -21,6 +21,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <sys/signal.h>
 #include <sys/resource.h>
 /* some POSIX systems need the following definition
@@ -28,6 +29,10 @@
 #ifndef _P1003_1B_VISIBLE
 #define _P1003_1B_VISIBLE
 #endif
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
 #include <pwd.h>
 #include <sys/mman.h>
 #include <fcntl.h>
@@ -42,6 +47,7 @@
 #include <time.h>
 #include <event.h>
 #include <assert.h>
+#include <limits.h>
 
 #ifdef HAVE_MALLOC_H
 #include <malloc.h>
@@ -56,21 +62,23 @@
 static int delcurr;
 static int deltotal;
 
+#define TRANSMIT_COMPLETE   0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
+
 int *buckets = 0; /* bucket->generation array for a managed instance */
 
-time_t realtime(time_t exptime) {
-    time_t now;
-
+#define REALTIME_MAXDELTA (60*60*24*30)
+rel_time_t realtime(time_t exptime) {
     /* no. of seconds in 30 days - largest possible delta exptime */
-    #define REALTIME_MAXDELTA 60*60*24*30
 
     if (exptime == 0) return 0; /* 0 means never expire */
 
     if (exptime > REALTIME_MAXDELTA)
-        return exptime;
+        return (rel_time_t) (exptime - stats.started);
     else {
-        now = time(0);
-        return exptime + now;
+        return (rel_time_t) (exptime + current_time);
     }
 }
 
@@ -80,7 +88,6 @@
     stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
     stats.started = time(0);
 }
-
 void stats_reset(void) {
     stats.total_items = stats.total_conns = 0;
     stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
@@ -89,27 +96,59 @@
 
 void settings_init(void) {
     settings.port = 11211;
+    settings.udpport = 11211;
     settings.interface.s_addr = htonl(INADDR_ANY);
     settings.maxbytes = 64*1024*1024; /* default is 64MB */
     settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
     settings.verbose = 0;
     settings.oldest_live = 0;
     settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
+    settings.socketpath = NULL;       /* by default, not using a unix socket */
     settings.managed = 0;
+    settings.factor = 1.25;
+    settings.chunk_size = 48;         /* space for a modest key and value */
 }
 
+/*
+ * Adds a message header to a connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int add_msghdr(conn *c)
+{
+    struct msghdr *msg;
+
+    if (c->msgsize == c->msgused) {
+        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
+        if (! msg)
+            return -1;
+        c->msglist = msg;
+        c->msgsize *= 2;
+    }
+
+    msg = c->msglist + c->msgused;
+    msg->msg_iov = &c->iov[c->iovused];
+    msg->msg_iovlen = 0;
+    msg->msg_name = &c->request_addr;
+    msg->msg_namelen = c->request_addr_size;
+    msg->msg_control = 0;
+    msg->msg_controllen = 0;
+    msg->msg_flags = 0;
+    c->msgbytes = 0;
+    c->msgused++;
+
+    if (c->udp) {
+        /* Leave room for the UDP header, which we'll fill in later. */
+        return add_iov(c, NULL, UDP_HEADER_SIZE);
+    }
+
+    return 0;
+}
+
 conn **freeconns;
 int freetotal;
 int freecurr;
 
-void set_cork (conn *c, int val) {
-    if (c->is_corked == val) return;
-    c->is_corked = val;
-#ifdef TCP_NOPUSH
-    setsockopt(c->sfd, IPPROTO_TCP, TCP_NOPUSH, &val, sizeof(val));
-#endif
-}
-
 void conn_init(void) {
     freetotal = 200;
     freecurr = 0;
@@ -117,7 +156,8 @@
     return;
 }
 
-conn *conn_new(int sfd, int init_state, int event_flags) {
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
+                int is_udp) {
     conn *c;
 
     /* do we have a free conn structure from a previous close? */
@@ -130,42 +170,63 @@
         }
         c->rbuf = c->wbuf = 0;
         c->ilist = 0;
+        c->iov = 0;
+        c->msglist = 0;
+        c->hdrbuf = 0;
 
-        c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);
-        c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);
-        c->ilist = (item **) malloc(sizeof(item *)*200);
+        c->rsize = read_buffer_size;
+        c->wsize = DATA_BUFFER_SIZE;
+        c->isize = ITEM_LIST_INITIAL;
+        c->iovsize = IOV_LIST_INITIAL;
+        c->msgsize = MSG_LIST_INITIAL;
+        c->hdrsize = 0;
 
-        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
+        c->rbuf = (char *) malloc(c->rsize);
+        c->wbuf = (char *) malloc(c->wsize);
+        c->ilist = (item **) malloc(sizeof(item *) * c->isize);
+        c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
+        c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize);
+
+        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
+                c->msglist == 0) {
             if (c->rbuf != 0) free(c->rbuf);
             if (c->wbuf != 0) free(c->wbuf);
             if (c->ilist !=0) free(c->ilist);
+            if (c->iov != 0) free(c->iov);
+            if (c->msglist != 0) free(c->msglist);
             free(c);
             perror("malloc()");
             return 0;
         }
+
         c->rsize = c->wsize = DATA_BUFFER_SIZE;
-        c->rcurr = c->rbuf;
-        c->isize = 200;
+        c->isize = 200;   /* TODO: another instance of '200'.  must kill all these */
+
         stats.conn_structs++;
     }
 
     if (settings.verbose > 1) {
         if (init_state == conn_listening)
             fprintf(stderr, "<%d server listening\n", sfd);
+        else if (is_udp)
+            fprintf(stderr, "<%d server listening (udp)\n", sfd);
         else
             fprintf(stderr, "<%d new client connection\n", sfd);
     }
 
     c->sfd = sfd;
+    c->udp = is_udp;
     c->state = init_state;
     c->rlbytes = 0;
     c->rbytes = c->wbytes = 0;
     c->wcurr = c->wbuf;
+    c->rcurr = c->rbuf;
     c->ritem = 0;
-    c->icurr = c->ilist; 
+    c->icurr = c->ilist;
     c->ileft = 0;
-    c->iptr = c->ibuf;
-    c->ibytes = 0;
+    c->iovused = 0;
+    c->msgcurr = 0;
+    c->msgused = 0;
 
     c->write_and_go = conn_read;
     c->write_and_free = 0;
@@ -173,8 +234,6 @@
     c->bucket = -1;
     c->gen = 0;
 
-    c->is_corked = 0;
-
     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
     c->ev_flags = event_flags;
 
@@ -182,9 +241,13 @@
         if (freecurr < freetotal) {
             freeconns[freecurr++] = c;
         } else {
+            if (c->hdrbuf)
+                free (c->hdrbuf);
+            free (c->msglist);
             free (c->rbuf);
             free (c->wbuf);
             free (c->ilist);
+            free (c->iov);
             free (c);
         }
         return 0;
@@ -196,17 +259,10 @@
     return c;
 }
 
-void conn_close(conn *c) {
-    /* delete the event, the socket and the conn */
-    event_del(&c->event);
-
-    if (settings.verbose > 1)
-        fprintf(stderr, "<%d connection closed.\n", c->sfd);
-
-    close(c->sfd);
-
+void conn_cleanup(conn *c) {
     if (c->item) {
         item_free(c->item);
+        c->item = 0;
     }
 
     if (c->ileft) {
@@ -217,10 +273,46 @@
 
     if (c->write_and_free) {
         free(c->write_and_free);
+        c->write_and_free = 0;
     }
+}
 
-    /* if we have enough space in the free connections array, put the structure there */
-    if (freecurr < freetotal) {
+/*
+ * Frees a connection.
+ */
+static void conn_free(conn *c) {
+    if (c) {
+        if (c->hdrbuf)
+            free(c->hdrbuf);
+        if (c->msglist)
+            free(c->msglist);
+        if (c->rbuf)
+            free(c->rbuf);
+        if (c->wbuf)
+            free(c->wbuf);
+        if (c->ilist)
+            free(c->ilist);
+        if (c->iov)
+            free(c->iov);
+        free(c);
+    }
+}
+
+void conn_close(conn *c) {
+    /* delete the event, the socket and the conn */
+    event_del(&c->event);
+
+    if (settings.verbose > 1)
+        fprintf(stderr, "<%d connection closed.\n", c->sfd);
+
+    close(c->sfd);
+    conn_cleanup(c);
+
+    /* if the connection has big buffers, just free it */
+    if (c->rsize > READ_BUFFER_HIGHWAT) {
+        conn_free(c);
+    } else if (freecurr < freetotal) {
+        /* if we have enough space in the free connections array, put the structure there */
         freeconns[freecurr++] = c;
     } else {
         /* try to enlarge free connections array */
@@ -230,10 +322,7 @@
             freeconns = new_freeconns;
             freeconns[freecurr++] = c;
         } else {
-            free(c->rbuf);
-            free(c->wbuf);
-            free(c->ilist);
-            free(c);
+            conn_free(c);
         }
     }
 
@@ -242,6 +331,184 @@
     return;
 }
 
+/*
+ * Reallocates memory and updates a buffer size if successful.
+ */
+int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
+    void *newbuf = realloc(*orig, newsize * bytes_per_item);
+    if (newbuf) {
+        *orig = newbuf;
+        *size = newsize;
+        return 1;
+    }
+    return 0;
+}
+
+/*
+ * Shrinks a connection's buffers if they're too big.  This prevents
+ * periodic large "get" requests from permanently chewing lots of server
+ * memory.
+ *
+ * This should only be called in between requests since it can wipe output
+ * buffers!
+ */
+void conn_shrink(conn *c) {
+    if (c->udp)
+        return;
+
+    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
+        do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+    }
+
+    if (c->isize > ITEM_LIST_HIGHWAT) {
+        do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
+    }
+
+    if (c->msgsize > MSG_LIST_HIGHWAT) {
+        do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
+    }
+
+    if (c->iovsize > IOV_LIST_HIGHWAT) {
+        do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
+    }
+}
+
+/*
+ * Sets a connection's current state in the state machine. Any special
+ * processing that needs to happen on certain state transitions can
+ * happen here.
+ */
+void conn_set_state(conn *c, int state) {
+    if (state != c->state) {
+        if (state == conn_read) {
+            conn_shrink(c);
+        }
+        c->state = state;
+    }
+}
+
+
+/*
+ * Ensures that there is room for another struct iovec in a connection's
+ * iov list.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int ensure_iov_space(conn *c) {
+    if (c->iovused >= c->iovsize) {
+        int i, iovnum;
+        struct iovec *new_iov = (struct iovec *) realloc(c->iov,
+                                (c->iovsize * 2) * sizeof(struct iovec));
+        if (! new_iov)
+            return -1;
+        c->iov = new_iov;
+        c->iovsize *= 2;
+
+        /* Point all the msghdr structures at the new list. */
+        for (i = 0, iovnum = 0; i < c->msgused; i++) {
+            c->msglist[i].msg_iov = &c->iov[iovnum];
+            iovnum += c->msglist[i].msg_iovlen;
+        }
+    }
+
+    return 0;
+}
+
+
+/*
+ * Adds data to the list of pending data that will be written out to a
+ * connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+
+int add_iov(conn *c, const void *buf, int len) {
+    struct msghdr *m;
+    int i;
+    int leftover;
+    int limit_to_mtu;
+
+    do {
+        m = &c->msglist[c->msgused - 1];
+
+        /*
+         * Limit UDP packets, and the first payloads of TCP replies, to
+         * UDP_MAX_PAYLOAD_SIZE bytes.
+         */
+        limit_to_mtu = c->udp || (1 == c->msgused);
+
+        /* We may need to start a new msghdr if this one is full. */
+        if (m->msg_iovlen == IOV_MAX ||
+                (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
+            if (add_msghdr(c)) return -1;   /* out of memory */
+            m = &c->msglist[c->msgused - 1];
+        }
+
+        if (ensure_iov_space(c))
+            return -1;
+
+        /* If the fragment is too big to fit in the datagram, split it up */
+        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
+            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
+            len -= leftover;
+        } else {
+            leftover = 0;
+        }
+
+        m = &c->msglist[c->msgused - 1];
+        m->msg_iov[m->msg_iovlen].iov_base = (void *) buf;
+        m->msg_iov[m->msg_iovlen].iov_len = len;
+
+        c->msgbytes += len;
+        c->iovused++;
+        m->msg_iovlen++;
+
+        buf = ((char *)buf) + len;
+        len = leftover;
+    } while (leftover > 0);
+
+    return 0;
+}
+
+
+/*
+ * Constructs a set of UDP headers and attaches them to the outgoing messages.
+ */
+int build_udp_headers(conn *c) {
+    int i;
+    unsigned char *hdr;
+
+    if (c->msgused > c->hdrsize) {
+        void *new_hdrbuf;
+        if (c->hdrbuf)
+            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
+        else
+            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
+        if (! new_hdrbuf)
+            return -1;
+        c->hdrbuf = (unsigned char *) new_hdrbuf;
+        c->hdrsize = c->msgused * 2;
+    }
+
+    hdr = c->hdrbuf;
+    for (i = 0; i < c->msgused; i++) {
+        c->msglist[i].msg_iov[0].iov_base = hdr;
+        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
+        *hdr++ = c->request_id / 256;
+        *hdr++ = c->request_id % 256;
+        *hdr++ = i / 256;
+        *hdr++ = i % 256;
+        *hdr++ = c->msgused / 256;
+        *hdr++ = c->msgused % 256;
+        *hdr++ = 0;
+        *hdr++ = 0;
+        assert((char *) hdr == (char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
+    }
+
+    return 0;
+}
+
+
 void out_string(conn *c, char *str) {
     int len;
 
@@ -256,11 +523,11 @@
     }
 
     strcpy(c->wbuf, str);
-    strcat(c->wbuf, "\r\n");
+    strcpy(c->wbuf + len, "\r\n");
     c->wbytes = len + 2;
     c->wcurr = c->wbuf;
 
-    c->state = conn_write;
+    conn_set_state(c, conn_write);
     c->write_and_go = conn_read;
     return;
 }
@@ -274,7 +541,7 @@
     item *it = c->item;
     int comm = c->item_comm;
     item *old_it;
-    time_t now = time(0);
+    rel_time_t now = current_time;
 
     stats.set_cmds++;
 
@@ -328,7 +595,7 @@
 }
 
 void process_stat(conn *c, char *command) {
-    time_t now = time(0);
+    rel_time_t now = current_time;
 
     if (strcmp(command, "stats") == 0) {
         char temp[1024];
@@ -339,8 +606,8 @@
         getrusage(RUSAGE_SELF, &usage);
 
         pos += sprintf(pos, "STAT pid %u\r\n", pid);
-        pos += sprintf(pos, "STAT uptime %lu\r\n", now - stats.started);
-        pos += sprintf(pos, "STAT time %ld\r\n", now);
+        pos += sprintf(pos, "STAT uptime %u\r\n", now);
+        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
         pos += sprintf(pos, "STAT version " VERSION "\r\n");
         pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
         pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
@@ -350,13 +617,13 @@
         pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
         pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
         pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
-        pos += sprintf(pos, "STAT cmd_get %u\r\n", stats.get_cmds);
-        pos += sprintf(pos, "STAT cmd_set %u\r\n", stats.set_cmds);
-        pos += sprintf(pos, "STAT get_hits %u\r\n", stats.get_hits);
-        pos += sprintf(pos, "STAT get_misses %u\r\n", stats.get_misses);
+        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
+        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
+        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
+        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
         pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
         pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
-        pos += sprintf(pos, "STAT limit_maxbytes %u\r\n", settings.maxbytes);
+        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
         pos += sprintf(pos, "END");
         out_string(c, temp);
         return;
@@ -426,7 +693,7 @@
         c->write_and_free=wbuf;
         c->wcurr=wbuf;
         c->wbytes = res + 6;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         close(fd);
         return;
@@ -450,7 +717,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -465,7 +732,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -488,7 +755,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -509,9 +776,13 @@
     if (settings.verbose > 1)
         fprintf(stderr, "<%d %s\n", c->sfd, command);
 
-    /* All incoming commands will require a response, so we cork at the beginning,
-       and uncork at the very end (usually by means of out_string)  */
-    set_cork(c, 1);
+    c->msgcurr = 0;
+    c->msgused = 0;
+    c->iovused = 0;
+    if (add_msghdr(c)) {
+        out_string(c, "SERVER_ERROR out of memory");
+        return;
+    }
 
     if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) || 
         (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
@@ -543,9 +814,13 @@
         }
 
         expire = realtime(expire);
-        it = item_alloc(key, flags, expire, len+2);
+        it = item_alloc(key, flags, expire, len+2); /* already converted by realtime() above */
+
         if (it == 0) {
-            out_string(c, "SERVER_ERROR out of memory");
+            if (! item_size_ok(key, flags, len + 2))
+                out_string(c, "SERVER_ERROR object too large for cache");
+            else
+                out_string(c, "SERVER_ERROR out of memory");
             /* swallow the data line */
             c->write_and_go = conn_swallow;
             c->sbytes = len+2;
@@ -556,7 +831,7 @@
         c->item = it;
         c->ritem = ITEM_data(it);
         c->rlbytes = it->nbytes;
-        c->state = conn_nread;
+        conn_set_state(c, conn_nread);
         return;
     }
 
@@ -569,7 +844,7 @@
         char key[251];
         int res;
         char *ptr;
-        time_t now = time(0);
+        rel_time_t now = current_time;
 
         res = sscanf(command, "%*s %250s %u\n", key, &delta);
         if (res!=2 || strlen(key)==0 ) {
@@ -605,7 +880,7 @@
         }
 
         ptr = ITEM_data(it);
-        while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
+        while (*ptr && (*ptr<'0' || *ptr>'9')) ptr++;    /* skip leading non-digits */
         
         value = atoi(ptr);
 
@@ -620,7 +895,7 @@
         res = strlen(temp);
         if (res + 2 > it->nbytes) { /* need to realloc */
             item *new_it;
-            new_it = item_alloc(ITEM_key(it), it->flags, it->exptime, res + 2 );
+            new_it = item_alloc(ITEM_key(it), atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
             if (new_it == 0) {
                 out_string(c, "SERVER_ERROR out of memory");
                 return;
@@ -647,7 +922,7 @@
         int next;
         int i = 0;
         item *it;
-        time_t now = time(0);
+        rel_time_t now = current_time;
     get:
 
         if (settings.managed) {
@@ -688,6 +963,24 @@
                         c->ilist = new_list;
                     } else break;
                 }
+
+                /*
+                 * Construct the response. Each hit adds three elements to the
+                 * outgoing data list:
+                 *   "VALUE "
+                 *   key
+                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
+                 */
+                /* TODO: can we avoid the strlen() func call and cache that in wasted byte in item struct? */
+                if (add_iov(c, "VALUE ", 6) ||
+                    add_iov(c, ITEM_key(it), strlen(ITEM_key(it))) ||
+                    add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
+                {
+                    break;
+                }
+                if (settings.verbose > 1)
+                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
                 stats.get_hits++;
                 it->refcount++;
                 item_update(it);
@@ -695,17 +988,22 @@
                 i++;
             } else stats.get_misses++;
         }
+
         c->icurr = c->ilist;
         c->ileft = i;
-        if (c->ileft) {
-            c->ipart = 0;
-            c->state = conn_mwrite;
-            c->ibytes = 0;
-            return;
-        } else {
-            out_string(c, "END");
-            return;
+
+        if (settings.verbose > 1)
+            fprintf(stderr, ">%d END\n", c->sfd);
+        add_iov(c, "END\r\n", 5);
+
+        if (c->udp && build_udp_headers(c)) {
+            out_string(c, "SERVER_ERROR out of memory");
         }
+        else {
+            conn_set_state(c, conn_mwrite);
+            c->msgcurr = 0;
+        }
+        return;
     }
 
     if (strncmp(command, "delete ", 7) == 0) {
@@ -755,11 +1053,9 @@
             }
         }
             
-        exptime = realtime(exptime);
-
         it->refcount++;
         /* use its expiration time as its deletion time now */
-        it->exptime = exptime;
+        it->exptime = realtime(exptime);
         it->it_flags |= ITEM_DELETED;
         todelete[delcurr++] = it;
         out_string(c, "DELETED");
@@ -823,10 +1119,7 @@
                 c->bucket = bucket;
                 c->gen = gen;
             }
-            c->state = conn_read;
-            /* normally conn_write uncorks the connection, but this
-               is the only time we accept a command w/o writing anything */
-            set_cork(c,0); 
+            conn_set_state(c, conn_read);
             return;
         } else {
             out_string(c, "CLIENT_ERROR bad format");
@@ -844,12 +1137,12 @@
         int res;
 
         if (strcmp(command, "flush_all") == 0) {
-            settings.oldest_live = time(0);
+            settings.oldest_live = current_time;
             out_string(c, "OK");
             return;
         }
 
-        res = sscanf(command, "%*s %ld", &exptime); 
+        res = sscanf(command, "%*s %ld", &exptime);
         if (res != 1) {
             out_string(c, "ERROR");
             return;
@@ -866,11 +1159,12 @@
     }
 
     if (strcmp(command, "quit") == 0) {
-        c->state = conn_closing;
+        conn_set_state(c, conn_closing);
         return;
     }
 
     if (strncmp(command, "slabs reassign ", 15) == 0) {
+#ifdef ALLOW_SLABS_REASSIGN
         int src, dst;
         char *start = command+15;
         if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
@@ -889,6 +1183,9 @@
             }
         }
         out_string(c, "CLIENT_ERROR bogus command");
+#else
+        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
+#endif
         return;
     }
     
@@ -922,6 +1219,39 @@
 }
 
 /*
+ * read a UDP request.
+ * return 0 if there's nothing to read.
+ */
+int try_read_udp(conn *c) {
+    int res;
+
+    c->request_addr_size = sizeof(c->request_addr);
+    res = recvfrom(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes,
+                   0, &c->request_addr, &c->request_addr_size);
+    if (res > 8) {
+        unsigned char *buf = (unsigned char *)c->rbuf + c->rbytes;
+        stats.bytes_read += res;
+        
+        /* Beginning of UDP packet is the request ID; save it. */
+        c->request_id = buf[0] * 256 + buf[1];
+
+        /* If this is a multi-packet request, drop it. */
+        if (buf[4] != 0 || buf[5] != 1) {
+            out_string(c, "SERVER_ERROR multi-packet request not supported");
+            return 0;
+        }
+
+        /* Don't care about any of the rest of the header. */
+        res -= 8;
+        memmove(c->rbuf, c->rbuf + 8, res);
+
+        c->rbytes += res;
+        return 1;
+    }
+    return 0;
+}
+
+/*
  * read from network as much as we can, handle buffer overflow and connection
  * close. 
  * before reading, move the remaining incomplete fragment of a command
@@ -952,6 +1282,16 @@
             c->rcurr  = c->rbuf = new_rbuf;
             c->rsize *= 2;
         }
+
+        /* unix socket mode doesn't need this, so zeroed out.  but why
+         * is this done for every command?  presumably for UDP
+         * mode.  */
+        if (c->request_addr.sa_family != AF_UNSPEC) {
+            c->request_addr_size = sizeof(c->request_addr);
+        } else {
+            c->request_addr_size = 0;
+        }
+
         res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
         if (res > 0) {
             stats.bytes_read += res;
@@ -961,7 +1301,7 @@
         }
         if (res == 0) {
             /* connection closed */
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             return 1;
         }
         if (res == -1) {
@@ -981,7 +1321,70 @@
     if (event_add(&c->event, 0) == -1) return 0;
     return 1;
 }
-    
+
+/*
+ * Transmit the next chunk of data from our list of msgbuf structures.
+ *
+ * Returns:
+ *   TRANSMIT_COMPLETE   All done writing.
+ *   TRANSMIT_INCOMPLETE More data remaining to write.
+ *   TRANSMIT_SOFT_ERROR Can't write any more right now.
+ *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
+ */
+int transmit(conn *c) {
+    int res;
+
+    if (c->msgcurr < c->msgused &&
+            c->msglist[c->msgcurr].msg_iovlen == 0) {
+        /* Finished writing the current msg; advance to the next. */
+        c->msgcurr++;
+    }
+    if (c->msgcurr < c->msgused) {
+        struct msghdr *m = &c->msglist[c->msgcurr];
+        res = sendmsg(c->sfd, m, 0);
+        if (res > 0) {
+            stats.bytes_written += res;
+
+            /* We've written some of the data. Remove the completed
+               iovec entries from the list of pending writes. */
+            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
+                res -= m->msg_iov->iov_len;
+                m->msg_iovlen--;
+                m->msg_iov++;
+            }
+
+            /* Might have written just part of the last iovec entry;
+               adjust it so the next write will do the rest. */
+            if (res > 0) {
+                m->msg_iov->iov_base = (char *)m->msg_iov->iov_base + res;
+                m->msg_iov->iov_len -= res;
+            }
+            return TRANSMIT_INCOMPLETE;
+        }
+        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+                if (settings.verbose > 0)
+                    fprintf(stderr, "Couldn't update event\n");
+                conn_set_state(c, conn_closing);
+                return TRANSMIT_HARD_ERROR;
+            }
+            return TRANSMIT_SOFT_ERROR;
+        }
+        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
+           we have a real error, on which we close the connection */
+        if (settings.verbose > 0)
+            perror("Failed to write, and not due to blocking");
+
+        if (c->udp)
+            conn_set_state(c, conn_read);
+        else
+            conn_set_state(c, conn_closing);
+        return TRANSMIT_HARD_ERROR;
+    } else {
+        return TRANSMIT_COMPLETE;
+    }
+}
+
 void drive_machine(conn *c) {
 
     int exit = 0;
@@ -992,7 +1395,6 @@
     int res;
 
     while (!exit) {
-        /* printf("state %d\n", c->state);*/
         switch(c->state) {
         case conn_listening:
             addrlen = sizeof(addr);
@@ -1010,8 +1412,9 @@
                 perror("setting O_NONBLOCK");
                 close(sfd);
                 break;
-            }            
-            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
+            }
+            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+                            DATA_BUFFER_SIZE, 0);
             if (!newc) {
                 if (settings.verbose > 0)
                     fprintf(stderr, "couldn't create new connection\n");
@@ -1025,14 +1428,14 @@
             if (try_read_command(c)) {
                 continue;
             }
-            if (try_read_network(c)) {
+            if (c->udp ? try_read_udp(c) : try_read_network(c)) {
                 continue;
             }
             /* we have no command line and no data to read from network */
             if (!update_event(c, EV_READ | EV_PERSIST)) {
                 if (settings.verbose > 0)
                     fprintf(stderr, "Couldn't update event\n");
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             exit = 1;
@@ -1064,14 +1467,14 @@
                 break;
             }
             if (res == 0) { /* end of stream */
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                 if (!update_event(c, EV_READ | EV_PERSIST)) {
                     if (settings.verbose > 0) 
                         fprintf(stderr, "Couldn't update event\n");
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                     break;
                 }
                 exit = 1;
@@ -1080,13 +1483,13 @@
             /* otherwise we have a real error, on which we close the connection */
             if (settings.verbose > 0)
                 fprintf(stderr, "Failed to read, and not due to blocking\n");
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             break;
 
         case conn_swallow:
             /* we are reading sbytes and throwing them away */
             if (c->sbytes == 0) {
-                c->state = conn_read;
+                conn_set_state(c, conn_read);
                 break;
             }
 
@@ -1107,14 +1510,14 @@
                 break;
             }
             if (res == 0) { /* end of stream */
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                 if (!update_event(c, EV_READ | EV_PERSIST)) {
                     if (settings.verbose > 0)
                         fprintf(stderr, "Couldn't update event\n");
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                     break;
                 }
                 exit = 1;
@@ -1123,147 +1526,67 @@
             /* otherwise we have a real error, on which we close the connection */
             if (settings.verbose > 0)
                 fprintf(stderr, "Failed to read, and not due to blocking\n");
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             break;
 
         case conn_write:
-            /* we are writing wbytes bytes starting from wcurr */
-            if (c->wbytes == 0) {
-                if (c->write_and_free) {
-                    free(c->write_and_free);
-                    c->write_and_free = 0;
-                }
-                c->state = c->write_and_go;
-                if (c->state == conn_read)
-                    set_cork(c, 0);
-                break;
-            }
-            res = write(c->sfd, c->wcurr, c->wbytes);
-            if (res > 0) {
-                stats.bytes_written += res;
-                c->wcurr  += res;
-                c->wbytes -= res;
-                break;
-            }
-            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+            /*
+             * We want to write out a simple response. If we haven't already,
+             * assemble it into a msgbuf list (this will be a single-entry
+             * list for TCP or a two-entry list for UDP).
+             */
+            if (c->iovused == 0) {
+                if (add_iov(c, c->wcurr, c->wbytes) ||
+                        c->udp && build_udp_headers(c)) {
                     if (settings.verbose > 0)
-                        fprintf(stderr, "Couldn't update event\n");
-                    c->state = conn_closing;
+                        fprintf(stderr, "Couldn't build response\n");
+                    conn_set_state(c, conn_closing);
                     break;
-                }                
-                exit = 1;
-                break;
+                }
             }
-            /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
-               we have a real error, on which we close the connection */
-            if (settings.verbose > 0)
-                fprintf(stderr, "Failed to write, and not due to blocking\n");
-            c->state = conn_closing;
-            break;
+
+            /* fall through... */
+
         case conn_mwrite:
-            /* 
-             * we're writing ibytes bytes from iptr. iptr alternates between
-             * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the 
-             * current item. When we finish a chunk, we choose the next one using 
-             * ipart, which has the following semantics: 0 - start the loop, 1 - 
-             * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
-             * move to the next item and build its ibuf; 3 - we finished all items, 
-             * write "END".
-             */
-            if (c->ibytes > 0) {
-                res = write(c->sfd, c->iptr, c->ibytes);
-                if (res > 0) {
-                    stats.bytes_written += res;
-                    c->iptr += res;
-                    c->ibytes -= res;
-                    break;
-                }
-                if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
-                        if (settings.verbose > 0)
-                            fprintf(stderr, "Couldn't update event\n");
-                        c->state = conn_closing;
-                        break;
+            switch (transmit(c)) {
+            case TRANSMIT_COMPLETE:
+                if (c->state == conn_mwrite) {
+                    while (c->ileft > 0) {
+                        item *it = *(c->icurr);
+                        assert((it->it_flags & ITEM_SLABBED) == 0);
+                        item_remove(it);
+                        c->icurr++;
+                        c->ileft--;
                     }
-                    exit = 1;
-                    break;
+                    conn_set_state(c, conn_read);
+                } else if (c->state == conn_write) {
+                    if (c->write_and_free) {
+                        free(c->write_and_free);
+                        c->write_and_free = 0;
+                    }
+                    conn_set_state(c, c->write_and_go);
+                } else {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Unexpected state %d\n", c->state);
+                    conn_set_state(c, conn_closing);
                 }
-                /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
-                   we have a real error, on which we close the connection */
-                if (settings.verbose > 0)
-                    fprintf(stderr, "Failed to write, and not due to blocking\n");
-                c->state = conn_closing;
                 break;
-            } else {
-                item *it;
-                /* we finished a chunk, decide what to do next */
-                switch (c->ipart) {
-                case 1:
-                    it = *(c->icurr);
-                    assert((it->it_flags & ITEM_SLABBED) == 0);
-                    c->iptr = ITEM_data(it);
-                    if (c->binary) {
-                        c->ibytes = it->nbytes - 2;
-                    } else {
-                    c->ibytes = it->nbytes;
-                    }
-                    c->ipart = 2;
-                    break;
-                case 2:
-                    it = *(c->icurr);
-                    item_remove(it);
-                    c->ileft--;
-                    if (c->ileft <= 0) {
-                        c->ipart = 3;
-                        break;
-                    } else {
-                        c->icurr++;
-                    }
-                    /* FALL THROUGH */
-                case 0:
-                    it = *(c->icurr);
-                    assert((it->it_flags & ITEM_SLABBED) == 0);
-                    // add branch for binary mode
-                    if (c->binary) {
-                        long int key_size   = htonl(it->nkey - 3);
-                        long int value_size = htonl(it->nbytes - 2);
-                        c->ibuf[0] = 0x01; // key
-                        /* XXX NOT SAFE ON 64 BIT FOR PROTOCOL REASONS
-                           NEED TO RECODE TO USE 4 byte INTS ONLY */
-                        memcpy(c->ibuf + 1, (char *)&key_size, sizeof(key_size));
-                        memcpy(c->ibuf + 1 + 4, ITEM_key(it), it->nkey - 3);
-                        memcpy(c->ibuf + 1 + 4 + it->nkey - 3, (char *)&value_size, sizeof(value_size));
-                        c->ibytes =      1 + 4 + it->nkey - 3 + 4;
-                    } else {
-                    c->ibytes = sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
-                    }
-                    if (settings.verbose > 1)
-                        fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
-                    c->iptr = c->ibuf;
-                    c->ipart = 1;
-                    break;
-                case 3:
-                    // add branch for binary mode
-                    if (c->binary) {
-                        c->binary = 0;
 
-                        c->wbuf[0] = 0x01;
-                        c->wbytes = 1;
-                        c->wcurr = c->wbuf;
-                        c->state = conn_write;
-                        c->write_and_go = conn_read;
+            case TRANSMIT_INCOMPLETE:
+            case TRANSMIT_HARD_ERROR:
+                break;                   /* Continue in state machine. */
 
-                    } else {
-                    out_string(c, "END");
-                    }
-                    break;
-                }
+            case TRANSMIT_SOFT_ERROR:
+                exit = 1;
+                break;
             }
             break;
 
         case conn_closing:
-            conn_close(c);
+            if (c->udp)
+                conn_cleanup(c);
+            else
+                conn_close(c);
             exit = 1;
             break;
         }
@@ -1273,10 +1596,9 @@
     return;
 }
 
-
 void event_handler(int fd, short which, void *arg) {
     conn *c;
-    
+
     c = (conn *)arg;
     c->which = which;
 
@@ -1295,11 +1617,11 @@
     return;
 }
 
-int new_socket(void) {
+int new_socket(int is_udp) {
     int sfd;
     int flags;
 
-    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
+    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
         perror("socket()");
         return -1;
     }
@@ -1313,24 +1635,62 @@
     return sfd;
 }
 
-int server_socket(int port) {
+
+/*
+ * Sets a socket's send buffer size to the maximum allowed by the system.
+ */
+void maximize_sndbuf(int sfd) {
+    socklen_t intsize = sizeof(int);
+    int last_good = 0;
+    int min, max, avg;
+    int old_size;
+
+    /* Start with the default size. */
+    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
+        if (settings.verbose > 0)
+            perror("getsockopt(SO_SNDBUF)");
+        return;
+    }
+
+    /* Binary-search for the real maximum. */
+    min = old_size;
+    max = MAX_SENDBUF_SIZE;
+
+    while (min <= max) {
+        avg = ((unsigned int) min + max) / 2;
+        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
+            last_good = avg;
+            min = avg + 1;
+        } else {
+            max = avg - 1;
+        }
+    }
+
+    if (settings.verbose > 1)
+        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
+}
+
+
+int server_socket(int port, int is_udp) {
     int sfd;
     struct linger ling = {0, 0};
     struct sockaddr_in addr;
     int flags =1;
 
-    if ((sfd = new_socket()) == -1) {
+    if ((sfd = new_socket(is_udp)) == -1) {
         return -1;
     }
 
     setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
-    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
-    setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
-#if !defined(TCP_NOPUSH)
-    setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
-#endif
+    if (is_udp) {
+        maximize_sndbuf(sfd);
+    } else {
+        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
+        setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
+    }
 
-    /* 
+    /*
      * the memset call clears nonstandard fields in some impementations
      * that otherwise mess things up.
      */
@@ -1344,6 +1704,72 @@
         close(sfd);
         return -1;
     }
+    if (! is_udp && listen(sfd, 1024) == -1) {
+        perror("listen()");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+int new_socket_unix(void) {
+    int sfd;
+    int flags;
+
+    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+        perror("socket()");
+        return -1;
+    }
+
+    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
+        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+        perror("setting O_NONBLOCK");
+        close(sfd);
+        return -1;
+    }
+    return sfd;
+}
+
+int server_socket_unix(char *path) {
+    int sfd;
+    struct linger ling = {0, 0};
+    struct sockaddr_un addr;
+    struct stat tstat;
+    int flags =1;
+
+    if (!path) {
+        return -1;
+    }
+
+    if ((sfd = new_socket_unix()) == -1) {
+        return -1;
+    }
+
+    /* 
+     * Clean up a previous socket file if we left it around
+     */
+    if (!lstat(path, &tstat)) {
+        if (S_ISSOCK(tstat.st_mode))
+            unlink(path);
+    }
+
+    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
+    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
+    setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
+    /*
+     * the memset call clears nonstandard fields in some implementations
+     * that otherwise mess things up.
+     */
+    memset(&addr, 0, sizeof(addr));
+
+    addr.sun_family = AF_UNIX;
+    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
+    if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
+        perror("bind()");
+        close(sfd);
+        return -1;
+    }
     if (listen(sfd, 1024) == -1) {
         perror("listen()");
         close(sfd);
@@ -1352,14 +1778,46 @@
     return sfd;
 }
 
+
 /* invoke right before gdb is called, on assert */
 void pre_gdb () {
     int i = 0;
     if(l_socket) close(l_socket);
+    if(u_socket > -1) close(u_socket);
     for (i=3; i<=500; i++) close(i); /* so lame */
     kill(getpid(), SIGABRT);
 }
 
+/*
+ * We keep the current time of day in a global variable that's updated by a
+ * timer event. This saves us a bunch of time() system calls (we really only
+ * need to get the time once a second, whereas there can be tens of thousands
+ * of requests a second) and allows us to use server-start-relative timestamps
+ * rather than absolute UNIX timestamps, a space savings on systems where
+ * sizeof(time_t) > sizeof(unsigned int).
+ */
+volatile rel_time_t current_time;
+struct event clockevent;
+
+void clock_handler(int fd, short which, void *arg) {
+    struct timeval t;
+    static int initialized = 0;
+
+    if (initialized) {
+        /* only delete the event if it's actually there. */
+        evtimer_del(&clockevent);
+    } else {
+        initialized = 1;
+    }
+
+    evtimer_set(&clockevent, clock_handler, 0);
+    t.tv_sec = 1;
+    t.tv_usec = 0;
+    evtimer_add(&clockevent, &t);
+
+    current_time = (rel_time_t) (time(0) - stats.started);
+}
+
 struct event deleteevent;
 
 void delete_handler(int fd, short which, void *arg) {
@@ -1380,7 +1838,7 @@
 
     {
         int i, j=0;
-        time_t now = time(0);
+        rel_time_t now = current_time;
         for (i=0; i<delcurr; i++) {
             item *it = todelete[i];
             if (it->exptime < now) {
@@ -1394,13 +1852,12 @@
         }
         delcurr = j;
     }
-                
-    return;
 }
         
 void usage(void) {
     printf(PACKAGE " " VERSION "\n");
     printf("-p <num>      port number to listen on\n");
+    printf("-s <file>     unix socket path to listen on (disables network support)\n");
     printf("-l <ip_addr>  interface to listen on, default is INDRR_ANY\n");
     printf("-d            run as a daemon\n");
     printf("-r            maximize core file limit\n");
@@ -1415,6 +1872,8 @@
     printf("-i            print memcached and libevent license\n");
     printf("-b            run a managed instanced (mnemonic: buckets)\n");
     printf("-P <file>     save PID in <file>, only used with -d option\n");
+    printf("-f <factor>   chunk size growth factor, default 1.25\n");
+    printf("-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
     return;
 }
 
@@ -1517,10 +1976,17 @@
 }
 
 int l_socket=0;
+int u_socket=-1;
 
+void sig_handler(int sig) {
+    printf("SIGINT handled.\n");
+    exit(0);
+}
+
 int main (int argc, char **argv) {
     int c;
     conn *l_conn;
+    conn *u_conn;
     struct in_addr addr;
     int lock_memory = 0;
     int daemonize = 0;
@@ -1531,23 +1997,32 @@
     struct rlimit rlim;
     char *pid_file = NULL;
 
+    /* handle SIGINT */
+    signal(SIGINT, sig_handler);
+
     /* init settings */
     settings_init();
-
+    
     /* set stderr non-buffering (for running under, say, daemontools) */
     setbuf(stderr, NULL);
 
     /* process arguments */
-    while ((c = getopt(argc, argv, "bp:m:Mc:khirvdl:u:P:")) != -1) {
+    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:n:")) != -1) {
         switch (c) {
+        case 'U':
+            settings.udpport = atoi(optarg);
+            break;
         case 'b':
             settings.managed = 1;
             break;
         case 'p':
             settings.port = atoi(optarg);
             break;
+        case 's':
+            settings.socketpath = optarg;
+            break;
         case 'm':
-            settings.maxbytes = atoi(optarg)*1024*1024;
+            settings.maxbytes = ((size_t)atoi(optarg))*1024*1024;
             break;
         case 'M':
             settings.evict_to_free = 0;
@@ -1587,6 +2062,20 @@
         case 'P':
             pid_file = optarg;
             break;
+        case 'f':
+            settings.factor = atof(optarg);
+            if (settings.factor <= 1.0) {
+                fprintf(stderr, "Factor must be greater than 1\n");
+                return 1;
+            }
+            break;
+        case 'n':
+            settings.chunk_size = atoi(optarg);
+            if (settings.chunk_size == 0) {
+                fprintf(stderr, "Chunk size must be greater than 0\n");
+                return 1;
+            }
+            break;
         default:
             fprintf(stderr, "Illegal argument \"%c\"\n", c);
             return 1;
@@ -1641,19 +2130,30 @@
     }
 
     /* 
-     * initialization order: first create the listening socket
+     * initialization order: first create the listening sockets
      * (may need root on low ports), then drop root if needed,
      * then daemonise if needed, then init libevent (in some cases
      * descriptors created by libevent wouldn't survive forking).
      */
 
     /* create the listening socket and bind it */
-    l_socket = server_socket(settings.port);
-    if (l_socket == -1) {
-        fprintf(stderr, "failed to listen\n");
-        exit(1);
+    if (!settings.socketpath) {
+        l_socket = server_socket(settings.port, 0);
+        if (l_socket == -1) {
+            fprintf(stderr, "failed to listen\n");
+            exit(1);
+        }
     }
 
+    if (settings.udpport > 0 && ! settings.socketpath) {
+        /* create the UDP listening socket and bind it */
+        u_socket = server_socket(settings.udpport, 1);
+        if (u_socket == -1) {
+            fprintf(stderr, "failed to listen\n");
+            exit(1);
+        }
+    }
+
     /* lose root privileges if we have them */
     if (getuid()== 0 || geteuid()==0) {
         if (username==0 || *username=='\0') {
@@ -1670,6 +2170,15 @@
         }
     }
 
+    /* create unix mode sockets after dropping privileges */
+    if (settings.socketpath) {
+        l_socket = server_socket_unix(settings.socketpath);
+        if (l_socket == -1) {
+            fprintf(stderr, "failed to listen\n");
+            exit(1);
+        }
+    }
+
     /* daemonize if requested */
     /* if we want to ensure our ability to dump core, don't chdir to / */
     if (daemonize) {
@@ -1688,7 +2197,7 @@
     stats_init();
     assoc_init();
     conn_init();
-    slabs_init(settings.maxbytes);
+    slabs_init(settings.maxbytes, settings.factor);
 
     /* managed instance? alloc and zero a bucket array */
     if (settings.managed) {
@@ -1722,11 +2231,21 @@
     }
 
     /* create the initial listening connection */
-    if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST))) {
+    if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
         fprintf(stderr, "failed to create listening connection");
         exit(1);
     }
 
+    /* create the initial listening udp connection */
+    if (u_socket > -1 &&
+        !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, 1))) {
+        fprintf(stderr, "failed to create udp connection\n");
+        exit(1);
+    }
+
+    /* initialise clock event */
+    clock_handler(0,0,0);
+
     /* initialise deletion array and timer event */
     deltotal = 200; delcurr = 0;
     todelete = malloc(sizeof(item *)*deltotal);
@@ -1745,4 +2264,3 @@
 
     return 0;
 }
-

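Note on transmit() in the memcached.c changes above: after a short
sendmsg() it drops the iovec entries that were written in full and trims
the first partially written one, so the next call resumes mid-buffer.
The sketch below isolates that bookkeeping for illustration only. The
helper name iov_consume and its signature are assumptions, not part of
this commit, and the char * cast is a portability choice of the sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/uio.h>

/*
 * Hypothetical helper (not in the commit): advance an iovec list past
 * `written` bytes, mirroring what transmit() does after sendmsg().
 * Updates *iov to the first pending entry and returns how many entries
 * are still pending.
 */
static size_t iov_consume(struct iovec **iov, size_t iovlen, size_t written) {
    struct iovec *v = *iov;

    /* Drop every entry that was written completely. */
    while (iovlen > 0 && written >= v->iov_len) {
        written -= v->iov_len;
        iovlen--;
        v++;
    }

    /* Trim a partially written entry so the next send resumes mid-buffer. */
    if (written > 0) {
        assert(iovlen > 0);  /* caller reported more bytes than were queued */
        /* Cast to char *: arithmetic on void * is a compiler extension. */
        v->iov_base = (char *)v->iov_base + written;
        v->iov_len -= written;
    }

    *iov = v;
    return iovlen;
}
```

With a three-entry list ("VALUE ", key, suffix plus data), a short write
that covers all of the first entry and part of the key leaves two
entries pending, the first of which now starts inside the key.
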
Modified: trunk/server/memcached.h
===================================================================
--- trunk/server/memcached.h	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/memcached.h	2006-09-03 03:18:26 UTC (rev 320)
@@ -2,36 +2,58 @@
 /* $Id$ */
 
 #define DATA_BUFFER_SIZE 2048
+#define UDP_READ_BUFFER_SIZE 65536
+#define UDP_MAX_PAYLOAD_SIZE 1400
+#define UDP_HEADER_SIZE 8
+#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
 
-#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
-#define TCP_NOPUSH TCP_CORK
-#endif
+/* Initial size of list of items being returned by "get". */
+#define ITEM_LIST_INITIAL 200
 
+/* Initial size of the sendmsg() scatter/gather array. */
+#define IOV_LIST_INITIAL 400
+
+/* Initial number of sendmsg() argument structures to allocate. */
+#define MSG_LIST_INITIAL 10
+
+/* High water marks for buffer shrinking */
+#define READ_BUFFER_HIGHWAT 8192
+#define ITEM_LIST_HIGHWAT 400
+#define IOV_LIST_HIGHWAT 600
+#define MSG_LIST_HIGHWAT 100
+
+/* Time relative to server start. Smaller than time_t on 64-bit systems. */
+typedef unsigned int rel_time_t;
+
 struct stats {
     unsigned int  curr_items;
     unsigned int  total_items;
-    unsigned long long  curr_bytes;
+    unsigned long long curr_bytes;
     unsigned int  curr_conns;
     unsigned int  total_conns;
     unsigned int  conn_structs;
-    unsigned int  get_cmds;
-    unsigned int  set_cmds;
-    unsigned int  get_hits;
-    unsigned int  get_misses;
+    unsigned long long  get_cmds;
+    unsigned long long  set_cmds;
+    unsigned long long  get_hits;
+    unsigned long long  get_misses;
     time_t        started;          /* when the process was started */
     unsigned long long bytes_read;
     unsigned long long bytes_written;
 };
 
 struct settings {
-    unsigned int maxbytes;
+    size_t maxbytes;
     int maxconns;
     int port;
+    int udpport;
     struct in_addr interface;
     int verbose;
+    rel_time_t oldest_live; /* ignore existing items older than this */
     int managed;          /* if 1, a tracker manages virtual buckets */
-    time_t oldest_live;   /* ignore existing items older than this */
     int evict_to_free;
+    char *socketpath;   /* path to unix socket if using local socket */
+    double factor;          /* chunk size growth factor */
+    int chunk_size;
 };
 
 extern struct stats stats;
@@ -46,24 +68,27 @@
 typedef struct _stritem {
     struct _stritem *next;
     struct _stritem *prev;
-    struct _stritem *h_next;  /* hash chain next */
+    struct _stritem *h_next;    /* hash chain next */
+    rel_time_t      time;       /* least recent access */
+    rel_time_t      exptime;    /* expire time */
+    int             nbytes;     /* size of data */
     unsigned short  refcount; 
-    unsigned short  flags;
-    int    nbytes;  /* size of data */
-    time_t time;    /* least recent access */
-    time_t exptime; /* expire time */
-    unsigned char it_flags;     /* ITEM_* above */
-    unsigned char slabs_clsid;
-    unsigned char nkey;         /* key length, with terminating null and padding */
-    unsigned char dummy1;
+    unsigned char   nsuffix;    /* length of flags-and-length string */
+    unsigned char   it_flags;   /* ITEM_* above */
+    unsigned char   slabs_clsid;/* which slab class we're in */
+    unsigned char   nkey;       /* key length, w/terminating null and padding */
     void * end[0];
+    /* then null-terminated key */
+    /* then " flags length\r\n" (no terminating null) */
+    /* then data with terminating \r\n (no terminating null; it's binary!) */
 } item;
 
 #define ITEM_key(item) ((char*)&((item)->end[0]))
 
 /* warning: don't use these macros with a function, as it evals its arg twice */
-#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey)
-#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nbytes)
+#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey)
+#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix)
+#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes)
 
 enum conn_states {
     conn_listening,  /* the socket which listens for connections */
@@ -97,7 +122,6 @@
     int    wbytes; 
     int    write_and_go; /* which state to go into after finishing current write */
     void   *write_and_free; /* free this memory after finishing writing */
-    char    is_corked;         /* boolean, connection is corked */
 
     char   *ritem;  /* when we read in an item's value, it goes here */
     int    rlbytes;
@@ -117,19 +141,33 @@
     int    sbytes;    /* how many bytes to swallow */
 
     /* data for the mwrite state */
+    struct iovec *iov;
+    int    iovsize;   /* number of elements allocated in iov[] */
+    int    iovused;   /* number of elements used in iov[] */
+
+    struct msghdr *msglist;
+    int    msgsize;   /* number of elements allocated in msglist[] */
+    int    msgused;   /* number of elements used in msglist[] */
+    int    msgcurr;   /* element in msglist[] being transmitted now */
+    int    msgbytes;  /* number of bytes in current msg */
+
     item   **ilist;   /* list of items to write out */
     int    isize;
     item   **icurr;
     int    ileft;
-    int    ipart;     /* 1 if we're writing a VALUE line, 2 if we're writing data */
-    char   ibuf[300]; /* for VALUE lines */
-    char   *iptr;
-    int    ibytes;
+
+    /* data for UDP clients */
+    int    udp;       /* 1 if this is a UDP "connection" */
+    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
+    struct sockaddr request_addr; /* Who sent the most recent request */
+    socklen_t request_addr_size;
+    unsigned char *hdrbuf; /* udp packet headers */
+    int    hdrsize;   /* number of headers' worth of space is allocated */
+
     int    binary;    /* are we in binary mode */
     int    bucket;    /* bucket number for the next command, if running as
                          a managed instance. -1 (_not_ 0) means invalid. */
     int    gen;       /* generation requested for the bucket */
-                         
 } conn;
 
 /* number of virtual buckets for a managed instance */
@@ -138,6 +176,12 @@
 /* listening socket */
 extern int l_socket;
 
+/* udp socket */
+extern int u_socket;
+
+/* current time of day (updated periodically) */
+extern volatile rel_time_t current_time;
+
 /* temporary hack */
 /* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }
    void pre_gdb (); */
@@ -152,12 +196,14 @@
  * be that low).
  */
 
-time_t realtime(time_t exptime);
+rel_time_t realtime(time_t exptime);
 
 /* slabs memory allocation */
 
-/* Init the subsystem. The argument is the limit on no. of bytes to allocate, 0 if no limit */
-void slabs_init(unsigned int limit);
+/* Init the subsystem. 1st argument is the limit on no. of bytes to allocate,
+   0 if no limit. 2nd argument is the growth factor; each slab will use a chunk
+   size equal to the previous slab's chunk size times this factor. */
+void slabs_init(size_t limit, double factor);
 
 /* Preallocate as many slab pages as possible (called from slabs_init)
    on start-up, so users don't get confused out-of-memory errors when
@@ -169,13 +215,13 @@
 
 /* Given object size, return id to use when allocating/freeing memory for object */
 /* 0 means error: can't store such a large object */
-unsigned int slabs_clsid(unsigned int size);
+unsigned int slabs_clsid(size_t size);
 
 /* Allocate object of given length. 0 on error */
-void *slabs_alloc(unsigned int size);
+void *slabs_alloc(size_t size);
 
 /* Free previously allocated object */
-void slabs_free(void *ptr, unsigned int size);
+void slabs_free(void *ptr, size_t size);
     
 /* Fill buffer with stats */
 char* slabs_stats(int *buflen);
@@ -188,17 +234,22 @@
     
 /* event handling, network IO */
 void event_handler(int fd, short which, void *arg);
-conn *conn_new(int sfd, int init_state, int event_flags);
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
 void conn_close(conn *c);
 void conn_init(void);
 void drive_machine(conn *c);
-int new_socket(void);
-int server_socket(int port);
+int new_socket(int isUdp);
+int server_socket(int port, int isUdp);
 int update_event(conn *c, int new_flags);
 int try_read_command(conn *c);
 int try_read_network(conn *c);
+int try_read_udp(conn *c);
 void complete_nread(conn *c);
 void process_command(conn *c, char *command);
+int transmit(conn *c);
+int ensure_iov_space(conn *c);
+int add_iov(conn *c, const void *buf, int len);
+int add_msghdr(conn *c);
 
 /* stats */
 void stats_reset(void);
@@ -215,8 +266,9 @@
 
 
 void item_init(void);
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes);
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes);
 void item_free(item *it);
+int item_size_ok(char *key, int flags, int nbytes);
 
 int item_link(item *it);    /* may fail if transgresses limits */
 void item_unlink(item *it);
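
Note on the item layout introduced above: the patched struct is followed in memory by the null-terminated key, then the " flags length\r\n" suffix, then the data with its trailing \r\n, and ITEM_suffix / ITEM_data / ITEM_ntotal are plain offset arithmetic over nkey, nsuffix and nbytes. The following is a minimal sketch of that arithmetic, not the committed code. The names sketch_item, sketch_key, sketch_suffix, sketch_data, sketch_ntotal and sketch_item_new are hypothetical. It uses a C99 flexible array member in place of "void * end[0]" and leaves out the key padding that the real nkey includes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the patched item header (hypothetical, not the
   real struct): only the three length fields the macros rely on. */
typedef struct sketch_item {
    int           nbytes;   /* size of data, including the trailing \r\n */
    unsigned char nsuffix;  /* length of the " flags length\r\n" string */
    unsigned char nkey;     /* key length, including the terminating null */
    char          end[];    /* key, then suffix, then data */
} sketch_item;

/* Same offset arithmetic as ITEM_key / ITEM_suffix / ITEM_data / ITEM_ntotal. */
char *sketch_key(sketch_item *it)    { return it->end; }
char *sketch_suffix(sketch_item *it) { return it->end + it->nkey; }
char *sketch_data(sketch_item *it)   { return it->end + it->nkey + it->nsuffix; }

size_t sketch_ntotal(const sketch_item *it) {
    return sizeof(sketch_item) + it->nkey + it->nsuffix + (size_t)it->nbytes;
}

/* Build one item. value_len is the payload length without the trailing
   \r\n. Returns NULL on allocation failure; the caller frees the result. */
sketch_item *sketch_item_new(const char *key, int flags,
                             const char *value, int value_len) {
    char suffix[40];
    int nsuffix, nkey, nbytes;
    sketch_item *it;

    assert(key != NULL && value != NULL && value_len >= 0);
    assert(strlen(key) < 255);          /* nkey must fit in one byte */

    nbytes  = value_len + 2;            /* data plus "\r\n" */
    nkey    = (int)strlen(key) + 1;     /* key plus terminating null */
    nsuffix = snprintf(suffix, sizeof suffix, " %d %d\r\n", flags, value_len);

    it = malloc(sizeof(sketch_item) + (size_t)nkey + (size_t)nsuffix
                + (size_t)nbytes);
    if (it == NULL)
        return NULL;

    it->nbytes  = nbytes;
    it->nsuffix = (unsigned char)nsuffix;
    it->nkey    = (unsigned char)nkey;

    memcpy(sketch_key(it), key, (size_t)nkey);
    memcpy(sketch_suffix(it), suffix, (size_t)nsuffix);   /* no null copied */
    memcpy(sketch_data(it), value, (size_t)value_len);
    memcpy(sketch_data(it) + value_len, "\r\n", 2);
    return it;
}
```

Storing the suffix pre-formatted means a "get" reply can point scatter/gather entries straight at the key, suffix and data regions instead of formatting a VALUE line per item, which appears to be why the ibuf[300] buffer is dropped from the conn struct in this patch.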

Modified: trunk/server/slabs.c
===================================================================
--- trunk/server/slabs.c	2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/slabs.c	2006-09-03 03:18:26 UTC (rev 320)
@@ -1,6 +1,11 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 /*
- * Slabs memory allocation, based on powers-of-2
+ * Slabs memory allocation, based on powers-of-N. Slabs are up to 1MB in size
+ * and are divided into chunks. The chunk sizes start off at the size of the
+ * "item" structure plus space for a small key and value. They increase by
+ * a multiplier factor from there, up to half the maximum slab size. The last
+ * slab size is always 1MB, since that's the maximum item size allowed by the
+ * memcached protocol.
  *
  * $Id$
  */
@@ -23,11 +28,12 @@
 
 #include "memcached.h"
 
-#define POWER_SMALLEST 3
-#define POWER_LARGEST  20
+#define POWER_SMALLEST 1
+#define POWER_LARGEST  200
 #define POWER_BLOCK 1048576
+#define CHUNK_ALIGN_BYTES (sizeof(void *))
 
-/* powers-of-2 allocation structures */
+/* powers-of-N allocation structures */
 
 typedef struct {
     unsigned int size;      /* sizes of items */
@@ -49,42 +55,61 @@
 } slabclass_t;
 
 static slabclass_t slabclass[POWER_LARGEST+1];
-static unsigned int mem_limit = 0;
-static unsigned int mem_malloced = 0;
+static size_t mem_limit = 0;
+static size_t mem_malloced = 0;
+static int power_largest;
 
-unsigned int slabs_clsid(unsigned int size) {
-    int res = 1;
+/*
+ * Figures out which slab class (chunk size) is required to store an item of
+ * a given size.
+ */
+unsigned int slabs_clsid(size_t size) {
+    int res = POWER_SMALLEST;
 
     if(size==0)
         return 0;
-    size--;
-    while(size >>= 1)
-        res++;
-    if (res < POWER_SMALLEST) 
-        res = POWER_SMALLEST;
-    if (res > POWER_LARGEST)
-        res = 0;
+    while (size > slabclass[res].size)
+        if (res++ == power_largest)     /* won't fit in the biggest slab */
+            return 0;
     return res;
 }
 
-void slabs_init(unsigned int limit) {
-    int i;
-    int size=1;
+/*
+ * Determines the chunk sizes and initializes the slab class descriptors
+ * accordingly.
+ */
+void slabs_init(size_t limit, double factor) {
+    int i = POWER_SMALLEST - 1;
+    unsigned int size = sizeof(item) + settings.chunk_size;
 
+    /* Factor of 2.0 means use the default memcached behavior */
+    if (factor == 2.0 && size < 128)
+        size = 128;
+
     mem_limit = limit;
-    for(i=0; i<=POWER_LARGEST; i++, size*=2) {
+    memset(slabclass, 0, sizeof(slabclass));
+
+    while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
+        /* Make sure items are always n-byte aligned */
+        if (size % CHUNK_ALIGN_BYTES)
+            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
+
         slabclass[i].size = size;
-        slabclass[i].perslab = POWER_BLOCK / size;
-        slabclass[i].slots = 0;
-        slabclass[i].sl_curr = slabclass[i].sl_total = slabclass[i].slabs = 0;
-        slabclass[i].end_page_ptr = 0;
-        slabclass[i].end_page_free = 0;
-        slabclass[i].slab_list = 0;
-        slabclass[i].list_size = 0;
-        slabclass[i].killing = 0;
+        slabclass[i].perslab = POWER_BLOCK / slabclass[i].size;
+        size *= factor;
+        if (settings.verbose > 1) {
+            fprintf(stderr, "slab class %3d: chunk size %6d perslab %5d\n",
+                    i, slabclass[i].size, slabclass[i].perslab);
+        }
     }
 
+    power_largest = i;
+    slabclass[power_largest].size = POWER_BLOCK;
+    slabclass[power_largest].perslab = 1;
+
+#ifndef DONT_PREALLOC_SLABS
     slabs_preallocate(limit / POWER_BLOCK);
+#endif
 }
 
 void slabs_preallocate (unsigned int maxslabs) {
@@ -101,14 +126,14 @@
         if (++prealloc > maxslabs)
             return;
         slabs_newslab(i);
-    } 
- 
+    }
+
 }
 
 static int grow_slab_list (unsigned int id) { 
     slabclass_t *p = &slabclass[id];
     if (p->slabs == p->list_size) {
-        unsigned int new_size =  p->list_size ? p->list_size * 2 : 16;
+        size_t new_size =  p->list_size ? p->list_size * 2 : 16;
         void *new_list = realloc(p->slab_list, new_size*sizeof(void*));
         if (new_list == 0) return 0;
         p->list_size = new_size;
@@ -119,11 +144,14 @@
 
 int slabs_newslab(unsigned int id) {
     slabclass_t *p = &slabclass[id];
-    int num = p->perslab;
+#ifdef ALLOW_SLABS_REASSIGN
     int len = POWER_BLOCK;
+#else
+    int len = p->size * p->perslab;
+#endif
     char *ptr;
 
-    if (mem_limit && mem_malloced + len > mem_limit)
+    if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
         return 0;
 
     if (! grow_slab_list(id)) return 0;
@@ -133,18 +161,18 @@
 
     memset(ptr, 0, len);
     p->end_page_ptr = ptr;
-    p->end_page_free = num;
+    p->end_page_free = p->perslab;
 
     p->slab_list[p->slabs++] = ptr;
     mem_malloced += len;
     return 1;
 }
 
-void *slabs_alloc(unsigned int size) {
+void *slabs_alloc(size_t size) {
     slabclass_t *p;
 
     unsigned char id = slabs_clsid(size);
-    if (id < POWER_SMALLEST || id > POWER_LARGEST)
+    if (id < POWER_SMALLEST || id > power_largest)
         return 0;
 
     p = &slabclass[id];
@@ -180,13 +208,13 @@
     return 0;  /* shouldn't ever get here */
 }
 
-void slabs_free(void *ptr, unsigned int size) {
+void slabs_free(void *ptr, size_t size) {
     unsigned char id = slabs_clsid(size);
     slabclass_t *p;
 
     assert(((item *)ptr)->slabs_clsid==0);
-    assert(id >= POWER_SMALLEST && id <= POWER_LARGEST);
-    if (id < POWER_SMALLEST || id > POWER_LARGEST)
+    assert(id >= POWER_SMALLEST && id <= power_largest);
+    if (id < POWER_SMALLEST || id > power_largest)
         return;
 
     p = &slabclass[id];
@@ -211,14 +239,14 @@
 
 char* slabs_stats(int *buflen) {
     int i, total;
-    char *buf = (char*) malloc(8192);
+    char *buf = (char*) malloc(power_largest * 200 + 100);
     char *bufcurr = buf;
 
     *buflen = 0;
     if (!buf) return 0;
 
     total = 0;
-    for(i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
+    for(i = POWER_SMALLEST; i <= power_largest; i++) {
         slabclass_t *p = &slabclass[i];
         if (p->slabs) {
             unsigned int perslab, slabs;
@@ -236,13 +264,19 @@
             total++;
         }
     }
-    bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %u\r\n", total, mem_malloced);
+    bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %llu\r\n", total, (unsigned long long) mem_malloced);
     bufcurr += sprintf(bufcurr, "END\r\n");
     *buflen = bufcurr - buf;
     return buf;
 }
 
-/* 1 = success
+#ifdef ALLOW_SLABS_REASSIGN
+/* Blows away all the items in a slab class and moves its slabs to another
+   class. This is only used by the "slabs reassign" command, for manual tweaking
+   of memory allocation. It's disabled by default since it requires that all
+   slabs be the same size (which can waste space for chunk size mantissas of
+   other than 2.0).
+   1 = success
    0 = fail
    -1 = tried. busy. send again shortly. */
 int slabs_reassign(unsigned char srcid, unsigned char dstid) {
@@ -251,8 +285,8 @@
     void *iter;
     int was_busy = 0;
 
-    if (srcid < POWER_SMALLEST || srcid > POWER_LARGEST ||
-        dstid < POWER_SMALLEST || dstid > POWER_LARGEST)
+    if (srcid < POWER_SMALLEST || srcid > power_largest ||
+        dstid < POWER_SMALLEST || dstid > power_largest)
         return 0;
    
     p = &slabclass[srcid];
@@ -308,3 +342,4 @@
 
     return 1;
 }
+#endif
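
Note on the powers-of-N sizing in slabs_init() and slabs_clsid() above: chunk sizes start from an initial size, are rounded up to pointer alignment, grow by the factor until they exceed half a slab, and the last class is pinned to the full 1MB slab. The following is a minimal standalone sketch of that scheme under stated assumptions, not the committed code. The names sketch_classes, sketch_classes_init and sketch_clsid are hypothetical, the initial size is passed in rather than derived from sizeof(item) + settings.chunk_size, and the special case that bumps the first size to 128 when the factor is 2.0 is omitted.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_CLASSES 200        /* mirrors POWER_LARGEST */
#define SKETCH_SLAB_BYTES  1048576u   /* mirrors POWER_BLOCK */

/* Hypothetical container for the computed chunk sizes; class 0 is unused,
   as in the patch where POWER_SMALLEST is 1. */
typedef struct {
    unsigned int size[SKETCH_MAX_CLASSES + 1];
    int largest;                      /* index of the 1MB class */
} sketch_classes;

/* Fill in chunk sizes: align, record, multiply by factor, and stop once
   the next size would exceed half a slab. */
void sketch_classes_init(sketch_classes *sc, unsigned int first_size,
                         double factor) {
    const unsigned int align = (unsigned int)sizeof(void *);
    unsigned int size = first_size;
    int i = 0;

    assert(sc != NULL && first_size > 0 && factor > 1.0);
    sc->size[0] = 0;

    while (++i < SKETCH_MAX_CLASSES && size <= SKETCH_SLAB_BYTES / 2) {
        /* Round up so every chunk starts on a pointer-aligned boundary. */
        if (size % align)
            size += align - (size % align);
        sc->size[i] = size;
        size = (unsigned int)(size * factor);
    }

    /* The final class always holds exactly one maximum-size chunk. */
    sc->largest = i;
    sc->size[i] = SKETCH_SLAB_BYTES;
}

/* Return the smallest class whose chunk fits `size`, or 0 if the size is
   zero or larger than the biggest chunk. */
int sketch_clsid(const sketch_classes *sc, size_t size) {
    int res = 1;

    if (size == 0)
        return 0;
    while (size > sc->size[res])
        if (res++ == sc->largest)
            return 0;
    return res;
}
```

With first_size 128 and factor 2.0 this gives classes 1 through 13 at 128 up to 524288 bytes, plus class 14 at 1MB. A smaller factor produces more, finer-grained classes, which reduces the space wasted per item at the cost of a longer linear scan in the lookup.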



