Cache::Memcached: fix for problems with socket GLOBs used as strings

Gerard Goossen gerard at tty.nl
Mon Apr 4 06:21:52 PDT 2005


We ran into problems using Cache::Memcached, especially under
Devel::Profiler, which produced strange filehandle errors.
The cause appears to be the trick Cache::Memcached uses where a socket's
GLOB name (e.g. "Sock_host:port") doubles as a plain string. The attached
patch fixes this by storing the socket and its name string side by side in
a small hash-based object (Cache::Memcached::Socket).

Gerard Goossen.
-------------- next part --------------
--- Cache-Memcached-1.14/Memcached.pm	2004-07-27 19:07:04.000000000 +0200
+++ Lib/Tty/Memcached.pm	2005-04-04 13:26:04.000000000 +0200
@@ -1,18 +1,49 @@
 # $Id: Memcached.pm,v 1.32 2004/07/27 17:07:04 bradfitz Exp $
 #
 # Copyright (c) 2003, 2004  Brad Fitzpatrick <brad at danga.com>
 #
 # See COPYRIGHT section in pod text below for usage and distribution rights.
 #
 
-package Cache::Memcached;
+# Modified: each socket is a small Cache::Memcached::Socket object holding
+# the socket GLOB and its "Sock_<host>:<port>" name, instead of a symbolic
+# GLOB whose name doubled as that string (which confused e.g. Devel::Profiler).
 
 use strict;
+use warnings;
+
+# Pairs a socket GLOB with the "Sock_<host>:<port>" name string that the
+# Cache::Memcached functions parse to find the host a socket belongs to.
+package Cache::Memcached::Socket;
+
+# Build a socket wrapper from name => ... and socket => ... pairs.
+# Returns the blessed hash.
+sub new {
+    my ($class, %init) = @_;
+    return bless \%init, $class;
+}
+
+# Return the wrapped socket handle; die with a stack trace when none is set.
+# Carp is required here and called with parens so this compiles even if
+# Carp has not been loaded yet.
+sub socket {
+    my $self = shift;
+    unless ($self->{socket}) {
+        require Carp;
+        Carp::confess("No socket");
+    }
+    return $self->{socket};
+}
+
+# Return the "Sock_<host>:<port>" name string.
+sub name { shift->{name} }
+
+package Cache::Memcached;
+
 no strict 'refs';
 use Storable ();
 use Socket qw( MSG_NOSIGNAL PF_INET IPPROTO_TCP SOCK_STREAM );
 use IO::Handle ();
 use Time::HiRes ();
 use String::CRC32;
 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
 
@@ -129,68 +149,68 @@
 sub set_stat_callback {
     my Cache::Memcached $self = shift;
     my ($stat_callback) = @_;
     $self->{'stat_callback'} = $stat_callback;
 }
 
 sub _dead_sock {
     my ($sock, $ret, $dead_for) = @_;
-    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+    if ($sock->name =~ /^Sock_(.+?):(\d+)$/) {  # name is "Sock_<ip>:<port>"
         my $now = time();
         my ($ip, $port) = ($1, $2);
         my $host = "$ip:$port";
         $host_dead{$host} = $now + $dead_for
             if $dead_for;
         delete $cache_sock{$host};
     }
     return $ret;  # 0 or undef, probably, depending on what caller wants
 }
 
 sub _close_sock {
     my ($sock) = @_;
-    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
+    if ($sock->name =~ /^Sock_(.+?):(\d+)$/) {  # name is "Sock_<ip>:<port>"
         my ($ip, $port) = ($1, $2);
         my $host = "$ip:$port";
-        close $sock;
+        close $sock->socket;  # close the wrapped GLOB, not the object
         delete $cache_sock{$host};
     }
 }
 
 sub _connect_sock { # sock, sin, timeout
     my ($sock, $sin, $timeout) = @_;
     $timeout ||= 0.25;
 
     # make the socket non-blocking from now on,
     # except if someone wants 0 timeout, meaning
     # a blocking connect, but even then turn it
     # non-blocking at the end of this function
 
     if ($timeout) {
-        IO::Handle::blocking($sock, 0);
+        IO::Handle::blocking($sock->socket, 0);
     } else {
-        IO::Handle::blocking($sock, 1);
+        IO::Handle::blocking($sock->socket, 1);
     }
 
-    my $ret = connect($sock, $sin);
+    my $ret = connect($sock->socket, $sin);
 
     if (!$ret && $timeout && $!==EINPROGRESS) {
 
         my $win='';
-        vec($win, fileno($sock), 1) = 1;
+        vec($win, fileno($sock->socket), 1) = 1;
 
         if (select(undef, $win, undef, $timeout) > 0) {
-            $ret = connect($sock, $sin);
+            $ret = connect($sock->socket, $sin);
             # EISCONN means connected & won't re-connect, so success
             $ret = 1 if !$ret && $!==EISCONN;
         }
     }
 
     unless ($timeout) { # socket was temporarily blocking, now revert
-        IO::Handle::blocking($sock, 0);
+        IO::Handle::blocking($sock->socket, 0);
     }
 
     # from here on, we use non-blocking (async) IO for the duration
     # of the socket's life
 
     return $ret;
 }
 
@@ -198,45 +218,47 @@
     my Cache::Memcached $self = ref $_[0] ? shift : undef;
     my $host = $_[0];
     return $cache_sock{$host} if $cache_sock{$host};
 
     my $now = time();
     my ($ip, $port) = $host =~ /(.*):(\d+)/;
     return undef if
         $host_dead{$host} && $host_dead{$host} > $now;
-    my $sock = "Sock_$host";
+    # Pair the not-yet-created socket GLOB with its "Sock_<host>" name;
+    # socket() below autovivifies the GLOB into $sock->{socket}.
+    my $sock = Cache::Memcached::Socket->new(name => "Sock_$host", socket => undef);
 
     my $connected = 0;
     my $sin;
     my $proto = $PROTO_TCP ||= getprotobyname('tcp');
 
     # if a preferred IP is known, try that first.
     if ($self && $self->{pref_ip}{$ip}) {
-        socket($sock, PF_INET, SOCK_STREAM, $proto);
+        socket($sock->{socket}, PF_INET, SOCK_STREAM, $proto);
         my $prefip = $self->{pref_ip}{$ip};
         $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
         if (_connect_sock($sock,$sin,0.1)) {
             $connected = 1;
         } else {
-            close $sock;
+            close $sock->socket;
         }
     }
 
     # normal path, or fallback path if preferred IP failed
     unless ($connected) {
-        socket($sock, PF_INET, SOCK_STREAM, $proto);
+        socket($sock->{socket}, PF_INET, SOCK_STREAM, $proto);
         $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
         unless (_connect_sock($sock,$sin)) {
             return _dead_sock($sock, undef, 20 + int(rand(10)));
         }
     }
 
     # make the new socket not buffer writes.
-    my $old = select($sock);
+    my $old = select($sock->socket);
     $| = 1;
     select($old);
 
     return $cache_sock{$host} = $sock;
 }
 
 sub get_sock { # (key)
     my Cache::Memcached $self = shift;
@@ -271,17 +294,17 @@
         }
     }
     $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
 }
 
 sub disconnect_all {
     my $sock;
     foreach $sock (values %cache_sock) {
-        close $sock;
+        close $sock->socket;  # close the GLOB wrapped by each cached socket object
     }
     %cache_sock = ();
 }
 
 sub _oneline {
     my Cache::Memcached $self = shift;
     my ($sock, $line) = @_;
     my $res;
@@ -297,41 +320,41 @@
     my $copy_state = -1;
     local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
 
     # the select loop
     while(1) {
         if ($copy_state!=$state) {
             last if $state==2;
             ($rin, $win) = ('', '');
-            vec($rin, fileno($sock), 1) = 1 if $state==1;
-            vec($win, fileno($sock), 1) = 1 if $state==0;
+            vec($rin, fileno($sock->socket), 1) = 1 if $state==1;  # state 1: waiting for the reply
+            vec($win, fileno($sock->socket), 1) = 1 if $state==0;  # state 0: still sending $line
             $copy_state = $state;
         }
         $nfound = select($rout=$rin, $wout=$win, undef,
                          $self->{'select_timeout'});
         last unless $nfound;
 
-        if (vec($wout, fileno($sock), 1)) {
-            $res = send($sock, $line, $FLAG_NOSIGNAL);
+        if (vec($wout, fileno($sock->socket), 1)) {
+            $res = send($sock->socket, $line, $FLAG_NOSIGNAL);  # send on the wrapped GLOB
             next
                 if not defined $res and $!==EWOULDBLOCK;
             unless ($res > 0) {
                 _close_sock($sock);
                 return undef;
             }
             if ($res == length($line)) { # all sent
                 $state = 1;
             } else { # we only succeeded in sending some of it
                 substr($line, 0, $res, ''); # delete the part we sent
             }
         }
 
-        if (vec($rout, fileno($sock), 1)) {
-            $res = sysread($sock, $ret, 255, $offset);
+        if (vec($rout, fileno($sock->socket), 1)) {
+            $res = sysread($sock->socket, $ret, 255, $offset);  # append up to 255 bytes at $offset
             next
                 if !defined($res) and $!==EWOULDBLOCK;
             if ($res == 0) { # catches 0=conn closed or undef=error
                 _close_sock($sock);
                 return undef;
             }
             $offset += $res;
             if (rindex($ret, "\r\n") + 2 == length($ret)) {
@@ -482,77 +505,82 @@
 }
 
 sub get_multi {
     my Cache::Memcached $self = shift;
     return undef unless $self->{'active'};
     $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
     $self->{'stats'}->{"get_multi"}++;
     my %val;        # what we'll be returning a reference to (realkey -> value)
-    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
+    my @sock_keys;  # list of [ $sock, realkeys... ], one entry per socket
     my $sock;
 
     foreach my $key (@_) {
         $sock = $self->get_sock($key);
         next unless $sock;
         my $kval = ref $key ? $key->[1] : $key;
-        push @{$sock_keys{$sock}}, $kval;
+        if (my ($sk) = grep { $sock eq $_->[0] } @sock_keys) {  # socket already has an entry
+            push @$sk, $kval;
+        } else {
+            push @sock_keys, [$sock, $kval];
+        }
     }
     $self->{'stats'}->{"get_keys"} += @_;
-    $self->{'stats'}->{"get_socks"} += keys %sock_keys;
+    $self->{'stats'}->{"get_socks"} += @sock_keys;
 
     local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
 
-    _load_multi($self, \%sock_keys, \%val);
+    _load_multi($self, \@sock_keys, \%val);
 
     if ($self->{'debug'}) {
         while (my ($k, $v) = each %val) {
             print STDERR "MemCache: got $k = $v\n";
         }
     }
     return \%val;
 }
 
 sub _load_multi {
     use bytes; # return bytes from length()
     my Cache::Memcached $self = shift;
     my ($sock_keys, $ret) = @_;
 
     # all keyed by a $sock:
-    my %reading; # bool, whether we're reading from this socket
-    my %writing; # bool, whether we're writing into this socket
+    my %reading; # sockets we're reading from (value is the socket object)
+    my %writing; # sockets we're writing into (value is the socket object)
     my %state;   # reading state:
                  # 0 = waiting for a line, N = reading N bytes
     my %buf;     # buffers
     my %offset;  # offsets to read into buffers
     my %key;     # current key per socket
     my %flags;   # flags per socket
 
-    foreach (keys %$sock_keys) {
-        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
-        $writing{$_} = 1;
-        $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
+    foreach my $entry (@$sock_keys) {
+        my ($sock, @keys) = @$entry;  # copy out; leave the caller's entry intact
+        print STDERR "processing socket ", $sock->name, "\n" if $self->{'debug'} >= 2;
+        $writing{$sock} = $sock;
+        $buf{$sock} = "get ". join(" ", map { "$self->{namespace}$_" } @keys) . "\r\n";
     }
 
     my $active_changed = 1; # force rebuilding of select sets
 
     my $dead = sub {
         my $sock = shift;
-        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
+        print STDERR "killing socket ", $sock->name, "\n" if $self->{'debug'} >= 2;
         delete $reading{$sock};
         delete $writing{$sock};
         delete $ret->{$key{$sock}}
             if $key{$sock};
 
         if ($self->{'stat_callback'}) {
             my $etime = Time::HiRes::time();
             $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
         }
 
-        close $sock;
+        close $sock->socket;
         _dead_sock($sock);
         $active_changed = 1;
     };
 
     my $finalize = sub {
         my $sock = shift;
         my $k = $key{$sock};
 
@@ -582,17 +610,17 @@
     };
 
     my $read = sub {
         my $sock = shift;
         my $res;
 
         # where are we reading into?
         if ($state{$sock}) { # reading value into $ret
-            $res = sysread($sock, $ret->{$key{$sock}},
+            $res = sysread($sock->socket, $ret->{$key{$sock}},  # read into the value for the current key
                            $state{$sock} - $offset{$sock},
                            $offset{$sock});
             return
                 if !defined($res) and $!==EWOULDBLOCK;
             if ($res == 0) { # catches 0=conn closed or undef=error
                 $dead->($sock);
                 return;
             }
@@ -602,17 +630,17 @@
                 $state{$sock} = 0; # wait for another VALUE line or END
                 $offset{$sock} = 0;
             }
             return;
         }
 
         # we're reading a single line.
         # first, read whatever's there, but be satisfied with 2048 bytes
-        $res = sysread($sock, $buf{$sock},
+        $res = sysread($sock->socket, $buf{$sock},  # append to this socket's line buffer
                        2048, $offset{$sock});
         return
             if !defined($res) and $!==EWOULDBLOCK;
         if ($res == 0) {
             $dead->($sock);
             return;
         }
         $offset{$sock} += $res;
@@ -665,79 +693,79 @@
         # we don't have a complete line, wait and read more when ready
         return;
     };
 
     my $write = sub {
         my $sock = shift;
         my $res;
 
-        $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
+        $res = send($sock->socket, $buf{$sock}, $FLAG_NOSIGNAL);  # send on the wrapped GLOB
         return
             if not defined $res and $!==EWOULDBLOCK;
         unless ($res > 0) {
             $dead->($sock);
             return;
         }
         if ($res == length($buf{$sock})) { # all sent
             $buf{$sock} = "";
             $offset{$sock} = $state{$sock} = 0;
             # switch the socket from writing state to reading state
             delete $writing{$sock};
-            $reading{$sock} = 1;
+            $reading{$sock} = $sock;  # store the object: the hash key is only its string form
             $active_changed = 1;
         } else { # we only succeeded in sending some of it
             substr($buf{$sock}, 0, $res, ''); # delete the part we sent
         }
         return;
     };
 
     # the bitsets for select
     my ($rin, $rout, $win, $wout);
     my $nfound;
 
     # the big select loop
     while(1) {
         if ($active_changed) {
             last unless %reading or %writing; # no sockets left?
             ($rin, $win) = ('', '');
-            foreach (keys %reading) {
-                vec($rin, fileno($_), 1) = 1;
+            foreach (values %reading) {  # values are the socket objects
+                vec($rin, fileno($_->socket), 1) = 1;
             }
-            foreach (keys %writing) {
-                vec($win, fileno($_), 1) = 1;
+            foreach (values %writing) {  # values are the socket objects
+                vec($win, fileno($_->socket), 1) = 1;
             }
             $active_changed = 0;
         }
         # TODO: more intelligent cumulative timeout?
         $nfound = select($rout=$rin, $wout=$win, undef,
                          $self->{'select_timeout'});
         last unless $nfound;
 
         # TODO: possible robustness improvement: we could select
         # writing sockets for reading also, and raise hell if they're
         # ready (input unread from last time, etc.)
         # maybe do that on the first loop only?
-        foreach (keys %writing) {
-            if (vec($wout, fileno($_), 1)) {
+        foreach (values %writing) {
+            if (vec($wout, fileno($_->socket), 1)) {
                 $write->($_);
             }
         }
-        foreach (keys %reading) {
-            if (vec($rout, fileno($_), 1)) {
+        foreach (values %reading) {
+            if (vec($rout, fileno($_->socket), 1)) {
                 $read->($_);
             }
         }
     }
 
     # if there're active sockets left, they need to die
-    foreach (keys %writing) {
+    foreach (values %writing) {  # values, since keys are only stringified refs
         $dead->($_);
     }
-    foreach (keys %reading) {
+    foreach (values %reading) {
         $dead->($_);
     }
 
     return;
 }
 
 sub _hashfunc {
     return (crc32(shift) >> 16) & 0x7fff;


More information about the memcached mailing list