[memcached] sgrimm,
r442: Multithreading support. This change allo...
commits at code.sixapart.com
commits at code.sixapart.com
Thu Nov 23 07:38:29 UTC 2006
Multithreading support. This change allows memcached to be built either
as a single-threaded program (the default) or a multithreaded one if
the "--enable-threads" option is passed to the configure script. See
doc/threads.txt for a more detailed description.
U branches/multithreaded/server/ChangeLog
U branches/multithreaded/server/Makefile.am
U branches/multithreaded/server/assoc.c
U branches/multithreaded/server/configure.ac
U branches/multithreaded/server/doc/memcached.1
A branches/multithreaded/server/doc/threads.txt
U branches/multithreaded/server/items.c
U branches/multithreaded/server/memcached.c
U branches/multithreaded/server/memcached.h
U branches/multithreaded/server/slabs.c
Modified: branches/multithreaded/server/ChangeLog
===================================================================
--- branches/multithreaded/server/ChangeLog 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/ChangeLog 2006-11-23 07:38:27 UTC (rev 442)
@@ -1,3 +1,8 @@
+2006-11-22
+ * Steven Grimm <sgrimm at facebook.com>: Add support for multithreaded
+ execution. Run configure with "--enable-threads" to enable. See
+ doc/threads.txt for details.
+
2006-10-15
* Steven Grimm <sgrimm at facebook.com>: Dynamic sizing of hashtable to
reduce collisions on very large caches and conserve memory on
Modified: branches/multithreaded/server/Makefile.am
===================================================================
--- branches/multithreaded/server/Makefile.am 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/Makefile.am 2006-11-23 07:38:27 UTC (rev 442)
@@ -1,6 +1,6 @@
bin_PROGRAMS = memcached memcached-debug
-memcached_SOURCES = memcached.c slabs.c items.c assoc.c memcached.h
+memcached_SOURCES = memcached.c slabs.c items.c memcached.h assoc.c thread.c
memcached_debug_SOURCES = $(memcached_SOURCES)
memcached_CPPFLAGS = -DNDEBUG
memcached_LDADD = @LIBOBJS@
Modified: branches/multithreaded/server/assoc.c
===================================================================
--- branches/multithreaded/server/assoc.c 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/assoc.c 2006-11-23 07:38:27 UTC (rev 442)
@@ -12,6 +12,7 @@
*
* $Id$
*/
+
#include "memcached.h"
#include <sys/stat.h>
#include <sys/socket.h>
@@ -535,39 +536,41 @@
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion starting\n");
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = 1;
expand_bucket = 0;
- assoc_move_next_bucket();
+ do_assoc_move_next_bucket();
} else {
primary_hashtable = old_hashtable;
- /* Bad news, but we can keep running. */
+ /* Bad news, but we can keep running. */
}
}
/* migrates the next bucket to the primary hashtable if we're expanding. */
-void assoc_move_next_bucket(void) {
+void do_assoc_move_next_bucket(void) {
item *it, *next;
int bucket;
if (expanding) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- next = it->h_next;
+ next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
- }
+ }
- expand_bucket++;
- if (expand_bucket == hashsize(hashpower - 1)) {
- expanding = 0;
- free(old_hashtable);
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion done\n");
- }
+ old_hashtable[expand_bucket] = NULL;
+
+ expand_bucket++;
+ if (expand_bucket == hashsize(hashpower - 1)) {
+ expanding = 0;
+ free(old_hashtable);
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion done\n");
+ }
}
}
Modified: branches/multithreaded/server/configure.ac
===================================================================
--- branches/multithreaded/server/configure.ac 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/configure.ac 2006-11-23 07:38:27 UTC (rev 442)
@@ -94,6 +94,7 @@
AC_SEARCH_LIBS(socket, socket)
AC_SEARCH_LIBS(gethostbyname, nsl)
AC_SEARCH_LIBS(mallinfo, malloc)
+AC_SEARCH_LIBS(pthread_create, pthread)
AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)])
@@ -155,6 +156,15 @@
AC_C_ENDIAN
+dnl Check whether the user wants threads or not
+AC_ARG_ENABLE(threads,
+ [AS_HELP_STRING([--enable-threads],[support multithreaded execution])],
+ [if test "$ac_cv_search_pthread_create" != "no"; then
+ AC_DEFINE([USE_THREADS],,[Define this if you want to use pthreads])
+ else
+ AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
+ fi])
+
AC_CHECK_FUNCS(mlockall)
AC_CONFIG_FILES(Makefile doc/Makefile)
Modified: branches/multithreaded/server/doc/memcached.1
===================================================================
--- branches/multithreaded/server/doc/memcached.1 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/doc/memcached.1 2006-11-23 07:38:27 UTC (rev 442)
@@ -84,6 +84,12 @@
.TP
.B \-P <filename>
Print pidfile to <filename>, only used under -d option.
+.TP
+.B \-t <threads>
+Number of threads to use to process incoming requests. This option is only
+meaningful if memcached was compiled with thread support enabled. It is
+typically not useful to set this higher than the number of CPU cores on the
+memcached server.
.br
.SH LICENSE
The memcached daemon is copyright Danga Interactive and is distributed under
Added: branches/multithreaded/server/doc/threads.txt
===================================================================
--- branches/multithreaded/server/doc/threads.txt 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/doc/threads.txt 2006-11-23 07:38:27 UTC (rev 442)
@@ -0,0 +1,68 @@
+Multithreading support in memcached
+
+OVERVIEW
+
+By default, memcached is compiled as a single-threaded application. This is
+the most CPU-efficient mode of operation, and it is appropriate for memcached
+instances running on single-processor servers or whose request volume is
+low enough that available CPU power is not a bottleneck.
+
+More heavily-used memcached instances can benefit from multithreaded mode.
+To enable it, use the "--enable-threads" option to the configure script:
+
+./configure --enable-threads
+
+You must have the POSIX thread functions (pthread_*) on your system in order
+to use memcached's multithreaded mode.
+
+Once you have a thread-capable memcached executable, you can control the
+number of threads using the "-t" option; the default is 4. On a machine
+that's dedicated to memcached, you will typically want one thread per
+processor core. Due to memcached's nonblocking architecture, there is no
+real advantage to using more threads than the number of CPUs on the machine;
+doing so will increase lock contention and is likely to degrade performance.
+
+
+INTERNALS
+
+The threading support is mostly implemented as a series of wrapper functions
+that protect calls to underlying code with one of a small number of locks.
+In single-threaded mode, the wrappers are replaced with direct invocations
+of the target code using #define; that is done in memcached.h. This approach
+allows memcached to be compiled in either single- or multi-threaded mode.
+
+Each thread has its own instance of libevent ("base" in libevent terminology).
+The only direct interaction between threads is for new connections. One of
+the threads handles the TCP listen socket; each new connection is passed to
+a different thread on a round-robin basis. After that, each thread operates
+on its set of connections as if it were running in single-threaded mode,
+using libevent to manage nonblocking I/O as usual.
+
+UDP requests are a bit different, since there is only one UDP socket that's
+shared by all clients. The UDP socket is monitored by all of the threads.
+When a datagram comes in, all the threads that aren't already processing
+another request will receive "socket readable" callbacks from libevent.
+Only one thread will successfully read the request; the others will go back
+to sleep or, in the case of a very busy server, will read whatever other
+UDP requests are waiting in the socket buffer. Note that on moderately
+busy servers this increases CPU consumption, since threads will constantly
+wake up and find no input waiting for them. Short of much more invasive
+changes to the I/O code, however, this is not easy to avoid.
+
+
+TO DO
+
+The locking is currently very coarse-grained. There is, for example, one
+lock that protects all the calls to the hashtable-related functions. Since
+memcached spends much of its CPU time on command parsing and response
+assembly, rather than managing the hashtable per se, this is not a huge
+bottleneck for small numbers of processors. However, the locking will likely
+have to be refined in the event that memcached needs to run well on
+massively-parallel machines.
+
+One cheap optimization to reduce contention on that lock: move the hash value
+computation so it occurs before the lock is obtained whenever possible.
+Right now the hash is performed at the lowest levels of the functions in
+assoc.c. If instead it were computed in memcached.c, then passed along with
+the key and length into the items.c code and down into assoc.c, that would
+reduce the amount of time each thread needs to keep the hashtable lock held.
Modified: branches/multithreaded/server/items.c
===================================================================
--- branches/multithreaded/server/items.c 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/items.c 2006-11-23 07:38:27 UTC (rev 442)
@@ -65,7 +65,7 @@
return sizeof(item) + nkey + *nsuffix + nbytes;
}
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
+item *do_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
int nsuffix, ntotal;
item *it;
unsigned int id;
@@ -100,7 +100,7 @@
for (search = tails[id]; tries>0 && search; tries--, search=search->prev) {
if (search->refcount==0) {
- item_unlink(search);
+ do_item_unlink(search);
break;
}
}
@@ -194,34 +194,38 @@
return;
}
-int item_link(item *it) {
+int do_item_link(item *it) {
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
assoc_insert(it);
+ STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
+ STATS_UNLOCK();
item_link_q(it);
return 1;
}
-void item_unlink(item *it) {
+void do_item_unlink(item *it) {
if (it->it_flags & ITEM_LINKED) {
it->it_flags &= ~ITEM_LINKED;
+ STATS_LOCK();
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
+ STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey);
item_unlink_q(it);
+ if (it->refcount == 0) item_free(it);
}
- if (it->refcount == 0) item_free(it);
}
-void item_remove(item *it) {
+void do_item_remove(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
if (it->refcount) {
it->refcount--;
@@ -233,21 +237,23 @@
}
}
-void item_update(item *it) {
+void do_item_update(item *it) {
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- item_unlink_q(it);
- it->time = current_time;
- item_link_q(it);
+ if (it->it_flags & ITEM_LINKED) {
+ item_unlink_q(it);
+ it->time = current_time;
+ item_link_q(it);
+ }
}
}
-int item_replace(item *it, item *new_it) {
+int do_item_replace(item *it, item *new_it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- item_unlink(it);
- return item_link(new_it);
+ do_item_unlink(it);
+ return do_item_link(new_it);
}
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
@@ -349,7 +355,7 @@
}
/* wrapper around assoc_find which does the lazy expiration/deletion logic */
-item *item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
item *it = assoc_find(key, nkey);
if (delete_locked) *delete_locked = 0;
if (it && (it->it_flags & ITEM_DELETED)) {
@@ -363,11 +369,11 @@
}
if (it && settings.oldest_live && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
- item_unlink(it);
+ do_item_unlink(it); // MTSAFE - cache_lock held
it = 0;
}
if (it && it->exptime && it->exptime <= current_time) {
- item_unlink(it);
+ do_item_unlink(it); // MTSAFE - cache_lock held
it = 0;
}
@@ -383,7 +389,7 @@
}
/* returns an item whether or not it's delete-locked or expired. */
-item *item_get_nocheck(char *key, size_t nkey) {
+item *do_item_get_nocheck(char *key, size_t nkey) {
item *it = assoc_find(key, nkey);
if (it) {
it->refcount++;
@@ -393,7 +399,7 @@
}
/* expires items that are more recent than the oldest_live setting. */
-void item_flush_expired() {
+void do_item_flush_expired() {
int i;
item *iter, *next;
if (! settings.oldest_live)
@@ -408,7 +414,7 @@
if (iter->time >= settings.oldest_live) {
next = iter->next;
if ((iter->it_flags & ITEM_SLABBED) == 0) {
- item_unlink(iter);
+ do_item_unlink(iter);
}
} else {
/* We've hit the first old item. Continue to the next queue. */
Modified: branches/multithreaded/server/memcached.c
===================================================================
--- branches/multithreaded/server/memcached.c 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/memcached.c 2006-11-23 07:38:27 UTC (rev 442)
@@ -101,9 +101,11 @@
}
void stats_reset(void) {
+ STATS_LOCK();
stats.total_items = stats.total_conns = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
stats.bytes_read = stats.bytes_written = 0;
+ STATS_UNLOCK();
}
void settings_init(void) {
@@ -119,6 +121,11 @@
settings.managed = 0;
settings.factor = 1.25;
settings.chunk_size = 48; /* space for a modest key and value */
+#ifdef USE_THREADS
+ settings.num_threads = 4;
+#else
+ settings.num_threads = 1;
+#endif
}
/*
@@ -177,9 +184,9 @@
* Returns a connection from the freelist, if any. Should call this using
* conn_from_freelist() for thread safety.
*/
-conn *conn_from_freelist() {
+conn *do_conn_from_freelist() {
conn *c;
-
+
if (freecurr > 0) {
c = freeconns[--freecurr];
} else {
@@ -193,7 +200,7 @@
* Adds a connection to the freelist. 0 = success. Should call this using
* conn_add_to_freelist() for thread safety.
*/
-int conn_add_to_freelist(conn *c) {
+int do_conn_add_to_freelist(conn *c) {
if (freecurr < freetotal) {
freeconns[freecurr++] = c;
return 0;
@@ -211,7 +218,7 @@
}
conn *conn_new(int sfd, int init_state, int event_flags,
- int read_buffer_size, int is_udp, struct event_base *base) {
+ int read_buffer_size, int is_udp, struct event_base *base) {
conn *c = conn_from_freelist();
if (NULL == c) {
@@ -250,7 +257,9 @@
return 0;
}
+ STATS_LOCK();
stats.conn_structs++;
+ STATS_UNLOCK();
}
if (settings.verbose > 1) {
@@ -293,8 +302,10 @@
return 0;
}
+ STATS_LOCK();
stats.curr_conns++;
stats.total_conns++;
+ STATS_UNLOCK();
return c;
}
@@ -354,7 +365,9 @@
conn_free(c);
}
+ STATS_LOCK();
stats.curr_conns--;
+ STATS_UNLOCK();
return;
}
@@ -572,7 +585,9 @@
item *it = c->item;
int comm = c->item_comm;
+ STATS_LOCK();
stats.set_cmds++;
+ STATS_UNLOCK();
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
@@ -583,6 +598,7 @@
out_string(c, "NOT_STORED");
}
}
+
item_remove(c->item); /* release the c->item reference */
c->item = 0;
}
@@ -593,15 +609,15 @@
*
* Returns true if the item was stored.
*/
-int store_item(item *it, int comm) {
+int do_store_item(item *it, int comm) {
char *key = ITEM_key(it);
int delete_locked = 0;
- item *old_it = item_get_notedeleted(key, it->nkey, &delete_locked);
+ item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
int stored = 0;
if (old_it && comm == NREAD_ADD) {
/* add only adds a nonexistent item, but promote to head of LRU */
- item_update(old_it);
+ do_item_update(old_it);
} else if (!old_it && comm == NREAD_REPLACE) {
/* replace only replaces an existing value; don't store */
} else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
@@ -612,19 +628,19 @@
that's in the namespace/LRU but wasn't returned by
item_get.... because we need to replace it */
if (delete_locked)
- old_it = item_get_nocheck(key, it->nkey);
+ old_it = do_item_get_nocheck(key, it->nkey);
if (old_it)
- item_replace(old_it, it);
+ do_item_replace(old_it, it);
else
- item_link(it);
+ do_item_link(it);
stored = 1;
- }
+ }
- if (old_it)
- item_remove(old_it); /* release our reference */
- return stored;
+ if (old_it)
+ do_item_remove(old_it); /* release our reference */
+ return stored;
}
typedef struct token_s {
@@ -722,6 +738,7 @@
getrusage(RUSAGE_SELF, &usage);
+ STATS_LOCK();
pos += sprintf(pos, "STAT pid %u\r\n", pid);
pos += sprintf(pos, "STAT uptime %u\r\n", now);
pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
@@ -742,7 +759,9 @@
pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
+ pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
pos += sprintf(pos, "END");
+ STATS_UNLOCK();
out_string(c, temp);
return;
}
@@ -923,7 +942,9 @@
return;
}
+ STATS_LOCK();
stats.get_cmds++;
+ STATS_UNLOCK();
it = item_get(key, nkey);
if (it) {
if (i >= c->isize) {
@@ -951,12 +972,18 @@
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
/* item_get() has incremented it->refcount for us */
+ STATS_LOCK();
stats.get_hits++;
+ STATS_UNLOCK();
item_update(it);
*(c->ilist + i) = it;
i++;
-
- } else stats.get_misses++;
+
+ } else {
+ STATS_LOCK();
+ stats.get_misses++;
+ STATS_UNLOCK();
+ }
key_token++;
}
@@ -1103,7 +1130,7 @@
*
* returns a response string to send back to the client.
*/
-char *add_delta(item *it, int incr, unsigned int delta, char *buf) {
+char *do_add_delta(item *it, int incr, unsigned int delta, char *buf) {
char *ptr;
unsigned int value;
int res;
@@ -1127,18 +1154,19 @@
res = strlen(buf);
if (res + 2 > it->nbytes) { /* need to realloc */
item *new_it;
- new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+ new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
if (new_it == 0) {
return "SERVER_ERROR out of memory";
}
memcpy(ITEM_data(new_it), buf, res);
memcpy(ITEM_data(new_it) + res, "\r\n", 2);
- item_replace(it, new_it);
- item_remove(new_it); /* release our reference */
+ do_item_replace(it, new_it);
+ do_item_remove(new_it); /* release our reference */
} else { /* replace in-place */
memcpy(ITEM_data(it), buf, res);
memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
}
+
return buf;
}
@@ -1198,7 +1226,7 @@
*
* Returns the result to send to the client.
*/
-char *defer_delete(item *it, time_t exptime)
+char *do_defer_delete(item *it, time_t exptime)
{
if (delcurr >= deltotal) {
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
@@ -1447,7 +1475,9 @@
0, &c->request_addr, &c->request_addr_size);
if (res > 8) {
unsigned char *buf = (unsigned char *)c->rbuf;
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
/* Beginning of UDP packet is the request ID; save it. */
c->request_id = buf[0] * 256 + buf[1];
@@ -1512,7 +1542,9 @@
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
gotdata = 1;
c->rbytes += res;
continue;
@@ -1546,6 +1578,8 @@
* Sets whether we are listening for new connections or not.
*/
void accept_new_conns(int do_accept) {
+ if (! is_listen_thread())
+ return;
if (do_accept) {
update_event(listen_conn, EV_READ | EV_PERSIST);
if (listen(listen_conn->sfd, 1024)) {
@@ -1582,7 +1616,9 @@
struct msghdr *m = &c->msglist[c->msgcurr];
res = sendmsg(c->sfd, m, 0);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_written += res;
+ STATS_UNLOCK();
/* We've written some of the data. Remove the completed
iovec entries from the list of pending writes. */
@@ -1639,14 +1675,16 @@
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* these are transient, so don't log anything */
stop = 1;
- break;
} else if (errno == EMFILE) {
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(0);
+ stop = 1;
} else {
perror("accept()");
+ stop = 1;
}
break;
}
@@ -1656,15 +1694,8 @@
close(sfd);
break;
}
- newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, 0, main_base);
- if (!newc) {
- if (settings.verbose > 0)
- fprintf(stderr, "couldn't create new connection\n");
- close(sfd);
- break;
- }
-
+ dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+ DATA_BUFFER_SIZE, 0);
break;
case conn_read:
@@ -1704,7 +1735,9 @@
/* now try reading from the socket */
res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
c->ritem += res;
c->rlbytes -= res;
break;
@@ -1748,7 +1781,9 @@
/* now try reading from the socket */
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
c->sbytes -= res;
break;
}
@@ -1833,7 +1868,6 @@
stop = 1;
break;
}
-
}
return;
@@ -1853,7 +1887,6 @@
return;
}
- /* do as much I/O as possible until we block */
drive_machine(c);
/* wait for next event */
@@ -2089,16 +2122,18 @@
run_deferred_deletes();
}
-void run_deferred_deletes()
+/* Call run_deferred_deletes instead of this. */
+void do_run_deferred_deletes()
{
int i, j=0;
+
for (i=0; i<delcurr; i++) {
item *it = todelete[i];
if (item_delete_lock_over(it)) {
assert(it->refcount > 0);
it->it_flags &= ~ITEM_DELETED;
- item_unlink(it);
- item_remove(it);
+ do_item_unlink(it);
+ do_item_remove(it);
} else {
todelete[j++] = it;
}
@@ -2127,6 +2162,9 @@
printf("-P <file> save PID in <file>, only used with -d option\n");
printf("-f <factor> chunk size growth factor, default 1.25\n");
printf("-n <bytes> minimum space allocated for key+value+flags, default 48\n");
+#ifdef USE_THREADS
+ printf("-t <num> number of threads to use, default 4\n");
+#endif
return;
}
@@ -2259,7 +2297,7 @@
setbuf(stderr, NULL);
/* process arguments */
- while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:")) != -1) {
+ while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:t:")) != -1) {
switch (c) {
case 'U':
settings.udpport = atoi(optarg);
@@ -2328,6 +2366,13 @@
return 1;
}
break;
+ case 't':
+ settings.num_threads = atoi(optarg);
+ if (settings.num_threads == 0) {
+ fprintf(stderr, "Number of threads must be greater than 0\n");
+ return 1;
+ }
+ break;
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
@@ -2488,23 +2533,26 @@
fprintf(stderr, "failed to create listening connection");
exit(1);
}
+ /* save the PID in if we're a daemon */
+ if (daemonize)
+ save_pid(getpid(),pid_file);
+ /* start up worker threads if MT mode */
+ thread_init(settings.num_threads, main_base);
/* initialise clock event */
clock_handler(0,0,0);
/* initialise deletion array and timer event */
deltotal = 200; delcurr = 0;
todelete = malloc(sizeof(item *)*deltotal);
delete_handler(0,0,0); /* sets up the event */
- /* save the PID in if we're a daemon */
- if (daemonize)
- save_pid(getpid(),pid_file);
- /* create the initial listening udp connection */
- if (u_socket > -1 &&
- !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
- UDP_READ_BUFFER_SIZE, 1, main_base))) {
- fprintf(stderr, "failed to create udp connection");
- exit(1);
+ /* create the initial listening udp connection, monitored on all threads */
+ if (u_socket > -1) {
+ for (c = 0; c < settings.num_threads; c++) {
+ /* this is guaranteed to hit all threads because we round-robin */
+ dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
+ UDP_READ_BUFFER_SIZE, 1);
+ }
}
- /* enter the loop */
+ /* enter the event loop */
event_base_loop(main_base, 0);
/* remove the PID file if we're a daemon */
if (daemonize)
Modified: branches/multithreaded/server/memcached.h
===================================================================
--- branches/multithreaded/server/memcached.h 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/memcached.h 2006-11-23 07:38:27 UTC (rev 442)
@@ -59,6 +59,7 @@
char *socketpath; /* path to unix socket if using local socket */
double factor; /* chunk size growth factor */
int chunk_size;
+ int num_threads; /* number of libevent threads to run */
};
extern struct stats stats;
@@ -223,25 +224,25 @@
unsigned int slabs_clsid(size_t size);
/* Allocate object of given length. 0 on error */
-void *slabs_alloc(size_t size);
+void *do_slabs_alloc(size_t size);
/* Free previously allocated object */
-void slabs_free(void *ptr, size_t size);
+void do_slabs_free(void *ptr, size_t size);
/* Fill buffer with stats */
-char* slabs_stats(int *buflen);
+char *do_slabs_stats(int *buflen);
/* Request some slab be moved between classes
1 = success
0 = fail
-1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid);
-int slabs_newslab(unsigned int id);
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid);
+int do_slabs_newslab(unsigned int id);
/* event handling, network IO */
void event_handler(int fd, short which, void *arg);
-conn *conn_from_freelist();
-int conn_add_to_freelist(conn *c);
+conn *do_conn_from_freelist();
+int do_conn_add_to_freelist(conn *c);
conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp, struct event_base *event_base);
void conn_close(conn *c);
void conn_init(void);
@@ -258,11 +259,11 @@
int transmit(conn *c);
int ensure_iov_space(conn *c);
int add_iov(conn *c, const void *buf, int len);
-char *defer_delete(item *item, time_t exptime);
-void run_deferred_deletes(void);
int add_msghdr(conn *c);
-char *add_delta(item *item, int incr, unsigned int delta, char *buf);
-int store_item(item *item, int comm);
+char *do_defer_delete(item *item, time_t exptime);
+void do_run_deferred_deletes(void);
+char *do_add_delta(item *item, int incr, unsigned int delta, char *buf);
+int do_store_item(item *item, int comm);
/* stats */
void stats_reset(void);
void stats_init(void);
@@ -273,27 +274,127 @@
item *assoc_find(const char *key, size_t nkey);
int assoc_insert(item *item);
void assoc_delete(const char *key, size_t nkey);
-void assoc_move_next_bucket(void);
+void do_assoc_move_next_bucket(void);
void item_init(void);
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+item *do_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
void item_free(item *it);
int item_size_ok(char *key, size_t nkey, int flags, int nbytes);
-
-int item_link(item *it); /* may fail if transgresses limits */
-void item_unlink(item *it);
-void item_remove(item *it);
-void item_update(item *it); /* update LRU time to current and reposition */
-int item_replace(item *it, item *new_it);
+int do_item_link(item *it); /* may fail if transgresses limits */
+void do_item_unlink(item *it);
+void do_item_remove(item *it);
+void do_item_update(item *it); /* update LRU time to current and reposition */
+int do_item_replace(item *it, item *new_it);
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes);
char *item_stats_sizes(int *bytes);
void item_stats(char *buffer, int buflen);
-item *item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
-item *item_get_nocheck(char *key, size_t nkey);
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *do_item_get_nocheck(char *key, size_t nkey);
item *item_get(char *key, size_t nkey);
-void item_flush_expired(void);
+void do_item_flush_expired(void);
/* time handling */
void set_current_time (); /* update the global variable holding
global 32-bit seconds-since-start time
(to avoid 64 bit time_t) */
+
+/*
+ * In multithreaded mode, we wrap certain functions with lock management and
+ * replace the logic of some other functions. All wrapped functions have
+ * "mt_" and "do_" variants. In multithreaded mode, the plain version of a
+ * function is #define-d to the "mt_" variant, which often just grabs a
+ * lock and calls the "do_" function. In singlethreaded mode, the "do_"
+ * function is called directly.
+ *
+ * Functions such as the libevent-related calls that need to do cross-thread
+ * communication in multithreaded mode (rather than actually doing the work
+ * in the current thread) are called via "dispatch_" frontends, which are
+ * also #define-d to directly call the underlying code in singlethreaded mode.
+ */
+#ifdef USE_THREADS
+
+void thread_init(int nthreads, struct event_base *main_base);
+int dispatch_event_add(int thread, conn *c);
+void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
+
+/* Lock wrappers for cache functions that are called from main loop. */
+char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf);
+conn *mt_conn_from_freelist(void);
+int mt_conn_add_to_freelist(conn *c);
+char *mt_defer_delete(item *it, time_t exptime);
+int mt_is_listen_thread(void);
+item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+void mt_item_flush_expired(void);
+item *mt_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *mt_item_get_nocheck(char *key, size_t nkey);
+int mt_item_link(item *it);
+void mt_item_remove(item *it);
+int mt_item_replace(item *it, item *new_it);
+void mt_item_unlink(item *it);
+void mt_item_update(item *it);
+void mt_run_deferred_deletes(void);
+void *mt_slabs_alloc(size_t size);
+void mt_slabs_free(void *ptr, size_t size);
+int mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
+char *mt_slabs_stats(int *buflen);
+void mt_stats_lock(void);
+void mt_stats_unlock(void);
+int mt_store_item(item *item, int comm);
+
+
+# define add_delta(x,y,z,a) mt_add_delta(x,y,z,a)
+# define assoc_move_next_bucket() mt_assoc_move_next_bucket()
+# define conn_from_freelist() mt_conn_from_freelist()
+# define conn_add_to_freelist(x) mt_conn_add_to_freelist(x)
+# define defer_delete(x,y) mt_defer_delete(x,y)
+# define is_listen_thread() mt_is_listen_thread()
+# define item_alloc(x,y,z,a,b) mt_item_alloc(x,y,z,a,b)
+# define item_flush_expired() mt_item_flush_expired()
+# define item_get_nocheck(x,y) mt_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
+# define item_link(x) mt_item_link(x)
+# define item_remove(x) mt_item_remove(x)
+# define item_replace(x,y) mt_item_replace(x,y)
+# define item_update(x) mt_item_update(x)
+# define item_unlink(x) mt_item_unlink(x)
+# define run_deferred_deletes() mt_run_deferred_deletes()
+# define slabs_alloc(x) mt_slabs_alloc(x)
+# define slabs_free(x,y) mt_slabs_free(x,y)
+# define slabs_reassign(x,y) mt_slabs_reassign(x,y)
+# define slabs_stats(x) mt_slabs_stats(x)
+# define store_item(x,y) mt_store_item(x,y)
+
+# define STATS_LOCK() mt_stats_lock()
+# define STATS_UNLOCK() mt_stats_unlock()
+
+#else /* !USE_THREADS */
+
+# define add_delta(x,y,z,a) do_add_delta(x,y,z,a)
+# define assoc_move_next_bucket() do_assoc_move_next_bucket()
+# define conn_from_freelist() do_conn_from_freelist()
+# define conn_add_to_freelist(x) do_conn_add_to_freelist(x)
+# define defer_delete(x,y) do_defer_delete(x,y)
+# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
+# define dispatch_event_add(t,c) event_add(&(c)->event, 0)
+# define is_listen_thread() 1
+# define item_alloc(x,y,z,a,b) do_item_alloc(x,y,z,a,b)
+# define item_flush_expired() do_item_flush_expired()
+# define item_get_nocheck(x,y) do_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
+# define item_link(x) do_item_link(x)
+# define item_remove(x) do_item_remove(x)
+# define item_replace(x,y) do_item_replace(x,y)
+# define item_unlink(x) do_item_unlink(x)
+# define item_update(x) do_item_update(x)
+# define run_deferred_deletes() do_run_deferred_deletes()
+# define slabs_alloc(x) do_slabs_alloc(x)
+# define slabs_free(x,y) do_slabs_free(x,y)
+# define slabs_reassign(x,y) do_slabs_reassign(x,y)
+# define slabs_stats(x) do_slabs_stats(x)
+# define store_item(x,y) do_store_item(x,y)
+# define thread_init(x,y) 0
+
+# define STATS_LOCK() /**/
+# define STATS_UNLOCK() /**/
+
+#endif /* !USE_THREADS */
Modified: branches/multithreaded/server/slabs.c
===================================================================
--- branches/multithreaded/server/slabs.c 2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/slabs.c 2006-11-23 07:38:27 UTC (rev 442)
@@ -26,6 +26,7 @@
#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
#define CHUNK_ALIGN_BYTES (sizeof(void *))
+#define DONT_PREALLOC_SLABS
/* powers-of-N allocation structures */
@@ -133,7 +134,7 @@
for(i=POWER_SMALLEST; i<=POWER_LARGEST; i++) {
if (++prealloc > maxslabs)
return;
- slabs_newslab(i);
+ do_slabs_newslab(i);
}
}
@@ -150,7 +151,7 @@
return 1;
}
-int slabs_newslab(unsigned int id) {
+int do_slabs_newslab(unsigned int id) {
slabclass_t *p = &slabclass[id];
#ifdef ALLOW_SLABS_REASSIGN
int len = POWER_BLOCK;
@@ -176,7 +177,7 @@
return 1;
}
-void *slabs_alloc(size_t size) {
+void *do_slabs_alloc(size_t size) {
slabclass_t *p;
unsigned char id = slabs_clsid(size);
@@ -195,7 +196,7 @@
/* fail unless we have space at the end of a recently allocated page,
we have something on our freelist, or we could allocate a new page */
- if (! (p->end_page_ptr || p->sl_curr || slabs_newslab(id)))
+ if (! (p->end_page_ptr || p->sl_curr || do_slabs_newslab(id)))
return 0;
/* return off our freelist, if we have one */
@@ -216,7 +217,7 @@
return 0; /* shouldn't ever get here */
}
-void slabs_free(void *ptr, size_t size) {
+void do_slabs_free(void *ptr, size_t size) {
unsigned char id = slabs_clsid(size);
slabclass_t *p;
@@ -245,7 +246,7 @@
return;
}
-char* slabs_stats(int *buflen) {
+char* do_slabs_stats(int *buflen) {
int i, total;
char *buf = (char*) malloc(power_largest * 200 + 100);
char *bufcurr = buf;
@@ -287,7 +288,7 @@
1 = success
0 = fail
-1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid) {
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid) {
void *slab, *slab_end;
slabclass_t *p, *dp;
void *iter;
More information about the memcached-commits
mailing list