PostgreSQL store (Re: status update)

Radu Greab rg at yx.ro
Sun Jan 7 23:51:03 UTC 2007


Attached is a possible start of a PostgreSQL implementation. It passes all
tests here, with PostgreSQL 8.1.

Things I should mention about this implementation:
- REPLACE INTO is implemented by trying to INSERT first. If the insert
  fails due to a unique constraint violation, then an UPDATE is issued.
- should_begin_replicating_fidid() and note_done_replicating() are not
  implemented; a table should probably be used for advisory locking.

I have two questions regarding the setup of the database:
- should mogdbsetup allow users to specify different data and index
  tablespaces, or will those who need that do it manually?
- are referential integrity constraints needed? (not used in this first
  version)


Test 52 from t/00-startup.t was failing with the 15-second limit. After
I increased the limit, it passed.


-------------- next part --------------
=== t/00-startup.t
==================================================================
--- t/00-startup.t	(revision 69369)
+++ t/00-startup.t	(local)
@@ -210,7 +210,7 @@
 ok($tmptrack->mogadm("device", "mark", "hostB", 3, "dead"), "marked device B/3 dead");
 ok($tmptrack->mogadm("device", "mark", "hostB", 4, "dead"), "marked device B/4 dead");
 
-ok(try_for(15, sub {
+ok(try_for(30, sub {
     my %has;
     my $sth = $dbh->prepare("SELECT devid, COUNT(*) FROM file_on GROUP BY devid");
     $sth->execute;
=== t/domains-classes.t
==================================================================
--- t/domains-classes.t	(revision 69369)
+++ t/domains-classes.t	(local)
@@ -6,7 +6,6 @@
 use FindBin qw($Bin);
 
 use MogileFS::Server;
-use MogileFS::Store::MySQL;
 use MogileFS::Util qw(error_code);
 require "$Bin/lib/mogtestlib.pl";
 
=== lib/MogileFS/Store/Postgres.pm
==================================================================
--- lib/MogileFS/Store/Postgres.pm	(revision 69369)
+++ lib/MogileFS/Store/Postgres.pm	(local)
@@ -0,0 +1,509 @@
+package MogileFS::Store::Postgres;
+use strict;
+use warnings;
+use DBI;
+use DBD::Pg;
+use MogileFS::Util qw(throw);
+use base 'MogileFS::Store';
+
+# --------------------------------------------------------------------------
+# Package methods we override
+# --------------------------------------------------------------------------
+
+sub dsn_of_dbhost {    # class method: builds the DBD::Pg DSN for database $dbname on $host
+    my (undef, $database, $server) = @_;
+    return join ';', "DBI:Pg:dbname=$database", "host=$server";
+}
+
+sub dsn_of_root {      # class method: DSN of the "postgres" database, whatever the arguments
+    my ($class, $dbname, $host) = @_;    # $dbname and $host are accepted but not used
+    return 'DBI:Pg:dbname=postgres';
+}
+
+# --------------------------------------------------------------------------
+# Store-related things we override
+# --------------------------------------------------------------------------
+
+sub was_duplicate_error {    # 1 if the handle's last error was a unique violation, else 0
+    my $dbh = shift->dbh;
+    return 0 unless $dbh->err;
+    # SQLSTATE is a five-character string that may contain letters: compare with eq, never ==.
+    return ($dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i) ? 1 : 0;
+}
+
+# Creates login role $user (an already-existing role is fine) and makes it owner of $dbname.
+# Dies with a descriptive message on any other failure.
+sub grant_privileges {
+    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
+    eval { $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?", undef, $pass) };
+    # 42710 is duplicate_object; SQLSTATEs are strings (e.g. 42P01), so test with ne, not !=.
+    die "Failed to create user '$user': ". $rdbh->errstr . "\n"
+        if $rdbh->err && $rdbh->state ne '42710';
+    $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
+        or die "Failed to grant privileges " . $rdbh->errstr . "\n";
+}
+
+# Reports 1 if DBI's table_info lists $table, 0 if not; a false empty value if the lookup dies.
+sub table_exists {
+    my ($self, $table) = @_;
+    return eval {
+        my $row = $self->dbh->table_info(undef, undef, $table, "table")->fetchrow_hashref;
+        $row ? 1 : 0;
+    };
+}
+
+# Runs the parent's setup_database, then installs a UNIX_TIMESTAMP() SQL function (whole
+# seconds since the epoch).  Returns 1, or nothing if the parent's setup returned false.
+sub setup_database {
+    my $self = shift;
+    return unless $self->SUPER::setup_database;
+    # EXTRACT(EPOCH FROM now()) is used directly: the abstime type is gone in PostgreSQL 12+.
+    $self->dowell("
+CREATE OR REPLACE FUNCTION UNIX_TIMESTAMP() RETURNS integer 
+LANGUAGE 'SQL' VOLATILE AS '
+SELECT
+FLOOR(EXTRACT( EPOCH FROM now() ))::int4 AS result;'
+");
+    return 1;
+}
+
+sub TABLE_file {    # returns the PostgreSQL CREATE TABLE statement for the "file" table
+    return q{CREATE TABLE file (
+   fid          INT NOT NULL,
+   PRIMARY KEY  (fid),
+
+   dmid          SMALLINT NOT NULL,
+   dkey           VARCHAR(255),     -- domain-defined
+   UNIQUE        (dmid, dkey),
+
+   length        INT,               -- 2GB limit
+   CHECK         (length >= 0),
+
+   classid       SMALLINT NOT NULL,
+   devcount      SMALLINT NOT NULL
+)};
+}
+
+sub INDEXES_file {    # CREATE INDEX statement for "file"; filter_create_sql checks for this sub via can()
+    return q{CREATE INDEX file_devcount ON file (dmid,classid,devcount)};
+}
+
+sub INDEXES_unreachable_fids {    # CREATE INDEX statement for "unreachable_fids" (on lastupdate)
+    return q{CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)};
+}
+
+sub INDEXES_file_on {    # CREATE INDEX statement for "file_on" (on devid)
+    return q{CREATE INDEX file_on_devid ON file_on (devid)};
+}
+
+sub TABLE_host {    # returns the PostgreSQL CREATE TABLE statement for the "host" table
+    return q{CREATE TABLE host (
+   hostid     SMALLINT NOT NULL,
+   PRIMARY KEY (hostid),
+   CHECK       (hostid >= 0),
+
+   status     VARCHAR(8),
+   CHECK      (status IN ('alive','dead','down')),
+   http_port  INT DEFAULT 7500,
+   CHECK      (http_port >= 0),
+   http_get_port INT,
+   CHECK      (http_get_port >= 0),
+
+   hostname   VARCHAR(40),
+   hostip     VARCHAR(15),
+   altip      VARCHAR(15),
+   altmask    VARCHAR(18),
+   UNIQUE     (hostname),
+   UNIQUE     (hostip),
+   UNIQUE     (altip)
+)};
+}
+
+sub TABLE_device {    # returns the PostgreSQL CREATE TABLE statement for the "device" table
+    return q{CREATE TABLE device (
+   devid   SMALLINT NOT NULL,
+   PRIMARY KEY (devid),
+   CHECK   (devid >= 0),
+
+   hostid     SMALLINT NOT NULL,
+
+   status     VARCHAR(8),
+   CHECK      (status IN ('alive','dead','down','readonly')),
+   weight     INT DEFAULT 100,
+
+   mb_total   INT,
+   CHECK      (mb_total >= 0),
+   mb_used    INT,
+   CHECK      (mb_used >= 0),
+   mb_asof    INT
+   CHECK      (mb_asof >= 0)
+)};
+}
+
+sub INDEXES_device {    # CREATE INDEX statement for "device" (on status)
+    return q{CREATE INDEX device_status ON device (status)};
+}
+
+sub INDEXES_file_to_replicate {    # CREATE INDEX statement for "file_to_replicate" (on nexttry)
+    return q{CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)};
+}
+
+sub INDEXES_file_to_delete_later {    # CREATE INDEX statement for "file_to_delete_later" (on delafter)
+    return q{CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)};
+}
+
+# Translates a MySQL-flavoured CREATE TABLE statement into something PostgreSQL accepts and
+# returns the rewritten SQL.  Dies if no table name can be found in $sql.
+sub filter_create_sql {
+    my ($self, $sql) = @_;
+    $sql =~ s/\bUNSIGNED\b//g;
+    # NOTE(review): MySQL's MEDIUMINT is 24 bits wide; SMALLINT narrows it to 16 bits.
+    $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
+    $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
+    $sql =~ s/# /-- /g;    # "# " comments become SQL "-- " comments
+
+    my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
+    die "didn't find table" unless $table;
+    # Inline INDEX (...) clauses are dropped only when this class supplies INDEXES_<table>.
+    $sql =~ s!,\s+INDEX\s+\(.+?\)!!mg if $self->can("INDEXES_$table");
+
+    return $sql;
+}
+
+# --------------------------------------------------------------------------
+# Functions specific to Store::Postgres subclass.  Not in parent.
+# --------------------------------------------------------------------------
+
+sub insert_or_update {    # REPLACE INTO emulation: INSERT, or UPDATE on a unique violation; returns 1
+    my $self = shift;
+    my %arg  = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
+    my $dbh = $self->dbh;
+    $dbh->begin_work;
+    # A failed statement aborts the PostgreSQL transaction; only a savepoint lets us go on.
+    $dbh->pg_savepoint("insert_or_update");
+    my $ok = eval { $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} }) };
+    if (!$ok && $self->was_duplicate_error) {
+        $dbh->pg_rollback_to("insert_or_update");
+        $ok = eval { $dbh->do($arg{update}, undef, @{ $arg{update_vals} }) };
+    }
+    unless ($ok) {    # format the error via condthrow while $dbh->errstr is still set
+        my $why = eval { $self->condthrow; "insert_or_update failed\n" } || $@;
+        eval { $dbh->rollback };    # leave no transaction open before rethrowing
+        die $why;
+    }
+    $dbh->commit;
+    return 1;
+}
+
+# --------------------------------------------------------------------------
+# Test suite things we override
+# --------------------------------------------------------------------------
+
+# Drops any existing tmp_mogiletest database, runs mogdbsetup for it, and returns a store
+# connected to it as user "mogile" with an empty password.  Dies if mogdbsetup fails.
+sub new_temp {
+    my $scratch_db = "tmp_mogiletest";
+    _drop_db($scratch_db);
+    my $setup_cmd  = "$FindBin::Bin/../mogdbsetup";
+    my @setup_args = ("--yes", "--dbname=$scratch_db", "--type=Postgres", "--dbrootuser=postgres");
+    system($setup_cmd, @setup_args) == 0
+        or die "Failed to run mogdbsetup ($setup_cmd).";
+    return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$scratch_db", "mogile", "");
+}
+
+my $rootdbh;    # connection as user "postgres", opened on first use and then reused
+sub _root_dbh {    # parenthesised so the die is reachable: "return X or die" never gets to die
+    return $rootdbh ||= (DBI->connect("DBI:Pg:dbname=postgres", "postgres", "", { RaiseError => 1 })
+        || die "Couldn't connect to local PostgreSQL database as postgres");
+}
+
+# Best-effort DROP DATABASE of $dbname through the root handle; a failure of the DROP
+# itself is swallowed.  Returns whatever do() returned, if anything.
+sub _drop_db {
+    my ($victim) = @_;
+    my $admin = _root_dbh();
+    return eval { $admin->do("DROP DATABASE $victim") };
+}
+
+
+# --------------------------------------------------------------------------
+# Data-access things we override
+# --------------------------------------------------------------------------
+
+# Adds class $classname to domain $dmid under the next free classid (MAX + 1, or 1 when
+# the domain has no classes yet) with mindevcount 2, and returns that classid.
+# Throws 'dup' when the insert fails with a duplicate error; any other failure goes
+# through condthrow and finally a bare die.
+sub create_class {
+    my ($self, $dmid, $classname) = @_;
+    my $dbh = $self->dbh;
+
+    # highest classid currently used in this domain (0 if none)
+    my $highest = $dbh->selectrow_array(
+        'SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid
+    ) || 0;
+    my $new_classid = $highest + 1;
+
+    my $inserted = eval {
+        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
+                 undef, $dmid, $new_classid, $classname, 2);
+    };
+    my $failed = $@ || $dbh->err;
+    throw("dup") if $failed && $self->was_duplicate_error;
+    return $new_classid if $inserted;
+    $self->condthrow;
+    die;
+}
+
+# Inserts a tempfile row for an upload that is about to start and returns its fid.
+# A true fid passed by the caller is inserted as-is; otherwise the fid column is left
+# out of the INSERT and the generated value is read back with last_insert_id.
+# On failure returns -1 if an explicit fid was given and undef if not.
+sub register_tempfile {
+    my $self = shift;
+    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);
+
+    my $dbh      = $self->dbh;
+    my $fid      = $arg{fid};
+    my $have_fid = $fid ? 1 : 0;
+
+    # what the caller gets back when the insert does not work out
+    my $failure = $have_fid ? -1 : undef;
+
+    # setup the new mapping.  we store the devices that we picked for
+    # this file in here, knowing that they might not be used.  create_close
+    # is responsible for actually mapping in file_on.  the fid column, its
+    # placeholder and its bind value are only present for an explicit fid.
+    my $fid_col  = $have_fid ? "fid, " : "";
+    my $fid_mark = $have_fid ? "?, "   : "";
+    my @binds    = $have_fid ? ($fid) : ();
+    push @binds, @arg{ qw(dmid key classid devids) };
+
+    $dbh->do("INSERT INTO tempfile (" .
+             $fid_col . "dmid, dkey, classid, createtime, devids) " .
+             "VALUES (" .
+             $fid_mark . "?, ?, ?, UNIX_TIMESTAMP(), ?)",
+             undef, @binds);
+    return $failure if $dbh->err;
+
+    unless (defined $fid) {
+        # no fid at all was passed in, so pick up the one that was
+        # automatically generated for the new row
+        $fid = $dbh->last_insert_id(undef, undef, "tempfile", "fid");  # DBD::Pg specific
+    }
+
+    # only a defined, positive fid counts as success
+    return $failure unless defined $fid && $fid > 0;
+    return $fid;
+}
+
+# Sets the key (dkey) of file $fidid to $to_key.
+# returns 1 on success, 0 on duplicate key error, dies on exception
+# TODO: need a test to hit the duplicate name error condition
+sub rename_file {
+    my ($self, $fidid, $to_key) = @_;
+    my $dbh = $self->dbh;
+    eval {
+        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
+                 undef, $to_key, $fidid);
+    };
+    if ($@ || $dbh->err) {
+        # a clash with an existing (dmid, dkey) pair is reported, not thrown
+        return 0 if $self->was_duplicate_error;
+        # Without RaiseError $@ is empty here, so fall back to the handle's own message
+        # instead of dying with a bare "Died".
+        die $@ || $dbh->errstr;
+    }
+    $self->condthrow;
+    return 1;
+}
+
+# Records in file_on that file $fidid lives on device $devid.
+# Returns 1 when the row went in and 0 when the insert failed for any reason (a
+# duplicate row being the expected one).
+sub add_fidid_to_devid {
+    my ($self, $fidid, $devid) = @_;
+    my $dbh = $self->dbh;
+
+    my $sql = "INSERT INTO file_on (fid, devid) VALUES (?, ?)";
+    eval { $dbh->do($sql, undef, $fidid, $devid) };
+
+    return ($@ || $dbh->err) ? 0 : 1;
+}
+
+# Calls update_devcount for $fidid inside a transaction that first locks the fid's file row
+# (SELECT ... FOR UPDATE).  Returns 1 after committing, or 0 if the lock could not be
+# taken within 10 seconds; any other locking failure rolls back and dies.
+sub update_devcount_atomic {
+    my ($self, $fidid) = @_;
+    my $dbh = $self->dbh;
+
+    $dbh->begin_work;
+    my $sth = $dbh->prepare("SELECT devcount FROM file WHERE fid=? FOR UPDATE");
+    # The "\n" stops die from appending " at FILE line N.", so the timeout stays recognisable.
+    local $SIG{ALRM} = sub { die "alarm\n" };
+    my $locked = eval { alarm 10; $sth->execute($fidid) };
+    my $err = $@;
+    alarm 0;    # always disarm, whether the query finished, failed or timed out
+    unless ($locked) {
+        $err ||= $dbh->errstr || "could not lock file row\n";
+        $dbh->rollback;
+        return 0 if $err eq "alarm\n";
+        die $err;
+    }
+
+    $self->update_devcount($fidid);
+    $dbh->commit;
+    return 1;
+}
+
+# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
+# Insert errors are swallowed by the eval (presumably so an already-queued fid is a no-op).
+sub enqueue_for_replication {
+    my ($self, $fidid, $from_devid, $in) = @_;
+    my $dbh = $self->dbh;
+    # $in ends up inside the SQL text, so only its leading integer part is used; anything
+    # that is not a number degrades to 0 ("try immediately") instead of producing broken SQL.
+    my $delay   = ($in && $in =~ /\A\s*(-?\d+)/) ? $1 : 0;
+    my $nexttry = $delay ? "UNIX_TIMESTAMP() + $delay" : 0;
+
+    eval {
+        $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
+                 undef, $fidid, $from_devid);
+    };
+}
+
+# Makes every row of file_to_replicate whose nexttry is still in the future due right now.
+sub replicate_now {    # returns do()'s result: the affected row count ("0E0" when none)
+    my $dbh = shift->dbh;
+    return $dbh->do("UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() WHERE nexttry > UNIX_TIMESTAMP()");
+}
+
+sub reschedule_file_to_replicate_relative {    # retry $fid in $in_n_secs seconds and bump failcount
+    my ($self, $fid, $in_n_secs) = @_;
+    my $sql = "UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() + ?, failcount = failcount + 1 WHERE fid = ?";
+    return $self->dbh->do($sql, undef, $in_n_secs, $fid);
+}
+
+# Creates the domain whose namespace is $name, numbering it MAX(dmid) + 1 (1 for the
+# first domain), and returns the new dmid.  Throws 'dup' if the insert is rejected as a
+# duplicate and dies with "failed to make domain" on any other failure.
+sub create_domain {
+    my ($self, $name) = @_;
+    my $dbh = $self->dbh;
+
+    my $highest  = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
+    my $new_dmid = $highest + 1;
+
+    # FIXME: picking MAX + 1 and inserting it are separate steps, so this is racy.
+    my $inserted = eval {
+        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)', undef, $new_dmid, $name);
+    };
+    throw("dup") if $self->was_duplicate_error;
+    return $new_dmid if $inserted;
+    die "failed to make domain";
+}
+
+# Stores $val under $key in server_settings (undef deletes the key); returns 1, dies on a DB error.
+sub set_server_setting {
+    my ($self, $key, $val) = @_;
+    my $dbh = $self->dbh;
+    my @upsert = (
+        insert      => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
+        insert_vals => [ $key, $val ],
+        update      => "UPDATE server_settings SET value = ? WHERE field = ?",
+        update_vals => [ $val, $key ],
+    );
+    if (!defined $val) {
+        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
+    } else {
+        $self->insert_or_update(@upsert);
+    }
+    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
+    return 1;
+}
+
+# return 1 on success, throw "dup" on duplicate devid (via conddup) or throw another error on failure
+sub create_device {
+    my ($self, $devid, $hostid, $status) = @_;
+    my $insert = sub {
+        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef, $devid, $hostid, $status);
+    };
+    my $rows = $self->conddup($insert);
+    $self->condthrow;
+    die "error making device $devid\n" if !($rows > 0);
+    return 1;
+}
+
+# Best effort: lists $fidid in unreachable_fids, or refreshes its lastupdate if already there.
+# Errors are swallowed.  The UPDATE filters on fid, the same column the INSERT fills.
+sub mark_fidid_unreachable {
+    my ($self, $fidid) = @_;
+    eval {
+        $self->insert_or_update(
+            insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, UNIX_TIMESTAMP())",
+            insert_vals => [ $fidid ],
+            update => "UPDATE unreachable_fids SET lastupdate = UNIX_TIMESTAMP() WHERE fid = ?",
+            update_vals => [ $fidid ],
+        );
+    };
+}
+
+sub delete_fidid {    # drops $fidid from file and tempfile, then queues it in file_to_delete
+    my ($self, $fidid) = @_;
+    $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
+    $self->condthrow;
+    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
+    $self->condthrow;
+    $self->insert_or_update(
+        insert => "INSERT INTO file_to_delete (fid) VALUES (?)",
+        insert_vals => [ $fidid ],
+        update => "UPDATE file_to_delete SET fid=? WHERE fid=?",    # WHERE: touch only this fid's row
+        update_vals => [ $fidid, $fidid ],
+    );
+    $self->condthrow;
+}
+
+sub replace_into_file {    # insert-or-update of the file row keyed by fid; devcount restarts at 0; returns 1
+    my $self = shift;
+    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
+    $self->insert_or_update(
+        insert => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
+        insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
+        update => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
+        update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
+    );
+    $self->condthrow;
+    return 1;    # library code stays silent: nothing is written to STDOUT
+}
+
+# given an array of MogileFS::DevFID objects, mass-insert them all
+# into file_on (ignoring if they're already present)
+# Rows go in one at a time so a duplicate only skips that row; any other error is
+# thrown by condthrow.  Returns 1 otherwise.
+sub mass_insert_file_on {
+    my ($self, @devfids) = @_;
+
+    my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
+    foreach my $devfid (@devfids) {
+        eval {
+            $sth->execute($devfid->fidid, $devfid->devid);
+        };
+        $self->condthrow unless $self->was_duplicate_error;
+    }
+    return 1;
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
+
+=head1 SEE ALSO
+
+L<MogileFS::Store>
+
+


More information about the mogilefs mailing list