Further improvements to concurrent-DDL guard.

Fix table locking so that no race condition exists between lock
release on the primary connection and lock acquisition on conn2. Also,
make conn2 responsible for performing the table swap step, to avoid a
similar race.

Part of work for Issue #8.
Josh Kupershmidt, 2012-10-20 21:47:21 -07:00 (committed by Daniele Varrazzo)
parent 3606e0a957
commit cf25780575
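The heart of the change is a lock handoff between the two connections: while the main connection still holds its ACCESS EXCLUSIVE lock, conn2's ACCESS SHARE request is queued asynchronously, any DDL already waiting on the table is canceled, and only then does the main connection commit, leaving conn2 first in line for the lock. A minimal libpq sketch of that pattern (illustrative only; handoff_lock, conn1, lock_sql, and cancel_sql are hypothetical names, not code from this commit):

#include <stdio.h>
#include <libpq-fe.h>

/* Hand the table lock from conn1 (holding ACCESS EXCLUSIVE) to conn2
 * without any window in which a third session could take the lock.
 */
static int
handoff_lock(PGconn *conn1, PGconn *conn2, const char *lock_sql,
             const char *cancel_sql)
{
    PGresult   *res;

    /* Queue conn2's ACCESS SHARE request while conn1 still holds its
     * lock, so conn2 waits in line ahead of any later request.
     */
    if (!PQsendQuery(conn2, lock_sql))
        return -1;

    /* Cancel DDL already waiting on the table; it queued before conn2
     * and would otherwise be granted its lock first.
     */
    res = PQexec(conn1, cancel_sql);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
        return -1;
    }
    PQclear(res);

    /* Release conn1's lock; conn2's queued request is granted next. */
    res = PQexec(conn1, "COMMIT");
    PQclear(res);

    /* Drain async results; NULL means the LOCK TABLE has completed. */
    while ((res = PQgetResult(conn2)) != NULL)
    {
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            fprintf(stderr, "LOCK TABLE failed: %s", PQerrorMessage(conn2));
            PQclear(res);
            return -1;
        }
        PQclear(res);
    }
    return 0;
}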


@@ -53,6 +53,24 @@ const char *PROGRAM_VERSION = "unknown";
     "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
     " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
+
+/* To be run while our main connection holds an AccessExclusive lock on the
+ * target table, and our secondary conn is attempting to grab an AccessShare
+ * lock. We know that "granted" must be false for these queries because
+ * we already hold the AccessExclusive lock. Also, we only care about other
+ * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock
+ * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE
+ * or TRUNCATE.
+ */
+#define CANCEL_COMPETING_LOCKS \
+    "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
+    " AND granted = false AND relation = %u"\
+    " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
+
+#define KILL_COMPETING_LOCKS \
+    "SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
+    " AND granted = false AND relation = %u"\
+    " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
+
 /*
  * per-table information
  */
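The two macros differ only in what they do to each blocked PID: pg_cancel_backend() cancels the backend's current query, while pg_terminate_backend(), available from PostgreSQL 8.4 onward, kills the backend outright. A sketch of the cancel-then-terminate escalation they support later in this diff (cancel_competing_ddl is a hypothetical helper, not code from this commit; assumes <stdio.h> and <libpq-fe.h>):

static void
cancel_competing_ddl(PGconn *conn, Oid table_oid)
{
    char        sql[1024];
    PGresult   *res;

    /* First ask politely: cancel the queries waiting on the table. */
    snprintf(sql, sizeof(sql), CANCEL_COMPETING_LOCKS, table_oid);
    res = PQexec(conn, sql);

    /* If any were waiting and the server is 8.4+, terminate whatever
     * survived cancellation; pg_terminate_backend() does not exist on
     * older servers.
     */
    if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) > 0 &&
        PQserverVersion(conn) >= 80400)
    {
        PQclear(res);
        snprintf(sql, sizeof(sql), KILL_COMPETING_LOCKS, table_oid);
        res = PQexec(conn, sql);
    }
    PQclear(res);
}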
@@ -95,7 +113,7 @@ static void repack_cleanup(bool fatal, void *userdata);
 static char *getstr(PGresult *res, int row, int col);
 static Oid getoid(PGresult *res, int row, int col);
-static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);
+static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
 
 #define SQLSTATE_INVALID_SCHEMA_NAME    "3F000"
 #define SQLSTATE_QUERY_CANCELED         "57014"
@@ -356,7 +374,7 @@ cleanup:
 }
 
 static int
-apply_log(const repack_table *table, int count)
+apply_log(PGconn *conn, const repack_table *table, int count)
 {
     int         result;
     PGresult   *res;
@@ -370,8 +388,9 @@ apply_log(const repack_table *table, int count)
     params[4] = table->sql_pop;
     params[5] = utoa(count, buffer);
 
-    res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
-        6, params);
+    res = pgut_execute(conn,
+        "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
+        6, params);
     result = atoi(PQgetvalue(res, 0, 0));
     PQclear(res);
@@ -421,8 +440,7 @@ repack_one_table(const repack_table *table, const char *orderby)
      * 1. Setup workspaces and a trigger.
      */
     elog(DEBUG2, "---- setup ----");
-    lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);
+    lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE);
 
     /*
      * Check z_repack_trigger is the trigger executed at last so that
@@ -444,27 +462,18 @@ repack_one_table(const repack_table *table, const char *orderby)
     command(table->enable_trigger, 0, NULL);
     printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
     command(sql.data, 0, NULL);
-    command("COMMIT", 0, NULL);
 
+    /* While we are still holding an AccessExclusive lock on the table, submit
+     * the request for an AccessShare lock asynchronously from conn2.
+     * We want to submit this query in conn2 while connection's
+     * transaction still holds its lock, so that no DDL may sneak in
+     * between the time that connection commits and conn2 gets its lock.
+     */
     pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
-    elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);
-    /* XXX: table name escaping? */
-    printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
-                     table->target_name);
-    res = pgut_execute(conn2, sql.data, 0, NULL);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
-    {
-        printf("%s", PQerrorMessage(conn2));
-        PQclear(res);
-        exit(1);
-    }
-    PQclear(res);
-    elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);
 
-    /* store the backend PID of our connection keeping an ACCESS SHARE
-       lock on the target table.
+    /* grab the backend PID of conn2; we'll need this when querying
+     * pg_locks momentarily.
      */
     res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -474,9 +483,80 @@ repack_one_table(const repack_table *table, const char *orderby)
         exit(1);
     }
     lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
+    elog(WARNING, "Have backend PID: %s", lock_conn_pid);
     PQclear(res);
+
+    printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
+                     table->target_name);
+    elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
+    if (!(PQsendQuery(conn2, sql.data))) {
+        printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2));
+        exit(1);
+    }
+
+    /* Now that we've submitted the LOCK TABLE request through conn2,
+     * look for and cancel any (potentially dangerous) DDL commands which
+     * might also be waiting on our table lock at this point --
+     * it's not safe to let them wait, because they may grab their
+     * AccessExclusive lock before conn2 gets its AccessShare lock,
+     * and perform unsafe DDL on the table.
+     *
+     * XXX: maybe we should use a loop canceling queries, as in
+     * lock_exclusive().
+     */
+    printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid);
+    res = execute(sql.data, 0, NULL);
+    if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+        printf("Error canceling competing queries: %s", PQerrorMessage(connection));
+        PQclear(res);
+        exit(1);
+    }
+    if (PQntuples(res) > 0)
+    {
+        elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res));
+
+        if (PQserverVersion(connection) >= 80400)
+        {
+            PQclear(res);
+            printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid);
+            res = execute(sql.data, 0, NULL);
+            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            {
+                printf("Error killing competing queries: %s", PQerrorMessage(connection));
+                PQclear(res);
+                exit(1);
+            }
+        }
+    }
+    else
+    {
+        elog(DEBUG2, "No competing DDL to cancel.");
+    }
+    PQclear(res);
+
+    /* We're finished killing off any unsafe DDL. COMMIT in our main
+     * connection, so that conn2 may get its AccessShare lock.
+     */
+    command("COMMIT", 0, NULL);
+
+    /* Keep looping PQgetResult() calls until it returns NULL, indicating the
+     * command is done and we have obtained our lock.
+     */
+    while ((res = PQgetResult(conn2)))
+    {
+        elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
+        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        {
+            printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2));
+            PQclear(res);
+            exit(1);
+        }
+        PQclear(res);
+    }
+
     /*
      * Register the table to be dropped on error. We use pktype as
      * an advisory lock. The registration should be done after
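A note on the wait loop in this hunk: PQgetResult() blocks until the LOCK TABLE command finishes, so the "Waiting on ACCESS SHARE lock..." message fires at most once per returned result. If non-blocking behavior were wanted (say, to enforce a timeout while waiting), the usual libpq pattern is to poll the connection's socket and feed PQconsumeInput() until PQisBusy() clears, roughly as follows (a sketch, not the committed code; wait_for_lock is a hypothetical name, and POSIX select() plus <libpq-fe.h> are assumed):

#include <sys/select.h>

static int
wait_for_lock(PGconn *conn2)
{
    /* While a result is still pending, sleep on the socket and absorb
     * whatever input has arrived.
     */
    while (PQisBusy(conn2))
    {
        int     sock = PQsocket(conn2);
        fd_set  mask;

        FD_ZERO(&mask);
        FD_SET(sock, &mask);
        if (select(sock + 1, &mask, NULL, NULL, NULL) < 0)
            return -1;          /* interrupted or socket error */
        if (!PQconsumeInput(conn2))
            return -1;          /* connection trouble */
    }
    /* PQgetResult() will now return without blocking. */
    return 0;
}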
@@ -558,7 +638,7 @@ repack_one_table(const repack_table *table, const char *orderby)
      */
     for (;;)
     {
-        num = apply_log(table, APPLY_COUNT);
+        num = apply_log(connection, table, APPLY_COUNT);
         if (num > 0)
             continue;   /* there might be still some tuples, repeat. */
@@ -595,14 +675,17 @@ repack_one_table(const repack_table *table, const char *orderby)
     }
 
     /*
-     * 5. Swap.
+     * 5. Swap: will be done with conn2, since it already holds an
+     *    AccessShare lock.
      */
     elog(DEBUG2, "---- swap ----");
-    lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
-    apply_log(table, 0);
+    /* Bump our existing AccessShare lock to AccessExclusive */
+    lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table,
+                   FALSE);
+    apply_log(conn2, table, 0);
     params[0] = utoa(table->target_oid, buffer);
-    command("SELECT repack.repack_swap($1)", 1, params);
-    command("COMMIT", 0, NULL);
+    pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
+    pgut_command(conn2, "COMMIT", 0, NULL);
 
     /*
      * 6. Drop.
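The swap leans on a PostgreSQL lock-manager property: a transaction that already holds a weaker lock on a table may request a stronger one, and the upgrade is granted without the weaker lock ever being released. Since conn2 has held ACCESS SHARE continuously since setup, no DDL can squeeze in before conn2 reaches ACCESS EXCLUSIVE. The commit reuses lock_exclusive() for this; a bare-bones sketch of the same upgrade (upgrade_to_exclusive is a hypothetical helper, not code from this commit):

static int
upgrade_to_exclusive(PGconn *conn2, const char *table_name)
{
    char        sql[1024];
    PGresult   *res;

    /* Same transaction, stronger mode: the ACCESS SHARE lock held since
     * setup stays in place while the exclusive lock is acquired.
     */
    snprintf(sql, sizeof(sql),
             "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", table_name);
    res = PQexec(conn2, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        PQclear(res);
        return -1;
    }
    PQclear(res);
    return 0;
}

Note that a lock upgrade can deadlock against another would-be exclusive locker; the statement_timeout and cancel loop inside lock_exclusive() bounds how long such a conflict can stall the swap.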
@@ -616,6 +699,7 @@ repack_one_table(const repack_table *table, const char *orderby)
     pgut_atexit_pop(&repack_cleanup, (void *) table);
     free(vxid);
+    free(lock_conn_pid);
 
     /*
      * 7. Analyze.
@@ -639,17 +723,17 @@ repack_one_table(const repack_table *table, const char *orderby)
  * Try to acquire a table lock, but avoid long lock waits when there is a conflict.
  * Arguments:
  *
+ *  conn: connection to use
  *  relid: OID of relation
  *  lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
- *  release_conn2: whether we should issue a COMMIT in conn2 to release
- *                 its lock.
+ *  start_xact: whether we need to issue a BEGIN;
  */
 static void
-lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
+lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
 {
     time_t      start = time(NULL);
     int         i;
 
     for (i = 1; ; i++)
     {
         time_t      duration;
@@ -657,13 +741,14 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
         PGresult   *res;
         int         wait_msec;
 
-        command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
+        if (start_xact)
+            pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
 
         duration = time(NULL) - start;
         if (duration > wait_timeout)
         {
             const char *cancel_query;
-            if (PQserverVersion(connection) >= 80400 &&
+            if (PQserverVersion(conn) >= 80400 &&
                 duration > wait_timeout * 2)
             {
                 elog(WARNING, "terminating conflicted backends");
@@ -681,21 +766,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
                 " AND relation = $1 AND pid <> pg_backend_pid()";
             }
 
-            command(cancel_query, 1, &relid);
+            pgut_command(conn, cancel_query, 1, &relid);
         }
 
-        /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
-         * lock.
-         */
-        if (release_conn2)
-            pgut_command(conn2, "COMMIT", 0, NULL);
-
         /* wait for a while to lock the table. */
         wait_msec = Min(1000, i * 100);
         snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
-        command(sql, 0, NULL);
+        pgut_command(conn, sql, 0, NULL);
 
-        res = execute_elevel(lock_query, 0, NULL, DEBUG2);
+        res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
         if (PQresultStatus(res) == PGRES_COMMAND_OK)
         {
             PQclear(res);
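Each pass through this retry loop bounds its lock attempt with a short statement_timeout (100 ms on the first try, growing to a 1 s cap), so a conflicting lock holder can never stall the loop for long between cancel rounds. One such timed attempt in isolation (try_lock_once is a hypothetical helper, not code from this commit; assumes an open transaction on conn, since SET LOCAL is transaction-scoped):

#include <stdio.h>
#include <libpq-fe.h>

/* Returns 1 if the lock was acquired, 0 if the attempt failed or was
 * canceled by the timeout.
 */
static int
try_lock_once(PGconn *conn, const char *lock_query, int wait_msec)
{
    char        sql[64];
    PGresult   *res;
    int         ok;

    /* The server cancels the LOCK statement after wait_msec. */
    snprintf(sql, sizeof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
    res = PQexec(conn, sql);
    PQclear(res);

    res = PQexec(conn, lock_query);
    ok = (PQresultStatus(res) == PGRES_COMMAND_OK);
    PQclear(res);
    return ok;
}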