[memcached] sgrimm, r431: Multithreading prep: Update the referenc...

commits at code.sixapart.com commits at code.sixapart.com
Fri Nov 10 01:04:21 UTC 2006


Multithreading prep: Update the reference count logic so that a reference to
an item in a local variable or conn structure is reflected in the reference
count. The main program logic now frees items solely by decreasing their
reference counts, rather than by explicitly calling item_free(), since
another thread could be manipulating the same item at the same time.

Refactor the deferred delete logic into separate functions. An item's
presence in the deferred-delete queue also counts as an open reference.


U   branches/multithreaded/server/items.c
U   branches/multithreaded/server/memcached.c
U   branches/multithreaded/server/memcached.h


Modified: branches/multithreaded/server/items.c
===================================================================
--- branches/multithreaded/server/items.c	2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/items.c	2006-11-10 01:04:21 UTC (rev 431)
@@ -35,6 +35,17 @@
     }
 }
 
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+                fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
+                        it, op, it->refcount, \
+                        (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
+                        (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
+                        (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
 
 /*
  * Generates the variable-sized part of the header for an object.
@@ -104,7 +115,8 @@
     assert(it != heads[it->slabs_clsid]);
 
     it->next = it->prev = it->h_next = 0;
-    it->refcount = 0;
+    it->refcount = 1;     /* the caller will have a reference */
+    DEBUG_REFCNT(it, '*');
     it->it_flags = 0;
     it->nkey = nkey;
     it->nbytes = nbytes;
@@ -125,6 +137,7 @@
     /* so slab size changer can tell later if item is already free or not */
     it->slabs_clsid = 0;
     it->it_flags |= ITEM_SLABBED;
+    DEBUG_REFCNT(it, 'F');
     slabs_free(it, ntotal);
 }
 
@@ -210,7 +223,10 @@
 
 void item_remove(item *it) {
     assert((it->it_flags & ITEM_SLABBED) == 0);
-    if (it->refcount) it->refcount--;
+    if (it->refcount) {
+        it->refcount--;
+        DEBUG_REFCNT(it, '-');
+    }
     assert((it->it_flags & ITEM_DELETED) == 0 || it->refcount);
     if (it->refcount == 0 && (it->it_flags & ITEM_LINKED) == 0) {
         item_free(it);
@@ -354,6 +370,11 @@
         item_unlink(it);
         it = 0;
     }
+
+    if (it) {
+        it->refcount++;
+        DEBUG_REFCNT(it, '+');
+    }
     return it;
 }
 
@@ -364,5 +385,9 @@
 /* returns an item whether or not it's delete-locked or expired. */
 item *item_get_nocheck(char *key, size_t nkey) {
     item *it = assoc_find(key, nkey);
+    if (it) {
+        it->refcount++;
+        DEBUG_REFCNT(it, '+');
+    }
     return it;
 }

Modified: branches/multithreaded/server/memcached.c
===================================================================
--- branches/multithreaded/server/memcached.c	2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/memcached.c	2006-11-10 01:04:21 UTC (rev 431)
@@ -269,7 +269,7 @@
 
 void conn_cleanup(conn *c) {
     if (c->item) {
-        item_free(c->item);
+        item_remove(c->item);
         c->item = 0;
     }
 
@@ -564,7 +564,7 @@
             out_string(c, "NOT_STORED");
         }
     }
-
+    item_remove(c->item);       /* release the c->item reference */
     c->item = 0;
 }
 
@@ -603,6 +603,8 @@
         stored = 1;
      }
 
+     if (old_it)
+         item_remove(old_it);         /* release our reference */
      return stored;
 }
 
@@ -929,8 +931,8 @@
                 if (settings.verbose > 1)
                     fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
 
+                /* item_get() has incremented it->refcount for us */
                 stats.get_hits++;
-                it->refcount++;
                 item_update(it);
                 *(c->ilist + i) = it;
                 i++;
@@ -1024,7 +1026,6 @@
     c->ritem = ITEM_data(it);
     c->rlbytes = it->nbytes;
     conn_set_state(c, conn_nread);
-    return;
 }
 
 void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) {
@@ -1070,6 +1071,7 @@
     }
 
     out_string(c, add_delta(it, incr, delta, temp));
+    item_remove(it);         /* release our reference */
 }
 
 /*
@@ -1113,6 +1115,7 @@
         memcpy(ITEM_data(new_it), buf, res);
         memcpy(ITEM_data(new_it) + res, "\r\n", 2);
         item_replace(it, new_it);
+        item_remove(new_it);       /* release our reference */
     } else { /* replace in-place */
         memcpy(ITEM_data(it), buf, res);
         memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
@@ -1157,16 +1160,27 @@
     }
 
     it = item_get(key, nkey);
-    if (!it) {
+    if (it) {
+        if (exptime == 0) {
+            item_unlink(it);
+            item_remove(it);      /* release our reference */
+            out_string(c, "DELETED");
+        } else {
+            /* our reference will be transfered to the delete queue */
+            out_string(c, defer_delete(it, exptime));
+        }
+    } else {
         out_string(c, "NOT_FOUND");
-        return;
     }
-    
-    if (exptime == 0) {
-        item_unlink(it);
-        out_string(c, "DELETED");
-        return;
-    }
+}
+
+/*
+ * Adds an item to the deferred-delete list so it can be reaped later.
+ *
+ * Returns the result to send to the client.
+ */
+char *defer_delete(item *it, time_t exptime)
+{
     if (delcurr >= deltotal) {
         item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
         if (new_delete) {
@@ -1177,18 +1191,17 @@
              * can't delete it immediately, user wants a delay,
              * but we ran out of memory for the delete queue
              */
-            out_string(c, "SERVER_ERROR out of memory");
-            return;
+            item_remove(it);    /* release reference */
+            return "SERVER_ERROR out of memory";
         }
     }
     
-    it->refcount++;
     /* use its expiration time as its deletion time now */
     it->exptime = realtime(exptime);
     it->it_flags |= ITEM_DELETED;
     todelete[delcurr++] = it;
-    out_string(c, "DELETED");
-    return;
+
+    return "DELETED";
 }
 
 void process_command(conn *c, char *command) {
@@ -2048,21 +2061,24 @@
     t.tv_sec = 5; t.tv_usec=0;
     evtimer_add(&deleteevent, &t);
 
-    {
-        int i, j=0;
-        for (i=0; i<delcurr; i++) {
-            item *it = todelete[i];
-            if (item_delete_lock_over(it)) {
-                assert(it->refcount > 0);
-                it->it_flags &= ~ITEM_DELETED;
-                item_unlink(it);
-                item_remove(it);
-            } else {
-                todelete[j++] = it;
-            }
+    run_deferred_deletes();
+}
+
+void run_deferred_deletes()
+{
+    int i, j=0;
+    for (i=0; i<delcurr; i++) {
+        item *it = todelete[i];
+        if (item_delete_lock_over(it)) {
+            assert(it->refcount > 0);
+            it->it_flags &= ~ITEM_DELETED;
+            item_unlink(it);
+            item_remove(it);
+        } else {
+            todelete[j++] = it;
         }
-        delcurr = j;
     }
+    delcurr = j;
 }
 
 void usage(void) {

Modified: branches/multithreaded/server/memcached.h
===================================================================
--- branches/multithreaded/server/memcached.h	2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/memcached.h	2006-11-10 01:04:21 UTC (rev 431)
@@ -256,6 +256,8 @@
 int transmit(conn *c);
 int ensure_iov_space(conn *c);
 int add_iov(conn *c, const void *buf, int len);
+char *defer_delete(item *item, time_t exptime);
+void run_deferred_deletes(void);
 int add_msghdr(conn *c);
 char *add_delta(item *item, int incr, unsigned int delta, char *buf);
 int store_item(item *item, int comm);




More information about the memcached-commits mailing list