[memcached] sgrimm, r315: Dynamically shrink I/O buffers and data ...
commits at code.sixapart.com
Wed Aug 23 19:15:08 UTC 2006
Dynamically shrink I/O buffers and data structures if they grow too large.
This prevents a site that uses persistent TCP connections from gradually
consuming tons of memory if occasional large requests appear on various
connections.
U branches/facebook/memcached.c
U branches/facebook/memcached.h
Modified: branches/facebook/memcached.c
===================================================================
--- branches/facebook/memcached.c 2006-08-21 21:39:45 UTC (rev 314)
+++ branches/facebook/memcached.c 2006-08-23 19:15:07 UTC (rev 315)
@@ -176,9 +176,9 @@
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
- c->isize = 200; /* TODO: make these two things #define'd if not runtime */
- c->iovsize = 200; /* TODO: can this be different on init, or must it be c->isize? two magic values is code is bad */
- c->msgsize = 10; /* TODO: likewise, what is this magic constant? */
+ c->isize = ITEM_LIST_INITIAL;
+ c->iovsize = IOV_LIST_INITIAL;
+ c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
c->rbuf = (char *) malloc(c->rsize);
@@ -277,6 +277,27 @@
}
}
+/*
+ * Frees a connection.
+ *
+ * Releases each per-connection buffer that is non-NULL (hdrbuf, msglist,
+ * rbuf, wbuf, ilist, iov) and then the conn structure itself. Passing a
+ * NULL conn is a no-op.
+ *
+ * Only memory is released here: this function does not touch the event or
+ * the socket, so those must be dealt with first (conn_close deletes the
+ * event and closes the socket before it calls this). The conn pointer must
+ * not be used after this returns.
+ */
+static void conn_free(conn *c) {
+ if (c) {
+ if (c->hdrbuf)
+ free(c->hdrbuf);
+ if (c->msglist)
+ free(c->msglist);
+ if (c->rbuf)
+ free(c->rbuf);
+ if (c->wbuf)
+ free(c->wbuf);
+ if (c->ilist)
+ free(c->ilist);
+ if (c->iov)
+ free(c->iov);
+ free(c);
+ }
+}
+
void conn_close(conn *c) {
/* delete the event, the socket and the conn */
event_del(&c->event);
@@ -287,8 +308,11 @@
close(c->sfd);
conn_cleanup(c);
- /* if we have enough space in the free connections array, put the structure there */
- if (freecurr < freetotal) {
+ /* if the connection has big buffers, just free it */
+ if (c->rsize > READ_BUFFER_HIGHWAT) {
+ conn_free(c);
+ } else if (freecurr < freetotal) {
+ /* if we have enough space in the free connections array, put the structure there */
freeconns[freecurr++] = c;
} else {
/* try to enlarge free connections array */
@@ -298,14 +322,7 @@
freeconns = new_freeconns;
freeconns[freecurr++] = c;
} else {
- if (c->hdrbuf)
- free(c->hdrbuf);
- free(c->msglist);
- free(c->rbuf);
- free(c->wbuf);
- free(c->ilist);
- free(c->iov);
- free(c);
+ conn_free(c);
}
}
@@ -314,8 +331,64 @@
return;
}
+/*
+ * Reallocates memory and updates a buffer size if successful.
+ *
+ * orig            address of the pointer to the buffer being resized
+ * newsize         requested capacity, counted in items
+ * bytes_per_item  size of a single item, in bytes
+ * size            address of the capacity field; set to newsize on success
+ *
+ * Returns 1 on success and 0 if realloc() fails. On failure neither *orig
+ * nor *size is modified, so the caller still owns a valid buffer of the
+ * old capacity.
+ *
+ * NOTE(review): newsize * bytes_per_item is evaluated in int with no
+ * overflow or sign check. The callers visible in this change only pass
+ * compile-time constants; confirm before using it with computed sizes.
+ */
+int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
+ void *newbuf = realloc(*orig, newsize * bytes_per_item);
+ if (newbuf) {
+ *orig = newbuf;
+ *size = newsize;
+ return 1;
+ }
+ return 0;
+}
+ /*
+ * Shrinks a connection's buffers if they're too big. This prevents
+ * periodic large "get" requests from permanently chewing lots of server
+ * memory.
+ *
+ * This should only be called in between requests since it can wipe output
+ * buffers!
+ *
+ * UDP connections are left alone. For other connections:
+ *   - rbuf is cut back to DATA_BUFFER_SIZE bytes when its capacity exceeds
+ *     READ_BUFFER_HIGHWAT and fewer than DATA_BUFFER_SIZE bytes are
+ *     currently held in it (rbytes) -- presumably so pending input still
+ *     fits in the smaller buffer; verify against the read path.
+ *   - ilist, msglist and iov are each cut back to their *_INITIAL element
+ *     count when their capacity exceeds the matching *_HIGHWAT value.
+ *
+ * The result of do_realloc() is deliberately ignored: if a reallocation
+ * fails, that buffer simply keeps its current (larger) capacity.
+ *
+ * NOTE(review): realloc() may move a block even when shrinking it. Any
+ * other pointer into these buffers kept in the conn would then be stale;
+ * no such pointer is adjusted here -- confirm none is live between
+ * requests.
+ */
+void conn_shrink(conn *c) {
+ if (c->udp)
+ return;
+
+ if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
+ do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+ }
+
+ if (c->isize > ITEM_LIST_HIGHWAT) {
+ do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
+ }
+
+ if (c->msgsize > MSG_LIST_HIGHWAT) {
+ do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
+ }
+
+ if (c->iovsize > IOV_LIST_HIGHWAT) {
+ do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
+ }
+}
+
/*
+ * Sets a connection's current state in the state machine. Any special
+ * processing that needs to happen on certain state transitions can
+ * happen here.
+ *
+ * c      connection whose state is being changed
+ * state  the new state value (one of the conn_* states)
+ *
+ * Setting the state the connection is already in does nothing, so no
+ * transition hook runs in that case. The only hook at present is on a
+ * transition into conn_read from some other state: conn_shrink() is called
+ * before the new state is stored, so oversized buffers are trimmed at that
+ * point.
+ */
+void conn_set_state(conn *c, int state) {
+ if (state != c->state) {
+ if (state == conn_read) {
+ conn_shrink(c);
+ }
+ c->state = state;
+ }
+}
+
+
+/*
* Ensures that there is room for another struct iovec in a connection's
* iov list.
*
@@ -448,7 +521,7 @@
c->wbytes = len + 2;
c->wcurr = c->wbuf;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -614,7 +687,7 @@
c->write_and_free=wbuf;
c->wcurr=wbuf;
c->wbytes = res + 6;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
close(fd);
return;
@@ -638,7 +711,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -653,7 +726,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -676,7 +749,7 @@
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
- c->state = conn_write;
+ conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
@@ -749,7 +822,7 @@
c->item = it;
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
- c->state = conn_nread;
+ conn_set_state(c, conn_nread);
return;
}
@@ -918,7 +991,7 @@
out_string(c, "SERVER_ERROR out of memory");
}
else {
- c->state = conn_mwrite;
+ conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
return;
@@ -1037,7 +1110,7 @@
c->bucket = bucket;
c->gen = gen;
}
- c->state = conn_read;
+ conn_set_state(c, conn_read);
return;
} else {
out_string(c, "CLIENT_ERROR bad format");
@@ -1077,7 +1150,7 @@
}
if (strcmp(command, "quit") == 0) {
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return;
}
@@ -1219,7 +1292,7 @@
}
if (res == 0) {
/* connection closed */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return 1;
}
if (res == -1) {
@@ -1283,7 +1356,7 @@
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return TRANSMIT_HARD_ERROR;
}
return TRANSMIT_SOFT_ERROR;
@@ -1294,9 +1367,9 @@
perror("Failed to write, and not due to blocking");
if (c->udp)
- c->state = conn_read;
+ conn_set_state(c, conn_read);
else
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
return TRANSMIT_HARD_ERROR;
} else {
return TRANSMIT_COMPLETE;
@@ -1353,7 +1426,7 @@
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1385,14 +1458,14 @@
break;
}
if (res == 0) { /* end of stream */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1401,13 +1474,13 @@
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
case conn_swallow:
/* we are reading sbytes and throwing them away */
if (c->sbytes == 0) {
- c->state = conn_read;
+ conn_set_state(c, conn_read);
break;
}
@@ -1428,14 +1501,14 @@
break;
}
if (res == 0) { /* end of stream */
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
exit = 1;
@@ -1444,7 +1517,7 @@
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
case conn_write:
@@ -1458,7 +1531,7 @@
c->udp && build_udp_headers(c)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't build response\n");
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
break;
}
}
@@ -1476,17 +1549,17 @@
c->icurr++;
c->ileft--;
}
- c->state = conn_read;
+ conn_set_state(c, conn_read);
} else if (c->state == conn_write) {
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
- c->state = c->write_and_go;
+ conn_set_state(c, c->write_and_go);
} else {
if (settings.verbose > 0)
fprintf(stderr, "Unexpected state %d\n", c->state);
- c->state = conn_closing;
+ conn_set_state(c, conn_closing);
}
break;
Modified: branches/facebook/memcached.h
===================================================================
--- branches/facebook/memcached.h 2006-08-21 21:39:45 UTC (rev 314)
+++ branches/facebook/memcached.h 2006-08-23 19:15:07 UTC (rev 315)
@@ -7,6 +7,21 @@
#define UDP_HEADER_SIZE 8
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
+/* Initial size of list of items being returned by "get". Used as the
+ * starting value of c->isize, counted in list entries. */
+#define ITEM_LIST_INITIAL 200
+
+/* Initial size of the sendmsg() scatter/gather array. Used as the starting
+ * value of c->iovsize, counted in entries; this change raises it from the
+ * previous hard-coded 200. */
+#define IOV_LIST_INITIAL 400
+
+/* Initial number of sendmsg() argument structures to allocate. Used as the
+ * starting value of c->msgsize. */
+#define MSG_LIST_INITIAL 10
+
+/* High water marks for buffer shrinking.
+ *
+ * conn_shrink() trims a buffer only once its capacity exceeds the matching
+ * mark below. READ_BUFFER_HIGHWAT is compared against c->rsize, a size in
+ * bytes; the other three are compared against element counts (c->isize,
+ * c->iovsize, c->msgsize). conn_close() also uses READ_BUFFER_HIGHWAT: a
+ * closing connection whose read buffer is larger than this is freed rather
+ * than kept in the free-connection list. */
+#define READ_BUFFER_HIGHWAT 8192
+#define ITEM_LIST_HIGHWAT 400
+#define IOV_LIST_HIGHWAT 600
+#define MSG_LIST_HIGHWAT 100
+
/* Time relative to server start. Smaller than time_t on 64-bit systems. */
typedef unsigned int rel_time_t;
More information about the memcached-commits mailing list