[memcached] bradfitz, r301: move all the parsing code into its own l...

commits at code.sixapart.com commits at code.sixapart.com
Mon Jul 17 21:02:28 UTC 2006


move all the parsing code into its own little class/object, in prep
for writing a version of it in C.




A   trunk/api/perl/lib/Cache/Memcached/
A   trunk/api/perl/lib/Cache/Memcached/GetParser.pm
U   trunk/api/perl/lib/Cache/Memcached.pm


Added: trunk/api/perl/lib/Cache/Memcached/GetParser.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached/GetParser.pm	2006-07-17 21:00:08 UTC (rev 300)
+++ trunk/api/perl/lib/Cache/Memcached/GetParser.pm	2006-07-17 21:02:22 UTC (rev 301)
@@ -0,0 +1,115 @@
+package Cache::Memcached::GetParser;
+use strict;
+use warnings;
+use integer;
+
+use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
+
+use constant DEST    => 0;  # destination hashref we're writing into
+use constant NSLEN   => 1;  # length of namespace to ignore on keys
+use constant ON_ITEM => 2;  # callback invoked as ->($key, $flags) once a value is fully read
+use constant BUF     => 3;  # read buffer
+use constant STATE   => 4;  # 0 = waiting for a line, N = reading N bytes (value length + 2 for the trailing \r\n)
+use constant OFFSET  => 5;  # offsets to read into buffers (bytes already held in BUF or in the current value)
+use constant FLAGS   => 6;  # flags field taken from the VALUE line being parsed
+use constant KEY     => 7;  # current key we're parsing (without the namespace prefix)
+
+sub new {  # args: class, destination hashref, namespace length, per-item callback
+    my ($pkg, $dest_href, $ns_len, $item_cb) = @_;
+    return bless [ $dest_href, $ns_len, $item_cb, q{}, 0, 0 ], $pkg;  # then BUF, STATE, OFFSET; FLAGS/KEY start unset
+}
+
+sub current_key {  # key from the most recently parsed VALUE line (namespace prefix removed); undef before the first
+    return $_[0]->[KEY];
+}
+
+# does one sysread on $sock per call: returns 1 on success (END seen), -1 on failure, and 0 if still working.
+sub parse_from_sock {
+    my ($self, $sock) = @_;
+
+    my $res;
+    my $ret = $self->[DEST];
+
+    # where are we reading into?
+    if ($self->[STATE]) { # reading value (STATE bytes, trailing \r\n included) into $ret
+        $res = sysread($sock, $ret->{$self->[KEY]},
+                       $self->[STATE] - $self->[OFFSET],
+                       $self->[OFFSET]);
+
+        return 0
+            if !defined($res) and $!==EWOULDBLOCK;
+
+        if (!$res) { # catches 0=conn closed or undef=error (no uninitialized-value warning)
+            return -1;
+        }
+
+        $self->[OFFSET] += $res;
+        if ($self->[OFFSET] == $self->[STATE]) { # finished reading
+            $self->[ON_ITEM]->($self->[KEY], $self->[FLAGS]);
+            $self->[OFFSET] = 0;
+            $self->[STATE]  = 0;
+            # wait for another VALUE line or END...
+        }
+        return 0; # still working, haven't got to end yet
+    }
+
+    # we're reading a single line.
+    # first, read whatever's there, but be satisfied with 2048 bytes
+    $res = sysread($sock, $self->[BUF],
+                   2048, $self->[OFFSET]);
+    return 0
+        if !defined($res) and $!==EWOULDBLOCK;
+    return -1 if !$res; # 0=conn closed or undef=error
+
+    $self->[OFFSET] += $res;
+
+    # Below is a hot path.  Should be written in C.
+
+  SEARCH:
+    while(1) { # may have to search many times
+
+        # do we have a complete END line?
+        if ($self->[BUF] =~ /^END\r\n/) {
+            $self->[ON_ITEM] = undef;
+            return 1;  # we're done successfully, return 1 to finish
+        }
+
+        # do we have a complete VALUE line?
+        if ($self->[BUF] =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+            ($self->[KEY], $self->[FLAGS], $self->[STATE]) =
+                (substr($1, $self->[NSLEN]), int($2), $3+2);
+            # Note: we use $+[0] and not pos($self->[BUF]) because pos()
+            # seems to have problems under perl's taint mode.  nobody
+            # on the list discovered why, but this seems a reasonable
+            # work-around:
+            my $p = $+[0];
+            my $len = length($self->[BUF]);
+            my $copy = $len-$p > $self->[STATE] ? $self->[STATE] : $len-$p;
+            $ret->{$self->[KEY]} = substr($self->[BUF], $p, $copy)
+                if $copy;
+            $self->[OFFSET] = $copy;
+            substr($self->[BUF], 0, $p+$copy, ''); # delete the stuff we used
+
+            if ($self->[OFFSET] == $self->[STATE]) { # have it all?
+                $self->[ON_ITEM]->($self->[KEY], $self->[FLAGS]);
+                $self->[OFFSET] = 0;
+                $self->[STATE]  = 0;
+                next SEARCH; # look again
+            }
+
+            return 0; # buffer is empty now; the rest of the value comes from later reads
+        }
+
+        # if we're here probably means we only have a partial VALUE
+        # or END line in the buffer. Could happen with multi-get,
+        # though probably very rarely. Return and let the caller read
+        # more.
+
+        # but first, make sure subsequent reads don't destroy our
+        # partial VALUE/END line.
+        $self->[OFFSET] = length($self->[BUF]);
+        return 0; # explicit "still working"; falling off the end after a loop returns an unspecified value
+    }
+}
+
+1;

Modified: trunk/api/perl/lib/Cache/Memcached.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached.pm	2006-07-17 21:00:08 UTC (rev 300)
+++ trunk/api/perl/lib/Cache/Memcached.pm	2006-07-17 21:02:22 UTC (rev 301)
@@ -15,6 +15,7 @@
 use Time::HiRes ();
 use String::CRC32;
 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
+use Cache::Memcached::GetParser;
 
 use fields qw{
     debug no_rehash stats compress_threshold compress_enable stat_callback
@@ -575,24 +576,9 @@
     # all keyed by $sockstr:
     my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
     my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
-    my %state;   # reading state:
-                 # 0 = waiting for a line, N = reading N bytes
-    my %buf;     # buffers, for both reading/writing
-    my %offset;  # offsets to read into buffers
-    my %key;     # current key per socket
-    my %flags;   # flags per socket
+    my %buf;     # buffers, for writing
 
-    foreach (keys %$sock_keys) {
-        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
-        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
-        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
-        $writing{$_} = $sock;
-        if ($self->{namespace}) {
-            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
-        } else {
-            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
-        }
-    }
+    my %parser;  # $sockstr -> Cache::Memcached::GetParser
 
     my $active_changed = 1; # force rebuilding of select sets
 
@@ -601,9 +587,12 @@
         print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
         delete $reading{$sock};
         delete $writing{$sock};
-        delete $ret->{$key{$sock}}
-            if $key{$sock};
 
+        if (my $p = $parser{$sock}) {
+            my $key = $p->current_key;
+            delete $ret->{$key} if $key;
+        }
+
         if ($self->{'stat_callback'}) {
             my $etime = Time::HiRes::time();
             $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
@@ -611,24 +600,17 @@
 
         close $sock;
         _dead_sock($sock);
-        $active_changed = 1;
     };
 
     my $finalize = sub {
-        my $sock = "$_[0]";
-        my $k = $key{$sock};
+        my ($k, $flags) = @_;
 
         # remove trailing \r\n
         chop $ret->{$k}; chop $ret->{$k};
 
-        unless (length($ret->{$k}) == $state{$sock}-2) {
-            $dead->($_[0]);
-            return;
-        }
-
         $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
-            if $HAVE_ZLIB && $flags{$sock} & F_COMPRESS;
-        if ($flags{$sock} & F_STORABLE) {
+            if $HAVE_ZLIB && $flags & F_COMPRESS;
+        if ($flags & F_STORABLE) {
             # wrapped in eval in case a perl 5.6 Storable tries to
             # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
             # and dies if the version number changes at all.  in 5.8
@@ -643,129 +625,56 @@
         }
     };
 
-    my $read = sub {
-        my $sockstr = "$_[0]";  # $sock is $_[0];
-        my $res;
-
-        # where are we reading into?
-        if ($state{$sockstr}) { # reading value into $ret
-            $res = sysread($_[0], $ret->{$key{$sockstr}},
-                           $state{$sockstr} - $offset{$sockstr},
-                           $offset{$sockstr});
-            return
-                if !defined($res) and $!==EWOULDBLOCK;
-            if ($res == 0) { # catches 0=conn closed or undef=error
-                $dead->($_[0]);
-                return;
-            }
-            $offset{$sockstr} += $res;
-            if ($offset{$sockstr} == $state{$sockstr}) { # finished reading
-                $finalize->($_[0]);
-                $state{$sockstr} = 0; # wait for another VALUE line or END
-                $offset{$sockstr} = 0;
-            }
-            return;
+    foreach (keys %$sock_keys) {
+        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
+        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
+        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
+        $writing{$_} = $sock;
+        if ($self->{namespace}) {
+            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
+        } else {
+            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
         }
+        $parser{$_} = Cache::Memcached::GetParser->new($ret, $self->{namespace_len}, $finalize);
+    }
 
-        # we're reading a single line.
-        # first, read whatever's there, but be satisfied with 2048 bytes
-        $res = sysread($_[0], $buf{$sockstr},
-                       2048, $offset{$sockstr});
-        return
-            if !defined($res) and $!==EWOULDBLOCK;
-        if ($res == 0) {
+    my $read = sub {
+        my $sockstr = "$_[0]";  # $sock is $_[0];
+        my $p = $parser{$sockstr} or die;
+        my $rv = $p->parse_from_sock($_[0]);
+        if ($rv > 0) {
+            # okay, finished with this socket
+            delete $reading{$sockstr};
+        } elsif ($rv < 0) {
             $dead->($_[0]);
-            return;
         }
-        $offset{$sockstr} += $res;
-
-
-        # Below is a hot path.  In preparation for rewriting it in Perl/C,
-        # here are some notes.
-        #
-        # The while(1) below uses:
-        #   %buf
-        #   %reading
-        #   $active_changed
-        #   %key, %flags, %state, %offset
-        #   $finalize (CV)
-        #   $self->{namespace_len}
-
-      SEARCH:
-        while(1) { # may have to search many times
-
-
-            # do we have a complete END line?
-            if ($buf{$sockstr} =~ /^END\r\n/) {
-                # okay, finished with this socket
-                delete $reading{$sockstr};
-                $active_changed = 1;
-                return;
-            }
-
-            # do we have a complete VALUE line?
-            if ($buf{$sockstr} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
-                ($key{$sockstr}, $flags{$sockstr}, $state{$sockstr}) =
-                    (substr($1, $self->{namespace_len}), int($2), $3+2);
-                # Note: we use $+[0] and not pos($buf{$sockstr}) because pos()
-                # seems to have problems under perl's taint mode.  nobody
-                # on the list discovered why, but this seems a reasonable
-                # work-around:
-                my $p = $+[0];
-                my $len = length($buf{$sockstr});
-                my $copy = $len-$p > $state{$sockstr} ? $state{$sockstr} : $len-$p;
-                $ret->{$key{$sockstr}} = substr($buf{$sockstr}, $p, $copy)
-                    if $copy;
-                $offset{$sockstr} = $copy;
-                substr($buf{$sockstr}, 0, $p+$copy, ''); # delete the stuff we used
-                if ($offset{$sockstr} == $state{$sockstr}) { # have it all?
-                    $finalize->($_[0]);
-                    $state{$sockstr} = 0; # wait for another VALUE line or END
-                    $offset{$sockstr} = 0;
-                    next SEARCH; # look again
-                }
-                last SEARCH; # buffer is empty now
-            }
-
-
-            # if we're here probably means we only have a partial VALUE
-            # or END line in the buffer. Could happen with multi-get,
-            # though probably very rarely. Exit the loop and let it read
-            # more.
-
-            # but first, make sure subsequent reads don't destroy our
-            # partial VALUE/END line.
-            $offset{$sockstr} = length($buf{$sockstr});
-            last SEARCH;
-        }
-
-        # we don't have a complete line, wait and read more when ready
-        return;
+        return $rv;
     };
 
+    # returns 1 when it's done, for success or error.  0 if still working.
     my $write = sub {
         my ($sock, $sockstr) = ($_[0], "$_[0]");
         my $res;
 
         $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
 
-        return
+        return 0
             if not defined $res and $!==EWOULDBLOCK;
         unless ($res > 0) {
             $dead->($sock);
-            return;
+            return 1;
         }
         if ($res == length($buf{$sockstr})) { # all sent
             $buf{$sockstr} = "";
-            $offset{$sockstr} = $state{$sockstr} = 0;
-            # switch the socket from writing state to reading state
+
+            # switch the socket from writing to reading
             delete $writing{$sockstr};
             $reading{$sockstr} = $sock;
-            $active_changed = 1;
+            return 1;
         } else { # we only succeeded in sending some of it
             substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
         }
-        return;
+        return 0;
     };
 
     # the bitsets for select
@@ -797,12 +706,12 @@
         # maybe do that on the first loop only?
         foreach (values %writing) {
             if (vec($wout, fileno($_), 1)) {
-                $write->($_);
+                $active_changed = 1 if $write->($_);
             }
         }
         foreach (values %reading) {
             if (vec($rout, fileno($_), 1)) {
-                $read->($_);
+                $active_changed = 1 if $read->($_);
             }
         }
     }




More information about the memcached-commits mailing list