[mogilefs] bradfitz, r1114: more work on custom replication policies... (fwd)

Brad Fitzpatrick brad at danga.com
Thu Aug 16 01:51:22 UTC 2007


Mogilers,

This might be of interest to some of you....

It's now quite easy to drop in your own custom replication policies.

- Brad


---------- Forwarded message ----------
Date: Thu, 16 Aug 2007 01:37:43 +0000 (UTC)
From: commits at code.sixapart.com
To: cvs-commits at livejournal.com
Subject: [mogilefs] bradfitz,
     r1114: more work on custom replication policies...

more work on custom replication policies.

with exception of the lack of mogadm command, should all work now.
added more docs & tests, too.



U   trunk/server/CHANGES
U   trunk/server/MANIFEST
U   trunk/server/lib/MogileFS/Class.pm
U   trunk/server/lib/MogileFS/FID.pm
U   trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
U   trunk/server/lib/MogileFS/Store.pm
U   trunk/server/lib/MogileFS/Worker/Replicate.pm
U   trunk/server/t/multiple-hosts-replpol.t
U   trunk/server/t/replpolicy-parsing.t


Modified: trunk/server/CHANGES
===================================================================
--- trunk/server/CHANGES	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/CHANGES	2007-08-16 01:37:41 UTC (rev 1114)
@@ -16,10 +16,14 @@
         * In the test suite, also search for mogadm in /usr/local/bin/ and
           /usr/local/sbin (spotted by Dean Wilson)

-        * start of per-class pluggable replication policies.  it was always
+        * SCHEMA VERSION 10:  'replpolicy' column on 'class' table.
+          it's safe to --ignore-schema-version and run this mogilefsd
+          against an older schema.  new column enables per-class
+          pluggable replication policies.  it was always
           abstract, but not easy to plugin your own alternatives.
+          see doc/pluggable-replication-policies.txt

-        * add start of a ReplicationPolicy::Union implementation
+        * add start of a MogileFS::ReplicationPolicy::Union implementation

         * fix crash in queryworker's create_open command, when a device
           has its directory made on a mogile storage node (the host of

Modified: trunk/server/MANIFEST
===================================================================
--- trunk/server/MANIFEST	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/MANIFEST	2007-08-16 01:37:41 UTC (rev 1114)
@@ -1,6 +1,7 @@
 CHANGES
 doc/fsck-notes.txt
 doc/memcache-support.txt
+doc/pluggable-replication-policies.txt
 doc/testing.txt
 lib/mogdeps/Danga/Socket.pm
 lib/mogdeps/Gearman/Client.pm

Modified: trunk/server/lib/MogileFS/Class.pm
===================================================================
--- trunk/server/lib/MogileFS/Class.pm	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Class.pm	2007-08-16 01:37:41 UTC (rev 1114)
@@ -167,16 +167,14 @@
 sub classid      { $_[0]{classid} }
 sub mindevcount  { $_[0]{mindevcount} }

-# TODO: die, remove this
-sub policy_class { "MogileFS::ReplicationPolicy::MultipleHosts" }
-
-# TODO: currently unused, just by testing code. remove callers of policy_class above.
 sub repl_policy_string {
     my $self = shift;
-    return "MultipleHosts(" . ($self->{mindevcount} || MogileFS->config('default_mindevcount')) . ")";
+    # if they've actually configured one, it gets used:
+    return $self->{replpolicy} if $self->{replpolicy};
+    # else, the historical default:
+    return "MultipleHosts()";
 }

-# TODO: currently unused, just by testing code.  remove callers of policy_class above.
 sub repl_policy_obj {
     my $self = shift;
     return $self->{_repl_policy_obj} if $self->{_repl_policy_obj};

Modified: trunk/server/lib/MogileFS/FID.pm
===================================================================
--- trunk/server/lib/MogileFS/FID.pm	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/FID.pm	2007-08-16 01:37:41 UTC (rev 1114)
@@ -186,15 +186,11 @@
     my $self = shift;
     my $cls  = $self->class;

-    my $policy_class = $cls->policy_class
-        or die "No policy class";
+    my $polobj = $cls->repl_policy_obj;

     my $alldev = MogileFS::Device->map
         or die "No global device map";

-    eval "use $policy_class; 1;";
-    die "Failed to load policy class: $policy_class: $@" if $@;
-
     my %rep_args = (
                     fid       => $self->id,
                     on_devs   => [$self->devs],
@@ -202,7 +198,7 @@
                     failed    => {},
                     min       => $cls->mindevcount,
                     );
-    my $rr = rr_upgrade($policy_class->replicate_to(%rep_args));
+    my $rr = rr_upgrade($polobj->replicate_to(%rep_args));
     return $rr->is_happy && ! $rr->too_happy;
 }


Modified: trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
===================================================================
--- trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm	2007-08-16 01:37:41 UTC (rev 1114)
@@ -4,13 +4,20 @@
 use MogileFS::Util qw(weighted_list);
 use MogileFS::ReplicationRequest qw(ALL_GOOD TOO_GOOD TEMP_NO_ANSWER);

+sub new {
+    my ($class, $mindevcount) = @_;
+    return bless {
+        mindevcount => $mindevcount,
+    }, $class;
+}
+
 sub new_from_policy_args {
     my ($class, $argref) = @_;
-    $$argref =~ s/^\s* \( \s* (\d+) \s* \) \s*//x
+    # Note: "MultipleHosts()" is okay, in which case the 'mindevcount'
+    # on the class is used.  (see below)
+    $$argref =~ s/^\s* \( \s* (\d*) \s* \) \s*//x
         or die "$class failed to parse args: $$argref";
-    return bless {
-        mindevcount => $1,
-    }, $class;
+    return $class->new($1)
 }

 sub mindevcount { $_[0]{mindevcount} }
@@ -23,15 +30,12 @@
     my $all_devs = delete $args{all_devs}; # hashref of { devid => MogileFS::Device }
     my $failed   = delete $args{failed};   # hashref of { devid => 1 } of failed attempts this round

-    # FIXME: this code works with either old way or new way.  once old caller code is cleaned up,
-    # only $self-as-object needs to be supported...
-    my $min;     # configured min devcount for this class
-    # new way
-    if (ref $self) {
-        $min = $self->{mindevcount};
-    } else {
-        $min = delete $args{min};
-    }
+    # this is the per-class mindevcount (the old way), which is passed in automatically
+    # from the replication worker.  but if we have our own configured mindevcount
+    # in class.replpolicy, like "MultipleHosts(3)", then we use the explciit one. otherwise,
+    # if blank, or zero, like "MultipleHosts()", then we use the builtin on
+    my $min      = delete $args{min};
+    $min         = $self->{mindevcount} || $min;

     warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
     die "Missing parameters" unless $on_devs && $all_devs && $failed && $fid;

Modified: trunk/server/lib/MogileFS/Store.pm
===================================================================
--- trunk/server/lib/MogileFS/Store.pm	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Store.pm	2007-08-16 01:37:41 UTC (rev 1114)
@@ -11,7 +11,8 @@
 #
 # 8: adds fsck_log table
 # 9: adds 'drain' state to enum in device table
-use constant SCHEMA_VERSION => 9;
+# 10: adds 'replpolicy' column to 'class' table
+use constant SCHEMA_VERSION => 10;

 sub new {
     my ($class) = @_;
@@ -384,10 +385,17 @@
     $sto->upgrade_add_device_weight;
     $sto->upgrade_add_device_readonly;
     $sto->upgrade_add_device_drain;
+    $sto->upgrade_add_class_replpolicy;

     return 1;
 }

+sub cached_schema_version {
+    my $self = shift;
+    return $self->{_cached_schema_version} ||=
+        $self->schema_version;
+}
+
 sub schema_version {
     my $self = shift;
     my $dbh = $self->dbh;
@@ -632,6 +640,13 @@
 sub upgrade_add_device_readonly { 1 }
 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }

+sub upgrade_add_class_replpolicy {
+    my ($self) = @_;
+    unless ($self->column_type("class", "replpolicy")) {
+        $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
+    }
+}
+
 # return true if deleted, 0 if didn't exist, exception if error
 sub delete_host {
     my ($self, $hostid) = @_;
@@ -1009,7 +1024,13 @@
 sub get_all_classes {
     my ($self) = @_;
     my (@ret, $row);
-    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
+
+    my $repl_col = "";
+    if ($self->cached_schema_version >= 10) {
+        $repl_col = ", replpolicy";
+    }
+
+    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
     $sth->execute;
     push @ret, $row while $row = $sth->fetchrow_hashref;
     return @ret;

Modified: trunk/server/lib/MogileFS/Worker/Replicate.pm
===================================================================
--- trunk/server/lib/MogileFS/Worker/Replicate.pm	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Worker/Replicate.pm	2007-08-16 01:37:41 UTC (rev 1114)
@@ -256,7 +256,7 @@
     my $did_something = 0;
     MogileFS::Class->foreach(sub {
         my $mclass = shift;
-        my ($dmid, $classid, $min, $policy_class) = map { $mclass->$_ } qw(domainid classid mindevcount policy_class);
+        my ($dmid, $classid, $min) = map { $mclass->$_ } qw(domainid classid mindevcount);

         debug("Checking replication for dmid=$dmid, classid=$classid, min=$min") if $Mgd::DEBUG >= 2;

@@ -547,11 +547,7 @@
     return $retunlock->("nofid") unless $fid->exists;

     my $cls = $fid->class;
-    my $policy_class = $cls->policy_class;
-    eval "use $policy_class; 1;";
-    if ($@) {
-        return error("Failed to load policy class: $policy_class: $@");
-    }
+    my $polobj = $cls->repl_policy_obj;

     # learn what this devices file is already on
     my @on_devs;         # all devices fid is on, reachable or not.
@@ -585,13 +581,13 @@

     my $rr;  # MogileFS::ReplicationRequest
     while (1) {
-        $rr = rr_upgrade($policy_class->replicate_to(
-                                                     fid       => $fidid,
-                                                     on_devs   => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
-                                                     all_devs  => $devs,
-                                                     failed    => \%dest_failed,
-                                                     min       => $cls->mindevcount,
-                                                     ));
+        $rr = rr_upgrade($polobj->replicate_to(
+                                               fid       => $fidid,
+                                               on_devs   => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
+                                               all_devs  => $devs,
+                                               failed    => \%dest_failed,
+                                               min       => $cls->mindevcount,
+                                               ));

         last if $rr->is_happy;


Modified: trunk/server/t/multiple-hosts-replpol.t
===================================================================
--- trunk/server/t/multiple-hosts-replpol.t	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/t/multiple-hosts-replpol.t	2007-08-16 01:37:41 UTC (rev 1114)
@@ -108,7 +108,8 @@
     }
     $parse_error->() if $state =~ /\S/;

-    my $pol = "MogileFS::ReplicationPolicy::MultipleHosts";
+    my $polclass = "MogileFS::ReplicationPolicy::MultipleHosts";
+    my $pol = $polclass->new;
     my $rr = $pol->replicate_to(
                                 fid      => 1,
                                 on_devs  => $on_devs,

Modified: trunk/server/t/replpolicy-parsing.t
===================================================================
--- trunk/server/t/replpolicy-parsing.t	2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/t/replpolicy-parsing.t	2007-08-16 01:37:41 UTC (rev 1114)
@@ -10,7 +10,7 @@
 use MogileFS::Util qw(error_code);
 require "$Bin/lib/mogtestlib.pl";

-plan tests => 30;
+plan tests => 31;

 my $obj;

@@ -19,6 +19,10 @@
     or die "can't proceed";
 is($obj->mindevcount, 5, "got correct devcount");

+$obj = MogileFS::ReplicationPolicy->new_from_policy_string("MultipleHosts()");
+isa_ok($obj, "MogileFS::ReplicationPolicy::MultipleHosts", "got a multiple hosts policy")
+    or die "can't proceed";
+
 foreach my $str ("Union(MultipleHosts(5), MultipleHosts(2))",
                  "Union(MultipleHosts(5), MultipleHosts(2), )",
                  "Union( MultipleHosts(5), MultipleHosts(2) )",




More information about the mogilefs mailing list