Cache::Memcached fix for problems with sockets used as string
Gerard Goossen
gerard at tty.nl
Mon Apr 4 06:21:52 PDT 2005
We had some problems using Cache::Memcached, especially when running it
under Devel::Profiler, which produced strange file handle errors.
Apparently something did not like the trick in Cache::Memcached where
the GLOB for a socket was also used as a string. The attached patch
fixes this by storing the socket and the string separately in a
hash-based object.
Gerard Goossen.
-------------- next part --------------
--- Cache-Memcached-1.14/Memcached.pm 2004-07-27 19:07:04.000000000 +0200
+++ Lib/Tty/Memcached.pm 2005-04-04 13:26:04.000000000 +0200
@@ -1,18 +1,38 @@
# $Id: Memcached.pm,v 1.32 2004/07/27 17:07:04 bradfitz Exp $
#
# Copyright (c) 2003, 2004 Brad Fitzpatrick <brad at danga.com>
#
# See COPYRIGHT section in pod text below for usage and distribution rights.
#
-package Cache::Memcached;
+# Modified to replace the socket GLOB with a package (object). Using the GLOB as a string caused problems.
use strict;
+use warnings;
+
+package Cache::Memcached::Socket;
+
+sub new {
+ my ($class, %init) = @_;
+ bless \%init, $class;
+ return \%init;
+}
+
+sub socket {
+ my $self = shift;
+ Carp::confess "No socket" unless $self->{socket};
+ return $self->{socket};
+}
+
+sub name { shift->{name} };
+
+package Cache::Memcached;
+
no strict 'refs';
use Storable ();
use Socket qw( MSG_NOSIGNAL PF_INET IPPROTO_TCP SOCK_STREAM );
use IO::Handle ();
use Time::HiRes ();
use String::CRC32;
use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
@@ -129,68 +149,68 @@
sub set_stat_callback {
my Cache::Memcached $self = shift;
my ($stat_callback) = @_;
$self->{'stat_callback'} = $stat_callback;
}
sub _dead_sock {
my ($sock, $ret, $dead_for) = @_;
- if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+ if ($sock->name =~ /^Sock_(.+?):(\d+)$/) {
my $now = time();
my ($ip, $port) = ($1, $2);
my $host = "$ip:$port";
$host_dead{$host} = $now + $dead_for
if $dead_for;
delete $cache_sock{$host};
}
return $ret; # 0 or undef, probably, depending on what caller wants
}
sub _close_sock {
my ($sock) = @_;
- if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+ if ($sock->name =~ /^Sock_(.+?):(\d+)$/) {
my ($ip, $port) = ($1, $2);
my $host = "$ip:$port";
- close $sock;
+ close $sock->socket;
delete $cache_sock{$host};
}
}
sub _connect_sock { # sock, sin, timeout
my ($sock, $sin, $timeout) = @_;
$timeout ||= 0.25;
# make the socket non-blocking from now on,
# except if someone wants 0 timeout, meaning
# a blocking connect, but even then turn it
# non-blocking at the end of this function
if ($timeout) {
- IO::Handle::blocking($sock, 0);
+ IO::Handle::blocking($sock->socket, 0);
} else {
- IO::Handle::blocking($sock, 1);
+ IO::Handle::blocking($sock->socket, 1);
}
- my $ret = connect($sock, $sin);
+ my $ret = connect($sock->{socket}, $sin);
if (!$ret && $timeout && $!==EINPROGRESS) {
my $win='';
- vec($win, fileno($sock), 1) = 1;
+ vec($win, fileno($sock->socket), 1) = 1;
if (select(undef, $win, undef, $timeout) > 0) {
- $ret = connect($sock, $sin);
+ $ret = connect($sock->{socket}, $sin);
# EISCONN means connected & won't re-connect, so success
$ret = 1 if !$ret && $!==EISCONN;
}
}
unless ($timeout) { # socket was temporarily blocking, now revert
- IO::Handle::blocking($sock, 0);
+ IO::Handle::blocking($sock->socket, 0);
}
# from here on, we use non-blocking (async) IO for the duration
# of the socket's life
return $ret;
}
@@ -198,45 +218,48 @@
my Cache::Memcached $self = ref $_[0] ? shift : undef;
my $host = $_[0];
return $cache_sock{$host} if $cache_sock{$host};
my $now = time();
my ($ip, $port) = $host =~ /(.*):(\d+)/;
return undef if
$host_dead{$host} && $host_dead{$host} > $now;
- my $sock = "Sock_$host";
+ my $sock = Cache::Memcached::Socket->new(name => "Sock_$host", socket => undef);
my $connected = 0;
my $sin;
my $proto = $PROTO_TCP ||= getprotobyname('tcp');
# if a preferred IP is known, try that first.
if ($self && $self->{pref_ip}{$ip}) {
- socket($sock, PF_INET, SOCK_STREAM, $proto);
+ socket($sock->{socket}, PF_INET, SOCK_STREAM, $proto);
+ binmode $sock->{socket}, ":utf8";
+
my $prefip = $self->{pref_ip}{$ip};
$sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
if (_connect_sock($sock,$sin,0.1)) {
$connected = 1;
} else {
- close $sock;
+ close $sock->socket;
}
}
# normal path, or fallback path if preferred IP failed
unless ($connected) {
- socket($sock, PF_INET, SOCK_STREAM, $proto);
+ socket($sock->{socket}, PF_INET, SOCK_STREAM, $proto);
+ binmode $sock->{socket}, ":utf8";
$sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
unless (_connect_sock($sock,$sin)) {
return _dead_sock($sock, undef, 20 + int(rand(10)));
}
}
- # make the new socket not buffer writes.
- my $old = select($sock);
+ # make the new socket not buffer writes.
+ my $old = select($sock->socket);
$| = 1;
select($old);
return $cache_sock{$host} = $sock;
}
sub get_sock { # (key)
my Cache::Memcached $self = shift;
@@ -271,17 +294,17 @@
}
}
$self->{'bucketcount'} = scalar @{$self->{'buckets'}};
}
sub disconnect_all {
my $sock;
foreach $sock (values %cache_sock) {
- close $sock;
+ close $sock->socket;
}
%cache_sock = ();
}
sub _oneline {
my Cache::Memcached $self = shift;
my ($sock, $line) = @_;
my $res;
@@ -297,41 +320,41 @@
my $copy_state = -1;
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
# the select loop
while(1) {
if ($copy_state!=$state) {
last if $state==2;
($rin, $win) = ('', '');
- vec($rin, fileno($sock), 1) = 1 if $state==1;
- vec($win, fileno($sock), 1) = 1 if $state==0;
+ vec($rin, fileno($sock->socket), 1) = 1 if $state==1;
+ vec($win, fileno($sock->socket), 1) = 1 if $state==0;
$copy_state = $state;
}
$nfound = select($rout=$rin, $wout=$win, undef,
$self->{'select_timeout'});
last unless $nfound;
- if (vec($wout, fileno($sock), 1)) {
- $res = send($sock, $line, $FLAG_NOSIGNAL);
+ if (vec($wout, fileno($sock->socket), 1)) {
+ $res = send($sock->socket, $line, $FLAG_NOSIGNAL);
next
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
_close_sock($sock);
return undef;
}
if ($res == length($line)) { # all sent
$state = 1;
} else { # we only succeeded in sending some of it
substr($line, 0, $res, ''); # delete the part we sent
}
}
- if (vec($rout, fileno($sock), 1)) {
- $res = sysread($sock, $ret, 255, $offset);
+ if (vec($rout, fileno($sock->socket), 1)) {
+ $res = sysread($sock->socket, $ret, 255, $offset);
next
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) { # catches 0=conn closed or undef=error
_close_sock($sock);
return undef;
}
$offset += $res;
if (rindex($ret, "\r\n") + 2 == length($ret)) {
@@ -482,77 +505,82 @@
}
sub get_multi {
my Cache::Memcached $self = shift;
return undef unless $self->{'active'};
$self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
$self->{'stats'}->{"get_multi"}++;
my %val; # what we'll be returning a reference to (realkey -> value)
- my %sock_keys; # sockref_as_scalar -> [ realkeys ]
+ my @sock_keys; # sockref_as_scalar -> [ realkeys ]
my $sock;
foreach my $key (@_) {
$sock = $self->get_sock($key);
next unless $sock;
my $kval = ref $key ? $key->[1] : $key;
- push @{$sock_keys{$sock}}, $kval;
+ if (my ($sk) = grep { $sock eq $_->[0] } @sock_keys) {
+ push @$sk, $kval;
+ } else {
+ push @sock_keys, [$sock, $kval];
+ }
}
$self->{'stats'}->{"get_keys"} += @_;
- $self->{'stats'}->{"get_socks"} += keys %sock_keys;
+ $self->{'stats'}->{"get_socks"} += @sock_keys;
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
- _load_multi($self, \%sock_keys, \%val);
+ _load_multi($self, \@sock_keys, \%val);
if ($self->{'debug'}) {
while (my ($k, $v) = each %val) {
print STDERR "MemCache: got $k = $v\n";
}
}
return \%val;
}
sub _load_multi {
use bytes; # return bytes from length()
my Cache::Memcached $self = shift;
my ($sock_keys, $ret) = @_;
# all keyed by a $sock:
- my %reading; # bool, whether we're reading from this socket
- my %writing; # bool, whether we're writing into this socket
+ my %reading; # bool, whether we're reading from this socket, value is the socket
+ my %writing; # bool, whether we're writing into this socket, value is the socket
my %state; # reading state:
# 0 = waiting for a line, N = reading N bytes
my %buf; # buffers
my %offset; # offsets to read into buffers
my %key; # current key per socket
my %flags; # flags per socket
- foreach (keys %$sock_keys) {
+ foreach (@$sock_keys) {
print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
- $writing{$_} = 1;
- $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
+ my $sock = shift @$_;
+ $writing{$sock} = $sock;
+ $buf{$sock} = "get ". join(" ", map { "$self->{namespace}$_" } @$_) . "\r\n";
}
my $active_changed = 1; # force rebuilding of select sets
my $dead = sub {
my $sock = shift;
print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
delete $reading{$sock};
delete $writing{$sock};
delete $ret->{$key{$sock}}
if $key{$sock};
if ($self->{'stat_callback'}) {
my $etime = Time::HiRes::time();
$self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
}
-
- close $sock;
+
+ close $sock->socket;
_dead_sock($sock);
$active_changed = 1;
};
my $finalize = sub {
my $sock = shift;
my $k = $key{$sock};
@@ -582,17 +610,17 @@
};
my $read = sub {
my $sock = shift;
my $res;
# where are we reading into?
if ($state{$sock}) { # reading value into $ret
- $res = sysread($sock, $ret->{$key{$sock}},
+ $res = sysread($sock->socket, $ret->{$key{$sock}},
$state{$sock} - $offset{$sock},
$offset{$sock});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) { # catches 0=conn closed or undef=error
$dead->($sock);
return;
}
@@ -602,17 +630,17 @@
$state{$sock} = 0; # wait for another VALUE line or END
$offset{$sock} = 0;
}
return;
}
# we're reading a single line.
# first, read whatever's there, but be satisfied with 2048 bytes
- $res = sysread($sock, $buf{$sock},
+ $res = sysread($sock->socket, $buf{$sock},
2048, $offset{$sock});
return
if !defined($res) and $!==EWOULDBLOCK;
if ($res == 0) {
$dead->($sock);
return;
}
$offset{$sock} += $res;
@@ -665,79 +693,79 @@
# we don't have a complete line, wait and read more when ready
return;
};
my $write = sub {
my $sock = shift;
my $res;
- $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
+ $res = send($sock->socket, $buf{$sock}, $FLAG_NOSIGNAL);
return
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
$dead->($sock);
return;
}
if ($res == length($buf{$sock})) { # all sent
$buf{$sock} = "";
$offset{$sock} = $state{$sock} = 0;
# switch the socket from writing state to reading state
delete $writing{$sock};
- $reading{$sock} = 1;
+ $reading{$sock} = $sock;
$active_changed = 1;
} else { # we only succeeded in sending some of it
substr($buf{$sock}, 0, $res, ''); # delete the part we sent
}
return;
};
# the bitsets for select
my ($rin, $rout, $win, $wout);
my $nfound;
# the big select loop
while(1) {
if ($active_changed) {
last unless %reading or %writing; # no sockets left?
($rin, $win) = ('', '');
- foreach (keys %reading) {
- vec($rin, fileno($_), 1) = 1;
+ foreach (values %reading) {
+ vec($rin, fileno($_->socket), 1) = 1;
}
- foreach (keys %writing) {
- vec($win, fileno($_), 1) = 1;
+ foreach (values %writing) {
+ vec($win, fileno($_->socket), 1) = 1;
}
$active_changed = 0;
}
# TODO: more intelligent cumulative timeout?
$nfound = select($rout=$rin, $wout=$win, undef,
$self->{'select_timeout'});
last unless $nfound;
# TODO: possible robustness improvement: we could select
# writing sockets for reading also, and raise hell if they're
# ready (input unread from last time, etc.)
# maybe do that on the first loop only?
- foreach (keys %writing) {
- if (vec($wout, fileno($_), 1)) {
+ foreach (values %writing) {
+ if (vec($wout, fileno($_->socket), 1)) {
$write->($_);
}
}
- foreach (keys %reading) {
- if (vec($rout, fileno($_), 1)) {
+ foreach (values %reading) {
+ if (vec($rout, fileno($_->socket), 1)) {
$read->($_);
}
}
}
# if there're active sockets left, they need to die
- foreach (keys %writing) {
+ foreach (values %writing) {
$dead->($_);
}
- foreach (keys %reading) {
+ foreach (values %reading) {
$dead->($_);
}
return;
}
sub _hashfunc {
return (crc32(shift) >> 16) & 0x7fff;
More information about the memcached
mailing list