[patch] combining packets
Brad Fitzpatrick
brad@danga.com
Mon, 11 Aug 2003 19:58:16 -0700 (PDT)
It looks like the word "END\r\n" would still come in its own packet.
Why don't you try to get that into there if there are 5 bytes free?
Also, I don't see any work here about matching our output sizes to the MTU
of the interface.
Should we just go with TCP_CORK/TCP_NOPUSH instead? Any thoughts from
people with experience? is TCP_CORK/etc widely supported enough? If this
were ported to Windows, or commercial Unixes, do they have fun TCP options
like that?
On Sat, 16 Aug 2003, Anatoly Vorobey wrote:
> Here's an attempt to combine many smaller packets into one larger packet
> (in answers to GET queries... that's the only place where it matters).
> This patch also simplifies the conn_mwrite finite machine state
> considerably. Tested only cursorily so far, needs review and more
> testing.
>
> main -> wcmtools src/memcached/memcached.c
> --- cvs/wcmtools/memcached/memcached.c 2003-08-11 16:59:47.000000000 +0300
> +++ src/memcached/memcached.c 2003-08-16 05:23:39.000000000 +0300
> @@ -149,8 +149,6 @@
> c->rcurr = c->rbuf;
> c->icurr = c->ilist;
> c->ileft = 0;
> - c->iptr = c->ibuf;
> - c->ibytes = 0;
>
> c->write_and_go = conn_read;
> c->write_and_free = 0;
> @@ -609,7 +607,8 @@
> if (c->ileft) {
> c->ipart = 0;
> c->state = conn_mwrite;
> - c->ibytes = 0;
> + c->wbytes = 0;
> + c->wcurr = c->wbuf;
> return;
> } else {
> out_string(c, "END");
> @@ -763,7 +762,7 @@
> if (event_add(&c->event, 0) == -1) return 0;
> return 1;
> }
> -
> +
> void drive_machine(conn *c) {
>
> int exit = 0;
> @@ -919,6 +918,7 @@
> c->write_and_free = 0;
> }
> c->state = c->write_and_go;
> + c->wcurr = c->wbuf;
> break;
> }
> res = write(c->sfd, c->wcurr, c->wbytes);
> @@ -946,74 +946,114 @@
> break;
> case conn_mwrite:
> /*
> - * we're writing ibytes bytes from iptr. iptr alternates between
> - * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the
> - * current item. When we finish a chunk, we choose the next one using
> + * we're writing wbytes bytes from wcurr. wcurr alternates between
> + * wbuf, in which we build a string "VALUE...", and item data for smaller items,
> + * and ITEM_data(it) for the larger items. When we finish a chunk, we choose the next one using
> * ipart, which has the following semantics: 0 - start the loop, 1 -
> * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
> * move to the next item and build its ibuf; 3 - we finished all items,
> - * write "END".
> + * write "END". wcurr points inside wbuf when we accumulate
> + * data, but we restore it to =wbuf before actual writing.
> */
> - if (c->ibytes > 0) {
> - res = write(c->sfd, c->iptr, c->ibytes);
> - if (res > 0) {
> - stats.bytes_written += res;
> - c->iptr += res;
> - c->ibytes -= res;
> - break;
> - }
> - if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
> - if (!update_event(c, EV_WRITE | EV_PERSIST)) {
> - if (settings.verbose > 0)
> - fprintf(stderr, "Couldn't update event\n");
> - c->state = conn_closing;
> - break;
> - }
> - exit = 1;
> - break;
> - }
> - /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
> - we have a real error, on which we close the connection */
> - if (settings.verbose > 0)
> - fprintf(stderr, "Failed to write, and not due to blocking\n");
> - c->state = conn_closing;
> - break;
> - } else {
> + switch (c->ipart) {
> + int len, res;
> item *it;
> - /* we finished a chunk, decide what to do next */
> - switch (c->ipart) {
> - case 1:
> - it = *(c->icurr);
> - assert((it->it_flags & ITEM_SLABBED) == 0);
> - c->iptr = ITEM_data(it);
> - c->ibytes = it->nbytes;
> +
> + case 1:
> + it = *(c->icurr);
> + assert((it->it_flags & ITEM_SLABBED) == 0);
> +
> + /* is there enough place in the fixed buffer for this item? */
> + if (c->wsize - c->wbytes >= it->nbytes) {
> + memcpy(c->wcurr, ITEM_data(it), it->nbytes);
> + c->wbytes += it->nbytes;
> + c->wcurr += it->nbytes;
> c->ipart = 2;
> break;
> - case 2:
> - it = *(c->icurr);
> - item_remove(it);
> - c->ileft--;
> - if (c->ileft <= 0) {
> - c->ipart = 3;
> + } else {
> + if (c->wbytes) {
> + /*
> + * flush the buffer, then come back here and try the
> + * above size condition again.
> + */
> + c->wcurr = c->wbuf;
> + c->state = conn_write;
> + c->write_and_go = conn_mwrite;
> break;
> } else {
> - c->icurr++;
> + /*
> + * the buffer is empty and the data is still
> + * larger than the buffer, so write it out
> + */
> + c->wcurr = ITEM_data(it);
> + c->wbytes = it->nbytes;
> + c->state = conn_write;
> + c->write_and_go = conn_mwrite;
> + c->ipart = 2;
> + break;
> }
> - /* FALL THROUGH */
> - case 0:
> - it = *(c->icurr);
> - assert((it->it_flags & ITEM_SLABBED) == 0);
> - sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
> + }
> +
> + case 2:
> + it = *(c->icurr);
> + item_remove(it);
> + c->ileft--;
> + if (c->ileft <= 0) {
> + c->ipart = 3;
> + break;
> + } else {
> + c->icurr++;
> + }
> +
> + /*
> + * break and come back to ipart=0, rather than fall
> + * through: in ipart=0, we may need to flush the write
> + * buffer and return to the same place, and if we didn't
> + * change ipart, we'd've returned to ipart=2 erroneously.
> + */
> +
> + c->ipart = 0;
> + break;
> +
> + case 0:
> + it = *(c->icurr);
> + assert((it->it_flags & ITEM_SLABBED) == 0);
> +
> + len = strlen(ITEM_key(it));
> + if (c->wsize - c->wbytes >= len+32) {
> + res = sprintf(c->wcurr, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
> + c->wcurr += res;
> + c->wbytes += res;
> if (settings.verbose > 1)
> fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
> - c->iptr = c->ibuf;
> - c->ibytes = strlen(c->iptr);
> c->ipart = 1;
> break;
> - case 3:
> - out_string(c, "END");
> + } else {
> + if (c->wbytes) {
> + /* flush the buffer and come back here */
> + c->wcurr = c->wbuf;
> + c->state = conn_write;
> + c->write_and_go = conn_mwrite;
> + break;
> + } else {
> + /* should never happen */
> + c->state = conn_closing;
> + break;
> + }
> + }
> +
> + case 3:
> + /* flush the buffer and come back here */
> + if (c->wbytes) {
> + c->wcurr = c->wbuf;
> + c->state = conn_write;
> + c->write_and_go = conn_mwrite;
> break;
> }
> +
> + /* write "END\r\n" and finish */
> + out_string(c, "END");
> + break;
> }
> break;
>
> main -> wcmtools src/memcached/memcached.h
> --- cvs/wcmtools/memcached/memcached.h 2003-07-22 12:14:33.000000000 +0300
> +++ src/memcached/memcached.h 2003-08-16 05:23:54.000000000 +0300
> @@ -113,9 +113,6 @@
> item **icurr;
> int ileft;
> int ipart; /* 1 if we're writing a VALUE line, 2 if we're writing data */
> - char ibuf[256]; /* for VALUE lines */
> - char *iptr;
> - int ibytes;
>
> } conn;
>
>
>