[PATCH 7/7] Reimplement GPG wrapping code and make it more robust
Michael Hanselmann
public at hansmi.ch
Wed Aug 13 21:16:02 UTC 2008
---
lib/Brackup/Backup.pm | 69 ++++++++++++++----
lib/Brackup/GPGProcManager.pm | 158 ++++++++++++++++-------------------------
lib/Brackup/GPGProcess.pm | 158 +++++++++++++++++++++++++++--------------
3 files changed, 220 insertions(+), 165 deletions(-)
diff --git a/lib/Brackup/Backup.pm b/lib/Brackup/Backup.pm
index 727c6ac..2d9e1ef 100644
--- a/lib/Brackup/Backup.pm
+++ b/lib/Brackup/Backup.pm
@@ -5,7 +5,6 @@ use Carp qw(croak);
use Brackup::ChunkIterator;
use Brackup::CompositeChunk;
use Brackup::GPGProcManager;
-use Brackup::GPGProcess;
sub new {
my ($class, %opts) = @_;
@@ -67,6 +66,12 @@ sub new {
$self->{stored_files} = ();
$self->{stored_chunks} = ();
+ if ($self->{parent}->{root}->gpg_rcpt) {
+ $self->{gpgmgr} = Brackup::GPGProcManager->new;
+ } else {
+ $self->{gpgmgr} = undef;
+ }
+
# Number of files done
$self->{n_files_done} = 0;
@@ -146,8 +151,6 @@ sub _emit_file {
sub _emit_chunk {
my ($self, $rec) = @_;
my $root = $self->{parent}->{root};
- my $merge_under = $root->merge_files_under;
- my $gpg_rcpt = $root->gpg_rcpt;
if ($rec->isa("Brackup::File")) {
$self->start_file($rec);
@@ -188,13 +191,38 @@ sub _emit_chunk {
unless ($self->{dryrun}) {
$schunk = Brackup::StoredChunk->new($pchunk);
- if ($gpg_rcpt) {
- $schunk->set_encrypted_chunkref($root->encrypt($pchunk->raw_chunkref));
+ my $ctx = {
+ pchunk => $pchunk,
+ schunk => $schunk,
+ };
+
+ if ($self->{gpgmgr}) {
+ $self->{gpgmgr}->enqueue($pchunk, sub {
+ my ($pchunk, $chunkref) = @_;
+
+ $schunk->set_encrypted_chunkref($chunkref);
+
+ $self->_finalize_chunk($ctx);
+ });
+ } else {
+ $self->_finalize_chunk($ctx);
}
+ } else {
+ $pchunk->forget_chunkref;
+ }
+}
- # see if we should pack it into a bigger blob
- my $chunk_size = $schunk->backup_length;
+sub _finalize_chunk {
+ my ($self, $ctx) = @_;
+ my $root = $self->{parent}->{root};
+ my $merge_under = $root->merge_files_under;
+ my $pchunk = $ctx->{pchunk};
+ my $schunk = $ctx->{schunk};
+ # see if we should pack it into a bigger blob
+ my $chunk_size = $schunk->backup_length;
+
+ unless ($self->{dryrun}) {
# see if we should merge this chunk (in this case, file) together with
# other small files we encountered earlier, into a "composite chunk",
# to be stored on the target in one go.
@@ -205,11 +233,9 @@ sub _emit_chunk {
# would rather just have 1 type of magic per file. (split it or join it)
if ($merge_under && $chunk_size < $merge_under && $pchunk->is_entire_file) {
if ($self->{comp_chunk} && ! $self->{comp_chunk}->can_fit($chunk_size)) {
- $self->debug('Finalizing composite chunk ', $self->{comp_chunk}, '...');
- $self->{comp_chunk}->finalize;
- $self->{comp_chunk} = undef;
+ $self->_finalize_composite_chunk();
}
- $self->{comp_chunk} ||= Brackup::CompositeChunk->new($self->{parent}->{root},
+ $self->{comp_chunk} ||= Brackup::CompositeChunk->new($root,
$self->{parent}->{target});
$self->{comp_chunk}->append_little_chunk($schunk);
} else {
@@ -228,6 +254,16 @@ sub _emit_chunk {
$schunk->forget_chunkref if $schunk;
}
+# Flush the composite chunk currently being assembled, if there is one:
+# finalize it and forget it, so the next small file starts a fresh one.
+# Calling this when no composite chunk is open is a no-op.
+sub _finalize_composite_chunk {
+    my ($self) = @_;
+
+    my $open_chunk = $self->{comp_chunk}
+        or return;
+
+    $self->debug('Finalizing composite chunk ', $open_chunk, '...');
+    $open_chunk->finalize;
+    $self->{comp_chunk} = undef;
+}
+
sub start_file {
my ($self, $file) = @_;
@@ -243,6 +279,11 @@ sub end_file {
my ($self) = @_;
my $file = $self->{cur_file};
+ if ($self->{gpgmgr}) {
+ # Make sure all parts are done
+ $self->{gpgmgr}->check(1);
+ }
+
return unless $file;
$self->{cb_end_file}->($file, $self->{stored_chunks});
@@ -279,11 +320,7 @@ sub _end {
$self->end_file();
- if ($self->{comp_chunk}) {
- $self->{comp_chunk}->finalize;
- $self->{comp_chunk} = undef;
- }
-
+ $self->_finalize_composite_chunk();
$self->_write_meta();
unless ($self->{dryrun}) {
diff --git a/lib/Brackup/GPGProcManager.pm b/lib/Brackup/GPGProcManager.pm
index 3c4825a..7025e1c 100644
--- a/lib/Brackup/GPGProcManager.pm
+++ b/lib/Brackup/GPGProcManager.pm
@@ -5,127 +5,93 @@ use Brackup::GPGProcess;
use POSIX ":sys_wait_h";
+# Create a manager that encrypts chunks in forked GPG worker processes.
+#
+# Optional named arguments:
+#   max_concurrent => N   upper bound on simultaneously running workers
+#                         (positive integer; defaults to 3).
sub new {
- my ($class, $iter, $target) = @_;
- return bless {
- chunkiter => $iter,
- procs => {}, # "addr(pchunk)" => GPGProcess
- target => $target,
- procs_running => {}, # pid -> GPGProcess
- uncollected_bytes => 0,
- }, $class;
-}
+    my ($class, %opts) = @_;
+    my $self = bless {}, $class;
-sub enc_chunkref_of {
- my ($self, $pchunk) = @_;
-
- my $proc = $self->{procs}{$pchunk};
- unless ($proc) {
- # catch iterator up to the point that was
- # requested, or blow up.
- my $found = 0;
- my $iters = 0;
- while (my $ich = $self->{chunkiter}->next) {
- if ($ich == $pchunk) {
- $found = 1;
- last;
- }
- $iters++;
- warn "iters = $iters\n";
- }
- die "Not found" unless $found;
- $proc = $self->gen_process_for($pchunk);
- }
+    my $max_concurrent = defined $opts{max_concurrent} ? $opts{max_concurrent} : 3;
+    die "max_concurrent must be a positive integer (got $max_concurrent)"
+        unless $max_concurrent =~ /\A[1-9][0-9]*\z/;
+
+    # Jobs waiting for a free worker slot, in enqueue order.
+    $self->{pending} = [];
+    # Jobs whose GPG worker has been started but not yet reaped, in start
+    # order.
+    $self->{progressing} = [];
+    $self->{max_concurrent} = $max_concurrent;
- while ($proc->running) {
- my $pid = $self->wait_for_a_process(1) or die
- "No processes were reaped!";
- }
+
+    return $self;
+}
- $self->_proc_summary_dump;
- my $cref = $self->get_proc_chunkref($proc);
- $self->_proc_summary_dump;
- $self->start_some_processes;
- return $cref;
+# Queue $pchunk for encryption. After its worker has exited successfully,
+# $cb (if given) is called as $cb->($pchunk, $chunkref), where $chunkref
+# refers to the encrypted data. Workers are started immediately while
+# fewer than max_concurrent are in progress; other jobs wait in "pending".
+sub enqueue {
+    my ($self, $pchunk, $cb) = @_;
+
+    my %job = (
+        pchunk  => $pchunk,
+        cb      => $cb,
+        process => undef,   # filled in once a worker is started
+    );
+    push @{ $self->{pending} }, \%job;
+
+    $self->_next;
}
-sub start_some_processes {
- my $self = shift;
- # eat up any pending zombies
- while ($self->wait_for_a_process(0)) {}
- my $pchunk;
- # TODO: make this stuff configurable/auto-tuned
- while ($self->num_running_procs < 5 &&
- $self->num_uncollected_bytes < 128 * 1024 * 1024 &&
- ($pchunk = $self->next_chunk_to_encrypt)) {
- $self->_proc_summary_dump;
- $self->gen_process_for($pchunk);
- $self->_proc_summary_dump;
- }
+# Take the oldest pending job, start a Brackup::GPGProcess (forked worker)
+# for it and append it to the in-progress list. Returns the job, or undef
+# when nothing is pending.
+sub _start_worker {
+    my ($self) = @_;
+
+    my $queue = $self->{pending};
+    return undef unless @$queue;
+
+    my $job = shift @$queue;
+    $job->{process} = Brackup::GPGProcess->new($job->{pchunk});
+    push @{ $self->{progressing} }, $job;
+
+    return $job;
}
-sub _proc_summary_dump {
- my $self = shift;
- return unless $ENV{GPG_DEBUG};
- printf STDERR "num_running=%d, num_outstanding_bytes=%d\n",
- $self->num_running_procs, $self->num_uncollected_bytes;
+# Start pending jobs until max_concurrent workers are in progress or the
+# pending queue runs dry.
+sub _next {
+    my ($self) = @_;
+
+    until (@{ $self->{progressing} } >= $self->{max_concurrent}) {
+        defined($self->_start_worker()) or last;
+    }
}
-sub next_chunk_to_encrypt {
- my $self = shift;
- while (my $ev = $self->{chunkiter}->next) {
- next if $ev->isa("Brackup::File");
- my $pchunk = $ev;
- next if $self->{target}->stored_chunk_from_inventory($pchunk);
- return $pchunk;
-}
- return undef;
+# Handle a job whose worker has been reaped. Dies if the GPG process was
+# killed by a signal or exited with a non-zero status; otherwise passes
+# the encrypted chunk to the job's callback, if it has one.
+sub _worker_done {
+    my ($self, $worker) = @_;
+    my $proc   = $worker->{process};
+    my $status = $proc->returncode;
+
+    die "Worker killed by signal " . abs($status) if $status < 0;
+    die "Worker failed with exit status $status" if $status > 0;
+
+    my $callback = $worker->{cb} or return;
+    $callback->($worker->{pchunk}, $proc->chunkref);
}
-sub get_proc_chunkref {
- my ($self, $proc) = @_;
- my $cref = $proc->chunkref;
- delete $self->{procs}{$proc};
- $self->{uncollected_bytes} -= length($$cref);
- return $cref;
+# Number of jobs whose GPG worker has been started but not yet reaped.
+sub has_running {
+    my ($self) = @_;
+
+    return scalar(@{ $self->{progressing} });
}
-# returns PID of a process that finished
-sub wait_for_a_process {
+
+# Reap finished workers, run their callbacks and refill the freed worker
+# slots from the pending queue.
+#
+# Workers are reaped strictly from the front of the in-progress list, so
+# callbacks always run in enqueue order. A worker that finishes early
+# stays unreaped until every job queued before it has been handled, so
+# chunks are finalized in the order they were emitted.
+#
+# $block false: a single non-blocking pass.
+# $block true:  wait until every queued job, including jobs only started
+#               from the pending queue during this call, has finished.
+#
+# Returns the number of jobs still in progress (always 0 when blocking).
+sub check {
my ($self, $block) = @_;
- my $flags = $block ? 0 : WNOHANG;
- my $kid = waitpid(-1, $flags);
- return 0 if ! $block && $kid <= 0;
- die "no child?" if $kid < 0;
- return 0 unless $kid;
- my $proc = $self->{procs_running}{$kid} or die "Unknown child
- process $kid finished!\n";
- delete $self->{procs_running}{$proc->pid} or die;
- $proc->note_stopped;
- $self->{uncollected_bytes} += $proc->size_on_disk;
- return $kid;
-}
-sub num_uncollected_bytes { $_[0]{uncollected_bytes} }
-sub gen_process_for {
- my ($self, $pchunk) = @_;
- my $proc = Brackup::GPGProcess->new($pchunk);
- $self->{procs_running}{$proc->pid} = $proc;
- $self->{procs}{$pchunk} = $proc;
- return $proc;
-}
-sub num_running_procs {
- my $self = shift;
- return scalar keys %{$self->{procs_running}};
+
+    while (1) {
+        my $progressing = $self->{progressing};
+
+        # With $block set, each process check waits for that worker to exit.
+        while (@$progressing && !$progressing->[0]{process}->check($block)) {
+            $self->_worker_done(shift @$progressing);
+        }
+
+        # Start pending jobs in the slots just freed.
+        $self->_next;
+
+        # A blocking call must not return while jobs it just started (or
+        # that are still pending) are outstanding; otherwise callers such
+        # as end_file() would report work that is not done yet.
+        last unless $block && $self->has_running;
+    }
+
+    return $self->has_running;
}
1;
-
diff --git a/lib/Brackup/GPGProcess.pm b/lib/Brackup/GPGProcess.pm
index d8da453..ea1aaa4 100644
--- a/lib/Brackup/GPGProcess.pm
+++ b/lib/Brackup/GPGProcess.pm
@@ -2,73 +2,131 @@ package Brackup::GPGProcess;
use strict;
use warnings;
use Brackup::Util qw(tempfile);
-use POSIX qw(_exit);
+use POSIX qw(:sys_wait_h);
+# Start encrypting $pchunk into a fresh temp file.
+#
+# Normally this forks: the child runs _child() and never returns, while
+# the parent gets back an object whose check()/running() track the child.
+# With $no_fork set, the encryption runs inline; the returned object is
+# already finished (returncode 0), and failures die in the caller.
sub new {
my ($class, $pchunk) = @_;
+    my $self = bless {}, $class;
+
+    # if true (perhaps on Windows?), then don't fork... do all inline.
+    my $no_fork = 0;
+
+    $self->{pchunk} = $pchunk;
my ($destfh, $destfn) = tempfile();
+    $self->{destfn} = $destfn;
+    $self->{destfh} = $destfh;
- my $no_fork = 0; # if true (perhaps on Windows?), then don't fork... do all inline.
- my $pid = $no_fork ? 0 : fork;
- if (!defined $pid) {
- die "Failed to fork: $!";
- }
- # caller (parent)
- if ($pid) {
- return bless {
- destfn => $destfn,
- pid => $pid,
- running => 1,
- }, $class;
- }
- # child: encrypt and exit(0)...
- my $enc = $pchunk->root->encrypt($pchunk->raw_chunkref);
- my $enc_len = length($enc);
-
- binmode($destfh);
- print $destfh $enc
- or die "failed to print: $!";
- close $destfh
- or die "failed to close: $!";
-
- my $wrote_size = -s $destfn;
- unless (defined $wrote_size && $wrote_size == $enc_len) {
- unless (-e $destfn) {
- # if the file's gone, that likely means the parent process
- # already terminated and unlinked our temp file, in
- # which case we should just exit (with error code), rather
- # than spewing error messages to stderr.
- POSIX::_exit(1);
-}
- $wrote_size = "<undef>" unless defined $wrote_size;
- die "size not right (expected $enc_len; got $wrote_size)";
-}
- if ($no_fork) {
- return bless {
- destfn => $destfn,
- pid => 0,
- }, $class;
+
+    # >= 0: exit status of the finished worker; < 0: negated number of the
+    # signal that killed it; undef: not finished yet.
+    $self->{returncode} = undef;
+
+    if ($no_fork) {
+        # Encrypt inline. This must not go through _child(), which ends the
+        # whole process with POSIX::_exit; errors propagate as exceptions.
+        $self->_encrypt_to_file();
+        $self->{pid} = -1;
+        $self->{returncode} = 0;
+        return $self;
+    }
+
+    my $pid = fork;
+    die "Failed to fork: $!" unless defined $pid;
+
+    # Child: encrypt and exit; _child() never returns.
+    $self->_child() if $pid == 0;
+
+    # Parent: only the child writes to the temp file, so close our copy of
+    # the handle instead of holding a descriptor open for as long as this
+    # object lives. chunkref() reopens the file by name.
+    $self->{pid} = $pid;
+    close($self->{destfh});
+    delete $self->{destfh};
+
+    return $self;
+}
+
+# Encrypt the chunk's raw data into the temp file and verify the number of
+# bytes that ended up on disk. Dies on any failure.
+sub _encrypt_to_file {
+    my ($self) = @_;
+    my $destfh = $self->{destfh};
+    my $destfn = $self->{destfn};
+
+    my $enc = $self->{pchunk}->root->encrypt($self->{pchunk}->raw_chunkref);
+    my $enc_len = length($enc);
+
+    binmode($destfh);
+    print $destfh $enc or die "failed to print: $!";
+    close $destfh or die "failed to close: $!";
+
+    my $wrote_size = -s $destfn;
+    unless (defined $wrote_size && $wrote_size == $enc_len) {
+        $wrote_size = "<undef>" unless defined $wrote_size;
+        die "size not right (expected $enc_len; got $wrote_size)";
+    }
+
+    return;
+}
+
+# This function is run in the child process and may not return: it exits
+# with status 0 if the chunk was encrypted and written, 1 otherwise.
+sub _child {
+    my ($self) = @_;
+
+    my $ok  = eval { $self->_encrypt_to_file(); 1 };
+    my $err = $@;
+
+    # Note: we have to use POSIX::_exit, to avoid some END block, somewhere,
+    # from cleaning up something or doing something. probably tempfiles
+    # being destroyed in File::Temp.
+    POSIX::_exit(0) if $ok;
+
+    # If the temp file is gone, that likely means the parent process already
+    # terminated and unlinked it, in which case we just exit with an error
+    # code rather than spewing error messages to stderr. Otherwise report
+    # why the worker failed instead of exiting silently.
+    if (-e $self->{destfn}) {
+        $err = "unknown error\n" unless defined $err && length $err;
+        $err .= "\n" unless $err =~ /\n\z/;
+        print STDERR "GPG worker for $self->{destfn} failed: $err";
+    }
+    POSIX::_exit(1);
+}
+
+# The chunk object this worker was created for (as passed to new()).
+sub pchunk { return $_[0]->{pchunk} }
+
+# PID of the forked worker; -1 once it has been reaped or when the
+# encryption ran inline without forking.
+sub pid { return $_[0]->{pid} }
+
+# Poll the worker process -- or, if $block is true, wait for it to exit.
+# Once it has terminated, its returncode is recorded and pid is reset to
+# -1. Dies if waitpid fails. Returns true while the worker is still running.
+sub check {
+    my ($self, $block) = @_;
+
+    if ($self->running) {
+        my $flags = $block ? 0 : WNOHANG;
+        my $ret = waitpid($self->{pid}, $flags);
+        my $status = $?;
+
+        if ($ret == $self->{pid}) {
+            # Process gone: decode how it ended.
+            if (WIFEXITED($status)) {
+                $self->{returncode} = WEXITSTATUS($status);
+            } elsif (WIFSIGNALED($status)) {
+                $self->{returncode} = -WTERMSIG($status);
+            } else {
+                die 'Weird process state';
+            }
+            $self->{pid} = -1;
+
+        } elsif ($ret != 0) {
+            # waitpid failed (e.g. ECHILD: the child was already reaped
+            # elsewhere); $? carries no exit status in that case. Comparing
+            # against our own pid, rather than testing for > 0, also keeps
+            # negative pseudo-pids from an emulated fork working.
+            die "waitpid($self->{pid}) failed: $!";
+        }
+        # $ret == 0: still running (only possible with WNOHANG).
}
- # Note: we have to do this, to avoid some END block, somewhere,
- # from cleaning up something or doing something. probably tempfiles
- # being destroyed in File::Temp.
- POSIX::_exit(0);
+
+    return $self->running;
}
-sub pid { $_[0]{pid} }
+# How the worker ended:
+#   >= 0   normal exit; the value is the exit status
+#   <  0   killed by a signal; the value is the negated signal number
+#   undef  not reaped yet
+sub returncode {
+    my $self = shift;
+    return $self->{returncode};
+}
+
-sub running { $_[0]{running} }
-sub note_stopped { $_[0]{running} = 0; }
+# True while the worker is still running. A pid of -1 marks a worker that
+# has been reaped (or that never forked).
+sub running {
+    my $self = shift;
+    return !($self->{pid} == -1);
+}
sub chunkref {
my ($self) = @_;
- die "Still running!" if $self->{running};
+ die "Still running!" if $self->check(0);
open(my $fh, $self->{destfn})
or die "Failed to open gpg temp file $self->{destfn}: $!";
@@ -82,10 +140,4 @@ sub chunkref {
return \$data;
}
-sub size_on_disk {
- my $self = shift;
- return -s $self->{destfn};
-}
-
1;
-
--
1.5.5.3
More information about the brackup mailing list