[memcached] sgrimm, r315: Dynamically shrink I/O buffers and data ...

commits at code.sixapart.com commits at code.sixapart.com
Wed Aug 23 19:15:08 UTC 2006


Dynamically shrink I/O buffers and data structures if they grow too large.
This prevents a site that uses persistent TCP connections from gradually
consuming tons of memory if occasional large requests appear on various
connections.



U   branches/facebook/memcached.c
U   branches/facebook/memcached.h


Modified: branches/facebook/memcached.c
===================================================================
--- branches/facebook/memcached.c	2006-08-21 21:39:45 UTC (rev 314)
+++ branches/facebook/memcached.c	2006-08-23 19:15:07 UTC (rev 315)
@@ -176,9 +176,9 @@
 
         c->rsize = read_buffer_size;
         c->wsize = DATA_BUFFER_SIZE;
-        c->isize = 200;    /* TODO: make these two things #define'd if not runtime */
-        c->iovsize = 200;  /* TODO: can this be different on init, or must it be c->isize?  two magic values is code is bad */
-        c->msgsize = 10;  /* TODO: likewise, what is this magic constant? */
+        c->isize = ITEM_LIST_INITIAL;
+        c->iovsize = IOV_LIST_INITIAL;
+        c->msgsize = MSG_LIST_INITIAL;
         c->hdrsize = 0;
 
         c->rbuf = (char *) malloc(c->rsize);
@@ -277,6 +277,27 @@
     }
 }
 
+/*
+ * Frees a connection structure and every buffer it owns (hdrbuf, msglist,
+ * rbuf, wbuf, ilist, iov).  A NULL connection is ignored, and NULL buffer
+ * pointers need no special casing because free(NULL) is a no-op.
+ *
+ * Only memory is released here.  conn_close() removes the event, closes
+ * the socket and runs conn_cleanup() before it calls this.
+ */
+static void conn_free(conn *c) {
+    if (c == NULL)
+        return;
+
+    free(c->hdrbuf);
+    free(c->msglist);
+    free(c->rbuf);
+    free(c->wbuf);
+    free(c->ilist);
+    free(c->iov);
+    free(c);
+}
+
 void conn_close(conn *c) {
     /* delete the event, the socket and the conn */
     event_del(&c->event);
@@ -287,8 +308,11 @@
     close(c->sfd);
     conn_cleanup(c);
 
-    /* if we have enough space in the free connections array, put the structure there */
-    if (freecurr < freetotal) {
+    /* if the connection has big buffers, just free it */
+    if (c->rsize > READ_BUFFER_HIGHWAT) {
+        conn_free(c);
+    } else if (freecurr < freetotal) {
+        /* if we have enough space in the free connections array, put the structure there */
         freeconns[freecurr++] = c;
     } else {
         /* try to enlarge free connections array */
@@ -298,14 +322,7 @@
             freeconns = new_freeconns;
             freeconns[freecurr++] = c;
         } else {
-            if (c->hdrbuf)
-                free(c->hdrbuf);
-            free(c->msglist);
-            free(c->rbuf);
-            free(c->wbuf);
-            free(c->ilist);
-            free(c->iov);
-            free(c);
+            conn_free(c);
         }
     }
 
@@ -314,8 +331,89 @@
     return;
 }
 
+/*
+ * Reallocates *orig to hold newsize items of bytes_per_item bytes each.
+ *
+ * On success, updates *orig and *size and returns 1.  On failure, returns 0
+ * and leaves both untouched; the old buffer is still valid, so callers that
+ * are only shrinking can carry on with the larger buffer.
+ *
+ * NOTE(review): callers pass addresses such as (void **)&c->ilist.  Storing
+ * through a void ** that really designates a pointer of another type is
+ * type punning ISO C does not guarantee, though common compilers accept it.
+ */
+int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
+    void *newbuf;
+
+    /* realloc(p, 0) may free p and still return NULL, which would leave
+     * *orig dangling; refuse empty or negative requests instead. */
+    if (newsize <= 0 || bytes_per_item <= 0)
+        return 0;
+
+    /* Multiply in size_t so the product cannot overflow a signed int. */
+    newbuf = realloc(*orig, (size_t)newsize * (size_t)bytes_per_item);
+    if (newbuf == NULL)
+        return 0;
+
+    *orig = newbuf;
+    *size = newsize;
+    return 1;
+}
 
+/*
+ * Shrinks a connection's buffers if they're too big.  This prevents
+ * periodic large "get" requests from permanently chewing lots of server
+ * memory.
+ *
+ * Only call this between requests: ilist, msglist and iov are cut back to
+ * their *_INITIAL sizes and anything stored past that point is lost.  UDP
+ * connections are skipped entirely.
+ *
+ * Shrinking is best-effort; if a realloc fails the larger buffer is kept.
+ */
+void conn_shrink(conn *c) {
+    if (c->udp)
+        return;
+
+    /* Only the first DATA_BUFFER_SIZE bytes of rbuf are kept.  NOTE(review):
+     * assumes the c->rbytes of pending input start at c->rbuf -- confirm;
+     * any pointer into the old rbuf is stale if realloc moves the block. */
+    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
+        do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+    }
+
+    if (c->isize > ITEM_LIST_HIGHWAT) {
+        do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
+    }
+
+    if (c->msgsize > MSG_LIST_HIGHWAT) {
+        do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
+    }
+
+    if (c->iovsize > IOV_LIST_HIGHWAT) {
+        do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
+    }
+}
+
 /*
+ * Sets a connection's current state in the state machine. Any special
+ * processing that needs to happen on certain state transitions can
+ * happen here.
+ *
+ * Currently the only such processing: moving into conn_read from any other
+ * state first calls conn_shrink(), treating that transition as the gap
+ * between requests.  Setting the state the connection already has is a no-op.
+ */
+void conn_set_state(conn *c, int state) {
+    if (state != c->state) {
+        if (state == conn_read) {
+            conn_shrink(c);
+        }
+        c->state = state;
+    }
+}
+
+/*
  * Ensures that there is room for another struct iovec in a connection's
  * iov list.
  *
@@ -448,7 +521,7 @@
     c->wbytes = len + 2;
     c->wcurr = c->wbuf;
 
-    c->state = conn_write;
+    conn_set_state(c, conn_write);
     c->write_and_go = conn_read;
     return;
 }
@@ -614,7 +687,7 @@
         c->write_and_free=wbuf;
         c->wcurr=wbuf;
         c->wbytes = res + 6;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         close(fd);
         return;
@@ -638,7 +711,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -653,7 +726,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -676,7 +749,7 @@
         c->write_and_free = buf;
         c->wcurr = buf;
         c->wbytes = bytes;
-        c->state = conn_write;
+        conn_set_state(c, conn_write);
         c->write_and_go = conn_read;
         return;
     }
@@ -749,7 +822,7 @@
         c->item = it;
         c->ritem = ITEM_data(it);
         c->rlbytes = it->nbytes;
-        c->state = conn_nread;
+        conn_set_state(c, conn_nread);
         return;
     }
 
@@ -918,7 +991,7 @@
             out_string(c, "SERVER_ERROR out of memory");
         }
         else {
-            c->state = conn_mwrite;
+            conn_set_state(c, conn_mwrite);
             c->msgcurr = 0;
         }
         return;
@@ -1037,7 +1110,7 @@
                 c->bucket = bucket;
                 c->gen = gen;
             }
-            c->state = conn_read;
+            conn_set_state(c, conn_read);
             return;
         } else {
             out_string(c, "CLIENT_ERROR bad format");
@@ -1077,7 +1150,7 @@
     }
 
     if (strcmp(command, "quit") == 0) {
-        c->state = conn_closing;
+        conn_set_state(c, conn_closing);
         return;
     }
 
@@ -1219,7 +1292,7 @@
         }
         if (res == 0) {
             /* connection closed */
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             return 1;
         }
         if (res == -1) {
@@ -1283,7 +1356,7 @@
             if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                 if (settings.verbose > 0)
                     fprintf(stderr, "Couldn't update event\n");
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 return TRANSMIT_HARD_ERROR;
             }
             return TRANSMIT_SOFT_ERROR;
@@ -1294,9 +1367,9 @@
             perror("Failed to write, and not due to blocking");
 
         if (c->udp)
-            c->state = conn_read;
+            conn_set_state(c, conn_read);
         else
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
         return TRANSMIT_HARD_ERROR;
     } else {
         return TRANSMIT_COMPLETE;
@@ -1353,7 +1426,7 @@
             if (!update_event(c, EV_READ | EV_PERSIST)) {
                 if (settings.verbose > 0)
                     fprintf(stderr, "Couldn't update event\n");
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             exit = 1;
@@ -1385,14 +1458,14 @@
                 break;
             }
             if (res == 0) { /* end of stream */
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                 if (!update_event(c, EV_READ | EV_PERSIST)) {
                     if (settings.verbose > 0) 
                         fprintf(stderr, "Couldn't update event\n");
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                     break;
                 }
                 exit = 1;
@@ -1401,13 +1474,13 @@
             /* otherwise we have a real error, on which we close the connection */
             if (settings.verbose > 0)
                 fprintf(stderr, "Failed to read, and not due to blocking\n");
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             break;
 
         case conn_swallow:
             /* we are reading sbytes and throwing them away */
             if (c->sbytes == 0) {
-                c->state = conn_read;
+                conn_set_state(c, conn_read);
                 break;
             }
 
@@ -1428,14 +1501,14 @@
                 break;
             }
             if (res == 0) { /* end of stream */
-                c->state = conn_closing;
+                conn_set_state(c, conn_closing);
                 break;
             }
             if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                 if (!update_event(c, EV_READ | EV_PERSIST)) {
                     if (settings.verbose > 0)
                         fprintf(stderr, "Couldn't update event\n");
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                     break;
                 }
                 exit = 1;
@@ -1444,7 +1517,7 @@
             /* otherwise we have a real error, on which we close the connection */
             if (settings.verbose > 0)
                 fprintf(stderr, "Failed to read, and not due to blocking\n");
-            c->state = conn_closing;
+            conn_set_state(c, conn_closing);
             break;
 
         case conn_write:
@@ -1458,7 +1531,7 @@
                         c->udp && build_udp_headers(c)) {
                     if (settings.verbose > 0)
                         fprintf(stderr, "Couldn't build response\n");
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                     break;
                 }
             }
@@ -1476,17 +1549,17 @@
                         c->icurr++;
                         c->ileft--;
                     }
-                    c->state = conn_read;
+                    conn_set_state(c, conn_read);
                 } else if (c->state == conn_write) {
                     if (c->write_and_free) {
                         free(c->write_and_free);
                         c->write_and_free = 0;
                     }
-                    c->state = c->write_and_go;
+                    conn_set_state(c, c->write_and_go);
                 } else {
                     if (settings.verbose > 0)
                         fprintf(stderr, "Unexpected state %d\n", c->state);
-                    c->state = conn_closing;
+                    conn_set_state(c, conn_closing);
                 }
                 break;
 

Modified: branches/facebook/memcached.h
===================================================================
--- branches/facebook/memcached.h	2006-08-21 21:39:45 UTC (rev 314)
+++ branches/facebook/memcached.h	2006-08-23 19:15:07 UTC (rev 315)
@@ -7,6 +7,21 @@
 #define UDP_HEADER_SIZE 8
 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
 
+/* Initial number of entries in a connection's item list (ilist) for "get". */
+#define ITEM_LIST_INITIAL 200
+
+/* Initial entry count of the sendmsg() scatter/gather (iovec) array. */
+#define IOV_LIST_INITIAL 400
+
+/* Initial number of sendmsg() message headers (msglist entries). */
+#define MSG_LIST_INITIAL 10
+
+/* High water marks: conn_shrink() shrinks a buffer that grows past these */
+#define READ_BUFFER_HIGHWAT 8192  /* bytes (rbuf); also used by conn_close() */
+#define ITEM_LIST_HIGHWAT 400     /* entries; shrinks to ITEM_LIST_INITIAL */
+#define IOV_LIST_HIGHWAT 600      /* entries; shrinks to IOV_LIST_INITIAL */
+#define MSG_LIST_HIGHWAT 100      /* entries; shrinks to MSG_LIST_INITIAL */
+
 /* Time relative to server start. Smaller than time_t on 64-bit systems. */
 typedef unsigned int rel_time_t;
 




More information about the memcached-commits mailing list