Several fixes for concurrent-DDL guard.
* KILL_COMPETING_LOCKS was using pg_cancel_backend() instead of pg_terminate_backend() * create kill_ddl() function for canceling+terminating any pending unsafe concurrent DDL, i.e. anyone hanging out waiting for an ACCESS EXCLUSIVE lock on our table. * create lock_access_share() function for reliably obtaining an ACCESS SHARE lock on the target table, killing off any queued ACCESS EXCLUSIVE lockers in the process via kill_ddl() * Avoid deadlock possible before we run: CREATE TABLE reorg.table_xxx AS SELECT ... FROM ONLY ... by using lock_access_share() * Fix a few calls in lock_exclusive() which were forgetting to specify the passed-in connection. These fixes are related to Issue #8. The main thing remaining AFAIK is to review or fix some of the unlikely-error handling bits; most of these should be marked with XXX now.
This commit is contained in:
parent
cf25780575
commit
ad00eb181d
201
bin/pg_repack.c
201
bin/pg_repack.c
@ -57,9 +57,8 @@ const char *PROGRAM_VERSION = "unknown";
|
|||||||
* target table, and our secondary conn is attempting to grab an AccessShare
|
* target table, and our secondary conn is attempting to grab an AccessShare
|
||||||
* lock. We know that "granted" must be false for these queries because
|
* lock. We know that "granted" must be false for these queries because
|
||||||
* we already hold the AccessExclusive lock. Also, we only care about other
|
* we already hold the AccessExclusive lock. Also, we only care about other
|
||||||
* transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock
|
* transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only
|
||||||
* level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE
|
* trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE.
|
||||||
* or TRUNCATE.
|
|
||||||
*/
|
*/
|
||||||
#define CANCEL_COMPETING_LOCKS \
|
#define CANCEL_COMPETING_LOCKS \
|
||||||
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
|
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
|
||||||
@ -67,7 +66,8 @@ const char *PROGRAM_VERSION = "unknown";
|
|||||||
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
|
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
|
||||||
|
|
||||||
#define KILL_COMPETING_LOCKS \
|
#define KILL_COMPETING_LOCKS \
|
||||||
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
|
"SELECT pg_terminate_backend(pid) "\
|
||||||
|
"FROM pg_locks WHERE locktype = 'relation'"\
|
||||||
" AND granted = false AND relation = %u"\
|
" AND granted = false AND relation = %u"\
|
||||||
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
|
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
|
||||||
|
|
||||||
@ -114,6 +114,8 @@ static void repack_cleanup(bool fatal, void *userdata);
|
|||||||
static char *getstr(PGresult *res, int row, int col);
|
static char *getstr(PGresult *res, int row, int col);
|
||||||
static Oid getoid(PGresult *res, int row, int col);
|
static Oid getoid(PGresult *res, int row, int col);
|
||||||
static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
|
static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
|
||||||
|
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
|
||||||
|
static void lock_access_share(PGconn *conn, Oid relid, const char *target_name);
|
||||||
|
|
||||||
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
|
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
|
||||||
#define SQLSTATE_QUERY_CANCELED "57014"
|
#define SQLSTATE_QUERY_CANCELED "57014"
|
||||||
@ -485,11 +487,17 @@ repack_one_table(const repack_table *table, const char *orderby)
|
|||||||
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
|
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Not using lock_access_share() here since we know that
|
||||||
|
* it's not possible to obtain the ACCESS SHARE lock right now
|
||||||
|
* in conn2, since the primary connection holds ACCESS EXCLUSIVE.
|
||||||
|
*/
|
||||||
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
|
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
|
||||||
table->target_name);
|
table->target_name);
|
||||||
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
|
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
|
||||||
if (!(PQsendQuery(conn2, sql.data))) {
|
if (!(PQsendQuery(conn2, sql.data))) {
|
||||||
printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2));
|
printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2));
|
||||||
|
/* XXX: better error handling */
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,40 +508,13 @@ repack_one_table(const repack_table *table, const char *orderby)
|
|||||||
* AccessExclusive lock before conn2 gets its AccessShare lock,
|
* AccessExclusive lock before conn2 gets its AccessShare lock,
|
||||||
* and perform unsafe DDL on the table.
|
* and perform unsafe DDL on the table.
|
||||||
*
|
*
|
||||||
* XXX: maybe we should use a loop canceling queries, as in
|
* Normally, lock_access_share() would take care of this for us,
|
||||||
* lock_exclusive().
|
* but we're not able to use it here.
|
||||||
*/
|
*/
|
||||||
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid);
|
if (!(kill_ddl(connection, table->target_oid, true)))
|
||||||
res = execute(sql.data, 0, NULL);
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
{
|
||||||
printf("Error canceling competing queries: %s", PQerrorMessage(connection));
|
|
||||||
PQclear(res);
|
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if (PQntuples(res) > 0)
|
|
||||||
{
|
|
||||||
elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res));
|
|
||||||
|
|
||||||
if (PQserverVersion(connection) >= 80400)
|
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid);
|
|
||||||
res = execute(sql.data, 0, NULL);
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
printf("Error killing competing queries: %s", PQerrorMessage(connection));
|
|
||||||
PQclear(res);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG2, "No competing DDL to cancel.");
|
|
||||||
}
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
|
|
||||||
/* We're finished killing off any unsafe DDL. COMMIT in our main
|
/* We're finished killing off any unsafe DDL. COMMIT in our main
|
||||||
@ -583,6 +564,21 @@ repack_one_table(const repack_table *table, const char *orderby)
|
|||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
command(table->delete_log, 0, NULL);
|
command(table->delete_log, 0, NULL);
|
||||||
|
|
||||||
|
/* We need to be able to obtain an AccessShare lock on the target table
|
||||||
|
* for the create_table command to go through, so go ahead and obtain
|
||||||
|
* the lock explicitly.
|
||||||
|
*
|
||||||
|
* Since conn2 has been diligently holding its AccessShare lock, it
|
||||||
|
* is possible that another transaction has been waiting to acquire
|
||||||
|
* an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE
|
||||||
|
* or TRUNCATE which we must not allow). If there are any such
|
||||||
|
* transactions, lock_access_share() will kill them so that our
|
||||||
|
* CREATE TABLE ... AS SELECT does not deadlock waiting for an
|
||||||
|
* AccessShare lock.
|
||||||
|
*/
|
||||||
|
lock_access_share(connection, table->target_oid, table->target_name);
|
||||||
|
|
||||||
command(table->create_table, 0, NULL);
|
command(table->create_table, 0, NULL);
|
||||||
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
|
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
|
||||||
if (table->drop_columns)
|
if (table->drop_columns)
|
||||||
@ -719,8 +715,134 @@ repack_one_table(const repack_table *table, const char *orderby)
|
|||||||
termStringInfo(&sql);
|
termStringInfo(&sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Kill off any concurrent DDL (or any transaction attempting to take
|
||||||
|
* an AccessExclusive lock) trying to run against our table. Note, we're
|
||||||
|
* killing these queries off *before* they are granted an AccessExclusive
|
||||||
|
* lock on our table.
|
||||||
|
*
|
||||||
|
* Returns true if no problems encountered, false otherwise.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
kill_ddl(PGconn *conn, Oid relid, bool terminate)
|
||||||
|
{
|
||||||
|
bool ret = true;
|
||||||
|
PGresult *res;
|
||||||
|
StringInfoData sql;
|
||||||
|
|
||||||
|
initStringInfo(&sql);
|
||||||
|
|
||||||
|
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
|
||||||
|
res = pgut_execute(conn, sql.data, 0, NULL);
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
printf("Error canceling unsafe queries: %s", PQerrorMessage(conn));
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
|
||||||
|
{
|
||||||
|
elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res));
|
||||||
|
|
||||||
|
PQclear(res);
|
||||||
|
printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
|
||||||
|
res = pgut_execute(conn, sql.data, 0, NULL);
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
printf("Error killing unsafe queries: %s", PQerrorMessage(conn));
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (PQntuples(res) > 0)
|
||||||
|
elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
|
||||||
|
else
|
||||||
|
elog(DEBUG2, "No competing DDL to cancel.");
|
||||||
|
|
||||||
|
|
||||||
|
PQclear(res);
|
||||||
|
termStringInfo(&sql);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try acquire a table lock but avoid long time locks when conflict.
|
* Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long
|
||||||
|
* waits by killing off other sessions which may be stuck trying to obtain
|
||||||
|
* an ACCESS EXCLUSIVE lock.
|
||||||
|
*
|
||||||
|
* Arguments:
|
||||||
|
*
|
||||||
|
* conn: connection to use
|
||||||
|
* relid: OID of relation
|
||||||
|
* target_name: name of table
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
lock_access_share(PGconn *conn, Oid relid, const char *target_name)
|
||||||
|
{
|
||||||
|
StringInfoData sql;
|
||||||
|
time_t start = time(NULL);
|
||||||
|
int i;
|
||||||
|
|
||||||
|
initStringInfo(&sql);
|
||||||
|
|
||||||
|
for (i = 1; ; i++)
|
||||||
|
{
|
||||||
|
time_t duration;
|
||||||
|
PGresult *res;
|
||||||
|
int wait_msec;
|
||||||
|
|
||||||
|
duration = time(NULL) - start;
|
||||||
|
|
||||||
|
/* Cancel queries unconditionally, i.e. don't bother waiting
|
||||||
|
* wait_timeout as lock_exclusive() does -- the only queries we
|
||||||
|
* should be killing are disallowed DDL commands hanging around
|
||||||
|
* for an AccessExclusive lock, which must be deadlocked at
|
||||||
|
* this point anyway since conn2 holds its AccessShare lock
|
||||||
|
* already.
|
||||||
|
*/
|
||||||
|
if (duration > (wait_timeout * 2))
|
||||||
|
kill_ddl(conn, relid, true);
|
||||||
|
else
|
||||||
|
kill_ddl(conn, relid, false);
|
||||||
|
|
||||||
|
/* wait for a while to lock the table. */
|
||||||
|
wait_msec = Min(1000, i * 100);
|
||||||
|
printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec);
|
||||||
|
pgut_command(conn, sql.data, 0, NULL);
|
||||||
|
|
||||||
|
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name);
|
||||||
|
res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2);
|
||||||
|
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
PQclear(res);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
|
||||||
|
{
|
||||||
|
/* XXX: does this ROLLBACK need any rethinking wrt. two connections
|
||||||
|
* now?
|
||||||
|
*/
|
||||||
|
/* retry if lock conflicted */
|
||||||
|
PQclear(res);
|
||||||
|
pgut_command(conn, "ROLLBACK", 0, NULL);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* exit otherwise */
|
||||||
|
printf("%s", PQerrorMessage(connection));
|
||||||
|
PQclear(res);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
termStringInfo(&sql);
|
||||||
|
pgut_command(conn, "RESET statement_timeout", 0, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long
|
||||||
|
* waits by killing off other sessions.
|
||||||
* Arguments:
|
* Arguments:
|
||||||
*
|
*
|
||||||
* conn: connection to use
|
* conn: connection to use
|
||||||
@ -772,7 +894,7 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
|
|||||||
/* wait for a while to lock the table. */
|
/* wait for a while to lock the table. */
|
||||||
wait_msec = Min(1000, i * 100);
|
wait_msec = Min(1000, i * 100);
|
||||||
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
|
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
|
||||||
command(sql, 0, NULL);
|
pgut_command(conn, sql, 0, NULL);
|
||||||
|
|
||||||
res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
|
res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
|
||||||
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||||
@ -782,9 +904,12 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
|
|||||||
}
|
}
|
||||||
else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
|
else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
|
||||||
{
|
{
|
||||||
|
/* XXX: does this ROLLBACK need any rethinking wrt. two connections
|
||||||
|
* now?
|
||||||
|
*/
|
||||||
/* retry if lock conflicted */
|
/* retry if lock conflicted */
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
command("ROLLBACK", 0, NULL);
|
pgut_command(conn, "ROLLBACK", 0, NULL);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -796,7 +921,7 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
command("RESET statement_timeout", 0, NULL);
|
pgut_command(conn, "RESET statement_timeout", 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Loading…
x
Reference in New Issue
Block a user