[memcached] bradfitz, r296: 2006-07-03

commits at code.sixapart.com commits at code.sixapart.com
Tue Jul 4 05:17:04 UTC 2006


2006-07-03
       * don't use dual scalar/glob sockets.  this makes it all profilable
         again under SmallProf, DProf, and Devel::Profiler, all three
         of which used to barf on those weird sockets
       * only init_buckets once, when servers are changed
       * don't call sock_to_host and get_sock as much:  cache closer
         in get_multi
       * more internal caching (buck2sock, etc)
       * fast paths for namespaces/single sock/etc in a few more places
       * general micro-speedups all over




U   trunk/api/perl/ChangeLog
U   trunk/api/perl/lib/Cache/Memcached.pm


Modified: trunk/api/perl/ChangeLog
===================================================================
--- trunk/api/perl/ChangeLog	2006-06-28 06:01:37 UTC (rev 295)
+++ trunk/api/perl/ChangeLog	2006-07-04 05:17:03 UTC (rev 296)
@@ -1,3 +1,14 @@
+2006-07-03
+	* don't use dual scalar/glob sockets.  makes it all profilable
+	  again under SmallProf, DProf, and Devel::Profiler, all three
+	  of which used to barf on those weird sockets previously
+	* only init_buckets once, when servers are changed
+	* don't call sock_to_host and get_sock as much:  cache closer
+	  in get_multi
+	* more internal caching (buck2sock, etc)
+	* fast paths for namespaces/single sock/etc in a few more places
+	* general micro-speedups all over
+
 2006-06-27
 	* patch from Maxim Dounin <mdounin at rambler-co.ru> to fix a typo
 	  which caused no_rehash flag to not work.

Modified: trunk/api/perl/lib/Cache/Memcached.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached.pm	2006-06-28 06:01:37 UTC (rev 295)
+++ trunk/api/perl/lib/Cache/Memcached.pm	2006-07-04 05:17:03 UTC (rev 296)
@@ -43,6 +43,7 @@
 
 my %host_dead;   # host -> unixtime marked dead until
 my %cache_sock;  # host -> socket
+my @buck2sock;   # bucket number -> $sock
 
 my $PROTO_TCP;
 
@@ -85,6 +86,8 @@
     $self->{'active'} = scalar @{$self->{'servers'}};
     $self->{'buckets'} = undef;
     $self->{'bucketcount'} = 0;
+    $self->init_buckets;
+    @buck2sock = ();
 
     $self->{'_single_sock'} = undef;
     if (@{$self->{'servers'}} == 1) {
@@ -136,6 +139,7 @@
 
 sub forget_dead_hosts {
     %host_dead = ();
+    @buck2sock = ();
 }
 
 sub set_stat_callback {
@@ -144,27 +148,29 @@
     $self->{'stat_callback'} = $stat_callback;
 }
 
+my %sock_map;  # scalaraddr -> "$ip:$port";
+
 sub _dead_sock {
     my ($sock, $ret, $dead_for) = @_;
-    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+    if (my $ipport = $sock_map{\$sock}) {
         my $now = time();
-        my ($ip, $port) = ($1, $2);
-        my $host = "$ip:$port";
-        $host_dead{$host} = $now + $dead_for
+        $host_dead{$ipport} = $now + $dead_for
             if $dead_for;
-        delete $cache_sock{$host};
+        delete $cache_sock{$ipport};
+        delete $sock_map{\$sock};
     }
+    @buck2sock = ();
     return $ret;  # 0 or undef, probably, depending on what caller wants
 }
 
 sub _close_sock {
     my ($sock) = @_;
-    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
-        my ($ip, $port) = ($1, $2);
-        my $host = "$ip:$port";
+    if (my $ipport = $sock_map{\$sock}) {
         close $sock;
-        delete $cache_sock{$host};
+        delete $cache_sock{$ipport};
+        delete $sock_map{\$sock};
     }
+    @buck2sock = ();
 }
 
 sub _connect_sock { # sock, sin, timeout
@@ -215,7 +221,7 @@
     my ($ip, $port) = $host =~ /(.*):(\d+)/;
     return undef if
         $host_dead{$host} && $host_dead{$host} > $now;
-    my $sock = "Sock_$host";
+    my $sock;
 
     my $connected = 0;
     my $sin;
@@ -253,18 +259,19 @@
     $| = 1;
     select($old);
 
-    return $cache_sock{$host} = $sock;
+    $sock_map{$sock} = $host;
+    $cache_sock{$host} = $sock;
+
+    return $sock;
 }
 
 sub get_sock { # (key)
-    my Cache::Memcached $self = shift;
-    my ($key) = @_;
+    my Cache::Memcached $self = $_[0];
+    my $key = $_[1];
     return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
     return undef unless $self->{'active'};
     my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
 
-    $self->init_buckets() unless $self->{'buckets'};
-
     my $real_key = ref $key ? $key->[1] : $key;
     my $tries = 0;
     while ($tries++ < 20) {
@@ -495,8 +502,8 @@
 }
 
 sub get {
-    my Cache::Memcached $self = shift;
-    my ($key) = @_;
+    my Cache::Memcached $self = $_[0];
+    my $key = $_[1];
 
     # TODO: make a fast path for this?  or just keep using get_multi?
     my $r = $self->get_multi($key);
@@ -506,19 +513,42 @@
 
 sub get_multi {
     my Cache::Memcached $self = shift;
-    return undef unless $self->{'active'};
+    return {} unless $self->{'active'};
     $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
     $self->{'stats'}->{"get_multi"}++;
+
     my %val;        # what we'll be returning a reference to (realkey -> value)
     my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
     my $sock;
 
-    foreach my $key (@_) {
-        $sock = $self->get_sock($key);
-        next unless $sock;
-        my $kval = ref $key ? $key->[1] : $key;
-        push @{$sock_keys{$sock}}, $kval;
+    if ($self->{'_single_sock'}) {
+        $sock = $self->sock_to_host($self->{'_single_sock'});
+        foreach my $key (@_) {
+            my $kval = ref $key ? $key->[1] : $key;
+            push @{$sock_keys{$sock}}, $kval;
+        }
+    } else {
+        my $bcount = $self->{'bucketcount'};
+        my $sock;
+      KEY:
+        foreach my $key (@_) {
+            my ($hv, $real_key) = ref $key ?
+                (int($key->[0]),               $key->[1]) :
+                ((crc32($key) >> 16) & 0x7fff, $key);
+
+            my $tries;
+            while (1) {
+                my $bucket = $hv % $bcount;
+                $sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
+                    and last;
+                next KEY if $tries++ >= 20;
+                $hv += _hashfunc($tries . $real_key);
+            }
+
+            push @{$sock_keys{$sock}}, $real_key;
+        }
     }
+
     $self->{'stats'}->{"get_keys"} += @_;
     $self->{'stats'}->{"get_socks"} += keys %sock_keys;
 
@@ -536,23 +566,31 @@
 
 sub _load_multi {
     use bytes; # return bytes from length()
-    my Cache::Memcached $self = shift;
-    my ($sock_keys, $ret) = @_;
+    my Cache::Memcached $self;
+    my ($sock_keys, $ret);
 
-    # all keyed by a $sock:
-    my %reading; # bool, whether we're reading from this socket
-    my %writing; # bool, whether we're writing into this socket
+    ($self, $sock_keys, $ret) = @_;
+
+    # all keyed by $sockstr:
+    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
+    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
     my %state;   # reading state:
                  # 0 = waiting for a line, N = reading N bytes
-    my %buf;     # buffers
+    my %buf;     # buffers, for both reading/writing
     my %offset;  # offsets to read into buffers
     my %key;     # current key per socket
     my %flags;   # flags per socket
 
     foreach (keys %$sock_keys) {
+        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
+        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
         print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
-        $writing{$_} = 1;
-        $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
+        $writing{$_} = $sock;
+        if ($self->{namespace}) {
+            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
+        } else {
+            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
+        }
     }
 
     my $active_changed = 1; # force rebuilding of select sets
@@ -576,14 +614,14 @@
     };
 
     my $finalize = sub {
-        my $sock = shift;
+        my $sock = "$_[0]";
         my $k = $key{$sock};
 
         # remove trailing \r\n
         chop $ret->{$k}; chop $ret->{$k};
 
         unless (length($ret->{$k}) == $state{$sock}-2) {
-            $dead->($sock);
+            $dead->($_[0]);
             return;
         }
 
@@ -605,40 +643,40 @@
     };
 
     my $read = sub {
-        my $sock = shift;
+        my $sockstr = "$_[0]";  # $sock is $_[0];
         my $res;
 
         # where are we reading into?
-        if ($state{$sock}) { # reading value into $ret
-            $res = sysread($sock, $ret->{$key{$sock}},
-                           $state{$sock} - $offset{$sock},
-                           $offset{$sock});
+        if ($state{$sockstr}) { # reading value into $ret
+            $res = sysread($_[0], $ret->{$key{$sockstr}},
+                           $state{$sockstr} - $offset{$sockstr},
+                           $offset{$sockstr});
             return
                 if !defined($res) and $!==EWOULDBLOCK;
             if ($res == 0) { # catches 0=conn closed or undef=error
-                $dead->($sock);
+                $dead->($_[0]);
                 return;
             }
-            $offset{$sock} += $res;
-            if ($offset{$sock} == $state{$sock}) { # finished reading
-                $finalize->($sock);
-                $state{$sock} = 0; # wait for another VALUE line or END
-                $offset{$sock} = 0;
+            $offset{$sockstr} += $res;
+            if ($offset{$sockstr} == $state{$sockstr}) { # finished reading
+                $finalize->($_[0]);
+                $state{$sockstr} = 0; # wait for another VALUE line or END
+                $offset{$sockstr} = 0;
             }
             return;
         }
 
         # we're reading a single line.
         # first, read whatever's there, but be satisfied with 2048 bytes
-        $res = sysread($sock, $buf{$sock},
-                       2048, $offset{$sock});
+        $res = sysread($_[0], $buf{$sockstr},
+                       2048, $offset{$sockstr});
         return
             if !defined($res) and $!==EWOULDBLOCK;
         if ($res == 0) {
-            $dead->($sock);
+            $dead->($_[0]);
             return;
         }
-        $offset{$sock} += $res;
+        $offset{$sockstr} += $res;
 
 
         # Below is a hot path.  In preparation for rewriting it in Perl/C,
@@ -655,32 +693,32 @@
       SEARCH:
         while(1) { # may have to search many times
             # do we have a complete END line?
-            if ($buf{$sock} =~ /^END\r\n/) {
+            if ($buf{$sockstr} =~ /^END\r\n/) {
                 # okay, finished with this socket
-                delete $reading{$sock};
+                delete $reading{$sockstr};
                 $active_changed = 1;
                 return;
             }
 
             # do we have a complete VALUE line?
-            if ($buf{$sock} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
-                ($key{$sock}, $flags{$sock}, $state{$sock}) =
+            if ($buf{$sockstr} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+                ($key{$sockstr}, $flags{$sockstr}, $state{$sockstr}) =
                     (substr($1, $self->{namespace_len}), int($2), $3+2);
-                # Note: we use $+[0] and not pos($buf{$sock}) because pos()
+                # Note: we use $+[0] and not pos($buf{$sockstr}) because pos()
                 # seems to have problems under perl's taint mode.  nobody
                 # on the list discovered why, but this seems a reasonable
                 # work-around:
                 my $p = $+[0];
-                my $len = length($buf{$sock});
-                my $copy = $len-$p > $state{$sock} ? $state{$sock} : $len-$p;
-                $ret->{$key{$sock}} = substr($buf{$sock}, $p, $copy)
+                my $len = length($buf{$sockstr});
+                my $copy = $len-$p > $state{$sockstr} ? $state{$sockstr} : $len-$p;
+                $ret->{$key{$sockstr}} = substr($buf{$sockstr}, $p, $copy)
                     if $copy;
-                $offset{$sock} = $copy;
-                substr($buf{$sock}, 0, $p+$copy, ''); # delete the stuff we used
-                if ($offset{$sock} == $state{$sock}) { # have it all?
-                    $finalize->($sock);
-                    $state{$sock} = 0; # wait for another VALUE line or END
-                    $offset{$sock} = 0;
+                $offset{$sockstr} = $copy;
+                substr($buf{$sockstr}, 0, $p+$copy, ''); # delete the stuff we used
+                if ($offset{$sockstr} == $state{$sockstr}) { # have it all?
+                    $finalize->($_[0]);
+                    $state{$sockstr} = 0; # wait for another VALUE line or END
+                    $offset{$sockstr} = 0;
                     next SEARCH; # look again
                 }
                 last SEARCH; # buffer is empty now
@@ -693,7 +731,7 @@
 
             # but first, make sure subsequent reads don't destroy our
             # partial VALUE/END line.
-            $offset{$sock} = length($buf{$sock});
+            $offset{$sockstr} = length($buf{$sockstr});
             last SEARCH;
         }
 
@@ -702,25 +740,26 @@
     };
 
     my $write = sub {
-        my $sock = shift;
+        my ($sock, $sockstr) = ($_[0], "$_[0]");
         my $res;
 
-        $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
+        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
+
         return
             if not defined $res and $!==EWOULDBLOCK;
         unless ($res > 0) {
             $dead->($sock);
             return;
         }
-        if ($res == length($buf{$sock})) { # all sent
-            $buf{$sock} = "";
-            $offset{$sock} = $state{$sock} = 0;
+        if ($res == length($buf{$sockstr})) { # all sent
+            $buf{$sockstr} = "";
+            $offset{$sockstr} = $state{$sockstr} = 0;
             # switch the socket from writing state to reading state
-            delete $writing{$sock};
-            $reading{$sock} = 1;
+            delete $writing{$sockstr};
+            $reading{$sockstr} = $sock;
             $active_changed = 1;
         } else { # we only succeeded in sending some of it
-            substr($buf{$sock}, 0, $res, ''); # delete the part we sent
+            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
         }
         return;
     };
@@ -734,10 +773,10 @@
         if ($active_changed) {
             last unless %reading or %writing; # no sockets left?
             ($rin, $win) = ('', '');
-            foreach (keys %reading) {
+            foreach (values %reading) {
                 vec($rin, fileno($_), 1) = 1;
             }
-            foreach (keys %writing) {
+            foreach (values %writing) {
                 vec($win, fileno($_), 1) = 1;
             }
             $active_changed = 0;
@@ -751,12 +790,12 @@
         # writing sockets for reading also, and raise hell if they're
         # ready (input unread from last time, etc.)
         # maybe do that on the first loop only?
-        foreach (keys %writing) {
+        foreach (values %writing) {
             if (vec($wout, fileno($_), 1)) {
                 $write->($_);
             }
         }
-        foreach (keys %reading) {
+        foreach (values %reading) {
             if (vec($rout, fileno($_), 1)) {
                 $read->($_);
             }
@@ -764,10 +803,10 @@
     }
 
     # if there're active sockets left, they need to die
-    foreach (keys %writing) {
+    foreach (values %writing) {
         $dead->($_);
     }
-    foreach (keys %reading) {
+    foreach (values %reading) {
         $dead->($_);
     }
 
@@ -775,7 +814,7 @@
 }
 
 sub _hashfunc {
-    return (crc32(shift) >> 16) & 0x7fff;
+    return (crc32($_[0]) >> 16) & 0x7fff;
 }
 
 sub flush_all {
@@ -783,7 +822,6 @@
 
     my $success = 1;
 
-    $self->init_buckets() unless $self->{'buckets'};
     my @hosts = @{$self->{'buckets'}};
     foreach my $host (@hosts) {
         my $sock = $self->sock_to_host($host);
@@ -827,8 +865,6 @@
         }
     }
 
-    $self->init_buckets() unless $self->{'buckets'};
-
     my $stats_hr = { };
 
     # The "self" stat type is special, it only applies to this very
@@ -865,11 +901,11 @@
             if ($typename =~ /^(malloc|sizes|misc)$/) {
                 # This stat is key-value.
                 foreach my $line (@lines) {
-                    my($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
+                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
                     if ($key) {
                         $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
                     }
-                    $malloc_keys{$key} = 1 if $typename eq 'malloc';
+                    $malloc_keys{$key} = 1 if $key && $typename eq 'malloc';
                 }
             } else {
                 # This stat is not key-value so just pull it
@@ -912,8 +948,6 @@
     my ($types) = @_;
     return 0 unless $self->{'active'};
 
-    $self->init_buckets() unless $self->{'buckets'};
-
   HOST: foreach my $host (@{$self->{'buckets'}}) {
         my $sock = $self->sock_to_host($host);
         my $ok = _write_and_read($self, $sock, "stats reset");




More information about the memcached-commits mailing list