need advice: http client / async / danga socket

Mika Raento mikie at iki.fi
Sat Aug 4 17:57:53 UTC 2007


Ah, and I sent the not-quite-debugged version - here's something that 
works (although I'm not quite sure whether I'm doing the SSL shutdown 
correctly).

	Mika

Mika Raento kirjoitti:
> Hiya
> 
> I've been thinking about integrating djabberd with a HTTP based service 
> (AWS S3 and SQS) and to that end I began hacking together an async http 
> client. Here's the code - it might be enough for your needs. Very basic, 
> but does include https (with no certificate checks).
> 
> Request creation and response parsing comes courtesy of LWP; DNS, socket 
> handling and SSL are ripped/used from djabberd.
> 
> It doesn't even do redirects, let alone cookies, but it shouldn't be too 
> hard to add whatever you need.
> 
> (licensed under the same terms as perl/djabberd)
> 
> Regards,

-------------- next part --------------
#!/usr/bin/perl

use lib 'lib';
use strict;

use HTTP::Request;
use HTTP::Response;

my $request=HTTP::Request->new("GET", "http://www.google.com/");
my $c=new Client($request, sub { print $_[0]->as_string; });
my $request2=HTTP::Request->new("GET", "https://www.google.com/");
my $c2=new Client($request2, sub { print $_[0]->as_string; });

Danga::Socket->EventLoop();

package Client;
use DJabberd::DNS;
use URI;
use strict;
use fields qw(response_data dns request cb connection uri);

sub new {
    my ($class, $request, $cb) = @_;
    my $uri=URI->new($request->uri);
    $request->protocol("HTTP/1.1");
    $request->init_header('Host' => $uri->host);
    $request->init_header('Connection' => 'close');

    my $host=$uri->host;
    my $port=$uri->port;

    my $self=fields::new($class);

    my $dns=new DJabberd::DNS(hostname=>$host, port=>$port, callback=>
        sub { $self->connect(@_); });

    $self->{dns}=$dns;
    $self->{request}=$request;
    $self->{cb}=$cb;
    $self->{uri}=$uri;

    return $self;
}

sub connect {
    my $self=shift;
    my $endpoint=shift;
    die "DNS lookup failed " unless($endpoint);
    my $connection=Connection->new($endpoint, $self->{request}, $self->{cb}, $self->{uri});
}

1;

package Connection;
use base 'Danga::Socket';
use fields qw(request data cb state ssl write_when_readable uri);
use HTTP::Response;
use IO::Handle;
use Socket;
use DJabberd::Stanza::StartTLS;
 
use constant POLLIN        => 1;
use constant POLLOUT       => 4;

sub new {
    my ($class, $endpoint, $request, $cb, $uri) = @_;

    my $sock;
    my $proto = getprotobyname('tcp');
    socket $sock, PF_INET, SOCK_STREAM, $proto;
    unless ($sock && defined fileno($sock)) {
        die "Cannot alloc socket";
        return;
    }
    my $ip=$endpoint->addr;
    connect $sock, Socket::sockaddr_in($endpoint->port, Socket::inet_aton($ip));
    IO::Handle::blocking($sock, 0);

    my $self = $class->SUPER::new($sock);
    $self->watch_write(1);
    $self->{request}=$request;
    $self->{data}='';
    $self->{cb}=$cb;
    $self->{uri}=$uri;

    $self->{state}="connecting";

    return $self;
}

sub event_read {
    my Connection $self = shift;
    # for async SSL:  if a session renegotation is in progress,
    # our previous write wants us to become readable first.
    # we then go back into the write path (by flushing the write
    # buffer) and it then does a read on this socket.
    #warn "event_read\n";
    if (my $ar = $self->{write_when_readable}) {
        $self->{write_when_readable} = 0;
        $self->watch_read($ar->[0]);  # restore previous readability state
        $self->watch_write(1);
        return;
    }
    #warn "got some data\n";

    my $bref;
    if (my $ssl = $self->{ssl}) {
        my $data = Net::SSLeay::read($ssl);

        my $errs = Net::SSLeay::print_errs('SSL_read');
        if ($errs) {
            warn "SSL Read error: $errs\n";
            $self->close;
            return;
        }

        # Net::SSLeays buffers internally, so if we didn't read anything, it's
        # in its buffer
        return unless $data && length $data;
        $bref = \$data;
    } else {
        # non-ssl mode:
        $bref = $self->read(20_000);
    }
    return $self->close unless defined $bref;
    #print "read $$bref\n";

    $self->{data} .= $$bref;
}

sub close {
    my Connection $self = shift;
    my $resp;

    if (my $ssl = $self->{ssl}) {
        Net::SSLeay::free($ssl);
        $self->{ssl} = undef;
    }

    eval { $resp=HTTP::Response->parse($self->{data})};
    $self->{cb}->($resp);
    $self->SUPER::close;

    #warn "closed\n";
}

sub event_err {
    #warn "event_err\n";
    $_[0]->close;
}

sub event_write {
    my Connection $self = shift;

    if ($self->{state} eq "connecting") {
        $self->{state}="connected";
        $self->on_connected;
    } else {
        if ($self->write(undef)) {
            $self->watch_write(0);
            if ($self->{ssl}) {
                Net::SSLeay::shutdown($self->{ssl});
            } else {
                CORE::shutdown $self->{sock}->fileno, 1;
            } 
        }    
    }
}

sub on_connected {
    my Connection $self = shift;

    if ($self->{uri}->scheme eq "https") {
        $self->setup_ssl;
    }
    # note that we must request watch_read() before
    # attempting to write, so that the request is remembered
    # when the SSL handshake ends
    $self->watch_read(1);
    $self->write( $self->{request}->as_string );
}

sub setup_ssl {
    my Connection $self = shift;

    my $ctx = Net::SSLeay::CTX_new()
        or die("Failed to create SSL_CTX $!");

    $Net::SSLeay::ssl_version = 10; # Insist on TLSv1
    Net::SSLeay::CTX_set_options($ctx, &Net::SSLeay::OP_ALL)
        and Net::SSLeay::die_if_ssl_error("ssl ctx set options");

    Net::SSLeay::CTX_set_mode($ctx, 1)  # enable partial writes
        and Net::SSLeay::die_if_ssl_error("ssl ctx set options");

    my $ssl = Net::SSLeay::new($ctx) or die_now("Failed to create SSL $!");
    $self->{ssl} = $ssl;

    my $fileno = $self->{sock}->fileno;
    warn "setting ssl ($ssl) fileno to $fileno\n";
    Net::SSLeay::set_fd($ssl, $fileno);

    $Net::SSLeay::trace = 2;

    my $rv = Net::SSLeay::connect($ssl);
    if (!$rv) {
        warn "SSL accept error on $self\n";
        $self->close;
        return;
    }

    #warn "$self:  Cipher `" . Net::SSLeay::get_cipher($ssl) . "'\n";

    $self->set_writer_func(DJabberd::Stanza::StartTLS->danga_socket_writerfunc($self));
}

# called by Danga::Socket when a write doesn't fully go through.  by default it
# enables writability.  but we want to do nothing if we're waiting for a read for SSL
sub on_incomplete_write {
    my $self = shift;
    return if $self->{write_when_readable};
    $self->SUPER::on_incomplete_write;
}

# called by SSL machinery to let us know a write is stalled on readability.
# so we need to (at least temporarily) go readable and then process writes.
sub write_when_readable {
    my $self = shift;
    #warn "write_when_readable\n";

    # enable readability, but remember old value so we can pop it back
    my $prev_readable = ($self->{event_watch} & POLLIN)  ? 1 : 0;
    $self->watch_read(1);
    $self->{write_when_readable} = [ $prev_readable ];

    # don't need to push/pop its state because Danga::Socket->write, called later,
    # will do the one final write, or if not all written, will turn on watch_write
    $self->watch_write(0);
}


1;


More information about the Djabberd mailing list