Patch: CPU efficiency, UDP support, and other changes
Steven Grimm
sgrimm at facebook.com
Wed May 3 16:06:35 UTC 2006
This big patch contains all the changes we've made to memcached 1.1.12
at facebook.com. It includes some changes I've sent to the list as
separate patches:
* Memory efficiency is increased; we get about 40% more items in a given
amount of memory vs. the standard 1.1.12 memcached. (This patch has a
couple tweaks that aren't in the previous smaller one since they tie
into other changes.)
* Support for large memory sizes (64-bit pointers and size_t).
* Fix for bogus "out of memory" errors caused by memory filling up
before a slab class has any slabs.
But the big changes here are not in the other patches:
* CPU consumption is reduced 25-30%.
* A UDP-based interface is supported in addition to the standard TCP one.
No doubt some will ask, "Why are you sending this out as one big patch
instead of splitting everything out into small independent patches?"
I'll include a section answering just that question at the end of this
message.
Details follow. Some of this will look familiar if you've seen the
earlier patches.
Memory consumption
------------------
The slab allocator's powers-of-2 size strategy is now a powers-of-N
strategy, where N may be specified on the command line using the new
"-f" option. The default is 1.25. For a large memcached instance, where
there are enough items of enough different sizes that the increased
number of slab classes isn't itself a waste of memory, this is a
significant win: items are placed in chunks whose sizes are much closer
to the item size, wasting less memory.
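
To make the growth-factor idea concrete, here is a rough sketch of how chunk sizes could be derived from a factor. It is not the slab code from the patch: the function names, the 80-byte starting chunk used in the usage note, and the 8-byte alignment are all assumptions chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_CLASSES 255   /* hypothetical cap, chosen to match LARGEST_ID */
#define SKETCH_ALIGN 8           /* hypothetical chunk alignment */

/*
 * Fill sizes[] with chunk sizes that grow by `factor` from one class to the
 * next, stopping before a chunk would exceed max_chunk.
 * Returns the number of classes produced.
 */
static int build_class_sizes(size_t first_chunk, double factor,
                             size_t max_chunk, size_t *sizes, int max_classes)
{
    int n = 0;
    size_t size = first_chunk;

    assert(factor > 1.0 && first_chunk > 0);

    while (n < max_classes && size <= max_chunk) {
        /* Round the chunk size up to the alignment boundary. */
        if (size % SKETCH_ALIGN)
            size += SKETCH_ALIGN - (size % SKETCH_ALIGN);
        if (size > max_chunk)
            break;
        sizes[n++] = size;

        size_t next = (size_t)(size * factor);
        if (next <= size)
            next = size + SKETCH_ALIGN;   /* guarantee forward progress */
        size = next;
    }
    return n;
}

/* Index of the smallest class whose chunk can hold item_size, or -1. */
static int class_for_size(const size_t *sizes, int n, size_t item_size)
{
    for (int i = 0; i < n; i++)
        if (item_size <= sizes[i])
            return i;
    return -1;
}
```

With a factor of 1.25 a 100-byte item lands in a 104-byte chunk in this sketch, whereas a doubling scheme starting from the same 80 bytes would put it in a 160-byte chunk.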
One consequence of this is that slabs are no longer fixed-size; by
default they are no bigger than 1MB each, but are only as big as they
need to be to hold a whole number of chunks. That causes the "slabs
reassign" command to be unavailable; it can be reenabled by compiling
with -DALLOW_SLABS_REASSIGN at the expense of some wasted memory (all
slabs will be 1MB).
The minimum amount of space for data in chunks of the smallest slab
class may be adjusted on the command line using the new "-s" option.
Each chunk is that many bytes plus the size of the fixed-size item
header. If you have a lot of items with small fixed-size keys and
values, you can use this option to maximize the number of items per slab
in the smallest slab class; experiment with your particular data to find
the optimal value.
Item expiration times and access times are now stored as 32-bit integers
(number of seconds relative to server start) rather than time_t, which
is 64 bits on some platforms. This saves 8 bytes per item when compiled
in 64-bit mode, and is harmless otherwise.
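
The conversion to relative time follows the shape of realtime() in the attached patch. The sketch below restates it as a standalone function so it can be read in isolation; the typedef of rel_time_t as unsigned int is an assumption (the header that defines it is not part of this excerpt), and passing the start time and current time as parameters is done only to keep the example self-contained.

```c
#include <assert.h>
#include <time.h>

typedef unsigned int rel_time_t;        /* assumed: 32-bit seconds since server start */

#define SKETCH_MAXDELTA (60*60*24*30)   /* 30 days, the same cutoff the patch uses */

/*
 * Convert a client-supplied expiration time into seconds since server start.
 * Values up to 30 days are treated as a delta from now; larger values are
 * treated as absolute Unix timestamps.
 */
static rel_time_t to_rel_time(time_t exptime, time_t started, rel_time_t now)
{
    if (exptime == 0)
        return 0;                                  /* 0 means never expire */
    if (exptime > SKETCH_MAXDELTA)
        return (rel_time_t)(exptime - started);    /* absolute time */
    return (rel_time_t)(exptime + now);            /* relative time */
}
```

Converting back for display is just an addition of the start time, which is what the patch does when it prints it->time + stats.started.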
CPU consumption
---------------
The implementation of the "get" request is substantially reworked. Now
the entire response is composed in memory ahead of time, and we write it
out in (usually) just one system call using sendmsg()'s scatter/gather
capability. Since we are no longer doing small writes, the
TCP_CORK/NOPUSH code is not needed and we can simply set the TCP socket
to TCP_NODELAY at connect time, saving a couple more system calls per
request.
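
For readers who haven't used scatter/gather I/O, the following is a minimal sketch of the idea, not the code from the patch. The function name send_get_response is hypothetical, it handles a single hit only, and it does not retry after a partial write the way transmit() in the patch does.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/*
 * Send "VALUE <key><suffix><data>END\r\n" with one sendmsg() call.
 * suffix is expected to look like " <flags> <bytes>\r\n", and data is
 * expected to end in "\r\n". Nothing is copied; the kernel gathers the
 * pieces directly from the caller's buffers.
 * Returns the byte count reported by sendmsg(), or -1 on error.
 */
static ssize_t send_get_response(int fd, const char *key, const char *suffix,
                                 const char *data, size_t data_len)
{
    struct iovec iov[5];
    struct msghdr msg;

    iov[0].iov_base = (void *) "VALUE ";
    iov[0].iov_len  = 6;
    iov[1].iov_base = (void *) key;
    iov[1].iov_len  = strlen(key);
    iov[2].iov_base = (void *) suffix;
    iov[2].iov_len  = strlen(suffix);
    iov[3].iov_base = (void *) data;
    iov[3].iov_len  = data_len;
    iov[4].iov_base = (void *) "END\r\n";
    iov[4].iov_len  = 5;

    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = 5;

    return sendmsg(fd, &msg, 0);
}
```

The old code path needed a separate write() for the "VALUE" line, the data, and the terminator; here the whole response goes out in one system call.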
The "VALUE" line (response to a "get" request) is rendered once at item
creation time, rather than re-rendered on each fetch.
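
As a small illustration of the pre-rendering, the sketch below formats the variable part of the "VALUE" line once so that it can be stored alongside the item. The patch does this inside item_alloc() with sprintf(); the helper name render_suffix and the use of snprintf() with a truncation check are assumptions made for the example.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Render " <flags> <length>\r\n" into buf.
 * Returns the number of characters written (excluding the trailing NUL),
 * or -1 if buf is too small.
 */
static int render_suffix(char *buf, size_t buflen,
                         unsigned int flags, unsigned int nbytes)
{
    int n = snprintf(buf, buflen, " %u %u\r\n", flags, nbytes);

    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return n;
}
```

On a fetch, the stored suffix and the data that follows it can then be handed to the socket as-is, with no formatting work per request.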
The current system time is stored in a global variable that's updated
every second by a libevent timer; this eliminates several time() calls
per request. A minor improvement, but a cycle saved is a cycle earned.
UDP support
-----------
For large installations with tens of thousands of clients, the amount of
memory consumed by per-TCP-connection kernel buffers can grow large,
reducing the amount of memory that can be used by memcached. There is
now a UDP protocol, which supports an arbitrarily large number of
clients using a constant amount of server memory.
In the interest of efficiency and simplicity of implementation, the UDP
protocol does not support reliable delivery; it should therefore be used
for "get" requests where a dropped response would simply result in a
recoverable cache miss. For write requests (set, delete, etc.) or very
large "get" requests, a nonpersistent TCP connection should be used.
(This is simply advice; the code will happily accept any kind of request
via its UDP interface.)
UDP support is only enabled if a UDP port is specified on the command line.
The UDP protocol is described at the bottom of doc/protocol.txt.
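
For client authors, here is a sketch of packing and unpacking the 8-byte frame header that doc/protocol.txt describes (request ID, sequence number, total datagram count, and a reserved field, each a 16-bit big-endian integer). The struct and function names are hypothetical, and the sanity check in frame_decode() is an assumption; the server code in the patch is more lenient about what it accepts.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define FRAME_HEADER_SIZE 8

struct udp_frame {
    uint16_t request_id;   /* chosen by the client, echoed by the server */
    uint16_t seq;          /* 0 .. total-1 */
    uint16_t total;        /* number of datagrams in this message */
};

/* Write the header in network byte order (high byte first). */
static void frame_encode(unsigned char *hdr, const struct udp_frame *f)
{
    hdr[0] = (unsigned char)(f->request_id >> 8);
    hdr[1] = (unsigned char)(f->request_id & 0xff);
    hdr[2] = (unsigned char)(f->seq >> 8);
    hdr[3] = (unsigned char)(f->seq & 0xff);
    hdr[4] = (unsigned char)(f->total >> 8);
    hdr[5] = (unsigned char)(f->total & 0xff);
    hdr[6] = 0;            /* reserved; must be 0 */
    hdr[7] = 0;
}

/*
 * Parse a header from the start of a datagram.
 * Returns 0 on success, -1 if the datagram is too short or the sequence
 * number is not smaller than the datagram count.
 */
static int frame_decode(const unsigned char *hdr, size_t len,
                        struct udp_frame *f)
{
    if (len < FRAME_HEADER_SIZE)
        return -1;
    f->request_id = (uint16_t)((hdr[0] << 8) | hdr[1]);
    f->seq        = (uint16_t)((hdr[2] << 8) | hdr[3]);
    f->total      = (uint16_t)((hdr[4] << 8) | hdr[5]);
    if (f->seq >= f->total)
        return -1;
    return 0;
}
```

A request is always a single datagram, so a client would send seq 0 and total 1, then collect response datagrams carrying the same request ID until it has all of them.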
Large memory support
--------------------
This mostly involves using size_t rather than unsigned int in a few
places and compiling in 64-bit mode, which gives us 64-bit pointers and
makes size_t 64 bits.
Fix for "out of memory" errors
------------------------------
Rather than preallocate a slab in each slab class as the memcached
1.1.13 prerelease does, we decided to instead allow memcached to exceed
its memory limit slightly. When a "set" request comes in that requires a
slab whose slab class is empty, we always allocate a slab, even if
memcached is already at its configured memory limit.
Our memcached instances are large enough that going over the limit by a
few megabytes is barely even detectable. If you are running in a very
constrained environment, you can lower the memory limit slightly to
account for this change, but bear in mind that this change will only
exceed the memory limit if a "set" request requires it (which will never
happen if your data always falls within a limited range of sizes).
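
The policy can be summarized in a few lines. This is a sketch of the decision only, under the assumption that the allocator tracks a per-class slab count and a global byte total; the struct and function names are hypothetical and do not come from the patch.

```c
#include <assert.h>
#include <stddef.h>

struct slab_class_sketch {
    unsigned int slabs;    /* number of slabs this class currently owns */
};

/*
 * Decide whether a new slab of slab_bytes may be allocated for cls.
 * A class with no slabs at all always gets its first one, even if that
 * pushes total usage past mem_limit; otherwise the limit is enforced.
 * Returns 1 if the allocation is allowed, 0 if not.
 */
static int may_allocate_slab(size_t mem_limit, size_t mem_used,
                             size_t slab_bytes,
                             const struct slab_class_sketch *cls)
{
    if (cls->slabs == 0)
        return 1;
    return mem_used + slab_bytes <= mem_limit;
}
```

In the worst case each slab class overshoots by one slab, which bounds the excess at the number of classes times the maximum slab size.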
Why is this one patch?
----------------------
First, this patch is tested. It runs 24x7 on a large number of very busy
memcached hosts. Thoroughly testing every possible permutation of these
changes isn't really feasible.
Second, the changes are not all easily separable. For example, adding
the UDP support required reorganizing memcached's implementation of the
"get" request, and that reorganization also resulted in most of the CPU
time improvement. Similarly, one of the memory efficiency tweaks is
only required because compiling in 64-bit mode (for large memory
support) increases the size of a particular data type, and the
implementation of that tweak results in part of the CPU time savings.
Third, I *did* send it out as separate patches to the extent it made
sense to separate out the changes. But rather than excluding those
changes from the not-easily-separable stuff, I think it makes more sense
to include it all together. Otherwise anyone who wants to combine
everything will have to do tedious error-prone manual editing to merge
it all together, since some of the changes conflict. For example, both
the large memory support and the slab allocator modification involve
changing the parameters to slabs_init(), so it would be impossible to
produce two independent patches against the 1.1.12 release that could be
applied successfully one after the other.
Credits
-------
These changes were made by David Fetterman, Steven Grimm, and Scott
Marlette. Send comments to Steven Grimm (sgrimm at facebook.com) or,
preferably, to the memcached mailing list.
-------------- next part --------------
--- ../1.1.12-dist/doc/memcached.1 2006-05-02 14:53:48.000000000 -0700
+++ ./doc/memcached.1 2006-05-02 17:25:31.000000000 -0700
@@ -47,10 +47,10 @@
suggestions.
.TP
.B \-p <num>
-Listen on port <num>, the default is port 11211.
+Listen on TCP port <num>, the default is port 11211.
.TP
-.B \-r
-Maximize core file limit
+.B \-U <num>
+Listen on UDP port <num>, the default is port 11211.
.TP
.B \-M
Disable automatic removal of items from the cache when out of memory.
@@ -59,6 +59,20 @@
.B \-r
Raise the core file size limit to the maximum allowable.
.TP
+.B \-f <factor>
+Use <factor> as the multiplier for computing the sizes of memory chunks that
+items are stored in. A lower value may result in less wasted memory depending
+on the total amount of memory available and the distribution of item sizes.
+The default is 1.25.
+.TP
+.B \-s <size>
+Allocate a minimum of <size> bytes for the item key, value, and flags. The
+default is 48. If you have a lot of small keys and values, you can get a
+significant memory efficiency gain with a lower value. If you use a high
+chunk growth factor (-f option), on the other hand, you may want to increase
+the size to allow a bigger percentage of your items to fit in the most densely
+packed (smallest) chunks.
+.TP
.B \-h
Show the version of memcached and a summary of options.
.TP
--- ../1.1.12-dist/doc/protocol.txt 2006-05-02 14:53:48.000000000 -0700
+++ ./doc/protocol.txt 2006-05-02 17:25:31.000000000 -0700
@@ -1,8 +1,9 @@
Protocol
--------
-Clients of memcached communicate with server through TCP
-connections. A given running memcached server listens on some
+Clients of memcached communicate with server through TCP connections.
+(A UDP interface is also available; details are below under "UDP
+protocol.") A given running memcached server listens on some
(configurable) port; clients connect to that port, send commands to
the server, read responses, and eventually close the connection.
@@ -387,3 +388,45 @@
Upon receiving this command, the server closes the
connection. However, the client may also simply close the connection
when it no longer needs it, without issuing this command.
+
+
+UDP protocol
+------------
+
+For very large installations where the number of clients is high enough
+that the number of TCP connections causes scaling difficulties, there is
+also a UDP-based interface. The UDP interface does not provide guaranteed
+delivery, so should only be used for operations that aren't required to
+succeed; typically it is used for "get" requests where a missing or
+incomplete response can simply be treated as a cache miss.
+
+Each UDP datagram contains a simple frame header, followed by data in the
+same format as the TCP protocol described above. In the current
+implementation, requests must be contained in a single UDP datagram, but
+responses may span several datagrams. (The only common requests that would
+span multiple datagrams are huge multi-key "get" requests and "set"
+requests, both of which are more suitable to TCP transport for reliability
+reasons anyway.)
+
+The frame header is 8 bytes long, as follows (all values are 16-bit integers
+in network byte order, high byte first):
+
+0-1 Request ID
+2-3 Sequence number
+4-5 Total number of datagrams in this message
+6-7 Reserved for future use; must be 0
+
+The request ID is supplied by the client. Typically it will be a
+monotonically increasing value starting from a random seed, but the client
+is free to use whatever request IDs it likes. The server's response will
+contain the same ID as the incoming request. The client uses the request ID
+to differentiate between responses to outstanding requests if there are
+several pending from the same server; any datagrams with an unknown request
+ID are probably delayed responses to an earlier request and should be
+discarded.
+
+The sequence number ranges from 0 to n-1, where n is the total number of
+datagrams in the message. The client should concatenate the payloads of the
+datagrams for a given response in sequence number order; the resulting byte
+stream will contain a complete response in the same format as the TCP
+protocol (including terminating \r\n sequences).
--- ../1.1.12-dist/items.c 2006-05-02 14:53:49.000000000 -0700
+++ ./items.c 2006-05-02 17:25:32.000000000 -0700
@@ -21,12 +21,7 @@
#include "memcached.h"
-/*
- * NOTE: we assume here for simplicity that slab ids are <=32. That's true in
- * the powers-of-2 implementation, but if that changes this should be changed too
- */
-
-#define LARGEST_ID 32
+#define LARGEST_ID 255
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
unsigned int sizes[LARGEST_ID];
@@ -41,14 +36,16 @@
}
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes) {
- int ntotal, len;
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
+ int nsuffix, ntotal, len;
item *it;
unsigned int id;
+ char suffix[40];
len = strlen(key) + 1; if(len % 4) len += 4 - (len % 4);
- ntotal = sizeof(item) + len + nbytes;
-
+ nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2);
+ ntotal = sizeof(item) + len + nsuffix + nbytes;
+
id = slabs_clsid(ntotal);
if (id == 0)
return 0;
@@ -97,7 +94,8 @@
it->nbytes = nbytes;
strcpy(ITEM_key(it), key);
it->exptime = exptime;
- it->flags = flags;
+ memcpy(ITEM_suffix(it), suffix, nsuffix);
+ it->nsuffix = nsuffix;
return it;
}
@@ -159,7 +157,7 @@
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
- it->time = time(0);
+ it->time = current_time;
assoc_insert(ITEM_key(it), it);
stats.curr_bytes += ITEM_ntotal(it);
@@ -195,7 +193,7 @@
assert((it->it_flags & ITEM_SLABBED) == 0);
item_unlink_q(it);
- it->time = time(0);
+ it->time = current_time;
item_link_q(it);
}
@@ -224,7 +222,7 @@
bufcurr = 0;
while (it && (!limit || shown < limit)) {
- len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time);
+ len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time + stats.started);
if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
break;
strcpy(buffer + bufcurr, temp);
@@ -243,7 +241,7 @@
void item_stats(char *buffer, int buflen) {
int i;
char *bufcurr = buffer;
- time_t now = time(0);
+ rel_time_t now = current_time;
if (buflen < 4096) {
strcpy(buffer, "SERVER_ERROR out of memory");
@@ -252,7 +250,7 @@
for (i=0; i<LARGEST_ID; i++) {
if (tails[i])
- bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %lu\r\n",
+ bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %u\r\n",
i, sizes[i], i, now - tails[i]->time);
}
strcpy(bufcurr, "END");
@@ -262,7 +260,7 @@
/* dumps out a list of objects of each size, with granularity of 32 bytes */
char* item_stats_sizes(int *bytes) {
int num_buckets = 32768; /* max 1MB object, divided into 32 bytes size buckets */
- unsigned int *histogram = (int*) malloc(num_buckets * sizeof(int));
+ unsigned int *histogram = (unsigned int*) malloc(num_buckets * sizeof(int));
char *buf = (char*) malloc(1024*1024*2*sizeof(char));
int i;
--- ../1.1.12-dist/memcached.c 2006-05-02 14:53:49.000000000 -0700
+++ ./memcached.c 2006-05-02 17:25:32.000000000 -0700
@@ -28,6 +28,10 @@
#ifndef _P1003_1B_VISIBLE
#define _P1003_1B_VISIBLE
#endif
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
#include <pwd.h>
#include <sys/mman.h>
#include <fcntl.h>
@@ -42,6 +46,7 @@
#include <time.h>
#include <event.h>
#include <assert.h>
+#include <limits.h>
#ifdef HAVE_MALLOC_H
#include <malloc.h>
@@ -56,19 +61,21 @@
static int delcurr;
static int deltotal;
-time_t realtime(time_t exptime) {
- time_t now;
+#define TRANSMIT_COMPLETE 0
+#define TRANSMIT_INCOMPLETE 1
+#define TRANSMIT_SOFT_ERROR 2
+#define TRANSMIT_HARD_ERROR 3
+rel_time_t realtime(time_t exptime) {
/* no. of seconds in 30 days - largest possible delta exptime */
#define REALTIME_MAXDELTA 60*60*24*30
if (exptime == 0) return 0; /* 0 means never expire */
if (exptime > REALTIME_MAXDELTA)
- return exptime;
+ return (rel_time_t) (exptime - stats.started);
else {
- now = time(0);
- return exptime + now;
+ return (rel_time_t) (exptime + current_time);
}
}
@@ -77,8 +84,7 @@
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
stats.started = time(0);
-}
-
+}
void stats_reset(void) {
stats.total_items = stats.total_conns = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
@@ -87,26 +93,57 @@
void settings_init(void) {
settings.port = 11211;
+ settings.udpport = 11211;
settings.interface.s_addr = htonl(INADDR_ANY);
settings.maxbytes = 64*1024*1024; /* default is 64MB */
settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
settings.verbose = 0;
settings.oldest_live = 0;
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
+ settings.factor = 1.25;
+ settings.chunk_size = 48; /* space for a modest key and value */
+}
+
+/*
+ * Adds a message header to a connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int add_msghdr(conn *c)
+{
+ struct msghdr *msg;
+
+ if (c->msgsize == c->msgused) {
+ msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
+ if (! msg)
+ return -1;
+ c->msglist = msg;
+ c->msgsize *= 2;
+ }
+
+ msg = c->msglist + c->msgused;
+ msg->msg_iov = &c->iov[c->iovused];
+ msg->msg_iovlen = 0;
+ msg->msg_name = &c->request_addr;
+ msg->msg_namelen = c->request_addr_size;
+ msg->msg_control = 0;
+ msg->msg_controllen = 0;
+ msg->msg_flags = 0;
+ c->msgbytes = 0;
+ c->msgused++;
+
+ if (c->udp) {
+ /* Leave room for the UDP header, which we'll fill in later. */
+ return add_iov(c, NULL, UDP_HEADER_SIZE);
+ }
+
+ return 0;
}
conn **freeconns;
int freetotal;
int freecurr;
-void set_cork (conn *c, int val) {
- if (c->is_corked == val) return;
- c->is_corked = val;
-#ifdef TCP_NOPUSH
- setsockopt(c->sfd, IPPROTO_TCP, TCP_NOPUSH, &val, sizeof(val));
-#endif
-}
-
void conn_init(void) {
freetotal = 200;
freecurr = 0;
@@ -114,7 +151,8 @@
return;
}
-conn *conn_new(int sfd, int init_state, int event_flags) {
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
+ int is_udp) {
conn *c;
/* do we have a free conn structure from a previous close? */
@@ -127,32 +165,48 @@
}
c->rbuf = c->wbuf = 0;
c->ilist = 0;
+ c->iov = 0;
+ c->msglist = 0;
+ c->hdrbuf = 0;
- c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);
- c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);
- c->ilist = (item **) malloc(sizeof(item *)*200);
+ c->rsize = read_buffer_size;
+ c->wsize = DATA_BUFFER_SIZE;
+ c->isize = 200;
+ c->iovsize = 200;
+ c->msgsize = 10;
+ c->hdrsize = 0;
+
+ c->rbuf = (char *) malloc(c->rsize);
+ c->wbuf = (char *) malloc(c->wsize);
+ c->ilist = (item **) malloc(sizeof(item *) * c->isize);
+ c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
+ c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize);
- if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
+ if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
+ c->msglist == 0) {
if (c->rbuf != 0) free(c->rbuf);
if (c->wbuf != 0) free(c->wbuf);
if (c->ilist !=0) free(c->ilist);
+ if (c->iov != 0) free(c->iov);
+ if (c->msglist != 0) free(c->msglist);
free(c);
perror("malloc()");
return 0;
}
- c->rsize = c->wsize = DATA_BUFFER_SIZE;
- c->isize = 200;
stats.conn_structs++;
}
if (settings.verbose > 1) {
if (init_state == conn_listening)
fprintf(stderr, "<%d server listening\n", sfd);
+ else if (is_udp)
+ fprintf(stderr, "<%d server listening (udp)\n", sfd);
else
fprintf(stderr, "<%d new client connection\n", sfd);
}
c->sfd = sfd;
+ c->udp = is_udp;
c->state = init_state;
c->rlbytes = 0;
c->rbytes = c->wbytes = 0;
@@ -160,15 +214,14 @@
c->rcurr = c->rbuf;
c->icurr = c->ilist;
c->ileft = 0;
- c->iptr = c->ibuf;
- c->ibytes = 0;
+ c->iovused = 0;
+ c->msgcurr = 0;
+ c->msgused = 0;
c->write_and_go = conn_read;
c->write_and_free = 0;
c->item = 0;
- c->is_corked = 0;
-
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
c->ev_flags = event_flags;
@@ -176,9 +229,13 @@
if (freecurr < freetotal) {
freeconns[freecurr++] = c;
} else {
+ if (c->hdrbuf)
+ free (c->hdrbuf);
+ free (c->msglist);
free (c->rbuf);
free (c->wbuf);
free (c->ilist);
+ free (c->iov);
free (c);
}
return 0;
@@ -190,17 +247,10 @@
return c;
}
-void conn_close(conn *c) {
- /* delete the event, the socket and the conn */
- event_del(&c->event);
-
- if (settings.verbose > 1)
- fprintf(stderr, "<%d connection closed.\n", c->sfd);
-
- close(c->sfd);
-
+void conn_cleanup(conn *c) {
if (c->item) {
item_free(c->item);
+ c->item = 0;
}
if (c->ileft) {
@@ -211,7 +261,19 @@
if (c->write_and_free) {
free(c->write_and_free);
+ c->write_and_free = 0;
}
+}
+
+void conn_close(conn *c) {
+ /* delete the event, the socket and the conn */
+ event_del(&c->event);
+
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d connection closed.\n", c->sfd);
+
+ close(c->sfd);
+ conn_cleanup(c);
/* if we have enough space in the free connections array, put the structure there */
if (freecurr < freetotal) {
@@ -224,9 +286,13 @@
freeconns = new_freeconns;
freeconns[freecurr++] = c;
} else {
+ if (c->hdrbuf)
+ free(c->hdrbuf);
+ free(c->msglist);
free(c->rbuf);
free(c->wbuf);
free(c->ilist);
+ free(c->iov);
free(c);
}
}
@@ -236,6 +302,120 @@
return;
}
+
+/*
+ * Ensures that there is room for another struct iovec in a connection's
+ * iov list.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int ensure_iov_space(conn *c) {
+ if (c->iovused >= c->iovsize) {
+ int i, iovnum;
+ struct iovec *new_iov = (struct iovec *) realloc(c->iov,
+ (c->iovsize * 2) * sizeof(struct iovec));
+ if (! new_iov)
+ return -1;
+ c->iov = new_iov;
+ c->iovsize *= 2;
+
+ /* Point all the msghdr structures at the new list. */
+ for (i = 0, iovnum = 0; i < c->msgused; i++) {
+ c->msglist[i].msg_iov = &c->iov[iovnum];
+ iovnum += c->msglist[i].msg_iovlen;
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ * Adds data to the list of pending data that will be written out to a
+ * connection.
+ *
+ * Returns 0 on success, -1 on out-of-memory.
+ */
+int add_iov(conn *c, void *buf, int len) {
+ struct msghdr *m;
+ int i;
+ int leftover;
+
+ do {
+ m = &c->msglist[c->msgused - 1];
+
+ /* We may need to start a new msghdr if this one is full. */
+ if (m->msg_iovlen == IOV_MAX ||
+ c->udp && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) {
+ if (add_msghdr(c)) return -1; /* out of memory */
+ m = &c->msglist[c->msgused - 1];
+ }
+
+ if (ensure_iov_space(c))
+ return -1;
+
+ /* If the fragment is too big to fit in the datagram, split it up */
+ if (c->udp && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
+ leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
+ len -= leftover;
+ } else {
+ leftover = 0;
+ }
+
+ m = &c->msglist[c->msgused - 1];
+ m->msg_iov[m->msg_iovlen].iov_base = buf;
+ m->msg_iov[m->msg_iovlen].iov_len = len;
+
+ c->msgbytes += len;
+ c->iovused++;
+ m->msg_iovlen++;
+
+ buf = ((char *)buf) + len;
+ len = leftover;
+ } while (leftover > 0);
+
+ return 0;
+}
+
+
+/*
+ * Constructs a set of UDP headers and attaches them to the outgoing messages.
+ */
+int build_udp_headers(conn *c) {
+ int i;
+ unsigned char *hdr;
+
+ if (c->msgused > c->hdrsize) {
+ void *new_hdrbuf;
+ if (c->hdrbuf)
+ new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
+ else
+ new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
+ if (! new_hdrbuf)
+ return -1;
+ c->hdrbuf = (unsigned char *) new_hdrbuf;
+ c->hdrsize = c->msgused * 2;
+ }
+
+ hdr = c->hdrbuf;
+ for (i = 0; i < c->msgused; i++) {
+ c->msglist[i].msg_iov[0].iov_base = hdr;
+ c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
+ *hdr++ = c->request_id / 256;
+ *hdr++ = c->request_id % 256;
+ *hdr++ = i / 256;
+ *hdr++ = i % 256;
+ *hdr++ = c->msgused / 256;
+ *hdr++ = c->msgused % 256;
+ *hdr++ = 0;
+ *hdr++ = 0;
+ assert(hdr == (unsigned char *) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
+ }
+
+ return 0;
+}
+
+
void out_string(conn *c, char *str) {
int len;
@@ -250,7 +430,7 @@
}
strcpy(c->wbuf, str);
- strcat(c->wbuf, "\r\n");
+ strcpy(c->wbuf + len, "\r\n");
c->wbytes = len + 2;
c->wcurr = c->wbuf;
@@ -268,7 +448,7 @@
item *it = c->item;
int comm = c->item_comm;
item *old_it;
- time_t now = time(0);
+ rel_time_t now = current_time;
stats.set_cmds++;
@@ -322,7 +502,7 @@
}
void process_stat(conn *c, char *command) {
- time_t now = time(0);
+ rel_time_t now = current_time;
if (strcmp(command, "stats") == 0) {
char temp[1024];
@@ -333,8 +513,8 @@
getrusage(RUSAGE_SELF, &usage);
pos += sprintf(pos, "STAT pid %u\r\n", pid);
- pos += sprintf(pos, "STAT uptime %lu\r\n", now - stats.started);
- pos += sprintf(pos, "STAT time %ld\r\n", now);
+ pos += sprintf(pos, "STAT uptime %u\r\n", now);
+ pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
pos += sprintf(pos, "STAT version " VERSION "\r\n");
pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
@@ -344,13 +524,13 @@
pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
- pos += sprintf(pos, "STAT cmd_get %u\r\n", stats.get_cmds);
- pos += sprintf(pos, "STAT cmd_set %u\r\n", stats.set_cmds);
- pos += sprintf(pos, "STAT get_hits %u\r\n", stats.get_hits);
- pos += sprintf(pos, "STAT get_misses %u\r\n", stats.get_misses);
+ pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
+ pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
+ pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
+ pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
- pos += sprintf(pos, "STAT limit_maxbytes %u\r\n", settings.maxbytes);
+ pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
pos += sprintf(pos, "END");
out_string(c, temp);
return;
@@ -503,9 +683,13 @@
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
- /* All incoming commands will require a response, so we cork at the beginning,
- and uncork at the very end (usually by means of out_string) */
- set_cork(c, 1);
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
+ if (add_msghdr(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
+ return;
+ }
if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
(strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
@@ -522,8 +706,7 @@
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
- expire = realtime(expire);
- it = item_alloc(key, flags, expire, len+2);
+ it = item_alloc(key, flags, realtime(expire), len+2);
if (it == 0) {
out_string(c, "SERVER_ERROR out of memory");
/* swallow the data line */
@@ -549,7 +732,7 @@
char key[251];
int res;
char *ptr;
- time_t now = time(0);
+ rel_time_t now = current_time;
res = sscanf(command, "%*s %250s %u\n", key, &delta);
if (res!=2 || strlen(key)==0 ) {
@@ -572,7 +755,7 @@
}
ptr = ITEM_data(it);
- while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
+ while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true
value = atoi(ptr);
@@ -587,7 +770,7 @@
res = strlen(temp);
if (res + 2 > it->nbytes) { /* need to realloc */
item *new_it;
- new_it = item_alloc(ITEM_key(it), it->flags, it->exptime, res + 2 );
+ new_it = item_alloc(ITEM_key(it), atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
if (new_it == 0) {
out_string(c, "SERVER_ERROR out of memory");
return;
@@ -610,7 +793,7 @@
int next;
int i = 0;
item *it;
- time_t now = time(0);
+ rel_time_t now = current_time;
while(sscanf(start, " %250s%n", key, &next) >= 1) {
start+=next;
@@ -637,6 +820,23 @@
c->ilist = new_list;
} else break;
}
+
+ /*
+ * Construct the response. Each hit adds three elements to the
+ * outgoing data list:
+ * "VALUE "
+ * key
+ * " " + flags + " " + data length + "\r\n" + data (with \r\n)
+ */
+ if (add_iov(c, "VALUE ", 6) ||
+ add_iov(c, ITEM_key(it), strlen(ITEM_key(it))) ||
+ add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
+ {
+ break;
+ }
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
stats.get_hits++;
it->refcount++;
item_update(it);
@@ -644,17 +844,22 @@
i++;
} else stats.get_misses++;
}
+
c->icurr = c->ilist;
c->ileft = i;
- if (c->ileft) {
- c->ipart = 0;
+
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d END\n", c->sfd);
+ add_iov(c, "END\r\n", 5);
+
+ if (c->udp && build_udp_headers(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
+ }
+ else {
c->state = conn_mwrite;
- c->ibytes = 0;
- return;
- } else {
- out_string(c, "END");
- return;
+ c->msgcurr = 0;
}
+ return;
}
if (strncmp(command, "delete ", 7) == 0) {
@@ -691,11 +896,9 @@
}
}
- exptime = realtime(exptime);
-
it->refcount++;
/* use its expiration time as its deletion time now */
- it->exptime = exptime;
+ it->exptime = realtime(exptime);
it->it_flags |= ITEM_DELETED;
todelete[delcurr++] = it;
out_string(c, "DELETED");
@@ -708,7 +911,7 @@
}
if (strcmp(command, "flush_all") == 0) {
- settings.oldest_live = time(0);
+ settings.oldest_live = current_time;
out_string(c, "OK");
return;
}
@@ -724,6 +927,7 @@
}
if (strncmp(command, "slabs reassign ", 15) == 0) {
+#ifdef ALLOW_SLABS_REASSIGN
int src, dst;
char *start = command+15;
if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
@@ -742,6 +946,9 @@
}
}
out_string(c, "CLIENT_ERROR bogus command");
+#else
+ out_string(c, "CLIENT_ERROR Slab reassignment not supported");
+#endif
return;
}
@@ -777,6 +984,39 @@
}
/*
+ * read a UDP request.
+ * return 0 if there's nothing to read.
+ */
+int try_read_udp(conn *c) {
+ int res;
+
+ c->request_addr_size = sizeof(c->request_addr);
+ res = recvfrom(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes,
+ 0, &c->request_addr, &c->request_addr_size);
+ if (res > 8) {
+ unsigned char *buf = (unsigned char *)c->rbuf + c->rbytes;
+ stats.bytes_read += res;
+
+ /* Beginning of UDP packet is the request ID; save it. */
+ c->request_id = buf[0] * 256 + buf[1];
+
+ /* If this is a multi-packet request, drop it. */
+ if (buf[4] != 0 || buf[5] != 1) {
+ out_string(c, "SERVER_ERROR multi-packet request not supported");
+ return 0;
+ }
+
+ /* Don't care about any of the rest of the header. */
+ res -= 8;
+ memmove(c->rbuf, c->rbuf + 8, res);
+
+ c->rbytes += res;
+ return 1;
+ }
+ return 0;
+}
+
+/*
* read from network as much as we can, handle buffer overflow and connection
* close.
* return 0 if there's nothing to read on the first read.
@@ -797,6 +1037,7 @@
}
c->rbuf = new_rbuf; c->rsize *= 2;
}
+ c->request_addr_size = sizeof(c->request_addr);
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
stats.bytes_read += res;
@@ -826,7 +1067,70 @@
if (event_add(&c->event, 0) == -1) return 0;
return 1;
}
-
+
+/*
+ * Transmit the next chunk of data from our list of msgbuf structures.
+ *
+ * Returns:
+ * TRANSMIT_COMPLETE All done writing.
+ * TRANSMIT_INCOMPLETE More data remaining to write.
+ * TRANSMIT_SOFT_ERROR Can't write any more right now.
+ * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
+ */
+int transmit(conn *c) {
+ int res;
+
+ if (c->msgcurr < c->msgused &&
+ c->msglist[c->msgcurr].msg_iovlen == 0) {
+ /* Finished writing the current msg; advance to the next. */
+ c->msgcurr++;
+ }
+ if (c->msgcurr < c->msgused) {
+ struct msghdr *m = &c->msglist[c->msgcurr];
+ res = sendmsg(c->sfd, m, 0);
+ if (res > 0) {
+ stats.bytes_written += res;
+
+ /* We've written some of the data. Remove the completed
+ iovec entries from the list of pending writes. */
+ while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
+ res -= m->msg_iov->iov_len;
+ m->msg_iovlen--;
+ m->msg_iov++;
+ }
+
+ /* Might have written just part of the last iovec entry;
+ adjust it so the next write will do the rest. */
+ if (res > 0) {
+ m->msg_iov->iov_base = (char *) m->msg_iov->iov_base + res;
+ m->msg_iov->iov_len -= res;
+ }
+ return TRANSMIT_INCOMPLETE;
+ }
+ if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Couldn't update event\n");
+ c->state = conn_closing;
+ return TRANSMIT_HARD_ERROR;
+ }
+ return TRANSMIT_SOFT_ERROR;
+ }
+ /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
+ we have a real error, on which we close the connection */
+ if (settings.verbose > 0)
+ perror("Failed to write, and not due to blocking");
+
+ if (c->udp)
+ c->state = conn_read;
+ else
+ c->state = conn_closing;
+ return TRANSMIT_HARD_ERROR;
+ } else {
+ return TRANSMIT_COMPLETE;
+ }
+}
+
void drive_machine(conn *c) {
int exit = 0;
@@ -856,7 +1160,8 @@
close(sfd);
break;
}
- newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
+ newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+ DATA_BUFFER_SIZE, 0);
if (!newc) {
if (settings.verbose > 0)
fprintf(stderr, "couldn't create new connection\n");
@@ -870,7 +1175,7 @@
if (try_read_command(c)) {
continue;
}
- if (try_read_network(c)) {
+ if (c->udp ? try_read_udp(c) : try_read_network(c)) {
continue;
}
/* we have no command line and no data to read from network */
@@ -976,114 +1281,63 @@
break;
case conn_write:
- /* we are writing wbytes bytes starting from wcurr */
- if (c->wbytes == 0) {
- if (c->write_and_free) {
- free(c->write_and_free);
- c->write_and_free = 0;
- }
- c->state = c->write_and_go;
- if (c->state == conn_read)
- set_cork(c, 0);
- break;
- }
- res = write(c->sfd, c->wcurr, c->wbytes);
- if (res > 0) {
- stats.bytes_written += res;
- c->wcurr += res;
- c->wbytes -= res;
- break;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!update_event(c, EV_WRITE | EV_PERSIST)) {
+ /*
+ * We want to write out a simple response. If we haven't already,
+ * assemble it into a msgbuf list (this will be a single-entry
+ * list for TCP or a two-entry list for UDP).
+ */
+ if (c->iovused == 0) {
+ if (add_iov(c, c->wcurr, c->wbytes) ||
+ (c->udp && build_udp_headers(c))) {
if (settings.verbose > 0)
- fprintf(stderr, "Couldn't update event\n");
+ fprintf(stderr, "Couldn't build response\n");
c->state = conn_closing;
break;
- }
- exit = 1;
- break;
+ }
}
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- if (settings.verbose > 0)
- fprintf(stderr, "Failed to write, and not due to blocking\n");
- c->state = conn_closing;
- break;
+
+ /* fall through... */
+
case conn_mwrite:
- /*
- * we're writing ibytes bytes from iptr. iptr alternates between
- * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the
- * current item. When we finish a chunk, we choose the next one using
- * ipart, which has the following semantics: 0 - start the loop, 1 -
- * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
- * move to the next item and build its ibuf; 3 - we finished all items,
- * write "END".
- */
- if (c->ibytes > 0) {
- res = write(c->sfd, c->iptr, c->ibytes);
- if (res > 0) {
- stats.bytes_written += res;
- c->iptr += res;
- c->ibytes -= res;
- break;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!update_event(c, EV_WRITE | EV_PERSIST)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
- break;
- }
- exit = 1;
- break;
- }
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- if (settings.verbose > 0)
- fprintf(stderr, "Failed to write, and not due to blocking\n");
- c->state = conn_closing;
- break;
- } else {
- item *it;
- /* we finished a chunk, decide what to do next */
- switch (c->ipart) {
- case 1:
- it = *(c->icurr);
- assert((it->it_flags & ITEM_SLABBED) == 0);
- c->iptr = ITEM_data(it);
- c->ibytes = it->nbytes;
- c->ipart = 2;
- break;
- case 2:
- it = *(c->icurr);
- item_remove(it);
- c->ileft--;
- if (c->ileft <= 0) {
- c->ipart = 3;
- break;
- } else {
+ switch (transmit(c)) {
+ case TRANSMIT_COMPLETE:
+ if (c->state == conn_mwrite) {
+ while (c->ileft > 0) {
+ item *it = *(c->icurr);
+ assert((it->it_flags & ITEM_SLABBED) == 0);
+ item_remove(it);
c->icurr++;
+ c->ileft--;
}
- /* FALL THROUGH */
- case 0:
- it = *(c->icurr);
- assert((it->it_flags & ITEM_SLABBED) == 0);
- c->ibytes = sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
- if (settings.verbose > 1)
- fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
- c->iptr = c->ibuf;
- c->ipart = 1;
- break;
- case 3:
- out_string(c, "END");
- break;
+ c->state = conn_read;
+ } else if (c->state == conn_write) {
+ if (c->write_and_free) {
+ free(c->write_and_free);
+ c->write_and_free = 0;
+ }
+ c->state = c->write_and_go;
+ } else {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Unexpected state %d\n", c->state);
+ c->state = conn_closing;
}
+ break;
+
+ case TRANSMIT_INCOMPLETE:
+ case TRANSMIT_HARD_ERROR:
+ break; /* Continue in state machine. */
+
+ case TRANSMIT_SOFT_ERROR:
+ exit = 1;
+ break;
}
break;
case conn_closing:
- conn_close(c);
+ if (c->udp)
+ conn_cleanup(c);
+ else
+ conn_close(c);
exit = 1;
break;
}
@@ -1115,11 +1369,11 @@
return;
}
-int new_socket(void) {
+int new_socket(int is_udp) {
int sfd;
int flags;
- if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
+ if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
perror("socket()");
return -1;
}
@@ -1133,22 +1387,60 @@
return sfd;
}
-int server_socket(int port) {
+
+/*
+ * Sets a socket's send buffer size to the maximum allowed by the system.
+ */
+void maximize_sndbuf(int sfd) {
+ socklen_t intsize = sizeof(int);
+ int last_good;
+ int min, max, avg;
+ int old_size;
+
+ /* Start with the default size. */
+ if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
+ if (settings.verbose > 0)
+ perror("getsockopt(SO_SNDBUF)");
+ return;
+ }
+
+ /* Binary-search for the real maximum. */
+ min = last_good = old_size;
+ max = MAX_SENDBUF_SIZE;
+
+ while (min <= max) {
+ avg = ((unsigned int) min + max) / 2;
+ if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
+ last_good = avg;
+ min = avg + 1;
+ } else {
+ max = avg - 1;
+ }
+ }
+
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
+}
+
+
+int server_socket(int port, int is_udp) {
int sfd;
struct linger ling = {0, 0};
struct sockaddr_in addr;
int flags =1;
- if ((sfd = new_socket()) == -1) {
+ if ((sfd = new_socket(is_udp)) == -1) {
return -1;
}
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
-#if !defined(TCP_NOPUSH)
- setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
-#endif
+ if (is_udp) {
+ maximize_sndbuf(sfd);
+ } else {
+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
+ }
/*
* the memset call clears nonstandard fields in some impementations
@@ -1164,7 +1456,7 @@
close(sfd);
return -1;
}
- if (listen(sfd, 1024) == -1) {
+ if (! is_udp && listen(sfd, 1024) == -1) {
perror("listen()");
close(sfd);
return -1;
@@ -1176,10 +1468,41 @@
void pre_gdb () {
int i = 0;
if(l_socket) close(l_socket);
+ if(u_socket > -1) close(u_socket);
for (i=3; i<=500; i++) close(i); /* so lame */
kill(getpid(), SIGABRT);
}
+/*
+ * We keep the current time of day in a global variable that's updated by a
+ * timer event. This saves us a bunch of time() system calls (we really only
+ * need to get the time once a second, whereas there can be tens of thousands
+ * of requests a second) and allows us to use server-start-relative timestamps
+ * rather than absolute UNIX timestamps, a space savings on systems where
+ * sizeof(time_t) > sizeof(unsigned int).
+ */
+volatile rel_time_t current_time;
+struct event clockevent;
+
+void clock_handler(int fd, short which, void *arg) {
+ struct timeval t;
+ static int initialized = 0;
+
+ if (initialized) {
+ /* only delete the event if it's actually there. */
+ evtimer_del(&clockevent);
+ } else {
+ initialized = 1;
+ }
+
+ evtimer_set(&clockevent, clock_handler, 0);
+ t.tv_sec = 1;
+ t.tv_usec = 0;
+ evtimer_add(&clockevent, &t);
+
+ current_time = (rel_time_t) (time(0) - stats.started);
+}
+
struct event deleteevent;
void delete_handler(int fd, short which, void *arg) {
@@ -1200,7 +1523,7 @@
{
int i, j=0;
- time_t now = time(0);
+ rel_time_t now = current_time;
for (i=0; i<delcurr; i++) {
item *it = todelete[i];
if (it->exptime < now) {
@@ -1214,8 +1537,6 @@
}
delcurr = j;
}
-
- return;
}
void usage(void) {
@@ -1234,6 +1555,8 @@
printf("-h print this help and exit\n");
printf("-i print memcached and libevent license\n");
printf("-P <file> save PID in <file>, only used with -d option\n");
+ printf("-f <factor> chunk size growth factor, default 1.25\n");
+ printf("-s <bytes> minimum space allocated for key+value+flags, default 48\n");
return;
}
@@ -1336,10 +1659,17 @@
}
int l_socket=0;
+int u_socket=-1;
+
+void sig_handler(int sig) {
+ printf("SIGINT handled.\n");
+ exit(0);
+}
int main (int argc, char **argv) {
int c;
conn *l_conn;
+ conn *u_conn;
struct in_addr addr;
int lock_memory = 0;
int daemonize = 0;
@@ -1350,17 +1680,23 @@
struct rlimit rlim;
char *pid_file = NULL;
+ /* handle SIGINT */
+ signal(SIGINT, sig_handler);
+
/* init settings */
settings_init();
-
+
/* process arguments */
- while ((c = getopt(argc, argv, "p:m:Mc:khirvdl:u:P:")) != -1) {
+ while ((c = getopt(argc, argv, "p:U:m:Mc:khirvdl:u:P:f:s:")) != -1) {
switch (c) {
+ case 'U':
+ settings.udpport = atoi(optarg);
+ break;
case 'p':
settings.port = atoi(optarg);
break;
case 'm':
- settings.maxbytes = atoi(optarg)*1024*1024;
+ settings.maxbytes = ((size_t)atoi(optarg))*1024*1024;
break;
case 'M':
settings.evict_to_free = 0;
@@ -1400,6 +1736,20 @@
case 'P':
pid_file = optarg;
break;
+ case 'f':
+ settings.factor = atof(optarg);
+ if (settings.factor <= 1.0) {
+ fprintf(stderr, "Factor must be greater than 1\n");
+ return 1;
+ }
+ break;
+ case 's':
+ settings.chunk_size = atoi(optarg);
+ if (settings.chunk_size == 0) {
+ fprintf(stderr, "Chunk size must be greater than 0\n");
+ return 1;
+ }
+ break;
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
@@ -1454,19 +1804,29 @@
}
/*
- * initialization order: first create the listening socket
+ * initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/
/* create the listening socket and bind it */
- l_socket = server_socket(settings.port);
+ l_socket = server_socket(settings.port, 0);
if (l_socket == -1) {
fprintf(stderr, "failed to listen\n");
exit(1);
}
+ if (settings.udpport > 0) {
+ /* create the UDP listening socket and bind it */
+ u_socket = server_socket(settings.udpport, 1);
+ if (u_socket == -1) {
+ fprintf(stderr, "failed to listen on UDP port\n");
+ exit(1);
+ }
+ }
+
+
/* lose root privileges if we have them */
if (getuid()== 0 || geteuid()==0) {
if (username==0 || *username=='\0') {
@@ -1501,7 +1861,7 @@
stats_init();
assoc_init();
conn_init();
- slabs_init(settings.maxbytes);
+ slabs_init(settings.maxbytes, settings.factor);
/* lock paged memory if needed */
if (lock_memory) {
@@ -1525,11 +1885,21 @@
}
/* create the initial listening connection */
- if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST))) {
+ if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
fprintf(stderr, "failed to create listening connection\n");
exit(1);
}
+ /* create the initial listening udp connection */
+ if (u_socket > -1 &&
+ !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, 1))) {
+ fprintf(stderr, "failed to create udp connection\n");
+ exit(1);
+ }
+
+ /* initialise clock event */
+ clock_handler(0,0,0);
+
/* initialise deletion array and timer event */
deltotal = 200; delcurr = 0;
todelete = malloc(sizeof(item *)*deltotal);
@@ -1548,4 +1918,3 @@
return 0;
}
-
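A note on the partial-write handling in transmit() above, since it is the subtle part of the new output path: sendmsg() may accept fewer bytes than the msghdr describes, so the iovec list has to be advanced past whatever was written. Here is a minimal standalone sketch of just that bookkeeping. The helper name consume_iov() is hypothetical and not part of the patch; it assumes the caller passes the byte count returned by a successful write.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/uio.h>

/*
 * Hypothetical helper (not in the patch): `written` bytes of the iovec
 * array at *iovp were just sent. Drop the entries that were written in
 * full and trim the first partially written one. Returns the number of
 * entries still pending and stores the new start of the array in *iovp.
 */
int consume_iov(struct iovec **iovp, int iovcnt, size_t written) {
    struct iovec *iov = *iovp;

    assert(iovcnt >= 0);

    /* Skip every entry that was written in full. */
    while (iovcnt > 0 && written >= iov->iov_len) {
        written -= iov->iov_len;
        iov++;
        iovcnt--;
    }

    /* The write ended inside this entry; keep only the unsent tail.
       The cast avoids arithmetic on a void pointer. */
    if (iovcnt > 0 && written > 0) {
        iov->iov_base = (char *) iov->iov_base + written;
        iov->iov_len -= written;
    }

    *iovp = iov;
    return iovcnt;
}
```

A caller would invoke this after each successful write and try again until it returns 0, which corresponds to transmit() moving on to the next msghdr once msg_iovlen reaches 0.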
--- ../1.1.12-dist/memcached.h 2006-05-02 14:53:49.000000000 -0700
+++ ./memcached.h 2006-05-02 17:25:32.000000000 -0700
@@ -2,35 +2,41 @@
/* $Id: memcached.h,v 1.21 2004/02/24 23:42:02 bradfitz Exp $ */
#define DATA_BUFFER_SIZE 2048
+#define UDP_READ_BUFFER_SIZE 65536
+#define UDP_MAX_PAYLOAD_SIZE 1400
+#define UDP_HEADER_SIZE 8
+#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
-#if defined(TCP_CORK) && !defined(TCP_NOPUSH)
-#define TCP_NOPUSH TCP_CORK
-#endif
+/* Time relative to server start. Smaller than time_t on 64-bit systems. */
+typedef unsigned int rel_time_t;
struct stats {
unsigned int curr_items;
unsigned int total_items;
- unsigned long long curr_bytes;
+ unsigned long long curr_bytes;
unsigned int curr_conns;
unsigned int total_conns;
unsigned int conn_structs;
- unsigned int get_cmds;
- unsigned int set_cmds;
- unsigned int get_hits;
- unsigned int get_misses;
+ unsigned long long get_cmds;
+ unsigned long long set_cmds;
+ unsigned long long get_hits;
+ unsigned long long get_misses;
time_t started; /* when the process was started */
unsigned long long bytes_read;
unsigned long long bytes_written;
};
struct settings {
- unsigned int maxbytes;
+ size_t maxbytes;
int maxconns;
int port;
+ int udpport;
struct in_addr interface;
int verbose;
- time_t oldest_live; /* ignore existing items older than this */
+ rel_time_t oldest_live; /* ignore existing items older than this */
int evict_to_free;
+ double factor; /* chunk size growth factor */
+ int chunk_size;
};
extern struct stats stats;
@@ -45,24 +51,27 @@
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
- struct _stritem *h_next; /* hash chain next */
+ struct _stritem *h_next; /* hash chain next */
+ rel_time_t time; /* least recent access */
+ rel_time_t exptime; /* expire time */
+ int nbytes; /* size of data */
unsigned short refcount;
- unsigned short flags;
- int nbytes; /* size of data */
- time_t time; /* least recent access */
- time_t exptime; /* expire time */
- unsigned char it_flags; /* ITEM_* above */
- unsigned char slabs_clsid;
- unsigned char nkey; /* key length, with terminating null and padding */
- unsigned char dummy1;
+ unsigned char nsuffix; /* length of flags-and-length string */
+ unsigned char it_flags; /* ITEM_* above */
+ unsigned char slabs_clsid;/* which slab class we're in */
+ unsigned char nkey; /* key length, w/terminating null and padding */
void * end[0];
+ /* then null-terminated key */
+ /* then " flags length\r\n" (no terminating null) */
+ /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
#define ITEM_key(item) ((char*)&((item)->end[0]))
/* warning: don't use these macros with a function, as it evals its arg twice */
-#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey)
-#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nbytes)
+#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey)
+#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix)
+#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes)
enum conn_states {
conn_listening, /* the socket which listens for connections */
@@ -95,7 +104,6 @@
int wbytes;
int write_and_go; /* which state to go into after finishing current write */
void *write_and_free; /* free this memory after finishing writing */
- char is_corked; /* boolean, connection is corked */
char *rcurr;
int rlbytes;
@@ -115,20 +123,39 @@
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
+ struct iovec *iov;
+ int iovsize; /* number of elements allocated in iov[] */
+ int iovused; /* number of elements used in iov[] */
+
+ struct msghdr *msglist;
+ int msgsize; /* number of elements allocated in msglist[] */
+ int msgused; /* number of elements used in msglist[] */
+ int msgcurr; /* element in msglist[] being transmitted now */
+ int msgbytes; /* number of bytes in current msg */
+
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
- int ipart; /* 1 if we're writing a VALUE line, 2 if we're writing data */
- char ibuf[300]; /* for VALUE lines */
- char *iptr;
- int ibytes;
-
+
+ /* data for UDP clients */
+ int udp; /* 1 if this is a UDP "connection" */
+ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
+ struct sockaddr request_addr; /* Who sent the most recent request */
+ socklen_t request_addr_size;
+ unsigned char *hdrbuf; /* udp packet headers */
+ int hdrsize; /* number of headers' worth of space allocated */
} conn;
/* listening socket */
extern int l_socket;
+/* udp socket */
+extern int u_socket;
+
+/* current time of day (updated periodically) */
+extern volatile rel_time_t current_time;
+
/* temporary hack */
/* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }
void pre_gdb (); */
@@ -143,22 +170,24 @@
* be that low).
*/
-time_t realtime(time_t exptime);
+rel_time_t realtime(time_t exptime);
/* slabs memory allocation */
-/* Init the subsystem. The argument is the limit on no. of bytes to allocate, 0 if no limit */
-void slabs_init(unsigned int limit);
+/* Init the subsystem. 1st argument is the limit on no. of bytes to allocate,
+ 0 if no limit. 2nd argument is the growth factor; each slab will use a chunk
+ size equal to the previous slab's chunk size times this factor. */
+void slabs_init(size_t limit, double factor);
/* Given object size, return id to use when allocating/freeing memory for object */
/* 0 means error: can't store such a large object */
-unsigned int slabs_clsid(unsigned int size);
+unsigned int slabs_clsid(size_t size);
/* Allocate object of given length. 0 on error */
-void *slabs_alloc(unsigned int size);
+void *slabs_alloc(size_t size);
/* Free previously allocated object */
-void slabs_free(void *ptr, unsigned int size);
+void slabs_free(void *ptr, size_t size);
/* Fill buffer with stats */
char* slabs_stats(int *buflen);
@@ -171,17 +200,22 @@
/* event handling, network IO */
void event_handler(int fd, short which, void *arg);
-conn *conn_new(int sfd, int init_state, int event_flags);
+conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
void conn_close(conn *c);
void conn_init(void);
void drive_machine(conn *c);
-int new_socket(void);
-int server_socket(int port);
+int new_socket(int is_udp);
+int server_socket(int port, int is_udp);
int update_event(conn *c, int new_flags);
int try_read_command(conn *c);
int try_read_network(conn *c);
+int try_read_udp(conn *c);
void complete_nread(conn *c);
void process_command(conn *c, char *command);
+int transmit(conn *c);
+int ensure_iov_space(conn *c);
+int add_iov(conn *c, void *buf, int len);
+int add_msghdr(conn *c);
/* stats */
void stats_reset(void);
@@ -198,7 +232,7 @@
void item_init(void);
-item *item_alloc(char *key, int flags, time_t exptime, int nbytes);
+item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes);
void item_free(item *it);
int item_link(item *it); /* may fail if transgresses limits */
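To make the new item layout in memcached.h easier to picture: the key, the preformatted " flags length\r\n" suffix, and the data now sit back to back after the header, so a "get" response can point iovec entries straight at them. The sketch below is a simplified stand-in, not the patched struct. The sketch_item type, the SK_* macros and sketch_item_new() are hypothetical names, the key is stored without padding, and a C99 flexible array member replaces the end[0] idiom.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the item header (assumption: only the fields
   needed to locate key, suffix and data are kept). */
typedef struct sketch_item {
    int nbytes;             /* data length, including trailing "\r\n" */
    unsigned char nsuffix;  /* length of " flags length\r\n" */
    unsigned char nkey;     /* key length, including terminating null */
    char end[];             /* key, then suffix, then data */
} sketch_item;

/* Same shape as ITEM_key / ITEM_suffix / ITEM_data / ITEM_ntotal. */
#define SK_key(it)    ((it)->end)
#define SK_suffix(it) ((it)->end + (it)->nkey)
#define SK_data(it)   ((it)->end + (it)->nkey + (it)->nsuffix)
#define SK_ntotal(it) (sizeof(sketch_item) + (it)->nkey + (it)->nsuffix + (it)->nbytes)

/* Allocates an item and lays out key, suffix and data contiguously.
   Returns NULL if malloc fails; the caller frees the result. */
sketch_item *sketch_item_new(const char *key, unsigned int flags,
                             const char *value, int vlen) {
    char suffix[40];
    size_t nkey = strlen(key) + 1;
    int nsuffix = snprintf(suffix, sizeof(suffix), " %u %d\r\n", flags, vlen);
    sketch_item *it;

    assert(vlen >= 0 && nkey <= 255);
    assert(nsuffix > 0 && nsuffix < (int) sizeof(suffix));

    it = malloc(sizeof(*it) + nkey + (size_t) nsuffix + (size_t) vlen + 2);
    if (it == NULL)
        return NULL;

    it->nkey = (unsigned char) nkey;
    it->nsuffix = (unsigned char) nsuffix;
    it->nbytes = vlen + 2;
    memcpy(SK_key(it), key, nkey);
    memcpy(SK_suffix(it), suffix, (size_t) nsuffix);
    memcpy(SK_data(it), value, (size_t) vlen);
    memcpy(SK_data(it) + vlen, "\r\n", 2);
    return it;
}
```

With this layout, a response line is "VALUE " followed by the key, and then the suffix and data can be sent as one contiguous region of nsuffix + nbytes bytes without any per-request formatting.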
--- ../1.1.12-dist/slabs.c 2006-05-02 14:53:49.000000000 -0700
+++ ./slabs.c 2006-05-02 17:25:32.000000000 -0700
@@ -1,6 +1,11 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
- * Slabs memory allocation, based on powers-of-2
+ * Slabs memory allocation, based on powers-of-N. Slabs are up to 1MB in size
+ * and are divided into chunks. The chunk sizes start off at the size of the
+ * "item" structure plus space for a small key and value. They increase by
+ * a multiplier factor from there, up to half the maximum slab size. The last
+ * chunk size is always 1MB, since that's the maximum item size allowed by the
+ * memcached protocol.
*
* $Id: slabs.c,v 1.15 2003/09/05 22:37:36 bradfitz Exp $
*/
@@ -23,11 +28,12 @@
#include "memcached.h"
-#define POWER_SMALLEST 3
-#define POWER_LARGEST 20
+#define POWER_SMALLEST 1
+#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
+#define CHUNK_ALIGN_BYTES (sizeof(void *))
-/* powers-of-2 allocation structures */
+/* powers-of-N allocation structures */
typedef struct {
unsigned int size; /* sizes of items */
@@ -49,46 +55,63 @@
} slabclass_t;
static slabclass_t slabclass[POWER_LARGEST+1];
-static unsigned int mem_limit = 0;
-static unsigned int mem_malloced = 0;
+static size_t mem_limit = 0;
+static size_t mem_malloced = 0;
+static int power_largest;
-unsigned int slabs_clsid(unsigned int size) {
- int res = 1;
+/*
+ * Figures out which slab class (chunk size) is required to store an item of
+ * a given size.
+ */
+unsigned int slabs_clsid(size_t size) {
+ int res = POWER_SMALLEST;
if(size==0)
return 0;
- size--;
- while(size >>= 1)
- res++;
- if (res < POWER_SMALLEST)
- res = POWER_SMALLEST;
- if (res > POWER_LARGEST)
- res = 0;
+ while (size > slabclass[res].size)
+ if (res++ == power_largest) /* won't fit in the biggest slab */
+ return 0;
return res;
}
-void slabs_init(unsigned int limit) {
- int i;
- int size=1;
+/*
+ * Determines the chunk sizes and initializes the slab class descriptors
+ * accordingly.
+ */
+void slabs_init(size_t limit, double factor) {
+ int i = POWER_SMALLEST - 1;
+ unsigned int size = sizeof(item) + settings.chunk_size;
+
+ /* Factor of 2.0 means use the default memcached behavior */
+ if (factor == 2.0 && size < 128)
+ size = 128;
mem_limit = limit;
- for(i=0; i<=POWER_LARGEST; i++, size*=2) {
+ memset(slabclass, 0, sizeof(slabclass));
+
+ while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
+ /* Make sure items are always n-byte aligned */
+ if (size % CHUNK_ALIGN_BYTES)
+ size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
+
slabclass[i].size = size;
- slabclass[i].perslab = POWER_BLOCK / size;
- slabclass[i].slots = 0;
- slabclass[i].sl_curr = slabclass[i].sl_total = slabclass[i].slabs = 0;
- slabclass[i].end_page_ptr = 0;
- slabclass[i].end_page_free = 0;
- slabclass[i].slab_list = 0;
- slabclass[i].list_size = 0;
- slabclass[i].killing = 0;
+ slabclass[i].perslab = POWER_BLOCK / slabclass[i].size;
+ size *= factor;
+ if (settings.verbose > 1) {
+ fprintf(stderr, "slab class %3d: chunk size %6u perslab %5u\n",
+ i, slabclass[i].size, slabclass[i].perslab);
+ }
}
+
+ power_largest = i;
+ slabclass[power_largest].size = POWER_BLOCK;
+ slabclass[power_largest].perslab = 1;
}
static int grow_slab_list (unsigned int id) {
slabclass_t *p = &slabclass[id];
if (p->slabs == p->list_size) {
- unsigned int new_size = p->list_size ? p->list_size * 2 : 16;
+ size_t new_size = p->list_size ? p->list_size * 2 : 16;
void *new_list = realloc(p->slab_list, new_size*sizeof(void*));
if (new_list == 0) return 0;
p->list_size = new_size;
@@ -99,11 +122,14 @@
int slabs_newslab(unsigned int id) {
slabclass_t *p = &slabclass[id];
- int num = p->perslab;
+#ifdef ALLOW_SLABS_REASSIGN
int len = POWER_BLOCK;
+#else
+ int len = p->size * p->perslab;
+#endif
char *ptr;
- if (mem_limit && mem_malloced + len > mem_limit)
+ if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
return 0;
if (! grow_slab_list(id)) return 0;
@@ -113,18 +139,18 @@
memset(ptr, 0, len);
p->end_page_ptr = ptr;
- p->end_page_free = num;
+ p->end_page_free = p->perslab;
p->slab_list[p->slabs++] = ptr;
mem_malloced += len;
return 1;
}
-void *slabs_alloc(unsigned int size) {
+void *slabs_alloc(size_t size) {
slabclass_t *p;
unsigned char id = slabs_clsid(size);
- if (id < POWER_SMALLEST || id > POWER_LARGEST)
+ if (id < POWER_SMALLEST || id > power_largest)
return 0;
p = &slabclass[id];
@@ -160,13 +186,13 @@
return 0; /* shouldn't ever get here */
}
-void slabs_free(void *ptr, unsigned int size) {
+void slabs_free(void *ptr, size_t size) {
unsigned char id = slabs_clsid(size);
slabclass_t *p;
assert(((item *)ptr)->slabs_clsid==0);
- assert(id >= POWER_SMALLEST && id <= POWER_LARGEST);
- if (id < POWER_SMALLEST || id > POWER_LARGEST)
+ assert(id >= POWER_SMALLEST && id <= power_largest);
+ if (id < POWER_SMALLEST || id > power_largest)
return;
p = &slabclass[id];
@@ -191,14 +217,14 @@
char* slabs_stats(int *buflen) {
int i, total;
- char *buf = (char*) malloc(8192);
+ char *buf = (char*) malloc(power_largest * 200 + 100);
char *bufcurr = buf;
*buflen = 0;
if (!buf) return 0;
total = 0;
- for(i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
+ for(i = POWER_SMALLEST; i <= power_largest; i++) {
slabclass_t *p = &slabclass[i];
if (p->slabs) {
unsigned int perslab, slabs;
@@ -216,13 +242,19 @@
total++;
}
}
- bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %u\r\n", total, mem_malloced);
+ bufcurr += sprintf(bufcurr, "STAT active_slabs %d\r\nSTAT total_malloced %llu\r\n", total, (unsigned long long) mem_malloced);
bufcurr += sprintf(bufcurr, "END\r\n");
*buflen = bufcurr - buf;
return buf;
}
-/* 1 = success
+#ifdef ALLOW_SLABS_REASSIGN
+/* Blows away all the items in a slab class and moves its slabs to another
+ class. This is only used by the "slabs reassign" command, for manual tweaking
+ of memory allocation. It's disabled by default since it requires that all
+ slabs be the same size (which can waste space for chunk size growth factors
+ other than 2.0).
+ 1 = success
0 = fail
-1 = tried. busy. send again shortly. */
int slabs_reassign(unsigned char srcid, unsigned char dstid) {
@@ -231,8 +263,8 @@
void *iter;
int was_busy = 0;
- if (srcid < POWER_SMALLEST || srcid > POWER_LARGEST ||
- dstid < POWER_SMALLEST || dstid > POWER_LARGEST)
+ if (srcid < POWER_SMALLEST || srcid > power_largest ||
+ dstid < POWER_SMALLEST || dstid > power_largest)
return 0;
p = &slabclass[srcid];
@@ -288,3 +320,4 @@
return 1;
}
+#endif
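For anyone who wants to see what the powers-of-N sizing in slabs_init() and slabs_clsid() produces without building the whole server, here is a rough standalone sketch of the same idea. All sk_* names are hypothetical, the first chunk size is passed in directly rather than computed from sizeof(item) plus the -s value, and the upper bound on the factor is only a guard for this sketch.

```c
#include <assert.h>
#include <stddef.h>

#define SK_MAX_CLASSES 200
#define SK_BLOCK       1048576u                        /* 1MB, like POWER_BLOCK */
#define SK_ALIGN       ((unsigned int) sizeof(void *)) /* like CHUNK_ALIGN_BYTES */

unsigned int sk_sizes[SK_MAX_CLASSES + 1]; /* chunk size per class; index 0 unused */
int sk_largest;                            /* index of the 1MB class */

/* Fills sk_sizes[1..sk_largest] and returns sk_largest. */
int sk_build_classes(unsigned int first_size, double factor) {
    int i = 0;
    unsigned int size = first_size;

    /* The upper bound only keeps size * factor within unsigned int range. */
    assert(first_size > 0 && factor > 1.0 && factor <= 64.0);

    while (++i < SK_MAX_CLASSES && size <= SK_BLOCK / 2) {
        /* Round each chunk size up to pointer alignment. */
        if (size % SK_ALIGN)
            size += SK_ALIGN - (size % SK_ALIGN);
        sk_sizes[i] = size;
        size = (unsigned int) (size * factor);
    }

    /* The last class always holds a single 1MB chunk. */
    sk_sizes[i] = SK_BLOCK;
    sk_largest = i;
    return i;
}

/* Smallest class whose chunk holds `size` bytes, or 0 if none does. */
int sk_class_for(size_t size) {
    int id = 1;

    if (size == 0)
        return 0;
    while (size > sk_sizes[id])
        if (id++ == sk_largest)
            return 0;
    return id;
}
```

Starting at 128 bytes with a factor of 2.0 this yields the power-of-2 progression 128, 256, and so on up to 512KB, plus the 1MB class; a factor of 1.25 gives many more classes whose sizes are closer together, which is where the memory savings come from.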