A C client for memcached

Sean Chittenden sean at chittenden.org
Mon Oct 25 19:49:08 PDT 2004


> I'd base a C implementation off the Perl version, but I'm biased.  The
> Perl version has some minor inefficiencies, but they're things you'd
> find during a port.  Stability-wise, though, the Perl client is
> rock-solid.

Ugh, I sure hope you don't have too much time invested in this yet.  
I've been sitting on this for a while and have been meaning to post it.
There are other subtle little goodies for memory management in there,
but I haven't shown 'em here.  Maybe I'll write up an mdoc(7) file for
this if someone hounds me to get it done (ENOTIME).  Please let me know 
if someone has questions.  I'm not quite ready to have this 
incorporated into the tree (if Brad'll take it), so give me a week or 
so to collect bug reports from folks now that I've opened this up.  
It's production quality from my point of view (could be that I've got 
this code in production and haven't had any problems... not to say that 
there aren't potentially problems, but...).  Anyway, when I wrote this, 
I went to great lengths to minimize system calls and memory 
allocations.  I don't like the mc_aget() interface, but I added it 
'cause I knew people would want it.  I'm tempted to write mc_aget2() 
and other mc_*2() calls that only take a const char *key arg instead of 
the key and its length.  I put the length everywhere because I'd like 
to keep API compatibility when memcached grows a binary API (*nudge*).
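
A minimal sketch of how a key-only mc_*2() style wrapper could sit on top of a key-plus-length call. The names demo_get and demo_get2 are hypothetical stand-ins and are not part of the posted API; the stand-in only reports the key length it was handed, so the sketch runs without a server.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a call that takes a key and its length,
 * in the style of mc_aget().  It only reports how many key bytes it
 * was handed. */
static size_t
demo_get(const char *key, size_t key_len) {
  assert(key != NULL);
  return key_len;
}

/* Hypothetical "2" variant: same call, but the length comes from
 * strlen(3), so the key must be a NUL-terminated string. */
static size_t
demo_get2(const char *key) {
  assert(key != NULL);
  return demo_get(key, strlen(key));
}
```

The explicit-length form passes through a key containing an embedded NUL byte, while the strlen(3) form stops at it. That difference is presumably where the length argument would matter for a binary protocol.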

Other notes that I've been meaning to post:

*) flush_all + add of a flushed key gives me errors... don't think it 
should.

*) The protocol.txt doc isn't quite accurate regarding stats.  I've had 
the patch in my tree for ages... it includes some trailing whitespace 
cleanups too.

*) Is it intentional that a delete on a key that's in the delete hold 
queue purges the key from the delete hold queue?  Ex:  delete foo 900, 
delete foo, add foo 0 0 4  works.
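
To reproduce the last note by hand, the commands go over the wire as plain text lines. Here is a minimal sketch of building the delete line, assuming the "delete <key> <time>\r\n" layout that mc_delete() in the attached source writes. The helper name format_delete is hypothetical and not part of the library.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper: writes "delete <key> <hold>\r\n" into buf.
 * Returns the command length, or -1 if buf is too small. */
static int
format_delete(char *buf, size_t size, const char *key, unsigned long hold) {
  int n;

  assert(buf != NULL && key != NULL);
  n = snprintf(buf, size, "delete %s %lu\r\n", key, hold);
  return (n < 0 || (size_t)n >= size) ? -1 : n;
}
```

Calling format_delete(buf, sizeof(buf), "foo", 900) produces the first command of the sequence in the note above.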


If someone nudges me enough, I'll thump out a C++ wrapper around the C 
API if there's interest.  I'm using the C API in my Qt apps, but it'd 
be nice if there was an actual C++ wrapper to lend some syntactic 
sugar.

-sc


PS  If someone can figure out why memcached runs dog slow on my 
powerbook, I'd be grateful.

PPS I've only tested/used this on FreeBSD and OS-X.  I gladly welcome 
patches to make this compile on more platforms if there are problems 
(don't think there should be major portability problems though).


// Create a new memcache instance
struct memcache *mc = mc_new();

// Add a few servers
mc_server_add(mc, "127.0.0.1", "11211");
mc_server_add(mc, "127.0.0.1", "11212");
mc_server_add(mc, "127.0.0.1", "11213");

// Add a key
mc_add(mc, key, keylen, flags, expire, bytes, val);

// Get a key, caller has to free(3) memory
void *blah = mc_aget(mc, key, keylen);
free(blah);

// Perform a multi-key request
struct memcache_req *req = mc_req_new();
mc_req_add(req, key1, key1_len);
mc_req_add(req, key2, key2_len);
mc_get(mc, req);
// Process the results (need a better interface for looping through
// results)

// Perform a multi-key request the easy way (this is my preferred way
// of getting data):
req = mc_req_new();
struct memcache_res *res1 = mc_req_add(req, key1, key1_len);
struct memcache_res *res2 = mc_req_add(req, key2, key2_len);
mc_get(mc, req);
// Play with res1/res2 w/o any looping through req


// Complex multi-key get example:
// Grab the response object that will be used to store a given key
struct memcache_res *res = mc_req_add(req, key3, key3_len);
// Allocate our own memory ahead of time (useful for loops)
res->size = 1024;
res->val = malloc(res->size);
res->free_on_delete = 't';

// Perform the get
mc_get(mc, req);
mc_res_free(req, res);

// Get stats from the whole cluster
struct memcache_server_stats *s = mc_stats(mc);
mc_server_stats_free(s);

// Storage commands:
mc_add(mc, key, key_len, flags, expire, bytes, val);
mc_replace(mc, key, key_len, flags, expire, bytes, val);
mc_set(mc, key, key_len, flags, expire, bytes, val);

// Delete commands:
mc_delete(mc, key, key_len, hold_timer);

// Atomic ops:
mc_incr(mc, key, key_len, 1);
mc_decr(mc, key, key_len, 1);

mc_free(mc);
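
A note on how a key ends up on one of the servers added above. The attached source hashes the key with CRC-32, keeps 15 bits, and takes the result modulo the number of live servers. The sketch below follows that scheme with a bitwise CRC instead of the lookup table. The names demo_hash_key and demo_pick_server are hypothetical and not part of the library.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32 (reflected polynomial 0xEDB88320), reduced to 15 bits
 * the same way mc_hash_key() in the attached source reduces its
 * table-driven CRC. */
static uint32_t
demo_hash_key(const char *key, size_t len) {
  uint32_t crc = 0xffffffffu;
  size_t i;
  int bit;

  for (i = 0; i < len; i++) {
    crc ^= (unsigned char)key[i];
    for (bit = 0; bit < 8; bit++)
      crc = (crc >> 1) ^ (0xedb88320u & (0u - (crc & 1u)));
  }
  return ((~crc) >> 16) & 0x7fffu;
}

/* Hypothetical helper: index of the live server a key maps to, or -1
 * when no servers are live. */
static int
demo_pick_server(const char *key, size_t len, uint32_t num_live) {
  assert(key != NULL);
  if (num_live < 1)
    return -1;
  return (int)(demo_hash_key(key, len) % num_live);
}
```

Because the index is a plain modulo, a change in the number of live servers remaps most keys to a different server.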


-------------- next part --------------
Index: doc/protocol.txt
===================================================================
RCS file: /home/cvspub/wcmtools/memcached/doc/protocol.txt,v
retrieving revision 1.3
diff -u -r1.3 protocol.txt
--- doc/protocol.txt	1 Dec 2003 21:43:35 -0000	1.3
+++ doc/protocol.txt	26 Oct 2004 02:39:44 -0000
@@ -50,7 +50,7 @@
 Commands
 --------
 
-There are three types of commands. 
+There are three types of commands.
 
 Storage commands (there are three: "set", "add" and "replace") ask the
 server to store some data identified by a key. The client sends a
@@ -94,7 +94,7 @@
 Each command sent by a client may be answered with an error string
 from the server. These error strings come in three types:
 
-- "ERROR\r\n" 
+- "ERROR\r\n"
 
   means the client sent a nonexistent command name.
 
@@ -128,10 +128,10 @@
 
 - <command name> is "set", "add" or "replace"
 
-  "set" means "store this data".  
+  "set" means "store this data".
 
   "add" means "store this data, but only if the server *doesn't* already
-  hold data for this key".  
+  hold data for this key".
 
   "replace" means "store this data, but only if the server *does*
   already hold data for this key".
@@ -148,7 +148,7 @@
   items). If it's non-zero (either Unix time or offset in seconds from
   current time), it is guaranteed that clients will not be able to
   retrieve this item after the expiration time arrives (measured by
-  server time).  
+  server time).
 
 - <bytes> is the number of bytes in the data block to follow, *not*
   including the delimiting \r\n. <bytes> may be zero (in which case
@@ -267,7 +267,7 @@
 - <value> is the amount by which the client wants to increase/decrease
 the item. It is a decimal representation of a 32-bit unsigned integer.
 
-The response will be one of: 
+The response will be one of:
 
 - "NOT_FOUND\r\n" to indicate the item with this value was not found
 
@@ -317,8 +317,8 @@
 sent for this name, and the meaning of the value.
 
 In the type column below, "32u" means a 32-bit unsigned integer, "64u"
-means a 64-bit unsigner integer. '32u:32u' means two 32-but unsigned
-integers separated by a colon.
+means a 64-bit unsigned integer. '32u.32u' means two 32-bit unsigned
+integers separated by a period.
 
 
 Name              Type     Meaning
@@ -327,32 +327,32 @@
 uptime            32u      Number of seconds this server has been running
 time              32u      current UNIX time according to the server
 version           string   Version string of this server
-rusage_user       32u:32u  Accumulated user time for this process 
-                           (seconds:microseconds)
-rusage_system     32u:32u  Accumulated system time for this process 
-                           (seconds:microseconds)
+rusage_user       32u.32u  Accumulated user time for this process
+                           (seconds.microseconds)
+rusage_system     32u.32u  Accumulated system time for this process
+                           (seconds.microseconds)
 curr_items        32u      Current number of items stored by the server
-total_items       32u      Total number of items stored by this server 
+total_items       32u      Total number of items stored by this server
                            ever since it started
-bytes             64u      Current number of bytes used by this server 
+bytes             64u      Current number of bytes used by this server
                            to store items
 curr_connections  32u      Number of open connections
-total_connections 32u      Total number of connections opened since 
+total_connections 32u      Total number of connections opened since
                            the server started running
-connection_structures 32u  Number of connection structures allocated 
+connection_structures 32u  Number of connection structures allocated
                            by the server
 cmd_get           32u      Cumulative number of retrieval requests
 cmd_set           32u      Cumulative number of storage requests
-get_hits          32u      Number of keys that have been requested and 
+get_hits          32u      Number of keys that have been requested and
                            found present
-get_misses        32u      Number of items that have been requested 
+get_misses        32u      Number of items that have been requested
                            and not found
-bytes_read        64u      Total number of bytes read by this server 
+bytes_read        64u      Total number of bytes read by this server
                            from network
-bytes_written     64u      Total number of bytes sent by this server to 
+bytes_written     64u      Total number of bytes sent by this server to
                            network
 limit_maxbytes    32u      Number of bytes this server is allowed to
-                           use for storage. 
+                           use for storage.
 
 
 
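
The patch above documents the rusage_user and rusage_system values as seconds.microseconds. Here is a minimal sketch of splitting such a value on the client side, assuming the value arrives as text like "12.345678". The helper name parse_rusage is hypothetical and not part of the library.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper: splits a "seconds.microseconds" stat value into
 * its two unsigned parts.  Returns 0 on success, -1 if the text does
 * not have that shape. */
static int
parse_rusage(const char *text, unsigned long *sec, unsigned long *usec) {
  assert(text != NULL && sec != NULL && usec != NULL);
  return sscanf(text, "%lu.%lu", sec, usec) == 2 ? 0 : -1;
}
```

A value written with the colon separator from the old wording of the doc is rejected by this parser.
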
-------------- next part --------------
/*
 * Copyright (c) 2004 Sean Chittenden <sean at chittenden.org>
 *
 * All rights reserved until such time as this code is released with
 * an official license.  Use of this code for commercial,
 * non-commercial, and personal purposes is encouraged.  Public forks
 * of this code are permitted so long as the fork and its descendants
 * use this copyright/license.  Use of this software in programs
 * released under the GPL programs is expressly prohibited by the
 * author (ie, BSD, closed source, or artistic license is okay, but
 * GPL is not). */

/* The crc32 functions and data was originally written by Spencer
 * Garrett <srg at quick.com> and was cleaned from the PostgreSQL source
 * tree via the files contrib/ltree/crc32.[ch].  No license was
 * included, therefore it is assumed that this code is public
 * domain. */

#include <ctype.h>
#include <err.h>
#include <string.h>
#include <strings.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <sysexits.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>

/* Prototype for strtoll ripped from stdlib.h */
long long strtoll(const char *, char **, int);

#include "libmemcache.h"

static char			*mc_get_line(struct memcache *mc, struct memcache_server *ms);
static void			 mc_server_block(struct memcache_server *ms, int block);
static void			 mc_reset_buf(struct memcache *mc);
static struct memcache_res	*mc_res_new(void);
static int			 mc_server_connect(struct memcache *mc, struct memcache_server *ms);
static void			 mc_server_free(struct memcache_server *ms);
static struct memcache_server	*mc_server_new(struct memcache *mc);
static int			 mc_server_resolve(struct memcache_server *ms);
static struct memcache_server_stats	*mc_server_stats_new(void);
static int			 mc_storage_cmd(struct memcache *mc, const char *cmd, const size_t cmd_len,
						const char *key, const size_t key_len, const u_int16_t flags,
						const time_t expire, const size_t bytes, const void *val);
static u_int32_t		 mc_atomic_cmd(struct memcache *mc, const char *cmd, const size_t cmd_len,
					       const char *key, const size_t key_len, const u_int32_t val);



int
mc_add(struct memcache *mc, const char *key, const size_t key_len, const u_int16_t flags,
       const time_t expire, const size_t bytes, const void *val) {
  return mc_storage_cmd(mc, "add ", strlen("add "), key, key_len, flags, expire, bytes, val);
}


/* Issues a "get" command to the memcache server that should contain
 * the key.  The result is malloc(3)'ed and it is assumed that the
 * caller is required to free(3) the memory. */
void *
mc_aget(struct memcache *mc, const char *key, size_t len) {
  struct memcache_req *req;
  struct memcache_res *res;
  void *ret;

  req = mc_req_new();
  res = mc_req_add(req, key, len);
  res->free_on_delete = 'f';
  mc_get(mc, req);
  ret = res->val;
  mc_req_free(req);
  return ret;
}


static u_int32_t
mc_atomic_cmd(struct memcache *mc, const char *cmd, const size_t cmd_len,
	      const char *key, const size_t key_len, const u_int32_t val) {
  struct memcache_server *ms;
  u_int32_t hash;
  char *cp, *cur;
  size_t buf_left;
  struct iovec av[5];
  int i;
  u_int32_t ret;

  hash = mc_hash_key(key, key_len);
  ms = mc_find_server(mc, hash);
  if (ms == NULL) {
    warnx("%s:%u\tUnable to find a valid server", __FILE__, __LINE__);
    return 0;
  }

  if (mc_server_connect(mc, ms) == -1)
    return 0;

  mc_reset_buf(mc);
  cur = mc->buf;
  buf_left = mc->size;

  av[0].iov_base = cmd;
  av[0].iov_len = cmd_len;
  av[1].iov_base = key;
  av[1].iov_len = key_len;
  av[2].iov_base = " ";
  av[2].iov_len = strlen(av[2].iov_base);

  /* Convert the value to a string */
  i = snprintf(cur, buf_left, "%u", val);
  if (i < 1)
    err(EX_SOFTWARE, "%s:%u\tsnprintf()", __FILE__, __LINE__);

  av[3].iov_base = cur;
  av[3].iov_len = i;
  av[4].iov_base = "\r\n";
  av[4].iov_len = strlen(av[4].iov_base);

  if (writev(ms->fd, av, 5) < 0) {
    warn("%s:%u\twritev()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    /* XXX Should we recursively attempt to try this query on the
     * remaining servers in the cluster if the writev() fails?
     * Eventually we'd fail once all servers were exhausted?  For now,
     * just fail and return 0. */
    return 0;
  }

  mc_server_block(ms, 1);
  mc_reset_buf(mc);
  cur = mc_get_line(mc, ms);
  if (cur != NULL && memcmp(cur, "NOT_FOUND", strlen("NOT_FOUND")) == 0) {
    mc_server_block(ms, 0);
    return 0;
  }

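  if (cur == NULL)	/* mc_get_line() failed and deactivated the server */
    return 0;

  /* Restore non-blocking mode, as the NOT_FOUND path does */
  mc_server_block(ms, 0);

  /* Try converting the value to an integer. If it succeeds, we've got
   * a winner.  Clear errno first so a stale value can't be mistaken
   * for a strtoul() failure. */
  errno = 0;
  ret = (u_int32_t)strtoul(cur, &cp, 10);
  if (cp == cur || errno == EINVAL || errno == ERANGE)
    errx(EX_PROTOCOL, "%s:%u\tstrtoul(): invalid value \"%s\"", __FILE__, __LINE__, cur);

  if (*cp != '\0')
    errx(EX_PROTOCOL, "%s:%u\tProtocol error: %u", __FILE__, __LINE__, (unsigned int)*cp);

  return ret;
}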


void
mc_deactivate_server(struct memcache *mc, struct memcache_server *ms) {
  u_int32_t i, found;

  /* Since adding servers is so rare, and servers do come back, don't
   * bother realloc(3)'ing mc->live_servers.  Instead, just find the
   * dead server in the array, remove it, shift the remaining servers
   * down a place in the array, and decrement the number of servers in
   * the array.  This should make adding servers back to the list more
   * efficient. */
  for (i = 0, found = 0; i < mc->num_live_servers; i++) {
    if (!found) {
      if (mc->live_servers[i] == ms)
	found = 1;
      else
	continue;
    }

    /* Don't read past the last live entry when shifting down */
    if (i + 1 < mc->num_live_servers)
      mc->live_servers[i] = mc->live_servers[i + 1];
  }

  if (found) {
    mc->num_live_servers--;
    /* If we've still got the server marked up, then set it to down.
     * If we've already marked it as down, keep the original reason
     * since it's going to be more verbose than saying 'down'.  A
     * 'down' server can be resurrected, whereas a server that's
     * marked 'no host' won't ever be resurrected. */
    if (ms->active == 'u')
      ms->active = 'd';

    if (ms->fd != -1) {
      if (close(ms->fd) != 0)
	warn("%s:%u\tclose()", __FILE__, __LINE__);
      ms->fd = -1;
    }
  }
}


u_int32_t
mc_decr(struct memcache *mc, const char *key, const size_t key_len, const u_int32_t val) {
  return mc_atomic_cmd(mc, "decr ", strlen("decr "), key, key_len, val);
}


int
mc_delete(struct memcache *mc, const char *key, const size_t key_len, const time_t hold) {
  struct memcache_server *ms;
  u_int32_t hash;
  char *cur;
  size_t buf_left;
  struct iovec dv[5];
  int i;

  hash = mc_hash_key(key, key_len);
  ms = mc_find_server(mc, hash);
  if (ms == NULL) {
    warnx("%s:%u\tUnable to find a valid server", __FILE__, __LINE__);
    return 1;
  }

  if (mc_server_connect(mc, ms) == -1)
    return 2;

  mc_reset_buf(mc);
  cur = mc->buf;
  buf_left = mc->size;

  dv[0].iov_base = "delete ";
  dv[0].iov_len = strlen(dv[0].iov_base);
  dv[1].iov_base = key;
  dv[1].iov_len = key_len;
  dv[2].iov_base = " ";
  dv[2].iov_len = strlen(dv[2].iov_base);

  /* Convert the hold time to a string */
  i = snprintf(cur, buf_left, "%lu", (unsigned long)hold);
  if (i < 1)
    err(EX_SOFTWARE, "%s:%u\tsnprintf()", __FILE__, __LINE__);

  dv[3].iov_base = cur;		/* Note where our hold time string is located */
  dv[3].iov_len = i;
  dv[4].iov_base = "\r\n";	/* Terminate the command */
  dv[4].iov_len = strlen(dv[4].iov_base);

  if (writev(ms->fd, dv, 5) < 0) {
    warn("%s:%u\twritev()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    /* XXX Should we recursively attempt to try this query on the
     * remaining servers in the cluster if the writev() fails?
     * Eventually we'd fail once all servers were exhausted?  For now,
     * just fail and return 3. */
    return 3;
  }

  mc_server_block(ms, 1);
  mc_reset_buf(mc);
  cur = mc_get_line(mc, ms);
  if (cur != NULL && memcmp(cur, "DELETED", strlen("DELETED")) == 0) {
    mc_server_block(ms, 0);
    return 0;
  } else if (cur != NULL && memcmp(cur, "NOT_FOUND", strlen("NOT_FOUND")) == 0) {
    mc_server_block(ms, 0);
    return -1;
  } else {
    errx(EX_PROTOCOL, "%s:%u\tProtocol error", __FILE__, __LINE__);
  }
}


struct memcache_server *
mc_find_server(struct memcache *mc, const u_int32_t hash) {
  if (mc->num_live_servers < 1)
    return NULL;

  /* Grab the correct server from the list. */
  return mc->live_servers[hash % mc->num_live_servers];
}


int
mc_flush_all(struct memcache *mc, const char *key, const size_t key_len) {
  struct memcache_server *ms;
  u_int32_t hash;
  char *cur;

  hash = mc_hash_key(key, key_len);
  ms = mc_find_server(mc, hash);
  if (ms == NULL) {
    warnx("%s:%u\tUnable to find a valid server", __FILE__, __LINE__);
    return 1;
  }

  if (mc_server_connect(mc, ms) == -1)
    return 2;

  if (write(ms->fd, "flush_all\r\n", strlen("flush_all\r\n")) < 0) {
    warn("%s:%u\twrite()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    /* XXX Should we recursively attempt to try this query on the
     * remaining servers in the cluster if the write() fails?
     * Eventually we'd fail once all servers were exhausted?  For now,
     * just fail and return 3. */
    return 3;
  }

  mc_server_block(ms, 1);
  mc_reset_buf(mc);
  cur = mc_get_line(mc, ms);
  if (cur != NULL && memcmp(cur, "OK", strlen("OK")) == 0) {
    mc_server_block(ms, 0);
    return 0;
  } else {
    errx(EX_PROTOCOL, "%s:%u\tProtocol error", __FILE__, __LINE__);
  }
}


void
mc_free(struct memcache *mc) {
  struct memcache_server *ms, *tms;

  if (mc == NULL)
    return;

  /* Grab the next pointer before freeing the current server; touching
   * ms after mc_server_free() would be a use-after-free. */
  for (ms = mc->server_list.tqh_first; ms != NULL; ms = tms) {
    tms = ms->entries.tqe_next;
    mc_server_free(ms);
  }

  if (mc->size > 0)
    free(mc->buf);

  free(mc);
}


void
mc_get(struct memcache *mc, struct memcache_req *req) {
  struct memcache_res *res;
  struct memcache_server *ms;
  struct iovec *rv;
  u_int32_t i, num_vec;
  u_int16_t flags;
  char buf[GET_INIT_BUF_SIZE];
  char *cp, *cur;
  size_t bytes;
  ssize_t rb, tb;

  if (req->num_keys == 0)
    return;

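  /* Need two extra iovec structs, one for the command, one for the
   * trailing \r\n.  The remaining slots hold the keys and the spaces
   * between them. */
  num_vec = 2 * req->num_keys + 1;
  rv = (struct iovec *)malloc(sizeof(struct iovec) * num_vec);
  if (rv == NULL) {
    warn("%s:%u\tmalloc()", __FILE__, __LINE__);
    return;
  }
  rv[0].iov_base = "get ";
  rv[0].iov_len = strlen(rv[0].iov_base);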
  for (i = 1, res = req->query.tqh_first; res != NULL; res = res->entries.tqe_next, i++) {
    if (res->hash == 0)
      res->hash = mc_hash_key(res->key, res->len);

    rv[i].iov_base = res->key;
    rv[i].iov_len = res->len;

    if (res->entries.tqe_next != NULL) {
      rv[++i].iov_base = " ";
      rv[i].iov_len = 1;
    }

    /* While we're looping, might as well see if we should be auto
     * deleting any of these keys. */
    if (res->free_on_delete == 'u')
      res->free_on_delete = res->size > 0 ? 'f' : 't';
  }
  rv[i].iov_base = "\r\n";
  rv[i].iov_len = strlen(rv[i].iov_base);

  /* Yuk.  Right now I'm grabbing the hash from the first key and
   * using that, but that's dangerous in the event that the second key
   * in the request could be stored on a different server.  memcache
   * servers need the ability to query other servers on the user's
   * behalf, IMHO. */
  ms = mc_find_server(mc, req->query.tqh_first->hash);
  if (ms == NULL) {
    warnx("%s:%u\tUnable to find a valid server", __FILE__, __LINE__);
    free(rv);
    return;
  }

  if (mc_server_connect(mc, ms) == -1) {
    free(rv);
    return;
  }

  if (writev(ms->fd, rv, num_vec) < 0) {
    warn("%s:%u\twritev()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    free(rv);
    /* XXX Should we recursively attempt to try this query on the
     * remaining servers in the cluster if the writev() fails?
     * Eventually we'd fail once all servers were exhausted?  For now,
     * just fail and return. */
    return;
  }

  /* Read the response */
  rb = read(ms->fd, buf, GET_INIT_BUF_SIZE);
  switch(rb) {
  case -1:
    warn("%s:%u\tread()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    free(rv);
    return; /* Should we try and read again depending on errno? */
  case 0:
    /* Seems like an error to me if the server closes its connection
     * here.  deactivate the server instead of just closing it. */
    mc_deactivate_server(mc, ms);
    free(rv);
    return;
  }

  res = NULL;
  /* Loop through reading the server's response until we're done. */
  cur = buf;
  while(1) {
    if (memcmp(cur, "VALUE ", strlen("VALUE ")) == 0) {
      /* Found something */
      cur = &cur[strlen("VALUE ")]; /* advance cursor */

      /* We can do something cool here.  Assume the server is going to
       * return the responses in the same order we requested them.
       * With that presumption being true, start searching from
       * res->tqe_next instead of starting over again. */
      if (res != NULL && res->entries.tqe_next != NULL) {
	for (res = res->entries.tqe_next; res != NULL; res = res->entries.tqe_next) {
	  if (memcmp(cur, res->key, res->len) == 0)
	    break;
	}
      }

      /* If we're still NULL, sequentially scan through the list. */
      if (res == NULL) {
	for (res = req->query.tqh_first; res != NULL; res = res->entries.tqe_next) {
	  if (memcmp(cur, res->key, res->len) == 0)
	    break;
	}
      }

      /* If we're *still* null, then we've got a problem. */
      if (res == NULL) {
	warnx("%s:%u\tUnable to find a response object for the key %.*s", __FILE__, __LINE__, MAX_KEY_LEN, cur);
	abort();
      }

      cur = &cur[res->len]; /* advance cursor */

      /* Look for the flags.  Clear errno first so a stale value can't
       * be mistaken for a strtol() failure. */
      errno = 0;
      flags = (u_int16_t)strtol(cur, &cp, 10);
      if (cp == cur || errno == EINVAL || errno == ERANGE) {
	warnx("%s:%u\tstrtol(): invalid flags", __FILE__, __LINE__);
	free(rv);
	return;
      }
      res->flags = flags;
      cur = cp; /* advance cursor */

      /* Look for the bytes */
      errno = 0;
      bytes = (size_t)strtol(cur, &cp, 10);
      if (cp == cur || errno == EINVAL || errno == ERANGE) {
	warnx("%s:%u\tstrtol(): invalid bytes", __FILE__, __LINE__);
	free(rv);
	return;
      }
      res->bytes = bytes;
      cur = cp; /* advance cursor */

      /* Protocol integrity check */
      if (cur[0] == '\r' && cur[1] == '\n')
	cur = &cur[2];
      else {
	warnx("%s:%u\tProtocol violation: expecting a \\r\\n after bytes", __FILE__, __LINE__);
	free(rv);
	return;
      }

      if (res->size == 0) {
	/* Allocate the necessary response.
	 *
	 * XXX Should think about having an option to mmap(2) an
	 * anonymous region to speed up the read(2) instead of using
	 * malloc(3). */
	res->val = malloc(res->bytes + 1);
	if (res->val == NULL) {
	  free(rv);
	  return;
	}

	res->size = res->bytes + 1;

	/* Throw in a null character for convenience */
	cp = (char *)res->val;
	cp[res->bytes] = '\0';
      }

      /* Number of unparsed bytes left in buf */
      tb = rb - (ssize_t)(cur - buf);
      if (tb >= (ssize_t)res->bytes + 2) {
	/* We got lucky: the data block and its trailing \r\n are
	 * already in buf, which saves a second read(2) call. */
	memcpy(res->val, cur, res->bytes);

	/* Advance our cursor past the data */
	cur = &cur[res->bytes];
      } else {
	/* We're going to have to make a second read(2) call to fetch
	 * the remaining data on the network.
	 *
	 * XXX A short readv(2), or a \r\n split across the two reads,
	 * is still not handled here. */
	if (tb > (ssize_t)res->bytes)
	  tb = (ssize_t)res->bytes;
	memcpy(res->val, cur, (size_t)tb);
	cp = (char *)res->val;
	rv[0].iov_base = &cp[tb];
	rv[0].iov_len = res->bytes - (size_t)tb;

	/* Recycle the buffer at this point */
	rv[1].iov_base = buf;
	rv[1].iov_len = GET_INIT_BUF_SIZE;
	rb = readv(ms->fd, rv, 2);
	switch(rb) {
	case -1:
	  warn("%s:%u\treadv()", __FILE__, __LINE__);
	  mc_deactivate_server(mc, ms);
	  free(rv);
	  return; /* Should we try and read again depending on errno? */
	case 0:
	  /* Seems like an error to me if the server closes its
	   * connection here.  deactivate the server instead of just
	   * closing it. */
	  mc_deactivate_server(mc, ms);
	  free(rv);
	  return;
	}

	/* Only the bytes past the data block landed in buf.  Start
	 * searching/reading through the buffer from the start again. */
	rb -= (ssize_t)rv[0].iov_len;
	cur = buf;
      }

      if (cur[0] == '\r' && cur[1] == '\n')
	cur = &cur[2];
      else {
	warnx("%s:%u\tProtocol violation: expecting a \\r\\n after data block", __FILE__, __LINE__);
	if (res->free_on_delete == 't') {
	  free(res->val);
	  res->val = NULL;
	  res->size = 0;
	  res->bytes = 0;
	}
	free(rv);
	return;
      }
    } else if (memcmp(cur, "END\r\n", strlen("END\r\n")) == 0) {
      /* End of the response: every VALUE block has been processed. */
      free(rv);
      return;
    } else if (memcmp(cur, "ERROR\r\n", strlen("ERROR\r\n")) == 0) {
      /* drat, an error: something bad happened.  Bail out instead of
       * spinning on the same bytes forever. */
      warnx("%s:%u\tServer returned ERROR", __FILE__, __LINE__);
      free(rv);
      return;
    } else if (memcmp(cur, "CLIENT_ERROR", strlen("CLIENT_ERROR")) == 0) {
      errx(EX_PROTOCOL, "%s:%u\tClient error", __FILE__, __LINE__);
    } else if (memcmp(cur, "SERVER_ERROR", strlen("SERVER_ERROR")) == 0) {
      errx(EX_PROTOCOL, "%s:%u\tServer error", __FILE__, __LINE__);
    } else {
      errx(EX_PROTOCOL, "%s:%u\tUnable to handle response %.*s", __FILE__, __LINE__, 12, cur);
    }
  }
}


static char *
mc_get_line(struct memcache *mc, struct memcache_server *ms) {
  ssize_t rb;
  char *cp;
  char *new_start;

  if (mc->read_cur == NULL) {
    mc->read_cur = mc->buf;
    goto try_read;
  }

  for(;;) {
    /* We're guaranteed to have data in our buffer at this point.
     * Search for a newline.  If we don't have one, we need to either
     * move data around and read(2) again, or realloc(3) and read(2)
     * again. */
    cp = memchr(mc->start, (int)'\n', mc->size - (size_t)(mc->cur - mc->buf));
    if (cp == NULL) {
      get_more_bits:
      /* If our cursor is at the start of the buffer, we need to
       * allocate more memory. */
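      if (mc->buf == mc->cur) {
	/* Remember where the current line starts; realloc(3) may move
	 * the buffer and leave the old pointers dangling. */
	size_t start_off = (size_t)(mc->start - mc->buf);

	cp = (char *)realloc(mc->buf, mc->size * 2);
	if (cp == NULL) {
	  /* No sense in continuing if we can't read(2) in a whole
	   * line. There are likely bigger problems on a system if it's
	   * experiencing a problem like this. */
	  warn("%s:%u\trealloc()", __FILE__, __LINE__);
	  return NULL;
	}

	mc->buf = cp;
	mc->cur = mc->buf;
	mc->start = &mc->buf[start_off];
	mc->read_cur = &mc->buf[mc->size];
	mc->size *= 2; /* Note the change in buffer size */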
      } else {
	/* bcopy(3) the data from mc->cur to the end of mc->buf back
	 * to the start of mc->buf.  Use bcopy(3) in case there is
	 * overlap. */
	bcopy(mc->cur, mc->buf, mc->size - (size_t)(mc->cur - mc->buf));

	/* Advance where the read(2) should place new data. */
	mc->read_cur = &mc->buf[mc->size - (size_t)(mc->cur - mc->buf)];

	/* Reset the cursor to the front of the buffer */
	mc->cur = mc->buf;
      }

      /* Now that we've got space to read(2) in bits, read(2) in
       * data.  Only ask for as much as fits after mc->read_cur. */
      try_read:
      rb = read(ms->fd, mc->read_cur, mc->size - (size_t)(mc->read_cur - mc->buf));
      switch(rb) {
      case -1:
	/* We're in non-blocking mode, don't abort because of EAGAIN or
	 * EINTR */
	if (errno == EAGAIN || errno == EINTR)
	  goto try_read;
	warn("%s:%u\tread()", __FILE__, __LINE__);
	mc_deactivate_server(mc, ms);
	return NULL; /* Should we try again depending on errno? */
      case 0:
	/* Seems like an error to me if the server closes its connection
	 * here.  deactivate the server instead of just closing it. */
	mc_deactivate_server(mc, ms);
	return NULL;
      }

      /* Search for a newline again, starting from where we read(2)
       * data in and looking only at the bytes just read.  Slight
       * optimization over just starting the loop over again since in
       * most cases, we'll find a newline in the newly read(2) chunk
       * of data. */
      cp = memchr(mc->read_cur, (int)'\n', (size_t)rb);
      if (cp == NULL) {
	/* We suck.  Try again. This is nearly the same as a continue,
	 * but saves us from doing an extra memchr(). */
	goto get_more_bits;
      }
    }

    /* At this point, we're guaranteed to have a complete line in the
     * buffer.  cp should be pointing to a newline character. */
    if (*cp != '\n') abort(); /* XXX remove this check */

    /* Protocol check, make sure there's a carriage return before the
     * newline (and that there's a byte before the newline at all) */
    if (cp == mc->buf || *(cp - 1) != '\r') {
      warnx("%s:%u\tProtocol violation, no \\r before the \\n", __FILE__, __LINE__);
      mc_deactivate_server(mc, ms);
      return NULL;
    }

    new_start = cp + 1; /* Advance the start of the next line */
    *(cp - 1) = '\0'; /* Add a null character */
    cp = mc->start;
    mc->start = new_start;

    /* Handy little debugging function: */
    /* warnx("Line reads: \"%.*s\"", (size_t)(mc->start - cp - 2), cp); */
    return cp;
  }

  errx(EX_PROTOCOL, "%s:%u\tProtocol violation: Unable to obtain a new line", __FILE__, __LINE__);
}


static const unsigned int crc32tab[256] = {
  0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
  0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
  0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
  0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
  0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
  0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
  0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
  0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
  0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
  0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
  0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
  0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
  0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
  0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
  0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
  0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
  0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
  0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
  0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
  0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
  0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
  0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
  0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
  0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
  0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
  0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
  0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
  0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
  0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
  0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
  0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
  0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
  0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
  0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
  0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
  0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
  0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
  0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
  0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
  0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
  0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
  0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
  0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
  0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
  0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
  0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
  0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
  0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
  0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
  0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
  0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
  0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
  0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
  0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
  0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
  0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
  0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
  0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
  0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
  0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
  0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
  0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
  0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
  0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
};

u_int32_t
mc_hash_key(const char *key, size_t len) {
  size_t i;
  u_int32_t crc = ~0;

  for (i = 0; i < len; i++)
    crc = (crc >> 8) ^ crc32tab[(crc ^ (key[i])) & 0xff];

  return((~crc >> 16) & 0x7fff);
}


u_int32_t
mc_incr(struct memcache *mc, const char *key, const size_t key_len, const u_int32_t val) {
  return mc_atomic_cmd(mc, "incr ", strlen("incr "), key, key_len, val);
}


struct memcache *
mc_new(void) {
  struct memcache *mc;

  mc = (struct memcache *)malloc(sizeof(struct memcache));
  if (mc != NULL) {
    bzero(mc, sizeof(struct memcache));

    mc->buf = (char *)malloc(GET_INIT_BUF_SIZE);
    if (mc->buf == NULL) {
      free(mc);
      return NULL;
    }
    mc->size = GET_INIT_BUF_SIZE;

    TAILQ_INIT(&mc->server_list);

    /* Set any default values */
    mc->tv.tv_sec = 2;
    mc->tv.tv_usec = 600;
    mc->read_size = 2048;
  }

  return mc;
}


int
mc_replace(struct memcache *mc, const char *key, const size_t key_len, const u_int16_t flags,
	   const time_t expire, const size_t bytes, const void *val) {
  return mc_storage_cmd(mc, "replace ", strlen("replace "), key, key_len, flags, expire, bytes, val);
}


struct memcache_res *
mc_req_add(struct memcache_req *req, const char *key, size_t len) {
  struct memcache_res *res;

  res = mc_res_new();
  if (res == NULL)
    return NULL;

  res->key = key; /* XXX Should I be strdup()'ing key? */
  res->len = len;
  res->hash = mc_hash_key(key, len);

  TAILQ_INSERT_TAIL(&req->query, res, entries);
  req->num_keys++;

  return res;
}


void
mc_req_free(struct memcache_req *req) {
  while (req->query.tqh_first != NULL)
    mc_res_free(req, req->query.tqh_first);

  free(req);
}


struct memcache_req *
mc_req_new(void) {
  struct memcache_req *req;

  req = (struct memcache_req *)malloc(sizeof(struct memcache_req));
  if (req != NULL) {
    bzero(req, sizeof(struct memcache_req));

    TAILQ_INIT(&req->query);
  }

  return req;
}


void
mc_res_free(struct memcache_req *req, struct memcache_res *res) {
  TAILQ_REMOVE(&req->query, res, entries);
  if (res->free_on_delete != 'f') {
    if (res->size > 0)
      free(res->val);
  }

  free(res);
}


static struct memcache_res *
mc_res_new(void) {
  struct memcache_res *res;

  res = (struct memcache_res *)malloc(sizeof(struct memcache_res));
  if (res != NULL) {
    bzero(res, sizeof(struct memcache_res));

    /* Default values */
    res->free_on_delete = 'u'; /* 'u'nset */
  }

  return res;
}


void
mc_server_add(struct memcache *mc, const char *hostname, const char *port) {
  struct memcache_server *ms, **tmp;
  int ret;

  ms = mc_server_new(mc);
  if (ms == NULL)
    return;

  ms->hostname = (hostname == NULL ? "localhost" : hostname);
  ms->port = (port == NULL ? "11211" : port);

  ret = mc_server_resolve(ms);
  if (ret != 0) {
    /* getaddrinfo(3) reports failure via its return value, not errno,
     * so use warnx(3).  ms never made it onto any list; free it here
     * so it isn't leaked. */
    warnx("host %s does not exist: %s.  Not adding to server list.", ms->hostname, gai_strerror(ret));
    free(ms);
    return;
  }

  TAILQ_INSERT_TAIL(&mc->server_list, ms, entries);

  /* Add ms to the array of servers to try.  realloc(3) on a NULL
   * pointer behaves like malloc(3), so one call covers both the first
   * server and every later one.  live_servers must be a plain heap
   * pointer (struct memcache_server **) for this to work; a flexible
   * array member cannot be grown with realloc(3). */
  tmp = (struct memcache_server **)realloc(mc->live_servers, sizeof(struct memcache_server *) * (mc->num_live_servers + 1));
  if (tmp == NULL)
    err(EX_TEMPFAIL, "Unable to realloc() enough memory to add a new server");
  mc->live_servers = tmp;

  /* Add the new server to the end of the list */
  mc->live_servers[mc->num_live_servers++] = ms;
}


static int
mc_server_connect(struct memcache *mc, struct memcache_server *ms) {
  struct addrinfo *res;
  int i;
#ifdef TCP_NODELAY
  int val;
#endif

  if (ms->fd != -1)
    return ms->fd;

  if (ms->active == 'd' || ms->active == 'n')
    return -1;

  if (ms->hostinfo == NULL || ms->hostinfo->ai_addrlen == 0) {
    i = mc_server_resolve(ms);
    if (i != 0) {
      warnx("host %s does not exist: %s.", ms->hostname, gai_strerror(i));
      ms->active = 'n';
      return -1;
    }
  }

  for (i = 0, res = ms->hostinfo; res != NULL; res = res->ai_next, i++) {
    ms->fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (ms->fd < 0) {
      warn("%s:%u\tsocket()", __FILE__, __LINE__);
      continue;
    }

#ifdef TCP_NODELAY
    val = 1;
    if (setsockopt(ms->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
      warn("%s:%u\tsetsockopt(TCP_NODELAY)", __FILE__, __LINE__);
    }
#endif

    if (setsockopt(ms->fd, SOL_SOCKET, SO_SNDTIMEO, &ms->tv, sizeof(struct timeval)) != 0) {
      warn("%s:%u\tsetsockopt(SO_SNDTIMEO)", __FILE__, __LINE__);
      /* Close the socket, set the file descriptor to -1, and continue
       * trying to connect to the rest of the servers that match this
       * hostname.  More than likely there is only one IP per host
       * name, but, in the event there isn't, continue to the next
       * entry. */
      if (close(ms->fd) != 0)
	warn("%s:%u\tclose()", __FILE__, __LINE__);
      ms->fd = -1;
      continue;
    }

    if (connect(ms->fd, res->ai_addr, res->ai_addrlen) != 0) {
      warn("%s:%u\tconnect()", __FILE__, __LINE__);
      if (close(ms->fd) != 0)
	warn("%s:%u\tclose()", __FILE__, __LINE__);
      ms->fd = -1;
      continue;
    } else {
      return ms->fd;
    }
  }

  if (ms->fd != -1) abort(); /* XXX remove check */

  /* If none of the IP addresses for this hostname work, remove the
   * server from the live_server list (we assume they're live by
   * default) and return -1. */
  mc_deactivate_server(mc, ms);
  return -1;
}


static void
mc_server_free(struct memcache_server *ms) {
  if (ms == NULL)
    return;

  if (ms->hostinfo != NULL)
    freeaddrinfo(ms->hostinfo);

  if (ms->fd != -1) {
    if (close(ms->fd) != 0)
      warn("%s:%u\tclose()", __FILE__, __LINE__);
    ms->fd = -1;
  }

  free(ms);
}


static struct memcache_server *
mc_server_new(struct memcache *mc) {
  struct memcache_server *ms;

  ms = (struct memcache_server *)malloc(sizeof(struct memcache_server));
  if (ms != NULL) {
    bzero(ms, sizeof(struct memcache_server));

    /* Set any default values */
    ms->active = 't';
    ms->fd = -1;
    ms->flags = -1;

    /* Defaults from mc */
    ms->tv.tv_sec = mc->tv.tv_sec;
    ms->tv.tv_usec = mc->tv.tv_usec;
  }

  return ms;
}


static int
mc_server_resolve(struct memcache_server *ms) {
  struct addrinfo hints, *res;
  int ret;

  /* Resolve the hostname ahead of time */
  bzero(&hints, sizeof(struct addrinfo));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  ret = getaddrinfo(ms->hostname, ms->port, &hints, &(ms->hostinfo));
  if (ret != 0)
    return ret;

  /* Start the count over in case this host has been resolved before */
  ms->num_addrs = 0;
  for (res = ms->hostinfo; res != NULL; res = res->ai_next)
    ms->num_addrs++;

  return 0;
}


void
mc_server_stats_free(struct memcache_server_stats *s) {
  if (s->version != NULL)
    free(s->version);
  free(s);
}


struct memcache_server_stats *
mc_stats(struct memcache *mc) {
  struct memcache_server *ms;
  struct memcache_server_stats *s, *ts;

  s = mc_server_stats_new();
  if (s == NULL)
    return NULL;

  for (ms = mc->server_list.tqh_first; ms != NULL; ms = ms->entries.tqe_next) {
    ts = mc_server_stats(mc, ms);
    if (ts == NULL)
      continue;

    /* Merge the values from ts into s.  Per-server data (pid, uptime,
     * time) is pulled from the last server that answered; version is
     * taken from the first server that reports one. */
    s->pid = ts->pid;
    s->uptime = ts->uptime;
    s->time = ts->time;
    if (s->version == NULL && ts->version != NULL)
      s->version = strdup(ts->version);

    s->rusage_user.tv_sec += ts->rusage_user.tv_sec;
    s->rusage_user.tv_usec += ts->rusage_user.tv_usec;
    if (s->rusage_user.tv_usec >= 1000000) {
      s->rusage_user.tv_sec += s->rusage_user.tv_usec / 1000000;
      s->rusage_user.tv_usec %= 1000000;
    }

    s->rusage_system.tv_sec += ts->rusage_system.tv_sec;
    s->rusage_system.tv_usec += ts->rusage_system.tv_usec;
    if (s->rusage_system.tv_usec >= 1000000) {
      s->rusage_system.tv_sec += s->rusage_system.tv_usec / 1000000;
      s->rusage_system.tv_usec %= 1000000;
    }

    s->curr_items += ts->curr_items;
    s->total_items += ts->total_items;
    s->bytes = s->bytes + ts->bytes;
    s->curr_connections += ts->curr_connections;
    s->total_connections += ts->total_connections;
    s->connection_structures += ts->connection_structures;
    s->cmd_get += ts->cmd_get;
    s->cmd_set += ts->cmd_set;
    s->get_hits += ts->get_hits;
    s->get_misses += ts->get_misses;
    s->bytes_read += ts->bytes_read;
    s->bytes_written += ts->bytes_written;
    s->limit_maxbytes += ts->limit_maxbytes;

    mc_server_stats_free(ts);
  }

  return s;
}


static struct memcache_server_stats *
mc_server_stats_new(void) {
  struct memcache_server_stats *s;
  s = (struct memcache_server_stats *)malloc(sizeof(struct memcache_server_stats));
  if (s != NULL) {
    bzero(s, sizeof(struct memcache_server_stats));
  }

  return s;
}


struct memcache_server_stats *
mc_server_stats(struct memcache *mc, struct memcache_server *ms) {
  struct memcache_server_stats *s;
  char *cp, *cur, *line;

  if (mc_server_connect(mc, ms) == -1)
    return NULL;

  if (write(ms->fd, "stats\r\n", strlen("stats\r\n")) < 0) {
    warn("%s:%u\twrite()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    return NULL;
  }

  s = mc_server_stats_new();
  if (s == NULL)
    return NULL;

  /* Switch to non-blocking io */
  mc_server_block(ms, 1);

  /* Reset our buffer */
  mc_reset_buf(mc);

  for(;;) {
    cur = line = mc_get_line(mc, ms);

    if (cur != NULL && memcmp(cur, "STAT ", strlen("STAT ")) == 0) {
      cur = &cur[strlen("STAT ")];

      /* strtol(3) and strtoll(3) only set errno on failure, so clear
       * any stale value before the errno checks below. */
      errno = 0;

      /* Time to loop through the potential stats keys.  Joy.  This is
       * going to complete in O(1 + 2 + 3 ... N) operations (currently
       * 190).  Ugh.  Don't know of a better way to handle this
       * without a hash.  Besides, this is just stats. */
      if (memcmp(cur, "pid ", strlen("pid ")) == 0) {
	cur = &cur[strlen("pid ")];
	s->pid = (pid_t)strtol(cur, &cp, 10);
	if (s->pid == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid pid \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "uptime ", strlen("uptime ")) == 0) {
	cur = &cur[strlen("uptime ")];
	s->uptime = (time_t)strtol(cur, &cp, 10);
	if (s->uptime == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid uptime \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "time ", strlen("time ")) == 0) {
	cur = &cur[strlen("time ")];
	s->time = (time_t)strtol(cur, &cp, 10);
	if (s->time == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid time \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "version ", strlen("version ")) == 0) {
	cur = &cur[strlen("version ")];
	for (cp = cur; !isspace((unsigned char)*cp); cp++);
	s->version = (char *)malloc((size_t)(cp - cur + 1));
	if (s->version == NULL) {
	  warn("%s:%u\tmalloc()", __FILE__, __LINE__);
	} else {
	  memcpy(s->version, cur, (size_t)(cp - cur));
	  s->version[(size_t)(cp - cur)] = '\0';
	}
      } else if (memcmp(cur, "rusage_user ", strlen("rusage_user ")) == 0) {
	cur = &cur[strlen("rusage_user ")];
	s->rusage_user.tv_sec = (int32_t)strtol(cur, &cp, 10);
	if (s->rusage_user.tv_sec == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid rusage_user seconds \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
	else {
	  cur = cp; /* advance cursor */
	  if (*cur != '.')
	    warnx("%s:%u\tProtocol violation: invalid separator: %c", __FILE__, __LINE__, *cur);
	  else {
	    cur++; /* advance past the decimal point */
	    s->rusage_user.tv_usec = (int32_t)strtol(cur, &cp, 10);
	    if (s->rusage_user.tv_usec == 0 && (errno == EINVAL || errno == ERANGE))
	      err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid rusage_user microseconds \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
	  }
	}

      } else if (memcmp(cur, "rusage_system ", strlen("rusage_system ")) == 0) {
	cur = &cur[strlen("rusage_system ")];
	s->rusage_system.tv_sec = (int32_t)strtol(cur, &cp, 10);
	if (s->rusage_system.tv_sec == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid rusage_system seconds \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
	else {
	  cur = cp; /* advance cursor */
	  if (*cur != '.')
	    errx(EX_PROTOCOL, "%s:%u\tProtocol violation: invalid separator: %c", __FILE__, __LINE__, *cur);
	  else {
	    cur++; /* advance past the decimal point */
	    s->rusage_system.tv_usec = (int32_t)strtol(cur, &cp, 10);
	    if (s->rusage_system.tv_usec == 0 && (errno == EINVAL || errno == ERANGE))
	      err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid rusage_system microseconds \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
	  }
	}
      } else if (memcmp(cur, "curr_items ", strlen("curr_items ")) == 0) {
	cur = &cur[strlen("curr_items ")];
	s->curr_items = (u_int32_t)strtol(cur, &cp, 10);
	if (s->curr_items == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid curr_items \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "total_items ", strlen("total_items ")) == 0) {
	cur = &cur[strlen("total_items ")];
	s->total_items = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->total_items == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid total_items \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "bytes ", strlen("bytes ")) == 0) {
	cur = &cur[strlen("bytes ")];
	s->bytes = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->bytes == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid bytes \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "curr_connections ", strlen("curr_connections ")) == 0) {
	cur = &cur[strlen("curr_connections ")];
	s->curr_connections = (u_int32_t)strtol(cur, &cp, 10);
	if (s->curr_connections == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid curr_connections \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "total_connections ", strlen("total_connections ")) == 0) {
	cur = &cur[strlen("total_connections ")];
	s->total_connections = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->total_connections == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid total_connections \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "connection_structures ", strlen("connection_structures ")) == 0) {
	cur = &cur[strlen("connection_structures ")];
	s->connection_structures = (u_int32_t)strtol(cur, &cp, 10);
	if (s->connection_structures == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtol(): invalid connection_structures \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "cmd_get ", strlen("cmd_get ")) == 0) {
	cur = &cur[strlen("cmd_get ")];
	s->cmd_get = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->cmd_get == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid cmd_get \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "cmd_set ", strlen("cmd_set ")) == 0) {
	cur = &cur[strlen("cmd_set ")];
	s->cmd_set = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->cmd_set == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid cmd_set \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "get_hits ", strlen("get_hits ")) == 0) {
	cur = &cur[strlen("get_hits ")];
	s->get_hits = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->get_hits == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid get_hits \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "get_misses ", strlen("get_misses ")) == 0) {
	cur = &cur[strlen("get_misses ")];
	s->get_misses = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->get_misses == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid get_misses \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "bytes_read ", strlen("bytes_read ")) == 0) {
	cur = &cur[strlen("bytes_read ")];
	s->bytes_read = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->bytes_read == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid bytes_read \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "bytes_written ", strlen("bytes_written ")) == 0) {
	cur = &cur[strlen("bytes_written ")];
	s->bytes_written = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->bytes_written == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid bytes_written \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else if (memcmp(cur, "limit_maxbytes ", strlen("limit_maxbytes ")) == 0) {
	cur = &cur[strlen("limit_maxbytes ")];
	s->limit_maxbytes = (u_int64_t)strtoll(cur, &cp, 10);
	if (s->limit_maxbytes == 0 && (errno == EINVAL || errno == ERANGE))
	  err(EX_PROTOCOL, "%s:%u\tstrtoll(): invalid limit_maxbytes \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      } else {
	for (cp = cur; !isspace((unsigned char)*cp); cp++);
	warnx("%s:%u\tProtocol error: unknown stat \"%.*s\"", __FILE__, __LINE__, (int)(cp - cur), cur);
      }

      /* Now that we've sucked in our stats value, set our cursor to
       * the end of the value. */
      cp = memchr(cur, (int)'\r', mc->size - (size_t)(cur - mc->buf));
      if (cp == NULL || cp[1] != '\n') {
	warnx("Protocol error: anticipated end of stats value: not at end of stats value");
	mc_server_stats_free(s);
	mc_deactivate_server(mc, ms);
	return NULL;
      }
    } else if (cur != NULL && memcmp(cur, "END", strlen("END")) == 0) {
      /* We're done reading in stats. */
      break;
    } else if (cur == NULL) {
      /* mc_get_line() gave us nothing, so there is no response to
       * print or parse. */
      warnx("%s:%u\tUnable to read stats response", __FILE__, __LINE__);
      mc_server_stats_free(s);
      mc_deactivate_server(mc, ms);
      return NULL;
    } else {
      errx(EX_PROTOCOL, "%s:%u\tUnable to handle response: \"%.*s\"", __FILE__, __LINE__, 15, cur);
    }
  }

  /* Switch to blocking io */
  mc_server_block(ms, 0);

  return s;
}


void
mc_timeout(struct memcache *mc, int sec, int usec) {
  mc->tv.tv_sec = sec;
  mc->tv.tv_usec = usec;	/* microseconds, as struct timeval expects */
}


static void
mc_reset_buf(struct memcache *mc) {
  /* Reset our cursor to the front of the buffer */
  mc->cur = mc->start = mc->buf;
  mc->read_cur = NULL;
}


static void
mc_server_block(struct memcache_server *ms, int use_nbio) {
  if (ms->flags == -1) {
    ms->flags = fcntl(ms->fd, F_GETFL, 0);
    if (ms->flags == -1)
      err(EX_OSERR, "%s:%u\tfcntl(F_GETFL)", __FILE__, __LINE__);
  }

  if (use_nbio) {
    /* Switch to non-blocking io */
    if (!(ms->flags & O_NDELAY)) {
      ms->flags |= O_NDELAY;
      goto set_flags;
    }
  } else {
    if (ms->flags & O_NDELAY) {
      ms->flags &= ~O_NDELAY;
      goto set_flags;
    }
  }
  return;

  set_flags:
  if (fcntl(ms->fd, F_SETFL, ms->flags) < 0)
    err(EX_OSERR, "%s:%u\tfcntl(F_SETFL)", __FILE__, __LINE__);
}


int
mc_set(struct memcache *mc, const char *key, const size_t key_len, const u_int16_t flags,
       const time_t expire, const size_t bytes, const void *val) {
  return mc_storage_cmd(mc, "set ", strlen("set "), key, key_len, flags, expire, bytes, val);
}


static int
mc_storage_cmd(struct memcache *mc, const char *cmd, const size_t cmd_len,
	       const char *key, const size_t key_len, const u_int16_t flags,
	       const time_t expire, const size_t bytes, const void *val) {
  struct memcache_server *ms;
  u_int32_t hash;
  char *cur;
  size_t buf_left;
  int i;
  struct iovec wv[11];	/* wv[0] through wv[10] are filled in below */

  hash = mc_hash_key(key, key_len);
  ms = mc_find_server(mc, hash);
  if (ms == NULL) {
    warnx("%s:%u\tUnable to find a valid server", __FILE__, __LINE__);
    return 1;
  }

  if (mc_server_connect(mc, ms) == -1)
    return 2;

  /* Reset the buffer so that I can chop it up to use it as a scratch
   * pad for converting numbers from binary to ascii.  *hears the baby
   * jesus crying* */
  mc_reset_buf(mc);
  cur = mc->buf;
  buf_left = mc->size;

  wv[0].iov_base = cmd;
  wv[0].iov_len = cmd_len;
  wv[1].iov_base = key;
  wv[1].iov_len = key_len;
  wv[2].iov_base = " ";
  wv[2].iov_len = strlen(wv[2].iov_base);

  /* Convert flags to a string */
  i = snprintf(cur, buf_left, "%hu", flags);
  if (i < 1 || (size_t)i >= buf_left)
    err(EX_SOFTWARE, "%s:%u\tsnprintf()", __FILE__, __LINE__);

  wv[3].iov_base = cur;		/* Note where our flags string is located */
  wv[3].iov_len = i;
  wv[4].iov_base = wv[2].iov_base;	/* Add a space */
  wv[4].iov_len = wv[2].iov_len;

  cur = &cur[++i];	/* advance cursor past trailing '\0' */
  buf_left -= i;	/* Note our consumption of some buffer */

  /* Convert expiration time to a string */
  i = snprintf(cur, buf_left, "%lu", (unsigned long)expire);
  if (i < 1 || (size_t)i >= buf_left)
    err(EX_SOFTWARE, "%s:%u\tsnprintf()", __FILE__, __LINE__);

  wv[5].iov_base = cur;		/* Note where our expiration string is located */
  wv[5].iov_len = i;
  wv[6].iov_base = wv[2].iov_base;	/* Add a space */
  wv[6].iov_len = wv[2].iov_len;

  cur = &cur[++i];	/* advance cursor past trailing '\0' */
  buf_left -= i;	/* Note our consumption of some buffer */

  /* Convert bytes to a string */
  i = snprintf(cur, buf_left, "%zu", bytes);
  if (i < 1 || (size_t)i >= buf_left)
    err(EX_SOFTWARE, "%s:%u\tsnprintf()", __FILE__, __LINE__);

  wv[7].iov_base = cur;	/* Note where our bytes string is located */
  wv[7].iov_len = i;
  wv[8].iov_base = "\r\n";	/* Newline */
  wv[8].iov_len = strlen(wv[8].iov_base);

  cur = &cur[++i];	/* advance cursor past trailing '\0' */
  buf_left -= i;	/* Note our consumption of some buffer */

  /* Add the data */
  wv[9].iov_base = val;
  wv[9].iov_len = bytes;

  /* Add another carriage return */
  wv[10].iov_base = wv[8].iov_base;
  wv[10].iov_len = wv[8].iov_len;

  if (writev(ms->fd, wv, 11) < 0) {
    warn("%s:%u\twritev()", __FILE__, __LINE__);
    mc_deactivate_server(mc, ms);
    /* XXX Should we recursively attempt to try this query on the
     * remaining servers in the cluster if the writev() fails?
     * Eventually we'd fail once all servers were exhausted?  For now,
     * just fail and return an error code. */
    return 3;
  }

  mc_server_block(ms, 1);
  mc_reset_buf(mc);
  cur = mc_get_line(mc, ms);
  if (cur != NULL && memcmp(cur, "STORED", strlen("STORED")) == 0) {
    /* Groovy Tuesday */
    mc_server_block(ms, 0);
    return 0;
  } else if (cur != NULL && memcmp(cur, "NOT_STORED", strlen("NOT_STORED")) == 0) {
    /* Fuck beans.  That was them, wasn't it? */
    mc_server_block(ms, 0);
    return -1;
  } else {
    errx(EX_PROTOCOL, "%s:%u\tProtocol error", __FILE__, __LINE__);
  }
}
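
For anyone following mc_storage_cmd() above, here is a minimal standalone sketch of the same idea: frame one storage command as "<cmd> <key> <flags> <expire> <bytes>\r\n<data>\r\n" and hand it to the kernel with a single writev(2) call. The helper name example_send_storage_cmd() and its 64-byte scratch buffer are assumptions made for illustration; neither is part of the posted client, and the sketch formats the three numbers with one snprintf(3) rather than three.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical helper, not part of the posted client.  Writes
 * "<cmd> <key> <flags> <expire> <bytes>\r\n<data>\r\n" to fd using one
 * writev(2) call.  Returns whatever writev(2) returns (bytes written,
 * or -1 on error); a short write is possible and is left to the
 * caller to handle.  Returns -1 if the numeric header does not fit. */
ssize_t
example_send_storage_cmd(int fd, const char *cmd, const char *key,
                         unsigned int flags, unsigned long expire,
                         const void *val, size_t bytes) {
  char hdr[64];         /* holds " <flags> <expire> <bytes>\r\n" */
  struct iovec wv[6];   /* exactly as many slots as are filled in below */
  int n;

  assert(cmd != NULL && key != NULL && (val != NULL || bytes == 0));

  /* Convert the three numbers to ASCII in one pass */
  n = snprintf(hdr, sizeof(hdr), " %u %lu %zu\r\n", flags, expire, bytes);
  if (n < 1 || (size_t)n >= sizeof(hdr))
    return -1;

  /* iov_base is not const-qualified, hence the casts; writev(2) only
   * reads from these buffers. */
  wv[0].iov_base = (void *)cmd;     wv[0].iov_len = strlen(cmd);
  wv[1].iov_base = (void *)" ";     wv[1].iov_len = 1;
  wv[2].iov_base = (void *)key;     wv[2].iov_len = strlen(key);
  wv[3].iov_base = hdr;             wv[3].iov_len = (size_t)n;
  wv[4].iov_base = (void *)val;     wv[4].iov_len = bytes;
  wv[5].iov_base = (void *)"\r\n";  wv[5].iov_len = 2;

  return writev(fd, wv, 6);
}
```

Calling example_send_storage_cmd(fd, "set", "foo", 0, 0, "bar", 3) on a connected socket (or one end of a pipe(2), for a quick check) should put the bytes "set foo 0 0 3\r\nbar\r\n" on the wire. Sizing the iovec array to match the number of slots actually filled is the detail worth copying.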
-------------- next part --------------
/* Copyright (c) 2004 Sean Chittenden <sean at chittenden.org>
 *
 * All rights reserved until such time as this code is released with
 * an official license.  Use of this code for commercial,
 * non-commercial, and personal purposes is encouraged.  Public forks
 * of this code are permitted so long as the fork and its descendants
 * use this copyright/license.  Use of this software in programs
 * released under the GPL is expressly prohibited by the
 * author (ie, BSD, closed source, or artistic license is okay, but
 * GPL is not). */

#ifndef MEMCACHE_H
#define MEMCACHE_H

#include <sys/types.h>	/* u_int32_t, size_t, pid_t, time_t */
#include <netdb.h>
#include <sys/queue.h>
#include <sys/time.h>

/* Our initial read(2) buffer has to be long enough to read the
 * first line of the response.  ie:
 *
 * "VALUE #{'k' * 250} #{2 ** 15} #{2 ** 32}\r\n".length => 275
 *
 * However, since we want to minimize the number of system calls
 * necessary, include the trailing part of the protocol in our estimate:
 *
 * "\r\nEND\r\n".length => 7
 *
 * Which yields a mandatory minimum of 282 bytes for a successful
 * response.  If we wish to try and get lucky with our first read(2)
 * call and be able to read(2) in small values without making a second
 * read(2) call, pad this number with a sufficiently large byte value.
 * If most of your values are 512B, then a GET_INIT_BUF_SIZE of 794
 * would be prudent (512 + 282).
 *
 * The default value of 1024 means that values of up to 742 bytes
 * (1024 - 282) can be read(2) via the first read(2) call.  Increasing
 * this value to large values is not beneficial.  If a second read(2)
 * call is necessary, the read(2) will be made with a sufficiently
 * large buffer already allocated. */
#define GET_INIT_BUF_SIZE 1024

#define MAX_KEY_LEN 250

struct memcache_res {
  const char *key;	/* key */
  size_t len;		/* length of key */
  u_int32_t hash;	/* hash of the key */
  void *val;		/* the value */
  size_t bytes;		/* length of val */

  /* If size is zero (default), the memory for val is automatically
   * allocated using malloc(3).  If size is zero, free_on_delete is
   * set to 't' by default.
   *
   * If size is non-zero, libmemcache(3) assumes that the caller has
   * set val to an available portion of memory that is size bytes
   * long.  libmemcache(3) will only populate val with as many bytes
   * as are specified by size (ie, it will trim the value in order to
   * fit into val). If size is non-zero, free_on_delete is set to
   * 'f' by default. */
  size_t size;
  TAILQ_ENTRY(memcache_res) entries;
  u_int16_t flags;

  /* If free_on_delete is 't'rue, val will be free(3)'ed when this
   * struct is cleaned up via mc_res_free() or the request is cleaned
   * up via mc_req_free().
   *
   * If free_on_delete is 'f'alse, val will not be free(3)'ed when
   * this response or its parent request are cleaned up. */
  char free_on_delete;
};

struct memcache_req {
  TAILQ_HEAD(memcache_res_list, memcache_res) query;
  u_int16_t num_keys;
};

struct memcache_server {
  /* The hostname of the server. */
  const char *hostname;

  /* Port number of the host we're connecting to. */
  const char *port;

  /* The file descriptor for this server */
  int fd;

  /* The file descriptor flags */
  int flags;

  /* The timeout for this server */
  struct timeval tv;

  /* Is this particular server active or not?
   *
   * 'd' == Down	Last request was unsuccessful
   * 'n' == No host	The hostname doesn't exist
   * 't' == Try		Haven't connected to it yet, will attempt
   * 'u' == Up		Has been connected to successfully
   */
  char active;

  /* A cached copy of the looked up host. */
  struct addrinfo *hostinfo;

  /* The number of addresses in the cached copy.  If there is more
   * than one per DNS entry (discouraged), we try each address in
   * turn until one accepts the connection. */
  u_int32_t num_addrs;

  /* Misc list bits */
  TAILQ_ENTRY(memcache_server) entries;
};


struct memcache_server_stats {
  pid_t pid;
  time_t uptime;
  time_t time;
  char *version;
  struct timeval rusage_user;
  struct timeval rusage_system;
  u_int32_t curr_items;
  u_int64_t total_items;
  u_int64_t bytes;
  u_int32_t curr_connections;
  u_int64_t total_connections;
  u_int32_t connection_structures;
  u_int64_t cmd_get;
  u_int64_t cmd_set;
  u_int64_t get_hits;
  u_int64_t get_misses;
  u_int64_t bytes_read;
  u_int64_t bytes_written;
  u_int64_t limit_maxbytes;
};


struct memcache {
  /* The default timeout for all servers */
  struct timeval tv;

  /* The default read(2) size when reading a response. */
  size_t read_size;

  /* The complete list of servers */
  TAILQ_HEAD(memcache_server_list, memcache_server) server_list;

  /* A buffer for data */
  char *buf;

  /* A cursor for where we are in the buffer */
  char *cur;

  /* A pointer to where data should be appended with future read(2)
   * calls. */
  char *read_cur;

  /* A pointer to the start of the current line in the buffer. */
  char *start;

  /* The allocated size of the buffer */
  size_t size;

  /* The number of servers in live_servers */
  u_int32_t num_live_servers;

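  /* An array of usable memcache_servers.  This is a heap-allocated
   * array of pointers that mc_server_add() grows with realloc(3); it
   * starts out NULL. */
  struct memcache_server **live_servers;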
};


/* Adds a given key to the cache */
int			 mc_add(struct memcache *mc, const char *key, const size_t key_len,
				const u_int16_t flags, const time_t expire,
				const size_t bytes, const void *val);

/* Gets the value from memcache and allocates the data for the
 * caller.  It is the caller's responsibility to free the returned
 * value.  mc_get() is the preferred interface, however. */
void			*mc_aget(struct memcache *mc, const char *key, size_t len);

/* Disconnects from a given server and marks it as down. */
void			 mc_deactivate_server(struct memcache *mc, struct memcache_server *ms);

/* Decrements a given key */
u_int32_t		 mc_decr(struct memcache *mc, const char *key, const size_t key_len, const u_int32_t val);

/* Deletes a given key */
int			 mc_delete(struct memcache *mc, const char *key, const size_t key_len, const time_t hold);

/* When given a hash value, this function returns the appropriate
 * server to connect to in order to find the key. */
struct memcache_server	*mc_find_server(struct memcache *mc, const u_int32_t hash);

/* Flushes all keys */
int			 mc_flush_all(struct memcache *mc, const char *key, const size_t key_len);

/* cleans up a memcache object. */
void			 mc_free(struct memcache *mc);

/* mc_get() is the preferred method of accessing memcache.  It
 * accepts multiple keys and lets a user (should they so choose)
 * perform memory caching to reduce the number of malloc(3) calls
 * made. */
void			 mc_get(struct memcache *mc, struct memcache_req *req);

/* Generates a hash value from a given key */
u_int32_t		 mc_hash_key(const char *key, size_t len);

/* Increments a given key */
u_int32_t		 mc_incr(struct memcache *mc, const char *key, const size_t key_len, const u_int32_t val);

/* Allocates a new memcache object */
struct memcache	*mc_new(void);

/* Replaces a given key in the cache */
int			 mc_replace(struct memcache *mc, const char *key, const size_t key_len,
				    const u_int16_t flags, const time_t expire,
				    const size_t bytes, const void *val);

/* Adds a key to a given request */
struct memcache_res	*mc_req_add(struct memcache_req *req, const char *key, size_t len);

/* Cleans up a given request and its subsequent responses.  If
 * free_on_delete is true (default), it will clean up the value too.
 * If free_on_delete is false, however, it is the caller's
 * responsibility to free the value.  To prevent double free(3)
 * errors, if a value is free(3)'ed before mc_req_free() is called,
 * set val to NULL. */
void			 mc_req_free(struct memcache_req *req);

/* Allocates a new memcache request object. */
struct memcache_req	*mc_req_new(void);

/* Cleans up an individual response object.  Normally this is not
 * necessary as a call to mc_req_free() will clean up its response
 * objects. */
void			 mc_res_free(struct memcache_req *req, struct memcache_res *res);

/* Adds a server to the list of available servers.  By default,
 * servers are assumed to be available. */
void			 mc_server_add(struct memcache *mc, const char *hostname, const char *port);

/* Cleans up a given stats object */
void			 mc_server_stats_free(struct memcache_server_stats *s);

/* Gets a stats object from the given server.  It is the caller's
 * responsibility to cleanup the resulting object via
 * mc_server_stats_free(). */
struct memcache_server_stats	*mc_server_stats(struct memcache *mc, struct memcache_server *ms);

/* Sets a given key */
int			 mc_set(struct memcache *mc, const char *key, const size_t key_len,
				const u_int16_t flags, const time_t expire,
				const size_t bytes, const void *val);

/* Creates a stats object for all available servers and returns the
 * cumulative stats.  Per host-specific data is generally the same as
 * the last server queried. */
struct memcache_server_stats	*mc_stats(struct memcache *mc);

/* Sets the default timeout for new servers. */
void mc_timeout(struct memcache *mc, int sec, int usec);



/* APIs that should be implemented: */

/* Resets all hosts that are down to try */
void mc_server_reset_all_active(struct memcache *mc);

/* Resets a given host back to a try state */
void mc_server_reset_active(struct memcache *mc, const char *hostname, int port);

/* Resets all dns entries */
void mc_server_reset_all_dns(struct memcache *mc);

/* Resets only one host's DNS cache */
void mc_server_reset_dns(struct memcache *mc, const char *hostname, int port);

/* Disconnects from all memcache servers */
void mc_server_disconnect_all(struct memcache *mc);

/* Disconnects from one memcache server */
void mc_server_disconnect(struct memcache *mc, const char *hostname, int port);

#ifdef TCP_NODELAY
/* Enable/disable TCP_NODELAY */
void mc_nodelay_enable(struct memcache *mc, int enable);

/* Enable/disable TCP_NODELAY for a given server */
void mc_server_nodelay_enable(struct memcache_server *ms, int enable);
#endif

/* Set the timeout on a per server basis */
void mc_server_timeout(struct memcache_server *ms, int sec, int usec);

/* Set the number of seconds you're willing to wait in total for a
 * response. ??? */

#endif
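
The mc_hash_key() and mc_find_server() prototypes above describe a two-step lookup: hash the key, then map the hash onto one of the configured servers. A minimal sketch of that idea follows, assuming a plain modulo over the server count. The names sketch_hash_key() and sketch_find_server() are hypothetical, and FNV-1a is only a stand-in here; the hash libmemcache really uses is not shown in this post.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for mc_hash_key(): 32-bit FNV-1a.  This is an
 * assumption for illustration, not the library's hash. */
static uint32_t
sketch_hash_key(const char *key, size_t len) {
  uint32_t h = 2166136261u;	/* FNV offset basis */
  size_t i;

  for (i = 0; i < len; i++) {
    h ^= (unsigned char)key[i];
    h *= 16777619u;		/* FNV prime */
  }
  return h;
}

/* Hypothetical stand-in for mc_find_server(): maps a hash onto an
 * index into an array of nservers servers. */
static size_t
sketch_find_server(uint32_t hash, size_t nservers) {
  assert(nservers > 0);
  return hash % nservers;
}
```

With four servers, sketch_find_server(sketch_hash_key("foo", 3), 4) yields an index between 0 and 3. Note that a plain modulo remaps most keys whenever a server is added or removed.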
-------------- next part --------------
/* This is a test program for the memcache C API */

#include <err.h>
#include <sysexits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>	/* bzero(3) */
#include <unistd.h>

#include "libmemcache.h"

void test_add(struct memcache *mc);
void test_aget(struct memcache *mc, const char *key);
void test_decr(struct memcache *mc);
void test_delete(struct memcache *mc);
void test_incr(struct memcache *mc);
void test_loop(struct memcache *mc);
void test_multi_get(struct memcache *mc);
void test_multi_get_loop(struct memcache *mc);
void test_set(struct memcache *mc);
void test_stats(struct memcache *mc, int out);
void test_replace(struct memcache *mc);

int
main(int argc, char *argv[]) {
  struct memcache *mc = NULL;

  mc = mc_new();
  if (mc == NULL)
    err(EX_OSERR, "Unable to allocate a new memcache object");

  mc_server_add(mc, "127.0.0.1", "11211");

  warnx("starting incr test");
  test_incr(mc);

  warnx("starting decr test");
  test_decr(mc);

  warnx("starting add test");
  test_add(mc);

  warnx("starting replace test");
  test_replace(mc);

  warnx("starting delete test");
  test_delete(mc);

  warnx("starting set test");
  test_set(mc);

  mc_add(mc, "foo", strlen("foo"), 0, 0, 4, "test");

  warnx("starting multi_get test");
  test_multi_get(mc);

  warnx("starting stats run");
  test_stats(mc, 1);

  warnx("starting multi_get loop test");
  test_multi_get_loop(mc);

  warnx("starting aget loop test");
  test_loop(mc);

  warnx("starting aget miss test");
  test_aget(mc, "frob");

  for (int i = 0; i < 10000; i++) {
    test_multi_get_loop(mc);
    test_set(mc);
    test_loop(mc);
    test_replace(mc);
    test_aget(mc, "frob");
    test_stats(mc, 0);
  }

  test_stats(mc, 1);

  mc_free(mc);

  return EX_OK;
}


void
test_stats(struct memcache *mc, int out) {
  struct memcache_server_stats *s;

  s = mc_stats(mc);
  if (s == NULL)
    warnx("Unable to get stats");
  else {
    if (out) {
      printf("pid:\t\t\t%u\n", s->pid);
      printf("uptime:\t\t\t%llu\n", (unsigned long long)s->uptime);
      printf("time:\t\t\t%llu\n", (unsigned long long)s->time);
      printf("version:\t\t%s\n", s->version);
      printf("rusage_user:\t\t%ld.%06lds\n", (long)s->rusage_user.tv_sec, (long)s->rusage_user.tv_usec);
      printf("rusage_system:\t\t%ld.%06lds\n", (long)s->rusage_system.tv_sec, (long)s->rusage_system.tv_usec);
      printf("curr_items:\t\t%u\n", s->curr_items);
      printf("total_items:\t\t%llu\n", s->total_items);
      printf("bytes:\t\t\t%llu\n", s->bytes);
      printf("curr_connections:\t%u\n", s->curr_connections);
      printf("total_connections:\t%llu\n", s->total_connections);
      printf("connection_structures:\t%u\n", s->connection_structures);
      printf("cmd_get:\t\t%llu\n", s->cmd_get);
      printf("cmd_set:\t\t%llu\n", s->cmd_set);
      printf("get_hits:\t\t%llu\n", s->get_hits);
      printf("get_misses:\t\t%llu\n", s->get_misses);
      printf("bytes_read:\t\t%llu\n", s->bytes_read);
      printf("bytes_written:\t\t%llu\n", s->bytes_written);
      printf("limit_maxbytes:\t\t%llu\n", s->limit_maxbytes);
    }
  }

  if (s != NULL)
    mc_server_stats_free(s);
}


void
test_add(struct memcache *mc) {
  int i;
  size_t left, len ;
  char buf[50];
  char *cp;

  bzero(&buf, 50);
  memcpy(&buf, "testing_key", strlen("testing_key"));
  cp = &buf[strlen("testing_key")];
  left = 50 - strlen("testing_key");
  for (i = 0; i < 100; i++) {
    len = snprintf(cp, left, "%d", i);
    if (len > 0) {
      if (mc_add(mc, buf, len + strlen("testing_key"), 0, 0, 4, "test") == 0) {
	/* Worked */
      } else {
	/* Skunk fucked */
	warnx("Unable to add a key");
      }
    }
  }
}


void
test_aget(struct memcache *mc, const char *key) {
  void *val;

  val = mc_aget(mc, key, strlen(key));
  if (val != NULL) {
    free(val);
  }
}


void
test_loop(struct memcache *mc) {
  const char *key = "foo";
  void *val;
  u_int32_t i;
  size_t len;

  len = strlen(key);
  for (i = 0; i < 100; i++) {
    val = mc_aget(mc, key, len);
    if (val == NULL)
      errx(EX_SOFTWARE, "%s:%u:[%u]\tUnable to find value for key \"%s\"", __FILE__, __LINE__, i, key);
    free(val);
  }
}


void
test_multi_get(struct memcache *mc) {
  struct memcache_req *req;
  struct memcache_res *res;

  req = mc_req_new();
  mc_req_add(req, "foo", 3);
  mc_req_add(req, "frob", 4);
  res = mc_req_add(req, "foo", 3);
  res->size = 1024;
  res->val = malloc(res->size);
  res->free_on_delete = 't';

  mc_get(mc, req);

  mc_res_free(req, res);
  mc_req_free(req);	/* release the request and its remaining responses */
}


void
test_multi_get_loop(struct memcache *mc) {
  struct memcache_req *req;
  struct memcache_res *res;
  int i;

  req = mc_req_new();
  res = mc_req_add(req, "foo", 3);
  res->size = 1024;
  res->val = malloc(res->size);
  res->free_on_delete = 't';

  for (i = 0; i < 10; i++)
    mc_get(mc, req);

  mc_req_free(req);
}


void
test_replace(struct memcache *mc) {
  int i;
  size_t left, len ;
  char buf[50];
  char *cp;

  bzero(&buf, 50);
  memcpy(&buf, "testing_key", strlen("testing_key"));
  cp = &buf[strlen("testing_key")];
  left = 50 - strlen("testing_key");
  for (i = 0; i < 100; i++) {
    len = snprintf(cp, left, "%d", i);
    if (len > 0) {
      if (mc_replace(mc, buf, len + strlen("testing_key"), 0, 0, 5, "test2") == 0) {
	/* Worked */
      } else {
	/* Skunk fucked */
	warnx("Unable to replace a key");
      }
    }
  }
}


void
test_set(struct memcache *mc) {
  int i;
  size_t left, len ;
  char buf[50];
  char *cp;

  bzero(&buf, 50);
  memcpy(&buf, "testing_key", strlen("testing_key"));
  cp = &buf[strlen("testing_key")];
  left = 50 - strlen("testing_key");
  for (i = 0; i < 100; i++) {
    len = snprintf(cp, left, "%d", i);
    if (len > 0) {
      if (mc_set(mc, buf, len + strlen("testing_key"), 0, 0, 6, "foobar") == 0) {
	/* Worked */
      } else {
	/* Skunk fucked */
	warnx("Unable to set a key");
      }
    }
  }
}


void
test_delete(struct memcache *mc) {
  int i;
  size_t left, len ;
  char buf[50];
  char *cp;

  bzero(&buf, 50);
  memcpy(&buf, "testing_key", strlen("testing_key"));
  cp = &buf[strlen("testing_key")];
  left = 50 - strlen("testing_key");
  for (i = 0; i < 100; i++) {
    len = snprintf(cp, left, "%d", i);
    if (len > 0) {
      if (mc_delete(mc, buf, len + strlen("testing_key"), 0) == 0) {
	/* Worked */
      } else {
	/* Skunk fucked */
	warnx("Unable to remove a key");
      }
    }
  }
}


void
test_incr(struct memcache *mc) {
  u_int32_t i, val;

  mc_set(mc, "atop_key", strlen("atop_key"), 0, 0, 1, "0");
  for (i = 1; i < 1000; i++) {
    val = mc_incr(mc, "atop_key", strlen("atop_key"), 1);
    if (val == i) {
      /* Worked */
    } else {
      /* Skunk fucked */
      warnx("Unable to incr a key");
    }
  }
}


void
test_decr(struct memcache *mc) {
  u_int32_t i, val;

  mc_set(mc, "atop_key", strlen("atop_key"), 0, 0, 4, "1000");
  for (i = 1000; i > 0; i--) {
    val = mc_decr(mc, "atop_key", strlen("atop_key"), 1);
    if (val == i - 1) {
      /* Worked */
    } else {
      /* Skunk fucked */
      warnx("Unable to decr a key");
    }
  }

  if (mc_decr(mc, "atop_key", strlen("atop_key"), 1) != 0)
    warnx("underflow");
}
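
The test_add(), test_replace(), test_set() and test_delete() functions above all build keys of the form "testing_keyN" with snprintf(3) and store the result in a size_t, so a negative error return would look like a very large length. A minimal sketch of a shared helper that checks for both error and truncation follows; sketch_build_key() is a hypothetical name and is not part of libmemcache.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes "<prefix><n>" into buf and returns the
 * key length, or 0 if snprintf(3) failed or the key did not fit. */
static size_t
sketch_build_key(char *buf, size_t buf_len, const char *prefix, int n) {
  int len;

  assert(buf != NULL && prefix != NULL);
  len = snprintf(buf, buf_len, "%s%d", prefix, n);
  if (len < 0 || (size_t)len >= buf_len)
    return 0;			/* error or truncated output */
  return (size_t)len;
}
```

A caller would pass the returned length straight through as the key_len argument and skip the request when the helper returns 0.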
-------------- next part --------------
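
The header comment on mc_req_free() describes an ownership rule: the value is freed on cleanup when free_on_delete is set, and a caller who frees it early must set val to NULL. A minimal sketch of that rule follows, assuming a cut-down stand-in for struct memcache_res. Both struct sketch_res and sketch_res_cleanup() are hypothetical, and the real cleanup code may differ.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct memcache_res; only the three
 * fields the test program touches are modelled. */
struct sketch_res {
  void	*val;			/* value buffer, possibly caller-supplied */
  size_t	 size;			/* capacity of val in bytes */
  char	 free_on_delete;	/* non-zero: cleanup frees val */
};

/* Hypothetical cleanup: frees val only when asked to and only when it
 * is still set.  Returns 1 if val was freed here, 0 otherwise. */
static int
sketch_res_cleanup(struct sketch_res *res) {
  int freed = 0;

  assert(res != NULL);
  if (res->free_on_delete && res->val != NULL) {
    free(res->val);
    freed = 1;
  }
  res->val = NULL;		/* guards against a second free(3) */
  res->size = 0;
  return freed;
}
```

Calling the cleanup twice on the same object is harmless in this sketch because val is cleared after the first call. When free_on_delete is zero, the buffer stays with the caller, who must free it.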

-- 
Sean Chittenden

