Cache::Memcached fix for problems with sockets used as string
Gerard Goossen
gerard at tty.nl
Tue Apr 5 06:53:54 PDT 2005
The patch did make it a lot slower. So I rewrote it to only use an array
with two elements instead of an object (using the name (first element)
for the key (using the hashref as key is slow)). Then there isn't much difference
in speed.
The patch also set the binmode to utf8, but there are still problems
with utf8 (see the test script), any idea how to fix this?
Also attached is a small test script, I used for testing/measuring
speed (it assume a local memcached).
On Mon, Apr 04, 2005 at 09:43:33AM -0700, Brad Fitzpatrick wrote:
> Could probably remove the "no strict 'refs'" then, huh?
>
> Also, the %reading/%writing hashes... now that $sock is a HASH and not a
> string, using $sock as a key is kinda weird. Might want to change it to
> $reading{$sock->name} = $sock.
>
> You do any speed tests before/after this patch? Cache::Memcached isn't
> the fastest thing in the world as it was... hopefully this didn't make it
> too much slower?
>
> - Brad
>
>
> On Mon, 4 Apr 2005, Gerard Goossen wrote:
>
> > We had some problems using Cache::Memcached, especially when using
> > Devel::Profiler, giving strange file handle errors.
> > Apparently something did not like the trick in Cache::Memcached where
> > the GLOB for a socket was also used as a string. In the attached patch
> > this is fixed by using a hash(/object) to store the socket and the
> > string.
> >
> > Gerard Goossen.
> >
>
-------------- next part --------------
--- Lib/Tty/Memcached.pm 2005-04-05 15:01:42.000000000 +0200
+++ ../perlkit/perl/lib/site_perl/5.8.3/Cache/Memcached.pm 2005-04-05 12:15:42.000000000 +0200
@@ -129,68 +129,68 @@
sub set_stat_callback {
my Cache::Memcached $self = shift;
my ($stat_callback) = @_;
$self->{'stat_callback'} = $stat_callback;
}
sub _dead_sock {
my ($sock, $ret, $dead_for) = @_;
- if ($sock->[0] =~ /^Sock_(.+?):(\d+)$/) {
+ if ($sock =~ /^Sock_(.+?):(\d+)$/) {
my $now = time();
my ($ip, $port) = ($1, $2);
my $host = "$ip:$port";
$host_dead{$host} = $now + $dead_for
if $dead_for;
delete $cache_sock{$host};
}
return $ret; # 0 or undef, probably, depending on what caller wants
}
sub _close_sock {
my ($sock) = @_;
- if ($sock->[0] =~ /^Sock_(.+?):(\d+)$/) {
+ if ($sock =~ /^Sock_(.+?):(\d+)$/) {
my ($ip, $port) = ($1, $2);
my $host = "$ip:$port";
- close $sock->[1];
+ close $sock;
delete $cache_sock{$host};
}
}
sub _connect_sock { # sock, sin, timeout
my ($sock, $sin, $timeout) = @_;
$timeout ||= 0.25;
# make the socket non-blocking from now on,
# except if someone wants 0 timeout, meaning
# a blocking connect, but even then turn it
# non-blocking at the end of this function
if ($timeout) {
- IO::Handle::blocking($sock->[1], 0);
+ IO::Handle::blocking($sock, 0);
} else {
- IO::Handle::blocking($sock->[1], 1);
+ IO::Handle::blocking($sock, 1);
}
- my $ret = connect($sock->[1], $sin);
+ my $ret = connect($sock, $sin);
if (!$ret && $timeout && $!==EINPROGRESS) {
my $win='';
- vec($win, fileno($sock->[1]), 1) = 1;
+ vec($win, fileno($sock), 1) = 1;
if (select(undef, $win, undef, $timeout) > 0) {
- $ret = connect($sock->[1], $sin);
+ $ret = connect($sock, $sin);
# EISCONN means connected & won't re-connect, so success
$ret = 1 if !$ret && $!==EISCONN;
}
}
unless ($timeout) { # socket was temporarily blocking, now revert
- IO::Handle::blocking($sock->[1], 0);
+ IO::Handle::blocking($sock, 0);
}
# from here on, we use non-blocking (async) IO for the duration
# of the socket's life
return $ret;
}
@@ -198,48 +198,45 @@
my Cache::Memcached $self = ref $_[0] ? shift : undef;
my $host = $_[0];
return $cache_sock{$host} if $cache_sock{$host};
my $now = time();
my ($ip, $port) = $host =~ /(.*):(\d+)/;
return undef if
$host_dead{$host} && $host_dead{$host} > $now;
- my $sock = ["Sock_$host", undef];
+ my $sock = "Sock_$host";
my $connected = 0;
my $sin;
my $proto = $PROTO_TCP ||= getprotobyname('tcp');
# if a preferred IP is known, try that first.
if ($self && $self->{pref_ip}{$ip}) {
- socket($sock->[1], PF_INET, SOCK_STREAM, $proto);
- # binmode $sock->[1], ":utf8";
-
+ socket($sock, PF_INET, SOCK_STREAM, $proto);
my $prefip = $self->{pref_ip}{$ip};
$sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
if (_connect_sock($sock,$sin,0.1)) {
$connected = 1;
} else {
- close $sock->[1];
+ close $sock;
}
}
# normal path, or fallback path if preferred IP failed
unless ($connected) {
- socket($sock->[1], PF_INET, SOCK_STREAM, $proto);
- # binmode $sock->[1], ":utf8";
+ socket($sock, PF_INET, SOCK_STREAM, $proto);
$sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
unless (_connect_sock($sock,$sin)) {
return _dead_sock($sock, undef, 20 + int(rand(10)));
}
}
- # make the new socket not buffer writes.
- my $old = select($sock->[1]);
+ # make the new socket not buffer writes.
+ my $old = select($sock);
$| = 1;
select($old);
return $cache_sock{$host} = $sock;
}
sub get_sock { # (key)
my Cache::Memcached $self = shift;
@@ -274,17 +271,17 @@
}
}
$self->{'bucketcount'} = scalar @{$self->{'buckets'}};
}
sub disconnect_all {
my $sock;
foreach $sock (values %cache_sock) {
- close $sock->[1];
+ close $sock;
}
%cache_sock = ();
}
sub _oneline {
my Cache::Memcached $self = shift;
my ($sock, $line) = @_;
my $res;
@@ -300,41 +297,41 @@
my $copy_state = -1;
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
# the select loop
while(1) {
if ($copy_state!=$state) {
last if $state==2;
($rin, $win) = ('', '');
- vec($rin, fileno($sock->[1]), 1) = 1 if $state==1;
- vec($win, fileno($sock->[1]), 1) = 1 if $state==0;
+ vec($rin, fileno($sock), 1) = 1 if $state==1;
+ vec($win, fileno($sock), 1) = 1 if $state==0;
$copy_state = $state;
}
$nfound = select($rout=$rin, $wout=$win, undef,
$self->{'select_timeout'});
last unless $nfound;
- if (vec($wout, fileno($sock->[1]), 1)) {
- $res = send($sock->[1], $line, $FLAG_NOSIGNAL);
+ if (vec($wout, fileno($sock), 1)) {
+ $res = send($sock, $line, $FLAG_NOSIGNAL);
next
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
_close_sock($sock);
return undef;
}
if ($res == length($line)) { # all sent
$state = 1;
} else { # we only succeeded in sending some of it
substr($line, 0, $res, ''); # delete the part we sent
}
}
- if (vec($rout, fileno($sock->[1]), 1)) {
- $res = sysread($sock->[1], $ret, 255, $offset);
+ if (vec($rout, fileno($sock), 1)) {
+ $res = sysread($sock, $ret, 255, $offset);
next
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) { # catches 0=conn closed or undef=error
_close_sock($sock);
return undef;
}
$offset += $res;
if (rindex($ret, "\r\n") + 2 == length($ret)) {
@@ -485,28 +482,24 @@
}
sub get_multi {
my Cache::Memcached $self = shift;
return undef unless $self->{'active'};
$self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
$self->{'stats'}->{"get_multi"}++;
my %val; # what we'll be returning a reference to (realkey -> value)
- my %sock_keys; # sockref_as_scalar -> [ socket, realkeys ]
+ my %sock_keys; # sockref_as_scalar -> [ realkeys ]
my $sock;
foreach my $key (@_) {
$sock = $self->get_sock($key);
next unless $sock;
my $kval = ref $key ? $key->[1] : $key;
- if ($sock_keys{$sock->[0]}) {
- push @{$sock_keys{$sock->[0]}}, $kval;
- } else {
- $sock_keys{$sock->[0]} = [$sock, $kval];
- }
+ push @{$sock_keys{$sock}}, $kval;
}
$self->{'stats'}->{"get_keys"} += @_;
$self->{'stats'}->{"get_socks"} += keys %sock_keys;
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
_load_multi($self, \%sock_keys, \%val);
@@ -519,67 +512,66 @@
}
sub _load_multi {
use bytes; # return bytes from length()
my Cache::Memcached $self = shift;
my ($sock_keys, $ret) = @_;
# all keyed by a $sock:
- my %reading; # bool, whether we're reading from this socket, value is the socket
- my %writing; # bool, whether we're writing into this socket, value is the socket
+ my %reading; # bool, whether we're reading from this socket
+ my %writing; # bool, whether we're writing into this socket
my %state; # reading state:
# 0 = waiting for a line, N = reading N bytes
my %buf; # buffers
my %offset; # offsets to read into buffers
my %key; # current key per socket
my %flags; # flags per socket
- foreach (values %$sock_keys) {
+ foreach (keys %$sock_keys) {
print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
- my $sock = shift @$_;
- $writing{$sock->[0]} = $sock;
- $buf{$sock->[0]} = "get ". join(" ", map { "$self->{namespace}$_" } @$_) . "\r\n";
+ $writing{$_} = 1;
+ $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
}
my $active_changed = 1; # force rebuilding of select sets
my $dead = sub {
my $sock = shift;
print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
- delete $reading{$sock->[0]};
- delete $writing{$sock->[0]};
- delete $ret->{$key{$sock->[0]}}
- if $key{$sock->[0]};
+ delete $reading{$sock};
+ delete $writing{$sock};
+ delete $ret->{$key{$sock}}
+ if $key{$sock};
if ($self->{'stat_callback'}) {
my $etime = Time::HiRes::time();
$self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
}
-
- close $sock->[1];
+
+ close $sock;
_dead_sock($sock);
$active_changed = 1;
};
my $finalize = sub {
my $sock = shift;
- my $k = $key{$sock->[0]};
+ my $k = $key{$sock};
# remove trailing \r\n
chop $ret->{$k}; chop $ret->{$k};
- unless (length($ret->{$k}) == $state{$sock->[0]}-2) {
+ unless (length($ret->{$k}) == $state{$sock}-2) {
$dead->($sock);
return;
}
$ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
- if $HAVE_ZLIB && $flags{$sock->[0]} & F_COMPRESS;
- if ($flags{$sock->[0]} & F_STORABLE) {
+ if $HAVE_ZLIB && $flags{$sock} & F_COMPRESS;
+ if ($flags{$sock} & F_STORABLE) {
# wrapped in eval in case a perl 5.6 Storable tries to
# unthaw data from a perl 5.8 Storable. (5.6 is stupid
# and dies if the version number changes at all. in 5.8
# they made it only die if it unencounters a new feature)
eval {
$ret->{$k} = Storable::thaw($ret->{$k});
};
# so if there was a problem, just treat it as a cache miss.
@@ -589,163 +581,163 @@
}
};
my $read = sub {
my $sock = shift;
my $res;
# where are we reading into?
- if ($state{$sock->[0]}) { # reading value into $ret
- $res = sysread($sock->[1], $ret->{$key{$sock->[0]}},
- $state{$sock->[0]} - $offset{$sock->[0]},
- $offset{$sock->[0]});
+ if ($state{$sock}) { # reading value into $ret
+ $res = sysread($sock, $ret->{$key{$sock}},
+ $state{$sock} - $offset{$sock},
+ $offset{$sock});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) { # catches 0=conn closed or undef=error
$dead->($sock);
return;
}
- $offset{$sock->[0]} += $res;
- if ($offset{$sock} == $state{$sock->[0]}) { # finished reading
+ $offset{$sock} += $res;
+ if ($offset{$sock} == $state{$sock}) { # finished reading
$finalize->($sock);
- $state{$sock->[0]} = 0; # wait for another VALUE line or END
- $offset{$sock->[0]} = 0;
+ $state{$sock} = 0; # wait for another VALUE line or END
+ $offset{$sock} = 0;
}
return;
}
# we're reading a single line.
# first, read whatever's there, but be satisfied with 2048 bytes
- $res = sysread($sock->[1], $buf{$sock->[0]},
- 2048, $offset{$sock->[0]});
+ $res = sysread($sock, $buf{$sock},
+ 2048, $offset{$sock});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) {
$dead->($sock);
return;
}
- $offset{$sock->[0]} += $res;
+ $offset{$sock} += $res;
SEARCH:
while(1) { # may have to search many times
# do we have a complete END line?
- if ($buf{$sock->[0]} =~ /^END\r\n/) {
+ if ($buf{$sock} =~ /^END\r\n/) {
# okay, finished with this socket
- delete $reading{$sock->[0]};
+ delete $reading{$sock};
$active_changed = 1;
return;
}
# do we have a complete VALUE line?
- if ($buf{$sock->[0]} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
- ($key{$sock->[0]}, $flags{$sock->[0]}, $state{$sock->[0]}) =
+ if ($buf{$sock} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+ ($key{$sock}, $flags{$sock}, $state{$sock}) =
(substr($1, $self->{namespace_len}), int($2), $3+2);
- # Note: we use $+[0] and not pos($buf{$sock->[0]}) because pos()
+ # Note: we use $+[0] and not pos($buf{$sock}) because pos()
# seems to have problems under perl's taint mode. nobody
# on the list discovered why, but this seems a reasonable
# work-around:
my $p = $+[0];
- my $len = length($buf{$sock->[0]});
- my $copy = $len-$p > $state{$sock->[0]} ? $state{$sock->[0]} : $len-$p;
- $ret->{$key{$sock->[0]}} = substr($buf{$sock->[0]}, $p, $copy)
+ my $len = length($buf{$sock});
+ my $copy = $len-$p > $state{$sock} ? $state{$sock} : $len-$p;
+ $ret->{$key{$sock}} = substr($buf{$sock}, $p, $copy)
if $copy;
- $offset{$sock->[0]} = $copy;
- substr($buf{$sock->[0]}, 0, $p+$copy, ''); # delete the stuff we used
- if ($offset{$sock->[0]} == $state{$sock->[0]}) { # have it all?
+ $offset{$sock} = $copy;
+ substr($buf{$sock}, 0, $p+$copy, ''); # delete the stuff we used
+ if ($offset{$sock} == $state{$sock}) { # have it all?
$finalize->($sock);
- $state{$sock->[0]} = 0; # wait for another VALUE line or END
- $offset{$sock->[0]} = 0;
+ $state{$sock} = 0; # wait for another VALUE line or END
+ $offset{$sock} = 0;
next SEARCH; # look again
}
last SEARCH; # buffer is empty now
}
# if we're here probably means we only have a partial VALUE
# or END line in the buffer. Could happen with multi-get,
# though probably very rarely. Exit the loop and let it read
# more.
# but first, make sure subsequent reads don't destroy our
# partial VALUE/END line.
- $offset{$sock->[0]} = length($buf{$sock->[0]});
+ $offset{$sock} = length($buf{$sock});
last SEARCH;
}
# we don't have a complete line, wait and read more when ready
return;
};
my $write = sub {
my $sock = shift;
my $res;
- $res = send($sock->[1], $buf{$sock->[0]}, $FLAG_NOSIGNAL);
+ $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
return
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
$dead->($sock);
return;
}
- if ($res == length($buf{$sock->[0]})) { # all sent
- $buf{$sock->[0]} = "";
- $offset{$sock->[0]} = $state{$sock->[0]} = 0;
+ if ($res == length($buf{$sock})) { # all sent
+ $buf{$sock} = "";
+ $offset{$sock} = $state{$sock} = 0;
# switch the socket from writing state to reading state
- delete $writing{$sock->[0]};
- $reading{$sock->[0]} = $sock;
+ delete $writing{$sock};
+ $reading{$sock} = 1;
$active_changed = 1;
} else { # we only succeeded in sending some of it
- substr($buf{$sock->[0]}, 0, $res, ''); # delete the part we sent
+ substr($buf{$sock}, 0, $res, ''); # delete the part we sent
}
return;
};
# the bitsets for select
my ($rin, $rout, $win, $wout);
my $nfound;
# the big select loop
while(1) {
if ($active_changed) {
last unless %reading or %writing; # no sockets left?
($rin, $win) = ('', '');
- foreach (values %reading) {
- vec($rin, fileno($_->[1]), 1) = 1;
+ foreach (keys %reading) {
+ vec($rin, fileno($_), 1) = 1;
}
- foreach (values %writing) {
- vec($win, fileno($_->[1]), 1) = 1;
+ foreach (keys %writing) {
+ vec($win, fileno($_), 1) = 1;
}
$active_changed = 0;
}
# TODO: more intelligent cumulative timeout?
$nfound = select($rout=$rin, $wout=$win, undef,
$self->{'select_timeout'});
last unless $nfound;
# TODO: possible robustness improvement: we could select
# writing sockets for reading also, and raise hell if they're
# ready (input unread from last time, etc.)
# maybe do that on the first loop only?
- foreach (values %writing) {
- if (vec($wout, fileno($_->[1]), 1)) {
+ foreach (keys %writing) {
+ if (vec($wout, fileno($_), 1)) {
$write->($_);
}
}
- foreach (values %reading) {
- if (vec($rout, fileno($_->[1]), 1)) {
+ foreach (keys %reading) {
+ if (vec($rout, fileno($_), 1)) {
$read->($_);
}
}
}
# if there're active sockets left, they need to die
- foreach (values %writing) {
+ foreach (keys %writing) {
$dead->($_);
}
- foreach (values %reading) {
+ foreach (keys %reading) {
$dead->($_);
}
return;
}
sub _hashfunc {
return (crc32(shift) >> 16) & 0x7fff;
-------------- next part --------------
A non-text attachment was scrubbed...
Name: memcached.t
Type: application/x-troff
Size: 910 bytes
Desc: not available
Url : http://lists.danga.com/pipermail/memcached/attachments/20050405/5277fda5/memcached-0001.t
More information about the memcached
mailing list