[memcached] sgrimm,
r431: Multithreading prep: Update the reference count logic
commits at code.sixapart.com
commits at code.sixapart.com
Fri Nov 10 01:04:21 UTC 2006
Multithreading prep: Update the reference count logic so that a reference to
an item in a local variable or conn structure is reflected in the reference
count. The main program logic now frees items solely by decreasing their
reference counts, rather than by explicitly calling item_free(), since
another thread could be manipulating the same item at the same time.
Refactor the deferred delete logic into separate functions. An item's
presence in the deferred-delete queue also counts as an open reference.
U branches/multithreaded/server/items.c
U branches/multithreaded/server/memcached.c
U branches/multithreaded/server/memcached.h
Modified: branches/multithreaded/server/items.c
===================================================================
--- branches/multithreaded/server/items.c 2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/items.c 2006-11-10 01:04:21 UTC (rev 431)
@@ -35,6 +35,17 @@
}
}
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+ fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
+ it, op, it->refcount, \
+ (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
+ (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
+ (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
/*
* Generates the variable-sized part of the header for an object.
@@ -104,7 +115,8 @@
assert(it != heads[it->slabs_clsid]);
it->next = it->prev = it->h_next = 0;
- it->refcount = 0;
+ it->refcount = 1; /* the caller will have a reference */
+ DEBUG_REFCNT(it, '*');
it->it_flags = 0;
it->nkey = nkey;
it->nbytes = nbytes;
@@ -125,6 +137,7 @@
/* so slab size changer can tell later if item is already free or not */
it->slabs_clsid = 0;
it->it_flags |= ITEM_SLABBED;
+ DEBUG_REFCNT(it, 'F');
slabs_free(it, ntotal);
}
@@ -210,7 +223,10 @@
void item_remove(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- if (it->refcount) it->refcount--;
+ if (it->refcount) {
+ it->refcount--;
+ DEBUG_REFCNT(it, '-');
+ }
assert((it->it_flags & ITEM_DELETED) == 0 || it->refcount);
if (it->refcount == 0 && (it->it_flags & ITEM_LINKED) == 0) {
item_free(it);
@@ -354,6 +370,11 @@
item_unlink(it);
it = 0;
}
+
+ if (it) {
+ it->refcount++;
+ DEBUG_REFCNT(it, '+');
+ }
return it;
}
@@ -364,5 +385,9 @@
/* returns an item whether or not it's delete-locked or expired. */
item *item_get_nocheck(char *key, size_t nkey) {
item *it = assoc_find(key, nkey);
+ if (it) {
+ it->refcount++;
+ DEBUG_REFCNT(it, '+');
+ }
return it;
}
Modified: branches/multithreaded/server/memcached.c
===================================================================
--- branches/multithreaded/server/memcached.c 2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/memcached.c 2006-11-10 01:04:21 UTC (rev 431)
@@ -269,7 +269,7 @@
void conn_cleanup(conn *c) {
if (c->item) {
- item_free(c->item);
+ item_remove(c->item);
c->item = 0;
}
@@ -564,7 +564,7 @@
out_string(c, "NOT_STORED");
}
}
-
+ item_remove(c->item); /* release the c->item reference */
c->item = 0;
}
@@ -603,6 +603,8 @@
stored = 1;
}
+ if (old_it)
+ item_remove(old_it); /* release our reference */
return stored;
}
@@ -929,8 +931,8 @@
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+ /* item_get() has incremented it->refcount for us */
stats.get_hits++;
- it->refcount++;
item_update(it);
*(c->ilist + i) = it;
i++;
@@ -1024,7 +1026,6 @@
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
conn_set_state(c, conn_nread);
- return;
}
void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) {
@@ -1070,6 +1071,7 @@
}
out_string(c, add_delta(it, incr, delta, temp));
+ item_remove(it); /* release our reference */
}
/*
@@ -1113,6 +1115,7 @@
memcpy(ITEM_data(new_it), buf, res);
memcpy(ITEM_data(new_it) + res, "\r\n", 2);
item_replace(it, new_it);
+ item_remove(new_it); /* release our reference */
} else { /* replace in-place */
memcpy(ITEM_data(it), buf, res);
memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
@@ -1157,16 +1160,27 @@
}
it = item_get(key, nkey);
- if (!it) {
+ if (it) {
+ if (exptime == 0) {
+ item_unlink(it);
+ item_remove(it); /* release our reference */
+ out_string(c, "DELETED");
+ } else {
+ /* our reference will be transferred to the delete queue */
+ out_string(c, defer_delete(it, exptime));
+ }
+ } else {
out_string(c, "NOT_FOUND");
- return;
}
-
- if (exptime == 0) {
- item_unlink(it);
- out_string(c, "DELETED");
- return;
- }
+}
+
+/*
+ * Adds an item to the deferred-delete list so it can be reaped later.
+ *
+ * Returns the result to send to the client.
+ */
+char *defer_delete(item *it, time_t exptime)
+{
if (delcurr >= deltotal) {
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
if (new_delete) {
@@ -1177,18 +1191,17 @@
* can't delete it immediately, user wants a delay,
* but we ran out of memory for the delete queue
*/
- out_string(c, "SERVER_ERROR out of memory");
- return;
+ item_remove(it); /* release reference */
+ return "SERVER_ERROR out of memory";
}
}
- it->refcount++;
/* use its expiration time as its deletion time now */
it->exptime = realtime(exptime);
it->it_flags |= ITEM_DELETED;
todelete[delcurr++] = it;
- out_string(c, "DELETED");
- return;
+
+ return "DELETED";
}
void process_command(conn *c, char *command) {
@@ -2048,21 +2061,24 @@
t.tv_sec = 5; t.tv_usec=0;
evtimer_add(&deleteevent, &t);
- {
- int i, j=0;
- for (i=0; i<delcurr; i++) {
- item *it = todelete[i];
- if (item_delete_lock_over(it)) {
- assert(it->refcount > 0);
- it->it_flags &= ~ITEM_DELETED;
- item_unlink(it);
- item_remove(it);
- } else {
- todelete[j++] = it;
- }
+ run_deferred_deletes();
+}
+
+void run_deferred_deletes()
+{
+ int i, j=0;
+ for (i=0; i<delcurr; i++) {
+ item *it = todelete[i];
+ if (item_delete_lock_over(it)) {
+ assert(it->refcount > 0);
+ it->it_flags &= ~ITEM_DELETED;
+ item_unlink(it);
+ item_remove(it);
+ } else {
+ todelete[j++] = it;
}
- delcurr = j;
}
+ delcurr = j;
}
void usage(void) {
Modified: branches/multithreaded/server/memcached.h
===================================================================
--- branches/multithreaded/server/memcached.h 2006-11-10 00:34:53 UTC (rev 430)
+++ branches/multithreaded/server/memcached.h 2006-11-10 01:04:21 UTC (rev 431)
@@ -256,6 +256,8 @@
int transmit(conn *c);
int ensure_iov_space(conn *c);
int add_iov(conn *c, const void *buf, int len);
+char *defer_delete(item *item, time_t exptime);
+void run_deferred_deletes(void);
int add_msghdr(conn *c);
char *add_delta(item *item, int incr, unsigned int delta, char *buf);
int store_item(item *item, int comm);
More information about the memcached-commits
mailing list