[memcached] bradfitz, r296: 2006-07-03
commits at code.sixapart.com
commits at code.sixapart.com
Tue Jul 4 05:17:04 UTC 2006
2006-07-03
* don't use dual scalar/glob sockets. makes it all profilable
again under SmallProf, DProf, and Devel::Profiler, all three
of which used to barf on those weird sockets previously
* only init_buckets once, when servers are changed
* don't call sock_to_host and get_sock as much: cache closer
in get_multi
* more internal caching (buck2sock, etc)
* fast paths for namespaces/single sock/etc in a few more places
* general micro-speedups all over
U trunk/api/perl/ChangeLog
U trunk/api/perl/lib/Cache/Memcached.pm
Modified: trunk/api/perl/ChangeLog
===================================================================
--- trunk/api/perl/ChangeLog 2006-06-28 06:01:37 UTC (rev 295)
+++ trunk/api/perl/ChangeLog 2006-07-04 05:17:03 UTC (rev 296)
@@ -1,3 +1,14 @@
+2006-07-03
+ * don't use dual scalar/glob sockets. makes it all profilable
+ again under SmallProf, DProf, and Devel::Profiler, all three
+ of which used to barf on those weird sockets previously
+ * only init_buckets once, when servers are changed
+ * don't call sock_to_host and get_sock as much: cache closer
+ in get_multi
+ * more internal caching (buck2sock, etc)
+ * fast paths for namespaces/single sock/etc in a few more places
+ * general micro-speedups all over
+
2006-06-27
* patch from Maxim Dounin <mdounin at rambler-co.ru> to fix a typo
which caused no_rehash flag to not work.
Modified: trunk/api/perl/lib/Cache/Memcached.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached.pm 2006-06-28 06:01:37 UTC (rev 295)
+++ trunk/api/perl/lib/Cache/Memcached.pm 2006-07-04 05:17:03 UTC (rev 296)
@@ -43,6 +43,7 @@
my %host_dead; # host -> unixtime marked dead until
my %cache_sock; # host -> socket
+my @buck2sock; # bucket number -> $sock
my $PROTO_TCP;
@@ -85,6 +86,8 @@
$self->{'active'} = scalar @{$self->{'servers'}};
$self->{'buckets'} = undef;
$self->{'bucketcount'} = 0;
+ $self->init_buckets;
+ @buck2sock = ();
$self->{'_single_sock'} = undef;
if (@{$self->{'servers'}} == 1) {
@@ -136,6 +139,7 @@
sub forget_dead_hosts {
%host_dead = ();
+ @buck2sock = ();
}
sub set_stat_callback {
@@ -144,27 +148,29 @@
$self->{'stat_callback'} = $stat_callback;
}
+my %sock_map; # scalaraddr -> "$ip:$port";
+
sub _dead_sock {
my ($sock, $ret, $dead_for) = @_;
- if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+ if (my $ipport = $sock_map{\$sock}) {
my $now = time();
- my ($ip, $port) = ($1, $2);
- my $host = "$ip:$port";
- $host_dead{$host} = $now + $dead_for
+ $host_dead{$ipport} = $now + $dead_for
if $dead_for;
- delete $cache_sock{$host};
+ delete $cache_sock{$ipport};
+ delete $sock_map{\$sock};
}
+ @buck2sock = ();
return $ret; # 0 or undef, probably, depending on what caller wants
}
sub _close_sock {
my ($sock) = @_;
- if ($sock =~ /^Sock_(.+?):(\d+)$/) {
- my ($ip, $port) = ($1, $2);
- my $host = "$ip:$port";
+ if (my $ipport = $sock_map{\$sock}) {
close $sock;
- delete $cache_sock{$host};
+ delete $cache_sock{$ipport};
+ delete $sock_map{\$sock};
}
+ @buck2sock = ();
}
sub _connect_sock { # sock, sin, timeout
@@ -215,7 +221,7 @@
my ($ip, $port) = $host =~ /(.*):(\d+)/;
return undef if
$host_dead{$host} && $host_dead{$host} > $now;
- my $sock = "Sock_$host";
+ my $sock;
my $connected = 0;
my $sin;
@@ -253,18 +259,19 @@
$| = 1;
select($old);
- return $cache_sock{$host} = $sock;
+ $sock_map{$sock} = $host;
+ $cache_sock{$host} = $sock;
+
+ return $sock;
}
sub get_sock { # (key)
- my Cache::Memcached $self = shift;
- my ($key) = @_;
+ my Cache::Memcached $self = $_[0];
+ my $key = $_[1];
return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
return undef unless $self->{'active'};
my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
- $self->init_buckets() unless $self->{'buckets'};
-
my $real_key = ref $key ? $key->[1] : $key;
my $tries = 0;
while ($tries++ < 20) {
@@ -495,8 +502,8 @@
}
sub get {
- my Cache::Memcached $self = shift;
- my ($key) = @_;
+ my Cache::Memcached $self = $_[0];
+ my $key = $_[1];
# TODO: make a fast path for this? or just keep using get_multi?
my $r = $self->get_multi($key);
@@ -506,19 +513,42 @@
sub get_multi {
my Cache::Memcached $self = shift;
- return undef unless $self->{'active'};
+ return {} unless $self->{'active'};
$self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
$self->{'stats'}->{"get_multi"}++;
+
my %val; # what we'll be returning a reference to (realkey -> value)
my %sock_keys; # sockref_as_scalar -> [ realkeys ]
my $sock;
- foreach my $key (@_) {
- $sock = $self->get_sock($key);
- next unless $sock;
- my $kval = ref $key ? $key->[1] : $key;
- push @{$sock_keys{$sock}}, $kval;
+ if ($self->{'_single_sock'}) {
+ $sock = $self->sock_to_host($self->{'_single_sock'});
+ foreach my $key (@_) {
+ my $kval = ref $key ? $key->[1] : $key;
+ push @{$sock_keys{$sock}}, $kval;
+ }
+ } else {
+ my $bcount = $self->{'bucketcount'};
+ my $sock;
+ KEY:
+ foreach my $key (@_) {
+ my ($hv, $real_key) = ref $key ?
+ (int($key->[0]), $key->[1]) :
+ ((crc32($key) >> 16) & 0x7fff, $key);
+
+ my $tries;
+ while (1) {
+ my $bucket = $hv % $bcount;
+ $sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
+ and last;
+ next KEY if $tries++ >= 20;
+ $hv += _hashfunc($tries . $real_key);
+ }
+
+ push @{$sock_keys{$sock}}, $real_key;
+ }
}
+
$self->{'stats'}->{"get_keys"} += @_;
$self->{'stats'}->{"get_socks"} += keys %sock_keys;
@@ -536,23 +566,31 @@
sub _load_multi {
use bytes; # return bytes from length()
- my Cache::Memcached $self = shift;
- my ($sock_keys, $ret) = @_;
+ my Cache::Memcached $self;
+ my ($sock_keys, $ret);
- # all keyed by a $sock:
- my %reading; # bool, whether we're reading from this socket
- my %writing; # bool, whether we're writing into this socket
+ ($self, $sock_keys, $ret) = @_;
+
+ # all keyed by $sockstr:
+ my %reading; # $sockstr -> $sock. bool, whether we're reading from this socket
+ my %writing; # $sockstr -> $sock. bool, whether we're writing to this socket
my %state; # reading state:
# 0 = waiting for a line, N = reading N bytes
- my %buf; # buffers
+ my %buf; # buffers, for both reading/writing
my %offset; # offsets to read into buffers
my %key; # current key per socket
my %flags; # flags per socket
foreach (keys %$sock_keys) {
+ my $ipport = $sock_map{$_} or die "No map found matching for $_";
+ my $sock = $cache_sock{$ipport} or die "No sock found for $ipport";
print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
- $writing{$_} = 1;
- $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
+ $writing{$_} = $sock;
+ if ($self->{namespace}) {
+ $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
+ } else {
+ $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
+ }
}
my $active_changed = 1; # force rebuilding of select sets
@@ -576,14 +614,14 @@
};
my $finalize = sub {
- my $sock = shift;
+ my $sock = "$_[0]";
my $k = $key{$sock};
# remove trailing \r\n
chop $ret->{$k}; chop $ret->{$k};
unless (length($ret->{$k}) == $state{$sock}-2) {
- $dead->($sock);
+ $dead->($_[0]);
return;
}
@@ -605,40 +643,40 @@
};
my $read = sub {
- my $sock = shift;
+ my $sockstr = "$_[0]"; # $sock is $_[0];
my $res;
# where are we reading into?
- if ($state{$sock}) { # reading value into $ret
- $res = sysread($sock, $ret->{$key{$sock}},
- $state{$sock} - $offset{$sock},
- $offset{$sock});
+ if ($state{$sockstr}) { # reading value into $ret
+ $res = sysread($_[0], $ret->{$key{$sockstr}},
+ $state{$sockstr} - $offset{$sockstr},
+ $offset{$sockstr});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) { # catches 0=conn closed or undef=error
- $dead->($sock);
+ $dead->($_[0]);
return;
}
- $offset{$sock} += $res;
- if ($offset{$sock} == $state{$sock}) { # finished reading
- $finalize->($sock);
- $state{$sock} = 0; # wait for another VALUE line or END
- $offset{$sock} = 0;
+ $offset{$sockstr} += $res;
+ if ($offset{$sockstr} == $state{$sockstr}) { # finished reading
+ $finalize->($_[0]);
+ $state{$sockstr} = 0; # wait for another VALUE line or END
+ $offset{$sockstr} = 0;
}
return;
}
# we're reading a single line.
# first, read whatever's there, but be satisfied with 2048 bytes
- $res = sysread($sock, $buf{$sock},
- 2048, $offset{$sock});
+ $res = sysread($_[0], $buf{$sockstr},
+ 2048, $offset{$sockstr});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) {
- $dead->($sock);
+ $dead->($_[0]);
return;
}
- $offset{$sock} += $res;
+ $offset{$sockstr} += $res;
# Below is a hot path. In preparation for rewriting it in Perl/C,
@@ -655,32 +693,32 @@
SEARCH:
while(1) { # may have to search many times
# do we have a complete END line?
- if ($buf{$sock} =~ /^END\r\n/) {
+ if ($buf{$sockstr} =~ /^END\r\n/) {
# okay, finished with this socket
- delete $reading{$sock};
+ delete $reading{$sockstr};
$active_changed = 1;
return;
}
# do we have a complete VALUE line?
- if ($buf{$sock} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
- ($key{$sock}, $flags{$sock}, $state{$sock}) =
+ if ($buf{$sockstr} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+ ($key{$sockstr}, $flags{$sockstr}, $state{$sockstr}) =
(substr($1, $self->{namespace_len}), int($2), $3+2);
- # Note: we use $+[0] and not pos($buf{$sock}) because pos()
+ # Note: we use $+[0] and not pos($buf{$sockstr}) because pos()
# seems to have problems under perl's taint mode. nobody
# on the list discovered why, but this seems a reasonable
# work-around:
my $p = $+[0];
- my $len = length($buf{$sock});
- my $copy = $len-$p > $state{$sock} ? $state{$sock} : $len-$p;
- $ret->{$key{$sock}} = substr($buf{$sock}, $p, $copy)
+ my $len = length($buf{$sockstr});
+ my $copy = $len-$p > $state{$sockstr} ? $state{$sockstr} : $len-$p;
+ $ret->{$key{$sockstr}} = substr($buf{$sockstr}, $p, $copy)
if $copy;
- $offset{$sock} = $copy;
- substr($buf{$sock}, 0, $p+$copy, ''); # delete the stuff we used
- if ($offset{$sock} == $state{$sock}) { # have it all?
- $finalize->($sock);
- $state{$sock} = 0; # wait for another VALUE line or END
- $offset{$sock} = 0;
+ $offset{$sockstr} = $copy;
+ substr($buf{$sockstr}, 0, $p+$copy, ''); # delete the stuff we used
+ if ($offset{$sockstr} == $state{$sockstr}) { # have it all?
+ $finalize->($_[0]);
+ $state{$sockstr} = 0; # wait for another VALUE line or END
+ $offset{$sockstr} = 0;
next SEARCH; # look again
}
last SEARCH; # buffer is empty now
@@ -693,7 +731,7 @@
# but first, make sure subsequent reads don't destroy our
# partial VALUE/END line.
- $offset{$sock} = length($buf{$sock});
+ $offset{$sockstr} = length($buf{$sockstr});
last SEARCH;
}
@@ -702,25 +740,26 @@
};
my $write = sub {
- my $sock = shift;
+ my ($sock, $sockstr) = ($_[0], "$_[0]");
my $res;
- $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
+ $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
+
return
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
$dead->($sock);
return;
}
- if ($res == length($buf{$sock})) { # all sent
- $buf{$sock} = "";
- $offset{$sock} = $state{$sock} = 0;
+ if ($res == length($buf{$sockstr})) { # all sent
+ $buf{$sockstr} = "";
+ $offset{$sockstr} = $state{$sockstr} = 0;
# switch the socket from writing state to reading state
- delete $writing{$sock};
- $reading{$sock} = 1;
+ delete $writing{$sockstr};
+ $reading{$sockstr} = $sock;
$active_changed = 1;
} else { # we only succeeded in sending some of it
- substr($buf{$sock}, 0, $res, ''); # delete the part we sent
+ substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
}
return;
};
@@ -734,10 +773,10 @@
if ($active_changed) {
last unless %reading or %writing; # no sockets left?
($rin, $win) = ('', '');
- foreach (keys %reading) {
+ foreach (values %reading) {
vec($rin, fileno($_), 1) = 1;
}
- foreach (keys %writing) {
+ foreach (values %writing) {
vec($win, fileno($_), 1) = 1;
}
$active_changed = 0;
@@ -751,12 +790,12 @@
# writing sockets for reading also, and raise hell if they're
# ready (input unread from last time, etc.)
# maybe do that on the first loop only?
- foreach (keys %writing) {
+ foreach (values %writing) {
if (vec($wout, fileno($_), 1)) {
$write->($_);
}
}
- foreach (keys %reading) {
+ foreach (values %reading) {
if (vec($rout, fileno($_), 1)) {
$read->($_);
}
@@ -764,10 +803,10 @@
}
# if there're active sockets left, they need to die
- foreach (keys %writing) {
+ foreach (values %writing) {
$dead->($_);
}
- foreach (keys %reading) {
+ foreach (values %reading) {
$dead->($_);
}
@@ -775,7 +814,7 @@
}
sub _hashfunc {
- return (crc32(shift) >> 16) & 0x7fff;
+ return (crc32($_[0]) >> 16) & 0x7fff;
}
sub flush_all {
@@ -783,7 +822,6 @@
my $success = 1;
- $self->init_buckets() unless $self->{'buckets'};
my @hosts = @{$self->{'buckets'}};
foreach my $host (@hosts) {
my $sock = $self->sock_to_host($host);
@@ -827,8 +865,6 @@
}
}
- $self->init_buckets() unless $self->{'buckets'};
-
my $stats_hr = { };
# The "self" stat type is special, it only applies to this very
@@ -865,11 +901,11 @@
if ($typename =~ /^(malloc|sizes|misc)$/) {
# This stat is key-value.
foreach my $line (@lines) {
- my($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
+ my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
if ($key) {
$stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
}
- $malloc_keys{$key} = 1 if $typename eq 'malloc';
+ $malloc_keys{$key} = 1 if $key && $typename eq 'malloc';
}
} else {
# This stat is not key-value so just pull it
@@ -912,8 +948,6 @@
my ($types) = @_;
return 0 unless $self->{'active'};
- $self->init_buckets() unless $self->{'buckets'};
-
HOST: foreach my $host (@{$self->{'buckets'}}) {
my $sock = $self->sock_to_host($host);
my $ok = _write_and_read($self, $sock, "stats reset");
More information about the memcached-commits
mailing list