[patch] combining packets

Anatoly Vorobey mellon@pobox.com
Sat, 16 Aug 2003 05:30:27 +0300


Here's an attempt to combine many smaller packets into one larger packet 
(in answers to GET queries... that's the only place where it matters).
This patch also considerably simplifies the conn_mwrite state of the
connection finite state machine. Tested only cursorily so far; it needs
review and more testing.

main -> wcmtools          src/memcached/memcached.c
--- cvs/wcmtools/memcached/memcached.c	2003-08-11 16:59:47.000000000 +0300
+++ src/memcached/memcached.c	2003-08-16 05:23:39.000000000 +0300
@@ -149,8 +149,6 @@
     c->rcurr = c->rbuf;
     c->icurr = c->ilist; 
     c->ileft = 0;
-    c->iptr = c->ibuf;
-    c->ibytes = 0;
 
     c->write_and_go = conn_read;
     c->write_and_free = 0;
@@ -609,7 +607,8 @@
         if (c->ileft) {
             c->ipart = 0;
             c->state = conn_mwrite;
-            c->ibytes = 0;
+            c->wbytes = 0;
+            c->wcurr = c->wbuf;
             return;
         } else {
             out_string(c, "END");
@@ -763,7 +762,7 @@
     if (event_add(&c->event, 0) == -1) return 0;
     return 1;
 }
-    
+
 void drive_machine(conn *c) {
 
     int exit = 0;
@@ -919,6 +918,7 @@
                     c->write_and_free = 0;
                 }
                 c->state = c->write_and_go;
+                c->wcurr = c->wbuf;
                 break;
             }
             res = write(c->sfd, c->wcurr, c->wbytes);
@@ -946,74 +946,114 @@
             break;
         case conn_mwrite:
             /* 
-             * we're writing ibytes bytes from iptr. iptr alternates between
-             * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the 
-             * current item. When we finish a chunk, we choose the next one using 
+             * we're writing wbytes bytes from wcurr. wcurr alternates between
+             * wbuf, in which we build a string "VALUE...", and item data for smaller items,
+             * and ITEM_data(it) for the larger items. When we finish a chunk, we choose the next one using 
              * ipart, which has the following semantics: 0 - start the loop, 1 - 
              * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
              * move to the next item and build its ibuf; 3 - we finished all items, 
-             * write "END".
+             * write "END". wcurr points inside wbuf when we accumulate 
+             * data, but we restore it to =wbuf before actual writing.
              */
-            if (c->ibytes > 0) {
-                res = write(c->sfd, c->iptr, c->ibytes);
-                if (res > 0) {
-                    stats.bytes_written += res;
-                    c->iptr += res;
-                    c->ibytes -= res;
-                    break;
-                }
-                if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
-                        if (settings.verbose > 0)
-                            fprintf(stderr, "Couldn't update event\n");
-                        c->state = conn_closing;
-                        break;
-                    }
-                    exit = 1;
-                    break;
-                }
-                /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
-                   we have a real error, on which we close the connection */
-                if (settings.verbose > 0)
-                    fprintf(stderr, "Failed to write, and not due to blocking\n");
-                c->state = conn_closing;
-                break;
-            } else {
+            switch (c->ipart) {
+                int len, res;
                 item *it;
-                /* we finished a chunk, decide what to do next */
-                switch (c->ipart) {
-                case 1:
-                    it = *(c->icurr);
-                    assert((it->it_flags & ITEM_SLABBED) == 0);
-                    c->iptr = ITEM_data(it);
-                    c->ibytes = it->nbytes;
+
+            case 1:
+                it = *(c->icurr);
+                assert((it->it_flags & ITEM_SLABBED) == 0);
+
+                /* is there enough place in the fixed buffer for this item? */
+                if (c->wsize - c->wbytes >= it->nbytes) {
+                    memcpy(c->wcurr, ITEM_data(it), it->nbytes);
+                    c->wbytes += it->nbytes;
+                    c->wcurr += it->nbytes;
                     c->ipart = 2;
                     break;
-                case 2:
-                    it = *(c->icurr);
-                    item_remove(it);
-                    c->ileft--;
-                    if (c->ileft <= 0) {
-                        c->ipart = 3;
+                } else {
+                    if (c->wbytes) {
+                    /* 
+                     * flush the buffer, then come back here and try the
+                     * above size condition again.
+                     */
+                        c->wcurr = c->wbuf;
+                        c->state = conn_write;
+                        c->write_and_go = conn_mwrite;
                         break;
                     } else {
-                        c->icurr++;
+                        /* 
+                         * the buffer is empty and the data is still
+                         * larger than the buffer, so write it out
+                         */
+                        c->wcurr = ITEM_data(it);
+                        c->wbytes = it->nbytes;
+                        c->state = conn_write;
+                        c->write_and_go = conn_mwrite;
+                        c->ipart = 2;
+                        break;
                     }
-                    /* FALL THROUGH */
-                case 0:
-                    it = *(c->icurr);
-                    assert((it->it_flags & ITEM_SLABBED) == 0);
-                    sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
+                }
+         
+            case 2:
+                it = *(c->icurr);
+                item_remove(it);
+                c->ileft--;
+                if (c->ileft <= 0) {
+                    c->ipart = 3;
+                    break;
+                } else {
+                    c->icurr++;
+                }
+
+                /* 
+                 * break and come back to ipart=0, rather than fall
+                 * through: in ipart=0, we may need to flush the write
+                 * buffer and return to the same place, and if we didn't
+                 * change ipart, we'd've returned to ipart=2 erroneously.
+                 */
+
+                c->ipart = 0;
+                break; 
+
+            case 0:
+                it = *(c->icurr);
+                assert((it->it_flags & ITEM_SLABBED) == 0);
+
+                len = strlen(ITEM_key(it));
+                if (c->wsize - c->wbytes >= len+32) {
+                    res = sprintf(c->wcurr, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
+                    c->wcurr += res;
+                    c->wbytes += res;
                     if (settings.verbose > 1)
                         fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
-                    c->iptr = c->ibuf;
-                    c->ibytes = strlen(c->iptr);
                     c->ipart = 1;
                     break;
-                case 3:
-                    out_string(c, "END");
+                } else {
+                    if (c->wbytes) {
+                        /* flush the buffer and come back here */
+                        c->wcurr = c->wbuf;
+                        c->state = conn_write;
+                        c->write_and_go = conn_mwrite;
+                        break;
+                    } else {
+                        /* should never happen */
+                        c->state = conn_closing;
+                        break;
+                    }
+                }
+
+            case 3:
+                /* flush the buffer and come back here */
+                if (c->wbytes) {
+                    c->wcurr = c->wbuf;
+                    c->state = conn_write;
+                    c->write_and_go = conn_mwrite;
                     break;
                 }
+
+                /* write "END\r\n" and finish */
+                out_string(c, "END");
+                break;
             }
             break;
 
main -> wcmtools          src/memcached/memcached.h
--- cvs/wcmtools/memcached/memcached.h	2003-07-22 12:14:33.000000000 +0300
+++ src/memcached/memcached.h	2003-08-16 05:23:54.000000000 +0300
@@ -113,9 +113,6 @@
     item   **icurr;
     int    ileft;
     int    ipart;     /* 1 if we're writing a VALUE line, 2 if we're writing data */
-    char   ibuf[256]; /* for VALUE lines */
-    char   *iptr;
-    int    ibytes;
                          
 } conn;