[memcached] bradfitz, r320: merge end of facebook branch into trunk,
...
commits at code.sixapart.com
Sun Sep 3 03:18:27 UTC 2006
merge end of facebook branch into trunk, after copying the old trunk to the memcached-1.1.x branch
U trunk/server/ChangeLog
U trunk/server/TODO
U trunk/server/configure.ac
U trunk/server/doc/memcached.1
U trunk/server/doc/protocol.txt
U trunk/server/items.c
U trunk/server/memcached.c
U trunk/server/memcached.h
U trunk/server/slabs.c
Modified: trunk/server/ChangeLog
===================================================================
--- trunk/server/ChangeLog 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/ChangeLog 2006-09-03 03:18:26 UTC (rev 320)
@@ -1,3 +1,18 @@
+2006-08-21
+ * Nathan Neulinger <nneul at umr.edu>: fix incompatibilities with
+ unix domain socket support and the UDP code and clean up stale
+ sockets
+
+2006-08-20
+ * Nathan Neulinger <nneul at umr.edu>: unix domain socket support
+
+2006-05-03
+ * Steven Grimm <sgrimm at facebook.com>: big bunch of changes:
+ big CPU reduction work, UDP-based interface, increased memory
+ efficiency. (intertwined patch, committed all together)
+ <http://lists.danga.com/pipermail/memcached/2006-May/002164.html>
+ or see svn commit logs
+
2006-04-30
* River Tarnell: autoconf work for Solaris 10. Brad:
merge and verify it works on Nexenta.
Modified: trunk/server/TODO
===================================================================
--- trunk/server/TODO 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/TODO 2006-09-03 03:18:26 UTC (rev 320)
@@ -1,3 +1,20 @@
+* bug as shown with netcat (w/ small 16 byte object reproduces)
+
+>>I've done the following script to check that memcached has a key in it's
+>>inside, and thus know that it's working correctly:
+>>echo -e "get is_ok\r\nquit\r\n" | netcat $host $ip
+>>
+>>and I find that sometimes it returns the VALUE in it's inside, but other
+>>not.
+
+* namespaces
+
+* binary get protocol
+
+* refresh/touch command.
+
+* finer granularity of time for flush_all/delete, or generation number.
+
* slab class reassignment still buggy and can crash. once that's
stable, server should re-assign pages every 60 seconds or so
to keep all classes roughly equal. [Update: fixed now?, but
Modified: trunk/server/configure.ac
===================================================================
--- trunk/server/configure.ac 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/configure.ac 2006-09-03 03:18:26 UTC (rev 320)
@@ -1,5 +1,5 @@
AC_PREREQ(2.52)
-AC_INIT(memcached, 1.1.13-pre2, brad at danga.com)
+AC_INIT(memcached, 1.2.0-rc1, brad at danga.com)
AC_CANONICAL_SYSTEM
AC_CONFIG_SRCDIR(memcached.c)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION)
Modified: trunk/server/doc/memcached.1
===================================================================
--- trunk/server/doc/memcached.1 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/doc/memcached.1 2006-09-03 03:18:26 UTC (rev 320)
@@ -43,8 +43,11 @@
suggestions.
.TP
.B \-p <num>
-Listen on port <num>, the default is port 11211.
+Listen on TCP port <num>, the default is port 11211.
.TP
+.B \-U <num>
+Listen on UDP port <num>, the default is port 11211.
+.TP
.B \-M
Disable automatic removal of items from the cache when out of memory.
Additions will not be possible until adequate space is freed up.
@@ -52,6 +55,20 @@
.B \-r
Raise the core file size limit to the maximum allowable.
.TP
+.B \-f <factor>
+Use <factor> as the multiplier for computing the sizes of memory chunks that
+items are stored in. A lower value may result in less wasted memory depending
+on the total amount of memory available and the distribution of item sizes.
+The default is 1.25.
+.TP
+.B \-s <size>
+Allocate a minimum of <size> bytes for the item key, value, and flags. The
+default is 48. If you have a lot of small keys and values, you can get a
+significant memory efficiency gain with a lower value. If you use a high
+chunk growth factor (-f option), on the other hand, you may want to increase
+the size to allow a bigger percentage of your items to fit in the most densely
+packed (smallest) chunks.
+.TP
.B \-h
Show the version of memcached and a summary of options.
.TP
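The \-f and \-s options interact. As the text above describes, \-s sets the room in the smallest chunk, and each larger chunk size is \-f times the previous one. The C sketch below illustrates that progression. The 32-byte header, 8-byte alignment, and 1 MB ceiling are assumed values for illustration, and chunk_sizes() is a hypothetical helper, not the slabs.c code.

```c
#include <assert.h>

/* Assumed values for illustration only; not taken from memcached. */
#define SKETCH_ITEM_HEADER 32          /* stand-in for sizeof(item) */
#define SKETCH_ALIGN 8                 /* stand-in for chunk alignment */
#define SKETCH_MAX_CHUNK (1024 * 1024) /* largest chunk: 1 MB */

/*
 * Fills sizes[] with successive chunk sizes. The first size is the header
 * plus min_size (the -s value). Each following size is the previous one
 * times factor (the -f value), rounded up to SKETCH_ALIGN.
 * Returns the number of sizes written.
 */
int chunk_sizes(int min_size, double factor, int *sizes, int max_classes)
{
    int n = 0;
    double size = SKETCH_ITEM_HEADER + min_size;

    assert(factor > 1.0); /* a factor <= 1 would never grow the chunks */
    while (n < max_classes && size <= SKETCH_MAX_CHUNK) {
        int s = (int) size;
        if (s % SKETCH_ALIGN)
            s += SKETCH_ALIGN - (s % SKETCH_ALIGN); /* round up */
        sizes[n++] = s;
        size = s * factor; /* next class is factor times larger */
    }
    return n;
}
```

Under these assumptions, \-s 48 \-f 1.25 gives classes of 80, 104, 136, 176, 224, ... bytes. A lower factor yields more, closely spaced classes. That is the trade-off between wasted space per item and the number of classes that the \-f text describes.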
Modified: trunk/server/doc/protocol.txt
===================================================================
--- trunk/server/doc/protocol.txt 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/doc/protocol.txt 2006-09-03 03:18:26 UTC (rev 320)
@@ -1,8 +1,9 @@
Protocol
--------
-Clients of memcached communicate with server through TCP
-connections. A given running memcached server listens on some
+Clients of memcached communicate with server through TCP connections.
+(A UDP interface is also available; details are below under "UDP
+protocol.") A given running memcached server listens on some
(configurable) port; clients connect to that port, send commands to
the server, read responses, and eventually close the connection.
@@ -388,3 +389,45 @@
Upon receiving this command, the server closes the
connection. However, the client may also simply close the connection
when it no longer needs it, without issuing this command.
+
+
+UDP protocol
+------------
+
+For very large installations where the number of clients is high enough
+that the number of TCP connections causes scaling difficulties, there is
+also a UDP-based interface. The UDP interface does not provide guaranteed
delivery, so it should only be used for operations that aren't required to
+succeed; typically it is used for "get" requests where a missing or
+incomplete response can simply be treated as a cache miss.
+
+Each UDP datagram contains a simple frame header, followed by data in the
+same format as the TCP protocol described above. In the current
+implementation, requests must be contained in a single UDP datagram, but
+responses may span several datagrams. (The only common requests that would
+span multiple datagrams are huge multi-key "get" requests and "set"
requests, both of which are better suited to TCP transport for reliability
+reasons anyway.)
+
+The frame header is 8 bytes long, as follows (all values are 16-bit integers
+in network byte order, high byte first):
+
+0-1 Request ID
+2-3 Sequence number
+4-5 Total number of datagrams in this message
+6-7 Reserved for future use; must be 0
+
+The request ID is supplied by the client. Typically it will be a
+monotonically increasing value starting from a random seed, but the client
+is free to use whatever request IDs it likes. The server's response will
+contain the same ID as the incoming request. The client uses the request ID
+to differentiate between responses to outstanding requests if there are
+several pending from the same server; any datagrams with an unknown request
+ID are probably delayed responses to an earlier request and should be
+discarded.
+
+The sequence number ranges from 0 to n-1, where n is the total number of
+datagrams in the message. The client should concatenate the payloads of the
+datagrams for a given response in sequence number order; the resulting byte
+stream will contain a complete response in the same format as the TCP
+protocol (including terminating \r\n sequences).
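The framing described above can be handled in a few lines of C. The code below is a hypothetical client-side helper and is not part of memcached. It encodes and decodes the 8-byte header, which holds 16-bit big-endian fields with the reserved field set to zero. It also reassembles a response by concatenating payloads in sequence-number order and ignoring datagrams whose request ID does not match. The names udp_hdr_encode, udp_hdr_decode, and udp_reassemble are made up for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define UDP_FRAME_HDR 8 /* 8-byte frame header, as described above */

struct udp_frame_hdr {
    uint16_t request_id;
    uint16_t seq;   /* 0 .. total-1 */
    uint16_t total; /* number of datagrams in this message */
};

/* 16-bit network-order (high byte first) helpers. */
static void put_be16(unsigned char *p, uint16_t v)
{
    p[0] = (unsigned char) (v >> 8);
    p[1] = (unsigned char) (v & 0xff);
}

static uint16_t get_be16(const unsigned char *p)
{
    return (uint16_t) ((p[0] << 8) | p[1]);
}

/* Writes the header into out[0..7]; the reserved field is always 0. */
void udp_hdr_encode(const struct udp_frame_hdr *h, unsigned char *out)
{
    assert(h->seq < h->total);
    put_be16(out, h->request_id);
    put_be16(out + 2, h->seq);
    put_be16(out + 4, h->total);
    put_be16(out + 6, 0);
}

/* Parses a header; returns 0 on success, -1 if the datagram is malformed. */
int udp_hdr_decode(const unsigned char *buf, size_t len, struct udp_frame_hdr *h)
{
    if (len < UDP_FRAME_HDR)
        return -1;
    h->request_id = get_be16(buf);
    h->seq = get_be16(buf + 2);
    h->total = get_be16(buf + 4);
    if (h->total == 0 || h->seq >= h->total)
        return -1;
    return 0;
}

/*
 * Copies the payloads of the datagrams for request_id into out, in
 * sequence-number order; datagrams may arrive in any order, and those with
 * other request IDs are skipped. Returns the response length, or -1 if a
 * datagram is missing or the response does not fit in out.
 */
long udp_reassemble(uint16_t request_id, const unsigned char *const *dgrams,
                    const size_t *lens, size_t count,
                    unsigned char *out, size_t outsize)
{
    struct udp_frame_hdr h;
    size_t used = 0, i;
    unsigned seq, total = 0;

    /* Learn the datagram count from any datagram for this request. */
    for (i = 0; i < count; i++) {
        if (udp_hdr_decode(dgrams[i], lens[i], &h) == 0 &&
            h.request_id == request_id) {
            total = h.total;
            break;
        }
    }
    if (total == 0)
        return -1;

    /* Append each payload in sequence-number order. */
    for (seq = 0; seq < total; seq++) {
        for (i = 0; i < count; i++) {
            if (udp_hdr_decode(dgrams[i], lens[i], &h) == 0 &&
                h.request_id == request_id && h.seq == seq)
                break;
        }
        if (i == count)
            return -1; /* missing datagram */
        size_t n = lens[i] - UDP_FRAME_HDR;
        if (used + n > outsize)
            return -1;
        memcpy(out + used, dgrams[i] + UDP_FRAME_HDR, n);
        used += n;
    }
    return (long) used;
}
```

If any datagram is missing, udp_reassemble() returns -1. A client would treat that as a cache miss, as the text suggests for "get" requests.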
Modified: trunk/server/items.c
===================================================================
--- trunk/server/items.c 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/items.c 2006-09-03 03:18:26 UTC (rev 320)
@@ -21,12 +21,7 @@
#include "memcached.h"
-/*
- * NOTE: we assume here for simplicity that slab ids are <=32. That's true in
- * the powers-of-2 implementation, but if that changes this should be changed too
- */
-
-#define LARGEST_ID 32
+#define LARGEST_ID 255
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
unsigned int sizes[LARGEST_ID];
@@ -41,14 +36,31 @@
}
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes) {
- int ntotal, len;
+/*
+ * Generates the variable-sized part of the header for an object.
+ *
+ * suffix - Buffer for the "VALUE" line suffix (flags, size).
+ * nsuffix - The length of the suffix is stored here.
+ * keylen - The length of the key plus any padding required to word-align the
+ * "VALUE" suffix (which is done to speed up copying.)
+ *
+ * Returns the total size of the header.
+ */
+int item_make_header(char *key, int flags, int nbytes,
+ char *suffix, int *nsuffix, int *keylen) {
+ *keylen = strlen(key) + 1; if(*keylen % 4) *keylen += 4 - (*keylen % 4);
+ *nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2);
+ return sizeof(item) + *keylen + *nsuffix + nbytes;
+}
+
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
+ int nsuffix, ntotal, len;
item *it;
unsigned int id;
+ char suffix[40];
- len = strlen(key) + 1; if(len % 4) len += 4 - (len % 4);
- ntotal = sizeof(item) + len + nbytes;
-
+ ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len);
+
id = slabs_clsid(ntotal);
if (id == 0)
return 0;
@@ -97,7 +109,8 @@
it->nbytes = nbytes;
strcpy(ITEM_key(it), key);
it->exptime = exptime;
- it->flags = flags;
+ memcpy(ITEM_suffix(it), suffix, nsuffix);
+ it->nsuffix = nsuffix;
return it;
}
@@ -114,6 +127,18 @@
slabs_free(it, ntotal);
}
+/*
+ * Returns true if an item will fit in the cache (its size does not exceed
+ * the maximum for a cache entry.)
+ */
+int item_size_ok(char *key, int flags, int nbytes) {
+ char prefix[40];
+ int keylen, nsuffix;
+
+ return slabs_clsid(item_make_header(key, flags, nbytes,
+ prefix, &nsuffix, &keylen)) != 0;
+}
+
void item_link_q(item *it) { /* item is the new head */
item **head, **tail;
assert(it->slabs_clsid <= LARGEST_ID);
@@ -159,7 +184,7 @@
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
- it->time = time(0);
+ it->time = current_time;
assoc_insert(ITEM_key(it), it);
stats.curr_bytes += ITEM_ntotal(it);
@@ -195,7 +220,7 @@
assert((it->it_flags & ITEM_SLABBED) == 0);
item_unlink_q(it);
- it->time = time(0);
+ it->time = current_time;
item_link_q(it);
}
@@ -224,7 +249,7 @@
bufcurr = 0;
while (it && (!limit || shown < limit)) {
- len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time);
+ len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time + stats.started);
if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
break;
strcpy(buffer + bufcurr, temp);
@@ -243,7 +268,7 @@
void item_stats(char *buffer, int buflen) {
int i;
char *bufcurr = buffer;
- time_t now = time(0);
+ rel_time_t now = current_time;
if (buflen < 4096) {
strcpy(buffer, "SERVER_ERROR out of memory");
@@ -252,7 +277,7 @@
for (i=0; i<LARGEST_ID; i++) {
if (tails[i])
- bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %lu\r\n",
+ bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %u\r\n",
i, sizes[i], i, now - tails[i]->time);
}
strcpy(bufcurr, "END");
@@ -262,7 +287,7 @@
/* dumps out a list of objects of each size, with granularity of 32 bytes */
char* item_stats_sizes(int *bytes) {
int num_buckets = 32768; /* max 1MB object, divided into 32 bytes size buckets */
- unsigned int *histogram = (int*) malloc(num_buckets * sizeof(int));
+ unsigned int *histogram = (unsigned int*) malloc(num_buckets * sizeof(int));
char *buf = (char*) malloc(1024*1024*2*sizeof(char));
int i;
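item_make_header() computes an item's total size from the key, the flags, and the value length. The standalone sketch below reproduces that arithmetic so it can be checked in isolation. SKETCH_ITEM_STRUCT is an assumed stand-in for sizeof(item), whose real value depends on the item struct in memcached.h and on the platform. sketch_header_size() is a hypothetical name.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_ITEM_STRUCT 48 /* assumed stand-in for sizeof(item) */

/*
 * Mirrors the arithmetic of item_make_header(). The key plus its NUL
 * terminator is padded to a multiple of 4. The " <flags> <bytes>\r\n"
 * suffix is rendered once. nbytes already includes the value's
 * trailing "\r\n".
 */
int sketch_header_size(const char *key, unsigned flags, int nbytes,
                       char *suffix, size_t suffix_cap,
                       int *nsuffix, int *keylen)
{
    assert(nbytes >= 2); /* nbytes counts the data plus "\r\n" */
    *keylen = (int) strlen(key) + 1;
    if (*keylen % 4)
        *keylen += 4 - (*keylen % 4); /* word-align the suffix */
    *nsuffix = snprintf(suffix, suffix_cap, " %u %d\r\n", flags, nbytes - 2);
    return SKETCH_ITEM_STRUCT + *keylen + *nsuffix + nbytes;
}
```

Take key "hello", flags 0, and a 3-byte value (nbytes 5 with the trailing \r\n). The padded key takes 8 bytes and the suffix " 0 3\r\n" takes 6, for 48 + 8 + 6 + 5 = 67 bytes under the assumed struct size. Because the suffix is rendered ahead of time, the new "get" path in memcached.c can pass ITEM_suffix() straight to add_iov() instead of formatting it on every request.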
Modified: trunk/server/memcached.c
===================================================================
--- trunk/server/memcached.c 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/memcached.c 2006-09-03 03:18:26 UTC (rev 320)
@@ -21,6 +21,7 @@
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <sys/signal.h>
#include <sys/resource.h>
/* some POSIX systems need the following definition
@@ -28,6 +29,10 @@
#ifndef _P1003_1B_VISIBLE
#define _P1003_1B_VISIBLE
#endif
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
#include <pwd.h>
#include <sys/mman.h>
#include <fcntl.h>
@@ -42,6 +47,7 @@
#include <time.h>
#include <event.h>
#include <assert.h>
+#include <limits.h>
#ifdef HAVE_MALLOC_H
#include <malloc.h>
@@ -56,21 +62,23 @@
static int delcurr;
static int deltotal;
+#define TRANSMIT_COMPLETE 0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
+
int *buckets = 0; /* bucket->generation array for a managed instance */
-time_t realtime(time_t exptime) {
- time_t now;
-
+#define REALTIME_MAXDELTA 60*60*24*30
+rel_time_t realtime(time_t exptime) {
/* no. of seconds in 30 days - largest possible delta exptime */
- #define REALTIME_MAXDELTA 60*60*24*30
if (exptime == 0) return 0; /* 0 means never expire */
if (exptime > REALTIME_MAXDELTA)
- return exptime;
+ return (rel_time_t) (exptime - stats.started);
else {
- now = time(0);
- return exptime + now;
+ return (rel_time_t) (exptime + current_time);
}
}
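The new realtime() converts a client's expiration time into a rel_time_t measured from server start, and it handles three cases. A standalone C sketch of those cases follows. Unlike the diff, it takes the start time and the current relative time as parameters (started, now) rather than reading stats.started and current_time. The typedef of rel_time_t as unsigned int and the function name sketch_realtime are assumptions for illustration.

```c
#include <time.h>

typedef unsigned int rel_time_t; /* assumed: seconds since server start */

#define REALTIME_MAXDELTA (60 * 60 * 24 * 30) /* 30 days, as in the diff */

/*
 * Converts a client-supplied expiration time to server-relative seconds.
 * started is the server start time (Unix seconds); now is the current time
 * relative to started.
 */
rel_time_t sketch_realtime(time_t exptime, time_t started, rel_time_t now)
{
    if (exptime == 0)
        return 0;                                /* 0 means never expire */
    if (exptime > REALTIME_MAXDELTA)
        return (rel_time_t) (exptime - started); /* absolute Unix time */
    return (rel_time_t) (exptime + now);         /* delta from now */
}
```

As in the diff, an absolute time earlier than the server start wraps around in the unsigned rel_time_t. The sketch does not guard against that case.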
@@ -80,7 +88,6 @@
stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
stats.started = time(0);
}
-
void stats_reset(void) {
stats.total_items = stats.total_conns = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
@@ -89,27 +96,59 @@
void settings_init(void) {
settings.port = 11211;
+ settings.udpport = 11211;
settings.interface.s_addr = htonl(INADDR_ANY);
settings.maxbytes = 64*1024*1024; /* default is 64MB */
settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
settings.verbose = 0;
settings.oldest_live = 0;
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
+ settings.socketpath = NULL; /* by default, not using a unix socket */
settings.managed = 0;
+ settings.factor = 1.25;
+ settings.chunk_size = 48; /* space for a modest key and value */
}
+/*
+ * Adds a message header to a connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int add_msghdr(conn *c)
+{
+ struct msghdr *msg;
+
+ if (c->msgsize == c->msgused) {
+ msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
+ if (! msg)
+ return -1;
+ c->msglist = msg;
+ c->msgsize *= 2;
+ }
+
+ msg = c->msglist + c->msgused;
+ msg->msg_iov = &c->iov[c->iovused];
+ msg->msg_iovlen = 0;
+ msg->msg_name = &c->request_addr;
+ msg->msg_namelen = c->request_addr_size;
+ msg->msg_control = 0;
+ msg->msg_controllen = 0;
+ msg->msg_flags = 0;
+ c->msgbytes = 0;
+ c->msgused++;
+
+ if (c->udp) {
+ /* Leave room for the UDP header, which we'll fill in later. */
+ return add_iov(c, NULL, UDP_HEADER_SIZE);
+ }
+
+ return 0;
+}
+
conn **freeconns;
int freetotal;
int freecurr;
-void set_cork (conn *c, int val) {
- if (c->is_corked == val) return;
- c->is_corked = val;
-#ifdef TCP_NOPUSH
- setsockopt(c->sfd, IPPROTO_TCP, TCP_NOPUSH, &val, sizeof(val));
-#endif
-}
-
void conn_init(void) {
freetotal = 200;
freecurr = 0;
@@ -117,7 +156,8 @@
return;
}
-conn *conn_new(int sfd, int init_state, int event_flags) {
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
+ int is_udp) {
conn *c;
/* do we have a free conn structure from a previous close? */
@@ -130,42 +170,63 @@
}
c->rbuf = c->wbuf = 0;
c->ilist = 0;
+ c->iov = 0;
+ c->msglist = 0;
+ c->hdrbuf = 0;
- c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);
- c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);
- c->ilist = (item **) malloc(sizeof(item *)*200);
+ c->rsize = read_buffer_size;
+ c->wsize = DATA_BUFFER_SIZE;
+ c->isize = ITEM_LIST_INITIAL;
+ c->iovsize = IOV_LIST_INITIAL;
+ c->msgsize = MSG_LIST_INITIAL;
+ c->hdrsize = 0;
- if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
+ c->rbuf = (char *) malloc(c->rsize);
+ c->wbuf = (char *) malloc(c->wsize);
+ c->ilist = (item **) malloc(sizeof(item *) * c->isize);
+ c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
+ c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize);
+
+ if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
+ c->msglist == 0) {
if (c->rbuf != 0) free(c->rbuf);
if (c->wbuf != 0) free(c->wbuf);
if (c->ilist !=0) free(c->ilist);
+ if (c->iov != 0) free(c->iov);
+ if (c->msglist != 0) free(c->msglist);
free(c);
perror("malloc()");
return 0;
}
+
c->rsize = c->wsize = DATA_BUFFER_SIZE;
- c->rcurr = c->rbuf;
- c->isize = 200;
+ c->isize = 200; /* TODO: another instance of '200'. must kill all these */
+
stats.conn_structs++;
}
if (settings.verbose > 1) {
if (init_state == conn_listening)
fprintf(stderr, "<%d server listening\n", sfd);
+ else if (is_udp)
+ fprintf(stderr, "<%d server listening (udp)\n", sfd);
else
fprintf(stderr, "<%d new client connection\n", sfd);
}
c->sfd = sfd;
+ c->udp = is_udp;
c->state = init_state;
c->rlbytes = 0;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
+ c->rcurr = c->rbuf;
c->ritem = 0;
- c->icurr = c->ilist;
+ c->icurr = c->ilist;
c->ileft = 0;
- c->iptr = c->ibuf;
- c->ibytes = 0;
+ c->iovused = 0;
+ c->msgcurr = 0;
+ c->msgused = 0;
c->write_and_go = conn_read;
c->write_and_free = 0;
@@ -173,8 +234,6 @@
c->bucket = -1;
c->gen = 0;
- c->is_corked = 0;
-
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
c->ev_flags = event_flags;
@@ -182,9 +241,13 @@
if (freecurr < freetotal) {
freeconns[freecurr++] = c;
} else {
+ if (c->hdrbuf)
+ free (c->hdrbuf);
+ free (c->msglist);
free (c->rbuf);
free (c->wbuf);
free (c->ilist);
+ free (c->iov);
free (c);
}
return 0;
@@ -196,17 +259,10 @@
return c;
}
-void conn_close(conn *c) {
- /* delete the event, the socket and the conn */
- event_del(&c->event);
-
- if (settings.verbose > 1)
- fprintf(stderr, "<%d connection closed.\n", c->sfd);
-
- close(c->sfd);
-
+void conn_cleanup(conn *c) {
if (c->item) {
item_free(c->item);
+ c->item = 0;
}
if (c->ileft) {
@@ -217,10 +273,46 @@
if (c->write_and_free) {
free(c->write_and_free);
+ c->write_and_free = 0;
}
+}
- /* if we have enough space in the free connections array, put the structure there */
- if (freecurr < freetotal) {
+/*
+ * Frees a connection.
+ */
+static void conn_free(conn *c) {
+ if (c) {
+ if (c->hdrbuf)
+ free(c->hdrbuf);
+ if (c->msglist)
+ free(c->msglist);
+ if (c->rbuf)
+ free(c->rbuf);
+ if (c->wbuf)
+ free(c->wbuf);
+ if (c->ilist)
+ free(c->ilist);
+ if (c->iov)
+ free(c->iov);
+ free(c);
+ }
+}
+
+void conn_close(conn *c) {
+ /* delete the event, the socket and the conn */
+ event_del(&c->event);
+
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d connection closed.\n", c->sfd);
+
+ close(c->sfd);
+ conn_cleanup(c);
+
+ /* if the connection has big buffers, just free it */
+ if (c->rsize > READ_BUFFER_HIGHWAT) {
+ conn_free(c);
+ } else if (freecurr < freetotal) {
+ /* if we have enough space in the free connections array, put the structure there */
freeconns[freecurr++] = c;
} else {
/* try to enlarge free connections array */
@@ -230,10 +322,7 @@
freeconns = new_freeconns;
freeconns[freecurr++] = c;
} else {
- free(c->rbuf);
- free(c->wbuf);
- free(c->ilist);
- free(c);
+ conn_free(c);
}
}
@@ -242,6 +331,184 @@
return;
}
+/*
+ * Reallocates memory and updates a buffer size if successful.
+ */
+int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
+ void *newbuf = realloc(*orig, newsize * bytes_per_item);
+ if (newbuf) {
+ *orig = newbuf;
+ *size = newsize;
+ return 1;
+ }
+ return 0;
+}
+
+ /*
+ * Shrinks a connection's buffers if they're too big. This prevents
+ * periodic large "get" requests from permanently chewing lots of server
+ * memory.
+ *
+ * This should only be called in between requests since it can wipe output
+ * buffers!
+ */
+void conn_shrink(conn *c) {
+ if (c->udp)
+ return;
+
+ if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
+ do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+ }
+
+ if (c->isize > ITEM_LIST_HIGHWAT) {
+ do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
+ }
+
+ if (c->msgsize > MSG_LIST_HIGHWAT) {
+ do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
+ }
+
+ if (c->iovsize > IOV_LIST_HIGHWAT) {
+ do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
+ }
+}
+
+/*
+ * Sets a connection's current state in the state machine. Any special
+ * processing that needs to happen on certain state transitions can
+ * happen here.
+ */
+void conn_set_state(conn *c, int state) {
+ if (state != c->state) {
+ if (state == conn_read) {
+ conn_shrink(c);
+ }
+ c->state = state;
+ }
+}
+
+
+/*
+ * Ensures that there is room for another struct iovec in a connection's
+ * iov list.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int ensure_iov_space(conn *c) {
+ if (c->iovused >= c->iovsize) {
+ int i, iovnum;
+ struct iovec *new_iov = (struct iovec *) realloc(c->iov,
+ (c->iovsize * 2) * sizeof(struct iovec));
+ if (! new_iov)
+ return -1;
+ c->iov = new_iov;
+ c->iovsize *= 2;
+
+ /* Point all the msghdr structures at the new list. */
+ for (i = 0, iovnum = 0; i < c->msgused; i++) {
+ c->msglist[i].msg_iov = &c->iov[iovnum];
+ iovnum += c->msglist[i].msg_iovlen;
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ * Adds data to the list of pending data that will be written out to a
+ * connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+
+int add_iov(conn *c, const void *buf, int len) {
+ struct msghdr *m;
+ int i;
+ int leftover;
+ int limit_to_mtu;
+
+ do {
+ m = &c->msglist[c->msgused - 1];
+
+ /*
+ * Limit UDP packets, and the first payloads of TCP replies, to
+ * UDP_MAX_PAYLOAD_SIZE bytes.
+ */
+ limit_to_mtu = c->udp || (1 == c->msgused);
+
+ /* We may need to start a new msghdr if this one is full. */
+ if (m->msg_iovlen == IOV_MAX ||
+ limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) {
+ add_msghdr(c);
+ m = &c->msglist[c->msgused - 1];
+ }
+
+ if (ensure_iov_space(c))
+ return -1;
+
+ /* If the fragment is too big to fit in the datagram, split it up */
+ if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
+ leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
+ len -= leftover;
+ } else {
+ leftover = 0;
+ }
+
+ m = &c->msglist[c->msgused - 1];
+ m->msg_iov[m->msg_iovlen].iov_base = buf;
+ m->msg_iov[m->msg_iovlen].iov_len = len;
+
+ c->msgbytes += len;
+ c->iovused++;
+ m->msg_iovlen++;
+
+ buf = ((char *)buf) + len;
+ len = leftover;
+ } while (leftover > 0);
+
+ return 0;
+}
+
+
+/*
+ * Constructs a set of UDP headers and attaches them to the outgoing messages.
+ */
+int build_udp_headers(conn *c) {
+ int i;
+ unsigned char *hdr;
+
+ if (c->msgused > c->hdrsize) {
+ void *new_hdrbuf;
+ if (c->hdrbuf)
+ new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
+ else
+ new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
+ if (! new_hdrbuf)
+ return -1;
+ c->hdrbuf = (unsigned char *) new_hdrbuf;
+ c->hdrsize = c->msgused * 2;
+ }
+
+ hdr = c->hdrbuf;
+ for (i = 0; i < c->msgused; i++) {
+ c->msglist[i].msg_iov[0].iov_base = hdr;
+ c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
+ *hdr++ = c->request_id / 256;
+ *hdr++ = c->request_id % 256;
+ *hdr++ = i / 256;
+ *hdr++ = i % 256;
+ *hdr++ = c->msgused / 256;
+ *hdr++ = c->msgused % 256;
+ *hdr++ = 0;
+ *hdr++ = 0;
+ assert(hdr == (unsigned char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
+ }
+
+ return 0;
+}
+
+
void out_string(conn *c, char *str) {
int len;
@@ -256,11 +523,11 @@
}
strcpy(c->wbuf, str);
- strcat(c->wbuf, "\r\n");
+ strcpy(c->wbuf + len, "\r\n");
c->wbytes = len + 2;
c->wcurr = c->wbuf;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -274,7 +541,7 @@
item *it = c->item;
int comm = c->item_comm;
item *old_it;
- time_t now = time(0);
+ rel_time_t now = current_time;
stats.set_cmds++;
@@ -328,7 +595,7 @@
}
void process_stat(conn *c, char *command) {
- time_t now = time(0);
+ rel_time_t now = current_time;
if (strcmp(command, "stats") == 0) {
char temp[1024];
@@ -339,8 +606,8 @@
getrusage(RUSAGE_SELF, &usage);
pos += sprintf(pos, "STAT pid %u\r\n", pid);
- pos += sprintf(pos, "STAT uptime %lu\r\n", now - stats.started);
- pos += sprintf(pos, "STAT time %ld\r\n", now);
+ pos += sprintf(pos, "STAT uptime %u\r\n", now);
+ pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
pos += sprintf(pos, "STAT version " VERSION "\r\n");
pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
@@ -350,13 +617,13 @@
pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
- pos += sprintf(pos, "STAT cmd_get %u\r\n", stats.get_cmds);
- pos += sprintf(pos, "STAT cmd_set %u\r\n", stats.set_cmds);
- pos += sprintf(pos, "STAT get_hits %u\r\n", stats.get_hits);
- pos += sprintf(pos, "STAT get_misses %u\r\n", stats.get_misses);
+ pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
+ pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
+ pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
+ pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
- pos += sprintf(pos, "STAT limit_maxbytes %u\r\n", settings.maxbytes);
+ pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
pos += sprintf(pos, "END");
out_string(c, temp);
return;
@@ -426,7 +693,7 @@
c->write_and_free=wbuf;
c->wcurr=wbuf;
c->wbytes = res + 6;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
close(fd);
return;
@@ -450,7 +717,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -465,7 +732,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -488,7 +755,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -509,9 +776,13 @@
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
- /* All incoming commands will require a response, so we cork at the beginning,
- and uncork at the very end (usually by means of out_string) */
- set_cork(c, 1);
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
+ if (add_msghdr(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
+ return;
+ }
if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
(strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
@@ -543,9 +814,13 @@
}
expire = realtime(expire);
- it = item_alloc(key, flags, expire, len+2);
+ it = item_alloc(key, flags, realtime(expire), len+2);
+
if (it == 0) {
- out_string(c, "SERVER_ERROR out of memory");
+ if (! item_size_ok(key, flags, len + 2))
+ out_string(c, "SERVER_ERROR object too large for cache");
+ else
+ out_string(c, "SERVER_ERROR out of memory");
/* swallow the data line */
c->write_and_go = conn_swallow;
c->sbytes = len+2;
@@ -556,7 +831,7 @@
c->item = it;
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
- c->state = conn_nread;
+ conn_set_state(c, conn_nread);
return;
}
@@ -569,7 +844,7 @@
char key[251];
int res;
char *ptr;
- time_t now = time(0);
+ rel_time_t now = current_time;
res = sscanf(command, "%*s %250s %u\n", key, &delta);
if (res!=2 || strlen(key)==0 ) {
@@ -605,7 +880,7 @@
}
ptr = ITEM_data(it);
- while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
+ while (*ptr && (*ptr<'0' || *ptr>'9')) ptr++; /* skip non-digits */
value = atoi(ptr);
@@ -620,7 +895,7 @@
res = strlen(temp);
if (res + 2 > it->nbytes) { /* need to realloc */
item *new_it;
- new_it = item_alloc(ITEM_key(it), it->flags, it->exptime, res + 2 );
+ new_it = item_alloc(ITEM_key(it), atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
if (new_it == 0) {
out_string(c, "SERVER_ERROR out of memory");
return;
@@ -647,7 +922,7 @@
int next;
int i = 0;
item *it;
- time_t now = time(0);
+ rel_time_t now = current_time;
get:
if (settings.managed) {
@@ -688,6 +963,24 @@
c->ilist = new_list;
} else break;
}
+
+ /*
+ * Construct the response. Each hit adds three elements to the
+ * outgoing data list:
+ * "VALUE "
+ * key
+ * " " + flags + " " + data length + "\r\n" + data (with \r\n)
+ */
+ /* TODO: can we avoid the strlen() func call and cache that in wasted byte in item struct? */
+ if (add_iov(c, "VALUE ", 6) ||
+ add_iov(c, ITEM_key(it), strlen(ITEM_key(it))) ||
+ add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
+ {
+ break;
+ }
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
stats.get_hits++;
it->refcount++;
item_update(it);
@@ -695,17 +988,22 @@
i++;
} else stats.get_misses++;
}
+
c->icurr = c->ilist;
c->ileft = i;
- if (c->ileft) {
- c->ipart = 0;
- c->state = conn_mwrite;
- c->ibytes = 0;
- return;
- } else {
- out_string(c, "END");
- return;
+
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d END\n", c->sfd);
+ add_iov(c, "END\r\n", 5);
+
+ if (c->udp && build_udp_headers(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
}
+ else {
+ conn_set_state(c, conn_mwrite);
+ c->msgcurr = 0;
+ }
+ return;
}
if (strncmp(command, "delete ", 7) == 0) {
@@ -755,11 +1053,9 @@
}
}
- exptime = realtime(exptime);
-
it->refcount++;
/* use its expiration time as its deletion time now */
- it->exptime = exptime;
+ it->exptime = realtime(exptime);
it->it_flags |= ITEM_DELETED;
todelete[delcurr++] = it;
out_string(c, "DELETED");
@@ -823,10 +1119,7 @@
c->bucket = bucket;
c->gen = gen;
}
- c->state = conn_read;
- /* normally conn_write uncorks the connection, but this
- is the only time we accept a command w/o writing anything */
- set_cork(c,0);
+ conn_set_state(c, conn_read);
return;
} else {
out_string(c, "CLIENT_ERROR bad format");
@@ -844,12 +1137,12 @@
int res;
if (strcmp(command, "flush_all") == 0) {
- settings.oldest_live = time(0);
+ settings.oldest_live = current_time;
out_string(c, "OK");
return;
}
- res = sscanf(command, "%*s %ld", &exptime);
+ res = sscanf(command, "%*s %ld", &exptime);
if (res != 1) {
out_string(c, "ERROR");
return;
@@ -866,11 +1159,12 @@
}
if (strcmp(command, "quit") == 0) {
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return;
}
if (strncmp(command, "slabs reassign ", 15) == 0) {
+#ifdef ALLOW_SLABS_REASSIGN
int src, dst;
char *start = command+15;
if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
@@ -889,6 +1183,9 @@
}
}
out_string(c, "CLIENT_ERROR bogus command");
+#else
+ out_string(c, "CLIENT_ERROR Slab reassignment not supported");
+#endif
return;
}
@@ -922,6 +1219,39 @@
}
/*
+ * read a UDP request.
+ * return 0 if there's nothing to read.
+ */
+int try_read_udp(conn *c) {
+ int res;
+
+ c->request_addr_size = sizeof(c->request_addr);
+ res = recvfrom(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes,
+ 0, &c->request_addr, &c->request_addr_size);
+ if (res > 8) {
+ unsigned char *buf = (unsigned char *)c->rbuf + c->rbytes;
+ stats.bytes_read += res;
+
+ /* Beginning of UDP packet is the request ID; save it. */
+ c->request_id = buf[0] * 256 + buf[1];
+
+ /* If this is a multi-packet request, drop it. */
+ if (buf[4] != 0 || buf[5] != 1) {
+ out_string(c, "SERVER_ERROR multi-packet request not supported");
+ return 0;
+ }
+
+ /* Don't care about any of the rest of the header. */
+ res -= 8;
+ memmove(c->rbuf, c->rbuf + 8, res);
+
+ c->rbytes += res;
+ return 1;
+ }
+ return 0;
+}
+
+/*
* read from network as much as we can, handle buffer overflow and connection
* close.
* before reading, move the remaining incomplete fragment of a command
@@ -952,6 +1282,16 @@
c->rcurr = c->rbuf = new_rbuf;
c->rsize *= 2;
}
+
+ /* unix socket mode doesn't need this, so zeroed out. but why
+ * is this done for every command? presumably for UDP
+ * mode. */
+ if (c->request_addr.sa_family != AF_UNSPEC) {
+ c->request_addr_size = sizeof(c->request_addr);
+ } else {
+ c->request_addr_size = 0;
+ }
+
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
stats.bytes_read += res;
@@ -961,7 +1301,7 @@
}
if (res == 0) {
/* connection closed */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return 1;
}
if (res == -1) {
@@ -981,7 +1321,70 @@
if (event_add(&c->event, 0) == -1) return 0;
return 1;
}
-
+
+/*
+ * Transmit the next chunk of data from our list of msgbuf structures.
+ *
+ * Returns:
+ * TRANSMIT_COMPLETE All done writing.
+ * TRANSMIT_INCOMPLETE More data remaining to write.
+ * TRANSMIT_SOFT_ERROR Can't write any more right now.
+ * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
+ */
+int transmit(conn *c) {
+ int res;
+
+ if (c->msgcurr < c->msgused &&
+ c->msglist[c->msgcurr].msg_iovlen == 0) {
+ /* Finished writing the current msg; advance to the next. */
+ c->msgcurr++;
+ }
+ if (c->msgcurr < c->msgused) {
+ struct msghdr *m = &c->msglist[c->msgcurr];
+ res = sendmsg(c->sfd, m, 0);
+ if (res > 0) {
+ stats.bytes_written += res;
+
+ /* We've written some of the data. Remove the completed
+ iovec entries from the list of pending writes. */
+ while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
+ res -= m->msg_iov->iov_len;
+ m->msg_iovlen--;
+ m->msg_iov++;
+ }
+
+ /* Might have written just part of the last iovec entry;
+ adjust it so the next write will do the rest. */
+ if (res > 0) {
+ m->msg_iov->iov_base = (char *)m->msg_iov->iov_base + res;
+ m->msg_iov->iov_len -= res;
+ }
+ return TRANSMIT_INCOMPLETE;
+ }
+ if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Couldn't update event\n");
+ conn_set_state(c, conn_closing);
+ return TRANSMIT_HARD_ERROR;
+ }
+ return TRANSMIT_SOFT_ERROR;
+ }
+ /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
+ we have a real error, on which we close the connection */
+ if (settings.verbose > 0)
+ perror("Failed to write, and not due to blocking");
+
+ if (c->udp)
+ conn_set_state(c, conn_read);
+ else
+ conn_set_state(c, conn_closing);
+ return TRANSMIT_HARD_ERROR;
+ } else {
+ return TRANSMIT_COMPLETE;
+ }
+}
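After a partial sendmsg(), transmit() has to update its iovec bookkeeping. That logic can be isolated and tested without a socket, as in the minimal sketch below. `consume_iov` is a hypothetical helper. It casts to `char *` before advancing `iov_base`, because pointer arithmetic on `void *` is a GCC extension rather than standard C.

```c
#include <stddef.h>
#include <sys/uio.h>

/*
 * Drop `written` bytes from the front of an iovec array, as transmit()
 * does after sendmsg() returns a positive count. Fully sent entries are
 * skipped; a partially sent entry is trimmed in place.
 */
void consume_iov(struct iovec **iov, size_t *iovlen, size_t written) {
    /* Skip every entry that was completely written. */
    while (*iovlen > 0 && written >= (*iov)->iov_len) {
        written -= (*iov)->iov_len;
        (*iovlen)--;
        (*iov)++;
    }
    /* Trim the entry that was only partly written. */
    if (written > 0 && *iovlen > 0) {
        (*iov)->iov_base = (char *)(*iov)->iov_base + written;
        (*iov)->iov_len -= written;
    }
}
```

Once msg_iovlen reaches 0, the next call to transmit() advances msgcurr to the following msghdr.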
+
void drive_machine(conn *c) {
int exit = 0;
@@ -992,7 +1395,6 @@
int res;
while (!exit) {
- /* printf("state %d\n", c->state);*/
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
@@ -1010,8 +1412,9 @@
perror("setting O_NONBLOCK");
close(sfd);
break;
- }
- newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
+ }
+ newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+ DATA_BUFFER_SIZE, 0);
if (!newc) {
if (settings.verbose > 0)
fprintf(stderr, "couldn't create new connection\n");
@@ -1025,14 +1428,14 @@
if (try_read_command(c)) {
continue;
}
- if (try_read_network(c)) {
+ if (c->udp ? try_read_udp(c) : try_read_network(c)) {
continue;
}
/* we have no command line and no data to read from network */
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1064,14 +1467,14 @@
break;
}
if (res == 0) { /* end of stream */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1080,13 +1483,13 @@
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
case conn_swallow:
/* we are reading sbytes and throwing them away */
if (c->sbytes == 0) {
- c->state = conn_read;
+ conn_set_state(c, conn_read);
break;
}
@@ -1107,14 +1510,14 @@
break;
}
if (res == 0) { /* end of stream */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1123,147 +1526,67 @@
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
case conn_write:
- /* we are writing wbytes bytes starting from wcurr */
- if (c->wbytes == 0) {
- if (c->write_and_free) {
- free(c->write_and_free);
- c->write_and_free = 0;
- }
- c->state = c->write_and_go;
- if (c->state == conn_read)
- set_cork(c, 0);
- break;
- }
- res = write(c->sfd, c->wcurr, c->wbytes);
- if (res > 0) {
- stats.bytes_written += res;
- c->wcurr += res;
- c->wbytes -= res;
- break;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+ /*
+ * We want to write out a simple response. If we haven't already,
+ * assemble it into a msgbuf list (this will be a single-entry
+ * list for TCP or a two-entry list for UDP).
+ */
+ if (c->iovused == 0) {
+ if (add_iov(c, c->wcurr, c->wbytes) ||
+ (c->udp && build_udp_headers(c))) {
if (settings.verbose > 0)
- fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ fprintf(stderr, "Couldn't build response\n");
+ conn_set_state(c, conn_closing);
break;
- }
- exit = 1;
- break;
+ }
}
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- if (settings.verbose > 0)
- fprintf(stderr, "Failed to write, and not due to blocking\n");
- c->state = conn_closing;
- break;
+
+ /* fall through... */
+
case conn_mwrite:
- /*
- * we're writing ibytes bytes from iptr. iptr alternates between
- * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the
- * current item. When we finish a chunk, we choose the next one using
- * ipart, which has the following semantics: 0 - start the loop, 1 -
- * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
- * move to the next item and build its ibuf; 3 - we finished all items,
- * write "END".
- */
- if (c->ibytes > 0) {
- res = write(c->sfd, c->iptr, c->ibytes);
- if (res > 0) {
- stats.bytes_written += res;
- c->iptr += res;
- c->ibytes -= res;
- break;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!update_event(c, EV_WRITE | EV_PERSIST)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
- break;
+ switch (transmit(c)) {
+ case TRANSMIT_COMPLETE:
+ if (c->state == conn_mwrite) {
+ while (c->ileft > 0) {
+ item *it = *(c->icurr);
+ assert((it->it_flags & ITEM_SLABBED) == 0);
+ item_remove(it);
+ c->icurr++;
+ c->ileft--;
}
- exit = 1;
- break;
+ conn_set_state(c, conn_read);
+ } else if (c->state == conn_write) {
+ if (c->write_and_free) {
+ free(c->write_and_free);
+ c->write_and_free = 0;
+ }
+ conn_set_state(c, c->write_and_go);
+ } else {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Unexpected state %d\n", c->state);
+ conn_set_state(c, conn_closing);
}
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- if (settings.verbose > 0)
- fprintf(stderr, "Failed to write, and not due to blocking\n");
- c->state = conn_closing;
break;
- } else {
- item *it;
- /* we finished a chunk, decide what to do next */
- switch (c->ipart) {
- case 1:
- it = *(c->icurr);
- assert((it->it_flags & ITEM_SLABBED) == 0);
- c->iptr = ITEM_data(it);
- if (c->binary) {
- c->ibytes = it->nbytes - 2;
- } else {
- c->ibytes = it->nbytes;
- }
- c->ipart = 2;
- break;
- case 2:
- it = *(c->icurr);
- item_remove(it);
- c->ileft--;
- if (c->ileft <= 0) {
- c->ipart = 3;
- break;
- } else {
- c->icurr++;
- }
- /* FALL THROUGH */
- case 0:
- it = *(c->icurr);
- assert((it->it_flags & ITEM_SLABBED) == 0);
- // add branch for binary mode
- if (c->binary) {
- long int key_size = htonl(it->nkey - 3);
- long int value_size = htonl(it->nbytes - 2);
- c->ibuf[0] = 0x01; // key
- /* XXX NOT SAFE ON 64 BIT FOR PROTOCOL REASONS
- NEED TO RECODE TO USE 4 byte INTS ONLY */
- memcpy(c->ibuf + 1, (char *)&key_size, sizeof(key_size));
- memcpy(c->ibuf + 1 + 4, ITEM_key(it), it->nkey - 3);
- memcpy(c->ibuf + 1 + 4 + it->nkey - 3, (char *)&value_size, sizeof(value_size));
- c->ibytes = 1 + 4 + it->nkey - 3 + 4;
- } else {
- c->ibytes = sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
- }
- if (settings.verbose > 1)
- fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
- c->iptr = c->ibuf;
- c->ipart = 1;
- break;
- case 3:
- // add branch for binary mode
- if (c->binary) {
- c->binary = 0;
- c->wbuf[0] = 0x01;
- c->wbytes = 1;
- c->wcurr = c->wbuf;
- c->state = conn_write;
- c->write_and_go = conn_read;
+ case TRANSMIT_INCOMPLETE:
+ case TRANSMIT_HARD_ERROR:
+ break; /* Continue in state machine. */
- } else {
- out_string(c, "END");
- }
- break;
- }
+ case TRANSMIT_SOFT_ERROR:
+ exit = 1;
+ break;
}
break;
case conn_closing:
- conn_close(c);
+ if (c->udp)
+ conn_cleanup(c);
+ else
+ conn_close(c);
exit = 1;
break;
}
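For UDP, conn_write calls build_udp_headers(), which is not part of this excerpt, so that each reply datagram carries its own 8-byte header. The sketch below shows what such a header could contain. It assumes the request ID is echoed, the sequence number counts from 0, and the total field holds the datagram count. The per-datagram payload is a parameter because this diff does not show whether the header counts against UDP_MAX_PAYLOAD_SIZE (1400). The names `udp_datagram_count` and `write_udp_header` are hypothetical.

```c
#include <stddef.h>

#define UDP_HEADER_SIZE 8   /* same value as in memcached.h */

/* Datagrams needed for len reply bytes at per_datagram bytes each (min 1). */
int udp_datagram_count(size_t len, size_t per_datagram) {
    if (len == 0)
        return 1;
    return (int)((len + per_datagram - 1) / per_datagram);
}

/* Fill an 8-byte header for datagram seq of total, in network byte order. */
void write_udp_header(unsigned char hdr[UDP_HEADER_SIZE],
                      int request_id, int seq, int total) {
    hdr[0] = (unsigned char)(request_id / 256);
    hdr[1] = (unsigned char)(request_id % 256);
    hdr[2] = (unsigned char)(seq / 256);
    hdr[3] = (unsigned char)(seq % 256);
    hdr[4] = (unsigned char)(total / 256);
    hdr[5] = (unsigned char)(total % 256);
    hdr[6] = 0;   /* reserved */
    hdr[7] = 0;
}
```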
@@ -1273,10 +1596,9 @@
return;
}
-
void event_handler(int fd, short which, void *arg) {
conn *c;
-
+
c = (conn *)arg;
c->which = which;
@@ -1295,11 +1617,11 @@
return;
}
-int new_socket(void) {
+int new_socket(int is_udp) {
int sfd;
int flags;
- if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
+ if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
perror("socket()");
return -1;
}
@@ -1313,24 +1635,62 @@
return sfd;
}
-int server_socket(int port) {
+
+/*
+ * Sets a socket's send buffer size to the maximum allowed by the system.
+ */
+void maximize_sndbuf(int sfd) {
+ socklen_t intsize = sizeof(int);
+ int last_good;
+ int min, max, avg;
+ int old_size;
+
+ /* Start with the default size. */
+ if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
+ if (settings.verbose > 0)
+ perror("getsockopt(SO_SNDBUF)");
+ return;
+ }
+
+ /* Binary-search for the real maximum. */
+ min = last_good = old_size;
+ max = MAX_SENDBUF_SIZE;
+
+ while (min <= max) {
+ avg = ((unsigned int) min + max) / 2;
+ if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
+ last_good = avg;
+ min = avg + 1;
+ } else {
+ max = avg - 1;
+ }
+ }
+
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
+}
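The binary search in maximize_sndbuf() can be exercised without a real socket by replacing setsockopt() with a predicate, as in the sketch below. `accepts` is a hypothetical stand-in for a successful setsockopt(SO_SNDBUF) call. The search assumes that accepted sizes form a prefix of the range. It also starts `last_good` at a fallback value, so the result is defined even if no probe succeeds.

```c
/* Predicate standing in for setsockopt(): nonzero if `size` is accepted. */
typedef int (*accept_fn)(int size, void *ctx);

/*
 * Return the largest size in [min, max] that `accepts` allows, or
 * `fallback` if it allows none. Assumes accepted sizes form a prefix.
 */
int search_max_accepted(int min, int max, int fallback,
                        accept_fn accepts, void *ctx) {
    int last_good = fallback;
    while (min <= max) {
        /* Unsigned sum avoids signed overflow for very large bounds. */
        int avg = (int)(((unsigned int)min + (unsigned int)max) / 2);
        if (accepts(avg, ctx)) {
            last_good = avg;
            min = avg + 1;
        } else {
            max = avg - 1;
        }
    }
    return last_good;
}
```

On some systems (Linux, for example), setsockopt() clamps an oversize SO_SNDBUF request instead of failing. On those systems every probe succeeds and the search simply ends at the upper bound.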
+
+
+int server_socket(int port, int is_udp) {
int sfd;
struct linger ling = {0, 0};
struct sockaddr_in addr;
int flags =1;
- if ((sfd = new_socket()) == -1) {
+ if ((sfd = new_socket(is_udp)) == -1) {
return -1;
}
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
-#if !defined(TCP_NOPUSH)
- setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
-#endif
+ if (is_udp) {
+ maximize_sndbuf(sfd);
+ } else {
+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
+ }
- /*
+ /*
* the memset call clears nonstandard fields in some implementations
* that otherwise mess things up.
*/
@@ -1344,6 +1704,72 @@
close(sfd);
return -1;
}
+ if (! is_udp && listen(sfd, 1024) == -1) {
+ perror("listen()");
+ close(sfd);
+ return -1;
+ }
+ return sfd;
+}
+
+int new_socket_unix(void) {
+ int sfd;
+ int flags;
+
+ if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ perror("socket()");
+ return -1;
+ }
+
+ if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
+ fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ perror("setting O_NONBLOCK");
+ close(sfd);
+ return -1;
+ }
+ return sfd;
+}
+
+int server_socket_unix(char *path) {
+ int sfd;
+ struct linger ling = {0, 0};
+ struct sockaddr_un addr;
+ struct stat tstat;
+ int flags =1;
+
+ if (!path) {
+ return -1;
+ }
+
+ if ((sfd = new_socket_unix()) == -1) {
+ return -1;
+ }
+
+ /*
+ * Clean up a previous socket file if we left it around
+ */
+ if (!lstat(path, &tstat)) {
+ if (S_ISSOCK(tstat.st_mode))
+ unlink(path);
+ }
+
+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
+ /*
+ * the memset call clears nonstandard fields in some implementations
+ * that otherwise mess things up.
+ */
+ memset(&addr, 0, sizeof(addr));
+
+ addr.sun_family = AF_UNIX;
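+ if (strlen(path) >= sizeof(addr.sun_path)) {
+ fprintf(stderr, "unix socket path too long\n");
+ close(sfd);
+ return -1;
+ }
+ strcpy(addr.sun_path, path);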
+ if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
+ perror("bind()");
+ close(sfd);
+ return -1;
+ }
if (listen(sfd, 1024) == -1) {
perror("listen()");
close(sfd);
@@ -1352,14 +1778,46 @@
return sfd;
}
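Before calling bind(), server_socket_unix() removes a leftover socket file. It does so only if lstat() reports a socket, so a regular file at the same path is left alone. The sketch below is a standalone version of that check. `remove_stale_socket` is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/stat.h>
#include <unistd.h>

/*
 * Unlink `path` only if it exists and is a socket (lstat() does not
 * follow symlinks). Returns 1 if a stale socket was removed, else 0.
 */
int remove_stale_socket(const char *path) {
    struct stat st;
    if (lstat(path, &st) == 0 && S_ISSOCK(st.st_mode))
        return unlink(path) == 0;
    return 0;
}
```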
+
/* invoke right before gdb is called, on assert */
void pre_gdb () {
int i = 0;
if(l_socket) close(l_socket);
+ if(u_socket > -1) close(u_socket);
for (i=3; i<=500; i++) close(i); /* so lame */
kill(getpid(), SIGABRT);
}
+/*
+ * We keep the current time of day in a global variable that's updated by a
+ * timer event. This saves us a bunch of time() system calls (we really only
+ * need to get the time once a second, whereas there can be tens of thousands
+ * of requests a second) and allows us to use server-start-relative timestamps
+ * rather than absolute UNIX timestamps, a space savings on systems where
+ * sizeof(time_t) > sizeof(unsigned int).
+ */
+volatile rel_time_t current_time;
+struct event clockevent;
+
+void clock_handler(int fd, short which, void *arg) {
+ struct timeval t;
+ static int initialized = 0;
+
+ if (initialized) {
+ /* only delete the event if it's actually there. */
+ evtimer_del(&clockevent);
+ } else {
+ initialized = 1;
+ }
+
+ evtimer_set(&clockevent, clock_handler, 0);
+ t.tv_sec = 1;
+ t.tv_usec = 0;
+ evtimer_add(&clockevent, &t);
+
+ current_time = (rel_time_t) (time(0) - stats.started);
+}
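The cached clock described above amounts to storing `time(0)` minus the start time in a small unsigned integer once per tick. The sketch below shows that bookkeeping without libevent. The `server_clock` struct and the `clock_tick` and `clock_absolute` functions are hypothetical, and scheduling the one-second timer is left to the caller.

```c
#include <time.h>

typedef unsigned int rel_time_t;   /* as in memcached.h */

/* Hypothetical holder for the values memcached keeps in globals. */
struct server_clock {
    time_t started;          /* like stats.started */
    rel_time_t current;      /* like current_time */
};

/* Called once per second by a timer; `now` is normally time(0). */
void clock_tick(struct server_clock *clk, time_t now) {
    clk->current = (rel_time_t)(now - clk->started);
}

/* Convert a server-relative timestamp back to absolute UNIX time. */
time_t clock_absolute(const struct server_clock *clk, rel_time_t t) {
    return clk->started + (time_t)t;
}
```

Request handlers then read `current` instead of calling time(). The trade-off is that the value can lag real time by up to one tick.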
+
struct event deleteevent;
void delete_handler(int fd, short which, void *arg) {
@@ -1380,7 +1838,7 @@
{
int i, j=0;
- time_t now = time(0);
+ rel_time_t now = current_time;
for (i=0; i<delcurr; i++) {
item *it = todelete[i];
if (it->exptime < now) {
@@ -1394,13 +1852,12 @@
}
delcurr = j;
}
-
- return;
}
void usage(void) {
printf(PACKAGE " " VERSION "\n");
printf("-p <num> port number to listen on\n");
+ printf("-U <num> UDP port number to listen on\n");
+ printf("-s <file> unix socket path to listen on (disables network support)\n");
printf("-l <ip_addr> interface to listen on, default is INADDR_ANY\n");
printf("-d run as a daemon\n");
printf("-r maximize core file limit\n");
@@ -1415,6 +1872,8 @@
printf("-i print memcached and libevent license\n");
printf("-b run a managed instanced (mnemonic: buckets)\n");
printf("-P <file> save PID in <file>, only used with -d option\n");
+ printf("-f <factor> chunk size growth factor, default 1.25\n");
+ printf("-n <bytes> minimum space allocated for key+value+flags, default 48\n");
return;
}
@@ -1517,10 +1976,17 @@
}
int l_socket=0;
+int u_socket=-1;
+void sig_handler(int sig) {
+ printf("SIGINT handled.\n");
+ exit(0);
+}
+
int main (int argc, char **argv) {
int c;
conn *l_conn;
+ conn *u_conn;
struct in_addr addr;
int lock_memory = 0;
int daemonize = 0;
@@ -1531,23 +1997,32 @@
struct rlimit rlim;
char *pid_file = NULL;
+ /* handle SIGINT */
+ signal(SIGINT, sig_handler);
+
/* init settings */
settings_init();
-
+
/* set stderr non-buffering (for running under, say, daemontools) */
setbuf(stderr, NULL);
/* process arguments */
- while ((c = getopt(argc, argv, "bp:m:Mc:khirvdl:u:P:")) != -1) {
+ while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:n:")) != -1) {
switch (c) {
+ case 'U':
+ settings.udpport = atoi(optarg);
+ break;
case 'b':
settings.managed = 1;
break;
case 'p':
settings.port = atoi(optarg);
break;
+ case 's':
+ settings.socketpath = optarg;
+ break;
case 'm':
- settings.maxbytes = atoi(optarg)*1024*1024;
+ settings.maxbytes = ((size_t)atoi(optarg))*1024*1024;
break;
case 'M':
settings.evict_to_free = 0;
@@ -1587,6 +2062,20 @@
case 'P':
pid_file = optarg;
break;
+ case 'f':
+ settings.factor = atof(optarg);
+ if (settings.factor <= 1.0) {
+ fprintf(stderr, "Factor must be greater than 1\n");
+ return 1;
+ }
+ break;
+ case 'n':
+ settings.chunk_size = atoi(optarg);
+ if (settings.chunk_size == 0) {
+ fprintf(stderr, "Chunk size must be greater than 0\n");
+ return 1;
+ }
+ break;
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
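The -f and -n handlers above run only if their letters appear in the getopt() option string with a trailing colon. The sketch below shows the same parsing and validation in a hypothetical helper, `parse_slab_opts`. It uses strtod()/strtol() instead of atof()/atoi(), so that input such as "1.25x" or "abc" is rejected rather than silently read as a number. That is a deliberate deviation from the code in this diff.

```c
#define _POSIX_C_SOURCE 200809L
#include <limits.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Parse -f <factor> and -n <bytes> from argv. Returns 0 on success,
 * 1 on an invalid or unknown argument (as main() returns 1).
 */
int parse_slab_opts(int argc, char **argv, double *factor, int *chunk_size) {
    int c;
    char *end;
    while ((c = getopt(argc, argv, "f:n:")) != -1) {
        switch (c) {
        case 'f':
            *factor = strtod(optarg, &end);
            if (end == optarg || *end != '\0' || *factor <= 1.0)
                return 1;   /* factor must be greater than 1 */
            break;
        case 'n': {
            long n = strtol(optarg, &end, 10);
            if (end == optarg || *end != '\0' || n <= 0 || n > INT_MAX)
                return 1;   /* chunk size must be greater than 0 */
            *chunk_size = (int)n;
            break;
        }
        default:
            return 1;       /* unknown option */
        }
    }
    return 0;
}
```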
@@ -1641,19 +2130,30 @@
}
/*
- * initialization order: first create the listening socket
+ * initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/
/* create the listening socket and bind it */
- l_socket = server_socket(settings.port);
- if (l_socket == -1) {
- fprintf(stderr, "failed to listen\n");
- exit(1);
+ if (!settings.socketpath) {
+ l_socket = server_socket(settings.port, 0);
+ if (l_socket == -1) {
+ fprintf(stderr, "failed to listen\n");
+ exit(1);
+ }
}
+ if (settings.udpport > 0 && ! settings.socketpath) {
+ /* create the UDP listening socket and bind it */
+ u_socket = server_socket(settings.udpport, 1);
+ if (u_socket == -1) {
+ fprintf(stderr, "failed to listen\n");
+ exit(1);
+ }
+ }
+
/* lose root privileges if we have them */
if (getuid()== 0 || geteuid()==0) {
if (username==0 || *username=='\0') {
@@ -1670,6 +2170,15 @@
}
}
+ /* create unix mode sockets after dropping privileges */
+ if (settings.socketpath) {
+ l_socket = server_socket_unix(settings.socketpath);
+ if (l_socket == -1) {
+ fprintf(stderr, "failed to listen\n");
+ exit(1);
+ }
+ }
+
/* daemonize if requested */
/* if we want to ensure our ability to dump core, don't chdir to / */
if (daemonize) {
@@ -1688,7 +2197,7 @@
stats_init();
assoc_init();
conn_init();
- slabs_init(settings.maxbytes);
+ slabs_init(settings.maxbytes, settings.factor);
/* managed instance? alloc and zero a bucket array */
if (settings.managed) {
@@ -1722,11 +2231,21 @@
}
/* create the initial listening connection */
- if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST))) {
+ if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
fprintf(stderr, "failed to create listening connection\n");
exit(1);
}
+ /* create the initial listening udp connection */
+ if (u_socket > -1 &&
+ !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, 1))) {
+ fprintf(stderr, "failed to create udp connection\n");
+ exit(1);
+ }
+
+ /* initialise clock event */
+ clock_handler(0,0,0);
+
/* initialise deletion array and timer event */
deltotal = 200; delcurr = 0;
todelete = malloc(sizeof(item *)*deltotal);
@@ -1745,4 +2264,3 @@
return 0;
}
-
Modified: trunk/server/memcached.h
===================================================================
--- trunk/server/memcached.h 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/memcached.h 2006-09-03 03:18:26 UTC (rev 320)
@@ -2,36 +2,58 @@
/* $Id$ */
#define DATA_BUFFER_SIZE 2048
+#define UDP_READ_BUFFER_SIZE 65536
+#define UDP_MAX_PAYLOAD_SIZE 1400
+#define UDP_HEADER_SIZE 8
+#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
-#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
-#define TCP_NOPUSH TCP_CORK
-#endif
+/* Initial size of list of items being returned by "get". */
+#define ITEM_LIST_INITIAL 200
+/* Initial size of the sendmsg() scatter/gather array. */
+#define IOV_LIST_INITIAL 400
+
+/* Initial number of sendmsg() argument structures to allocate. */
+#define MSG_LIST_INITIAL 10
+
+/* High water marks for buffer shrinking */
+#define READ_BUFFER_HIGHWAT 8192
+#define ITEM_LIST_HIGHWAT 400
+#define IOV_LIST_HIGHWAT 600
+#define MSG_LIST_HIGHWAT 100
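The *_HIGHWAT constants suggest a policy: after a request grows a connection's buffer past the mark, the buffer is returned to its initial size. The shrinking code itself is not part of this excerpt. The sketch below is a hypothetical version of that policy for the read buffer, using the DATA_BUFFER_SIZE and READ_BUFFER_HIGHWAT values defined here. It assumes the buffer holds no unprocessed bytes when it is shrunk.

```c
#include <stdlib.h>

#define DATA_BUFFER_SIZE 2048
#define READ_BUFFER_HIGHWAT 8192

/* Hypothetical growable buffer, standing in for conn's rbuf/rsize. */
struct buffer {
    char *data;
    int size;
};

/*
 * If the buffer grew beyond the high-water mark, try to shrink it back to
 * the initial size. On realloc() failure the larger buffer is kept, which
 * is still valid. Returns the resulting size.
 */
int shrink_if_large(struct buffer *b) {
    if (b->size > READ_BUFFER_HIGHWAT) {
        char *smaller = realloc(b->data, DATA_BUFFER_SIZE);
        if (smaller != NULL) {
            b->data = smaller;
            b->size = DATA_BUFFER_SIZE;
        }
    }
    return b->size;
}
```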
+
+/* Time relative to server start. Smaller than time_t on 64-bit systems. */
+typedef unsigned int rel_time_t;
+
struct stats {
unsigned int curr_items;
unsigned int total_items;
- unsigned long long curr_bytes;
+ unsigned long long curr_bytes;
unsigned int curr_conns;
unsigned int total_conns;
unsigned int conn_structs;
- unsigned int get_cmds;
- unsigned int set_cmds;
- unsigned int get_hits;
- unsigned int get_misses;
+ unsigned long long get_cmds;
+ unsigned long long set_cmds;
+ unsigned long long get_hits;
+ unsigned long long get_misses;
time_t started; /* when the process was started */
unsigned long long bytes_read;
unsigned long long bytes_written;
};
struct settings {
- unsigned int maxbytes;
+ size_t maxbytes;
int maxconns;
int port;
+ int udpport;
struct in_addr interface;
int verbose;
+ rel_time_t oldest_live; /* ignore existing items older than this */
int managed; /* if 1, a tracker manages virtual buckets */
- time_t oldest_live; /* ignore existing items older than this */
int evict_to_free;
+ char *socketpath; /* path to unix socket if using local socket */
+ double factor; /* chunk size growth factor */
+ int chunk_size;
};
extern struct stats stats;
@@ -46,24 +68,27 @@
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
- struct _stritem *h_next; /* hash chain next */
+ struct _stritem *h_next; /* hash chain next */
+ rel_time_t time; /* least recent access */
+ rel_time_t exptime; /* expire time */
+ int nbytes; /* size of data */
unsigned short refcount;
- unsigned short flags;
- int nbytes; /* size of data */
- time_t time; /* least recent access */
- time_t exptime; /* expire time */
- unsigned char it_flags; /* ITEM_* above */
- unsigned char slabs_clsid;
- unsigned char nkey; /* key length, with terminating null and padding */
- unsigned char dummy1;
+ unsigned char nsuffix; /* length of flags-and-length string */
+ unsigned char it_flags; /* ITEM_* above */
+ unsigned char slabs_clsid;/* which slab class we're in */
+ unsigned char nkey; /* key length, w/terminating null and padding */
void * end[0];
+ /* then null-terminated key */
+ /* then " flags length\r\n" (no terminating null) */
+ /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
#define ITEM_key(item) ((char*)&((item)->end[0]))
/* warning: don't use these macros with a function, as it evals its arg twice */
-#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey)
-#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nbytes)
+#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey)
+#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix)
+#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes)
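With the new layout, the `" flags length\r\n"` text of a VALUE line is stored in the item itself, between the key and the data. The sketch below sizes an item under that layout. `item_header` is a simplified hypothetical stand-in for `struct _stritem`. The suffix format (a leading space, then decimal flags and the data length excluding the trailing `\r\n`) is inferred from the comments above and the old VALUE format. Key padding is ignored.

```c
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for the fixed part of struct _stritem. */
struct item_header {
    void *next, *prev, *h_next;
    unsigned int time, exptime;
    int nbytes;
    unsigned short refcount;
    unsigned char nsuffix, it_flags, slabs_clsid, nkey;
};

/*
 * Format the " flags length\r\n" suffix for data of nbytes bytes
 * (nbytes counts the trailing \r\n). Returns the suffix length, the
 * value that would go into nsuffix.
 */
int make_suffix(char *buf, size_t bufsize, int flags, int nbytes) {
    return snprintf(buf, bufsize, " %d %d\r\n", flags, nbytes - 2);
}

/* Total bytes needed: header + key (with NUL) + suffix + data. */
size_t item_total_size(const char *key, int nsuffix, int nbytes) {
    size_t nkey = strlen(key) + 1;
    return sizeof(struct item_header) + nkey + (size_t)nsuffix + (size_t)nbytes;
}
```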
enum conn_states {
conn_listening, /* the socket which listens for connections */
@@ -97,7 +122,6 @@
int wbytes;
int write_and_go; /* which state to go into after finishing current write */
void *write_and_free; /* free this memory after finishing writing */
- char is_corked; /* boolean, connection is corked */
char *ritem; /* when we read in an item's value, it goes here */
int rlbytes;
@@ -117,19 +141,33 @@
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
+ struct iovec *iov;
+ int iovsize; /* number of elements allocated in iov[] */
+ int iovused; /* number of elements used in iov[] */
+
+ struct msghdr *msglist;
+ int msgsize; /* number of elements allocated in msglist[] */
+ int msgused; /* number of elements used in msglist[] */
+ int msgcurr; /* element in msglist[] being transmitted now */
+ int msgbytes; /* number of bytes in current msg */
+
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
- int ipart; /* 1 if we're writing a VALUE line, 2 if we're writing data */
- char ibuf[300]; /* for VALUE lines */
- char *iptr;
- int ibytes;
+
+ /* data for UDP clients */
+ int udp; /* 1 if this is a UDP "connection" */
+ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
+ struct sockaddr request_addr; /* Who sent the most recent request */
+ socklen_t request_addr_size;
+ unsigned char *hdrbuf; /* udp packet headers */
+ int hdrsize; /* number of headers' worth of space is allocated */
+
int binary; /* are we in binary mode */
int bucket; /* bucket number for the next command, if running as
a managed instance. -1 (_not_ 0) means invalid. */
int gen; /* generation requested for the bucket */
-
} conn;
/* number of virtual buckets for a managed instance */
@@ -138,6 +176,12 @@
/* listening socket */
extern int l_socket;
+/* udp socket */
+extern int u_socket;
+
+/* current time of day (updated periodically) */
+extern volatile rel_time_t current_time;
+
/* temporary hack */
/* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }
void pre_gdb (); */
@@ -152,12 +196,14 @@
* be that low).
*/
-time_t realtime(time_t exptime);
+rel_time_t realtime(time_t exptime);
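realtime() maps a client-supplied exptime onto the server-relative clock, but its body is not part of this excerpt. The sketch below assumes the convention documented in memcached's protocol.txt: values up to 30 days are offsets from now, and larger values are absolute UNIX timestamps. Instead of reading globals, it takes the start time and the current relative time as parameters. `REALTIME_MAXDELTA`, `realtime_sketch`, and the choice to map past timestamps to 1 (already expired) are illustrative assumptions.

```c
#include <time.h>

typedef unsigned int rel_time_t;

#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)   /* 30 days, in seconds */

/*
 * Map a protocol exptime onto server-relative time.
 * 0 means "never expires" and is passed through unchanged.
 */
rel_time_t realtime_sketch(time_t exptime, time_t started, rel_time_t now) {
    if (exptime == 0)
        return 0;
    if (exptime > REALTIME_MAXDELTA) {
        /* Absolute UNIX time; anything at or before start is already past. */
        if (exptime <= started)
            return 1;
        return (rel_time_t)(exptime - started);
    }
    /* Offset from the current time. */
    return (rel_time_t)(exptime + now);
}
```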
/* slabs memory allocation */
-/* Init the subsystem. The argument is the limit on no. of bytes to allocate, 0 if no limit */
-void slabs_init(unsigned int limit);
+/* Init the subsystem. 1st argument is the limit on no. of bytes to allocate,
+ 0 if no limit. 2nd argument is the growth factor; each slab will use a chunk
+ size equal to the previous slab's chunk size times this factor. */
+void slabs_init(size_t limit, double factor);
/* Preallocate as many slab pages as possible (called from slabs_init)
on start-up, so users don't get confused out-of-memory errors when
@@ -169,13 +215,13 @@
/* Given object size, return id to use when allocating/freeing memory for object */
/* 0 means error: can't store such a large object */
-unsigned int slabs_clsid(unsigned int size);
+unsigned int slabs_clsid(size_t size);
/* Allocate object of given length. 0 on error */
-void *slabs_alloc(unsigned int size);
+void *slabs_alloc(size_t size);
/* Free previously allocated object */
-void slabs_free(void *ptr, unsigned int size);
+void slabs_free(void *ptr, size_t size);
/* Fill buffer with stats */
char* slabs_stats(int *buflen);
@@ -188,17 +234,22 @@
/* event handling, network IO */
void event_handler(int fd, short which, void *arg);
-conn *conn_new(int sfd, int init_state, int event_flags);
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
void conn_close(conn *c);
void conn_init(void);
void drive_machine(conn *c);
-int new_socket(void);
-int server_socket(int port);
+int new_socket(int is_udp);
+int server_socket(int port, int is_udp);
int update_event(conn *c, int new_flags);
int try_read_command(conn *c);
int try_read_network(conn *c);
+int try_read_udp(conn *c);
void complete_nread(conn *c);
void process_command(conn *c, char *command);
+int transmit(conn *c);
+int ensure_iov_space(conn *c);
+int add_iov(conn *c, const void *buf, int len);
+int add_msghdr(conn *c);
/* stats */
void stats_reset(void);
@@ -215,8 +266,9 @@
void item_init(void);
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes);
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes);
void item_free(item *it);
+int item_size_ok(char *key, int flags, int nbytes);
int item_link(item *it); /* may fail if transgresses limits */
void item_unlink(item *it);
Modified: trunk/server/slabs.c
===================================================================
--- trunk/server/slabs.c 2006-09-03 03:10:59 UTC (rev 319)
+++ trunk/server/slabs.c 2006-09-03 03:18:26 UTC (rev 320)
@@ -1,6 +1,11 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
- * Slabs memory allocation, based on powers-of-2
+ * Slabs memory allocation, based on powers-of-N. Slabs are up to 1MB in size
+ * and are divided into chunks. The chunk sizes start off at the size of the
+ * "item" structure plus space for a small key and value. They increase by
+ * a multiplier factor from there, up to half the maximum slab size. The last
+ * slab size is always 1MB, since that's the maximum item size allowed by the
+ * memcached protocol.
*
* $Id$
*/
@@ -23,11 +28,12 @@
#include "memcached.h"
-#define POWER_SMALLEST 3
-#define POWER_LARGEST 20
+#define POWER_SMALLEST 1
+#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
+#define CHUNK_ALIGN_BYTES (sizeof(void *))
-/* powers-of-2 allocation structures */
+/* powers-of-N allocation structures */
typedef struct {
unsigned int size; /* sizes of items */
@@ -49,42 +55,61 @@
} slabclass_t;
static slabclass_t slabclass[POWER_LARGEST+1];
-static unsigned int mem_limit = 0;
-static unsigned int mem_malloced = 0;
+static size_t mem_limit = 0;
+static size_t mem_malloced = 0;
+static int power_largest;
-unsigned int slabs_clsid(unsigned int size) {
- int res = 1;
+/*
+ * Figures out which slab class (chunk size) is required to store an item of
+ * a given size.
+ */
+unsigned int slabs_clsid(size_t size) {
+ int res = POWER_SMALLEST;
if(size==0)
return 0;
- size--;
- while(size >>= 1)
- res++;
- if (res < POWER_SMALLEST)
- res = POWER_SMALLEST;
- if (res > POWER_LARGEST)
- res = 0;
+ while (size > slabclass[res].size)
+ if (res++ == power_largest) /* won't fit in the biggest slab */
+ return 0;
return res;
}
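slabs_clsid() now walks the class table until it finds the first chunk size that fits. Shifting no longer works because the sizes are not powers of two. The sketch below is a standalone version over an explicit table. The `sizes` array and its values are illustrative, and index 0 is unused to match POWER_SMALLEST being 1.

```c
#include <stddef.h>

/*
 * Return the 1-based class id of the smallest chunk that holds `size`
 * bytes, or 0 if size is 0 or larger than the biggest chunk.
 * sizes[1..largest] must be ascending; sizes[0] is unused.
 */
unsigned int clsid_for(size_t size, const unsigned int *sizes, int largest) {
    int res = 1;
    if (size == 0)
        return 0;
    while (size > sizes[res])
        if (res++ == largest)   /* won't fit in the biggest class */
            return 0;
    return (unsigned int)res;
}
```

The scan is linear in the number of classes. With the default growth factor there are only a few dozen classes, so a linear scan costs little compared with the rest of a request.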
-void slabs_init(unsigned int limit) {
- int i;
- int size=1;
+/*
+ * Determines the chunk sizes and initializes the slab class descriptors
+ * accordingly.
+ */
+void slabs_init(size_t limit, double factor) {
+ int i = POWER_SMALLEST - 1;
+ unsigned int size = sizeof(item) + settings.chunk_size;
+ /* Factor of 2.0 means use the default memcached behavior */
+ if (factor == 2.0 && size < 128)
+ size = 128;
+
mem_limit = limit;
- for(i=0; i<=POWER_LARGEST; i++, size*=2) {
+ memset(slabclass, 0, sizeof(slabclass));
+
+ while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
+ /* Make sure items are always n-byte aligned */
+ if (size % CHUNK_ALIGN_BYTES)
+ size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
+
slabclass[i].size = size;
- slabclass[i].perslab = POWER_BLOCK / size;
- slabclass[i].slots = 0;
- slabclass[i].sl_curr = slabclass[i].sl_total = slabclass[i].slabs = 0;
- slabclass[i].end_page_ptr = 0;
- slabclass[i].end_page_free = 0;
- slabclass[i].slab_list = 0;
- slabclass[i].list_size = 0;
- slabclass[i].killing = 0;
+ slabclass[i].perslab = POWER_BLOCK / slabclass[i].size;
+ size *= factor;
+ if (settings.verbose > 1) {
+ fprintf(stderr, "slab class %3d: chunk size %6d perslab %5d\n",
+ i, slabclass[i].size, slabclass[i].perslab);
+ }
}
+ power_largest = i;
+ slabclass[power_largest].size = POWER_BLOCK;
+ slabclass[power_largest].perslab = 1;
+
+#ifndef DONT_PREALLOC_SLABS
slabs_preallocate(limit / POWER_BLOCK);
+#endif
}
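The chunk-size progression computed by slabs_init() can be reproduced without the rest of the allocator, as in the sketch below. It takes the starting size directly; in memcached that is sizeof(item) plus the -n value. The alignment is a parameter, whereas the real code uses sizeof(void *). `build_classes` and `MAX_CLASSES` are hypothetical names.

```c
#define POWER_BLOCK 1048576
#define MAX_CLASSES 200

/*
 * Fill sizes[1..] with aligned chunk sizes growing by `factor`, stopping
 * once a chunk would exceed half a slab, then add a final 1MB class.
 * perslab[i] receives the number of chunks per 1MB slab.
 * Returns the id of the largest class.
 */
int build_classes(unsigned int start, double factor, unsigned int align,
                  unsigned int sizes[MAX_CLASSES + 1],
                  unsigned int perslab[MAX_CLASSES + 1]) {
    int i = 0;
    unsigned int size = start;
    while (++i < MAX_CLASSES && size <= POWER_BLOCK / 2) {
        if (size % align)
            size += align - (size % align);   /* round up to alignment */
        sizes[i] = size;
        perslab[i] = POWER_BLOCK / size;
        size = (unsigned int)(size * factor);
    }
    sizes[i] = POWER_BLOCK;   /* last class always holds a whole slab */
    perslab[i] = 1;
    return i;
}
```

For example, a start of 80 with factor 1.25 and 8-byte alignment yields 80, 104, 136, 176, and so on. A factor of 2.0 with a start of 128 yields 128, 256, ..., 524288, followed by the 1MB class.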
void slabs_preallocate (unsigned int maxslabs) {
@@ -101,14 +126,14 @@
if (++prealloc > maxslabs)
return;
slabs_newslab(i);
- }
-
+ }
+
}
static int grow_slab_list (unsigned int id) {
slabclass_t *p = &slabclass[id];
if (p->slabs == p->list_size) {
- unsigned int new_size = p->list_size ? p->list_size * 2 : 16;
+ size_t new_size = p->list_size ? p->list_size * 2 : 16;
void *new_list = realloc(p->slab_list, new_size*sizeof(void*));
if (new_list == 0) return 0;
p->list_size = new_size;
@@ -119,11 +144,14 @@
int slabs_newslab(unsigned int id) {
slabclass_t *p = &slabclass[id];
- int num = p->perslab;
+#ifdef ALLOW_SLABS_REASSIGN
int len = POWER_BLOCK;
+#else
+ int len = p->size * p->perslab;
+#endif
char *ptr;
- if (mem_limit && mem_malloced + len > mem_limit)
+ if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
return 0;
if (! grow_slab_list(id)) return 0;
@@ -133,18 +161,18 @@
memset(ptr, 0, len);
p->end_page_ptr = ptr;
- p->end_page_free = num;
+ p->end_page_free = p->perslab;
p->slab_list[p->slabs++] = ptr;
mem_malloced += len;
return 1;
}
-void *slabs_alloc(unsigned int size) {
+void *slabs_alloc(size_t size) {
slabclass_t *p;
unsigned char id = slabs_clsid(size);
- if (id < POWER_SMALLEST || id > POWER_LARGEST)
+ if (id < POWER_SMALLEST || id > power_largest)
return 0;
p = &slabclass[id];
@@ -180,13 +208,13 @@
return 0; /* shouldn't ever get here */
}
-void slabs_free(void *ptr, unsigned int size) {
+void slabs_free(void *ptr, size_t size) {
unsigned char id = slabs_clsid(size);
slabclass_t *p;
assert(((item *)ptr)->slabs_clsid==0);
- assert(id >= POWER_SMALLEST && id <= POWER_LARGEST);
- if (id < POWER_SMALLEST || id > POWER_LARGEST)
+ assert(id >= POWER_SMALLEST && id <= power_largest);
+ if (id < POWER_SMALLEST || id > power_largest)
return;
p = &slabclass[id];
@@ -211,14 +239,14 @@
char* slabs_stats(int *buflen) {
int i, total;
- char *buf = (char*) malloc(8192);
+ char *buf = (char*) malloc(power_largest * 200 + 100);
char *bufcurr = buf;
*buflen = 0;
if (!buf) return 0;
total = 0;
- for(i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
+ for(i = POWER_SMALLEST; i <= power_largest; i++) {
slabclass_t *p = &slabclass[i];
if (p->slabs) {
unsigned int perslab, slabs;
@@ -236,13 +264,19 @@
total++;
}
}
- bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %u\r\n", total, mem_malloced);
+ bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %llu\r\n", total, (unsigned long long) mem_malloced);
bufcurr += sprintf(bufcurr, "END\r\n");
*buflen = bufcurr - buf;
return buf;
}
-/* 1 = success
+#ifdef ALLOW_SLABS_REASSIGN
+/* Blows away all the items in a slab class and moves its slabs to another
+ class. This is only used by the "slabs reassign" command, for manual tweaking
+ of memory allocation. It's disabled by default since it requires that all
+ slabs be the same size (which can waste space for chunk size mantissas of
+ other than 2.0).
+ 1 = success
0 = fail
-1 = tried. busy. send again shortly. */
int slabs_reassign(unsigned char srcid, unsigned char dstid) {
@@ -251,8 +285,8 @@
void *iter;
int was_busy = 0;
- if (srcid < POWER_SMALLEST || srcid > POWER_LARGEST ||
- dstid < POWER_SMALLEST || dstid > POWER_LARGEST)
+ if (srcid < POWER_SMALLEST || srcid > power_largest ||
+ dstid < POWER_SMALLEST || dstid > power_largest)
return 0;
p = &slabclass[srcid];
@@ -308,3 +342,4 @@
return 1;
}
+#endif
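The first hunk replaces the fixed power-of-two class table with a configurable growth factor. Each chunk size is rounded up to a multiple of CHUNK_ALIGN_BYTES, perslab becomes POWER_BLOCK / size, and the size is then multiplied by the factor. Whatever index the loop stops at becomes power_largest: a final class holding one POWER_BLOCK-sized chunk per slab. slabs_alloc, slabs_free, slabs_stats and slabs_reassign then bound class ids by the runtime power_largest instead of the compile-time POWER_LARGEST. Below is a minimal standalone sketch of that scheme. The constant values, the starting size, the loop condition (including the `size % CHUNK_ALIGN_BYTES` guard, which is not visible in this excerpt) and all sketch_* names are assumptions for illustration, not memcached's code. In particular, sketch_clsid is a hypothetical stand-in for slabs_clsid, whose body does not appear here.

```c
#include <stddef.h>
#include <stdio.h>

/* Illustrative values only (assumed, not read from memcached.h). */
#define POWER_SMALLEST    1
#define POWER_LARGEST     200
#define POWER_BLOCK       1048576   /* 1 MB slab page */
#define CHUNK_ALIGN_BYTES 8

/* Trimmed-down, hypothetical stand-in for slabclass_t. */
typedef struct {
    unsigned int size;      /* chunk size in bytes */
    unsigned int perslab;   /* chunks that fit in one slab page */
} sketch_class_t;

sketch_class_t sketch_classes[POWER_LARGEST + 1];
int sketch_power_largest;

/* Build a geometric table of chunk sizes: align, record size and
   perslab, then grow by `factor` until sizes exceed half a page. */
void sketch_init(unsigned int size, double factor) {
    int i = POWER_SMALLEST - 1;

    while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
        /* Round up to the next multiple of CHUNK_ALIGN_BYTES. */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
        sketch_classes[i].size = size;
        sketch_classes[i].perslab = POWER_BLOCK / size;
        size *= factor;   /* result is truncated back to unsigned int */
    }

    /* The largest class stores exactly one page-sized item per slab. */
    sketch_power_largest = i;
    sketch_classes[i].size = POWER_BLOCK;
    sketch_classes[i].perslab = 1;
}

/* Return the smallest class whose chunks can hold `size` bytes,
   or 0 if the item is empty or too big for every class. */
int sketch_clsid(size_t size) {
    int id = POWER_SMALLEST;

    if (size == 0)
        return 0;
    while (size > sketch_classes[id].size) {
        if (id == sketch_power_largest)
            return 0;   /* does not fit even in the largest class */
        id++;
    }
    return id;
}

/* Print the table in the same shape as the verbose output in the hunk. */
void sketch_dump(void) {
    for (int i = POWER_SMALLEST; i <= sketch_power_largest; i++)
        printf("slab class %3d: chunk size %6u perslab %5u\n",
               i, sketch_classes[i].size, sketch_classes[i].perslab);
}
```

With a starting size of 128 and a factor of 2.0, this sketch produces classes of 128, 256, ..., 524288 bytes followed by a 1048576-byte class at index 14, so a lookup for any size above POWER_BLOCK returns 0, matching the `id > power_largest` rejection in slabs_alloc.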
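The slabs_newslab hunk makes two changes. When ALLOW_SLABS_REASSIGN is not defined, a new slab is sized to exactly size * perslab bytes instead of a full POWER_BLOCK. The mem_limit check now also applies only to classes that already own at least one slab, so the limit alone never stops a class from getting its first slab. The sketch below models just that accounting. The newslab_* names and the struct are hypothetical, and no memory is actually allocated.

```c
#include <stddef.h>

/* Hypothetical subset of slabclass_t needed for the accounting. */
typedef struct {
    unsigned int size;      /* chunk size in bytes */
    unsigned int perslab;   /* chunks per slab */
    unsigned int slabs;     /* slabs already owned by this class */
} newslab_class_t;

size_t newslab_mem_limit = 0;     /* 0 means "no limit" */
size_t newslab_mem_malloced = 0;  /* bytes handed out so far */

/* Account for one new slab in class `p`; returns 1 on success, 0 if
   the memory limit forbids it. Mirrors the r320 guard: the limit is
   ignored while the class has no slabs yet. */
int newslab_account(newslab_class_t *p) {
    /* Without slab reassignment, a slab is exactly size * perslab. */
    size_t len = (size_t)p->size * p->perslab;

    if (newslab_mem_limit && newslab_mem_malloced + len > newslab_mem_limit
        && p->slabs > 0)
        return 0;

    p->slabs++;
    newslab_mem_malloced += len;
    return 1;
}
```

One consequence of this design is that total usage can exceed mem_limit by up to one slab for each class that had not yet allocated anything when the limit was reached.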
More information about the memcached-commits mailing list