Support for multiple --tables, as well as Concurrent DDL Guard.

Code merged in from the multiple_tables branch, with a few additional changes.

The multiple --table support and the SimpleStringList code are largely
borrowed from pg_dump (pg_reorg Issue #18).

The concurrent DDL guard (pg_reorg Issue #8) is implemented using an
auxiliary database connection, which holds an ACCESS SHARE lock on the
target table while pg_repack conducts the rest of its work.
Josh Kupershmidt 2012-12-06 20:58:33 -07:00
commit f6ca290fb2
5 changed files with 491 additions and 91 deletions
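
For readers new to the approach, here is a minimal standalone sketch of the two-connection locking pattern this commit introduces. It is an editor's illustration built on plain libpq, not code from the commit; the connection string and table name ("dbname=test", "t1") are placeholders and error handling is trimmed.

#include <stdio.h>
#include <libpq-fe.h>

int main(void)
{
    PGconn     *conn1 = PQconnectdb("dbname=test");  /* main connection */
    PGconn     *conn2 = PQconnectdb("dbname=test");  /* auxiliary "guard" connection */
    PGresult   *res;

    if (PQstatus(conn1) != CONNECTION_OK || PQstatus(conn2) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed\n");
        return 1;
    }

    /* conn1 takes ACCESS EXCLUSIVE while the repack objects are set up */
    PQclear(PQexec(conn1, "BEGIN ISOLATION LEVEL READ COMMITTED"));
    PQclear(PQexec(conn1, "LOCK TABLE t1 IN ACCESS EXCLUSIVE MODE"));

    /*
     * conn2 queues an ACCESS SHARE request asynchronously *before* conn1
     * commits, so it is already waiting in the lock queue and no DDL can
     * sneak in between conn1's COMMIT and conn2's lock grant.
     */
    PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED"));
    if (!PQsendQuery(conn2, "LOCK TABLE t1 IN ACCESS SHARE MODE"))
        fprintf(stderr, "async LOCK failed: %s", PQerrorMessage(conn2));

    /* (pg_repack also cancels any queued ALTER TABLE/TRUNCATE at this
     * point; see kill_ddl() in the diff below) */

    PQclear(PQexec(conn1, "COMMIT"));   /* conn2's lock is granted next */

    while ((res = PQgetResult(conn2)) != NULL)
    {
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            fprintf(stderr, "LOCK TABLE: %s", PQerrorMessage(conn2));
        PQclear(res);
    }

    /*
     * conn2 now holds ACCESS SHARE until its transaction ends, which keeps
     * any ACCESS EXCLUSIVE lock (i.e. unsafe concurrent DDL) off the table
     * while the rest of the repack work runs on conn1.
     */
    PQclear(PQexec(conn2, "COMMIT"));
    PQfinish(conn1);
    PQfinish(conn2);
    return 0;
}

The real implementation below additionally cancels DDL already queued behind the lock (kill_ddl()), retries with a statement_timeout, and uses the pgut wrappers instead of raw libpq calls.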

View File

@ -40,52 +40,77 @@ const char *PROGRAM_VERSION = "unknown";
* pg_repack's setup. Some transactions we can safely ignore:
* a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
* servers. See https://github.com/reorg/pg_reorg/issues/1
* b. Our own database connection
* b. Our own database connections
* c. Other pg_repack clients, as distinguished by application_name, which
* may be operating on other tables at the same time. See
* https://github.com/reorg/pg_repack/issues/1
*
* Note, there is some redundancy in how the filtering is done (e.g. excluding
* based on pg_backend_pid() and application_name), but that shouldn't hurt
* anything.
* anything. Also, the test of application_name is not bulletproof -- for
* instance, the application name when running installcheck will be
* pg_regress.
*/
#define SQL_XID_SNAPSHOT_90200 \
"SELECT repack.array_accum(l.virtualtransaction) " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.pid " \
" WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \
"AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
"AND (a.application_name IS NULL OR a.application_name <> $1)"
" WHERE l.locktype = 'virtualxid' " \
" AND l.pid NOT IN (pg_backend_pid(), $1) " \
" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
" AND (a.application_name IS NULL OR a.application_name <> $2)"
#define SQL_XID_SNAPSHOT_90000 \
"SELECT repack.array_accum(l.virtualtransaction) " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.procpid " \
" WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \
"AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
"AND (a.application_name IS NULL OR a.application_name <> $1)"
" WHERE l.locktype = 'virtualxid' " \
" AND l.pid NOT IN (pg_backend_pid(), $1) " \
" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
" AND (a.application_name IS NULL OR a.application_name <> $2)"
/* application_name is not available before 9.0. The last clause of
* the WHERE clause is just to eat the $1 parameter (application name).
* the WHERE clause is just to eat the $2 parameter (application name).
*/
#define SQL_XID_SNAPSHOT_80300 \
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks" \
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()" \
" WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)" \
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0') " \
" AND ($1 IS NOT NULL)"
" AND ($2 IS NOT NULL)"
#define SQL_XID_SNAPSHOT \
(PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \
(PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \
SQL_XID_SNAPSHOT_80300))
/* Later, check whether any of the transactions we saw before are still
* alive, and wait for them to go away.
*/
#define SQL_XID_ALIVE \
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
/* To be run while our main connection holds an AccessExclusive lock on the
* target table, and our secondary conn is attempting to grab an AccessShare
* lock. We know that "granted" must be false for these queries because
* we already hold the AccessExclusive lock. Also, we only care about other
* transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only
* trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE.
*/
#define CANCEL_COMPETING_LOCKS \
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
#define KILL_COMPETING_LOCKS \
"SELECT pg_terminate_backend(pid) "\
"FROM pg_locks WHERE locktype = 'relation'"\
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
/*
* per-table information
*/
@ -123,13 +148,15 @@ typedef struct repack_index
static bool is_superuser(void);
static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, const char *table, char *errbuf, size_t errsize);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
static void repack_one_table(const repack_table *table, const char *order_by);
static void repack_cleanup(bool fatal, void *userdata);
static void repack_cleanup(bool fatal, const repack_table *table);
static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
static void lock_exclusive(const char *relid, const char *lock_query);
static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_QUERY_CANCELED "57014"
@ -139,12 +166,12 @@ static bool sqlstate_equals(PGresult *res, const char *state)
return strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), state) == 0;
}
static bool analyze = true;
static bool alldb = false;
static bool noorder = false;
static char *table = NULL;
static char *orderby = NULL;
static int wait_timeout = 60; /* in seconds */
static bool analyze = true;
static bool alldb = false;
static bool noorder = false;
static SimpleStringList table_list = {NULL, NULL};
static char *orderby = NULL;
static int wait_timeout = 60; /* in seconds */
/* buffer should have at least 11 bytes */
static char *
@ -157,7 +184,7 @@ utoa(unsigned int value, char *buffer)
static pgut_option options[] =
{
{ 'b', 'a', "all", &alldb },
{ 's', 't', "table", &table },
{ 'l', 't', "table", &table_list },
{ 'b', 'n', "no-order", &noorder },
{ 's', 'o', "order-by", &orderby },
{ 'i', 'T', "wait-timeout", &wait_timeout },
@ -168,7 +195,7 @@ static pgut_option options[] =
int
main(int argc, char *argv[])
{
int i;
int i;
i = pgut_getopt(argc, argv, options);
@ -184,19 +211,19 @@ main(int argc, char *argv[])
if (alldb)
{
if (table)
if (table_list.head)
ereport(ERROR,
(errcode(EINVAL),
errmsg("cannot repack a specific table in all databases")));
errmsg("cannot repack specific table(s) in all databases")));
repack_all_databases(orderby);
}
else
{
char errbuf[256];
if (!repack_one_database(orderby, table, errbuf, sizeof(errbuf)))
if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
ereport(ERROR,
(errcode(ERROR),
errmsg("%s", errbuf)));
(errcode(ERROR),
errmsg("%s", errbuf)));
}
return 0;
@ -219,7 +246,7 @@ is_superuser(void)
return false;
val = PQparameterStatus(connection, "is_superuser");
if (val && strcmp(val, "on") == 0)
return true;
@ -254,11 +281,11 @@ repack_all_databases(const char *orderby)
if (pgut_log_level >= INFO)
{
printf("%s: repack database \"%s\"", PROGRAM_NAME, dbname);
printf("%s: repack database \"%s\"\n", PROGRAM_NAME, dbname);
fflush(stdout);
}
ret = repack_one_database(orderby, NULL, errbuf, sizeof(errbuf));
ret = repack_one_database(orderby, errbuf, sizeof(errbuf));
if (pgut_log_level >= INFO)
{
@ -296,13 +323,24 @@ getoid(PGresult *res, int row, int col)
* Call repack_one_table for the target table or each table in a database.
*/
static bool
repack_one_database(const char *orderby, const char *table, char *errbuf, size_t errsize)
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
{
bool ret = false;
PGresult *res = NULL;
int i;
int num;
StringInfoData sql;
bool ret = false;
PGresult *res = NULL;
int i;
int num;
StringInfoData sql;
SimpleStringListCell *cell;
const char **params = NULL;
size_t num_params = simple_string_list_size(table_list);
/* We need to be able to support at least two params, or more
* if we have multiple --tables specified.
*/
if (num_params && num_params > 2)
params = pgut_malloc(num_params * sizeof(char *));
else
params = pgut_malloc(2 * sizeof(char *));
initStringInfo(&sql);
@ -355,7 +393,7 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t
/* Schema repack does not exist. Skip the database. */
if (errbuf)
snprintf(errbuf, errsize,
"%s is not installed in the database", PROGRAM_NAME);
"%s is not installed in the database", PROGRAM_NAME);
}
else
{
@ -377,10 +415,19 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t
/* acquire target tables */
appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE ");
if (table)
if (num_params)
{
appendStringInfoString(&sql, "relid = $1::regclass");
res = execute_elevel(sql.data, 1, &table, DEBUG2);
appendStringInfoString(&sql, "(");
for (i = 0, cell = table_list.head; cell; cell = cell->next, i++)
{
/* Construct table name placeholders to be used by PQexecParams */
appendStringInfo(&sql, "relid = $%d::regclass", i + 1);
params[i] = cell->val;
if (cell->next)
appendStringInfoString(&sql, " OR ");
}
appendStringInfoString(&sql, ")");
res = execute_elevel(sql.data, (int) num_params, params, DEBUG2);
}
else
{
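/* Editor's sketch (not from the commit) of the placeholder construction in
 * the hunk above, isolated as a standalone helper: for N --table arguments it
 * appends "(relid = $1::regclass OR ... OR relid = $N::regclass)" to the
 * query and fills params[] with the table names in the same order. The
 * function name is illustrative; the real code does this inline in
 * repack_one_database().
 */
static void
append_table_filter(StringInfo sql, SimpleStringList *tables,
                    const char **params)
{
    SimpleStringListCell *cell;
    int         i;

    appendStringInfoString(sql, "(");
    for (i = 0, cell = tables->head; cell; cell = cell->next, i++)
    {
        appendStringInfo(sql, "relid = $%d::regclass", i + 1);
        params[i] = cell->val;
        if (cell->next)
            appendStringInfoString(sql, " OR ");
    }
    appendStringInfoString(sql, ")");
}
/* e.g. --table foo --table bar yields "(relid = $1::regclass OR
 * relid = $2::regclass)" with params[] = {"foo", "bar"}, which is exactly
 * what execute_elevel() receives above.
 */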
@ -398,7 +445,7 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t
/* Schema repack does not exist. Skip the database. */
if (errbuf)
snprintf(errbuf, errsize,
"%s is not installed in the database", PROGRAM_NAME);
"%s is not installed in the database", PROGRAM_NAME);
}
else
{
@ -406,7 +453,6 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t
if (errbuf)
snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
}
ret = false;
goto cleanup;
}
@ -447,9 +493,12 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t
{
/* CLUSTER mode */
if (ckey == NULL)
ereport(ERROR,
{
ereport(WARNING,
(errcode(E_PG_COMMAND),
errmsg("relation \"%s\" has no cluster key", table.target_name)));
continue;
}
appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey);
table.create_table = sql.data;
}
@ -484,7 +533,7 @@ cleanup:
}
static int
apply_log(const repack_table *table, int count)
apply_log(PGconn *conn, const repack_table *table, int count)
{
int result;
PGresult *res;
@ -498,8 +547,9 @@ apply_log(const repack_table *table, int count)
params[4] = table->sql_pop;
params[5] = utoa(count, buffer);
res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
6, params);
res = pgut_execute(conn,
"SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
6, params);
result = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
@ -517,9 +567,10 @@ repack_one_table(const repack_table *table, const char *orderby)
int num;
int i;
int num_waiting = 0;
char *vxid;
char *vxid = NULL;
char buffer[12];
StringInfoData sql;
bool have_error = false;
initStringInfo(&sql);
@ -548,7 +599,12 @@ repack_one_table(const repack_table *table, const char *orderby)
* 1. Setup workspaces and a trigger.
*/
elog(DEBUG2, "---- setup ----");
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE)))
{
elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
have_error = true;
goto cleanup;
}
/*
* Check z_repack_trigger is the trigger executed at last so that
@ -558,10 +614,15 @@ repack_one_table(const repack_table *table, const char *orderby)
res = execute("SELECT repack.conflicted_triggers($1)", 1, params);
if (PQntuples(res) > 0)
ereport(ERROR,
{
ereport(WARNING,
(errcode(E_PG_COMMAND),
errmsg("trigger %s conflicted for %s",
PQgetvalue(res, 0, 0), table->target_name)));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res);
command(table->create_pktype, 0, NULL);
@ -570,14 +631,82 @@ repack_one_table(const repack_table *table, const char *orderby)
command(table->enable_trigger, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
command(sql.data, 0, NULL);
command("COMMIT", 0, NULL);
/* While we are still holding an AccessExclusive lock on the table, submit
* the request for an AccessShare lock asynchronously from conn2.
* We want to submit this query in conn2 while connection's
* transaction still holds its lock, so that no DDL may sneak in
* between the time that connection commits and conn2 gets its lock.
*/
pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
/* grab the backend PID of conn2; we'll need this when querying
* pg_locks momentarily.
*/
res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
printf("%s", PQerrorMessage(conn2));
PQclear(res);
have_error = true;
goto cleanup;
}
buffer[0] = '\0';
strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1);
PQclear(res);
/*
* Register the table to be dropped on error. We use pktype as
* an advisory lock. The registration should be done after
* the first command succeeds.
* Not using lock_access_share() here since we know that
* it's not possible to obtain the ACCESS SHARE lock right now
* in conn2, since the primary connection holds ACCESS EXCLUSIVE.
*/
pgut_atexit_push(&repack_cleanup, (void *) table);
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
table->target_name);
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
if (!(PQsendQuery(conn2, sql.data))) {
elog(WARNING, "Error sending async query: %s\n%s", sql.data,
PQerrorMessage(conn2));
have_error = true;
goto cleanup;
}
/* Now that we've submitted the LOCK TABLE request through conn2,
* look for and cancel any (potentially dangerous) DDL commands which
* might also be waiting on our table lock at this point --
* it's not safe to let them wait, because they may grab their
* AccessExclusive lock before conn2 gets its AccessShare lock,
* and perform unsafe DDL on the table.
*
* Normally, lock_access_share() would take care of this for us,
* but we're not able to use it here.
*/
if (!(kill_ddl(connection, table->target_oid, true)))
{
elog(WARNING, "kill_ddl() failed.");
have_error = true;
goto cleanup;
}
/* We're finished killing off any unsafe DDL. COMMIT in our main
* connection, so that conn2 may get its AccessShare lock.
*/
command("COMMIT", 0, NULL);
/* Keep looping PQgetResult() calls until it returns NULL, indicating the
* command is done and we have obtained our lock.
*/
while ((res = PQgetResult(conn2)))
{
elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(WARNING, "Error with LOCK TABLE: %s", PQerrorMessage(conn2));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res);
}
/*
* 2. Copy tuples into temp table.
@ -593,9 +722,20 @@ repack_one_table(const repack_table *table, const char *orderby)
command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
if (orderby && !orderby[0])
command("SET LOCAL synchronize_seqscans = off", 0, NULL);
params[0] = PROGRAM_NAME;
res = execute(SQL_XID_SNAPSHOT, 1, params);
vxid = strdup(PQgetvalue(res, 0, 0));
/* Fetch an array of Virtual IDs of all transactions active right now.
*/
params[0] = buffer; /* backend PID of conn2 */
params[1] = PROGRAM_NAME;
res = execute(SQL_XID_SNAPSHOT, 2, params);
if (!(vxid = strdup(PQgetvalue(res, 0, 0))))
{
elog(WARNING, "Unable to allocate vxid, length: %d\n",
PQgetlength(res, 0, 0));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res);
/* Delete any existing entries in the log table now, since we have not
@ -604,6 +744,25 @@ repack_one_table(const repack_table *table, const char *orderby)
* log we could wind up with duplicates.
*/
command(table->delete_log, 0, NULL);
/* We need to be able to obtain an AccessShare lock on the target table
* for the create_table command to go through, so go ahead and obtain
* the lock explicitly.
*
* Since conn2 has been diligently holding its AccessShare lock, it
* is possible that another transaction has been waiting to acquire
* an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE
* or TRUNCATE which we must not allow). If there are any such
* transactions, lock_access_share() will kill them so that our
* CREATE TABLE ... AS SELECT does not deadlock waiting for an
* AccessShare lock.
*/
if (!(lock_access_share(connection, table->target_oid, table->target_name)))
{
have_error = true;
goto cleanup;
}
command(table->create_table, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns)
@ -659,7 +818,7 @@ repack_one_table(const repack_table *table, const char *orderby)
*/
for (;;)
{
num = apply_log(table, APPLY_COUNT);
num = apply_log(connection, table, APPLY_COUNT);
if (num > 0)
continue; /* there might be still some tuples, repeat. */
@ -696,14 +855,25 @@ repack_one_table(const repack_table *table, const char *orderby)
}
/*
* 5. Swap.
* 5. Swap: will be done with conn2, since it already holds an
* AccessShare lock.
*/
elog(DEBUG2, "---- swap ----");
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
apply_log(table, 0);
/* Bump our existing AccessShare lock to AccessExclusive */
if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer),
table->lock_table, FALSE)))
{
elog(WARNING, "lock_exclusive() failed in conn2 for %s",
table->target_name);
have_error = true;
goto cleanup;
}
apply_log(conn2, table, 0);
params[0] = utoa(table->target_oid, buffer);
command("SELECT repack.repack_swap($1)", 1, params);
command("COMMIT", 0, NULL);
pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
pgut_command(conn2, "COMMIT", 0, NULL);
/*
* 6. Drop.
@ -715,9 +885,6 @@ repack_one_table(const repack_table *table, const char *orderby)
command("SELECT repack.repack_drop($1)", 1, params);
command("COMMIT", 0, NULL);
pgut_atexit_pop(&repack_cleanup, (void *) table);
free(vxid);
/*
* 7. Analyze.
* Note that cleanup hook has been already uninstalled here because analyze
@ -733,18 +900,166 @@ repack_one_table(const repack_table *table, const char *orderby)
command("COMMIT", 0, NULL);
}
cleanup:
termStringInfo(&sql);
if (vxid)
free(vxid);
/* XXX: distinguish between fatal and non-fatal errors via the first
* arg to repack_cleanup().
*/
if (have_error)
repack_cleanup(false, table);
}
/*
* Try acquire a table lock but avoid long time locks when conflict.
/* Kill off any concurrent DDL (or any transaction attempting to take
* an AccessExclusive lock) trying to run against our table. Note, we're
* killing these queries off *before* they are granted an AccessExclusive
* lock on our table.
*
* Returns true if no problems encountered, false otherwise.
*/
static void
lock_exclusive(const char *relid, const char *lock_query)
static bool
kill_ddl(PGconn *conn, Oid relid, bool terminate)
{
bool ret = true;
PGresult *res;
StringInfoData sql;
initStringInfo(&sql);
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(WARNING, "Error canceling unsafe queries: %s",
PQerrorMessage(conn));
ret = false;
}
else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
{
elog(WARNING,
"Canceled %d unsafe queries. Terminating any remaining PIDs.",
PQntuples(res));
PQclear(res);
printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(WARNING, "Error killing unsafe queries: %s",
PQerrorMessage(conn));
ret = false;
}
}
else if (PQntuples(res) > 0)
elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
else
elog(DEBUG2, "No competing DDL to cancel.");
PQclear(res);
termStringInfo(&sql);
return ret;
}
/*
* Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long
* waits by killing off other sessions which may be stuck trying to obtain
* an ACCESS EXCLUSIVE lock.
*
* Arguments:
*
* conn: connection to use
* relid: OID of relation
* target_name: name of table
*/
static bool
lock_access_share(PGconn *conn, Oid relid, const char *target_name)
{
StringInfoData sql;
time_t start = time(NULL);
int i;
bool ret = true;
initStringInfo(&sql);
for (i = 1; ; i++)
{
time_t duration;
PGresult *res;
int wait_msec;
duration = time(NULL) - start;
/* Cancel queries unconditionally, i.e. don't bother waiting
* wait_timeout as lock_exclusive() does -- the only queries we
* should be killing are disallowed DDL commands hanging around
* for an AccessExclusive lock, which must be deadlocked at
* this point anyway since conn2 holds its AccessShare lock
* already.
*/
if (duration > (wait_timeout * 2))
ret = kill_ddl(conn, relid, true);
else
ret = kill_ddl(conn, relid, false);
if (!ret)
break;
/* wait for a while to lock the table. */
wait_msec = Min(1000, i * 100);
printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec);
pgut_command(conn, sql.data, 0, NULL);
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name);
res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
break;
}
else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
{
/* retry if lock conflicted */
PQclear(res);
pgut_rollback(conn);
continue;
}
else
{
/* exit otherwise */
elog(WARNING, "%s", PQerrorMessage(connection));
PQclear(res);
ret = false;
break;
}
}
termStringInfo(&sql);
pgut_command(conn, "RESET statement_timeout", 0, NULL);
return ret;
}
/*
* Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long
* waits by killing off other sessions.
* Arguments:
*
* conn: connection to use
* relid: OID of relation
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
* start_xact: whether we need to issue a BEGIN;
*/
static bool
lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
{
time_t start = time(NULL);
int i;
bool ret = true;
for (i = 1; ; i++)
{
time_t duration;
@ -752,13 +1067,14 @@ lock_exclusive(const char *relid, const char *lock_query)
PGresult *res;
int wait_msec;
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
if (start_xact)
pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
duration = time(NULL) - start;
if (duration > wait_timeout)
{
const char *cancel_query;
if (PQserverVersion(connection) >= 80400 &&
if (PQserverVersion(conn) >= 80400 &&
duration > wait_timeout * 2)
{
elog(WARNING, "terminating conflicted backends");
@ -776,15 +1092,15 @@ lock_exclusive(const char *relid, const char *lock_query)
" AND relation = $1 AND pid <> pg_backend_pid()";
}
command(cancel_query, 1, &relid);
pgut_command(conn, cancel_query, 1, &relid);
}
/* wait for a while to lock the table. */
wait_msec = Min(1000, i * 100);
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
command(sql, 0, NULL);
pgut_command(conn, sql, 0, NULL);
res = execute_elevel(lock_query, 0, NULL, DEBUG2);
res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
@ -794,7 +1110,7 @@ lock_exclusive(const char *relid, const char *lock_query)
{
/* retry if lock conflicted */
PQclear(res);
command("ROLLBACK", 0, NULL);
pgut_rollback(conn);
continue;
}
else
@ -802,11 +1118,13 @@ lock_exclusive(const char *relid, const char *lock_query)
/* exit otherwise */
printf("%s", PQerrorMessage(connection));
PQclear(res);
exit(1);
ret = false;
break;
}
}
command("RESET statement_timeout", 0, NULL);
pgut_command(conn, "RESET statement_timeout", 0, NULL);
return ret;
}
/*
@ -814,10 +1132,8 @@ lock_exclusive(const char *relid, const char *lock_query)
* objects before the program exits.
*/
static void
repack_cleanup(bool fatal, void *userdata)
repack_cleanup(bool fatal, const repack_table *table)
{
const repack_table *table = (const repack_table *) userdata;
if (fatal)
{
fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n");
@ -827,12 +1143,13 @@ repack_cleanup(bool fatal, void *userdata)
char buffer[12];
const char *params[1];
/* Rollback current transaction */
if (connection)
command("ROLLBACK", 0, NULL);
/* Rollback current transactions */
pgut_rollback(connection);
pgut_rollback(conn2);
/* Try reconnection if not available. */
if (PQstatus(connection) != CONNECTION_OK)
if (PQstatus(connection) != CONNECTION_OK ||
PQstatus(conn2) != CONNECTION_OK)
reconnect(ERROR);
/* do cleanup */

View File

@ -24,6 +24,7 @@ char *password = NULL;
YesNo prompt_password = DEFAULT;
PGconn *connection = NULL;
PGconn *conn2 = NULL;
static bool parse_pair(const char buffer[], char key[], char value[]);
static char *get_username(void);
@ -51,6 +52,7 @@ reconnect(int elevel)
appendStringInfo(&buf, "password=%s ", password);
connection = pgut_connect(buf.data, prompt_password, elevel);
conn2 = pgut_connect(buf.data, prompt_password, elevel);
/* update password */
if (connection)
@ -75,6 +77,11 @@ disconnect(void)
pgut_disconnect(connection);
connection = NULL;
}
if (conn2)
{
pgut_disconnect(conn2);
conn2 = NULL;
}
}
static void
@ -142,9 +149,12 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src)
/* high prior value has been set already. */
return;
}
else if (src >= SOURCE_CMDLINE && opt->source >= src)
else if (src >= SOURCE_CMDLINE && opt->source >= src && opt->type != 'l')
{
/* duplicated option in command line */
/* duplicated option in command line -- don't worry if the option
* type is 'l' i.e. SimpleStringList, since we are allowed to have
* multiples of these.
*/
message = "specified only once";
}
else
@ -175,6 +185,10 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src)
return;
message = "a 32bit signed integer";
break;
case 'l':
message = "a List";
simple_string_list_append(opt->var, optarg);
return;
case 'u':
if (parse_uint32(optarg, opt->var))
return;

View File

@ -25,13 +25,14 @@ typedef enum pgut_optsrc
* type:
* b: bool (true)
* B: bool (false)
* f: pgut_optfn
* f: pgut_optfn
* i: 32bit signed integer
* l: StringList
* u: 32bit unsigned integer
* I: 64bit signed integer
* U: 64bit unsigned integer
* s: string
* t: time_t
* t: time_t
* y: YesNo (YES)
* Y: YesNo (NO)
*/
@ -56,6 +57,7 @@ extern char *password;
extern YesNo prompt_password;
extern PGconn *connection;
extern PGconn *conn2;
extern void pgut_help(bool details);
extern void help(bool details);

View File

@ -384,6 +384,55 @@ parse_time(const char *value, time_t *time)
return true;
}
/* Append the given string `val` to the `list` */
void
simple_string_list_append(SimpleStringList *list, const char *val)
{
SimpleStringListCell *cell;
/* this calculation correctly accounts for the null trailing byte */
cell = (SimpleStringListCell *)
pgut_malloc(sizeof(SimpleStringListCell) + strlen(val));
cell->next = NULL;
strcpy(cell->val, val);
if (list->tail)
list->tail->next = cell;
else
list->head = cell;
list->tail = cell;
}
/* Test whether `val` is in the given `list` */
bool
simple_string_list_member(SimpleStringList *list, const char *val)
{
SimpleStringListCell *cell;
for (cell = list->head; cell; cell = cell->next)
{
if (strcmp(cell->val, val) == 0)
return true;
}
return false;
}
/* Returns the number of elements in the given SimpleStringList */
size_t
simple_string_list_size(SimpleStringList list)
{
size_t i = 0;
SimpleStringListCell *cell = list.head;
while (cell)
{
cell = cell->next;
i++;
}
return i;
}
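/* Editor's illustration (not part of the commit) of how these helpers are
 * meant to be used together; the table names are placeholders. This is the
 * pattern pg_repack.c follows for the repeated --table options.
 */
static void
string_list_demo(void)
{
    SimpleStringList      tables = {NULL, NULL};
    SimpleStringListCell *cell;

    simple_string_list_append(&tables, "public.foo");
    simple_string_list_append(&tables, "public.bar");

    if (simple_string_list_member(&tables, "public.foo"))
        elog(DEBUG2, "public.foo is queued for repacking");

    elog(DEBUG2, "%lu table(s) requested",
         (unsigned long) simple_string_list_size(tables));

    for (cell = tables.head; cell; cell = cell->next)
        elog(DEBUG2, "will repack %s", cell->val);
}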
static char *
prompt_for_password(void)
{

View File

@ -171,6 +171,24 @@ extern bool parse_time(const char *value, time_t *time);
#define ToLower(c) (tolower((unsigned char)(c)))
#define ToUpper(c) (toupper((unsigned char)(c)))
/* linked list of string values and helper functions, stolen from pg_dump. */
typedef struct SimpleStringListCell
{
struct SimpleStringListCell *next;
char val[1]; /* VARIABLE LENGTH FIELD */
} SimpleStringListCell;
typedef struct SimpleStringList
{
SimpleStringListCell *head;
SimpleStringListCell *tail;
} SimpleStringList;
extern void simple_string_list_append(SimpleStringList *list, const char *val);
extern bool simple_string_list_member(SimpleStringList *list, const char *val);
extern size_t simple_string_list_size(SimpleStringList list);
/*
* socket operations
*/