package Cache::Memcached::UDP;

# Small memcached client that speaks the text protocol over UDP.
#
# Every request is sent as a single datagram prefixed with an 8-byte frame
# header; the reply may arrive as one or more datagrams carrying the same
# kind of header.
#
# Keys may be passed either as a plain string (the server is then picked by
# CRC32 of the string) or as an array reference [ $key_name, $hash_value ]
# where $hash_value is used for server selection instead of the CRC32.

use String::CRC32;
use IO::Select;
use IO::Socket;
use strict;
use warnings;

# Size of the frame header: request id, sequence number, packet count,
# reserved -- four 16-bit big-endian integers.
use constant HEADER_LEN => 8;

# Largest request datagram (header included) this client will send.
use constant MAX_DATAGRAM => 1400;

# Constructor.  Accepts either a hash reference or a flat key/value list.
# Each option is stored on the object under "_" . lc(option name).
# Options read by this module:
#   servers - array ref of server entries; each entry is either an address
#             string or an array ref [ address, weight ] (weight defaults
#             to 1)
#   timeout - seconds to wait for each reply datagram (default 0.5)
#   id      - initial request id (default: random)
# Dies if the argument count is odd.
sub new {
    my $class = shift;
    my @args  = @_;
    my $self  = bless { _timeout => 0.5 }, ref $class || $class;

    @args = %{ $args[0] } if @args == 1 and ref $args[0] eq "HASH";
    @args % 2 == 0 or die "Argument count must be a multiple of 2\n";
    while (@args) {
        my ($key, $val) = splice(@args, 0, 2);
        $self->{ lc "_$key" } = $val;
    }

    # Copy every server entry so that normalising it (and later caching a
    # socket in slot 2) never modifies the data structure the caller passed.
    my @servers = map { ref $_ ? [ @$_ ] : [ $_ ] }
                  @{ $self->{"_servers"} || [] };

    $self->{"_server_weight_sum"} = 0;
    foreach my $server (@servers) {
        $self->{"_server_weight_sum"} += ($server->[1] ||= 1);
    }
    $self->{"_servers"} = \@servers;

    return $self;
}

# Returns the name to put on the wire for a key, which is either a plain
# string or an array ref [ key_name, hash_value ].
sub _key_name {
    my ($key) = @_;
    return ref $key ? $key->[0] : $key;
}

# Picks the server entry responsible for $key using weighted buckets.
# Dies if no servers are configured.
sub _choose_sock {
    my ($self, $key) = @_;

    my $weight_sum = $self->{"_server_weight_sum"}
        or die "No servers configured\n";
    my $bucket = (ref $key ? $key->[1] : crc32($key)) % $weight_sum;

    my $pos = 0;
    foreach my $server (@{ $self->{"_servers"} }) {
        $pos += $server->[1];
        return $server if $bucket < $pos;
    }
    die "No server found for bucket $bucket\n";
}

# Sends $msg to $server as one datagram and collects the reply.
# The UDP socket is created on first use and cached in $server->[2].
# Returns the reassembled reply payload, or nothing (undef in scalar
# context) if the complete reply did not arrive before the timeout.
# Dies if the message is too long, or on socket/send/receive errors.
sub _send_wait {
    my ($self, $server, $msg) = @_;

    length($msg) + HEADER_LEN > MAX_DATAGRAM
        and die "Message too long\n";

    my $id   = ($self->{_id} ||= int rand(2**16))++ % 2**16;
    my $sock = $server->[2] ||= IO::Socket::INET->new(
        PeerAddr => $server->[0],
        Proto    => "udp",
    ) or die "Couldn't create socket for $server->[0]: $@\n";

    # request id (opaque), seq num, #packets, reserved (must be 0)
    $sock->send(pack("nnnn", $id, 0, 1, 0).$msg)
        or die "Couldn't send: $!\n";

    my $sel = IO::Select->new($sock);
    my %fragment;    # sequence number => payload
    my $expected;    # packet count announced by the reply

    while ($sel->can_read($self->{"_timeout"})) {
        my $buf;
        # recv returns the sender address, which can be an empty string;
        # only undef signals an error.
        defined($sock->recv($buf, 65536))
            or die "Receive failed: $!\n";
        length($buf) >= HEADER_LEN
            or die "Short read: ".length($buf)."\n";

        my ($rid, $rseq, $rpkts) =
            unpack("nnnn", substr($buf, 0, HEADER_LEN, ''));

        # Ignore datagrams answering some other (e.g. timed-out) request
        # before trusting anything else in their header.
        next unless $rid == $id;

        $expected = $rpkts unless defined $expected;
        next unless $rpkts == $expected and $rseq < $expected;

        # Datagrams may be reordered or duplicated: keep the first copy of
        # each fragment and reassemble by sequence number afterwards.
        $fragment{$rseq} = $buf unless exists $fragment{$rseq};
        last if keys(%fragment) == $expected;
    }

    return unless $expected and keys(%fragment) == $expected;
    return join '', map { $fragment{$_} } 0 .. $expected - 1;
}

# Fetches several keys at once.  Keys are grouped per server and one "get"
# request is sent to each server involved.
# Returns a hash ref mapping key name => value for every key found.
# Dies if a server does not answer, answers with something unparsable, or
# returns a value stored with non-zero flags.
sub get_multi {
    my ($self, @keys) = @_;

    my %batch_for;
    foreach my $key (@keys) {
        my $server = $self->_choose_sock($key);
        $batch_for{$server}{server} = $server;
        push @{ $batch_for{$server}{keys} }, _key_name($key);
    }

    my %ret;
    foreach my $batch (values %batch_for) {
        my $reply = $self->_send_wait(
            $batch->{server},
            "get ".join(" ", @{ $batch->{keys} })."\r\n",
        );
        defined $reply
            or die "No response from $batch->{server}[0]\n";

        while ($reply =~ s/^VALUE (\S+) (\d+) (\d+)\r\n//) {
            my ($key, $flag, $len) = ($1, $2, $3);
            my $val = substr($reply, 0, $len, '');
            substr($reply, 0, 2, '') eq "\r\n"
                or die "Malformed value block for key $key\n";
            die "Unsupported flags $flag for key $key\n" if $flag;
            $ret{$key} = $val;
        }
        $reply eq "END\r\n"
            or die "Malformed response from $batch->{server}[0]\n";
    }

    return \%ret;
}

# Shared implementation of add/set/replace.  $type is the command name.
# Returns 1 if the server answered "STORED", 0 otherwise (including when
# no reply arrived in time).
sub _add_set_replace {
    my ($self, $type, $key, $val, $exptime) = @_;

    $val = '' unless defined $val;
    my $server = $self->_choose_sock($key);
    my $reply  = $self->_send_wait(
        $server,
        sprintf("%s %s 0 %d %s\r\n%s\r\n",
            $type, _key_name($key), $exptime || 0, length($val), $val),
    );

    return (defined $reply && $reply eq "STORED\r\n") ? 1 : 0;
}

# Shared implementation of incr/decr.  $type is the command name and $val
# the amount (defaults to 1).  An extra $exptime argument, if given, is
# appended to the command.
# Returns the number sent back by the server, or undef if the reply was
# anything else or did not arrive in time.
sub _incr_decr {
    my ($self, $type, $key, $val, $exptime) = @_;

    $val = 1 unless defined $val;
    my $server  = $self->_choose_sock($key);
    my $command = join " ", $type, _key_name($key), $val;
    $command .= " $exptime" if @_ > 4 and defined $exptime;

    my $reply = $self->_send_wait($server, "$command\r\n");
    return defined $reply && $reply =~ /^(\d+)\r\n$/ ? $1 : undef;
}

# Sends "stats" (optionally "stats $type") to every configured server and
# returns the concatenated raw replies.  Servers that do not answer in time
# contribute nothing.
sub stats {
    my ($self, $type) = @_;

    my $command = "stats".($type ? " $type" : "")."\r\n";
    my $ret     = '';
    foreach my $server (@{ $self->{"_servers"} }) {
        my $reply = $self->_send_wait($server, $command);
        $ret .= $reply if defined $reply;
    }
    return $ret;
}

# Public API: thin wrappers around the shared implementations above.

# get($key): value stored under $key, or undef/empty list when missing.
sub get     { (values %{ shift->get_multi(@_) })[0] }

# add/set/replace($key, $val [, $exptime]): 1 if stored, 0 otherwise.
sub add     { shift->_add_set_replace("add", @_) }
sub set     { shift->_add_set_replace("set", @_) }
sub replace { shift->_add_set_replace("replace", @_) }

# incr/decr($key [, $amount]): number returned by the server, or undef.
sub incr    { shift->_incr_decr("incr", @_) }
sub decr    { shift->_incr_decr("decr", @_) }

1;