diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 33ab3b7..8eb05f6 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -53,6 +53,24 @@ const char *PROGRAM_VERSION = "unknown"; "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\ " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)" +/* To be run while our main connection holds an AccessExclusive lock on the + * target table, and our secondary conn is attempting to grab an AccessShare + * lock. We know that "granted" must be false for these queries because + * we already hold the AccessExclusive lock. Also, we only care about other + * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock + * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE + * or TRUNCATE. + */ +#define CANCEL_COMPETING_LOCKS \ + "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + +#define KILL_COMPETING_LOCKS \ + "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + /* * per-table information */ @@ -95,7 +113,7 @@ static void repack_cleanup(bool fatal, void *userdata); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2); +static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -356,7 +374,7 @@ cleanup: } static int -apply_log(const repack_table *table, int count) +apply_log(PGconn *conn, const repack_table *table, int count) { int result; PGresult *res; @@ -370,8 +388,9 @@ apply_log(const repack_table *table, int count) params[4] = table->sql_pop; params[5] = utoa(count, buffer); - res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", - 6, params); + res = pgut_execute(conn, + "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", + 6, params); result = atoi(PQgetvalue(res, 0, 0)); PQclear(res); @@ -421,8 +440,7 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE); - + lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE); /* * Check z_repack_trigger is the trigger executed at last so that @@ -444,27 +462,18 @@ repack_one_table(const repack_table *table, const char *orderby) command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); - command("COMMIT", 0, NULL); + /* While we are still holding an AccessExclusive lock on the table, submit + * the request for an AccessShare lock asynchronously from conn2. + * We want to submit this query in conn2 while connection's + * transaction still holds its lock, so that no DDL may sneak in + * between the time that connection commits and conn2 gets its lock. + * + */ pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); - elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name); - /* XXX: table name escaping? */ - printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", - table->target_name); - res = pgut_execute(conn2, sql.data, 0, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - printf("%s", PQerrorMessage(conn2)); - PQclear(res); - exit(1); - } - PQclear(res); - - elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name); - - /* store the backend PID of our connection keeping an ACCESS SHARE - lock on the target table. + /* grab the backend PID of conn2; we'll need this when querying + * pg_locks momentarily. */ res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -474,9 +483,80 @@ repack_one_table(const repack_table *table, const char *orderby) exit(1); } lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); - elog(WARNING, "Have backend PID: %s", lock_conn_pid); PQclear(res); + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", + table->target_name); + elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); + if (!(PQsendQuery(conn2, sql.data))) { + printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); + exit(1); + } + + /* Now that we've submitted the LOCK TABLE request through conn2, + * look for and cancel any (potentially dangerous) DDL commands which + * might also be waiting on our table lock at this point -- + * it's not safe to let them wait, because they may grab their + * AccessExclusive lock before conn2 gets its AccessShare lock, + * and perform unsafe DDL on the table. + * + * XXX: maybe we should use a loop canceling queries, as in + * lock_exclusive(). + */ + printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid); + res = execute(sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error canceling competing queries: %s", PQerrorMessage(connection)); + PQclear(res); + exit(1); + } + if (PQntuples(res) > 0) + { + elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); + + if (PQserverVersion(connection) >= 80400) + { + PQclear(res); + printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid); + res = execute(sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error killing competing queries: %s", PQerrorMessage(connection)); + PQclear(res); + exit(1); + } + } + + } + else + { + elog(DEBUG2, "No competing DDL to cancel."); + } + PQclear(res); + + + /* We're finished killing off any unsafe DDL. COMMIT in our main + * connection, so that conn2 may get its AccessShare lock. + */ + command("COMMIT", 0, NULL); + + /* Keep looping PQgetResult() calls until it returns NULL, indicating the + * command is done and we have obtained our lock. + */ + while ((res = PQgetResult(conn2))) + { + elog(DEBUG2, "Waiting on ACCESS SHARE lock..."); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + PQclear(res); + } + + /* * Register the table to be dropped on error. We use pktype as * an advisory lock. The registration should be done after @@ -558,7 +638,7 @@ repack_one_table(const repack_table *table, const char *orderby) */ for (;;) { - num = apply_log(table, APPLY_COUNT); + num = apply_log(connection, table, APPLY_COUNT); if (num > 0) continue; /* there might be still some tuples, repeat. */ @@ -595,14 +675,17 @@ repack_one_table(const repack_table *table, const char *orderby) } /* - * 5. Swap. + * 5. Swap: will be done with conn2, since it already holds an + * AccessShare lock. */ elog(DEBUG2, "---- swap ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE); - apply_log(table, 0); + /* Bump our existing AccessShare lock to AccessExclusive */ + lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table, + FALSE); + apply_log(conn2, table, 0); params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_swap($1)", 1, params); - command("COMMIT", 0, NULL); + pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); + pgut_command(conn2, "COMMIT", 0, NULL); /* * 6. Drop. @@ -616,6 +699,7 @@ repack_one_table(const repack_table *table, const char *orderby) pgut_atexit_pop(&repack_cleanup, (void *) table); free(vxid); + free(lock_conn_pid); /* * 7. Analyze. @@ -639,17 +723,17 @@ repack_one_table(const repack_table *table, const char *orderby) * Try acquire a table lock but avoid long time locks when conflict. * Arguments: * + * conn: connection to use * relid: OID of relation * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed - * release_conn2: whether we should issue a COMMIT in conn2 to release - * its lock. + * start_xact: whether we need to issue a BEGIN; */ static void -lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) +lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) { time_t start = time(NULL); int i; - + for (i = 1; ; i++) { time_t duration; @@ -657,13 +741,14 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) PGresult *res; int wait_msec; - command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); + if (start_xact) + pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); duration = time(NULL) - start; if (duration > wait_timeout) { const char *cancel_query; - if (PQserverVersion(connection) >= 80400 && + if (PQserverVersion(conn) >= 80400 && duration > wait_timeout * 2) { elog(WARNING, "terminating conflicted backends"); @@ -681,21 +766,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) " AND relation = $1 AND pid <> pg_backend_pid()"; } - command(cancel_query, 1, &relid); + pgut_command(conn, cancel_query, 1, &relid); } - /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE - * lock. - */ - if (release_conn2) - pgut_command(conn2, "COMMIT", 0, NULL); - /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); command(sql, 0, NULL); - res = execute_elevel(lock_query, 0, NULL, DEBUG2); + res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res);