[mogilefs] bradfitz,
r1114: more work on custom replication policies... (fwd)
Brad Fitzpatrick
brad at danga.com
Thu Aug 16 01:51:22 UTC 2007
Mogilers,
This might be of interest to some of you....
It's now quite easy to drop in your own custom replication policies.
- Brad
---------- Forwarded message ----------
Date: Thu, 16 Aug 2007 01:37:43 +0000 (UTC)
From: commits at code.sixapart.com
To: cvs-commits at livejournal.com
Subject: [mogilefs] bradfitz,
r1114: more work on custom replication policies...
more work on custom replication policies.
with exception of the lack of mogadm command, should all work now.
added more docs & tests, too.
U trunk/server/CHANGES
U trunk/server/MANIFEST
U trunk/server/lib/MogileFS/Class.pm
U trunk/server/lib/MogileFS/FID.pm
U trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
U trunk/server/lib/MogileFS/Store.pm
U trunk/server/lib/MogileFS/Worker/Replicate.pm
U trunk/server/t/multiple-hosts-replpol.t
U trunk/server/t/replpolicy-parsing.t
Modified: trunk/server/CHANGES
===================================================================
--- trunk/server/CHANGES 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/CHANGES 2007-08-16 01:37:41 UTC (rev 1114)
@@ -16,10 +16,14 @@
* In the test suite, also search for mogadm in /usr/local/bin/ and
/usr/local/sbin (spotted by Dean Wilson)
- * start of per-class pluggable replication policies. it was always
+ * SCHEMA VERSION 10: 'replpolicy' column on 'class' table.
+ it's safe to --ignore-schema-version and run this mogilefsd
+ against an older schema. new column enables per-class
+ pluggable replication policies. it was always
abstract, but not easy to plugin your own alternatives.
+ see doc/pluggable-replication-policies.txt
- * add start of a ReplicationPolicy::Union implementation
+ * add start of a MogileFS::ReplicationPolicy::Union implementation
* fix crash in queryworker's create_open command, when a device
has its directory made on a mogile storage node (the host of
Modified: trunk/server/MANIFEST
===================================================================
--- trunk/server/MANIFEST 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/MANIFEST 2007-08-16 01:37:41 UTC (rev 1114)
@@ -1,6 +1,7 @@
CHANGES
doc/fsck-notes.txt
doc/memcache-support.txt
+doc/pluggable-replication-policies.txt
doc/testing.txt
lib/mogdeps/Danga/Socket.pm
lib/mogdeps/Gearman/Client.pm
Modified: trunk/server/lib/MogileFS/Class.pm
===================================================================
--- trunk/server/lib/MogileFS/Class.pm 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Class.pm 2007-08-16 01:37:41 UTC (rev 1114)
@@ -167,16 +167,14 @@
sub classid { $_[0]{classid} }
sub mindevcount { $_[0]{mindevcount} }
-# TODO: die, remove this
-sub policy_class { "MogileFS::ReplicationPolicy::MultipleHosts" }
-
-# TODO: currently unused, just by testing code. remove callers of policy_class above.
sub repl_policy_string {
my $self = shift;
- return "MultipleHosts(" . ($self->{mindevcount} || MogileFS->config('default_mindevcount')) . ")";
+ # if they've actually configured one, it gets used:
+ return $self->{replpolicy} if $self->{replpolicy};
+ # else, the historical default:
+ return "MultipleHosts()";
}
-# TODO: currently unused, just by testing code. remove callers of policy_class above.
sub repl_policy_obj {
my $self = shift;
return $self->{_repl_policy_obj} if $self->{_repl_policy_obj};
Modified: trunk/server/lib/MogileFS/FID.pm
===================================================================
--- trunk/server/lib/MogileFS/FID.pm 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/FID.pm 2007-08-16 01:37:41 UTC (rev 1114)
@@ -186,15 +186,11 @@
my $self = shift;
my $cls = $self->class;
- my $policy_class = $cls->policy_class
- or die "No policy class";
+ my $polobj = $cls->repl_policy_obj;
my $alldev = MogileFS::Device->map
or die "No global device map";
- eval "use $policy_class; 1;";
- die "Failed to load policy class: $policy_class: $@" if $@;
-
my %rep_args = (
fid => $self->id,
on_devs => [$self->devs],
@@ -202,7 +198,7 @@
failed => {},
min => $cls->mindevcount,
);
- my $rr = rr_upgrade($policy_class->replicate_to(%rep_args));
+ my $rr = rr_upgrade($polobj->replicate_to(%rep_args));
return $rr->is_happy && ! $rr->too_happy;
}
Modified: trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
===================================================================
--- trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm 2007-08-16 01:37:41 UTC (rev 1114)
@@ -4,13 +4,20 @@
use MogileFS::Util qw(weighted_list);
use MogileFS::ReplicationRequest qw(ALL_GOOD TOO_GOOD TEMP_NO_ANSWER);
+sub new {
+ my ($class, $mindevcount) = @_;
+ return bless {
+ mindevcount => $mindevcount,
+ }, $class;
+}
+
sub new_from_policy_args {
my ($class, $argref) = @_;
- $$argref =~ s/^\s* \( \s* (\d+) \s* \) \s*//x
+ # Note: "MultipleHosts()" is okay, in which case the 'mindevcount'
+ # on the class is used. (see below)
+ $$argref =~ s/^\s* \( \s* (\d*) \s* \) \s*//x
or die "$class failed to parse args: $$argref";
- return bless {
- mindevcount => $1,
- }, $class;
+ return $class->new($1)
}
sub mindevcount { $_[0]{mindevcount} }
@@ -23,15 +30,12 @@
my $all_devs = delete $args{all_devs}; # hashref of { devid => MogileFS::Device }
my $failed = delete $args{failed}; # hashref of { devid => 1 } of failed attempts this round
- # FIXME: this code works with either old way or new way. once old caller code is cleaned up,
- # only $self-as-object needs to be supported...
- my $min; # configured min devcount for this class
- # new way
- if (ref $self) {
- $min = $self->{mindevcount};
- } else {
- $min = delete $args{min};
- }
+ # this is the per-class mindevcount (the old way), which is passed in automatically
+ # from the replication worker. but if we have our own configured mindevcount
+ # in class.replpolicy, like "MultipleHosts(3)", then we use the explciit one. otherwise,
+ # if blank, or zero, like "MultipleHosts()", then we use the builtin on
+ my $min = delete $args{min};
+ $min = $self->{mindevcount} || $min;
warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
die "Missing parameters" unless $on_devs && $all_devs && $failed && $fid;
Modified: trunk/server/lib/MogileFS/Store.pm
===================================================================
--- trunk/server/lib/MogileFS/Store.pm 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Store.pm 2007-08-16 01:37:41 UTC (rev 1114)
@@ -11,7 +11,8 @@
#
# 8: adds fsck_log table
# 9: adds 'drain' state to enum in device table
-use constant SCHEMA_VERSION => 9;
+# 10: adds 'replpolicy' column to 'class' table
+use constant SCHEMA_VERSION => 10;
sub new {
my ($class) = @_;
@@ -384,10 +385,17 @@
$sto->upgrade_add_device_weight;
$sto->upgrade_add_device_readonly;
$sto->upgrade_add_device_drain;
+ $sto->upgrade_add_class_replpolicy;
return 1;
}
+sub cached_schema_version {
+ my $self = shift;
+ return $self->{_cached_schema_version} ||=
+ $self->schema_version;
+}
+
sub schema_version {
my $self = shift;
my $dbh = $self->dbh;
@@ -632,6 +640,13 @@
sub upgrade_add_device_readonly { 1 }
sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
+sub upgrade_add_class_replpolicy {
+ my ($self) = @_;
+ unless ($self->column_type("class", "replpolicy")) {
+ $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
+ }
+}
+
# return true if deleted, 0 if didn't exist, exception if error
sub delete_host {
my ($self, $hostid) = @_;
@@ -1009,7 +1024,13 @@
sub get_all_classes {
my ($self) = @_;
my (@ret, $row);
- my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
+
+ my $repl_col = "";
+ if ($self->cached_schema_version >= 10) {
+ $repl_col = ", replpolicy";
+ }
+
+ my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
$sth->execute;
push @ret, $row while $row = $sth->fetchrow_hashref;
return @ret;
Modified: trunk/server/lib/MogileFS/Worker/Replicate.pm
===================================================================
--- trunk/server/lib/MogileFS/Worker/Replicate.pm 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/lib/MogileFS/Worker/Replicate.pm 2007-08-16 01:37:41 UTC (rev 1114)
@@ -256,7 +256,7 @@
my $did_something = 0;
MogileFS::Class->foreach(sub {
my $mclass = shift;
- my ($dmid, $classid, $min, $policy_class) = map { $mclass->$_ } qw(domainid classid mindevcount policy_class);
+ my ($dmid, $classid, $min) = map { $mclass->$_ } qw(domainid classid mindevcount);
debug("Checking replication for dmid=$dmid, classid=$classid, min=$min") if $Mgd::DEBUG >= 2;
@@ -547,11 +547,7 @@
return $retunlock->("nofid") unless $fid->exists;
my $cls = $fid->class;
- my $policy_class = $cls->policy_class;
- eval "use $policy_class; 1;";
- if ($@) {
- return error("Failed to load policy class: $policy_class: $@");
- }
+ my $polobj = $cls->repl_policy_obj;
# learn what this devices file is already on
my @on_devs; # all devices fid is on, reachable or not.
@@ -585,13 +581,13 @@
my $rr; # MogileFS::ReplicationRequest
while (1) {
- $rr = rr_upgrade($policy_class->replicate_to(
- fid => $fidid,
- on_devs => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
- all_devs => $devs,
- failed => \%dest_failed,
- min => $cls->mindevcount,
- ));
+ $rr = rr_upgrade($polobj->replicate_to(
+ fid => $fidid,
+ on_devs => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
+ all_devs => $devs,
+ failed => \%dest_failed,
+ min => $cls->mindevcount,
+ ));
last if $rr->is_happy;
Modified: trunk/server/t/multiple-hosts-replpol.t
===================================================================
--- trunk/server/t/multiple-hosts-replpol.t 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/t/multiple-hosts-replpol.t 2007-08-16 01:37:41 UTC (rev 1114)
@@ -108,7 +108,8 @@
}
$parse_error->() if $state =~ /\S/;
- my $pol = "MogileFS::ReplicationPolicy::MultipleHosts";
+ my $polclass = "MogileFS::ReplicationPolicy::MultipleHosts";
+ my $pol = $polclass->new;
my $rr = $pol->replicate_to(
fid => 1,
on_devs => $on_devs,
Modified: trunk/server/t/replpolicy-parsing.t
===================================================================
--- trunk/server/t/replpolicy-parsing.t 2007-08-13 17:07:36 UTC (rev 1113)
+++ trunk/server/t/replpolicy-parsing.t 2007-08-16 01:37:41 UTC (rev 1114)
@@ -10,7 +10,7 @@
use MogileFS::Util qw(error_code);
require "$Bin/lib/mogtestlib.pl";
-plan tests => 30;
+plan tests => 31;
my $obj;
@@ -19,6 +19,10 @@
or die "can't proceed";
is($obj->mindevcount, 5, "got correct devcount");
+$obj = MogileFS::ReplicationPolicy->new_from_policy_string("MultipleHosts()");
+isa_ok($obj, "MogileFS::ReplicationPolicy::MultipleHosts", "got a multiple hosts policy")
+ or die "can't proceed";
+
foreach my $str ("Union(MultipleHosts(5), MultipleHosts(2))",
"Union(MultipleHosts(5), MultipleHosts(2), )",
"Union( MultipleHosts(5), MultipleHosts(2) )",
More information about the mogilefs
mailing list