[patch] combining packets

Brad Fitzpatrick brad@danga.com
Mon, 11 Aug 2003 19:58:16 -0700 (PDT)


It looks like "END\r\n" would still go out in its own packet.
Why not append it to the write buffer too, when there are at least 5 bytes free?

Also, I don't see any work here about matching our output sizes to the MTU
of the interface.

Should we just go with TCP_CORK/TCP_NOPUSH instead?  Any thoughts from
people with experience?  Is TCP_CORK/etc. widely supported enough?  If this
were ported to Windows or commercial Unixes, would they have similar TCP
options?


On Sat, 16 Aug 2003, Anatoly Vorobey wrote:

> Here's an attempt to combine many smaller packets into one larger packet
> (in answers to GET queries... that's the only place where it matters).
> This patch also considerably simplifies the conn_mwrite state of the
> state machine. Tested only cursorily so far; needs review and more
> testing.
>
> main -> wcmtools          src/memcached/memcached.c
> --- cvs/wcmtools/memcached/memcached.c	2003-08-11 16:59:47.000000000 +0300
> +++ src/memcached/memcached.c	2003-08-16 05:23:39.000000000 +0300
> @@ -149,8 +149,6 @@
>      c->rcurr = c->rbuf;
>      c->icurr = c->ilist;
>      c->ileft = 0;
> -    c->iptr = c->ibuf;
> -    c->ibytes = 0;
>
>      c->write_and_go = conn_read;
>      c->write_and_free = 0;
> @@ -609,7 +607,8 @@
>          if (c->ileft) {
>              c->ipart = 0;
>              c->state = conn_mwrite;
> -            c->ibytes = 0;
> +            c->wbytes = 0;
> +            c->wcurr = c->wbuf;
>              return;
>          } else {
>              out_string(c, "END");
> @@ -763,7 +762,7 @@
>      if (event_add(&c->event, 0) == -1) return 0;
>      return 1;
>  }
> -
> +
>  void drive_machine(conn *c) {
>
>      int exit = 0;
> @@ -919,6 +918,7 @@
>                      c->write_and_free = 0;
>                  }
>                  c->state = c->write_and_go;
> +                c->wcurr = c->wbuf;
>                  break;
>              }
>              res = write(c->sfd, c->wcurr, c->wbytes);
> @@ -946,74 +946,114 @@
>              break;
>          case conn_mwrite:
>              /*
> -             * we're writing ibytes bytes from iptr. iptr alternates between
> -             * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the
> -             * current item. When we finish a chunk, we choose the next one using
> +             * we're writing wbytes bytes from wcurr. wcurr alternates between
> +             * wbuf, in which we build a string "VALUE...", and item data for smaller items,
> +             * and ITEM_data(it) for the larger items. When we finish a chunk, we choose the next one using
>               * ipart, which has the following semantics: 0 - start the loop, 1 -
>               * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
>               * move to the next item and build its ibuf; 3 - we finished all items,
> -             * write "END".
> +             * write "END". wcurr points inside wbuf when we accumulate
> +             * data, but we restore it to =wbuf before actual writing.
>               */
> -            if (c->ibytes > 0) {
> -                res = write(c->sfd, c->iptr, c->ibytes);
> -                if (res > 0) {
> -                    stats.bytes_written += res;
> -                    c->iptr += res;
> -                    c->ibytes -= res;
> -                    break;
> -                }
> -                if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
> -                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
> -                        if (settings.verbose > 0)
> -                            fprintf(stderr, "Couldn't update event\n");
> -                        c->state = conn_closing;
> -                        break;
> -                    }
> -                    exit = 1;
> -                    break;
> -                }
> -                /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
> -                   we have a real error, on which we close the connection */
> -                if (settings.verbose > 0)
> -                    fprintf(stderr, "Failed to write, and not due to blocking\n");
> -                c->state = conn_closing;
> -                break;
> -            } else {
> +            switch (c->ipart) {
> +                int len, res;
>                  item *it;
> -                /* we finished a chunk, decide what to do next */
> -                switch (c->ipart) {
> -                case 1:
> -                    it = *(c->icurr);
> -                    assert((it->it_flags & ITEM_SLABBED) == 0);
> -                    c->iptr = ITEM_data(it);
> -                    c->ibytes = it->nbytes;
> +
> +            case 1:
> +                it = *(c->icurr);
> +                assert((it->it_flags & ITEM_SLABBED) == 0);
> +
> +                /* is there enough place in the fixed buffer for this item? */
> +                if (c->wsize - c->wbytes >= it->nbytes) {
> +                    memcpy(c->wcurr, ITEM_data(it), it->nbytes);
> +                    c->wbytes += it->nbytes;
> +                    c->wcurr += it->nbytes;
>                      c->ipart = 2;
>                      break;
> -                case 2:
> -                    it = *(c->icurr);
> -                    item_remove(it);
> -                    c->ileft--;
> -                    if (c->ileft <= 0) {
> -                        c->ipart = 3;
> +                } else {
> +                    if (c->wbytes) {
> +                    /*
> +                     * flush the buffer, then come back here and try the
> +                     * above size condition again.
> +                     */
> +                        c->wcurr = c->wbuf;
> +                        c->state = conn_write;
> +                        c->write_and_go = conn_mwrite;
>                          break;
>                      } else {
> -                        c->icurr++;
> +                        /*
> +                         * the buffer is empty and the data is still
> +                         * larger than the buffer, so write it out
> +                         */
> +                        c->wcurr = ITEM_data(it);
> +                        c->wbytes = it->nbytes;
> +                        c->state = conn_write;
> +                        c->write_and_go = conn_mwrite;
> +                        c->ipart = 2;
> +                        break;
>                      }
> -                    /* FALL THROUGH */
> -                case 0:
> -                    it = *(c->icurr);
> -                    assert((it->it_flags & ITEM_SLABBED) == 0);
> -                    sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
> +                }
> +
> +            case 2:
> +                it = *(c->icurr);
> +                item_remove(it);
> +                c->ileft--;
> +                if (c->ileft <= 0) {
> +                    c->ipart = 3;
> +                    break;
> +                } else {
> +                    c->icurr++;
> +                }
> +
> +                /*
> +                 * break and come back to ipart=0, rather than fall
> +                 * through: in ipart=0, we may need to flush the write
> +                 * buffer and return to the same place, and if we didn't
> +                 * change ipart, we'd've returned to ipart=2 erroneously.
> +                 */
> +
> +                c->ipart = 0;
> +                break;
> +
> +            case 0:
> +                it = *(c->icurr);
> +                assert((it->it_flags & ITEM_SLABBED) == 0);
> +
> +                len = strlen(ITEM_key(it));
> +                if (c->wsize - c->wbytes >= len+32) {
> +                    res = sprintf(c->wcurr, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
> +                    c->wcurr += res;
> +                    c->wbytes += res;
>                      if (settings.verbose > 1)
>                          fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
> -                    c->iptr = c->ibuf;
> -                    c->ibytes = strlen(c->iptr);
>                      c->ipart = 1;
>                      break;
> -                case 3:
> -                    out_string(c, "END");
> +                } else {
> +                    if (c->wbytes) {
> +                        /* flush the buffer and come back here */
> +                        c->wcurr = c->wbuf;
> +                        c->state = conn_write;
> +                        c->write_and_go = conn_mwrite;
> +                        break;
> +                    } else {
> +                        /* should never happen */
> +                        c->state = conn_closing;
> +                        break;
> +                    }
> +                }
> +
> +            case 3:
> +                /* flush the buffer and come back here */
> +                if (c->wbytes) {
> +                    c->wcurr = c->wbuf;
> +                    c->state = conn_write;
> +                    c->write_and_go = conn_mwrite;
>                      break;
>                  }
> +
> +                /* write "END\r\n" and finish */
> +                out_string(c, "END");
> +                break;
>              }
>              break;
>
> main -> wcmtools          src/memcached/memcached.h
> --- cvs/wcmtools/memcached/memcached.h	2003-07-22 12:14:33.000000000 +0300
> +++ src/memcached/memcached.h	2003-08-16 05:23:54.000000000 +0300
> @@ -113,9 +113,6 @@
>      item   **icurr;
>      int    ileft;
>      int    ipart;     /* 1 if we're writing a VALUE line, 2 if we're writing data */
> -    char   ibuf[256]; /* for VALUE lines */
> -    char   *iptr;
> -    int    ibytes;
>
>  } conn;
>
>
>