updates

Eric Lambrecht eml at guba.com
Wed Nov 15 21:12:21 UTC 2006


I've attached a patch that takes revision 458 from the sixapart svn and 
patches it to include the changes we've made internally to mogile. I can 
break this down, but I wanted to at least get something back in case I 
get distracted again...

The big changes we made:

All the decisions for where to put a particular file (when storing for 
the first time or replicating) are now part of the replication policy. 
We added a 'store_on' function to the ReplicationPolicy class that is 
called by the 'create_open' command to ask for a place to store the 
first instance of a file. The 'Mgd::find_deviceid' code was moved to 
the default MultipleHosts replication policy.
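
As a rough model of that split (Python used purely for illustration; the 
patch itself is Perl, and the data shapes here are assumptions):

```python
import random

def store_on(all_devs, count, rng=random):
    """Pick up to `count` device ids for the initial copies of a file,
    never using two devices on the same host, weighting the random
    choice by free space (like weight_by_free => 1 in the patch).
    Illustrative model of the policy's store_on hook, not the real code."""
    dests, used_hosts = [], set()
    while len(dests) < count:
        candidates = [d for d in all_devs.values()
                      if d["hostid"] not in used_hosts and d["mb_free"] > 0]
        if not candidates:
            break  # ran out of distinct hosts with free space
        total = sum(d["mb_free"] for d in candidates)
        pick = rng.uniform(0, total)
        acc = 0
        for dev in candidates:
            acc += dev["mb_free"]
            if acc >= pick:
                dests.append(dev["devid"])
                used_hosts.add(dev["hostid"])
                break
    return dests
```

The real store_on in the patch delegates the weighted selection to 
find_deviceid; this sketch just inlines the idea.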

The replication code was updated so that a ReplicationPolicy class can 
tell the replication worker to delete a replica. It does this by 
returning a negative device id from the 'replicate_to' method. We also 
pass the size of the file to be replicated to the ReplicationPolicy. 
Also, the 'files_to_replicate' table has an 'extrareplicacount' column 
added that lets us request more than the minimum number of replicas for 
some file (see below).
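
The extended replicate_to contract boils down to: a positive devid means 
copy there, a negative one means delete the replica on that device, zero 
means replication is sufficient, and undef means try again later. A 
minimal Python sketch of a worker interpreting that result (the callback 
names are hypothetical):

```python
def dispatch(decision, copy_to, delete_from, retry):
    """Interpret a ReplicationPolicy replicate_to result:
    > 0  => copy the fid to that devid
    < 0  => delete the replica on abs(devid)
    0    => replication is sufficient
    None => no suitable device right now; try again later
    Illustrative only, not the actual worker code."""
    if decision is None:
        return retry()
    if decision > 0:
        return copy_to(decision)
    if decision < 0:
        return delete_from(-decision)
    return "done"
```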

A new 'increp' script lets you tell mogile to make extra replicas of 
some file (see below).

'listpaths' command added to mogtool to assist our admins/developers in 
finding out where things are in mogile and checking their size (we had a 
lot of truncated/missing files for some reason). It just prints out the 
URLs of the requested file along with their actual size as determined by 
a HEAD request.
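
In spirit, listpaths does something like the following (a Python sketch 
with an injected HEAD function standing in for LWP::UserAgent; not the 
shipped mogtool code):

```python
def list_paths(paths, head):
    """For each replica URL, report its size via an injected HEAD
    function returning (status, content_length); report ERR when the
    storage node answers anything but 200. Sketch of mogtool's
    listpaths output, assuming this simplified interface."""
    lines = []
    for path in paths:
        status, length = head(path)
        if status == 200:
            lines.append(f"{length}\t{path}")
        else:
            lines.append(f"ERR\t{path}")
    return lines
```

Comparing the reported Content-Length across replicas is what surfaces 
the truncated/missing files mentioned above.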

The 'host_meta' table was added, along with the code to read it in when 
caching host information.

Our MultipleHostsWithHints replication policy was added (see my previous 
email and the comments in the code for how it works).
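
For reference, the hint strings described there reduce to canonical 
'domain:class:replica' entries, with 'any' wildcarding either side. Here 
is an illustrative Python port of the parser's behavior (the patch 
implements this in Perl as parse_replication_advice):

```python
import re

def parse_replication_advice(advice):
    """Expand a host's class_hints string into canonical
    "domain:class:replica" entries. An empty string means the host
    accepts anything; specs are separated by ';'. Port of the patch's
    parser for illustration."""
    if not advice:
        return ["any:any"]
    parsed = []
    for spec in re.split(r"\s*;\s*", advice):
        m = re.match(r"^(\D\S+)?\s*([\d\s]*)$", spec)
        if not m:
            continue  # malformed spec: skipped (the Perl logs an error)
        word = m.group(1) or "any"
        if word != "any" and ":" not in word:
            continue  # unknown keyword: skipped (the Perl logs an error)
        numbers = m.group(2).split()
        if numbers:
            parsed.extend(f"{word}:{n}" for n in numbers)
        else:
            parsed.append(f"{word}:any")
    return parsed
```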

Our 'Reduce' worker was added (see below).

Updates to make the Checker work (not heavily tested yet).

Update to mogdbsetup to make all our database changes.

--

With respect to the 'Reduce' worker, the 'extrareplicacount' stuff, and 
the ability for a ReplicationPolicy to mark something for deletion:

Our internal goal has been to update mogile to push content around to 
different machines to deal with different file size/access patterns 
(without changing the API for interacting with mogile). Our 
MultipleHostsWithHints replication policy solves that and lets us throw 
things to specific machines upon insertion (thumbnails to low 
storage/high ram boxes, big ol' DVDs to really dense/slow machines).

To handle content that suddenly becomes very popular, but is on slow 
machines, we came up with the notion of 'overreplicating' it. We realize 
that fid XXX is popular (via the lighttpd logs), so we tell mogile to 
make a couple extra replicas of XXX by throwing a new entry in the 
'files_to_replicate' table with 'extrareplicacount' set to some non-zero 
amount.
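
The effect is simply to raise the per-fid replica target; sketched in 
Python with hypothetical helper names (the patch stores the extra count 
in a table column rather than computing it like this):

```python
def target_devcount(mindevcount, extrareplicacount=0):
    """The replication worker aims for the class's mindevcount plus any
    per-fid extrareplicacount queued in files_to_replicate."""
    return mindevcount + extrareplicacount

def replicas_still_needed(current_copies, mindevcount, extrareplicacount=0):
    """How many more replicas the worker should still create for a fid."""
    return max(0, target_devcount(mindevcount, extrareplicacount) - current_copies)
```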

Just making more copies of a file doesn't necessarily speed up access to 
it, but when we combine this with our replication policy (which says 
'put replicas 3 and 4 of any file on these really fast boxes'), we can 
ensure that popular content gets migrated to our really fast machines 
and we don't beat the hell out of our higher density archive boxes.

We added the 'Reduce' worker to randomly delete those extra replicas 
from the fast machines. Our system continuously pushes popular stuff to 
the high speed machines while randomly removing older (hopefully less 
popular) stuff from those boxes.
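
A minimal model of one Reduce pass (Python for illustration; the record 
shapes and names are assumptions, and the real worker also goes through 
the replication policy):

```python
import random

def reduce_pass(fid_replicas, mindevcount, fast_hosts, rng=random):
    """Pick one replica on a 'fast' host to delete, but only while the
    fid would still keep at least mindevcount copies afterward.
    Illustrative model of the Reduce worker's random eviction."""
    if len(fid_replicas) <= mindevcount:
        return None  # never reduce below the class minimum
    extras = [r for r in fid_replicas if r["hostid"] in fast_hosts]
    if not extras:
        return None  # nothing on the fast tier to evict
    return rng.choice(extras)["devid"]
```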

We updated the ReplicationPolicy code to allow it to delete replicas so 
that it can push things around if their existing locations don't match 
up with where the policy wants them to be. This is useful if you've 
stored something under replication policy X, but now change it to 
replication policy Y or if you change the minimum devcount for some class.

If you have more questions, let me know. I'd like to help push any 
changes into the official distro, but I understand if they don't work 
with your broader goals...

Eric...
-------------- next part --------------
Index: utils/increp
===================================================================
--- utils/increp	(.../vendor/danga/mogilefs/458)	(revision 0)
+++ utils/increp	(.../trunk/mogile2.0)	(revision 1095)
@@ -0,0 +1,32 @@
+#!/usr/bin/perl
+########################################################
+# This script accepts a list of fids from stdin and sends
+# them off to a tracker to tell the tracker to make more
+# replicas of the given fids.
+
+use strict;
+use Getopt::Long;
+use Data::Dumper;
+use MogileFS::Admin;
+
+my $extra = shift @ARGV;
+my $trackers = \@ARGV;
+die "usage:\n   echo list-of-fids | increp extra-count tracker.location.com:8001 [tracker.location.2.com:8001]" unless @$trackers && $extra;
+
+my $mogadm = MogileFS::Admin->new(hosts => $trackers);
+die "couldn't connect to tracker" unless $mogadm;
+
+my $fid;
+my $line;
+
+while ($line = <STDIN>) {
+  while ($line =~ m/(\d+)/g) {
+    $fid = $1;
+	
+    $mogadm->increase_replicas($fid, $extra);
+
+    if ($mogadm->err) {
+      print "ERR: " . $mogadm->errstr . "\n";
+    }
+  }
+}

Property changes on: utils/increp
___________________________________________________________________
Name: svn:executable
   + *

Index: utils/mogtool
===================================================================
--- utils/mogtool	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ utils/mogtool	(.../trunk/mogile2.0)	(revision 1095)
@@ -30,6 +30,10 @@
     $ mogtool extract --bigfile --asfile thedirkey thefile.tgz
     $ mogtool extract --bigfile thedevkey /dev/hda4
 
+    $ mogtool increasereplicas key extra_replica_count
+
+    $ mogtool listpaths thefilekey
+
     $ mogtool delete thekey
 
     $ mogtool list
@@ -86,6 +90,16 @@
 
 Delete a resource.  See L</"DELETE OPTIONS"> and L</"DELETE ARGUMENTS">.
 
+=item increasereplicas|ir
+
+Increase the number of replicas of some file, over and above the
+minimum replica count required by the class the file is in.
+
+=item listpaths|lp
+
+List all the paths for the given file and additionally get the size for each
+file as reported by the storage nodes.
+
 =item list|ls
 
 List all big files contained in MogileFS.  No options, no arguments.
@@ -373,6 +387,13 @@
 Extract the partition at I<hda3.backup> to the partition I</dev/hda4>.  B<WARNING:>
 mogtool won't ask for confirmation, make sure you don't mistype partition numbers!
 
+=head3 Increasing Replica Count
+
+    $ mogtool increasereplicas key extra_replica_count
+
+Increase the number of replicas of this particular file over and above
+the minimum replica count required by the class of the file.
+
 =head2 Deleting a Resource
 
 B<WARNING:> Please make sure you're specifying the right parameter, as delete does
@@ -546,6 +567,8 @@
 extract() if $cmd eq 'x' || $cmd eq "extract";
 list() if $cmd eq 'ls' || $cmd eq "list";
 mdelete() if $cmd eq 'rm' || $cmd eq "delete";
+increasereplicas() if $cmd eq 'ir' || $cmd eq "increasereplicas";
+listpaths() if $cmd eq 'lp' || $cmd eq "listpaths";
 
 # fail if we get this far
 abortWithUsage();
@@ -1052,6 +1075,47 @@
     return $res;
 }
 
+sub increasereplicas {
+    my $key = shift @ARGV;
+    my $extrareplicacount = shift @ARGV;
+
+    abortWithUsage() unless $key && $extrareplicacount;
+
+    my $rv = $mogfs->increase_replicas($key, $extrareplicacount);
+
+    if ($rv) {
+	print "Increased replica count\n";
+	exit(0);
+    } else {
+	print "Increase failed: " . $mogfs->errstr . "\n";
+	exit(1);
+    }
+}
+
+sub listpaths {
+    my $key = shift @ARGV;
+    abortWithUsage() unless $key;
+
+    error("Error: key $key isn't valid; must not contain spaces or commas.", ERR_FATAL)
+        unless $key =~ /^[^\s\,]+$/;
+
+    my @paths = $mogfs->get_paths($key, 0);
+
+    foreach my $path (@paths) {
+	my $ua = new LWP::UserAgent;
+	$ua->timeout(3);
+	my $response = $ua->head($path);
+
+	if ($response->code == 200) {
+	    print $response->header("Content-Length") . "\t$path\n";
+	} else {
+	    print "ERR\t$path\n";
+	}
+    }
+
+    exit 0;
+}
+
 sub extract {
     my $key = shift @ARGV;
     my $dest = shift @ARGV;
Index: utils/mogadm
===================================================================
--- utils/mogadm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ utils/mogadm	(.../trunk/mogile2.0)	(revision 1095)
@@ -594,7 +594,20 @@
     die "Not implemented.\n";
 }
 
+sub cmd_fid_increasecopies {
+    my ($fid, $extracopycount) = @_;
+    abortWithUsage() unless $fid && $extracopycount;
 
+    my $mogadm = mogadm();
+    $mogadm->increase_copies($fid, $extracopycount);
+    if ($mogadm->err) {
+        fail('Error increasing copy count: ' . $mogadm->errstr);
+    }
+
+    ok('Copies increased.');
+}
+
+
 ###########################################################################
 ## helper routines
 ###########################################################################
@@ -975,6 +988,10 @@
     $ mogadm class modify first.domain my.class --mindevcount=2
     $ mogadm class delete first.domain my.class
 
+Fid manipulation
+
+    $ mogadm fid map my.fid
+
 Check the status of your entire MogileFS system:
 
     $ mogadm check
Index: utils/Makefile.PL
===================================================================
--- utils/Makefile.PL	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ utils/Makefile.PL	(.../trunk/mogile2.0)	(revision 1095)
@@ -4,7 +4,7 @@
 
 my $exe_files;
 
-foreach("mogtool", "mogadm")
+foreach("mogtool", "mogadm", "increp")
 {
 	push @$exe_files, $_ if -e;
 }
Index: server/mogstored
===================================================================
--- server/mogstored	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/mogstored	(.../trunk/mogile2.0)	(revision 1095)
@@ -177,6 +177,15 @@
     }
 
     foreach my $devnum (@devnum) {
+        # delete any old test write files
+        foreach my $testfile (glob("$path/$devnum/test-write/test-write-*")) {
+            # nuke anything older than about 20 minutes
+            if ((time() - (stat($testfile))[9]) > (60 * 20)) {
+                unlink($testfile);
+            }
+        }
+
+        # update the usage file
         my $rval = `df -k -l -P $path/$devnum`;
         foreach my $l (split /\r?\n/, $rval) {
             next unless $l =~ /^(.+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.+)\s+(.+)$/;
Index: server/lib/MogileFS/Class.pm
===================================================================
--- server/lib/MogileFS/Class.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Class.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -11,17 +11,30 @@
 sub of_fid {
     my ($class, $fid) = @_;
     my $dbh = Mgd::get_dbh();
-    my $row = $dbh->selectrow_hashref("SELECT c.* FROM class c, file f ".
-                                      "WHERE f.dmid=c.dmid AND f.classid=c.classid AND f.fid=?",
+    my $row = $dbh->selectrow_hashref("SELECT c.*, d.namespace FROM class c, file f, domain d ".
+                                      "WHERE f.dmid=c.dmid AND f.dmid=d.dmid AND f.classid=c.classid AND f.fid=?",
                                       undef, $fid);
     return $class->new($row);
 }
 
+# return MogileFS::Class object for the given dmid and class name
+sub for_dmid_and_name {
+    my ($class, $dmid, $name) = @_;
+    my $dbh = Mgd::get_dbh();
+    my $row = $dbh->selectrow_hashref("SELECT c.*, d.namespace FROM class c, domain d ".
+                                      "WHERE d.dmid=c.dmid AND c.dmid=? AND c.classname=?",
+                                      undef, $dmid, $name);
+    return $class->new($row);
+}
+
 sub domainid     { $_[0]{dmid} }
 sub classid      { $_[0]{classid} }
 sub mindevcount  { $_[0]{mindevcount} }
 sub policy_class { $_[0]{replpolicy} }
+sub namespace    { $_[0]{namespace} }
+sub classname    { $_[0]{classname} }
 
+# not used by new files_to_replicate code
 sub foreach {
     my ($class, $cb) = @_;
     # get the min dev counts
Index: server/lib/MogileFS/Config.pm
===================================================================
--- server/lib/MogileFS/Config.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Config.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -51,6 +51,7 @@
     $reaper_jobs,
     $monitor_jobs,
     $checker_jobs,
+    $reduce_jobs,
     $mog_root,
     $min_free_space,
     $max_disk_age,
@@ -125,7 +126,7 @@
     $daemonize      = choose_value( 'daemonize', 0, 1 );
     $db_dsn         = choose_value( 'db_dsn', "DBI:mysql:mogilefs" );
     $db_user        = choose_value( 'db_user', "mogile" );
-    $db_pass        = choose_value( 'db_pass', "", 1 );
+    $db_pass        = choose_value( 'db_pass', "");
     $conf_port      = choose_value( 'conf_port', 7001 );
     $MOG_ROOT       = set_config('root',
                                  choose_value( 'mog_root', $DEFAULT_MOG_ROOT )
@@ -138,6 +139,7 @@
     $reaper_jobs    = choose_value( 'reaper_jobs', 1 );
     $monitor_jobs   = choose_value( 'monitor_jobs', 1 );
     $checker_jobs   = choose_value( 'checker_jobs', 1 );
+    $reduce_jobs    = choose_value( 'reduce_jobs',  1 );
     $min_free_space = choose_value( 'min_free_space', 100 );
     $max_disk_age   = choose_value( 'max_disk_age', 5 );
     $DEBUG          = choose_value( 'debug', $ENV{DEBUG} || 0, 1 );
Index: server/lib/MogileFS/ProcManager.pm
===================================================================
--- server/lib/MogileFS/ProcManager.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/ProcManager.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -54,6 +54,7 @@
         replicate   => "Replicate",
         reaper      => "Reaper",
         monitor     => "Monitor",
+        reduce      => "Reduce",
     }->{$job};
 }
 
@@ -641,7 +642,7 @@
     my ($what, $whatid, $state, $child) = @_;
     #warn "STATE CHANGE: $what<$whatid> = $state\n";
     # TODO: can probably send this to all children now, not just certain types
-    for my $type (qw(queryworker replicate delete monitor checker)) {
+    for my $type (qw(queryworker replicate delete monitor checker reduce)) {
         MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":state_change $what $whatid $state", $child);
     }
 }
@@ -658,14 +659,14 @@
 sub send_invalidate {
     my ($what, $child) = @_;
     # TODO: can probably send this to all children now, not just certain types
-    for my $type (qw(queryworker replicate delete monitor checker)) {
+    for my $type (qw(queryworker replicate delete monitor checker reduce)) {
         MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":invalidate_meta_once $what", $child);
     }
 }
 
 sub send_monitor_has_run {
     my $child = shift;
-    for my $type (qw(replicate checker queryworker delete)) {
+    for my $type (qw(replicate checker queryworker reduce delete)) {
         MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
     }
 }
Index: server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
===================================================================
--- server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -9,10 +9,12 @@
     my ($class, %args) = @_;
 
     my $fid      = delete $args{fid};      # fid scalar to copy
+    my $length   = delete $args{length};   # length of file we're copying, or 0
     my $on_devs  = delete $args{on_devs};  # arrayref of device objects
     my $all_devs = delete $args{all_devs}; # hashref of { devid => devobj }
     my $failed   = delete $args{failed};   # hashref of { devid => 1 } of failed attempts this round
     my $min      = delete $args{min};      # configured min devcount for this class
+    my $mclass   = delete $args{mclass};   # class with info from database
 
     warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
     die "Missing parameters" unless $on_devs && $all_devs && $failed && $fid;
@@ -43,7 +45,7 @@
     }
 
     my @good_devids = grep { ! $failed->{$_} && ! $on_dev{$_} }
-            Mgd::find_deviceid(
+                 find_deviceid(
                                random         => 1,
                                not_on_hosts   => $not_on_hosts,
                                weight_by_free => 1,
@@ -64,4 +66,110 @@
     return scalar keys %host;
 }
 
+# Return an array of devids that signify where we should
+# initially store copies of a file in this particular class.
+# This is used by the Query workers when they're asked where
+# to store some file.
+
+sub store_on {
+    my ($class, %args) = @_;
+
+    my $all_devs = delete $args{all_devs};    # hashref of { devid => devobj }
+    my $mclass   = delete $args{mclass};      # MogileFS::Class instance
+    my $count    = delete $args{count};       # number of devices to return
+
+    my (@dests, @hosts);
+    while (scalar(@dests) < $count) {
+        my $devid = find_deviceid(
+				  random => 1,
+				  weight_by_free => 1,
+				  not_on_hosts => \@hosts,
+				  );
+	
+        last unless defined $devid;
+	
+        push @dests, $devid;
+        push @hosts, $all_devs->{$devid}->{hostid};
+    }        
+
+    return @dests;
+}
+
+# general purpose device locator.  example:
+#
+# my $devid = Mgd::find_deviceid(
+#     random => 1,              # get random device (else find first suitable)
+#     min_free_space => 100,    # with at least 100MB free
+#     weight_by_free => 1,      # find result weighted by free space
+#     max_disk_age => 5,        # minutes of age the last usage report can be before we ignore the disk
+#     not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
+#     must_be_alive => 1,       # if specified, device/host must be writeable (fully available)
+# );
+#
+# returns undef if no suitable device was found.  else, if you wanted an
+# array will return an array of the suitable devices--if you want just a
+# single item, you get just the first one found.
+sub find_deviceid {
+    my %opts = ( @_ );
+
+    # validate we're getting called with known parameters
+    my %valid_keys = map { $_ => 1 } qw( random min_free_space weight_by_free max_disk_age not_on_hosts must_be_writeable must_be_readable );
+    warn "invalid key $_ in call to find_deviceid\n"
+        foreach grep { ! $valid_keys{$_} } keys %opts;
+
+    # copy down global minimum free space if not specified
+    $opts{min_free_space} ||= MogileFS->config("min_free_space");
+    $opts{max_disk_age}   ||= MogileFS->config("max_disk_age");
+    if ($opts{max_disk_age}) {
+        $opts{max_disk_age} = time() - ($opts{max_disk_age} * 60);
+    }
+    $opts{must_be_alive} = 1 unless defined $opts{must_be_alive};
+
+    # setup for iterating over devices
+    my $devs = Mgd::get_device_summary();
+    my @devids = keys %{$devs || {}};
+    my $devcount = scalar(@devids);
+    my $start = $opts{random} ? int(rand($devcount)) : 0;
+    my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
+    my $total_free = 0;
+
+    # now find a device that matches what they want
+    my @list;
+    for (my $i = 0; $i < $devcount; $i++) {
+        my $idx = ($i + $start) % $devcount;
+        my $dev = $devs->{$devids[$idx]};
+
+        # series of suitability checks
+        next unless $dev->{status} eq 'alive';
+        next if $not_on_host{$dev->{hostid}};
+        next if $opts{max_disk_age} && $dev->{mb_asof} &&
+                $dev->{mb_asof} < $opts{max_disk_age};
+        next if $opts{min_free_space} && $dev->{mb_total} &&
+                $dev->{mb_free} < $opts{min_free_space};
+        next if $opts{must_be_writeable} &&
+            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
+             MogileFS->observed_state("device", $dev->{devid}) ne "writeable");
+        next if $opts{must_be_readable} &&
+            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
+             MogileFS->observed_state("device", $dev->{devid}) ne "readable");
+
+        # we get here, this is a suitable device
+        push @list, $dev->{devid};
+        $total_free += $dev->{mb_free};
+    }
+
+    # now we have a list ordered randomly, do free space weighting
+    if ($opts{weight_by_free}) {
+        my $rand = int(rand($total_free));
+        my $cur = 0;
+        foreach my $devid (@list) {
+            $cur += $devs->{$devid}->{mb_free};
+            return $devid if $cur >= $rand;
+        }
+    }
+
+    # return whole list if wanting array, else just first item
+    return wantarray ? @list : shift(@list);
+}
+
 1;
Index: server/lib/MogileFS/ReplicationPolicy/MultipleHostsWithHints.pm
===================================================================
--- server/lib/MogileFS/ReplicationPolicy/MultipleHostsWithHints.pm	(.../vendor/danga/mogilefs/458)	(revision 0)
+++ server/lib/MogileFS/ReplicationPolicy/MultipleHostsWithHints.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -0,0 +1,342 @@
+package MogileFS::ReplicationPolicy::MultipleHostsWithHints;
+
+#  Hosts are now labelled with storage preferences via the 'host_meta' table.
+#  If you specify 'mydomain:foo', then devices on that host will only store
+#  files in the 'mydomain' domain and 'foo' class. A host can prefer to hold
+#  multiple classes or any class. If you specify 'mydomain:foo; mydomain:bar',
+#  then this host will only hold files in domain 'mydomain' in either
+#  the 'foo' or 'bar' classes. A host with no label, or with the label
+#  'any', will hold files from any class and domain, just as the existing system
+#  does.
+#  
+#  We use this to store files that have different traffic and filesize
+#  patterns on different hosts. Thumbnails are heavily hit by anonymous
+#  visitors and very small, so we store them on a small set of very fast
+#  (and expensive) machines with lots of RAM and not much storage in
+#  them. Full length movies are very large files and are accessed
+#  infrequently by a much smaller number of people who fork over their
+#  credit card information, so they are stored on machines with much
+#  denser (and slower, and cheaper) storage on them.
+#  
+#  When determining where to put a file or a replica of a file, we first
+#  try to store it on a machine that prefers the class of the file being
+#  submitted. If that fails, we try to store the file on a machine that
+#  will accept files of any class.
+#  
+#  Additionally, we want to influence where we store the replicas of
+#  files on different hosts. One reason for this is to make sure
+#  that multiple copies of files are stored on machines on different
+#  power strips. It does us no good if copy 1 and copy 2 are stored on
+#  two hosts that are hooked up to the same dead power supply.
+#  
+#  We specify where replicas of files should be stored by annotating the
+#  hosts with which replica they would like to store: '1', '2', '3',
+#  etc. A host labelled '1' would like to store the original instance
+#  of a file. A host labelled '3' would like to store the 3rd instance
+#  of a file. A host labelled '1 3 5' would like to store the 1st, 3rd,
+#  and 5th instance of a file. A host with no label will store any
+#  instance of a file.
+#  
+#  By labelling all hosts on power strip A with '1 3 5' and all hosts on
+#  power strip B with '2 4 6', we can be confident that if one of the
+#  power strips goes away, we'll still have access to our files.
+#  
+#  We can combine the two annotation methods to even further massage our
+#  content: 'foo 1 2' is a host that wants the first and second copy of
+#  'foo' files, and 'foo 3 4' is a host that wants the third and fourth
+#  copies of 'foo' files.
+#  
+#  More examples:
+#  
+#  mydomain:foo 1 2                store the first and second instances
+#                                  of 'foo' files in domain 'mydomain'
+#  hisdomain:bar                   store any copy of 'bar' files in domain
+#                                  'hisdomain'
+#  any 1                           store the first copy of any file
+#  1           			   store the first copy of any file
+#  any                     	   store any copy of any file
+#  mydomain:foo 1; hisdomain:bar 2 store the first copy of 'foo' files
+#				   from the domain 'mydomain', and the second
+#                                  copy of 'bar' files from the 'hisdomain' domain
+#  1 3                             store the first and third copy of any files
+#                                  from any domain
+#  1; any                          store the first copy of any file, but also
+#                                  store any copy of any file (assuming some
+#                                  other device doesn't store it first due to
+#                                  a more specific hint)
+#  
+#  Specifically, the algorithm works like this, when trying to determine
+#  where to store replica X of a file in class CLASS:
+#  
+#  1. try to store the file in a device requesting files in class CLASS, replica X
+#  2. try to store the file in a device requesting files in class CLASS
+#  3. try to store the file in a device requesting replica X of any class
+#  4. try to store the file in any device with no class or replica preferences
+#  5. failure - can't store the file anywhere
+#  
+#  Hosts are annotated by making an entry in the 'host_meta' table, with the
+#  metakey as 'class_hints' and the metaval as one of the strings above.
+#  
+
+use Data::Dumper;
+
+use strict;
+
+# build and return a map of "domain:class:replica" => arrayref of device objects
+# to help us figure out where to store things
+
+sub calculate_stores {
+    my ($all_devs) = @_;
+
+    my %stores;
+    foreach my $dev (keys %$all_devs) {
+	my $devobj = $all_devs->{$dev};
+
+	if ($devobj->{host}->{meta}->{class_hints}) {
+	    foreach my $advice (parse_replication_advice($devobj->{host}->{meta}->{class_hints})) {
+		$stores{$advice}->{$devobj->{devid}} = $devobj;
+	    }
+	} else {
+	    $stores{"any:any"}->{$devobj->{devid}} = $devobj;
+	}
+    }
+
+    #print "stores: " . Dumper(\%stores) . "\n";
+
+    return \%stores;
+}
+
+# Convert a replication advice string to a list of replication
+# advice entries in canonical form: 
+#      ( ( domain ':' class ) | 'any' ) ( replica | 'any' )
+
+sub parse_replication_advice {
+    my ($advice) = @_;
+
+    my @parsed = ();
+
+    # default: anything can be stored on this device
+    return ( "any:any" ) unless ($advice);
+
+    # advice items are separated by ';'
+    foreach my $spec (split(/\s*;\s*/, $advice)) {
+
+	# valid specs of the form "domain:class 1 2 3 4"
+	if ($spec =~ /^(\D\S+)?\s*([\d\s]*)$/) {
+	    my $word = $1;
+	    my $numbers = $2;
+
+	    if ($word =~ /^\S+:\S+$/) {
+		# TODO: confirm valid domain and class here
+
+	    } elsif ($word eq "") {
+		$word = "any";
+
+	    } elsif ($word ne "any") {
+		# TODO: log error
+		next;
+	    }
+
+	    if ($numbers) {
+		foreach my $number (split(/\s+/, $numbers)) {
+		    push @parsed, "$word:$number";
+		}
+	    } else {
+		push @parsed, "$word:any";
+	    }
+
+	} else {
+	    # TODO: log bad format
+	    
+	}
+    }
+
+    return @parsed;
+}
+
+# returns:
+#   0:      replication sufficient
+#   undef:  no suitable recommendations currently.
+#   >0:     devid to replicate to.
+#   <0:     devid to delete from
+
+sub replicate_to {
+    my ($class, %args) = @_;
+
+    my $fid      = delete $args{fid};         # fid scalar to copy
+    my $length   = delete $args{length};      # length of file we're copying (or 0 if we don't know)
+    my @on_devs  = @{delete $args{on_devs}};  # arrayref of device objects
+    my $all_devs = delete $args{all_devs};    # hashref of { devid => devobj }
+    my $failed   = delete $args{failed};      # hashref of { devid => 1 } of failed attempts this round
+    my $min      = delete $args{min};         # configured min devcount for this class
+    my $mclass   = delete $args{mclass};      # MogileFS::Class instance
+
+    warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
+    die "Missing parameters" unless $all_devs && $failed;
+
+    # build map of all the places we could store this mofo
+    # this is a hashref of { "class:domain:replica" => [ array of devobj ] }
+    my $stores = calculate_stores($all_devs);
+    
+    # Iterate through the mindevcount for this class to ensure we've got adequate
+    # copies of things. If we're missing something, then inform the replication
+    # thread to make the copy.
+
+    my $cname = $mclass->classname();
+    my $dname = $mclass->namespace();
+    my %on_hosts;   # hostid => 1 if we've got a copy on that host
+
+    #print "looking to store fid $fid which is already on " . Dumper(\@on_devs) . "\n";
+
+    # Iterate through each possible instance of the file
+    # (1..min_dev_count) until we find an instance that hasn't been
+    # created. Then pick out a device the new instance should go on
+    INSTANCE: 
+    foreach my $instance (1..$min) {
+
+	# iterate through all the possible places we could store this instance,
+	# and confirm we've got a copy there already, or note that we need to
+	# make one
+        REPLSPACE:
+	foreach my $replication_space ("$dname:$cname:$instance", "$dname:$cname:any",
+				    "any:$instance", "any:any") {
+
+	    my $replication_devices = $stores->{$replication_space};
+
+	    #print "checking $replication_space to store instance $instance: " . Dumper($replication_devices) . "\n";
+
+	    if (!(scalar keys %$replication_devices)) {
+		# no candidate devices to store this sucker on
+		#print "no candidates in $replication_space\n";
+
+	    } else {
+		# we found some devices we could store this sucker on
+		#print "found candidates...\n";
+
+		# Remove any devices from the list that are on a host we've already
+		# stored a copy on.
+		foreach my $candidate_devid (keys %$replication_devices) {
+		    my $candidate_devobj = $replication_devices->{$candidate_devid};
+		    #print "checking candidate_devobj " . Dumper($candidate_devobj) . "\n";
+		    
+		    if ($on_hosts{$candidate_devobj->{hostid}}) {    # already have a copy on this host
+			delete $replication_devices->{$candidate_devid};
+		    }
+		}
+
+		#print "after removing hosts we're on (" . join(", ", keys %on_hosts) . ")\n";
+		
+		# if we're already on one of the candidate devices, make a note and
+		# move on to the next instance
+		my @matches = grep { $replication_devices->{$_->{devid}}; } @on_devs;
+		if (scalar @matches) {
+		    my $existing_device = $matches[0];
+
+		    #print "found file already on candidate device " . $existing_device->{devid} . "\n";
+		    
+		    # remove that device from the list of copies
+		    @on_devs = grep { $_->{devid} != $existing_device->{devid} } @on_devs;
+
+		    #print "on_devs is now " . Dumper(\@on_devs) . "\n";
+
+		    # note that we're on that host, so future instances can't be
+		    $on_hosts{$existing_device->{hostid}} = 1;
+		    
+		    #print "on_hosts is now " . Dumper(\%on_hosts) . "\n";
+
+		    # loop to next instance
+		    next INSTANCE;
+		}
+
+		#print "found candidates, selecting one\n";
+		    
+		# there are candidate hosts we're not already on, which means
+		# we should pick one to copy to. Remove any hosts that we previously
+		# failed to copy to, aren't available for writing right now, or
+		# don't have enough storage to take our file
+		foreach my $candidate_devid (keys %$replication_devices) {
+		    my $candidate_devobj = $replication_devices->{$candidate_devid};
+		    #print "checking candidate_devobj " . Dumper($candidate_devobj) . "\n";
+
+		    if ($failed->{$candidate_devid} ||                                   # previously failed to copy to this device
+			($candidate_devobj->{status} ne 'alive') ||                      # device is down, dead, or read-only
+			($candidate_devobj->{mb_total} < ($candidate_devobj->{mb_used} +
+							  ($length / 1000000) + 2000))) { # not enough free space for the file + 2GB
+			    
+			delete $replication_devices->{$candidate_devid};
+		    }
+		}
+
+		# if there are candidates after all this, pick one and return it
+		my @keys = keys %$replication_devices;
+		if (scalar @keys) {
+		    # randomly choose and return one device from the candidate set
+		    my $result = $keys[int(rand(scalar @keys))];
+
+		    #print "copying fid to device $result\n";
+		    return $result;
+		}
+	    }
+	}
+
+	# if we got here, we didn't find a place to store this instance, so
+	#  log an error
+	#  return undef to try again in the future
+	#print "no place to store $fid\n";
+	return undef;
+    }
+
+    # if there are copies listed in our 'list of copies'
+    #   then schedule them for deletion
+    foreach my $dev (@on_devs) {
+	if (!$failed->{$dev->{devid}}) {
+	    return -1 * $dev->{devid};
+	}
+    }
+
+    if (scalar @on_devs) {
+	# we're only left with copies on failed hosts
+	return undef;
+    } else {
+	# nothing left - we're done
+	return 0;
+    }
+}
+
+# Return an array of devids that signify where we should
+# initially store copies of a file in this particular class.
+# This is used by the Query workers when they're asked where
+# to store some file.
+
+sub store_on {
+    my ($class, %args) = @_;
+
+    my $all_devs = delete $args{all_devs};    # hashref of { devid => devobj }
+    my $mclass   = delete $args{mclass};      # MogileFS::Class instance
+    my $count    = delete $args{count};       # number of devices to return
+
+    warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
+    die "Missing parameters" unless $mclass && $all_devs && $count;
+
+    # This probably isn't the most efficient way to do this, but it
+    # guarantees this code never suggests storing a file somewhere
+    # that the replicate_to code wouldn't store it
+    my ($devid, @on_devs, %failed);
+    while ($devid = $class->replicate_to(
+					 on_devs   => \@on_devs,
+					 all_devs  => $all_devs,
+					 failed    => \%failed,
+					 min       => $count,
+					 mclass    => $mclass
+					 )) {
+	push @on_devs, $all_devs->{$devid};
+    }
+
+    my @result = map { $_->{devid} } @on_devs;
+
+    #print "store_on returning " . Dumper(\@result) . "\n";
+
+    return @result;
+}
+
+1;
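[Editorial aside: the store_on/replicate_to contract added above can be sketched as follows. The patch is Perl; this is an illustrative Python sketch with a hypothetical minimal policy, not the real MultipleHosts code. replicate_to returns the next devid to copy to (or a falsy value when done), and store_on simply drives that loop until the policy stops suggesting devices.]

```python
def replicate_to(on_devs, all_devs, failed, min_count):
    """Hypothetical minimal policy: spread copies across distinct hosts."""
    on_hosts = {dev["hostid"] for dev in on_devs}
    for devid in sorted(all_devs):
        dev = all_devs[devid]
        if dev["hostid"] in on_hosts:   # already have a copy on this host
            continue
        if devid in failed:             # previously failed to copy here
            continue
        if len(on_devs) < min_count:
            return devid
    return 0                            # done: enough copies exist

def store_on(all_devs, count):
    # drive the policy loop exactly as the patch's store_on does
    on_devs, failed = [], set()
    while devid := replicate_to(on_devs, all_devs, failed, count):
        on_devs.append(all_devs[devid])
    return [dev["devid"] for dev in on_devs]

all_devs = {
    1: {"devid": 1, "hostid": 10},
    2: {"devid": 2, "hostid": 10},
    3: {"devid": 3, "hostid": 20},
}
print(store_on(all_devs, 2))   # [1, 3] -- two devids on distinct hosts
```

Because store_on reuses replicate_to, initial placement can never pick a device the replication policy would refuse, which is the point of moving find_deviceid into the policy class.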
Index: server/lib/MogileFS/Worker/Checker.pm
===================================================================
--- server/lib/MogileFS/Worker/Checker.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Checker.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -93,9 +93,12 @@
                              undef, int((rand()*300)+300), $fid);
 
                 } elsif ($rv == PERMANENT) {
-                    # FIXME: should probably do something more than this here?
+                    # definitely don't want to look for this again
                     $dbh->do("DELETE FROM fsck WHERE fid = ?", undef, $fid);
 
+                    # make a note that this is unreachable
+                    $dbh->do("REPLACE INTO unreachable_fids VALUES ($fid, UNIX_TIMESTAMP())");
+
                 } elsif ($rv == REPLICATE) {
                     # FIXME: use nexttry = 1?  fromdevid should be specified as a known good, too.  flags
                     # should probably be set for something?  not sure yet.
@@ -204,7 +207,7 @@
         # setup and do the request.  these failures are total failures in that we expect
         # them to work again later, as it's probably transient and will persist no matter
         # how many paths we try.
-        my $path = Mgd::make_http_path($devid, $fid)
+        my $path = Mgd::make_full_url($devid, $fid, 1)
             or return $retunlock->(TEMPORARY, 'failure to create HTTP path to file');
         my $ua = LWP::UserAgent->new(timeout => 3)
             or return $retunlock->(TEMPORARY, 'failed to create LWP::UserAgent object');
@@ -228,7 +231,7 @@
             if ($resp->code == 404) {
                 $devs{$devid} = PERMANENT;
             } else {
-                $devs{$devid} = TEMPORARY;
+                return $retunlock->(TEMPORARY, "temporary failure on dev $devid");
             }
         }
     }
@@ -237,8 +240,52 @@
     # a device scan, then we need to take care of those now by removing them.  but DO NOT
     # remove them if that would leave us with no mappings!  ONLY if there is at least one
     # SUCCESS mapping.
-    # FIXME: implement
+    if (scalar grep {$devs{$_} == SUCCESS} keys %devs) {
+        foreach my $bad_devid (keys %devs) {
+            # Skip the good devs 
+            next if ($devs{$bad_devid} == SUCCESS);
 
+            # Delete fid from bad devs
+            my $path = Mgd::make_path($bad_devid, $fid);
+            my $rv = 0;
+            if (my $urlref = Mgd::is_url($path)) {
+                # hit up the server and delete it
+                # TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
+                my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
+                                                 PeerPort => $urlref->[1],
+                                                 Timeout => 2);
+                unless ($sock) {
+                    # timeout or something, mark this device as down for now and move on
+                    error("could not create socket for deletion fid: $fid devid: $bad_devid");
+                    next;
+                }
+
+                # send delete request
+                error("Sending delete for $path") if $Mgd::DEBUG >= 2;
+                $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
+                my $response = <$sock>;
+                if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
+                    if (($1 >= 200 && $1 <= 299) || $1 == 404) {
+                        # effectively means all went well
+                        $rv = 1;
+                    } else {
+                        # remote file system error?  mark node as down
+                        error("Error: unlink failure: $path: $1");
+                        next;
+                    }
+                } else {
+                    error("Error: unknown response line: $response");
+                }
+            } else {
+	        error("invalid url: $path");
+            }
+        }
+
+        # insert into files_to_replicate table
+        $dbh->do("INSERT INTO file_to_replicate (fid, nexttry, fromdevid, failcount, flags) " .
+                 "VALUES (?, 0, NULL, 0, 0)", undef, $fid);
+    }
+
     # if they wanted a quick scan, let's stop here and throw a result based on the contents
     # of the %devs hash.  basically, if any of the devices had issues, then at this point we
     # want to throw a flag saying "please replicate this".  if not, then we tell them that
Index: server/lib/MogileFS/Worker/Query.pm
===================================================================
--- server/lib/MogileFS/Worker/Query.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Query.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -6,6 +6,7 @@
 
 use base 'MogileFS::Worker';
 use fields qw(querystarttime reqid);
+use MogileFS::Util qw(error);
 
 sub new {
     my ($class, $psock) = @_;
@@ -108,31 +109,6 @@
     return $self->err_line('unknown_command');
 }
 
-# this is a half-finished command.  in particular, errors tend to
-# crash the parent or child or something.  it's a quick hack for a quick
-# ops task that needs done.  note in particular how it reaches across
-# package boundaries into an API that the Replicator probably doesn't
-# want exposed.
-sub cmd_httpcopy {
-    my MogileFS::Worker::Query $self = shift;
-    my $args = shift;
-    my $sdevid = $args->{sdevid};
-    my $ddevid = $args->{ddevid};
-    my $fid    = $args->{fid};
-
-    my $err;
-    my $rv = MogileFS::Worker::Replicate::http_copy(sdevid => $sdevid,
-                                                    ddevid => $ddevid,
-                                                    fid    => $fid,
-                                                    errref => \$err);
-    if ($rv) {
-        MogileFS::Worker::Replicate::add_file_on($fid, $ddevid);
-        return $self->ok_line;
-    } else {
-        return $self->err_line("copy_err", $err);
-    }
-}
-
 # returns 0 on error, or dmid of domain
 sub check_domain {
     my MogileFS::Worker::Query $self = shift;
@@ -181,6 +157,8 @@
     my $dmid = $args->{dmid};
     my $key = $args->{key} || "";
     my $multi = $args->{multi_dest} ? 1 : 0;
+    my $class = $args->{class} 
+        or return $self->err_line("unreg_class");
 
     # we want it to be undef if not explicit, else force to numeric
     my $fid = $args->{fid} ? int($args->{fid}) : undef;
@@ -188,18 +166,13 @@
     # get DB handle
     my $dbh = Mgd::get_dbh() or
         return $self->err_line("nodb");
-
+    
     # figure out what classid this file is for
-    my $class = $args->{class} || "";
-    my $classid = 0;
-    if (length($class)) {
-        # TODO: cache this
-        $classid = $dbh->selectrow_array("SELECT classid FROM class ".
-                                         "WHERE dmid=? AND classname=?",
-                                         undef, $dmid, $class)
-            or return $self->err_line("unreg_class");
-    }
-
+    # $class = name of class, $classid = id of class
+    # or return $self->err_line("unreg_class");
+    
+    my $mclass = MogileFS::Class->for_dmid_and_name($dmid, $class);
+                                                    
     # if we haven't heard from the monitoring job yet, we need to chill a bit
     # to prevent a race where we tell a user that we can't create a file when
     # in fact we've just not heard from the monitor
@@ -209,22 +182,21 @@
         sleep 1;
     }
 
-    # find a device to put this file on that has 100Mb free.
-    my (@dests, @hosts);
+    # load up the policy class to help us find a place to store
+    # this new file
+    my $policy_class = $mclass->policy_class;
+    eval "use $policy_class; 1;";
+    if ($@) {
+        return $self->err_line("unreg_class_policy");
+    }
+
+    # ask the policy class for places to store this file
     my $devs = Mgd::get_device_summary();
-
-    while (scalar(@dests) < ($multi ? 3 : 1)) {
-        my $devid = Mgd::find_deviceid(
-                                       random           => 1,
-                                       must_be_writeable => 1,
-                                       weight_by_free   => 1,
-                                       not_on_hosts     => \@hosts,
-                                       );
-        last unless defined $devid;
-
-        push @dests, $devid;
-        push @hosts, $devs->{$devid}->{hostid};
-    }
+    my @dests =  $policy_class->store_on(
+                                         all_devs => $devs,
+                                         mclass   => $mclass,
+                                         count    => ($multi ? 3 : 1)
+                                         );
     return $self->err_line("no_devices") unless @dests;
 
     my $explicit_fid_used = $fid ? 1 : 0;
@@ -237,13 +209,14 @@
     my $ins_tempfile = sub {
         $dbh->do("INSERT INTO tempfile SET ".
                  " fid=?, dmid=?, dkey=?, classid=?, createtime=UNIX_TIMESTAMP(), devids=?",
-                 undef, $fid, $dmid, $key, $classid, join(',', @dests));
+                 undef, $fid, $dmid, $key, $mclass->classid, join(',', @dests));
         return undef if $dbh->err;
         unless (defined $fid) {
             # if they did not give us a fid, then we want to grab the one that was
             # theoretically automatically generated
             $fid = $dbh->{mysql_insertid};  # FIXME: mysql-ism
         }
+
         return undef unless defined $fid && $fid > 0;
         return 1;
     };
@@ -272,6 +245,8 @@
         return undef unless $ins_tempfile->();
     }
 
+    error("storing $fid in dev" . $dests[0]) if $Mgd::DEBUG >= 1;
+
     # make sure directories exist for client to be able to PUT into
     foreach my $devid (@dests) {
         my $path = Mgd::make_path($devid, $fid);
@@ -391,6 +366,45 @@
     }
 }
 
+sub cmd_increase_replicas {
+    my MogileFS::Worker::Query $self = shift;
+    my $args = shift;
+
+    # get DB handle
+    my $dbh = Mgd::get_dbh() or
+        return $self->err_line("nodb");
+
+    # get the number of extra replicas
+    my $extrareplicacount = $args->{extracount};
+
+    # we need either a key or a fid
+    my $fid = $args->{fid};
+
+    if (!defined $fid) {
+        # validate parameters
+        my $domain = $args->{domain};
+        return $self->err_line('no_domain') unless length $domain;
+        
+        my $dmid = Mgd::domain_id($domain);
+        return $self->err_line('domain_not_found') unless $dmid;
+        
+        my $key = $args->{key} or
+            return $self->err_line("no_key_or_fid");
+
+        $fid = $dbh->selectrow_array("SELECT fid FROM file WHERE dmid=? AND dkey=?",
+                                        undef, $dmid, $key);
+        return $self->err_line("unknown_key") unless $fid;
+    }
+
+    # got the fid, now schedule more replicas to be made
+    $dbh->do("INSERT IGNORE INTO file_to_replicate ".
+             "SET fid=?, extrareplicacount=?, nexttry=1", undef, $fid, $extrareplicacount);
+    return $self->err_line("db_err") if $dbh->err;
+
+    return $self->ok_line();
+
+}
+
 sub cmd_delete {
     my MogileFS::Worker::Query $self = shift;
     my $args = shift;
@@ -1202,6 +1216,7 @@
 
     my $args = shift;
     my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
+
     $self->send_to_parent("${id}${delay}OK $argline");
     return 1;
 }
@@ -1238,12 +1253,15 @@
         'no_host' => "No host provided",
         'no_ip' => "IP required to create host",
         'no_port' => "Port required to create host",
+        'no_key_or_fid' => "Either a key or fid must be provided",
         'none_match' => "No keys match that pattern and after-value (if any).",
         'plugin_aborted' => "Action aborted by plugin",
         'state_too_high' => "Status cannot go from dead to alive; must use down",
         'unknown_command' => "Unknown server command",
         'unknown_host' => "Host not found",
+        'unknown_fid' => "No file with that fid found",
         'unknown_state' => "Invalid/unknown state",
+        'unknown_key' => "File with specified key not found",
         'unreg_domain' => "Domain name invalid/not found",
     }->{$err_code} || "no text";
 
Index: server/lib/MogileFS/Worker/Monitor.pm
===================================================================
--- server/lib/MogileFS/Worker/Monitor.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Monitor.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -82,7 +82,9 @@
                 $stats{$1} = $2;
             }
 
-            my ($used, $total) = ($stats{used}, $stats{total});
+            # set total to used + available rather than the reported total,
+            # as the reported total seems to lie.
+            my ($used, $total) = ($stats{used}, $stats{used} + $stats{available});
             unless ($used && $total) {
                 error("dev$dev->{devid} reports used = $used, total = $total, error?");
                 next;
@@ -125,6 +127,10 @@
 
                 # if success and the content matches, mark it writeable
                 if ($testwrite->is_success && $testwrite->content eq $content) {
+                    # delete the file from the storage node so we don't end up with zillions
+                    # of these mofos. Ignore the return code.
+                    error("problem deleting test write: $puturl") unless http_delete($puturl);
+                    
                     $self->broadcast_device_writeable($dev->{devid});
                     error("dev$dev->{devid}: used = $used, total = $total, writeable = 1")
                         if $Mgd::DEBUG >= 1;
@@ -146,6 +152,38 @@
 
 }
 
+sub http_delete {
+    my $path = shift;
+
+    # hit up the server and delete it
+    if (my $urlref = Mgd::is_url($path)) {
+        my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
+                                         PeerPort => $urlref->[1],
+                                         Timeout => 4);
+        unless ($sock) {
+            # device must be down
+            return 0;
+        }
+        
+        # send delete request
+        $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
+        my $response = <$sock>;
+        if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
+            if (($1 >= 200 && $1 <= 299) || $1 == 404) {
+                # effectively means all went well
+                return 1;
+            } else {
+                # remote file system error?  mark node as down
+                error("Error: unlink failure: $path: $1");
+                return 0;
+            }
+        } else {
+            error("Error: unknown response line: $response");
+            return 0;
+        }
+    }
+}
+
 1;
 
 # Local Variables:
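[Editorial aside: the raw-socket http_delete helper above, and its twins in the Checker and Reduce workers, all apply the same rule to the HTTP status line, sketched here in Python: any 2xx is a success, and so is a 404, since a copy that is already gone is exactly the outcome we wanted.]

```python
import re

def delete_succeeded(status_line):
    """Classify the first line of an HTTP response to a DELETE."""
    m = re.match(r"HTTP/\d+\.\d+\s+(\d+)", status_line)
    if not m:
        return False                    # unknown response line
    code = int(m.group(1))
    return 200 <= code <= 299 or code == 404

print(delete_succeeded("HTTP/1.0 204 No Content"))  # True
print(delete_succeeded("HTTP/1.1 404 Not Found"))   # True
print(delete_succeeded("HTTP/1.1 500 Oops"))        # False
```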
Index: server/lib/MogileFS/Worker/Reduce.pm
===================================================================
--- server/lib/MogileFS/Worker/Reduce.pm	(.../vendor/danga/mogilefs/458)	(revision 0)
+++ server/lib/MogileFS/Worker/Reduce.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -0,0 +1,276 @@
+package MogileFS::Worker::Reduce;
+
+use strict;
+use base 'MogileFS::Worker';
+use MogileFS::Util qw(every error);
+
+use POSIX;
+
+# We start deleting things if the drive is more than 80% full.
+# Dennis assures me that is the tipping point for good performance
+# on a drive. ;-)
+
+use constant MB_REDUCTION_PER_PASS => 2000;
+use constant MAX_DELETE_PER_PASS => 1000;
+use constant REDUCE_FREQUENCY_SECONDS => (60 * 60);
+
+use constant TESTING => 1;
+
+sub new {
+    my ($class, $psock) = @_;
+    my $self = fields::new($class);
+    $self->SUPER::new($psock);
+
+    return $self;
+}
+
+sub watchdog_timeout {
+    60;   # some of these queries could take a while
+}
+
+sub work {
+    my $self = shift;
+
+    # give the monitor job 15 seconds to give us an update
+    my $reduce_at = time();
+    my $warn_after = time() + 15;
+    my $loop = 0;
+
+    every(10, sub {
+        # make sure the parent is still around
+        $self->parent_ping;
+
+        # reduction doesn't go well if the monitor job hasn't actively started
+        # marking things as being available
+        unless ($self->monitor_has_run) {
+            error("waiting for monitor job to complete a cycle before beginning reduction")
+                if time() > $warn_after;
+            return;
+        }
+
+        # only reduce once every hour
+        unless (time() > $reduce_at) {
+            return;
+        }
+        $reduce_at = time() + REDUCE_FREQUENCY_SECONDS;
+
+        error("Reducer running; looking for devices that need to remove replicas to get space");
+        
+        $self->validate_dbh;
+        my $dbh = $self->get_dbh or return 0;
+
+        # get a current list of live devices
+        my $devs = Mgd::get_device_summary();
+        my @livedevs = grep { $_->{status} eq "alive" } values %$devs
+            or return;
+
+        my $worker = MogileFS::ProcManager->is_child or die;
+
+        foreach my $dev (@livedevs) {
+            my $start_time = time();
+            
+            # nothing to delete if we have no total space
+            next unless $dev->{mb_total};
+
+            # if the drive is over 80% full, let's see if there is stuff to delete
+            my $pfull = ($dev->{mb_used} / $dev->{mb_total}) * 100;
+
+            error("dev" . $dev->{devid} . " is " . sprintf("%02d", $pfull) . " percent full");
+
+            if ($pfull > 80) {
+                # see how much extra stuff we've got on this device
+                my ($count, $size) = $dbh->selectrow_array("select count(*), sum(file.length) / 1000000 from file, file_on, class where file_on.devid = ? and file_on.fid = file.fid and file.dmid = class.dmid and file.classid = class.classid and file.devcount > class.mindevcount", undef, $dev->{devid});
+                
+                error("looking for excess replicas on dev" . $dev->{devid} . " (" .
+                      sprintf("%02d", $pfull) . 
+                      " percent full) took " . (time() - $start_time) . 
+                      " seconds, found: $count items, ${size}MB");
+                      
+                # can't do anything if the drive has no extra stuff
+                next unless $count;
+                
+                # sometimes rounding makes this wacky
+                $size = 1 unless int($size);
+
+                # figure out the average size of the extra replicas we've got,
+                # and use that to compute how many files we think we should nuke
+                my $avg_size = $size / $count;
+                my $files_to_nuke = int(MB_REDUCTION_PER_PASS / $avg_size);
+                $files_to_nuke = ($files_to_nuke > MAX_DELETE_PER_PASS) ? MAX_DELETE_PER_PASS : $files_to_nuke;
+                
+                my $start_offset;
+                if ($files_to_nuke > $count) {
+                    $start_offset = 0;
+                    $files_to_nuke = $count;
+                    
+                } else {
+                    # randomly start somewhere
+                    $start_offset = int(rand($count - $files_to_nuke));
+                }
+                
+                error("deleting $files_to_nuke files off dev" . $dev->{devid} . ", starting at offset $start_offset, count $files_to_nuke");
+                
+                # get the list of things to delete
+                $start_time = time();
+                my $sth = $dbh->prepare("select file.fid from file, file_on, class where file_on.devid = ? and file_on.fid = file.fid and file.dmid = class.dmid and file.classid = class.classid and file.devcount > class.mindevcount limit ?, ?");
+                #print " time to pull in extra files: " . (time() - $start_time) . "\n";
+                
+                $sth->bind_param(1, $dev->{devid});
+                $sth->bind_param(2, $start_offset);
+                $sth->bind_param(3, $files_to_nuke);
+                $sth->execute;
+                
+                my (@fids, $fid);
+                push(@fids, $fid) while (($fid) = $sth->fetchrow_array());
+                
+                $sth->finish;
+
+                if (TESTING) {
+                    error("skipping delete since we're testing");
+                    next;
+                }
+
+                # loop through and try to delete each fid
+                foreach $fid (@fids) {
+                    # keep the ProcManager satisfied that we haven't hung
+                    $worker->still_alive;
+                    
+                    #print "deleting $fid off of " . $dev->{devid} . "\n";
+                    
+                    # continue if we properly deleted things
+                    my $rv = delete_fid($dbh, $dev->{devid}, $fid);
+                    if (!$rv) {
+                        # go to the next device on any error
+                        error("Not reducing replicas on " . $dev->{devid} . " due to connection error");
+                        last;
+                    }
+                }
+            }
+        }
+            
+        error("Reduce run complete");
+    });
+}
+
+# delete the given fid from the given device.
+# this and all code under it were basically ripped
+# from the Replicate worker.
+
+sub delete_fid {
+    my ($dbh, $devid, $fid) = @_;
+
+    # use the same locks as replicate so we don't trounce
+    # one another.
+    my $lock;  # bool: whether we got the lock or not
+    my $lockname = "mgfs:fid:$fid:replicate";
+
+    my $unlock = sub {
+        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname)
+            if $lock;
+    };
+
+    my $retunlock = sub {
+        my $rv = shift;
+        
+        my ($errmsg, $errcode);
+        if (@_ == 2) {
+            ($errcode, $errmsg) = @_;
+            $errmsg = "$errcode: $errmsg"; # include code with message
+        } else {
+            ($errmsg) = @_;
+        }
+
+        my $ret = $rv ? $rv : error($errmsg);
+        $unlock->();
+        return $ret;
+    };
+
+    $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef, $lockname);
+    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock $lockname")
+        unless $lock;
+    
+    my $rv = http_delete($devid, $fid);
+
+    # only continue if we successfully deleted this sucker
+    return $retunlock->($rv) unless $rv;
+
+    # delete the reference in the database
+    $rv = remove_file_from($fid, $devid, 0);
+
+    return $retunlock->($rv);
+}
+
+# Delete the given instance. Return 1 on success and 0 on failure.
+
+sub http_delete {
+    my ($ddevid, $fid) = @_;
+
+    my $path = Mgd::make_path($ddevid, $fid);
+
+    if (my $urlref = Mgd::is_url($path)) {
+        # hit up the server and delete it
+        # TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
+        my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
+                                         PeerPort => $urlref->[1],
+                                         Timeout => 4);
+        unless ($sock) {
+            # device must be down
+            return 0;
+        }
+        
+        # send delete request
+        error("deleting $fid on dev$ddevid");
+        $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
+        my $response = <$sock>;
+        if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
+            if (($1 >= 200 && $1 <= 299) || $1 == 404) {
+                # effectively means all went well
+                return 1;
+            } else {
+                # remote file system error?  mark node as down
+                error("Error: unlink failure: $path: $1");
+                return 0;
+            }
+        } else {
+            error("Error: unknown response line: $response");
+            return 0;
+        }
+    } else {
+        # do normal unlink
+        my $rv = unlink "$Mgd::MOG_ROOT/$path";
+        
+        # device is timing out. take note of it and
+        # continue dealing with other deletes
+        if (! $rv) {
+            if ($! == ENOENT) {
+                # file doesn't exist is ok with us
+                return 1;
+            } else {
+                return 0;
+            }
+        } else {
+            return 1;
+        }
+    }
+}
+
+
+sub remove_file_from {
+    my ($fid, $devid, $no_lock) = @_;
+
+    my $dbh = Mgd::get_dbh() or return 0;
+
+    my $rv = $dbh->do("DELETE FROM file_on where fid=? AND devid=?",
+                      undef, $fid, $devid);
+    if ($rv > 0) {
+        return Mgd::update_fid_devcount($fid, $no_lock);
+    }
+}
+
+1;
+
+# Local Variables:
+# mode: perl
+# c-basic-indent: 4
+# indent-tabs-mode: nil
+# End:
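[Editorial aside: the Reduce worker's sizing arithmetic above can be sketched like this (Python for illustration). From the count and total size in MB of excess replicas on a device, it estimates how many files to delete to free roughly MB_REDUCTION_PER_PASS, capped both at MAX_DELETE_PER_PASS and at the number of excess files actually available.]

```python
MB_REDUCTION_PER_PASS = 2000
MAX_DELETE_PER_PASS = 1000

def files_to_nuke(count, size_mb):
    size_mb = size_mb if int(size_mb) else 1   # rounding can leave 0
    avg_size = size_mb / count                 # average excess-replica size
    n = int(MB_REDUCTION_PER_PASS / avg_size)  # files needed to free ~2GB
    n = min(n, MAX_DELETE_PER_PASS)
    return min(n, count)                       # can't delete more than exist

print(files_to_nuke(500, 4000))    # avg 8 MB  -> 250 files
print(files_to_nuke(9000, 9000))   # avg 1 MB  -> capped at 1000
print(files_to_nuke(50, 100))      # avg 2 MB  -> only 50 excess exist
```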
Index: server/lib/MogileFS/Worker/Replicate.pm
===================================================================
--- server/lib/MogileFS/Worker/Replicate.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Replicate.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -53,7 +53,7 @@
         MogileFS::Config->set_config("old_repl_compat", $1);
         return 1;
     }
-
+    
     return 0;
 }
 
@@ -111,21 +111,22 @@
     # find some fids to replicate, prioritize based on when they should be tried
     my $LIMIT = 1000;
     my $to_repl_map = $dbh->selectall_hashref(qq{
-        SELECT fid, fromdevid, failcount, flags, nexttry
-        FROM file_to_replicate
-        WHERE nexttry <= UNIX_TIMESTAMP()
-        ORDER BY nexttry
-        LIMIT $LIMIT
-    }, "fid");
+        SELECT file_to_replicate.fid, fromdevid, failcount, flags, nexttry, extrareplicacount, file.length
+            FROM file_to_replicate, file
+            WHERE nexttry <= UNIX_TIMESTAMP()
+            AND file_to_replicate.fid = file.fid
+            ORDER BY nexttry
+            LIMIT $LIMIT
+        }, "fid");
     if ($dbh->err) {
         error("Database error selecting fids to replicate: " . $dbh->errstr);
         return;
     }
-
+    
     # get random list of hashref of things to do:
     my $to_repl = [ List::Util::shuffle(values %$to_repl_map) ];
     return unless @$to_repl;
-
+    
     # sort our priority list in terms of 0s (immediate, only 1 copy), 1s (immediate replicate,
     # but we already have 2 copies), and big numbers (unixtimestamps) of things that failed.
     # but because sort is stable, these are random within their 0/1/big classes.
@@ -135,9 +136,11 @@
 
     foreach my $todo (@$to_repl) {
         my $fid = $todo->{fid};
+        my $length = $todo->{length};
+        my $extrareplicacount = $todo->{extrareplicacount};
 
         my $errcode;
-        my ($status, $unlock) = replicate($dbh, $fid,
+        my ($status, $unlock) = replicate($dbh, $fid, $extrareplicacount, $length,
                                           errref       => \$errcode,
                                           no_unlock    => 1,   # to make it return an $unlock subref
                                           source_devid => $todo->{fromdevid},
@@ -277,7 +280,7 @@
                 $self->read_from_parent;
                 $self->still_alive;
 
-                if (my $status = replicate($dbh, $fid, class => $mclass)) {
+                if (my $status = replicate($dbh, $fid, 0, class => $mclass)) { # TODO: send actual length, not 0
                     # $status is either 0 (failure, handled below), 1 (success, we actually
                     # replicated this file), or 2 (success, but someone else replicated it).
                     # so if it's 2, we just want to go to the next fid.  this file is done.
@@ -288,11 +291,11 @@
                         $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
                         die $dbh->errstr if $dbh->err;
                     }
-
+                    
                     # housekeeping
                     $fixed++;
                     $self->send_to_parent("repl_i_did $fid");
-
+                    
                     # status update
                     if ($fixed % 20 == 0) {
                         my $ratio = $fixed/$attempted*100;
@@ -308,38 +311,38 @@
     });
 }
 
-# replicates $fid if its devcount is less than $min.  (eh, not quite)
+# replicates $fid if its devcount is less than $min.
 #
 # $policy_class is optional (perl classname representing replication policy).  if present, used.  if not, looked up based on $fid.
 #
 # README: if you update this sub to return a new error code, please update the
 # appropriate callers to know how to deal with the errors returned.
 sub replicate {
-    my ($dbh, $fid, %opts) = @_;
+    my ($dbh, $fid, $extrareplicacount, $length, %opts) = @_;
     my $errref    = delete $opts{'errref'};
     my $mclass    = delete $opts{'class'};
     my $no_unlock = delete $opts{'no_unlock'};
     my $sdevid    = delete $opts{'source_devid'};
     die if %opts;
-
+    
     # bool:  if source was explicitly requested by caller
     my $fixed_source = $sdevid ? 1 : 0;
-
+    
     $mclass ||= MogileFS::Class->of_fid($fid);
-
+    
     my $policy_class = $mclass->policy_class;
     eval "use $policy_class; 1;";
     if ($@) {
         return error("Failed to load policy class: $policy_class: $@");
     }
-
+    
     my $lock;  # bool: whether we got the lock or not
     my $lockname = "mgfs:fid:$fid:replicate";
     my $unlock = sub {
         $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname)
             if $lock;
     };
-
+    
     my $retunlock = sub {
         my $rv = shift;
         my ($errmsg, $errcode);
@@ -350,7 +353,7 @@
             ($errmsg) = @_;
         }
         $$errref = $errcode if $errref;
-
+        
         my $ret = $rv ? $rv : error($errmsg);
         if ($no_unlock) {
             return ($ret, $unlock);
@@ -359,21 +362,21 @@
             return $ret;
         }
     };
-
+    
     # hashref of devid -> $device_row_href  (where devid is alive)
     my $devs = Mgd::get_device_summary();
     return $retunlock->(0, "no_devices", "Device information from get_device_summary is empty")
         unless $devs && %$devs;
-
+    
     $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef, $lockname);
     return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock $lockname")
         unless $lock;
-
+    
     # learn what devices this file is already on
     my $on_count = 0;
     my @dead_devid;   # list of dead devids.  FIXME: do something with this?
     my @exist_devid;  # list of existing devids
-
+    
     my $sth = $dbh->prepare("SELECT devid FROM file_on WHERE fid=?");
     $sth->execute($fid);
     die $dbh->errstr if $dbh->err;
@@ -388,100 +391,135 @@
         $on_count++;
         push @exist_devid, $devid;
     }
-
+    
     return $retunlock->(0, "no_source",   "Source is no longer available replicating $fid") if $on_count == 0;
     return $retunlock->(0, "source_down", "No eligible devices available replicating $fid") if @exist_devid == 0;
-
+    
     # if they requested a specific source, that source must be up.
     if ($sdevid && ! grep { $_ == $sdevid} @exist_devid) {
         return $retunlock->(0, "source_down", "Requested replication source device $sdevid not available");
     }
-
+    
     my $ddevid;
     my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
     my %source_failed;  # devid -> 1 for each devid we had problems reading from.
     my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere
     my $copy_err;
-
+    
     while ($ddevid = $policy_class->replicate_to(
                                                  fid       => $fid,
+                                                 length    => $length,
                                                  on_devs   => \@on_devs, # all device objects fid is on, dead or otherwise
                                                  all_devs  => $devs,
                                                  failed    => \%dest_failed,
-                                                 min       => $mclass->mindevcount,
+                                                 min       => $mclass->mindevcount + $extrareplicacount,
+                                                 mclass    => $mclass,
                                                  ))
     {
         # they can return either a dev hashref/object or a devid number.  we want the number.
         $ddevid = $ddevid->{devid} if ref $ddevid;
-        $got_copy_request = 1;
-
-        # replication policy shouldn't tell us to put a file on a device
-        # we've already told it that we've failed at.  so if we get that response,
-        # the policy plugin is broken and we should terminate now.
-        if ($dest_failed{$ddevid}) {
-            return $retunlock->(0, "policy_error_doing_failed",
-                                "replication policy told us to do something we already told it we failed at while replicating fid $fid");
-        }
-
-        # replication policy shouldn't tell us to put a file on a
-        # device that it's already on.  that's just stupid.
-        if (grep { $_->{devid} == $ddevid } @on_devs) {
-            return $retunlock->(0, "policy_error_already_there",
-                                "replication policy told us to put fid $fid on dev $ddevid, but it's already there!");
-        }
-
-        # find where we're replicating from
-        unless ($fixed_source) {
-            # TODO: use an observed good device+host as source to start.
-            my @choices = grep { ! $source_failed{$_} } @exist_devid;
-            return $retunlock->(0, "source_down", "No devices available replicating $fid") unless @choices;
-            $sdevid = @choices[int(rand(scalar @choices))];
-        }
-
-        my $worker = MogileFS::ProcManager->is_child or die;
-        my $rv = http_copy(
-                           sdevid       => $sdevid,
-                           ddevid       => $ddevid,
-                           fid          => $fid,
-                           expected_len => undef,  # FIXME: get this info to pass along
-                           errref       => \$copy_err,
-                           callback     => sub { $worker->still_alive; },
-                           );
-        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;
-
-        unless ($rv) {
-            error("Failed copying fid $fid from devid $sdevid to devid $ddevid (error type: $copy_err)");
-            if ($copy_err eq "src_error") {
-                $source_failed{$sdevid} = 1;
-
-                if ($fixed_source) {
-                    # there can't be any more retries, as this source
-                    # is busted and is the only one we wanted.
-                    return $retunlock->(0, "copy_error", "error copying fid $fid from devid $sdevid during replication");
+        
+        if ($ddevid > 0) {
+            # positive ddevid indicates to copy the file to a new host
+            
+            error("copying $fid (" . $mclass->namespace . ":" . $mclass->classname . ") to dev$ddevid");
+            
+            $got_copy_request = 1;
+            
+            # replication policy shouldn't tell us to put a file on a device
+            # we've already told it that we've failed at.  so if we get that response,
+            # the policy plugin is broken and we should terminate now.
+            if ($dest_failed{$ddevid}) {
+                return $retunlock->(0, "policy_error_doing_failed",
+                                    "replication policy told us to do something we already told it we failed at while replicating fid $fid");
+            }
+            
+            # replication policy shouldn't tell us to put a file on a
+            # device that it's already on.  that's just stupid.
+            if (grep { $_->{devid} == $ddevid } @on_devs) {
+                return $retunlock->(0, "policy_error_already_there",
+                                    "replication policy told us to put fid $fid on dev $ddevid, but it's already there!");
+            }
+            
+            # find where we're replicating from
+            unless ($fixed_source) {
+                # TODO: use an observed good device+host as source to start.
+                my @choices = grep { ! $source_failed{$_} } @exist_devid;
+                return $retunlock->(0, "source_down", "No devices available replicating $fid") unless @choices;
+                $sdevid = $choices[int(rand(scalar @choices))];
+            }
+            
+            my $worker = MogileFS::ProcManager->is_child or die;
+            my $rv = http_copy(
+                               sdevid       => $sdevid,
+                               ddevid       => $ddevid,
+                               fid          => $fid,
+                               expected_len => undef,  # FIXME: get this info to pass along
+                               errref       => \$copy_err,
+                               callback     => sub { $worker->still_alive; },
+                               );
+            die "Bogus error code: '$copy_err'" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;
+            
+            unless ($rv) {
+                error("Failed copying fid $fid from devid $sdevid to devid $ddevid (error type: $copy_err)");
+                if ($copy_err eq "src_error") {
+                    $source_failed{$sdevid} = 1;
+                    
+                    if ($fixed_source) {
+                        # there can't be any more retries, as this source
+                        # is busted and is the only one we wanted.
+                        return $retunlock->(0, "copy_error", "error copying fid $fid from devid $sdevid during replication");
+                    }
+                    
+                } else {
+                    $dest_failed{$ddevid} = 1;
                 }
-
+                next;
+            }
+            
+            add_file_on($fid, $ddevid, 1);
+            push @on_devs, $devs->{$ddevid};
+            
+        } else {
+            $got_copy_request = 1;
+            
+            # negative ddevid was passed back, indicating we should delete this
+            # file from the given device (the replication policy thinks there are
+            # too many copies of this file)
+            $ddevid = -1 * $ddevid;
+            
+            error("deleting $fid (" . $mclass->namespace . ":" . $mclass->classname . ") from dev$ddevid");
+            
+            my $rv = http_delete(
+                                 ddevid => $ddevid,
+                                 fid    => $fid,
+                                 );
+            
+            if ($rv == 0) {
+                $dest_failed{$ddevid} = 1;
+                
             } else {
-                $dest_failed{$ddevid} = 1;
+                # success! note the replica is deleted in the db
+                remove_file_from($fid, $ddevid, 1);
+                
+                # take it out of our list of devs this file is on
+                @on_devs = grep { $_->{devid} != $ddevid } @on_devs;
             }
-            next;
         }
-
-        add_file_on($fid, $ddevid, 1);
-        push @on_devs, $devs->{$ddevid};
     }
-
+    
     # returning 0, not undef, means replication policy is happy and we're done.
     if (defined $ddevid && ! $ddevid) {
         return $retunlock->(1) if $got_copy_request;
         return $retunlock->(2);  # some other process got to it first.  policy was happy immediately.
     }
-
+    
     # TODO: if we're on only 1 device and they returned undef, let's
     # try and put it SOMEWHERE just to make ourselves happy, even if
-    # it it doesn't obey policy?  or is that decision itself policy?
+    # it doesn't obey policy?  or is that decision itself policy?
     # unfortunately, there's no way for the replication policy to say
     # "replicate to 6, but I don't like that, so don't count it as good"
-
+    
     if ($got_copy_request) {
         return $retunlock->(0, "copy_error", "errors copying fid $fid during replication");
     } else {
@@ -489,6 +527,51 @@
     }
 }
 
+# Delete the replica of $fid stored on device $ddevid.  Returns 1 on success, 0 on failure.
+
+sub http_delete {
+    my %opts = @_;
+    my $ddevid = delete $opts{ddevid};
+    my $fid = delete $opts{fid};
+    
+    warn("unknown options passed to http_delete: " . join(", ", keys(%opts))) if %opts;
+    
+    my $path = Mgd::make_path($ddevid, $fid);
+    
+    if (my $urlref = Mgd::is_url($path)) {
+        # hit up the server and delete it
+        # TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
+        my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
+                                         PeerPort => $urlref->[1],
+                                         Timeout => 4);
+        unless ($sock) {
+            # device must be down
+            return 0;
+        }
+        
+        # send delete request
+        error("Sending delete for $path") if $Mgd::DEBUG >= 2;
+        $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
+        my $response = <$sock>;
+        if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
+            if (($1 >= 200 && $1 <= 299) || $1 == 404) {
+                # effectively means all went well
+                return 1;
+            } else {
+                # remote file system error?  mark node as down
+                error("Error: unlink failure: $path: $1");
+                return 0;
+            }
+        } else {
+            error("Error: unknown response line: $response");
+            return 0;
+        }
+    } else {
+        return error("unable to delete file at path '$path', as it doesn't appear to be a URL");
+    }
+}
+
+
 # copies a file from one Perlbal to another utilizing HTTP
 sub http_copy {
     my %opts = @_;
@@ -555,7 +638,7 @@
     my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2)
         or return $src_error->("Unable to create source socket to $shost:$sport for $spath");
     $sock->write("GET $spath HTTP/1.0\r\n\r\n");
-    return error("Pipe closed retrieving $spath from $shost:$sport")
+    return $src_error->("Pipe closed retrieving $spath from $shost:$sport")
         if $pipe_closed;
 
     # we just want a content length
@@ -635,6 +718,19 @@
     }
 }
 
+sub remove_file_from {
+    my ($fid, $devid, $no_lock) = @_;
+
+    my $dbh = Mgd::get_dbh() or return 0;
+
+    my $rv = $dbh->do("DELETE FROM file_on where fid=? AND devid=?",
+                      undef, $fid, $devid);
+    return 0 unless $rv > 0;
+
+    return Mgd::update_fid_devcount($fid, $no_lock);
+}
+
+
 1;
 
 # Local Variables:
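(As an aside for plugin authors: the new delete-via-negative-devid contract above can be sketched as a minimal policy. The class name and the trimming rule below are invented for illustration; only the replicate_to signature and return convention follow the patch.)

```perl
package MogileFS::ReplicationPolicy::TrimExtras;
use strict;
use warnings;

# replicate_to() is called in a loop until it returns 0 ("policy is
# satisfied") or undef.  A positive devid means "copy the fid there";
# with this patch, a negative devid means "delete the replica on that
# device".  'min' arrives as mindevcount + extrareplicacount.
sub replicate_to {
    my ($class, %args) = @_;
    my $on_devs = $args{on_devs};   # device hashrefs the fid is on
    my $min     = $args{min};

    if (@$on_devs > $min) {
        # too many copies: pick a victim and ask the worker to delete it
        my ($victim) = sort { $a->{devid} <=> $b->{devid} } @$on_devs;
        return -$victim->{devid};
    }

    return 0;   # happy: at least $min replicas exist
}

1;
```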
Index: server/lib/MogileFS/Worker/Reaper.pm
===================================================================
--- server/lib/MogileFS/Worker/Reaper.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Reaper.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -62,6 +62,14 @@
                     next;
                 }
 
+                # enqueue the fid to make new replicas
+                $dbh->do("INSERT IGNORE INTO file_to_replicate ".
+                         "SET fid=?, nexttry=0", undef, $fid);
+                if ($dbh->err) {
+                    error("error inserting $fid into file_to_replicate: " . $dbh->errstr);
+                    next;
+                }
+
                 # if debugging on, note this is done
                 error("Reaper noted fid $fid no longer on device $devid")
                     if $Mgd::DEBUG >= 2;
Index: server/lib/MogileFS/Worker/Delete.pm
===================================================================
--- server/lib/MogileFS/Worker/Delete.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/lib/MogileFS/Worker/Delete.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -8,7 +8,7 @@
-# we select 1000 but only do a random 100 of them, to allow
-# for stateless paralleism
+# we select 1000 but only do a random 200 of them, to allow
+# for stateless parallelism
 use constant LIMIT => 1000;
-use constant PER_BATCH => 100;
+use constant PER_BATCH => 200;
 
 # TODO: use LWP and persistent connections to do deletes.  less local ports used.
 
@@ -35,6 +35,12 @@
     while (1) {
         $self->parent_ping;
 
+        unless ($self->monitor_has_run) {
+            error("waiting for monitor job to complete a cycle before beginning deletion");
+            sleep $sleep_max;
+            next;
+        }
+
         $self->validate_dbh;
         my $dbh = $self->get_dbh;
 
@@ -96,14 +102,21 @@
                                              "WHERE createtime < UNIX_TIMESTAMP() - $too_old LIMIT 50");
     return 0 unless $tempfiles && @$tempfiles;
 
+    # get the device summary so we don't reinsert rows that refer to
+    # dead or deleted devices.
+    my $devs = Mgd::get_device_summary();
+
     # insert the right rows into file_on and file_to_delete and remove the
     # now expunged (or soon to be) rows from tempfile
     my (@questions, @binds, @fids);
     foreach my $row (@$tempfiles) {
         push @fids, $row->[0];
         foreach my $devid (split /,/, $row->[1]) {
-            push @questions, "(?, ?)";
-            push @binds, $row->[0], $devid;
+            # don't enqueue files on dead or nonexistent devices
+            if ($devs->{$devid} && ($devs->{$devid}->{status} ne 'dead')) {
+                push @questions, "(?, ?)";
+                push @binds, $row->[0], $devid;
+            }
         }
     }
 
@@ -144,7 +157,6 @@
             $dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                      undef, $fid, $devid);
             dbcheck($dbh, "Failure to delete from file_on for $fid/$devid");
-            die "Failed to delete from file_on: " . $dbh->errstr if $dbh->err;
         };
 
         my $reschedule_fid = sub {
@@ -231,7 +243,7 @@
                 $reschedule_fid->(60 * 30, "http_code_$httpcode");
                 next;
             }
-        } else {
+        } elsif ($path) {
             error("Error: unknown response line deleting $path: $response");
         }
     }
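(The dead-device filter added to the tempfile expunge above is easy to check in isolation; a standalone sketch of the same logic, with the device-summary shape assumed from the rest of the patch:)

```perl
use strict;
use warnings;

# devid -> device row, in the shape Mgd::get_device_summary() returns
my %devs = (
    1 => { status => 'alive' },
    2 => { status => 'dead'  },
);

# tempfile rows: [ fid, comma-separated devids ]
my @tempfiles = ( [ 100, "1,2,3" ] );

my (@questions, @binds);
foreach my $row (@tempfiles) {
    foreach my $devid (split /,/, $row->[1]) {
        # skip dead or unknown devices, as the patched worker does
        next unless $devs{$devid} && $devs{$devid}{status} ne 'dead';
        push @questions, "(?, ?)";
        push @binds, $row->[0], $devid;
    }
}

# only devid 1 survives: @binds is (100, 1)
```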
Index: server/mogilefsd
===================================================================
--- server/mogilefsd	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/mogilefsd	(.../trunk/mogile2.0)	(revision 1095)
@@ -45,6 +45,7 @@
 use MogileFS::Worker::Reaper;
 use MogileFS::Worker::Monitor;
 use MogileFS::Worker::Checker;
+use MogileFS::Worker::Reduce;
 use MogileFS::ProcManager;
 use MogileFS::Config;
 use MogileFS::HTTPFile;
@@ -55,7 +56,7 @@
 
 # this is incremented whenever the schema changes.  server will refuse
 # to start-up with an old schema version
-use constant SCHEMA_VERSION => 7;
+use constant SCHEMA_VERSION => 8;
 
 our $starttime = time(); # time we got going
 our %domaincache; # { domainname => { domainrow } }
@@ -78,6 +79,7 @@
 MogileFS::ProcManager->set_min_workers('reaper'      => MogileFS->config('reaper_jobs'));
 MogileFS::ProcManager->set_min_workers('monitor'     => MogileFS->config('monitor_jobs'));
 MogileFS::ProcManager->set_min_workers('checker'     => MogileFS->config('checker_jobs'));
+MogileFS::ProcManager->set_min_workers('reduce'      => MogileFS->config('reduce_jobs'));
 
 # open up our log
 openlog('mogilefsd', 'pid', 'daemon');
@@ -139,7 +141,6 @@
 Mgd::log('info', 'ending run');
 closelog();
 
-
 # database checking/connecting
 {
     my ($dbh, $dbh_pid);
@@ -219,87 +220,10 @@
 ### S E R V E R   A P I   F U N C T I O N S
 #####################################################################
 
-# returns hashref of devid -> $device_row_href  (where devid is alive/down, but not dead)
+# returns hashref of devid -> $device_row_href
 # cached for 15 seconds.
 use vars qw($cache_device_summary $cache_device_summary_time %cache_host $cache_host_time);
 
-# general purpose device locator.  example:
-#
-# my $devid = Mgd::find_deviceid(
-#     random => 1,              # get random device (else find first suitable)
-#     min_free_space => 100,    # with at least 100MB free
-#     weight_by_free => 1,      # find result weighted by free space
-#     max_disk_age => 5,        # minutes of age the last usage report can be before we ignore the disk
-#     not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
-#     must_be_alive => 1,       # if specified, device/host must be writeable (fully available)
-# );
-#
-# returns undef if no suitable device was found.  else, if you wanted an
-# array will return an array of the suitable devices--if you want just a
-# single item, you get just the first one found.
-sub find_deviceid {
-    my %opts = ( @_ );
-
-    # validate we're getting called with known parameters
-    my %valid_keys = map { $_ => 1 } qw( random min_free_space weight_by_free max_disk_age not_on_hosts must_be_writeable must_be_readable );
-    warn "invalid key $_ in call to find_deviceid\n"
-        foreach grep { ! $valid_keys{$_} } keys %opts;
-
-    # copy down global minimum free space if not specified
-    $opts{min_free_space} ||= MogileFS->config("min_free_space");
-    $opts{max_disk_age}   ||= MogileFS->config("max_disk_age");
-    if ($opts{max_disk_age}) {
-        $opts{max_disk_age} = time() - ($opts{max_disk_age} * 60);
-    }
-    $opts{must_be_alive} = 1 unless defined $opts{must_be_alive};
-
-    # setup for iterating over devices
-    my $devs = Mgd::get_device_summary();
-    my @devids = keys %{$devs || {}};
-    my $devcount = scalar(@devids);
-    my $start = $opts{random} ? int(rand($devcount)) : 0;
-    my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
-    my $total_free = 0;
-
-    # now find a device that matches what they want
-    my @list;
-    for (my $i = 0; $i < $devcount; $i++) {
-        my $idx = ($i + $start) % $devcount;
-        my $dev = $devs->{$devids[$idx]};
-
-        # series of suitability checks
-        next unless $dev->{status} eq 'alive';
-        next if $not_on_host{$dev->{hostid}};
-        next if $opts{max_disk_age} && $dev->{mb_asof} &&
-                $dev->{mb_asof} < $opts{max_disk_age};
-        next if $opts{min_free_space} && $dev->{mb_total} &&
-                $dev->{mb_free} < $opts{min_free_space};
-        next if $opts{must_be_writeable} &&
-            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
-             MogileFS->observed_state("device", $dev->{devid}) ne "writeable");
-        next if $opts{must_be_readable} &&
-            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
-             MogileFS->observed_state("device", $dev->{devid}) ne "readable");
-
-        # we get here, this is a suitable device
-        push @list, $dev->{devid};
-        $total_free += $dev->{mb_free};
-    }
-
-    # now we have a list ordered randomly, do free space weighting
-    if ($opts{weight_by_free}) {
-        my $rand = int(rand($total_free));
-        my $cur = 0;
-        foreach my $devid (@list) {
-            $cur += $devs->{$devid}->{mb_free};
-            return $devid if $cur >= $rand;
-        }
-    }
-
-    # return whole list if wanting array, else just first item
-    return wantarray ? @list : shift(@list);
-}
-
 sub get_device_summary {
     my $now = time;
     return $cache_device_summary if $cache_device_summary_time > $now - 15;
@@ -316,6 +240,7 @@
     $dev{$row->{devid}} = $row while $row = $sth->fetchrow_hashref;
 
     # now override device status with host status if the host status is less than the device status
+    # also attach the cached host record to each device (carries host meta / replication advice)
     Mgd::check_host_cache();
 
     foreach my $devid (keys %dev) {
@@ -332,6 +257,8 @@
         } elsif ($dev{$devid}->{status} eq 'down' && $host_status eq 'dead') {
             $dev{$devid}->{status} = $host_status;
         }
+
+        $dev{$devid}->{host} = $cache_host{$dev{$devid}->{hostid}};
     }
 
     $cache_device_summary_time = $now;
@@ -386,6 +313,13 @@
         $cache_host{$host->{hostid}}->{mask} = Net::Netmask->new2($host->{altmask})
             if $host->{altip} && $host->{altmask};
     }
+
+    $sth = $dbh->prepare("SELECT host.hostid, metakey, metaval FROM host, host_meta WHERE host.hostid = host_meta.hostid");
+    $sth->execute;
+    while (my $host_meta = $sth->fetchrow_hashref) {
+        $cache_host{$host_meta->{hostid}}->{meta}->{$host_meta->{metakey}} = $host_meta->{metaval};
+    }
+
     $cache_host_time = time();
     return \%cache_host;
 }
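(For reference, the host_meta rows end up in the host cache keyed by metakey, and get_device_summary() then hangs each host record off its devices, so a policy can read per-host hints via $dev->{host}{meta}. A standalone sketch of that shape; the 'rack' and 'cage' keys are illustrative, not keys the code defines:)

```perl
use strict;
use warnings;

# rows as returned by the new host_meta join in check_host_cache()
my @rows = (
    { hostid => 1, metakey => 'rack', metaval => 'a1' },
    { hostid => 1, metakey => 'cage', metaval => 'c9' },
    { hostid => 2, metakey => 'rack', metaval => 'b4' },
);

my %cache_host;
foreach my $m (@rows) {
    # same shape the patch builds: hostid -> { meta => { key => val } }
    $cache_host{ $m->{hostid} }{meta}{ $m->{metakey} } = $m->{metaval};
}

# a replication policy can then consult, e.g.:
print $cache_host{1}{meta}{rack}, "\n";   # prints "a1"
```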
Index: server/mogdbsetup
===================================================================
--- server/mogdbsetup	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ server/mogdbsetup	(.../trunk/mogile2.0)	(revision 1095)
@@ -147,8 +147,9 @@
 sub upgrade_database {
     my ($host, $name, $user, $pass) = @_;
 
-    my $SCHEMA_VERSION = 7;
+    my $SCHEMA_VERSION = 8;
     # schema history:
+    #   8: adds host_meta table and 'extrareplicacount' to file_to_replicate
     #   7: adds file_to_delete_later table
     my $curver = schema_version();
 
@@ -301,7 +302,8 @@
    UNIQUE     (hostip),
    altip      VARCHAR(15),
    UNIQUE     (altip),
-   altmask    VARCHAR(18)
+   altmask    VARCHAR(18),
+   replication_advice TEXT
 )");
 
 # disks
@@ -342,6 +344,13 @@
         $run->("ALTER TABLE device MODIFY COLUMN status ENUM('alive', 'dead', 'down', 'readonly')");
     }
 
+    # extra host meta-information
+    $run->("CREATE TABLE IF NOT EXISTS host_meta (
+               hostid     MEDIUMINT UNSIGNED NOT NULL,
+               metakey    VARCHAR(255) NOT NULL,
+               metaval    TEXT,
+               PRIMARY KEY (hostid, metakey)
+)");
 
     $run->("CREATE TABLE IF NOT EXISTS server_settings (
    field   VARCHAR(50) PRIMARY KEY,
@@ -353,6 +362,8 @@
     #   unixtimestamp means at/after that time.  some previous error occurred.
     # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
     # failcount.  how many times we've failed, just for doing backoff of nexttry.
+    # extrareplicacount. number of replicas of this file we should ensure exist,
+    #     above and beyond the minimum required by the file's class
     # flags.  reserved for future use.
     $run->("CREATE TABLE IF NOT EXISTS file_to_replicate (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
@@ -360,9 +371,14 @@
    INDEX (nexttry),
    fromdevid  INT UNSIGNED,
    failcount  TINYINT UNSIGNED NOT NULL DEFAULT 0,
+   extrareplicacount TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags      SMALLINT UNSIGNED NOT NULL DEFAULT 0
    )");
 
+    unless (column_type("file_to_replicate", "extrareplicacount")) {
+        $run->("ALTER TABLE file_to_replicate ADD COLUMN extrareplicacount TINYINT UNSIGNED NOT NULL DEFAULT 0");
+    }
+
     $run->("CREATE TABLE IF NOT EXISTS file_to_delete_later (
    fid  INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),
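(The increp script itself isn't part of this excerpt; conceptually it just enqueues the fid with a nonzero extrareplicacount, along these lines. Illustrative SQL only, not the script's actual statement:)

```sql
-- ask the replicator to maintain two replicas of fid 1234 beyond its
-- class minimum; nexttry=0 makes the row eligible immediately
INSERT INTO file_to_replicate (fid, nexttry, extrareplicacount)
VALUES (1234, 0, 2)
ON DUPLICATE KEY UPDATE extrareplicacount = 2;
```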
Index: api/perl/lib/MogileFS/Admin.pm
===================================================================
--- api/perl/lib/MogileFS/Admin.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ api/perl/lib/MogileFS/Admin.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -31,6 +31,21 @@
     return 1;
 }
 
+# increase the number of replicas of some item
+#
+sub increase_replicas {
+    my MogileFS::Admin $self = shift;
+    my ($fid, $extracopycount) = @_;
+
+    my $rv = $self->{backend}->do_request
+        ("increase_replicas", {
+            fid        => $fid,
+            extracount => $extracopycount,
+        }) or return undef;
+
+    return 1;
+}
+
 sub get_hosts {
     my MogileFS::Admin $self = shift;
     my $hostid = shift;
@@ -213,7 +228,6 @@
     return 1;
 }
 
-
 # create a host
 sub create_host {
     my MogileFS::Admin $self = shift;
Index: api/perl/lib/MogileFS/Client.pm
===================================================================
--- api/perl/lib/MogileFS/Client.pm	(.../vendor/danga/mogilefs/458)	(revision 1095)
+++ api/perl/lib/MogileFS/Client.pm	(.../trunk/mogile2.0)	(revision 1095)
@@ -188,6 +188,28 @@
     length($content);
 }
 
+# increase the number of replicas of some item, over and
+# above the minimum replica count required by the class of
+# the item.
+#
+sub increase_replicas {
+    my MogileFS::Client $self = shift;
+    my ($key, $extrareplicacount) = @_;
+
+    my $rv = $self->{backend}->do_request
+        ("increase_replicas", {
+            domain     => $self->{domain},
+            key        => $key,
+            extracount => $extrareplicacount,
+        });
+
+    # if it's unknown_key, not an error
+    return undef unless defined $rv ||
+                        $self->{backend}->{lasterr} eq 'unknown_key';
+
+    return 1;
+}
+
 # old style calling:
 #   get_paths(key, noverify)
 # new style calling:

