[memcached] bradfitz,
r301: move all the parsing code into its own l...
commits at code.sixapart.com
commits at code.sixapart.com
Mon Jul 17 21:02:28 UTC 2006
move all the parsing code into its own little class/object, in prep
for writing a version of it in C.
A trunk/api/perl/lib/Cache/Memcached/
A trunk/api/perl/lib/Cache/Memcached/GetParser.pm
U trunk/api/perl/lib/Cache/Memcached.pm
Added: trunk/api/perl/lib/Cache/Memcached/GetParser.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached/GetParser.pm 2006-07-17 21:00:08 UTC (rev 300)
+++ trunk/api/perl/lib/Cache/Memcached/GetParser.pm 2006-07-17 21:02:22 UTC (rev 301)
@@ -0,0 +1,115 @@
+package Cache::Memcached::GetParser;
+use strict;
+use warnings;
+use integer;
+
+use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
+
+use constant DEST => 0; # destination hashref we're writing into
+use constant NSLEN => 1; # length of namespace to ignore on keys
+use constant ON_ITEM => 2;
+use constant BUF => 3; # read buffer
+use constant STATE => 4; # 0 = waiting for a line, N = reading N bytes
+use constant OFFSET => 5; # offsets to read into buffers
+use constant FLAGS => 6;
+use constant KEY => 7; # current key we're parsing (without the namespace prefix)
+
+# Constructor.  Builds the array-based parser object.
+#   $dest    - hashref that parsed values are written into
+#   $nslen   - number of leading (namespace) characters stripped from keys
+#   $on_item - coderef invoked as $on_item->($key, $flags) once a value
+#              has been read completely
+# The read buffer starts empty and STATE/OFFSET start at 0; the FLAGS
+# and KEY slots stay unset until the first VALUE line is parsed.
+sub new {
+    my $class = shift;
+    my ($dest, $nslen, $on_item) = @_;
+
+    my @self;
+    @self[DEST, NSLEN, ON_ITEM, BUF, STATE, OFFSET]
+        = ($dest, $nslen, $on_item, '', 0, 0);
+
+    return bless \@self, $class;
+}
+
+# Accessor: the key currently stored in the KEY slot (namespace prefix
+# already stripped), or undef if no VALUE line has been parsed yet.
+sub current_key {
+    my ($self) = @_;
+    return $self->[KEY];
+}
+
+# Reads whatever is currently available on $sock and advances the parser
+# for a memcached "get" response.
+#
+# Two modes, selected by the STATE slot:
+#   STATE != 0 : we are in the middle of a value; read the remaining
+#                STATE - OFFSET bytes straight into $dest->{KEY}.
+#   STATE == 0 : we are waiting for a "VALUE <key> <flags> <bytes>\r\n"
+#                or "END\r\n" line; read into BUF and scan it.
+#
+# Each time a value has been read completely, the ON_ITEM callback is
+# invoked with ($key, $flags).
+#
+# Returns 1 on success (END line seen), -1 on failure (connection closed
+# or read error), and 0 if still working (more data is needed).
+sub parse_from_sock {
+    my ($self, $sock) = @_;
+
+    my $res;
+    my $ret = $self->[DEST];
+
+    # where are we reading into?
+    if ($self->[STATE]) { # reading value into $ret
+        $res = sysread($sock, $ret->{$self->[KEY]},
+                       $self->[STATE] - $self->[OFFSET],
+                       $self->[OFFSET]);
+
+        return 0
+            if !defined($res) and $!==EWOULDBLOCK;
+
+        # catches 0=conn closed or undef=error.  Tested with "!" rather
+        # than "== 0" so an undef result does not trigger an
+        # "uninitialized value" warning.
+        return -1 if !$res;
+
+        $self->[OFFSET] += $res;
+        if ($self->[OFFSET] == $self->[STATE]) { # finished reading
+            $self->[ON_ITEM]->($self->[KEY], $self->[FLAGS]);
+            $self->[OFFSET] = 0;
+            $self->[STATE]  = 0;
+            # wait for another VALUE line or END...
+        }
+        return 0; # still working, haven't got to end yet
+    }
+
+    # we're reading a single line.
+    # first, read whatever's there, but be satisfied with 2048 bytes
+    $res = sysread($sock, $self->[BUF],
+                   2048, $self->[OFFSET]);
+    return 0
+        if !defined($res) and $!==EWOULDBLOCK;
+    return -1 if !$res; # 0=conn closed or undef=error
+
+    $self->[OFFSET] += $res;
+
+    # Below is a hot path.  Should be written in C.
+
+  SEARCH:
+    while(1) { # may have to search many times
+
+        # do we have a complete END line?
+        if ($self->[BUF] =~ /^END\r\n/) {
+            $self->[ON_ITEM] = undef; # drop our reference to the callback
+            return 1;  # we're done successfully, return 1 to finish
+        }
+
+        # do we have a complete VALUE line?
+        if ($self->[BUF] =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
+            # STATE is the value length plus 2 for the "\r\n" that
+            # follows the data block; those two bytes are stored along
+            # with the value.
+            ($self->[KEY], $self->[FLAGS], $self->[STATE]) =
+                (substr($1, $self->[NSLEN]), int($2), $3+2);
+            # Note: we use $+[0] and not pos($self->[BUF]) because pos()
+            # seems to have problems under perl's taint mode.  nobody
+            # on the list discovered why, but this seems a reasonable
+            # work-around:
+            my $p = $+[0];
+            my $len = length($self->[BUF]);
+            # copy as much of the value as is already buffered, but
+            # never more than the value itself
+            my $copy = $len-$p > $self->[STATE] ? $self->[STATE] : $len-$p;
+            $ret->{$self->[KEY]} = substr($self->[BUF], $p, $copy)
+                if $copy;
+            $self->[OFFSET] = $copy;
+            substr($self->[BUF], 0, $p+$copy, ''); # delete the stuff we used
+
+            if ($self->[OFFSET] == $self->[STATE]) { # have it all?
+                $self->[ON_ITEM]->($self->[KEY], $self->[FLAGS]);
+                $self->[OFFSET] = 0;
+                $self->[STATE]  = 0;
+                next SEARCH; # look again
+            }
+
+            last SEARCH; # buffer is empty now
+        }
+
+        # if we're here probably means we only have a partial VALUE
+        # or END line in the buffer.  Could happen with multi-get,
+        # though probably very rarely.  Exit the loop and let it read
+        # more.
+
+        # but first, make sure subsequent reads don't destroy our
+        # partial VALUE/END line.
+        $self->[OFFSET] = length($self->[BUF]);
+        last SEARCH;
+    }
+
+    # Still working.  This must be explicit: the value of a sub that
+    # ends by leaving a loop is unspecified, and callers compare the
+    # result numerically.
+    return 0;
+}
+
+1;
Modified: trunk/api/perl/lib/Cache/Memcached.pm
===================================================================
--- trunk/api/perl/lib/Cache/Memcached.pm 2006-07-17 21:00:08 UTC (rev 300)
+++ trunk/api/perl/lib/Cache/Memcached.pm 2006-07-17 21:02:22 UTC (rev 301)
@@ -15,6 +15,7 @@
use Time::HiRes ();
use String::CRC32;
use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
+use Cache::Memcached::GetParser;
use fields qw{
debug no_rehash stats compress_threshold compress_enable stat_callback
@@ -575,24 +576,9 @@
# all keyed by $sockstr:
my %reading; # $sockstr -> $sock. bool, whether we're reading from this socket
my %writing; # $sockstr -> $sock. bool, whether we're writing to this socket
- my %state; # reading state:
- # 0 = waiting for a line, N = reading N bytes
- my %buf; # buffers, for both reading/writing
- my %offset; # offsets to read into buffers
- my %key; # current key per socket
- my %flags; # flags per socket
+ my %buf; # buffers, for writing
- foreach (keys %$sock_keys) {
- my $ipport = $sock_map{$_} or die "No map found matching for $_";
- my $sock = $cache_sock{$ipport} or die "No sock found for $ipport";
- print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
- $writing{$_} = $sock;
- if ($self->{namespace}) {
- $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
- } else {
- $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
- }
- }
+ my %parser; # $sockstr -> Cache::Memcached::GetParser
my $active_changed = 1; # force rebuilding of select sets
@@ -601,9 +587,12 @@
print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
delete $reading{$sock};
delete $writing{$sock};
- delete $ret->{$key{$sock}}
- if $key{$sock};
+ if (my $p = $parser{$sock}) {
+ my $key = $p->current_key;
+ delete $ret->{$key} if $key;
+ }
+
if ($self->{'stat_callback'}) {
my $etime = Time::HiRes::time();
$self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
@@ -611,24 +600,17 @@
close $sock;
_dead_sock($sock);
- $active_changed = 1;
};
my $finalize = sub {
- my $sock = "$_[0]";
- my $k = $key{$sock};
+ my ($k, $flags) = @_;
# remove trailing \r\n
chop $ret->{$k}; chop $ret->{$k};
- unless (length($ret->{$k}) == $state{$sock}-2) {
- $dead->($_[0]);
- return;
- }
-
$ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
- if $HAVE_ZLIB && $flags{$sock} & F_COMPRESS;
- if ($flags{$sock} & F_STORABLE) {
+ if $HAVE_ZLIB && $flags & F_COMPRESS;
+ if ($flags & F_STORABLE) {
# wrapped in eval in case a perl 5.6 Storable tries to
# unthaw data from a perl 5.8 Storable. (5.6 is stupid
# and dies if the version number changes at all. in 5.8
@@ -643,129 +625,56 @@
}
};
- my $read = sub {
- my $sockstr = "$_[0]"; # $sock is $_[0];
- my $res;
-
- # where are we reading into?
- if ($state{$sockstr}) { # reading value into $ret
- $res = sysread($_[0], $ret->{$key{$sockstr}},
- $state{$sockstr} - $offset{$sockstr},
- $offset{$sockstr});
- return
- if !defined($res) and $!==EWOULDBLOCK;
- if ($res == 0) { # catches 0=conn closed or undef=error
- $dead->($_[0]);
- return;
- }
- $offset{$sockstr} += $res;
- if ($offset{$sockstr} == $state{$sockstr}) { # finished reading
- $finalize->($_[0]);
- $state{$sockstr} = 0; # wait for another VALUE line or END
- $offset{$sockstr} = 0;
- }
- return;
+ foreach (keys %$sock_keys) {
+ my $ipport = $sock_map{$_} or die "No map found matching for $_";
+ my $sock = $cache_sock{$ipport} or die "No sock found for $ipport";
+ print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
+ $writing{$_} = $sock;
+ if ($self->{namespace}) {
+ $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
+ } else {
+ $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
}
+ $parser{$_} = Cache::Memcached::GetParser->new($ret, $self->{namespace_len}, $finalize);
+ }
- # we're reading a single line.
- # first, read whatever's there, but be satisfied with 2048 bytes
- $res = sysread($_[0], $buf{$sockstr},
- 2048, $offset{$sockstr});
- return
- if !defined($res) and $!==EWOULDBLOCK;
- if ($res == 0) {
+ my $read = sub {
+ my $sockstr = "$_[0]"; # $sock is $_[0];
+ my $p = $parser{$sockstr} or die;
+ my $rv = $p->parse_from_sock($_[0]);
+ if ($rv > 0) {
+ # okay, finished with this socket
+ delete $reading{$sockstr};
+ } elsif ($rv < 0) {
$dead->($_[0]);
- return;
}
- $offset{$sockstr} += $res;
-
-
- # Below is a hot path. In preparation for rewriting it in Perl/C,
- # here are some notes.
- #
- # The while(1) below uses:
- # %buf
- # %reading
- # $active_changed
- # %key, %flags, %state, %offset
- # $finalize (CV)
- # $self->{namespace_len}
-
- SEARCH:
- while(1) { # may have to search many times
-
-
- # do we have a complete END line?
- if ($buf{$sockstr} =~ /^END\r\n/) {
- # okay, finished with this socket
- delete $reading{$sockstr};
- $active_changed = 1;
- return;
- }
-
- # do we have a complete VALUE line?
- if ($buf{$sockstr} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/) {
- ($key{$sockstr}, $flags{$sockstr}, $state{$sockstr}) =
- (substr($1, $self->{namespace_len}), int($2), $3+2);
- # Note: we use $+[0] and not pos($buf{$sockstr}) because pos()
- # seems to have problems under perl's taint mode. nobody
- # on the list discovered why, but this seems a reasonable
- # work-around:
- my $p = $+[0];
- my $len = length($buf{$sockstr});
- my $copy = $len-$p > $state{$sockstr} ? $state{$sockstr} : $len-$p;
- $ret->{$key{$sockstr}} = substr($buf{$sockstr}, $p, $copy)
- if $copy;
- $offset{$sockstr} = $copy;
- substr($buf{$sockstr}, 0, $p+$copy, ''); # delete the stuff we used
- if ($offset{$sockstr} == $state{$sockstr}) { # have it all?
- $finalize->($_[0]);
- $state{$sockstr} = 0; # wait for another VALUE line or END
- $offset{$sockstr} = 0;
- next SEARCH; # look again
- }
- last SEARCH; # buffer is empty now
- }
-
-
- # if we're here probably means we only have a partial VALUE
- # or END line in the buffer. Could happen with multi-get,
- # though probably very rarely. Exit the loop and let it read
- # more.
-
- # but first, make sure subsequent reads don't destroy our
- # partial VALUE/END line.
- $offset{$sockstr} = length($buf{$sockstr});
- last SEARCH;
- }
-
- # we don't have a complete line, wait and read more when ready
- return;
+ return $rv;
};
+ # returns 1 when it's done, for success or error. 0 if still working.
my $write = sub {
my ($sock, $sockstr) = ($_[0], "$_[0]");
my $res;
$res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
- return
+ return 0
if not defined $res and $!==EWOULDBLOCK;
unless ($res > 0) {
$dead->($sock);
- return;
+ return 1;
}
if ($res == length($buf{$sockstr})) { # all sent
$buf{$sockstr} = "";
- $offset{$sockstr} = $state{$sockstr} = 0;
- # switch the socket from writing state to reading state
+
+ # switch the socket from writing to reading
delete $writing{$sockstr};
$reading{$sockstr} = $sock;
- $active_changed = 1;
+ return 1;
} else { # we only succeeded in sending some of it
substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
}
- return;
+ return 0;
};
# the bitsets for select
@@ -797,12 +706,12 @@
# maybe do that on the first loop only?
foreach (values %writing) {
if (vec($wout, fileno($_), 1)) {
- $write->($_);
+ $active_changed = 1 if $write->($_);
}
}
foreach (values %reading) {
if (vec($rout, fileno($_), 1)) {
- $read->($_);
+ $active_changed = 1 if $read->($_);
}
}
}
More information about the memcached-commits
mailing list