PostgreSQL store (Re: status update)
Radu Greab
rg at yx.ro
Sun Jan 7 23:51:03 UTC 2007
Attached is a possible start of PostgreSQL implementation. It passes all
tests here, with PostgreSQL 8.1.
Things I should mention about this implementation:
- REPLACE INTO is implemented by trying to INSERT first. If the insert
fails due to unique constraints violation, then an UPDATE is issued.
- should_begin_replicating_fidid() and note_done_replicating() are not
implemented, probably a table should be used for advisory locking.
I have two questions regarding the setup of the database:
- should mogdbsetup allow users to specify different data and index
tablespaces or those who need that will do it manually?
- are referential integrity constraints needed? (not used in this first
version)
Test 52 from t/00-startup.t was failing with the limit 15 seconds. After
I increased the limit it passed.
-------------- next part --------------
=== t/00-startup.t
==================================================================
--- t/00-startup.t (revision 69369)
+++ t/00-startup.t (local)
@@ -210,7 +210,7 @@
ok($tmptrack->mogadm("device", "mark", "hostB", 3, "dead"), "marked device B/3 dead");
ok($tmptrack->mogadm("device", "mark", "hostB", 4, "dead"), "marked device B/4 dead");
-ok(try_for(15, sub {
+ok(try_for(30, sub {
my %has;
my $sth = $dbh->prepare("SELECT devid, COUNT(*) FROM file_on GROUP BY devid");
$sth->execute;
=== t/domains-classes.t
==================================================================
--- t/domains-classes.t (revision 69369)
+++ t/domains-classes.t (local)
@@ -6,7 +6,6 @@
use FindBin qw($Bin);
use MogileFS::Server;
-use MogileFS::Store::MySQL;
use MogileFS::Util qw(error_code);
require "$Bin/lib/mogtestlib.pl";
=== lib/MogileFS/Store/Postgres.pm
==================================================================
--- lib/MogileFS/Store/Postgres.pm (revision 69369)
+++ lib/MogileFS/Store/Postgres.pm (local)
@@ -0,0 +1,509 @@
+package MogileFS::Store::Postgres;
+use strict;
+use warnings;
+use DBI;
+use DBD::Pg;
+use MogileFS::Util qw(throw);
+use base 'MogileFS::Store';
+
+# --------------------------------------------------------------------------
+# Package methods we override
+# --------------------------------------------------------------------------
+
+sub dsn_of_dbhost {
+ my ($class, $dbname, $host) = @_;
+ return "DBI:Pg:dbname=$dbname;host=$host";
+}
+
+sub dsn_of_root {
+ my ($class, $dbname, $hosot) = @_;
+ return "DBI:Pg:dbname=postgres";
+}
+
+# --------------------------------------------------------------------------
+# Store-related things we override
+# --------------------------------------------------------------------------
+
+sub was_duplicate_error {
+ my $self = shift;
+ my $dbh = $self->dbh;
+ return 0 unless $dbh->err;
+ return 1 if $dbh->state == '23505' || $dbh->errstr =~ /duplicate/i;
+}
+
+sub grant_privileges {
+ my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
+ eval {
+ $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
+ undef, $pass);
+ };
+ die "Failed to create user '$user': ". $rdbh->errstr . "\n"
+ if $rdbh->err && $rdbh->state != '42710';
+ $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
+ or die "Failed to grant privileges " . $rdbh->errstr . "\n";
+}
+
+sub table_exists {
+ my ($self, $table) = @_;
+ return eval {
+ my $sth = $self->dbh->table_info(undef, undef, $table, "table");
+ my $rec = $sth->fetchrow_hashref;
+ return $rec ? 1 : 0;
+ };
+}
+
+sub setup_database {
+ my $self = shift;
+
+ return unless $self->SUPER::setup_database;
+
+ $self->dowell("
+CREATE OR REPLACE FUNCTION UNIX_TIMESTAMP() RETURNS integer
+LANGUAGE 'SQL' VOLATILE AS '
+SELECT
+ROUND(EXTRACT( EPOCH FROM abstime(now()) ))::int4 AS result;'
+");
+
+ return 1;
+}
+
+sub TABLE_file {
+ "CREATE TABLE file (
+ fid INT NOT NULL,
+ PRIMARY KEY (fid),
+
+ dmid SMALLINT NOT NULL,
+ dkey VARCHAR(255), -- domain-defined
+ UNIQUE (dmid, dkey),
+
+ length INT, -- 2GB limit
+ CHECK (length >= 0),
+
+ classid SMALLINT NOT NULL,
+ devcount SMALLINT NOT NULL
+)"
+}
+
+sub INDEXES_file {
+ "CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
+}
+
+sub INDEXES_unreachable_fids {
+ "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
+}
+
+sub INDEXES_file_on {
+ "CREATE INDEX file_on_devid ON file_on (devid)"
+}
+
+sub TABLE_host {
+ "CREATE TABLE host (
+ hostid SMALLINT NOT NULL,
+ PRIMARY KEY (hostid),
+ CHECK (hostid >= 0),
+
+ status VARCHAR(8),
+ CHECK (status IN ('alive','dead','down')),
+ http_port INT DEFAULT 7500,
+ CHECK (http_port >= 0),
+ http_get_port INT,
+ CHECK (http_get_port >= 0),
+
+ hostname VARCHAR(40),
+ hostip VARCHAR(15),
+ altip VARCHAR(15),
+ altmask VARCHAR(18),
+ UNIQUE (hostname),
+ UNIQUE (hostip),
+ UNIQUE (altip)
+)"
+}
+
+sub TABLE_device {
+ "CREATE TABLE device (
+ devid SMALLINT NOT NULL,
+ PRIMARY KEY (devid),
+ CHECK (devid >= 0),
+
+ hostid SMALLINT NOT NULL,
+
+ status VARCHAR(8),
+ CHECK (status IN ('alive','dead','down','readonly')),
+ weight INT DEFAULT 100,
+
+ mb_total INT,
+ CHECK (mb_total >= 0),
+ mb_used INT,
+ CHECK (mb_used >= 0),
+ mb_asof INT
+ CHECK (mb_asof >= 0)
+)"
+}
+
+sub INDEXES_device {
+ "CREATE INDEX device_status ON device (status)"
+}
+
+sub INDEXES_file_to_replicate {
+ "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
+}
+
+sub INDEXES_file_to_delete_later {
+ "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
+}
+
+sub filter_create_sql {
+ my ($self, $sql) = @_;
+ $sql =~ s/\bUNSIGNED\b//g;
+ $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
+ $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
+ $sql =~ s/# /-- /g;
+
+ my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
+ die "didn't find table" unless $table;
+ print STDERR "test INDEXES_$table\n";
+ if ($self->can("INDEXES_$table")) {
+ print STDERR "yes\n";
+ $sql =~ s!,\s+INDEX\s+\(.+?\)!!mg;
+ }
+
+ return $sql;
+}
+
+# --------------------------------------------------------------------------
+# Functions specific to Store::Postgres subclass. Not in parent.
+# --------------------------------------------------------------------------
+
+sub insert_or_update {
+ my $self = shift;
+ my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
+ my $dbh = $self->dbh;
+
+ $dbh->begin_work;
+
+ eval {
+ $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
+ };
+ if ($@ || $dbh->err) {
+ if ($self->was_duplicate_error) {
+ $dbh->do($arg{update}, undef, @{ $arg{update_vals} });
+ }
+ $self->condthrow;
+ }
+
+ $dbh->commit;
+ return 1;
+}
+
+# --------------------------------------------------------------------------
+# Test suite things we override
+# --------------------------------------------------------------------------
+
+sub new_temp {
+ my $dbname = "tmp_mogiletest";
+ _drop_db($dbname);
+
+ system("$FindBin::Bin/../mogdbsetup", "--yes", "--dbname=$dbname", "--type=Postgres", "--dbrootuser=postgres")
+ and die "Failed to run mogdbsetup ($FindBin::Bin/../mogdbsetup).";
+
+ return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname",
+ "mogile",
+ "");
+}
+
+my $rootdbh;
+sub _root_dbh {
+ return $rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres", "postgres", "", { RaiseError => 1 })
+ or die "Couldn't connect to local PostgreSQL database as postgres";
+}
+
+sub _drop_db {
+ my $dbname = shift;
+ my $root_dbh = _root_dbh();
+ eval {
+ $root_dbh->do("DROP DATABASE $dbname");
+ };
+}
+
+
+# --------------------------------------------------------------------------
+# Data-access things we override
+# --------------------------------------------------------------------------
+
+# throw 'dup' on duplicate name
+sub create_class {
+ my ($self, $dmid, $classname) = @_;
+ my $dbh = $self->dbh;
+
+ # get the max class id in this domain
+ my $maxid = $dbh->selectrow_array
+ ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
+
+ # now insert the new class
+ my $rv = eval {
+ $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
+ undef, $dmid, $maxid + 1, $classname, 2);
+ };
+ if ($@ || $dbh->err) {
+ # first is error code for duplicates
+ if ($self->was_duplicate_error) {
+ throw("dup");
+ }
+ }
+ return $maxid + 1 if $rv;
+ $self->condthrow;
+ die;
+}
+
+sub register_tempfile {
+ my $self = shift;
+ my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
+
+ my $dbh = $self->dbh;
+ my $fid = $arg{fid};
+
+ my $explicit_fid_used = $fid ? 1 : 0;
+
+ # setup the new mapping. we store the devices that we picked for
+ # this file in here, knowing that they might not be used. create_close
+ # is responsible for actually mapping in file_on. NOTE: fid is being
+ # passed in, it's either some number they gave us, or it's going to be
+ # undef; we build the query and bind list accordingly
+ my $ins_tempfile = sub {
+ my @params = $explicit_fid_used ? ($fid) : ();
+ push @params, @arg{ qw(dmid key classid devids) };
+ $dbh->do("INSERT INTO tempfile (" .
+ ($explicit_fid_used ? "fid, " : "") . "dmid, dkey, classid, createtime, devids) " .
+ "VALUES (" .
+ ($explicit_fid_used ? "?, " : "") . "?, ?, ?, UNIX_TIMESTAMP(), ?)",
+ undef, @params);
+
+ return undef if $dbh->err;
+
+ unless (defined $fid) {
+ # if they did not give us a fid, then we want to grab the one that was
+ # theoretically automatically generated
+ $fid = $dbh->last_insert_id(undef, undef, "tempfile", "fid"); # DBD::Pg specific
+ }
+ return undef unless defined $fid && $fid > 0;
+ return 1;
+ };
+
+ unless ($ins_tempfile->()) {
+ return -1 if $explicit_fid_used;
+ return undef;
+ }
+
+ return $fid;
+}
+
+# returns 1 on success, 0 on duplicate key error, dies on exception
+# TODO: need a test to hit the duplicate name error condition
+sub rename_file {
+ my ($self, $fidid, $to_key) = @_;
+ my $dbh = $self->dbh;
+ eval {
+ $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
+ undef, $to_key, $fidid);
+ };
+ if ($@ || $dbh->err) {
+ # first is error code for duplicates
+ if ($self->was_duplicate_error) {
+ return 0;
+ } else {
+ die $@;
+ }
+ }
+ $self->condthrow;
+ return 1;
+}
+
+# add a record of fidid existing on devid
+# returns 1 on success, 0 on duplicate
+sub add_fidid_to_devid {
+ my ($self, $fidid, $devid) = @_;
+ my $dbh = $self->dbh;
+ eval {
+ $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
+ };
+
+ return 1 if !$@ && !$dbh->err;
+ return 0;
+}
+
+# update the device count for a given fidid
+sub update_devcount_atomic {
+ my ($self, $fidid) = @_;
+ my $dbh = $self->dbh;
+
+ $dbh->begin_work;
+
+ my $sth = $dbh->prepare("SELECT devcount FROM file WHERE fid=? FOR UPDATE");
+ eval {
+ local $SIG{ALRM} = sub { die "alarm" };
+ alarm 10;
+ $sth->execute($fidid);
+ alarm 0;
+ };
+ if ($@ && $@ eq "alarm") {
+ $dbh->rollback;
+ return 0;
+ }
+
+ $self->update_devcount($fidid);
+
+ $dbh->commit;
+
+ return 1;
+}
+
+# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
+sub enqueue_for_replication {
+ my ($self, $fidid, $from_devid, $in) = @_;
+ my $dbh = $self->dbh;
+
+ my $nexttry = 0;
+ if ($in) {
+ $nexttry = "UNIX_TIMESTAMP() + ${in}::int";
+ }
+
+ eval {
+ $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
+ undef, $fidid, $from_devid);
+ };
+}
+
+# reschedule all deferred replication, return number rescheduled
+sub replicate_now {
+ my ($self) = @_;
+ return $self->dbh->do("UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() WHERE nexttry > UNIX_TIMESTAMP()");
+}
+
+sub reschedule_file_to_replicate_relative {
+ my ($self, $fid, $in_n_secs) = @_;
+ $self->dbh->do("UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() + ?, failcount = failcount + 1 WHERE fid = ?",
+ undef, $in_n_secs, $fid);
+}
+
+# creates a new domain, given a domain namespace string. return the dmid on success,
+# throw 'dup' on duplicate name.
+sub create_domain {
+ my ($self, $name) = @_;
+ my $dbh = $self->dbh;
+
+ # get the max domain id
+ my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
+ my $rv = eval {
+ $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
+ undef, $maxid + 1, $name);
+ };
+ if ($self->was_duplicate_error) {
+ throw("dup");
+ }
+ return $maxid+1 if $rv;
+ die "failed to make domain"; # FIXME: the above is racy.
+}
+
+sub set_server_setting {
+ my ($self, $key, $val) = @_;
+ my $dbh = $self->dbh;
+
+ if (defined $val) {
+ $self->insert_or_update(
+ insert => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
+ insert_vals => [ $key, $val ],
+ update => "UPDATE server_settings SET value = ? WHERE field = ?",
+ update_vals => [ $val, $key ],
+ );
+ } else {
+ $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
+ }
+
+ die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
+ return 1;
+}
+
+# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
+sub create_device {
+ my ($self, $devid, $hostid, $status) = @_;
+ my $rv = $self->conddup(sub {
+ $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
+ $devid, $hostid, $status);
+ });
+ $self->condthrow;
+ die "error making device $devid\n" unless $rv > 0;
+ return 1;
+}
+
+sub mark_fidid_unreachable {
+ my ($self, $fidid) = @_;
+ my $dbh = $self->dbh;
+
+ eval {
+ $self->insert_or_update(
+ insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, UNIX_TIMESTAMP())",
+ insert_vals => [ $fidid ],
+ update => "UPDATE unreachable_fids SET lastupdate = UNIX_TIMESTAMP() WHERE field = ?",
+ update_vals => [ $fidid ],
+ );
+ };
+}
+
+sub delete_fidid {
+ my ($self, $fidid) = @_;
+ $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
+ $self->condthrow;
+ $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
+ $self->condthrow;
+ $self->insert_or_update(
+ insert => "INSERT INTO file_to_delete (fid) VALUES (?)",
+ insert_vals => [ $fidid ],
+ update => "UPDATE file_to_delete SET fid=?",
+ update_vals => [ $fidid ],
+ );
+ $self->condthrow;
+}
+
+sub replace_into_file {
+ my $self = shift;
+ my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
+ $self->insert_or_update(
+ insert => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
+ insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
+ update => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
+ update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
+ );
+ $self->condthrow;
+ print "# done replace\n";
+}
+
+# given an array of MogileFS::DevFID objects, mass-insert them all
+# into file_on (ignoring if they're already present)
+sub mass_insert_file_on {
+ my ($self, @devfids) = @_;
+ my @qmarks = map { "(?,?)" } @devfids;
+ my @binds = map { $_->fidid, $_->devid } @devfids;
+
+ my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
+ foreach (@devfids) {
+ eval {
+ $sth->execute($_->fidid, $_->devid);
+ };
+ $self->condthrow unless $self->was_duplicate_error;
+ }
+ return 1;
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
+
+=head1 SEE ALSO
+
+L<MogileFS::Store>
+
+
More information about the mogilefs
mailing list