/*
 * Unified diff, revision 1 -> revision 13, over items.c, items.h,
 * memcached.c and memcached.h.  It threads a 32-bit identifier through
 * item allocation and uses it for two new text commands, gets and cas.
 *
 * What the hunks change:
 *   - do_item_alloc(), mt_item_alloc() and both item_alloc() macros take a
 *     sixth argument, uint32_t identifier; do_item_alloc() stores it in
 *     it->identifier, a field this patch adds in memcached.h.
 *   - memcached.h defines NREAD_CAS as 4.
 *   - process_get_command() takes bool return_key_ptr.  get and bget pass
 *     false and keep the previous reply.  gets passes true; the reply line
 *     is then built with sprintf() from the flags parsed out of the item
 *     suffix, it->nbytes - 2, and the result of item_get() cast to uint32_t.
 *   - process_update_command() takes bool handle_cas.  add, set and replace
 *     pass false; cas passes true with comm = NREAD_CAS.  When handle_cas is
 *     true and tokens[5].value is non-NULL, that token is parsed with
 *     strtoull() and handed to item_alloc() as the identifier; otherwise
 *     the identifier is 0.
 *   - The store path gains an NREAD_CAS branch: it looks the key up with
 *     item_get(), releases that reference, and replaces or links the new
 *     item only if the looked-up pointer, cast to uint32_t, equals
 *     it->identifier.
 *   - The do_item_alloc() call in the realloc path passes 0.
 *
 * NOTE(review): points to confirm before applying.
 *   - The cas value is an item address narrowed to uint32_t.  Wherever
 *     pointers are wider than 32 bits the cast drops the upper bits.
 *     Presumably an address is also not a version: if item memory can be
 *     reused for the same key, a stale value could still match.  Verify
 *     against the allocator.
 *   - sprintf() passes the uint32_t to a %lu conversion; the two only agree
 *     where unsigned long is 32 bits wide.
 *   - No item_remove() for the item_get() in the gets branch is visible in
 *     this hunk, unlike the NREAD_CAS branch.  Possible reference leak;
 *     check the rest of the function.
 *   - suffix is a single local char[255] handed to add_iov(); the break
 *     suggests this happens once per key inside a loop.  Whether add_iov()
 *     copies the bytes is not visible here; if it only records the pointer,
 *     the contents and lifetime of the buffer need checking.
 *   - cas is dispatched at ntokens == 6, the same count as add, set and
 *     replace, yet reads a value from tokens[5].  Confirm that tokens[5]
 *     can hold a client-supplied value at that token count.
 *   - The strtoull() result is assigned to a uint32_t with no range check.
 */
Index: items.c =================================================================== --- items.c (revision 1) +++ items.c (revision 13) @@ -71,7 +71,7 @@ } /*@null@*/ -item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) { +item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes, uint32_t identifier) { uint8_t nsuffix; item *it; char suffix[40]; @@ -133,6 +133,7 @@ it->exptime = exptime; memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix); it->nsuffix = nsuffix; + it->identifier = identifier; return it; } Index: items.h =================================================================== --- items.h (revision 1) +++ items.h (revision 13) @@ -1,7 +1,7 @@ /* See items.c */ void item_init(void); /*@null@*/ -item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes); +item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes, uint32_t identifier); void item_free(item *it); bool item_size_ok(const size_t nkey, const int flags, const int nbytes); Index: memcached.c =================================================================== --- memcached.c (revision 1) +++ memcached.c (revision 13) @@ -715,8 +715,9 @@ char *key = ITEM_key(it); bool delete_locked = false; item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked); - int stored = 0; - + int stored = 0; + uint32_t in_memory_ptr; + if (old_it != NULL && comm == NREAD_ADD) { /* add only adds a nonexistent item, but promote to head of LRU */ do_item_update(old_it); @@ -724,6 +725,28 @@ /* replace only replaces an existing value; don't store */ } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) { /* replace and add can't override delete locks; don't store */ + } else if (comm == NREAD_CAS) { + /* validate cas */ + item *itmp=item_get(key, it->nkey); + /* Release the reference */ 
+ if(itmp) { + item_remove(itmp); + } + in_memory_ptr = (uint32_t)itmp; + + if(in_memory_ptr == it->identifier) + { + if (delete_locked) + old_it = do_item_get_nocheck(key, it->nkey); + + if (old_it != NULL) + do_item_replace(old_it, it); + else + do_item_link(it); + + stored = 1; + } + } else { /* "set" commands can override the delete lock window... in which case we have to find the old hidden item @@ -731,17 +754,19 @@ item_get.... because we need to replace it */ if (delete_locked) old_it = do_item_get_nocheck(key, it->nkey); - + if (old_it != NULL) do_item_replace(old_it, it); else do_item_link(it); + stored = 1; } if (old_it) do_item_remove(old_it); /* release our reference */ + return stored; } @@ -1052,13 +1077,14 @@ } /* ntokens is overwritten here... shrug.. */ -static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) { +static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_key_ptr) { char *key; size_t nkey; int i = 0; item *it; token_t *key_token = &tokens[KEY_TOKEN]; - + char suffix[255]; + uint32_t in_memory_ptr; assert(c != NULL); if (settings.managed) { @@ -1108,12 +1134,32 @@ * key * " " + flags + " " + data length + "\r\n" + data (with \r\n) */ - if (add_iov(c, "VALUE ", 6) != 0 || - add_iov(c, ITEM_key(it), it->nkey) != 0 || - add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) - { - break; - } + + if(return_key_ptr == true) + { + in_memory_ptr = (uint32_t)item_get(key, nkey); + + sprintf(suffix," %d %d %lu\r\n", atoi(ITEM_suffix(it) + 1), it->nbytes - 2, in_memory_ptr); + if (add_iov(c, "VALUE ", 6) != 0 || + add_iov(c, ITEM_key(it), it->nkey) != 0 || + add_iov(c, suffix, strlen(suffix)) != 0 || + add_iov(c, ITEM_data(it), it->nbytes) != 0) + { + break; + } + + } + else + { + if (add_iov(c, "VALUE ", 6) != 0 || + add_iov(c, ITEM_key(it), it->nkey) != 0 || + add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) + { + break; + } + } + + if (settings.verbose > 1) 
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it)); @@ -1152,6 +1198,7 @@ fprintf(stderr, ">%d END\n", c->sfd); add_iov(c, "END\r\n", 5); + if (c->udp && build_udp_headers(c) != 0) { out_string(c, "SERVER_ERROR out of memory"); } @@ -1162,14 +1209,16 @@ return; } -static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) { +static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) { char *key; size_t nkey; int flags; time_t exptime; int vlen; + uint32_t req_memory_ptr; + item *it; - + assert(c != NULL); if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { @@ -1184,6 +1233,16 @@ exptime = strtol(tokens[3].value, NULL, 10); vlen = strtol(tokens[4].value, NULL, 10); + // does cas value exist? + if(tokens[5].value && handle_cas == true) + { + req_memory_ptr = strtoull(tokens[5].value, NULL, 10); + } + else + { + req_memory_ptr = 0; + } + if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) { out_string(c, "CLIENT_ERROR bad command line format"); return; @@ -1206,7 +1265,7 @@ } } - it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); + it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2, req_memory_ptr); if (it == 0) { if (! 
item_size_ok(nkey, flags, vlen + 2)) @@ -1307,7 +1366,7 @@ res = strlen(buf); if (res + 2 > it->nbytes) { /* need to realloc */ item *new_it; - new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); + new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 , 0); if (new_it == 0) { return "SERVER_ERROR out of memory"; } @@ -1446,24 +1505,31 @@ } ntokens = tokenize_command(command, tokens, MAX_TOKENS); - if (ntokens >= 3 && ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { - process_get_command(c, tokens, ntokens); + process_get_command(c, tokens, ntokens, false); } else if (ntokens == 6 && ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) { + + process_update_command(c, tokens, ntokens, comm, false); - process_update_command(c, tokens, ntokens, comm); + } else if (ntokens == 6 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) { + process_update_command(c, tokens, ntokens, comm, true); + } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 1); + } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) { + + process_get_command(c, tokens, ntokens, true); + } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 0); Index: memcached.h =================================================================== --- memcached.h (revision 1) +++ memcached.h (revision 13) @@ -108,6 +108,10 @@ uint8_t it_flags; /* ITEM_* above */ uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ + + /* CAS */ + uint32_t 
identifier; + void * end[]; /* then null-terminated key */ /* then " flags length\r\n" (no terminating null) */ @@ -134,6 +138,7 @@ #define NREAD_ADD 1 #define NREAD_SET 2 #define NREAD_REPLACE 3 +#define NREAD_CAS 4 typedef struct { int sfd; @@ -199,6 +204,8 @@ int bucket; /* bucket number for the next command, if running as a managed instance. -1 (_not_ 0) means invalid. */ int gen; /* generation requested for the bucket */ + + } conn; /* number of virtual buckets for a managed instance */ @@ -256,7 +263,7 @@ int mt_conn_add_to_freelist(conn *c); char *mt_defer_delete(item *it, time_t exptime); int mt_is_listen_thread(void); -item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); +item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes, uint32_t identifier); void mt_item_flush_expired(void); item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked); int mt_item_link(item *it); @@ -280,7 +287,7 @@ # define conn_add_to_freelist(x) mt_conn_add_to_freelist(x) # define defer_delete(x,y) mt_defer_delete(x,y) # define is_listen_thread() mt_is_listen_thread() -# define item_alloc(x,y,z,a,b) mt_item_alloc(x,y,z,a,b) +# define item_alloc(x,y,z,a,b,c) mt_item_alloc(x,y,z,a,b,c) # define item_flush_expired() mt_item_flush_expired() # define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z) # define item_link(x) mt_item_link(x) @@ -308,7 +315,7 @@ # define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base) # define dispatch_event_add(t,c) event_add(&(c)->event, 0) # define is_listen_thread() 1 -# define item_alloc(x,y,z,a,b) do_item_alloc(x,y,z,a,b) +# define item_alloc(x,y,z,a,b,c) do_item_alloc(x,y,z,a,b,c) # define item_flush_expired() do_item_flush_expired() # define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z) # define item_link(x) do_item_link(x)