Cache::Memcached fix for problems with sockets used as string

Gerard Goossen gerard at tty.nl
Tue Apr 5 06:53:54 PDT 2005


The patch did make it a lot slower, so I rewrote it to use only an array
with two elements instead of an object. The name (the first element) is
used as the hash key, because using the hashref itself as a key is slow.
With that change there isn't much difference in speed.
The patch also set the binmode to utf8, but there are still problems
with utf8 (see the test script). Any idea how to fix this?
Also attached is a small test script I used for testing/measuring
speed (it assumes a local memcached).


On Mon, Apr 04, 2005 at 09:43:33AM -0700, Brad Fitzpatrick wrote:
> Could probably remove the "no strict 'refs'" then, huh?
>
> Also, the %reading/%writing hashes... now that $sock is a HASH and not a
> string, using $sock as a key is kinda weird.  Might want to change it to
> $reading{$sock->name} = $sock.
> 
> You do any speed tests before/after this patch?  Cache::Memcached isn't
> the fastest thing in the world as it was... hopefully this didn't make it
> too much slower?
> 
> - Brad
> 
> 
> On Mon, 4 Apr 2005, Gerard Goossen wrote:
> 
> > We had some problems using Cache::Memcached, especially when using
> > Devel::Profiler, giving strange file handle errors.
> > Apparently something did not like the trick in Cache::Memcached where
> > the GLOB for a socket was also used as a string. In the attached patch
> > this is fixed by using a hash(/object) to store the socket and the
> > string.
> >
> > Gerard Goossen.
> >
> 
-------------- next part --------------
--- Lib/Tty/Memcached.pm	2005-04-05 15:01:42.000000000 +0200
+++ ../perlkit/perl/lib/site_perl/5.8.3/Cache/Memcached.pm	2005-04-05 12:15:42.000000000 +0200
@@ -129,68 +129,68 @@
 sub set_stat_callback {
     my Cache::Memcached $self = shift;
     my ($stat_callback) = @_;
     $self->{'stat_callback'} = $stat_callback;
 }
 
 sub _dead_sock {
     my ($sock, $ret, $dead_for) = @_;
-    if ($sock->[0] =~ /^Sock_(.+?):(\d+)$/) {
+    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
         my $now = time();
         my ($ip, $port) = ($1, $2);
         my $host = "$ip:$port";
         $host_dead{$host} = $now + $dead_for
             if $dead_for;
         delete $cache_sock{$host};
     }
     return $ret;  # 0 or undef, probably, depending on what caller wants
 }
 
 sub _close_sock {
     my ($sock) = @_;
-    if ($sock->[0] =~ /^Sock_(.+?):(\d+)$/) {
+    if ($sock =~ /^Sock_(.+?):(\d+)$/) {
         my ($ip, $port) = ($1, $2);
         my $host = "$ip:$port";
-        close $sock->[1];
+        close $sock;
         delete $cache_sock{$host};
     }
 }
 
 sub _connect_sock { # sock, sin, timeout
     my ($sock, $sin, $timeout) = @_;
     $timeout ||= 0.25;
 
     # make the socket non-blocking from now on,
     # except if someone wants 0 timeout, meaning
     # a blocking connect, but even then turn it
     # non-blocking at the end of this function
 
     if ($timeout) {
-        IO::Handle::blocking($sock->[1], 0);
+        IO::Handle::blocking($sock, 0);
     } else {
-        IO::Handle::blocking($sock->[1], 1);
+        IO::Handle::blocking($sock, 1);
     }
 
-    my $ret = connect($sock->[1], $sin);
+    my $ret = connect($sock, $sin);
 
     if (!$ret && $timeout && $!==EINPROGRESS) {
 
         my $win='';
-        vec($win, fileno($sock->[1]), 1) = 1;
+        vec($win, fileno($sock), 1) = 1;
 
         if (select(undef, $win, undef, $timeout) > 0) {
-            $ret = connect($sock->[1], $sin);
+            $ret = connect($sock, $sin);
             # EISCONN means connected & won't re-connect, so success
             $ret = 1 if !$ret && $!==EISCONN;
         }
     }
 
     unless ($timeout) { # socket was temporarily blocking, now revert
-        IO::Handle::blocking($sock->[1], 0);
+        IO::Handle::blocking($sock, 0);
     }
 
     # from here on, we use non-blocking (async) IO for the duration
     # of the socket's life
 
     return $ret;
 }
 
@@ -198,48 +198,45 @@
     my Cache::Memcached $self = ref $_[0] ? shift : undef;
     my $host = $_[0];
     return $cache_sock{$host} if $cache_sock{$host};
 
     my $now = time();
     my ($ip, $port) = $host =~ /(.*):(\d+)/;
     return undef if
         $host_dead{$host} && $host_dead{$host} > $now;
-    my $sock = ["Sock_$host", undef];
+    my $sock = "Sock_$host";
 
     my $connected = 0;
     my $sin;
     my $proto = $PROTO_TCP ||= getprotobyname('tcp');
 
     # if a preferred IP is known, try that first.
     if ($self && $self->{pref_ip}{$ip}) {
-        socket($sock->[1], PF_INET, SOCK_STREAM, $proto);
-        # binmode $sock->[1], ":utf8";
-
+        socket($sock, PF_INET, SOCK_STREAM, $proto);
         my $prefip = $self->{pref_ip}{$ip};
         $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
         if (_connect_sock($sock,$sin,0.1)) {
             $connected = 1;
         } else {
-            close $sock->[1];
+            close $sock;
         }
     }
 
     # normal path, or fallback path if preferred IP failed
     unless ($connected) {
-        socket($sock->[1], PF_INET, SOCK_STREAM, $proto);
-        # binmode $sock->[1], ":utf8";
+        socket($sock, PF_INET, SOCK_STREAM, $proto);
         $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
         unless (_connect_sock($sock,$sin)) {
             return _dead_sock($sock, undef, 20 + int(rand(10)));
         }
     }
 
-    # make the new socket not buffer writes.    
-    my $old = select($sock->[1]);
+    # make the new socket not buffer writes.
+    my $old = select($sock);
     $| = 1;
     select($old);
 
     return $cache_sock{$host} = $sock;
 }
 
 sub get_sock { # (key)
     my Cache::Memcached $self = shift;
@@ -274,17 +271,17 @@
         }
     }
     $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
 }
 
 sub disconnect_all {
     my $sock;
     foreach $sock (values %cache_sock) {
-        close $sock->[1];
+        close $sock;
     }
     %cache_sock = ();
 }
 
 sub _oneline {
     my Cache::Memcached $self = shift;
     my ($sock, $line) = @_;
     my $res;
@@ -300,41 +297,41 @@
     my $copy_state = -1;
     local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
 
     # the select loop
     while(1) {
         if ($copy_state!=$state) {
             last if $state==2;
             ($rin, $win) = ('', '');
-            vec($rin, fileno($sock->[1]), 1) = 1 if $state==1;
-            vec($win, fileno($sock->[1]), 1) = 1 if $state==0;
+            vec($rin, fileno($sock), 1) = 1 if $state==1;
+            vec($win, fileno($sock), 1) = 1 if $state==0;
             $copy_state = $state;
         }
         $nfound = select($rout=$rin, $wout=$win, undef,
                          $self->{'select_timeout'});
         last unless $nfound;
 
-        if (vec($wout, fileno($sock->[1]), 1)) {
-            $res = send($sock->[1], $line, $FLAG_NOSIGNAL);
+        if (vec($wout, fileno($sock), 1)) {
+            $res = send($sock, $line, $FLAG_NOSIGNAL);
             next
                 if not defined $res and $!==EWOULDBLOCK;
             unless ($res > 0) {
                 _close_sock($sock);
                 return undef;
             }
             if ($res == length($line)) { # all sent
                 $state = 1;
             } else { # we only succeeded in sending some of it
                 substr($line, 0, $res, ''); # delete the part we sent
             }
         }
 
-        if (vec($rout, fileno($sock->[1]), 1)) {
-            $res = sysread($sock->[1], $ret, 255, $offset);
+        if (vec($rout, fileno($sock), 1)) {
+            $res = sysread($sock, $ret, 255, $offset);
             next
                 if !defined($res) and $!==EWOULDBLOCK;
             if ($res == 0) { # catches 0=conn closed or undef=error
                 _close_sock($sock);
                 return undef;
             }
             $offset += $res;
             if (rindex($ret, "\r\n") + 2 == length($ret)) {
@@ -485,28 +482,24 @@
 }
 
 sub get_multi {
     my Cache::Memcached $self = shift;
     return undef unless $self->{'active'};
     $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
     $self->{'stats'}->{"get_multi"}++;
     my %val;        # what we'll be returning a reference to (realkey -> value)
-    my %sock_keys;  # sockref_as_scalar -> [ socket, realkeys ]
+    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
     my $sock;
 
     foreach my $key (@_) {
         $sock = $self->get_sock($key);
         next unless $sock;
         my $kval = ref $key ? $key->[1] : $key;
-        if ($sock_keys{$sock->[0]}) {
-            push @{$sock_keys{$sock->[0]}}, $kval;
-        } else {
-            $sock_keys{$sock->[0]} = [$sock, $kval];
-        }
+        push @{$sock_keys{$sock}}, $kval;
     }
     $self->{'stats'}->{"get_keys"} += @_;
     $self->{'stats'}->{"get_socks"} += keys %sock_keys;
 
     local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
 
     _load_multi($self, \%sock_keys, \%val);
 
@@ -519,67 +512,66 @@
 }
 
 sub _load_multi {
     use bytes; # return bytes from length()
     my Cache::Memcached $self = shift;
     my ($sock_keys, $ret) = @_;
 
     # all keyed by a $sock:
-    my %reading; # bool, whether we're reading from this socket, value is the socket
-    my %writing; # bool, whether we're writing into this socket, value is the socket
+    my %reading; # bool, whether we're reading from this socket
+    my %writing; # bool, whether we're writing into this socket
     my %state;   # reading state:
                  # 0 = waiting for a line, N = reading N bytes
     my %buf;     # buffers
     my %offset;  # offsets to read into buffers
     my %key;     # current key per socket
     my %flags;   # flags per socket
 
-    foreach (values %$sock_keys) {
+    foreach (keys %$sock_keys) {
         print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
-        my $sock = shift @$_;
-        $writing{$sock->[0]} = $sock;
-        $buf{$sock->[0]} = "get ". join(" ", map { "$self->{namespace}$_" } @$_) . "\r\n";
+        $writing{$_} = 1;
+        $buf{$_} = "get ". join(" ", map { "$self->{namespace}$_" } @{$sock_keys->{$_}}) . "\r\n";
     }
 
     my $active_changed = 1; # force rebuilding of select sets
 
     my $dead = sub {
         my $sock = shift;
         print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
-        delete $reading{$sock->[0]};
-        delete $writing{$sock->[0]};
-        delete $ret->{$key{$sock->[0]}}
-            if $key{$sock->[0]};
+        delete $reading{$sock};
+        delete $writing{$sock};
+        delete $ret->{$key{$sock}}
+            if $key{$sock};
 
         if ($self->{'stat_callback'}) {
             my $etime = Time::HiRes::time();
             $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
         }
-        
-        close $sock->[1];
+
+        close $sock;
         _dead_sock($sock);
         $active_changed = 1;
     };
 
     my $finalize = sub {
         my $sock = shift;
-        my $k = $key{$sock->[0]};
+        my $k = $key{$sock};
 
         # remove trailing \r\n
         chop $ret->{$k}; chop $ret->{$k};
 
-        unless (length($ret->{$k}) == $state{$sock->[0]}-2) {
+        unless (length($ret->{$k}) == $state{$sock}-2) {
             $dead->($sock);
             return;
         }
 
         $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
-            if $HAVE_ZLIB && $flags{$sock->[0]} & F_COMPRESS;
-        if ($flags{$sock->[0]} & F_STORABLE) {
+            if $HAVE_ZLIB && $flags{$sock} & F_COMPRESS;
+        if ($flags{$sock} & F_STORABLE) {
             # wrapped in eval in case a perl 5.6 Storable tries to
             # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
             # and dies if the version number changes at all.  in 5.8
             # they made it only die if it unencounters a new feature)
             eval {
                 $ret->{$k} = Storable::thaw($ret->{$k});
             };
             # so if there was a problem, just treat it as a cache miss.
@@ -589,163 +581,163 @@
         }
     };
 
     my $read = sub {
         my $sock = shift;
         my $res;
 
         # where are we reading into?
-        if ($state{$sock->[0]}) { # reading value into $ret
-            $res = sysread($sock->[1], $ret->{$key{$sock->[0]}},
-                           $state{$sock->[0]} - $offset{$sock->[0]},
-                           $offset{$sock->[0]});
+        if ($state{$sock}) { # reading value into $ret
+            $res = sysread($sock, $ret->{$key{$sock}},
+                           $state{$sock} - $offset{$sock},
+                           $offset{$sock});
             return
                 if !defined($res) and $!==EWOULDBLOCK;
             if ($res == 0) { # catches 0=conn closed or undef=error
                 $dead->($sock);
                 return;
             }
-            $offset{$sock->[0]} += $res;
-            if ($offset{$sock} == $state{$sock->[0]}) { # finished reading
+            $offset{$sock} += $res;
+            if ($offset{$sock} == $state{$sock}) { # finished reading
                 $finalize->($sock);
-                $state{$sock->[0]} = 0; # wait for another VALUE line or END
-                $offset{$sock->[0]} = 0;
+                $state{$sock} = 0; # wait for another VALUE line or END
+                $offset{$sock} = 0;
             }
             return;
         }
 
         # we're reading a single line.
         # first, read whatever's there, but be satisfied with 2048 bytes
-        $res = sysread($sock->[1], $buf{$sock->[0]},
-                       2048, $offset{$sock->[0]});
+        $res = sysread($sock, $buf{$sock},
+                       2048, $offset{$sock});
         return
             if !defined($res) and $!==EWOULDBLOCK;
         if ($res == 0) {
             $dead->($sock);
             return;
         }
-        $offset{$sock->[0]} += $res;
+        $offset{$sock} += $res;
 
       SEARCH:
         while(1) { # may have to search many times
             # do we have a complete END line?
-            if ($buf{$sock->[0]} =~ /^END\r\n/) {
+            if ($buf{$sock} =~ /^END\r\n/) {
                 # okay, finished with this socket
-                delete $reading{$sock->[0]};
+                delete $reading{$sock};
                 $active_changed = 1;
                 return;
             }
 
             # do we have a complete VALUE line?
-            if ($buf{$sock->[0]} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
-                ($key{$sock->[0]}, $flags{$sock->[0]}, $state{$sock->[0]}) =
+            if ($buf{$sock} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+                ($key{$sock}, $flags{$sock}, $state{$sock}) =
                     (substr($1, $self->{namespace_len}), int($2), $3+2);
-                # Note: we use $+[0] and not pos($buf{$sock->[0]}) because pos()
+                # Note: we use $+[0] and not pos($buf{$sock}) because pos()
                 # seems to have problems under perl's taint mode.  nobody
                 # on the list discovered why, but this seems a reasonable
                 # work-around:
                 my $p = $+[0];
-                my $len = length($buf{$sock->[0]});
-                my $copy = $len-$p > $state{$sock->[0]} ? $state{$sock->[0]} : $len-$p;
-                $ret->{$key{$sock->[0]}} = substr($buf{$sock->[0]}, $p, $copy)
+                my $len = length($buf{$sock});
+                my $copy = $len-$p > $state{$sock} ? $state{$sock} : $len-$p;
+                $ret->{$key{$sock}} = substr($buf{$sock}, $p, $copy)
                     if $copy;
-                $offset{$sock->[0]} = $copy;
-                substr($buf{$sock->[0]}, 0, $p+$copy, ''); # delete the stuff we used
-                if ($offset{$sock->[0]} == $state{$sock->[0]}) { # have it all?
+                $offset{$sock} = $copy;
+                substr($buf{$sock}, 0, $p+$copy, ''); # delete the stuff we used
+                if ($offset{$sock} == $state{$sock}) { # have it all?
                     $finalize->($sock);
-                    $state{$sock->[0]} = 0; # wait for another VALUE line or END
-                    $offset{$sock->[0]} = 0;
+                    $state{$sock} = 0; # wait for another VALUE line or END
+                    $offset{$sock} = 0;
                     next SEARCH; # look again
                 }
                 last SEARCH; # buffer is empty now
             }
 
             # if we're here probably means we only have a partial VALUE
             # or END line in the buffer. Could happen with multi-get,
             # though probably very rarely. Exit the loop and let it read
             # more.
 
             # but first, make sure subsequent reads don't destroy our
             # partial VALUE/END line.
-            $offset{$sock->[0]} = length($buf{$sock->[0]});
+            $offset{$sock} = length($buf{$sock});
             last SEARCH;
         }
 
         # we don't have a complete line, wait and read more when ready
         return;
     };
 
     my $write = sub {
         my $sock = shift;
         my $res;
 
-        $res = send($sock->[1], $buf{$sock->[0]}, $FLAG_NOSIGNAL);
+        $res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
         return
             if not defined $res and $!==EWOULDBLOCK;
         unless ($res > 0) {
             $dead->($sock);
             return;
         }
-        if ($res == length($buf{$sock->[0]})) { # all sent
-            $buf{$sock->[0]} = "";
-            $offset{$sock->[0]} = $state{$sock->[0]} = 0;
+        if ($res == length($buf{$sock})) { # all sent
+            $buf{$sock} = "";
+            $offset{$sock} = $state{$sock} = 0;
             # switch the socket from writing state to reading state
-            delete $writing{$sock->[0]};
-            $reading{$sock->[0]} = $sock;
+            delete $writing{$sock};
+            $reading{$sock} = 1;
             $active_changed = 1;
         } else { # we only succeeded in sending some of it
-            substr($buf{$sock->[0]}, 0, $res, ''); # delete the part we sent
+            substr($buf{$sock}, 0, $res, ''); # delete the part we sent
         }
         return;
     };
 
     # the bitsets for select
     my ($rin, $rout, $win, $wout);
     my $nfound;
 
     # the big select loop
     while(1) {
         if ($active_changed) {
             last unless %reading or %writing; # no sockets left?
             ($rin, $win) = ('', '');
-            foreach (values %reading) {
-                vec($rin, fileno($_->[1]), 1) = 1;
+            foreach (keys %reading) {
+                vec($rin, fileno($_), 1) = 1;
             }
-            foreach (values %writing) {
-                vec($win, fileno($_->[1]), 1) = 1;
+            foreach (keys %writing) {
+                vec($win, fileno($_), 1) = 1;
             }
             $active_changed = 0;
         }
         # TODO: more intelligent cumulative timeout?
         $nfound = select($rout=$rin, $wout=$win, undef,
                          $self->{'select_timeout'});
         last unless $nfound;
 
         # TODO: possible robustness improvement: we could select
         # writing sockets for reading also, and raise hell if they're
         # ready (input unread from last time, etc.)
         # maybe do that on the first loop only?
-        foreach (values %writing) {
-            if (vec($wout, fileno($_->[1]), 1)) {
+        foreach (keys %writing) {
+            if (vec($wout, fileno($_), 1)) {
                 $write->($_);
             }
         }
-        foreach (values %reading) {
-            if (vec($rout, fileno($_->[1]), 1)) {
+        foreach (keys %reading) {
+            if (vec($rout, fileno($_), 1)) {
                 $read->($_);
             }
         }
     }
 
     # if there're active sockets left, they need to die
-    foreach (values %writing) {
+    foreach (keys %writing) {
         $dead->($_);
     }
-    foreach (values %reading) {
+    foreach (keys %reading) {
         $dead->($_);
     }
 
     return;
 }
 
 sub _hashfunc {
     return (crc32(shift) >> 16) & 0x7fff;
-------------- next part --------------
A non-text attachment was scrubbed...
Name: memcached.t
Type: application/x-troff
Size: 910 bytes
Desc: not available
Url : http://lists.danga.com/pipermail/memcached/attachments/20050405/5277fda5/memcached-0001.t


More information about the memcached mailing list