[PATCH mogilefs 7/7] Port of Radu Greab <rg@yx.ro>'s MogileFS::Store::Postgres backing storage to

Robin H. Johnson robbat2 at gentoo.org
Tue Jun 5 13:38:15 UTC 2007


From: Robin H. Johnson <robbat2 at gentoo.org>

PG-specific additions:
- create_db_if_not_exists
- unix_timestamp
- column_type
- column_constraint
- update_*.

PG-specific removals:
- Custom register_tempfile (depends on cleanups to the main register_tempfile,
  see earlier patch in the series).
- UNIX_TIMESTAMP() PL/SQL function.

Test changes:
- 00-startup.t timelimit increased from 15 to 30 seconds.
- 'use MogileFS::Store::MySQL' removed from t/domains-classes.t

Notes per Radu's original version:
- should_begin_replicating_fidid() and note_done_replicating() are still not
  yet implemented.
- REPLACE INTO is implemented by trying to INSERT first. If the insert fails
  due to unique constraints violation, then an UPDATE is issued.

Questions per Radu's original version:
- There are no FOREIGN keys yet. Should we implement them?
- Should we provide an option to mogdbsetup for different tablespaces for index
  vs. data etc?

Signed-off-by: Radu Greab <rg at yx.ro>
Signed-off-by: Robin H. Johnson <robbat2 at gentoo.org>
---
 server/lib/MogileFS/Store/Postgres.pm |  550 +++++++++++++++++++++++++++++++++
 server/t/00-startup.t                 |    2 +-
 server/t/domains-classes.t            |    1 -
 3 files changed, 551 insertions(+), 2 deletions(-)
 create mode 100644 server/lib/MogileFS/Store/Postgres.pm

diff --git a/server/lib/MogileFS/Store/Postgres.pm b/server/lib/MogileFS/Store/Postgres.pm
new file mode 100644
index 0000000..151bb27
--- /dev/null
+++ b/server/lib/MogileFS/Store/Postgres.pm
@@ -0,0 +1,550 @@
+package MogileFS::Store::Postgres;
+use strict;
+use warnings;
+use DBI;
+use DBD::Pg;
+use MogileFS::Util qw(throw);
+use base 'MogileFS::Store';
+
+# --------------------------------------------------------------------------
+# Package methods we override
+# --------------------------------------------------------------------------
+
+# Build the DBI data source name for a database on a given host.
+#   $dbname - database name to connect to
+#   $host   - host name of the PostgreSQL server
+# Called as a class method; returns the DSN string.
+sub dsn_of_dbhost {
+    my $class  = shift;
+    my $dbname = shift;
+    my $host   = shift;
+    return join(';', "DBI:Pg:dbname=$dbname", "host=$host");
+}
+
+# DSN used for administrative ("root") connections.
+# The $dbname and $host arguments are accepted but not used: the DSN always
+# names the "postgres" database and carries no host component.
+sub dsn_of_root {
+    my ($class, $dbname, $host) = @_;
+    my $admin_db = "postgres";
+    return "DBI:Pg:dbname=$admin_db";
+}
+
+# --------------------------------------------------------------------------
+# Store-related things we override
+# --------------------------------------------------------------------------
+
+# Report whether the most recent error on our database handle was a
+# unique-constraint violation.
+# Returns 1 for a duplicate-key error, 0 otherwise (including "no error").
+sub was_duplicate_error {
+    my $self = shift;
+    my $dbh = $self->dbh;
+    return 0 unless $dbh->err;
+
+    # SQLSTATE is a five-character code that may contain letters, so it has
+    # to be compared as a string; 23505 is PostgreSQL's unique_violation.
+    my $state  = $dbh->state;
+    my $errstr = $dbh->errstr;
+    return 1 if defined $state  && $state eq '23505';
+    return 1 if defined $errstr && $errstr =~ /duplicate/i;
+    return 0;
+}
+
+# given a root DBI connection, create the named database.  succeed
+# if it's made, or already exists.  die otherwise.
+#   $rdbh   - administrative database handle
+#   $dbname - name of the database to create (interpolated into the SQL)
+# Returns 1 on success.
+sub create_db_if_not_exists {
+    my ($pkg, $rdbh, $dbname) = @_;
+
+    # The eval keeps a handle with RaiseError enabled from dying before we
+    # get the chance to recognise the harmless "already exists" failure.
+    my $created = eval { $rdbh->do("CREATE DATABASE $dbname") };
+    return 1 if $created;
+
+    my $errstr = $rdbh->errstr;
+    $errstr = $@ unless defined $errstr && length $errstr;
+    $errstr = '' unless defined $errstr;
+    return 1 if $errstr =~ /already exists/;
+
+    die "Failed to create database '$dbname': " . $errstr . "\n";
+}
+
+# Create the login role $user (with password $pass) and make it the owner
+# of database $dbname, using the administrative handle $rdbh.
+# An already-existing role is tolerated; any other failure dies.
+sub grant_privileges {
+    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
+    eval {
+        $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
+                  undef, $pass);
+    };
+
+    # SQLSTATE codes can contain letters, so compare as strings.
+    # 42710 is duplicate_object, i.e. the role is already there.
+    my $state = $rdbh->state;
+    $state = '' unless defined $state;
+    die "Failed to create user '$user': ". $rdbh->errstr . "\n"
+        if $rdbh->err && $state ne '42710';
+
+    $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
+        or die "Failed to grant privileges " . $rdbh->errstr . "\n";
+}
+
+# Check for a table by name via DBI's table_info.
+# Returns 1 if a matching row is found, 0 if not; any exception raised
+# during the lookup is swallowed by the eval, giving an undefined result.
+sub table_exists {
+    my ($self, $table) = @_;
+    return eval {
+        my $found = $self->dbh
+                         ->table_info(undef, undef, $table, "table")
+                         ->fetchrow_hashref;
+        $found ? 1 : 0;
+    };
+}
+
+# Run the parent class's database setup.
+# Returns 1 when the parent reports success, nothing otherwise.
+sub setup_database {
+    my $self = shift;
+
+    if ($self->SUPER::setup_database) {
+        return 1;
+    }
+    return;
+}
+
+# PostgreSQL DDL for the "file" table: fid primary key, unique
+# (dmid, dkey), and a non-negative length check.
+sub TABLE_file {
+    my $ddl = "CREATE TABLE file (
+   fid          INT NOT NULL,
+   PRIMARY KEY  (fid),
+
+   dmid          SMALLINT NOT NULL,
+   dkey           VARCHAR(255),     -- domain-defined
+   UNIQUE        (dmid, dkey),
+
+   length        INT,               -- 2GB limit
+   CHECK         (length >= 0),
+
+   classid       SMALLINT NOT NULL,
+   devcount      SMALLINT NOT NULL
+)";
+    return $ddl;
+}
+
+# Index DDL for the "file" table.
+sub INDEXES_file {
+    return "CREATE INDEX file_devcount ON file (dmid,classid,devcount)";
+}
+
+# Index DDL for the "unreachable_fids" table.
+sub INDEXES_unreachable_fids {
+    return "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)";
+}
+
+# Index DDL for the "file_on" table.
+sub INDEXES_file_on {
+    return "CREATE INDEX file_on_devid ON file_on (devid)";
+}
+
+# PostgreSQL DDL for the "host" table, with CHECK constraints on the id,
+# status and port columns and UNIQUE constraints on hostname/hostip/altip.
+sub TABLE_host {
+    my $ddl = "CREATE TABLE host (
+   hostid     SMALLINT NOT NULL,
+   PRIMARY KEY (hostid),
+   CHECK       (hostid >= 0),
+
+   status     VARCHAR(8),
+   CHECK      (status IN ('alive','dead','down')),
+   http_port  INT DEFAULT 7500,
+   CHECK      (http_port >= 0),
+   http_get_port INT,
+   CHECK      (http_get_port >= 0),
+
+   hostname   VARCHAR(40),
+   hostip     VARCHAR(15),
+   altip      VARCHAR(15),
+   altmask    VARCHAR(18),
+   UNIQUE     (hostname),
+   UNIQUE     (hostip),
+   UNIQUE     (altip)
+)";
+    return $ddl;
+}
+
+# PostgreSQL DDL for the "device" table.
+# Note: there is no comma after "mb_asof INT", so the CHECK that follows
+# it is written as part of that column's definition rather than as a
+# separate table-level constraint like the others.
+sub TABLE_device {
+    my $ddl = "CREATE TABLE device (
+   devid   SMALLINT NOT NULL,
+   PRIMARY KEY (devid),
+   CHECK   (devid >= 0),
+
+   hostid     SMALLINT NOT NULL,
+
+   status     VARCHAR(8),
+   CHECK      (status IN ('alive','dead','down','readonly','drain')),
+   weight     INT DEFAULT 100,
+
+   mb_total   INT,
+   CHECK      (mb_total >= 0),
+   mb_used    INT,
+   CHECK      (mb_used >= 0),
+   mb_asof    INT
+   CHECK      (mb_asof >= 0)
+)";
+    return $ddl;
+}
+
+# Index DDL for the "device" table.
+sub INDEXES_device {
+    return "CREATE INDEX device_status ON device (status)";
+}
+
+# Index DDL for the "file_to_replicate" table.
+sub INDEXES_file_to_replicate {
+    return "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)";
+}
+
+# Index DDL for the "file_to_delete_later" table.
+sub INDEXES_file_to_delete_later {
+    return "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)";
+}
+
+# Index DDL for the "fsck_log" table.
+sub INDEXES_fsck_log {
+    return "CREATE INDEX fsck_log_utime ON fsck_log (utime)";
+}
+
+# Capability flags reported by this store (0 = no, 1 = yes).
+sub can_replace      { return 0; }
+sub can_insertignore { return 0; }
+sub can_insert_multi { return 1; }
+
+# SQL expression yielding the current time as integer epoch seconds.
+sub unix_timestamp {
+    return "EXTRACT(epoch FROM NOW())::int4";
+}
+
+# Rewrite a CREATE TABLE statement into a form PostgreSQL accepts:
+# drops UNSIGNED, maps TINYINT/MEDIUMINT to SMALLINT, turns
+# "INT NOT NULL AUTO_INCREMENT" into SERIAL and "# " comments into "-- ".
+# When this class provides an INDEXES_<table> method, inline
+# ", INDEX (...)" clauses are stripped from the statement.
+# Dies if no table name can be found in $sql.  Returns the rewritten SQL.
+sub filter_create_sql {
+    my ($self, $sql) = @_;
+
+    for ($sql) {
+        s/\bUNSIGNED\b//g;
+        s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
+        s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
+        s/# /-- /g;
+    }
+
+    my $table;
+    if ($sql =~ /create\s+table\s+(\S+)/i) {
+        $table = $1;
+    }
+    die "didn't find table" unless $table;
+
+    $sql =~ s!,\s+INDEX\s*\(.+?\)!!mg
+        if $self->can("INDEXES_$table");
+
+    return $sql;
+}
+
+# --------------------------------------------------------------------------
+# Functions specific to Store::Postgres subclass.  Not in parent.
+# --------------------------------------------------------------------------
+
+# Emulate REPLACE INTO: try an INSERT and, if it fails on a unique
+# constraint, run an UPDATE instead.  Both run inside one transaction.
+# Named arguments:
+#   insert / insert_vals - INSERT statement and arrayref of bind values
+#   update / update_vals - UPDATE statement and arrayref of bind values
+# Returns 1 on success; on any other database error the transaction is
+# rolled back and the error from condthrow is propagated.
+sub insert_or_update {
+    my $self = shift;
+    my %arg  = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
+    my $dbh = $self->dbh;
+    my $savepoint = 'mogile_insert_or_update';
+
+    $dbh->begin_work;
+
+    # PostgreSQL marks the whole transaction as aborted once a statement
+    # fails, so the fallback UPDATE can only run after rewinding to a
+    # savepoint taken before the INSERT.
+    $dbh->pg_savepoint($savepoint);
+
+    eval {
+        $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
+    };
+    if ($@ || $dbh->err) {
+        # Classify the error before rewinding: rolling back to the
+        # savepoint resets the handle's error state.
+        if ($self->was_duplicate_error) {
+            $dbh->pg_rollback_to($savepoint);
+            eval {
+                $dbh->do($arg{update}, undef, @{ $arg{update_vals} });
+            };
+        }
+        if ($dbh->err) {
+            # Let condthrow build its usual error, but do not leave the
+            # transaction open behind us when it propagates.
+            eval { $self->condthrow };
+            my $failure = $@ || "Database error in insert_or_update\n";
+            eval { $dbh->rollback };
+            die $failure;
+        }
+    }
+
+    $dbh->commit;
+    return 1;
+}
+
+# Look up the data type of $col in $table through information_schema.
+# Returns the data_type string of the first matching row, or undef when
+# the column is not found.
+sub column_type {
+    my ($self, $table, $col) = @_;
+    my $query = "SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?";
+    my $sth = $self->dbh->prepare($query);
+    $sth->execute($table,$col);
+    while (my $row = $sth->fetchrow_hashref) {
+        next unless $row->{column_name} eq $col;
+        $sth->finish;
+        return $row->{data_type};
+    }
+    return undef;
+}
+
+# Look up a CHECK constraint clause that involves $col of $table, using
+# information_schema.  Returns the check_clause of the first matching row,
+# or undef when none is found.
+sub column_constraint {
+    my ($self, $table, $col) = @_;
+    my $query = "SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?";
+    my $sth = $self->dbh->prepare($query);
+    $sth->execute($table,$col);
+    while (my $row = $sth->fetchrow_hashref) {
+        next unless $row->{column_name} eq $col;
+        $sth->finish;
+        return $row->{check_clause};
+    }
+    return undef;
+}
+
+# --------------------------------------------------------------------------
+# Test suite things we override
+# --------------------------------------------------------------------------
+
+# Test helper: drop and recreate the scratch database "tmp_mogiletest"
+# by running mogdbsetup, then return a store object connected to it as
+# user "mogile" with an empty password.  Dies if mogdbsetup exits non-zero.
+sub new_temp {
+    my $dbname = "tmp_mogiletest";
+    _drop_db($dbname);
+
+    my @setup_cmd = (
+        "$FindBin::Bin/../mogdbsetup",
+        "--yes",
+        "--dbname=$dbname",
+        "--type=Postgres",
+        "--dbrootuser=postgres",
+    );
+    if (system(@setup_cmd)) {
+        die "Failed to run mogdbsetup ($FindBin::Bin/../mogdbsetup).";
+    }
+
+    my $dsn = "dbi:Pg:dbname=$dbname";
+    return MogileFS::Store->new_from_dsn_user_pass($dsn, "mogile", "");
+}
+
+# Cached administrative connection shared by the test helpers.
+my $rootdbh;
+
+# Return a connection to the local "postgres" database as user
+# "postgres" (empty password, RaiseError on), connecting on first use.
+# Dies if the connection cannot be made.
+sub _root_dbh {
+    return $rootdbh if $rootdbh;
+
+    # "return EXPR or die" parses as "(return EXPR) or die", which would
+    # make the die unreachable; assign first, then return.
+    $rootdbh = DBI->connect("DBI:Pg:dbname=postgres", "postgres", "", { RaiseError => 1 })
+        or die "Couldn't connect to local PostgreSQL database as postgres";
+    return $rootdbh;
+}
+
+# Drop the named database over the root connection.  Errors raised by
+# the DROP itself are ignored; a failure to connect is not.
+sub _drop_db {
+    my ($dbname) = @_;
+    my $admin = _root_dbh();
+    eval { $admin->do("DROP DATABASE $dbname") };
+}
+
+
+# --------------------------------------------------------------------------
+# Data-access things we override
+# --------------------------------------------------------------------------
+
+# Create class $classname in domain $dmid with a mindevcount of 2.
+# The new classid is one more than the highest existing classid in the
+# domain (1 if there are none).
+# Returns the new classid; throws 'dup' on duplicate name.
+sub create_class {
+    my ($self, $dmid, $classname) = @_;
+    my $dbh = $self->dbh;
+
+    # highest class id currently used in this domain
+    my $highest = $dbh->selectrow_array
+        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
+    my $new_classid = $highest + 1;
+
+    my $inserted = eval {
+        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
+                 undef, $dmid, $new_classid, $classname, 2);
+    };
+    throw("dup") if ($@ || $dbh->err) && $self->was_duplicate_error;
+
+    return $new_classid if $inserted;
+    $self->condthrow;
+    die;
+}
+
+# Change the key of file $fidid to $to_key.
+# returns 1 on success, 0 on duplicate key error, dies on exception
+# TODO: need a test to hit the duplicate name error condition
+sub rename_file {
+    my ($self, $fidid, $to_key) = @_;
+    my $dbh = $self->dbh;
+    eval {
+        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
+                 undef, $to_key, $fidid);
+    };
+    # Capture $@ right away; later method calls may overwrite it.
+    my $err = $@;
+    if ($err || $dbh->err) {
+        return 0 if $self->was_duplicate_error;
+
+        # When the handle does not raise exceptions $@ is empty, so fall
+        # back to the handle's own message rather than a bare "Died".
+        die($err || ("Database error renaming fid: " . $dbh->errstr . "\n"));
+    }
+    $self->condthrow;
+    return 1;
+}
+
+# Record that file $fidid is stored on device $devid.
+# Returns 1 when the row was inserted, 0 when the insert failed for any
+# reason (exception or error flagged on the handle).
+sub add_fidid_to_devid {
+    my ($self, $fidid, $devid) = @_;
+    my $dbh = $self->dbh;
+    eval {
+        $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
+    };
+
+    if ($@ || $dbh->err) {
+        return 0;
+    }
+    return 1;
+}
+
+# update the device count for a given fidid
+# The file row is locked with SELECT ... FOR UPDATE inside a transaction
+# before update_devcount runs.  Returns 1 on success, 0 if the row lock
+# could not be obtained within 10 seconds.  Any other failure while
+# locking rolls the transaction back and is rethrown.
+sub update_devcount_atomic {
+    my ($self, $fidid) = @_;
+    my $dbh = $self->dbh;
+
+    $dbh->begin_work;
+
+    my $sth = $dbh->prepare("SELECT devcount FROM file WHERE fid=? FOR UPDATE");
+    my $finished = eval {
+        # The trailing newline stops Perl appending " at FILE line N.",
+        # so the timeout can be recognised by an exact comparison below.
+        local $SIG{ALRM} = sub { die "alarm\n" };
+        alarm 10;
+        $sth->execute($fidid);
+        alarm 0;
+        1;
+    };
+    my $err = $@;
+    alarm 0;    # make sure no timer stays armed if execute died
+
+    unless ($finished) {
+        eval { $dbh->rollback };
+        return 0 if $err eq "alarm\n";
+        die $err;
+    }
+
+    $self->update_devcount($fidid);
+
+    $dbh->commit;
+
+    return 1;
+}
+
+# Queue $fidid for replication, optionally from a specific device
+# ($from_devid may be undef) and optionally delayed by $in seconds.
+# With a false $in the row's nexttry is 0; otherwise it is "now + $in",
+# with $in interpolated directly into the SQL text.
+# Errors from the INSERT are swallowed by the eval.
+sub enqueue_for_replication {
+    my ($self, $fidid, $from_devid, $in) = @_;
+    my $dbh = $self->dbh;
+
+    my $nexttry = $in
+        ? $self->unix_timestamp." + ${in}::int"
+        : 0;
+    my $sql = "INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)";
+
+    eval {
+        $dbh->do($sql, undef, $fidid, $from_devid);
+    };
+}
+
+# Make every deferred replication due immediately by resetting nexttry to
+# the current time on rows scheduled in the future.
+# Returns whatever the handle's do() returns for the UPDATE.
+sub replicate_now {
+    my ($self) = @_;
+    my $sql = join('',
+        "UPDATE file_to_replicate SET nexttry = ", $self->unix_timestamp,
+        " WHERE nexttry > ", $self->unix_timestamp);
+    return $self->dbh->do($sql);
+}
+
+# Push back replication of $fid to $in_n_secs seconds from now and bump
+# its failcount by one.
+sub reschedule_file_to_replicate_relative {
+    my ($self, $fid, $in_n_secs) = @_;
+    my $sql = "UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?";
+    $self->dbh->do($sql, undef, $in_n_secs, $fid);
+}
+
+# creates a new domain, given a domain namespace string.  return the dmid on success,
+# throw 'dup' on duplicate name.
+# The new dmid is one more than the current maximum (1 for the first).
+sub create_domain {
+    my ($self, $name) = @_;
+    my $dbh = $self->dbh;
+
+    my $highest  = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
+    my $new_dmid = $highest + 1;
+
+    my $inserted = eval {
+        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
+                 undef, $new_dmid, $name);
+    };
+    throw("dup") if $self->was_duplicate_error;
+
+    return $new_dmid if $inserted;
+    die "failed to make domain";  # FIXME: the above is racy.
+}
+
+# Store $val under $key in server_settings, or delete the row for $key
+# when $val is undef.  Returns 1; dies if the handle reports an error.
+sub set_server_setting {
+    my ($self, $key, $val) = @_;
+    my $dbh = $self->dbh;
+
+    if (!defined $val) {
+        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
+    }
+    else {
+        my %replace = (
+            insert      => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
+            insert_vals => [ $key, $val ],
+            update      => "UPDATE server_settings SET value = ? WHERE field = ?",
+            update_vals => [ $val, $key ],
+        );
+        $self->insert_or_update(%replace);
+    }
+
+    if ($dbh->err) {
+        die "Error updating 'server_settings': " . $dbh->errstr;
+    }
+    return 1;
+}
+
+# Insert a device row with the given devid, hostid and status.
+# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
+sub create_device {
+    my ($self, $devid, $hostid, $status) = @_;
+
+    my $insert_device = sub {
+        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
+                       $devid, $hostid, $status);
+    };
+    my $rv = $self->conddup($insert_device);
+    $self->condthrow;
+
+    unless ($rv > 0) {
+        die "error making device $devid\n";
+    }
+    return 1;
+}
+
+# Record (or refresh) $fidid in unreachable_fids with the current time as
+# lastupdate.  Failures are swallowed by the eval.
+sub mark_fidid_unreachable {
+    my ($self, $fidid) = @_;
+
+    eval {
+        $self->insert_or_update(
+            insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
+            insert_vals => [ $fidid ],
+            # The key column is "fid", as in the INSERT above; filtering
+            # on "field" would never match this table.
+            update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE fid = ?",
+            update_vals => [ $fidid ],
+        );
+    };
+}
+
+# Remove $fidid from the file and tempfile tables and make sure it is
+# listed in file_to_delete.  condthrow is checked after every step.
+sub delete_fidid {
+    my ($self, $fidid) = @_;
+    $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
+    $self->condthrow;
+    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
+    $self->condthrow;
+    $self->insert_or_update(
+        insert => "INSERT INTO file_to_delete (fid) VALUES (?)",
+        insert_vals => [ $fidid ],
+        # Restrict the fallback to the row that already exists; without a
+        # WHERE clause the UPDATE would rewrite every queued fid.
+        update => "UPDATE file_to_delete SET fid=? WHERE fid=?",
+        update_vals => [ $fidid, $fidid ],
+    );
+    $self->condthrow;
+}
+
+# Insert the row for a file, or overwrite the existing row with the same
+# fid, resetting devcount to 0 either way.
+# Named arguments: fidid, dmid, key, length, classid.
+# Returns 1; database errors are raised through condthrow.
+sub replace_into_file {
+    my $self = shift;
+    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
+    $self->insert_or_update(
+        insert => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
+        insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
+        update => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
+        update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
+    );
+    $self->condthrow;
+    # No debug output here: library code must not write to STDOUT.
+    return 1;
+}
+
+# given an array of MogileFS::DevFID objects, mass-insert them all
+# into file_on (ignoring if they're already present)
+# Rows are inserted one at a time with a single prepared statement;
+# any error other than a duplicate is raised through condthrow.
+# Returns 1.
+sub mass_insert_file_on {
+    my ($self, @devfids) = @_;
+
+    my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
+    foreach my $devfid (@devfids) {
+        eval {
+            $sth->execute($devfid->fidid, $devfid->devid);
+        };
+        $self->condthrow unless $self->was_duplicate_error;
+    }
+    return 1;
+}
+
+# Schema upgrade: add host.http_get_port (with its non-negative CHECK)
+# when the column is not present yet.
+sub upgrade_add_host_getport {
+    my $self = shift;
+    my $existing_type = $self->column_type("host", "http_get_port");
+    unless ($existing_type) {
+        $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
+    }
+
+}
+# Schema upgrade: add host.altip and host.altmask, plus a UNIQUE
+# constraint on altip, when the altip column is not present yet.
+sub upgrade_add_host_altip {
+    my $self = shift;
+    unless ($self->column_type("host", "altip")) {
+        $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
+        $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
+        # PostgreSQL does not accept MySQL's "ADD UNIQUE name (col)" form;
+        # the unnamed form matches the UNIQUE (altip) used in TABLE_host.
+        $self->dowell("ALTER TABLE host ADD UNIQUE (altip)");
+    }
+}
+
+# Schema upgrade: add device.mb_asof (with its non-negative CHECK) when
+# the column is not present yet.
+sub upgrade_add_device_asof {
+    my $self = shift;
+    my $existing_type = $self->column_type("device", "mb_asof");
+    unless ($existing_type) {
+        $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
+    }
+}
+
+# Schema upgrade: add device.weight (default 100) when the column is not
+# present yet.
+sub upgrade_add_device_weight {
+    my $self = shift;
+    my $existing_type = $self->column_type("device", "weight");
+    unless ($existing_type) {
+        $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
+    }
+
+}
+
+# Schema upgrade: allow the 'readonly' device status when the existing
+# CHECK constraint on device.status does not mention it.
+sub upgrade_add_device_readonly {
+    my $self = shift;
+    my $constraint = $self->column_constraint("device", "status");
+    unless (defined $constraint && $constraint =~ /readonly/) {
+        # PostgreSQL has no "MODIFY COLUMN"; a CHECK constraint is changed
+        # by dropping it and adding a new one.
+        # NOTE(review): assumes the old constraint carries PostgreSQL's
+        # default name "device_status_check" -- confirm against databases
+        # whose constraint was created under another name.
+        $self->dowell("ALTER TABLE device DROP CONSTRAINT IF EXISTS device_status_check");
+        $self->dowell("ALTER TABLE device ADD CONSTRAINT device_status_check CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
+    }
+}
+
+# Schema upgrade: allow the 'drain' device status when the existing
+# CHECK constraint on device.status does not mention it.
+sub upgrade_add_device_drain {
+    my $self = shift;
+    my $constraint = $self->column_constraint("device", "status");
+    unless (defined $constraint && $constraint =~ /drain/) {
+        # PostgreSQL has no "MODIFY COLUMN"; a CHECK constraint is changed
+        # by dropping it and adding a new one.
+        # NOTE(review): assumes the old constraint carries PostgreSQL's
+        # default name "device_status_check" -- confirm against databases
+        # whose constraint was created under another name.
+        $self->dowell("ALTER TABLE device DROP CONSTRAINT IF EXISTS device_status_check");
+        $self->dowell("ALTER TABLE device ADD CONSTRAINT device_status_check CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))");
+    }
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
+
+=head1 SEE ALSO
+
+L<MogileFS::Store>
+
+
diff --git a/server/t/00-startup.t b/server/t/00-startup.t
index 01ea6a8..9eceb02 100644
--- a/server/t/00-startup.t
+++ b/server/t/00-startup.t
@@ -205,7 +205,7 @@ sleep(3);  # FIXME: make an explicit "rescan" or "remonitor" job to mogilefsd, j
 ok($tmptrack->mogadm("device", "mark", "hostB", 3, "dead"), "marked device B/3 dead");
 ok($tmptrack->mogadm("device", "mark", "hostB", 4, "dead"), "marked device B/4 dead");
 
-ok(try_for(15, sub {
+ok(try_for(30, sub {
     my %has;
     my $sth = $dbh->prepare("SELECT devid, COUNT(*) FROM file_on GROUP BY devid");
     $sth->execute;
diff --git a/server/t/domains-classes.t b/server/t/domains-classes.t
index af89d5d..f2fbb55 100644
--- a/server/t/domains-classes.t
+++ b/server/t/domains-classes.t
@@ -6,7 +6,6 @@ use Test::More;
 use FindBin qw($Bin);
 
 use MogileFS::Server;
-use MogileFS::Store::MySQL;
 use MogileFS::Util qw(error_code);
 require "$Bin/lib/mogtestlib.pl";
 
-- 
1.5.2



More information about the mogilefs mailing list