[PATCH 7/7] Reimplement GPG wrapping code and make it more robust

Michael Hanselmann public at hansmi.ch
Wed Aug 13 21:16:02 UTC 2008


---
 lib/Brackup/Backup.pm         |   69 ++++++++++++++----
 lib/Brackup/GPGProcManager.pm |  158 ++++++++++++++++-------------------------
 lib/Brackup/GPGProcess.pm     |  158 +++++++++++++++++++++++++++--------------
 3 files changed, 220 insertions(+), 165 deletions(-)

diff --git a/lib/Brackup/Backup.pm b/lib/Brackup/Backup.pm
index 727c6ac..2d9e1ef 100644
--- a/lib/Brackup/Backup.pm
+++ b/lib/Brackup/Backup.pm
@@ -5,7 +5,6 @@ use Carp qw(croak);
 use Brackup::ChunkIterator;
 use Brackup::CompositeChunk;
 use Brackup::GPGProcManager;
-use Brackup::GPGProcess;
 
 sub new {
     my ($class, %opts) = @_;
@@ -67,6 +66,12 @@ sub new {
     $self->{stored_files} = ();
     $self->{stored_chunks} = ();
 
+    if ($self->{parent}->{root}->gpg_rcpt) {
+        $self->{gpgmgr} = Brackup::GPGProcManager->new;
+    } else {
+        $self->{gpgmgr} = undef;
+    }
+
     # Number of files done
     $self->{n_files_done} = 0;
 
@@ -146,8 +151,6 @@ sub _emit_file {
 sub _emit_chunk {
     my ($self, $rec) = @_;
     my $root = $self->{parent}->{root};
-    my $merge_under = $root->merge_files_under;
-    my $gpg_rcpt = $root->gpg_rcpt;
 
     if ($rec->isa("Brackup::File")) {
         $self->start_file($rec);
@@ -188,13 +191,38 @@ sub _emit_chunk {
     unless ($self->{dryrun}) {
         $schunk = Brackup::StoredChunk->new($pchunk);
 
-        if ($gpg_rcpt) {
-            $schunk->set_encrypted_chunkref($root->encrypt($pchunk->raw_chunkref));
+        my $ctx = {
+            pchunk => $pchunk,
+            schunk => $schunk,
+        };
+
+        if ($self->{gpgmgr}) {
+            $self->{gpgmgr}->enqueue($pchunk, sub {
+                my ($pchunk, $chunkref) = @_;
+
+                $schunk->set_encrypted_chunkref($chunkref);
+
+                $self->_finalize_chunk($ctx);
+            });
+        } else {
+            $self->_finalize_chunk($ctx);
         }
+    } else {
+        $pchunk->forget_chunkref;
+    }
+}
 
-        # see if we should pack it into a bigger blob
-        my $chunk_size = $schunk->backup_length;
+sub _finalize_chunk {
+    my ($self, $ctx) = @_;
+    my $root = $self->{parent}->{root};
+    my $merge_under = $root->merge_files_under;
+    my $pchunk = $ctx->{pchunk};
+    my $schunk = $ctx->{schunk};
 
+    # see if we should pack it into a bigger blob
+    my $chunk_size = $schunk->backup_length;
+
+    unless ($self->{dryrun}) {
         # see if we should merge this chunk (in this case, file) together with
         # other small files we encountered earlier, into a "composite chunk",
         # to be stored on the target in one go.
@@ -205,11 +233,9 @@ sub _emit_chunk {
         # would rather just have 1 type of magic per file.  (split it or join it)
         if ($merge_under && $chunk_size < $merge_under && $pchunk->is_entire_file) {
             if ($self->{comp_chunk} && ! $self->{comp_chunk}->can_fit($chunk_size)) {
-                $self->debug('Finalizing composite chunk ', $self->{comp_chunk}, '...');
-                $self->{comp_chunk}->finalize;
-                $self->{comp_chunk} = undef;
+                $self->_finalize_composite_chunk();
             }
-            $self->{comp_chunk} ||= Brackup::CompositeChunk->new($self->{parent}->{root},
+            $self->{comp_chunk} ||= Brackup::CompositeChunk->new($root,
                                                                  $self->{parent}->{target});
             $self->{comp_chunk}->append_little_chunk($schunk);
         } else {
@@ -228,6 +254,16 @@ sub _emit_chunk {
     $schunk->forget_chunkref if $schunk;
 }
 
+sub _finalize_composite_chunk {
+    my ($self) = @_;
+
+    if ($self->{comp_chunk}) {
+        $self->debug('Finalizing composite chunk ', $self->{comp_chunk}, '...');
+        $self->{comp_chunk}->finalize;
+        $self->{comp_chunk} = undef;
+    }
+}
+
 sub start_file {
     my ($self, $file) = @_;
 
@@ -243,6 +279,11 @@ sub end_file {
     my ($self) = @_;
     my $file = $self->{cur_file};
 
+    if ($self->{gpgmgr}) {
+        # Make sure all parts are done
+        $self->{gpgmgr}->check(1);
+    }
+
     return unless $file;
 
     $self->{cb_end_file}->($file, $self->{stored_chunks});
@@ -279,11 +320,7 @@ sub _end {
 
     $self->end_file();
 
-    if ($self->{comp_chunk}) {
-        $self->{comp_chunk}->finalize;
-        $self->{comp_chunk} = undef;
-    }
-
+    $self->_finalize_composite_chunk();
     $self->_write_meta();
 
     unless ($self->{dryrun}) {
diff --git a/lib/Brackup/GPGProcManager.pm b/lib/Brackup/GPGProcManager.pm
index 3c4825a..7025e1c 100644
--- a/lib/Brackup/GPGProcManager.pm
+++ b/lib/Brackup/GPGProcManager.pm
@@ -5,127 +5,93 @@ use Brackup::GPGProcess;
 use POSIX ":sys_wait_h";
 
 sub new {
-    my ($class, $iter, $target) = @_;
-    return bless {
-        chunkiter => $iter,
-        procs     => {},  # "addr(pchunk)" => GPGProcess
-        target    => $target,
-        procs_running => {}, # pid -> GPGProcess
-        uncollected_bytes => 0,
-    }, $class;
-}
+    my ($class) = @_;
+    my $self = bless {}, $class;
 
-sub enc_chunkref_of {
-    my ($self, $pchunk) = @_;
-
-    my $proc = $self->{procs}{$pchunk};
-    unless ($proc) {
-        # catch iterator up to the point that was
-        # requested, or blow up.
-        my $found = 0;
-        my $iters = 0;
-        while (my $ich = $self->{chunkiter}->next) {
-            if ($ich == $pchunk) {
-                $found = 1;
-                last;
-            }
-            $iters++;
-            warn "iters = $iters\n";
-        }
-        die "Not found" unless $found;
-        $proc = $self->gen_process_for($pchunk);
-    }
+    $self->{pending} = [];
+    $self->{progressing} = [];
+    $self->{max_concurrent} = 3;
 
-    while ($proc->running) {
-        my $pid = $self->wait_for_a_process(1) or die
-            "No processes were reaped!";
-    }
+    return $self;
+}
 
-    $self->_proc_summary_dump;
-    my $cref = $self->get_proc_chunkref($proc);
-    $self->_proc_summary_dump;
-    $self->start_some_processes;
+sub enqueue {
+    my ($self, $pchunk, $cb) = @_;
 
-    return $cref;
+    push @{$self->{pending}}, {
+        pchunk => $pchunk,
+        cb => $cb,
+        process => undef,
+    };
+
+    $self->_next;
 }
 
-sub start_some_processes {
-    my $self = shift;
+sub _start_worker {
+    my ($self) = @_;
+    my $worker = shift @{$self->{pending}};
 
-    # eat up any pending zombies
-    while ($self->wait_for_a_process(0)) {}
+    return undef if not defined($worker);
 
-    my $pchunk;
-    # TODO: make this stuff configurable/auto-tuned
-    while ($self->num_running_procs < 5 &&
-           $self->num_uncollected_bytes < 128 * 1024 * 1024 &&
-           ($pchunk = $self->next_chunk_to_encrypt)) {
-        $self->_proc_summary_dump;
-        $self->gen_process_for($pchunk);
-        $self->_proc_summary_dump;
-    }
+    $worker->{process} = Brackup::GPGProcess->new($worker->{pchunk});
+
+    push @{$self->{progressing}}, $worker;
+
+    return $worker;
 }
 
-sub _proc_summary_dump {
-    my $self = shift;
-    return unless $ENV{GPG_DEBUG};
+sub _next {
+    my ($self) = @_;
 
-    printf STDERR "num_running=%d, num_outstanding_bytes=%d\n",
-    $self->num_running_procs,  $self->num_uncollected_bytes;
+    while (scalar(@{$self->{progressing}}) < $self->{max_concurrent}) {
+        last if not defined($self->_start_worker());
+    }
 }
 
-sub next_chunk_to_encrypt {
-    my $self = shift;
-    while (my $ev = $self->{chunkiter}->next) {
-        next if $ev->isa("Brackup::File");
-        my $pchunk = $ev;
-        next if $self->{target}->stored_chunk_from_inventory($pchunk);
-        return $pchunk;
+sub _worker_done {
+    my ($self, $worker) = @_;
+    my $returncode = $worker->{process}->returncode;
+
+    if ($returncode < 0) {
+        die "Worker killed by signal " . abs($returncode);
+    } elsif ($returncode > 0) {
+        die "Worker failed with exit status $returncode";
+    }
+
+    if ($worker->{cb}) {
+        $worker->{cb}->($worker->{pchunk}, $worker->{process}->chunkref);
     }
-    return undef;
 }
 
-sub get_proc_chunkref {
-    my ($self, $proc) = @_;
-    my $cref = $proc->chunkref;
-    delete $self->{procs}{$proc};
-    $self->{uncollected_bytes} -= length($$cref);
-    return $cref;
+sub has_running {
+    my ($self) = @_;
+
+    return scalar(@{$self->{progressing}});
 }
 
-# returns PID of a process that finished
-sub wait_for_a_process {
+sub check {
     my ($self, $block) = @_;
-    my $flags = $block ? 0 : WNOHANG;
-    my $kid = waitpid(-1, $flags);
-    return 0 if ! $block && $kid <= 0;
-    die "no child?" if $kid < 0;
-    return 0 unless $kid;
+    my @done;
 
-    my $proc = $self->{procs_running}{$kid} or die "Unknown child
-        process $kid finished!\n";
+    @{$self->{progressing}} = grep {
+        my $worker = $_;
+        my $ret = 1;
 
-    delete $self->{procs_running}{$proc->pid} or die;
-    $proc->note_stopped;
-    $self->{uncollected_bytes} += $proc->size_on_disk;
+        unless ($worker->{process}->check($block)) {
+            push @done, $worker;
+            $ret = 0;
+        }
 
-    return $kid;
-}
+        $ret;
+    } @{$self->{progressing}};
 
-sub num_uncollected_bytes { $_[0]{uncollected_bytes} }
+    foreach my $worker (@done) {
+        $self->_worker_done($worker);
+    }
 
-sub gen_process_for {
-    my ($self, $pchunk) = @_;
-    my $proc = Brackup::GPGProcess->new($pchunk);
-    $self->{procs_running}{$proc->pid} = $proc;
-    $self->{procs}{$pchunk} = $proc;
-    return $proc;
-}
+    $self->_next;
 
-sub num_running_procs {
-    my $self = shift;
-    return scalar keys %{$self->{procs_running}};
+    return $self->has_running;
 }
 
 1;
-
diff --git a/lib/Brackup/GPGProcess.pm b/lib/Brackup/GPGProcess.pm
index d8da453..ea1aaa4 100644
--- a/lib/Brackup/GPGProcess.pm
+++ b/lib/Brackup/GPGProcess.pm
@@ -2,73 +2,131 @@ package Brackup::GPGProcess;
 use strict;
 use warnings;
 use Brackup::Util qw(tempfile);
-use POSIX qw(_exit);
+use POSIX qw(:sys_wait_h);
 
 sub new {
     my ($class, $pchunk) = @_;
+    my $self = bless {}, $class;
+
+    # if true (perhaps on Windows?), then don't fork... do all inline.
+    my $no_fork = 0;
+
+    $self->{pchunk} = $pchunk;
 
     my ($destfh, $destfn) = tempfile();
+    $self->{destfn} = $destfn;
+    $self->{destfh} = $destfh;
 
-    my $no_fork = 0;  # if true (perhaps on Windows?), then don't fork... do all inline.
+    $self->{returncode} = undef;
 
-    my $pid = $no_fork ? 0 : fork;
-    if (!defined $pid) {
-        die "Failed to fork: $!";
-    }
+    if ($no_fork) {
+        $self->{pid} = -1;
+        $self->_child();
 
-    # caller (parent)
-    if ($pid) {
-        return bless {
-            destfn    => $destfn,
-            pid       => $pid,
-            running   => 1,
-        }, $class;
-    }
+    } else {
+        $self->{pid} = fork;
 
-    # child:  encrypt and exit(0)...
-    my $enc = $pchunk->root->encrypt($pchunk->raw_chunkref);
-    my $enc_len = length($enc);
-
-    binmode($destfh);
-    print $destfh $enc
-        or die "failed to print: $!";
-    close $destfh
-        or die "failed to close: $!";
-
-    my $wrote_size = -s $destfn;
-    unless (defined $wrote_size && $wrote_size == $enc_len) {
-        unless (-e $destfn) {
-            # if the file's gone, that likely means the parent process
-            # already terminated and unlinked our temp file, in
-            # which case we should just exit (with error code), rather
-            # than spewing error messages to stderr.
-            POSIX::_exit(1);
+        if (!defined($self->{pid})) {
+            die "Failed to fork: $!";
+
+        } elsif ($self->{pid} == 0) {
+            $self->_child();
         }
-        $wrote_size = "<undef>" unless defined $wrote_size;
-        die "size not right (expected $enc_len; got $wrote_size)";
     }
 
-    if ($no_fork) {
-        return bless {
-            destfn => $destfn,
-            pid    => 0,
-        }, $class;
+    return $self;
+}
+
+# This function runs in the child process and never returns; it always ends in POSIX::_exit
+sub _child {
+    my ($self) = @_;
+
+    # Catch any error
+    eval {
+        my $destfh = $self->{destfh};
+
+        my $enc = $self->{pchunk}->root->encrypt($self->{pchunk}->raw_chunkref);
+        my $enc_len = length($enc);
+
+        binmode($destfh);
+        print $destfh $enc or die "failed to print: $!";
+        close $destfh or die "failed to close: $!";
+
+        my $wrote_size = -s $self->{destfn};
+        unless (defined $wrote_size && $wrote_size == $enc_len) {
+            unless (-e $self->{destfn}) {
+                # if the file's gone, that likely means the parent process
+                # already terminated and unlinked our temp file, in
+                # which case we should just exit (with error code), rather
+                # than spewing error messages to stderr.
+                POSIX::_exit(1);
+            }
+            $wrote_size = "<undef>" unless defined $wrote_size;
+            die "size not right (expected $enc_len; got $wrote_size)";
+        }
+
+        # Note: leave via POSIX::_exit so that no END block runs in this
+        # process and cleans something up behind our back -- probably
+        # the tempfiles that File::Temp would otherwise destroy.
+        POSIX::_exit(0);
+    };
+
+    POSIX::_exit(1);
+}
+
+sub pchunk {
+    my ($self) = @_;
+    return $self->{pchunk};
+}
+
+sub pid {
+    my ($self) = @_;
+    return $self->{pid};
+}
+
+sub check {
+    my ($self, $block) = @_;
+
+    if ($self->running) {
+        my $flags = $block ? 0 : WNOHANG;
+        my $ret = waitpid($self->{pid}, $flags);
+        my $status = $?;
+
+        # Process gone?
+        unless ($ret == 0) {
+            if (WIFEXITED($status)) {
+                $self->{returncode} = WEXITSTATUS($status);
+                $self->{pid} = -1;
+
+            } elsif (WIFSIGNALED($status)) {
+                $self->{returncode} = -WTERMSIG($status);
+                $self->{pid} = -1;
+
+            } else {
+                die 'Weird process state';
+            }
+        }
     }
 
-    # Note: we have to do this, to avoid some END block, somewhere,
-    # from cleaning up something or doing something.  probably tempfiles
-    # being destroyed in File::Temp.
-    POSIX::_exit(0);
+    return $self->running;
 }
 
-sub pid { $_[0]{pid} }
+# >= 0: Normal exit()
+#  < 0: Killed by signal
+sub returncode {
+    my ($self) = @_;
+    return $self->{returncode};
+}
+
+sub running {
+    my ($self) = @_;
 
-sub running { $_[0]{running} }
-sub note_stopped { $_[0]{running} = 0; }
+    return $self->{pid} != -1;
+}
 
 sub chunkref {
     my ($self) = @_;
-    die "Still running!" if $self->{running};
+    die "Still running!" if $self->check(0);
 
     open(my $fh, $self->{destfn})
         or die "Failed to open gpg temp file $self->{destfn}: $!";
@@ -82,10 +140,4 @@ sub chunkref {
     return \$data;
 }
 
-sub size_on_disk {
-    my $self = shift;
-    return -s $self->{destfn};
-}
-
 1;
-
-- 
1.5.5.3



More information about the brackup mailing list