[memcached] sgrimm, r442: Multithreading support. This change allo...

commits at code.sixapart.com commits at code.sixapart.com
Thu Nov 23 07:38:29 UTC 2006


Multithreading support. This change allows memcached to be built either
as a single-threaded program (the default) or a multithreaded one if
the "--enable-threads" option is passed to the configure script. See
doc/threads.txt for a more detailed description.


U   branches/multithreaded/server/ChangeLog
U   branches/multithreaded/server/Makefile.am
U   branches/multithreaded/server/assoc.c
U   branches/multithreaded/server/configure.ac
U   branches/multithreaded/server/doc/memcached.1
A   branches/multithreaded/server/doc/threads.txt
U   branches/multithreaded/server/items.c
U   branches/multithreaded/server/memcached.c
U   branches/multithreaded/server/memcached.h
U   branches/multithreaded/server/slabs.c


Modified: branches/multithreaded/server/ChangeLog
===================================================================
--- branches/multithreaded/server/ChangeLog	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/ChangeLog	2006-11-23 07:38:27 UTC (rev 442)
@@ -1,3 +1,8 @@
+2006-11-22
+	* Steven Grimm <sgrimm at facebook.com>: Add support for multithreaded
+	  execution. Run configure with "--enable-threads" to enable. See
+	  doc/threads.txt for details.
+
 2006-10-15
 	* Steven Grimm <sgrimm at facebook.com>: Dynamic sizing of hashtable to
 	  reduce collisions on very large caches and conserve memory on

Modified: branches/multithreaded/server/Makefile.am
===================================================================
--- branches/multithreaded/server/Makefile.am	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/Makefile.am	2006-11-23 07:38:27 UTC (rev 442)
@@ -1,6 +1,6 @@
 bin_PROGRAMS = memcached memcached-debug
 
-memcached_SOURCES = memcached.c slabs.c items.c assoc.c memcached.h
+memcached_SOURCES = memcached.c slabs.c items.c memcached.h assoc.c thread.c
 memcached_debug_SOURCES = $(memcached_SOURCES)
 memcached_CPPFLAGS = -DNDEBUG
 memcached_LDADD = @LIBOBJS@

Modified: branches/multithreaded/server/assoc.c
===================================================================
--- branches/multithreaded/server/assoc.c	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/assoc.c	2006-11-23 07:38:27 UTC (rev 442)
@@ -12,6 +12,7 @@
  *
  * $Id$
  */
+
 #include "memcached.h"
 #include <sys/stat.h>
 #include <sys/socket.h>
@@ -535,39 +536,41 @@
 
     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
     if (primary_hashtable) {
-	if (settings.verbose > 1)
-	    fprintf(stderr, "Hash table expansion starting\n");
+        if (settings.verbose > 1)
+            fprintf(stderr, "Hash table expansion starting\n");
         hashpower++;
         expanding = 1;
         expand_bucket = 0;
-	assoc_move_next_bucket();
+        do_assoc_move_next_bucket();
     } else {
         primary_hashtable = old_hashtable;
-	/* Bad news, but we can keep running. */
+        /* Bad news, but we can keep running. */
     }
 }
 
 /* migrates the next bucket to the primary hashtable if we're expanding. */
-void assoc_move_next_bucket(void) {
+void do_assoc_move_next_bucket(void) {
     item *it, *next;
     int bucket;
 
     if (expanding) {
         for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
-	    next = it->h_next;
+            next = it->h_next;
 
             bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
             it->h_next = primary_hashtable[bucket];
             primary_hashtable[bucket] = it;
-	}
+        }
 
-	expand_bucket++;
-	if (expand_bucket == hashsize(hashpower - 1)) {
-	    expanding = 0;
-	    free(old_hashtable);
-	    if (settings.verbose > 1)
-	        fprintf(stderr, "Hash table expansion done\n");
-	}
+        old_hashtable[expand_bucket] = NULL;
+
+        expand_bucket++;
+        if (expand_bucket == hashsize(hashpower - 1)) {
+            expanding = 0;
+            free(old_hashtable);
+            if (settings.verbose > 1)
+                fprintf(stderr, "Hash table expansion done\n");
+        }
     }
 }
 

Modified: branches/multithreaded/server/configure.ac
===================================================================
--- branches/multithreaded/server/configure.ac	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/configure.ac	2006-11-23 07:38:27 UTC (rev 442)
@@ -94,6 +94,7 @@
 AC_SEARCH_LIBS(socket, socket)
 AC_SEARCH_LIBS(gethostbyname, nsl)
 AC_SEARCH_LIBS(mallinfo, malloc)
+AC_SEARCH_LIBS(pthread_create, pthread)
 
 AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)])
 
@@ -155,6 +156,15 @@
 
 AC_C_ENDIAN
 
+dnl Check whether the user wants threads or not
+AC_ARG_ENABLE(threads,
+  [AS_HELP_STRING([--enable-threads],[support multithreaded execution])],
+  [if test "$ac_cv_search_pthread_create" != "no"; then
+    AC_DEFINE([USE_THREADS],,[Define this if you want to use pthreads])
+   else
+    AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
+   fi])
+
 AC_CHECK_FUNCS(mlockall)
 
 AC_CONFIG_FILES(Makefile doc/Makefile)

Modified: branches/multithreaded/server/doc/memcached.1
===================================================================
--- branches/multithreaded/server/doc/memcached.1	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/doc/memcached.1	2006-11-23 07:38:27 UTC (rev 442)
@@ -84,6 +84,12 @@
 .TP
 .B \-P <filename>
 Print pidfile to <filename>, only used under -d option.
+.TP
+.B \-t <threads>
+Number of threads to use to process incoming requests. This option is only
+meaningful if memcached was compiled with thread support enabled. It is 
+typically not useful to set this higher than the number of CPU cores on the
+memcached server.
 .br
 .SH LICENSE
 The memcached daemon is copyright Danga Interactive and is distributed under 

Added: branches/multithreaded/server/doc/threads.txt
===================================================================
--- branches/multithreaded/server/doc/threads.txt	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/doc/threads.txt	2006-11-23 07:38:27 UTC (rev 442)
@@ -0,0 +1,68 @@
+Multithreading support in memcached
+
+OVERVIEW
+
+By default, memcached is compiled as a single-threaded application. This is
+the most CPU-efficient mode of operation, and it is appropriate for memcached
+instances running on single-processor servers or whose request volume is
+low enough that available CPU power is not a bottleneck.
+
+More heavily-used memcached instances can benefit from multithreaded mode.
+To enable it, use the "--enable-threads" option to the configure script:
+
+./configure --enable-threads
+
+You must have the POSIX thread functions (pthread_*) on your system in order
+to use memcached's multithreaded mode.
+
+Once you have a thread-capable memcached executable, you can control the
+number of threads using the "-t" option; the default is 4. On a machine
+that's dedicated to memcached, you will typically want one thread per
+processor core. Due to memcached's nonblocking architecture, there is no
+real advantage to using more threads than the number of CPUs on the machine;
+doing so will increase lock contention and is likely to degrade performance.
+
+
+INTERNALS
+
+The threading support is mostly implemented as a series of wrapper functions
+that protect calls to underlying code with one of a small number of locks.
+In single-threaded mode, the wrappers are replaced with direct invocations
+of the target code using #define; that is done in memcached.h. This approach
+allows memcached to be compiled in either single- or multi-threaded mode.
+
+Each thread has its own instance of libevent ("base" in libevent terminology).
+The only direct interaction between threads is for new connections. One of
+the threads handles the TCP listen socket; each new connection is passed to
+a different thread on a round-robin basis. After that, each thread operates
+on its set of connections as if it were running in single-threaded mode,
+using libevent to manage nonblocking I/O as usual.
+
+UDP requests are a bit different, since there is only one UDP socket that's
+shared by all clients. The UDP socket is monitored by all of the threads.
+When a datagram comes in, all the threads that aren't already processing
+another request will receive "socket readable" callbacks from libevent.
+Only one thread will successfully read the request; the others will go back
+to sleep or, in the case of a very busy server, will read whatever other
+UDP requests are waiting in the socket buffer. Note that in the case of
+moderately busy servers, this results in increased CPU consumption since
+threads will constantly wake up and find no input waiting for them. But
+short of much more major surgery on the I/O code, this is not easy to avoid.
+
+
+TO DO
+
+The locking is currently very coarse-grained.  There is, for example, one
+lock that protects all the calls to the hashtable-related functions. Since
+memcached spends much of its CPU time on command parsing and response
+assembly, rather than managing the hashtable per se, this is not a huge
+bottleneck for small numbers of processors. However, the locking will likely
+have to be refined in the event that memcached needs to run well on
+massively-parallel machines.
+
+One cheap optimization to reduce contention on that lock: move the hash value
+computation so it occurs before the lock is obtained whenever possible.
+Right now the hash is computed at the lowest levels of the functions in
+assoc.c. If instead it were computed in memcached.c, then passed along with
+the key and length into the items.c code and down into assoc.c, that would
+reduce the amount of time each thread needs to keep the hashtable lock held.

Modified: branches/multithreaded/server/items.c
===================================================================
--- branches/multithreaded/server/items.c	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/items.c	2006-11-23 07:38:27 UTC (rev 442)
@@ -65,7 +65,7 @@
     return sizeof(item) + nkey + *nsuffix + nbytes;
 }
  
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
+item *do_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
     int nsuffix, ntotal;
     item *it;
     unsigned int id;
@@ -100,7 +100,7 @@
 
         for (search = tails[id]; tries>0 && search; tries--, search=search->prev) {
             if (search->refcount==0) {
-                item_unlink(search);
+                do_item_unlink(search);
                 break;
             }
         }
@@ -194,34 +194,38 @@
     return;
 }
 
-int item_link(item *it) {
+int do_item_link(item *it) {
     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
     assert(it->nbytes < 1048576);
     it->it_flags |= ITEM_LINKED;
     it->time = current_time;
     assoc_insert(it);
 
+    STATS_LOCK();
     stats.curr_bytes += ITEM_ntotal(it);
     stats.curr_items += 1;
     stats.total_items += 1;
+    STATS_UNLOCK();
 
     item_link_q(it);
 
     return 1;
 }
 
-void item_unlink(item *it) {
+void do_item_unlink(item *it) {
     if (it->it_flags & ITEM_LINKED) {
         it->it_flags &= ~ITEM_LINKED;
+        STATS_LOCK();
         stats.curr_bytes -= ITEM_ntotal(it);
         stats.curr_items -= 1;
+        STATS_UNLOCK();
         assoc_delete(ITEM_key(it), it->nkey);
         item_unlink_q(it);
+        if (it->refcount == 0) item_free(it);
     }
-    if (it->refcount == 0) item_free(it);
 }
 
-void item_remove(item *it) {
+void do_item_remove(item *it) {
     assert((it->it_flags & ITEM_SLABBED) == 0);
     if (it->refcount) {
         it->refcount--;
@@ -233,21 +237,23 @@
     }
 }
 
-void item_update(item *it) {
+void do_item_update(item *it) {
     if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
         assert((it->it_flags & ITEM_SLABBED) == 0);
 
-        item_unlink_q(it);
-        it->time = current_time;
-        item_link_q(it);
+        if (it->it_flags & ITEM_LINKED) {
+            item_unlink_q(it);
+            it->time = current_time;
+            item_link_q(it);
+        }
     }
 }
 
-int item_replace(item *it, item *new_it) {
+int do_item_replace(item *it, item *new_it) {
     assert((it->it_flags & ITEM_SLABBED) == 0);
 
-    item_unlink(it);
-    return item_link(new_it);
+    do_item_unlink(it);
+    return do_item_link(new_it);
 }
 
 char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
@@ -349,7 +355,7 @@
 }
 
 /* wrapper around assoc_find which does the lazy expiration/deletion logic */
-item *item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
     item *it = assoc_find(key, nkey);
     if (delete_locked) *delete_locked = 0;
     if (it && (it->it_flags & ITEM_DELETED)) {
@@ -363,11 +369,11 @@
     }
     if (it && settings.oldest_live && settings.oldest_live <= current_time &&
         it->time <= settings.oldest_live) {
-        item_unlink(it);
+        do_item_unlink(it);           // MTSAFE - cache_lock held
         it = 0;
     }
     if (it && it->exptime && it->exptime <= current_time) {
-        item_unlink(it);
+        do_item_unlink(it);           // MTSAFE - cache_lock held
         it = 0;
     }
 
@@ -383,7 +389,7 @@
 }
 
 /* returns an item whether or not it's delete-locked or expired. */
-item *item_get_nocheck(char *key, size_t nkey) {
+item *do_item_get_nocheck(char *key, size_t nkey) {
     item *it = assoc_find(key, nkey);
     if (it) {
         it->refcount++;
@@ -393,7 +399,7 @@
 }
 
 /* expires items that are more recent than the oldest_live setting. */
-void item_flush_expired() {
+void do_item_flush_expired() {
     int i;
     item *iter, *next;
     if (! settings.oldest_live)
@@ -408,7 +414,7 @@
             if (iter->time >= settings.oldest_live) {
                 next = iter->next;
                 if ((iter->it_flags & ITEM_SLABBED) == 0) {
-                    item_unlink(iter);
+                    do_item_unlink(iter);
                 }
             } else {
                 /* We've hit the first old item. Continue to the next queue. */

Modified: branches/multithreaded/server/memcached.c
===================================================================
--- branches/multithreaded/server/memcached.c	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/memcached.c	2006-11-23 07:38:27 UTC (rev 442)
@@ -101,9 +101,11 @@
 }
 
 void stats_reset(void) {
+    STATS_LOCK();
     stats.total_items = stats.total_conns = 0;
     stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
     stats.bytes_read = stats.bytes_written = 0;
+    STATS_UNLOCK();
 }
 
 void settings_init(void) {
@@ -119,6 +121,11 @@
     settings.managed = 0;
     settings.factor = 1.25;
     settings.chunk_size = 48;         /* space for a modest key and value */
+#ifdef USE_THREADS
+    settings.num_threads = 4;
+#else
+    settings.num_threads = 1;
+#endif
 }
 
 /*
@@ -177,9 +184,9 @@
  * Returns a connection from the freelist, if any. Should call this using
  * conn_from_freelist() for thread safety.
  */
-conn *conn_from_freelist() {
+conn *do_conn_from_freelist() {
     conn *c;
- 
+
     if (freecurr > 0) {
         c = freeconns[--freecurr];
     } else {
@@ -193,7 +200,7 @@
  * Adds a connection to the freelist. 0 = success. Should call this using
  * conn_add_to_freelist() for thread safety.
  */
-int conn_add_to_freelist(conn *c) {
+int do_conn_add_to_freelist(conn *c) {
     if (freecurr < freetotal) {
         freeconns[freecurr++] = c;
         return 0;
@@ -211,7 +218,7 @@
 }
 
 conn *conn_new(int sfd, int init_state, int event_flags,
-               int read_buffer_size, int is_udp, struct event_base *base) {
+                int read_buffer_size, int is_udp, struct event_base *base) {
     conn *c = conn_from_freelist();
 
     if (NULL == c) {
@@ -250,7 +257,9 @@
             return 0;
         }
 
+        STATS_LOCK();
         stats.conn_structs++;
+        STATS_UNLOCK();
     }
 
     if (settings.verbose > 1) {
@@ -293,8 +302,10 @@
         return 0;
     }
 
+    STATS_LOCK();
     stats.curr_conns++;
     stats.total_conns++;
+    STATS_UNLOCK();
 
     return c;
 }
@@ -354,7 +365,9 @@
         conn_free(c);
     }
 
+    STATS_LOCK();
     stats.curr_conns--;
+    STATS_UNLOCK();
 
     return;
 }
@@ -572,7 +585,9 @@
     item *it = c->item;
     int comm = c->item_comm;
 
+    STATS_LOCK();
     stats.set_cmds++;
+    STATS_UNLOCK();
 
     if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
         out_string(c, "CLIENT_ERROR bad data chunk");
@@ -583,6 +598,7 @@
             out_string(c, "NOT_STORED");
         }
     }
+
     item_remove(c->item);       /* release the c->item reference */
     c->item = 0;
 }
@@ -593,15 +609,15 @@
  *
  * Returns true if the item was stored.
  */
-int store_item(item *it, int comm) {
+int do_store_item(item *it, int comm) {
     char *key = ITEM_key(it);
     int delete_locked = 0;
-    item *old_it = item_get_notedeleted(key, it->nkey, &delete_locked);
+    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
     int stored = 0;
 
     if (old_it && comm == NREAD_ADD) {
         /* add only adds a nonexistent item, but promote to head of LRU */
-        item_update(old_it);
+        do_item_update(old_it);
     } else if (!old_it && comm == NREAD_REPLACE) {
         /* replace only replaces an existing value; don't store */
     } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
@@ -612,19 +628,19 @@
            that's in the namespace/LRU but wasn't returned by
            item_get.... because we need to replace it */
         if (delete_locked)
-            old_it = item_get_nocheck(key, it->nkey);
+            old_it = do_item_get_nocheck(key, it->nkey);
 
         if (old_it)
-            item_replace(old_it, it);
+            do_item_replace(old_it, it);
         else
-            item_link(it);
+            do_item_link(it);
 
         stored = 1;
-     }
+    }
 
-     if (old_it)
-         item_remove(old_it);         /* release our reference */
-     return stored;
+    if (old_it)
+        do_item_remove(old_it);         /* release our reference */
+    return stored;
 }
 
 typedef struct token_s {
@@ -722,6 +738,7 @@
 
         getrusage(RUSAGE_SELF, &usage);
 
+        STATS_LOCK();
         pos += sprintf(pos, "STAT pid %u\r\n", pid);
         pos += sprintf(pos, "STAT uptime %u\r\n", now);
         pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
@@ -742,7 +759,9 @@
         pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
         pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
         pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
+        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
         pos += sprintf(pos, "END");
+        STATS_UNLOCK();
         out_string(c, temp);
         return;
     }
@@ -923,7 +942,9 @@
                 return;
             }
                 
+            STATS_LOCK();
             stats.get_cmds++;
+            STATS_UNLOCK();
             it = item_get(key, nkey);
             if (it) {
                 if (i >= c->isize) {
@@ -951,12 +972,18 @@
                     fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
 
                 /* item_get() has incremented it->refcount for us */
+                STATS_LOCK();
                 stats.get_hits++;
+                STATS_UNLOCK();
                 item_update(it);
                 *(c->ilist + i) = it;
                 i++;
-                
-            } else stats.get_misses++;
+
+            } else {
+                STATS_LOCK();
+                stats.get_misses++;
+                STATS_UNLOCK();
+            }
             
             key_token++;
         }
@@ -1103,7 +1130,7 @@
  * 
  * returns a response string to send back to the client.
  */
-char *add_delta(item *it, int incr, unsigned int delta, char *buf) {
+char *do_add_delta(item *it, int incr, unsigned int delta, char *buf) {
     char *ptr;
     unsigned int value;
     int res;
@@ -1127,18 +1154,19 @@
     res = strlen(buf);
     if (res + 2 > it->nbytes) { /* need to realloc */
         item *new_it;
-        new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
         if (new_it == 0) {
             return "SERVER_ERROR out of memory";
         }
         memcpy(ITEM_data(new_it), buf, res);
         memcpy(ITEM_data(new_it) + res, "\r\n", 2);
-        item_replace(it, new_it);
-        item_remove(new_it);       /* release our reference */
+        do_item_replace(it, new_it);
+        do_item_remove(new_it);       /* release our reference */
     } else { /* replace in-place */
         memcpy(ITEM_data(it), buf, res);
         memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
     }
+
     return buf;
 }
 
@@ -1198,7 +1226,7 @@
  *
  * Returns the result to send to the client.
  */
-char *defer_delete(item *it, time_t exptime)
+char *do_defer_delete(item *it, time_t exptime)
 {
     if (delcurr >= deltotal) {
         item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
@@ -1447,7 +1475,9 @@
                    0, &c->request_addr, &c->request_addr_size);
     if (res > 8) {
         unsigned char *buf = (unsigned char *)c->rbuf;
+        STATS_LOCK();
         stats.bytes_read += res;
+        STATS_UNLOCK();
 
         /* Beginning of UDP packet is the request ID; save it. */
         c->request_id = buf[0] * 256 + buf[1];
@@ -1512,7 +1542,9 @@
 
         res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
         if (res > 0) {
+            STATS_LOCK();
             stats.bytes_read += res;
+            STATS_UNLOCK();
             gotdata = 1;
             c->rbytes += res;
             continue;
@@ -1546,6 +1578,8 @@
  * Sets whether we are listening for new connections or not.
  */
 void accept_new_conns(int do_accept) {
+    if (! is_listen_thread())
+        return;
     if (do_accept) {
         update_event(listen_conn, EV_READ | EV_PERSIST);
         if (listen(listen_conn->sfd, 1024)) {
@@ -1582,7 +1616,9 @@
         struct msghdr *m = &c->msglist[c->msgcurr];
         res = sendmsg(c->sfd, m, 0);
         if (res > 0) {
+            STATS_LOCK();
             stats.bytes_written += res;
+            STATS_UNLOCK();
 
             /* We've written some of the data. Remove the completed
                iovec entries from the list of pending writes. */
@@ -1639,14 +1675,16 @@
             addrlen = sizeof(addr);
             if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    /* these are transient, so don't log anything */
                     stop = 1;
-                    break;
                 } else if (errno == EMFILE) {
                     if (settings.verbose > 0)
                         fprintf(stderr, "Too many open connections\n");
                     accept_new_conns(0);
+                    stop = 1;
                 } else {
                     perror("accept()");
+                    stop = 1;
                 }
                 break;
             }
@@ -1656,15 +1694,8 @@
                 close(sfd);
                 break;
             }
-            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
-                            DATA_BUFFER_SIZE, 0, main_base);
-            if (!newc) {
-                if (settings.verbose > 0)
-                    fprintf(stderr, "couldn't create new connection\n");
-                close(sfd);
-                break;
-            }
-
+            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+                                     DATA_BUFFER_SIZE, 0);
             break;
 
         case conn_read:
@@ -1704,7 +1735,9 @@
             /*  now try reading from the socket */
             res = read(c->sfd, c->ritem, c->rlbytes);
             if (res > 0) {
+                STATS_LOCK();
                 stats.bytes_read += res;
+                STATS_UNLOCK();
                 c->ritem += res;
                 c->rlbytes -= res;
                 break;
@@ -1748,7 +1781,9 @@
             /*  now try reading from the socket */
             res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
             if (res > 0) {
+                STATS_LOCK();
                 stats.bytes_read += res;
+                STATS_UNLOCK();
                 c->sbytes -= res;
                 break;
             }
@@ -1833,7 +1868,6 @@
             stop = 1;
             break;
         }
-
     }
 
     return;
@@ -1853,7 +1887,6 @@
         return;
     }
 
-    /* do as much I/O as possible until we block */
     drive_machine(c);
 
     /* wait for next event */
@@ -2089,16 +2122,18 @@
     run_deferred_deletes();
 }
 
-void run_deferred_deletes()
+/* Call run_deferred_deletes instead of this. */
+void do_run_deferred_deletes()
 {
     int i, j=0;
+
     for (i=0; i<delcurr; i++) {
         item *it = todelete[i];
         if (item_delete_lock_over(it)) {
             assert(it->refcount > 0);
             it->it_flags &= ~ITEM_DELETED;
-            item_unlink(it);
-            item_remove(it);
+            do_item_unlink(it);
+            do_item_remove(it);
         } else {
             todelete[j++] = it;
         }
@@ -2127,6 +2162,9 @@
     printf("-P <file>     save PID in <file>, only used with -d option\n");
     printf("-f <factor>   chunk size growth factor, default 1.25\n");
     printf("-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
+#ifdef USE_THREADS
+    printf("-t <num>      number of threads to use, default 4\n");
+#endif
     return;
 }
 
@@ -2259,7 +2297,7 @@
     setbuf(stderr, NULL);
 
     /* process arguments */
-    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:")) != -1) {
+    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:t:")) != -1) {
         switch (c) {
         case 'U':
             settings.udpport = atoi(optarg);
@@ -2328,6 +2366,13 @@
                 return 1;
             }
             break;
+        case 't':
+            settings.num_threads = atoi(optarg);
+            if (settings.num_threads == 0) {
+                fprintf(stderr, "Number of threads must be greater than 0\n");
+                return 1;
+            }
+            break;
         default:
             fprintf(stderr, "Illegal argument \"%c\"\n", c);
             return 1;
@@ -2488,23 +2533,26 @@
         fprintf(stderr, "failed to create listening connection");
         exit(1);
     }
+    /* save the PID in the pidfile if we're a daemon */
+    if (daemonize)
+        save_pid(getpid(),pid_file);
+    /* start up worker threads if MT mode */
+    thread_init(settings.num_threads, main_base);
     /* initialise clock event */
     clock_handler(0,0,0);
     /* initialise deletion array and timer event */
     deltotal = 200; delcurr = 0;
     todelete = malloc(sizeof(item *)*deltotal);
     delete_handler(0,0,0); /* sets up the event */
-    /* save the PID in if we're a daemon */
-    if (daemonize)
-        save_pid(getpid(),pid_file);
-    /* create the initial listening udp connection */
-    if (u_socket > -1 &&
-        !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
-                            UDP_READ_BUFFER_SIZE, 1, main_base))) {
-        fprintf(stderr, "failed to create udp connection");
-        exit(1);
+    /* create the initial listening udp connection, monitored on all threads */
+    if (u_socket > -1) {
+        for (c = 0; c < settings.num_threads; c++) {
+            /* this is guaranteed to hit all threads because we round-robin */
+            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
+                              UDP_READ_BUFFER_SIZE, 1);
+        }
     }
-    /* enter the loop */
+    /* enter the event loop */
     event_base_loop(main_base, 0);
     /* remove the PID file if we're a daemon */
     if (daemonize)

Modified: branches/multithreaded/server/memcached.h
===================================================================
--- branches/multithreaded/server/memcached.h	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/memcached.h	2006-11-23 07:38:27 UTC (rev 442)
@@ -59,6 +59,7 @@
     char *socketpath;   /* path to unix socket if using local socket */
     double factor;          /* chunk size growth factor */
     int chunk_size;
+    int num_threads;        /* number of libevent threads to run */
 };
 
 extern struct stats stats;
@@ -223,25 +224,25 @@
 unsigned int slabs_clsid(size_t size);
 
 /* Allocate object of given length. 0 on error */
-void *slabs_alloc(size_t size);
+void *do_slabs_alloc(size_t size);
 
 /* Free previously allocated object */
-void slabs_free(void *ptr, size_t size);
+void do_slabs_free(void *ptr, size_t size);
 
 /* Fill buffer with stats */
-char* slabs_stats(int *buflen);
+char *do_slabs_stats(int *buflen);
 
 /* Request some slab be moved between classes
   1 = success
    0 = fail
    -1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid);
-int slabs_newslab(unsigned int id);
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid);
+int do_slabs_newslab(unsigned int id);
 
 /* event handling, network IO */
 void event_handler(int fd, short which, void *arg);
-conn *conn_from_freelist();
-int conn_add_to_freelist(conn *c);
+conn *do_conn_from_freelist();
+int do_conn_add_to_freelist(conn *c);
 conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp, struct event_base *event_base);
 void conn_close(conn *c);
 void conn_init(void);
@@ -258,11 +259,11 @@
 int transmit(conn *c);
 int ensure_iov_space(conn *c);
 int add_iov(conn *c, const void *buf, int len);
-char *defer_delete(item *item, time_t exptime);
-void run_deferred_deletes(void);
 int add_msghdr(conn *c);
-char *add_delta(item *item, int incr, unsigned int delta, char *buf);
-int store_item(item *item, int comm);
+char *do_defer_delete(item *item, time_t exptime);
+void do_run_deferred_deletes(void);
+char *do_add_delta(item *item, int incr, unsigned int delta, char *buf);
+int do_store_item(item *item, int comm);
 /* stats */
 void stats_reset(void);
 void stats_init(void);
@@ -273,27 +274,127 @@
 item *assoc_find(const char *key, size_t nkey);
 int assoc_insert(item *item);
 void assoc_delete(const char *key, size_t nkey);
-void assoc_move_next_bucket(void);
+void do_assoc_move_next_bucket(void);
 
 void item_init(void);
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+item *do_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
 void item_free(item *it);
 int item_size_ok(char *key, size_t nkey, int flags, int nbytes);
-
-int item_link(item *it);    /* may fail if transgresses limits */
-void item_unlink(item *it);
-void item_remove(item *it);
-void item_update(item *it);   /* update LRU time to current and reposition */
-int item_replace(item *it, item *new_it);
+int do_item_link(item *it);    /* may fail if transgresses limits */
+void do_item_unlink(item *it);
+void do_item_remove(item *it);
+void do_item_update(item *it);   /* update LRU time to current and reposition */
+int do_item_replace(item *it, item *new_it);
 char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes);
 char *item_stats_sizes(int *bytes);
 void item_stats(char *buffer, int buflen);
-item *item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
-item *item_get_nocheck(char *key, size_t nkey);
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *do_item_get_nocheck(char *key, size_t nkey);
 item *item_get(char *key, size_t nkey);
-void item_flush_expired(void);
+void do_item_flush_expired(void);
 
 /* time handling */
 void set_current_time ();  /* update the global variable holding
                               global 32-bit seconds-since-start time
                               (to avoid 64 bit time_t) */
+
+/*
+ * In multithreaded mode, we wrap certain functions with lock management and
+ * replace the logic of some other functions. All wrapped functions have
+ * "mt_" and "do_" variants. In multithreaded mode, the plain version of a
+ * function is #define-d to the "mt_" variant, which often just grabs a
+ * lock and calls the "do_" function. In single-threaded mode, the plain
+ * name is #define-d to the "do_" function, which is called directly.
+ *
+ * Functions such as the libevent-related calls that need to do cross-thread
+ * communication in multithreaded mode (rather than actually doing the work
+ * in the current thread) are called via "dispatch_" frontends, which are
+ * #define-d to call the underlying code directly in single-threaded mode.
+ */
+#ifdef USE_THREADS
+
+void thread_init(int nthreads, struct event_base *main_base);
+int  dispatch_event_add(int thread, conn *c);
+void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
+
+/* Lock wrappers for cache functions that are called from main loop. */
+char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf);
+void  mt_assoc_move_next_bucket(void);
+conn *mt_conn_from_freelist(void);
+int   mt_conn_add_to_freelist(conn *c);
+char *mt_defer_delete(item *it, time_t exptime);
+int   mt_is_listen_thread(void);
+item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+void  mt_item_flush_expired(void);
+item *mt_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *mt_item_get_nocheck(char *key, size_t nkey);
+int   mt_item_link(item *it);
+void  mt_item_remove(item *it);
+int   mt_item_replace(item *it, item *new_it);
+void  mt_item_unlink(item *it);
+void  mt_item_update(item *it);
+void  mt_run_deferred_deletes(void);
+void *mt_slabs_alloc(size_t size);
+void  mt_slabs_free(void *ptr, size_t size);
+int   mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
+char *mt_slabs_stats(int *buflen);
+void  mt_stats_lock(void);
+void  mt_stats_unlock(void);
+int   mt_store_item(item *item, int comm);
+
+# define add_delta(x,y,z,a)          mt_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    mt_assoc_move_next_bucket()
+# define conn_from_freelist()        mt_conn_from_freelist()
+# define conn_add_to_freelist(x)     mt_conn_add_to_freelist(x)
+# define defer_delete(x,y)           mt_defer_delete(x,y)
+# define is_listen_thread()          mt_is_listen_thread()
+# define item_alloc(x,y,z,a,b)       mt_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        mt_item_flush_expired()
+# define item_get_nocheck(x,y)       mt_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
+# define item_link(x)                mt_item_link(x)
+# define item_remove(x)              mt_item_remove(x)
+# define item_replace(x,y)           mt_item_replace(x,y)
+# define item_update(x)              mt_item_update(x)
+# define item_unlink(x)              mt_item_unlink(x)
+# define run_deferred_deletes()      mt_run_deferred_deletes()
+# define slabs_alloc(x)              mt_slabs_alloc(x)
+# define slabs_free(x,y)             mt_slabs_free(x,y)
+# define slabs_reassign(x,y)         mt_slabs_reassign(x,y)
+# define slabs_stats(x)              mt_slabs_stats(x)
+# define store_item(x,y)             mt_store_item(x,y)
+
+# define STATS_LOCK()                mt_stats_lock()
+# define STATS_UNLOCK()              mt_stats_unlock()
+
+#else /* !USE_THREADS */
+
+# define add_delta(x,y,z,a)          do_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    do_assoc_move_next_bucket()
+# define conn_from_freelist()        do_conn_from_freelist()
+# define conn_add_to_freelist(x)     do_conn_add_to_freelist(x)
+# define defer_delete(x,y)           do_defer_delete(x,y)
+# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
+# define dispatch_event_add(t,c)     event_add(&(c)->event, 0)
+# define is_listen_thread()          1
+# define item_alloc(x,y,z,a,b)       do_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        do_item_flush_expired()
+# define item_get_nocheck(x,y)       do_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
+# define item_link(x)                do_item_link(x)
+# define item_remove(x)              do_item_remove(x)
+# define item_replace(x,y)           do_item_replace(x,y)
+# define item_unlink(x)              do_item_unlink(x)
+# define item_update(x)              do_item_update(x)
+# define run_deferred_deletes()      do_run_deferred_deletes()
+# define slabs_alloc(x)              do_slabs_alloc(x)
+# define slabs_free(x,y)             do_slabs_free(x,y)
+# define slabs_reassign(x,y)         do_slabs_reassign(x,y)
+# define slabs_stats(x)              do_slabs_stats(x)
+# define store_item(x,y)             do_store_item(x,y)
+# define thread_init(x,y)            0
+
+# define STATS_LOCK()                /**/
+# define STATS_UNLOCK()              /**/
+
+#endif /* !USE_THREADS */

Modified: branches/multithreaded/server/slabs.c
===================================================================
--- branches/multithreaded/server/slabs.c	2006-11-23 05:44:38 UTC (rev 441)
+++ branches/multithreaded/server/slabs.c	2006-11-23 07:38:27 UTC (rev 442)
@@ -26,6 +26,7 @@
 #define POWER_LARGEST  200
 #define POWER_BLOCK 1048576
 #define CHUNK_ALIGN_BYTES (sizeof(void *))
+#define DONT_PREALLOC_SLABS
 
 /* powers-of-N allocation structures */
 
@@ -133,7 +134,7 @@
     for(i=POWER_SMALLEST; i<=POWER_LARGEST; i++) {
         if (++prealloc > maxslabs)
             return;
-        slabs_newslab(i);
+        do_slabs_newslab(i);
     }
 
 }
@@ -150,7 +151,7 @@
     return 1;
 }
 
-int slabs_newslab(unsigned int id) {
+int do_slabs_newslab(unsigned int id) {
     slabclass_t *p = &slabclass[id];
 #ifdef ALLOW_SLABS_REASSIGN
     int len = POWER_BLOCK;
@@ -176,7 +177,7 @@
     return 1;
 }
 
-void *slabs_alloc(size_t size) {
+void *do_slabs_alloc(size_t size) {
     slabclass_t *p;
 
     unsigned char id = slabs_clsid(size);
@@ -195,7 +196,7 @@
 
     /* fail unless we have space at the end of a recently allocated page,
        we have something on our freelist, or we could allocate a new page */
-    if (! (p->end_page_ptr || p->sl_curr || slabs_newslab(id)))
+    if (! (p->end_page_ptr || p->sl_curr || do_slabs_newslab(id)))
         return 0;
 
     /* return off our freelist, if we have one */
@@ -216,7 +217,7 @@
     return 0;  /* shouldn't ever get here */
 }
 
-void slabs_free(void *ptr, size_t size) {
+void do_slabs_free(void *ptr, size_t size) {
     unsigned char id = slabs_clsid(size);
     slabclass_t *p;
 
@@ -245,7 +246,7 @@
     return;
 }
 
-char* slabs_stats(int *buflen) {
+char* do_slabs_stats(int *buflen) {
     int i, total;
     char *buf = (char*) malloc(power_largest * 200 + 100);
     char *bufcurr = buf;
@@ -287,7 +288,7 @@
    1 = success
    0 = fail
    -1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid) {
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid) {
     void *slab, *slab_end;
     slabclass_t *p, *dp;
     void *iter;




More information about the memcached-commits mailing list