From 78bae3871816b05e3031cfb7d99a24dd036fed19 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sat, 20 Oct 2012 16:27:54 -0700 Subject: [PATCH 01/12] Take an ACCESS SHARE LOCK on the target table, in an initial attempt to prevent concurrent DDL. This is a first pass at Daniele's suggestion in Issue #8, although it is definitely still buggy -- it is still possible for another transaction to get in an AccessExclusive lock and perform DDL either before the ACCESS SHARE lock is acquired or immediately after it is released. --- bin/pg_repack.c | 73 +++++++++++++++++++++++++++++++++++++++++----- bin/pgut/pgut-fe.c | 2 ++ bin/pgut/pgut-fe.h | 1 + 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 5960ab9..58ae024 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -35,14 +35,20 @@ const char *PROGRAM_VERSION = "unknown"; */ #define APPLY_COUNT 1000 -/* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted +/* Record the PIDs of any possibly-conflicting transactions. Ignore the PID + * of our primary connection, and our second connection holding an + * ACCESS SHARE table lock. + * The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted * servers. See GH ticket #1. */ #define SQL_XID_SNAPSHOT \ "SELECT repack.array_accum(virtualtransaction) FROM pg_locks"\ - " WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"\ + " WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)"\ " AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0')" +/* Later, check whether any of the transactions we saw before are still + * alive, and wait for them to go away. 
+ */ #define SQL_XID_ALIVE \ "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\ " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)" @@ -89,7 +95,7 @@ static void repack_cleanup(bool fatal, void *userdata); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(const char *relid, const char *lock_query); +static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -384,6 +390,7 @@ repack_one_table(const repack_table *table, const char *orderby) int i; int num_waiting = 0; char *vxid; + char *lock_conn_pid; char buffer[12]; StringInfoData sql; @@ -414,7 +421,8 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); + lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE); + /* * Check z_repack_trigger is the trigger executed at last so that @@ -438,6 +446,37 @@ repack_one_table(const repack_table *table, const char *orderby) command(sql.data, 0, NULL); command("COMMIT", 0, NULL); + PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED")); + elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name); + + /* XXX: table name escaping? */ + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", + table->target_name); + res = PQexec(conn2, sql.data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + printf("%s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + PQclear(res); + + elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name); + + /* store the backend PID of our connection keeping an ACCESS SHARE + lock on the target table. 
+ */ + res = PQexec(conn2, "SELECT pg_backend_pid()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("%s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); + elog(WARNING, "Have backend PID: %s", lock_conn_pid); + PQclear(res); + /* * Register the table to be dropped on error. We use pktype as * an advisory lock. The registration should be done after @@ -455,9 +494,14 @@ repack_one_table(const repack_table *table, const char *orderby) command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL); if (orderby && !orderby[0]) command("SET LOCAL synchronize_seqscans = off", 0, NULL); - res = execute(SQL_XID_SNAPSHOT, 0, NULL); + + /* Fetch an array of Virtual IDs of all transactions active right now. + */ + params[0] = lock_conn_pid; + res = execute(SQL_XID_SNAPSHOT, 1, params); vxid = strdup(PQgetvalue(res, 0, 0)); PQclear(res); + command(table->delete_log, 0, NULL); command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); @@ -554,7 +598,7 @@ repack_one_table(const repack_table *table, const char *orderby) * 5. Swap. */ elog(DEBUG2, "---- swap ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); + lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE); apply_log(table, 0); params[0] = utoa(table->target_oid, buffer); command("SELECT repack.repack_swap($1)", 1, params); @@ -593,9 +637,15 @@ repack_one_table(const repack_table *table, const char *orderby) /* * Try acquire a table lock but avoid long time locks when conflict. + * Arguments: + * + * relid: OID of relation + * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed + * release_conn2: whether we should issue a COMMIT in conn2 to release + * its lock. 
*/ static void -lock_exclusive(const char *relid, const char *lock_query) +lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) { time_t start = time(NULL); int i; @@ -634,6 +684,12 @@ lock_exclusive(const char *relid, const char *lock_query) command(cancel_query, 1, &relid); } + /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE + * lock. + */ + if (release_conn2) + PQclear(PQexec(conn2, "COMMIT")); + /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); @@ -683,6 +739,9 @@ repack_cleanup(bool fatal, void *userdata) const char *params[1]; /* Rollback current transaction */ + if (conn2) + PQclear(PQexec(conn2, "ROLLBACK")); + if (connection) command("ROLLBACK", 0, NULL); diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index c6af0f2..b622974 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -24,6 +24,7 @@ char *password = NULL; YesNo prompt_password = DEFAULT; PGconn *connection = NULL; +PGconn *conn2 = NULL; static bool parse_pair(const char buffer[], char key[], char value[]); static char *get_username(void); @@ -51,6 +52,7 @@ reconnect(int elevel) appendStringInfo(&buf, "password=%s ", password); connection = pgut_connect(buf.data, prompt_password, elevel); + conn2 = pgut_connect(buf.data, prompt_password, elevel); /* update password */ if (connection) diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h index 4e3ab31..49e6cfb 100644 --- a/bin/pgut/pgut-fe.h +++ b/bin/pgut/pgut-fe.h @@ -56,6 +56,7 @@ extern char *password; extern YesNo prompt_password; extern PGconn *connection; +extern PGconn *conn2; extern void pgut_help(bool details); extern void help(bool details); From 3606e0a957cf266a55ddfc20a5f60501de06325d Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sat, 20 Oct 2012 17:15:17 -0700 Subject: [PATCH 02/12] Switch to using pgut_command() and pgut_execute() for conn2. 
--- bin/pg_repack.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 58ae024..33ab3b7 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -446,13 +446,13 @@ repack_one_table(const repack_table *table, const char *orderby) command(sql.data, 0, NULL); command("COMMIT", 0, NULL); - PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED")); + pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name); /* XXX: table name escaping? */ printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); - res = PQexec(conn2, sql.data); + res = pgut_execute(conn2, sql.data, 0, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) { printf("%s", PQerrorMessage(conn2)); @@ -466,7 +466,7 @@ repack_one_table(const repack_table *table, const char *orderby) /* store the backend PID of our connection keeping an ACCESS SHARE lock on the target table. */ - res = PQexec(conn2, "SELECT pg_backend_pid()"); + res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { printf("%s", PQerrorMessage(conn2)); @@ -688,7 +688,7 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) * lock. */ if (release_conn2) - PQclear(PQexec(conn2, "COMMIT")); + pgut_command(conn2, "COMMIT", 0, NULL); /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); @@ -740,7 +740,7 @@ repack_cleanup(bool fatal, void *userdata) /* Rollback current transaction */ if (conn2) - PQclear(PQexec(conn2, "ROLLBACK")); + pgut_command(conn2, "ROLLBACK", 0, NULL); if (connection) command("ROLLBACK", 0, NULL); From cf25780575eee72f927d4860a22508365c77f5e6 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sat, 20 Oct 2012 21:47:21 -0700 Subject: [PATCH 03/12] Further improvements to concurrent-DDL guard. 
Fix table locking so that race conditions don't exist between lock release in primary conn, and lock acquisition in conn2. Also, have conn2 be in charge of performing the table swap step, to avoid a similar race. Part of work for Issue #8. --- bin/pg_repack.c | 169 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 124 insertions(+), 45 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 33ab3b7..8eb05f6 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -53,6 +53,24 @@ const char *PROGRAM_VERSION = "unknown"; "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\ " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)" +/* To be run while our main connection holds an AccessExclusive lock on the + * target table, and our secondary conn is attempting to grab an AccessShare + * lock. We know that "granted" must be false for these queries because + * we already hold the AccessExclusive lock. Also, we only care about other + * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock + * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE + * or TRUNCATE. 
+ */ +#define CANCEL_COMPETING_LOCKS \ + "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + +#define KILL_COMPETING_LOCKS \ + "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + /* * per-table information */ @@ -95,7 +113,7 @@ static void repack_cleanup(bool fatal, void *userdata); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2); +static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -356,7 +374,7 @@ cleanup: } static int -apply_log(const repack_table *table, int count) +apply_log(PGconn *conn, const repack_table *table, int count) { int result; PGresult *res; @@ -370,8 +388,9 @@ apply_log(const repack_table *table, int count) params[4] = table->sql_pop; params[5] = utoa(count, buffer); - res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", - 6, params); + res = pgut_execute(conn, + "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", + 6, params); result = atoi(PQgetvalue(res, 0, 0)); PQclear(res); @@ -421,8 +440,7 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. 
*/ elog(DEBUG2, "---- setup ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE); - + lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE); /* * Check z_repack_trigger is the trigger executed at last so that @@ -444,27 +462,18 @@ repack_one_table(const repack_table *table, const char *orderby) command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); - command("COMMIT", 0, NULL); + /* While we are still holding an AccessExclusive lock on the table, submit + * the request for an AccessShare lock asynchronously from conn2. + * We want to submit this query in conn2 while connection's + * transaction still holds its lock, so that no DDL may sneak in + * between the time that connection commits and conn2 gets its lock. + * + */ pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); - elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name); - /* XXX: table name escaping? */ - printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", - table->target_name); - res = pgut_execute(conn2, sql.data, 0, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - printf("%s", PQerrorMessage(conn2)); - PQclear(res); - exit(1); - } - PQclear(res); - - elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name); - - /* store the backend PID of our connection keeping an ACCESS SHARE - lock on the target table. + /* grab the backend PID of conn2; we'll need this when querying + * pg_locks momentarily. 
*/ res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -474,9 +483,80 @@ repack_one_table(const repack_table *table, const char *orderby) exit(1); } lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); - elog(WARNING, "Have backend PID: %s", lock_conn_pid); PQclear(res); + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", + table->target_name); + elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); + if (!(PQsendQuery(conn2, sql.data))) { + printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); + exit(1); + } + + /* Now that we've submitted the LOCK TABLE request through conn2, + * look for and cancel any (potentially dangerous) DDL commands which + * might also be waiting on our table lock at this point -- + * it's not safe to let them wait, because they may grab their + * AccessExclusive lock before conn2 gets its AccessShare lock, + * and perform unsafe DDL on the table. + * + * XXX: maybe we should use a loop canceling queries, as in + * lock_exclusive(). + */ + printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid); + res = execute(sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error canceling competing queries: %s", PQerrorMessage(connection)); + PQclear(res); + exit(1); + } + if (PQntuples(res) > 0) + { + elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); + + if (PQserverVersion(connection) >= 80400) + { + PQclear(res); + printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid); + res = execute(sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error killing competing queries: %s", PQerrorMessage(connection)); + PQclear(res); + exit(1); + } + } + + } + else + { + elog(DEBUG2, "No competing DDL to cancel."); + } + PQclear(res); + + + /* We're finished killing off any unsafe DDL. 
COMMIT in our main + * connection, so that conn2 may get its AccessShare lock. + */ + command("COMMIT", 0, NULL); + + /* Keep looping PQgetResult() calls until it returns NULL, indicating the + * command is done and we have obtained our lock. + */ + while ((res = PQgetResult(conn2))) + { + elog(DEBUG2, "Waiting on ACCESS SHARE lock..."); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + PQclear(res); + } + + /* * Register the table to be dropped on error. We use pktype as * an advisory lock. The registration should be done after @@ -558,7 +638,7 @@ repack_one_table(const repack_table *table, const char *orderby) */ for (;;) { - num = apply_log(table, APPLY_COUNT); + num = apply_log(connection, table, APPLY_COUNT); if (num > 0) continue; /* there might be still some tuples, repeat. */ @@ -595,14 +675,17 @@ repack_one_table(const repack_table *table, const char *orderby) } /* - * 5. Swap. + * 5. Swap: will be done with conn2, since it already holds an + * AccessShare lock. */ elog(DEBUG2, "---- swap ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE); - apply_log(table, 0); + /* Bump our existing AccessShare lock to AccessExclusive */ + lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table, + FALSE); + apply_log(conn2, table, 0); params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_swap($1)", 1, params); - command("COMMIT", 0, NULL); + pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); + pgut_command(conn2, "COMMIT", 0, NULL); /* * 6. Drop. @@ -616,6 +699,7 @@ repack_one_table(const repack_table *table, const char *orderby) pgut_atexit_pop(&repack_cleanup, (void *) table); free(vxid); + free(lock_conn_pid); /* * 7. Analyze. @@ -639,17 +723,17 @@ repack_one_table(const repack_table *table, const char *orderby) * Try acquire a table lock but avoid long time locks when conflict. 
* Arguments: * + * conn: connection to use * relid: OID of relation * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed - * release_conn2: whether we should issue a COMMIT in conn2 to release - * its lock. + * start_xact: whether we need to issue a BEGIN; */ static void -lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) +lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) { time_t start = time(NULL); int i; - + for (i = 1; ; i++) { time_t duration; @@ -657,13 +741,14 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) PGresult *res; int wait_msec; - command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); + if (start_xact) + pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); duration = time(NULL) - start; if (duration > wait_timeout) { const char *cancel_query; - if (PQserverVersion(connection) >= 80400 && + if (PQserverVersion(conn) >= 80400 && duration > wait_timeout * 2) { elog(WARNING, "terminating conflicted backends"); @@ -681,21 +766,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) " AND relation = $1 AND pid <> pg_backend_pid()"; } - command(cancel_query, 1, &relid); + pgut_command(conn, cancel_query, 1, &relid); } - /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE - * lock. - */ - if (release_conn2) - pgut_command(conn2, "COMMIT", 0, NULL); - /* wait for a while to lock the table. 
*/ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); command(sql, 0, NULL); - res = execute_elevel(lock_query, 0, NULL, DEBUG2); + res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); From ad00eb181de0448ecca8db05d2485597e372202a Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Tue, 23 Oct 2012 20:41:46 -0700 Subject: [PATCH 04/12] Several fixes for concurrent-DDL guard. * KILL_COMPETING_LOCKS was using pg_cancel_backend() instead of pg_terminate_backend() * create kill_ddl() function for canceling+terminating any pending unsafe concurrent DDL, i.e. anyone hanging out waiting for an ACCESS EXCLUSIVE lock on our table. * create lock_access_share() function for reliably obtaining an ACCESS SHARE lock on the target table, killing off any queued ACCESS EXCLUSIVE lockers in the process via kill_ddl() * Avoid deadlock possible before we run: CREATE TABLE reorg.table_xxx AS SELECT ... FROM ONLY ... by using lock_access_share() * Fix a few calls in lock_exclusive() which were forgetting to specify the passed-in connection. These fixes are related to Issue #8. The main thing remaining AFAIK is to review or fix some of the unlikely-error handling bits; most of these should be marked with XXX now. --- bin/pg_repack.c | 201 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 163 insertions(+), 38 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 8eb05f6..8b84f4f 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -57,9 +57,8 @@ const char *PROGRAM_VERSION = "unknown"; * target table, and our secondary conn is attempting to grab an AccessShare * lock. We know that "granted" must be false for these queries because * we already hold the AccessExclusive lock. 
Also, we only care about other - * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock - * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE - * or TRUNCATE. + * transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only + * trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE. */ #define CANCEL_COMPETING_LOCKS \ "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ @@ -67,7 +66,8 @@ const char *PROGRAM_VERSION = "unknown"; " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" #define KILL_COMPETING_LOCKS \ - "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + "SELECT pg_terminate_backend(pid) "\ + "FROM pg_locks WHERE locktype = 'relation'"\ " AND granted = false AND relation = %u"\ " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" @@ -114,6 +114,8 @@ static void repack_cleanup(bool fatal, void *userdata); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); +static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); +static void lock_access_share(PGconn *conn, Oid relid, const char *target_name); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -485,11 +487,17 @@ repack_one_table(const repack_table *table, const char *orderby) lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); PQclear(res); + /* + * Not using lock_access_share() here since we know that + * it's not possible to obtain the ACCESS SHARE lock right now + * in conn2, since the primary connection holds ACCESS EXCLUSIVE. 
+ */ printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); if (!(PQsendQuery(conn2, sql.data))) { printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); + /* XXX: better error handling */ exit(1); } @@ -500,40 +508,13 @@ repack_one_table(const repack_table *table, const char *orderby) * AccessExclusive lock before conn2 gets its AccessShare lock, * and perform unsafe DDL on the table. * - * XXX: maybe we should use a loop canceling queries, as in - * lock_exclusive(). + * Normally, lock_access_share() would take care of this for us, + * but we're not able to use it here. */ - printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid); - res = execute(sql.data, 0, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) + if (!(kill_ddl(connection, table->target_oid, true))) { - printf("Error canceling competing queries: %s", PQerrorMessage(connection)); - PQclear(res); exit(1); } - if (PQntuples(res) > 0) - { - elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); - - if (PQserverVersion(connection) >= 80400) - { - PQclear(res); - printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid); - res = execute(sql.data, 0, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - printf("Error killing competing queries: %s", PQerrorMessage(connection)); - PQclear(res); - exit(1); - } - } - - } - else - { - elog(DEBUG2, "No competing DDL to cancel."); - } - PQclear(res); /* We're finished killing off any unsafe DDL. COMMIT in our main @@ -583,6 +564,21 @@ repack_one_table(const repack_table *table, const char *orderby) PQclear(res); command(table->delete_log, 0, NULL); + + /* We need to be able to obtain an AccessShare lock on the target table + * for the create_table command to go through, so go ahead and obtain + * the lock explicitly. 
+ * + * Since conn2 has been diligently holding its AccessShare lock, it + * is possible that another transaction has been waiting to acquire + * an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE + * or TRUNCATE which we must not allow). If there are any such + * transactions, lock_access_share() will kill them so that our + * CREATE TABLE ... AS SELECT does not deadlock waiting for an + * AccessShare lock. + */ + lock_access_share(connection, table->target_oid, table->target_name); + command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); if (table->drop_columns) @@ -719,8 +715,134 @@ repack_one_table(const repack_table *table, const char *orderby) termStringInfo(&sql); } +/* Kill off any concurrent DDL (or any transaction attempting to take + * an AccessExclusive lock) trying to run against our table. Note, we're + * killing these queries off *before* they are granted an AccessExclusive + * lock on our table. + * + * Returns true if no problems encountered, false otherwise. + */ +static bool +kill_ddl(PGconn *conn, Oid relid, bool terminate) +{ + bool ret = true; + PGresult *res; + StringInfoData sql; + + initStringInfo(&sql); + + printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid); + res = pgut_execute(conn, sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error canceling unsafe queries: %s", PQerrorMessage(conn)); + ret = false; + } + else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400) + { + elog(WARNING, "Canceled %d unsafe queries. 
Terminating any remaining PIDs.", PQntuples(res)); + + PQclear(res); + printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid); + res = pgut_execute(conn, sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("Error killing unsafe queries: %s", PQerrorMessage(conn)); + ret = false; + } + } + else if (PQntuples(res) > 0) + elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res)); + else + elog(DEBUG2, "No competing DDL to cancel."); + + + PQclear(res); + termStringInfo(&sql); + + return ret; +} + + /* - * Try acquire a table lock but avoid long time locks when conflict. + * Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long + * waits by killing off other sessions which may be stuck trying to obtain + * an ACCESS EXCLUSIVE lock. + * + * Arguments: + * + * conn: connection to use + * relid: OID of relation + * target_name: name of table + */ +static void +lock_access_share(PGconn *conn, Oid relid, const char *target_name) +{ + StringInfoData sql; + time_t start = time(NULL); + int i; + + initStringInfo(&sql); + + for (i = 1; ; i++) + { + time_t duration; + PGresult *res; + int wait_msec; + + duration = time(NULL) - start; + + /* Cancel queries unconditionally, i.e. don't bother waiting + * wait_timeout as lock_exclusive() does -- the only queries we + * should be killing are disallowed DDL commands hanging around + * for an AccessExclusive lock, which must be deadlocked at + * this point anyway since conn2 holds its AccessShare lock + * already. + */ + if (duration > (wait_timeout * 2)) + kill_ddl(conn, relid, true); + else + kill_ddl(conn, relid, false); + + /* wait for a while to lock the table. 
*/ + wait_msec = Min(1000, i * 100); + printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec); + pgut_command(conn, sql.data, 0, NULL); + + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name); + res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + break; + } + else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED)) + { + /* XXX: does this ROLLBACK need any rethinking wrt. two connections + * now? + */ + /* retry if lock conflicted */ + PQclear(res); + pgut_command(conn, "ROLLBACK", 0, NULL); + continue; + } + else + { + /* exit otherwise */ + printf("%s", PQerrorMessage(connection)); + PQclear(res); + exit(1); + } + } + + termStringInfo(&sql); + pgut_command(conn, "RESET statement_timeout", 0, NULL); +} + + +/* + * Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long + * waits by killing off other sessions. * Arguments: * * conn: connection to use @@ -772,7 +894,7 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); - command(sql, 0, NULL); + pgut_command(conn, sql, 0, NULL); res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -782,9 +904,12 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta } else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED)) { + /* XXX: does this ROLLBACK need any rethinking wrt. two connections + * now? 
+ */ /* retry if lock conflicted */ PQclear(res); - command("ROLLBACK", 0, NULL); + pgut_command(conn, "ROLLBACK", 0, NULL); continue; } else @@ -796,7 +921,7 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta } } - command("RESET statement_timeout", 0, NULL); + pgut_command(conn, "RESET statement_timeout", 0, NULL); } /* From ad75dcfbb11722db2e30085350a23655233558de Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Tue, 6 Nov 2012 22:07:46 -0700 Subject: [PATCH 05/12] Allow multiple --table options to be specified on the command-line. Per Issue #18. SimpleStringList code borrowed from pg_dump and a pending patch to add similar functionality to pg_restore, clusterdb, vacuumdb, and reindexdb. The error handling in reorg_one_table() could still be much improved, so that an error processing a single table doesn't cause pg_reorg to necessarily bail out and skip further tables, but I'll leave that for another day. --- bin/pg_repack.c | 32 ++++++++++++++++++++------------ bin/pgut/pgut-fe.c | 11 +++++++++-- bin/pgut/pgut-fe.h | 5 +++-- bin/pgut/pgut.c | 34 ++++++++++++++++++++++++++++++++++ bin/pgut/pgut.h | 17 +++++++++++++++++ 5 files changed, 83 insertions(+), 16 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 8b84f4f..ea3b732 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -125,12 +125,12 @@ static bool sqlstate_equals(PGresult *res, const char *state) return strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), state) == 0; } -static bool analyze = true; -static bool alldb = false; -static bool noorder = false; -static char *table = NULL; -static char *orderby = NULL; -static int wait_timeout = 60; /* in seconds */ +static bool analyze = true; +static bool alldb = false; +static bool noorder = false; +static SimpleStringList table_list = {NULL, NULL}; +static char *orderby = NULL; +static int wait_timeout = 60; /* in seconds */ /* buffer should have at least 11 bytes */ static char * @@ -143,7 +143,7 @@ 
utoa(unsigned int value, char *buffer) static pgut_option options[] = { { 'b', 'a', "all", &alldb }, - { 's', 't', "table", &table }, + { 'l', 't', "table", &table_list }, { 'b', 'n', "no-order", &noorder }, { 's', 'o', "order-by", &orderby }, { 'i', 'T', "wait-timeout", &wait_timeout }, @@ -154,7 +154,8 @@ static pgut_option options[] = int main(int argc, char *argv[]) { - int i; + int i; + SimpleStringListCell *cell; i = pgut_getopt(argc, argv, options); @@ -170,7 +171,7 @@ main(int argc, char *argv[]) if (alldb) { - if (table) + if (table_list.head != NULL) ereport(ERROR, (errcode(EINVAL), errmsg("cannot repack a specific table in all databases"))); @@ -178,10 +179,17 @@ main(int argc, char *argv[]) } else { - if (!repack_one_database(orderby, table)) + if (table_list.head != NULL) + { + for (cell = table_list.head; cell; cell = cell->next) + { + repack_one_database(orderby, cell->val); + } + } + else if (!repack_one_database(orderby, NULL)) ereport(ERROR, - (errcode(ENOENT), - errmsg("%s is not installed", PROGRAM_NAME))); + (errcode(ENOENT), + errmsg("%s is not installed", PROGRAM_NAME))); } return 0; diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index b622974..bcea19f 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -144,9 +144,12 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src) /* high prior value has been set already. */ return; } - else if (src >= SOURCE_CMDLINE && opt->source >= src) + else if (src >= SOURCE_CMDLINE && opt->source >= src && opt->type != 'l') { - /* duplicated option in command line */ + /* duplicated option in command line -- don't worry if the option + * type is 'l' i.e. SimpleStringList, since we are allowed to have + * multiples of these. 
+ */ message = "specified only once"; } else @@ -177,6 +180,10 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src) return; message = "a 32bit signed integer"; break; + case 'l': + message = "a List"; + simple_string_list_append(opt->var, optarg); + return; case 'u': if (parse_uint32(optarg, opt->var)) return; diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h index 49e6cfb..7529587 100644 --- a/bin/pgut/pgut-fe.h +++ b/bin/pgut/pgut-fe.h @@ -25,13 +25,14 @@ typedef enum pgut_optsrc * type: * b: bool (true) * B: bool (false) - * f: pgut_optfn + * f: pgut_optfn * i: 32bit signed integer + * l: StringList * u: 32bit unsigned integer * I: 64bit signed integer * U: 64bit unsigned integer * s: string - * t: time_t + * t: time_t * y: YesNo (YES) * Y: YesNo (NO) */ diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index dca9853..5a86330 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -384,6 +384,40 @@ parse_time(const char *value, time_t *time) return true; } +/* Append the given string `val` to the `list` */ +void +simple_string_list_append(SimpleStringList *list, const char *val) +{ + SimpleStringListCell *cell; + + /* this calculation correctly accounts for the null trailing byte */ + cell = (SimpleStringListCell *) + pgut_malloc(sizeof(SimpleStringListCell) + strlen(val)); + cell->next = NULL; + strcpy(cell->val, val); + + if (list->tail) + list->tail->next = cell; + else + list->head = cell; + list->tail = cell; +} + +/* Test whether `val` is in the given `list` */ +bool +simple_string_list_member(SimpleStringList *list, const char *val) +{ + SimpleStringListCell *cell; + + for (cell = list->head; cell; cell = cell->next) + { + if (strcmp(cell->val, val) == 0) + return true; + } + return false; +} + + static char * prompt_for_password(void) { diff --git a/bin/pgut/pgut.h b/bin/pgut/pgut.h index f5217e6..f0b0f99 100644 --- a/bin/pgut/pgut.h +++ b/bin/pgut/pgut.h @@ -171,6 +171,23 @@ extern bool parse_time(const char *value, time_t *time); #define 
ToLower(c) (tolower((unsigned char)(c))) #define ToUpper(c) (toupper((unsigned char)(c))) +/* linked list of string values and helper functions, stolen from pg_dump. */ +typedef struct SimpleStringListCell +{ + struct SimpleStringListCell *next; + char val[1]; /* VARIABLE LENGTH FIELD */ +} SimpleStringListCell; + +typedef struct SimpleStringList +{ + SimpleStringListCell *head; + SimpleStringListCell *tail; +} SimpleStringList; + +extern void simple_string_list_append(SimpleStringList *list, const char *val); +extern bool simple_string_list_member(SimpleStringList *list, const char *val); + + /* * socket operations */ From 00ddb1edf96e429707d7b7430ff01d6cbdd99976 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sun, 11 Nov 2012 20:20:48 -0500 Subject: [PATCH 06/12] Improved error handling, particularly when processing multiple tables. Previously, an error while processing any single table would cause pg_reorg to call exit() and bail out. Quick summary of fixes: * get rid of pgut_atexit_push() and pgut_atexit_pop() use, since we are no longer relying on calling exit() to handle mundane errors * remove lock_conn_pid variable; we can just use buffer instead * lock_exclusive() and lock_access_share() now return bool instead of bailing out on any error * ERROR-level ereport() or elog() calls are now issued at WARNING level instead, to avoid bailing out unnecessarily * signature of reorg_cleanup() changed; it no longer needs to take a void pointer * check return of strdup() for vxid * Use pgut_rollback() instead of sending ROLLBACK; command directly There are still one or two FIXMEs left, including fixing table name escaping, but I'm committing this much.
--- bin/pg_repack.c | 198 +++++++++++++++++++++++++++++------------------- 1 file changed, 121 insertions(+), 77 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index ea3b732..1b5cd2c 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -107,15 +107,15 @@ typedef struct repack_index } repack_index; static void repack_all_databases(const char *order_by); -static bool repack_one_database(const char *order_by, const char *table); +static bool repack_one_database(const char *order_by); static void repack_one_table(const repack_table *table, const char *order_by); -static void repack_cleanup(bool fatal, void *userdata); +static void repack_cleanup(bool fatal, const repack_table *table); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); +static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); -static void lock_access_share(PGconn *conn, Oid relid, const char *target_name); +static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -155,7 +155,6 @@ int main(int argc, char *argv[]) { int i; - SimpleStringListCell *cell; i = pgut_getopt(argc, argv, options); @@ -171,22 +170,15 @@ main(int argc, char *argv[]) if (alldb) { - if (table_list.head != NULL) + if (table_list.head) ereport(ERROR, (errcode(EINVAL), - errmsg("cannot repack a specific table in all databases"))); + errmsg("cannot repack specific table(s) in all databases"))); repack_all_databases(orderby); } else { - if (table_list.head != NULL) - { - for (cell = table_list.head; cell; cell = cell->next) - { - repack_one_database(orderby, cell->val); - } - } - else if (!repack_one_database(orderby, NULL)) + if 
(!repack_one_database(orderby)) ereport(ERROR, (errcode(ENOENT), errmsg("%s is not installed", PROGRAM_NAME))); @@ -221,7 +213,7 @@ repack_all_databases(const char *orderby) fflush(stdout); } - ret = repack_one_database(orderby, NULL); + ret = repack_one_database(orderby); if (pgut_log_level >= INFO) { @@ -259,13 +251,14 @@ getoid(PGresult *res, int row, int col) * Call repack_one_table for the target table or each table in a database. */ static bool -repack_one_database(const char *orderby, const char *table) +repack_one_database(const char *orderby) { - bool ret = true; - PGresult *res; - int i; - int num; - StringInfoData sql; + bool ret = true; + PGresult *res; + int i; + int num; + StringInfoData sql; + SimpleStringListCell *cell; initStringInfo(&sql); @@ -282,10 +275,18 @@ repack_one_database(const char *orderby, const char *table) /* acquire target tables */ appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE "); - if (table) + if (table_list.head) { - appendStringInfoString(&sql, "relid = $1::regclass"); - res = execute_elevel(sql.data, 1, &table, DEBUG2); + appendStringInfoString(&sql, "( "); + for (cell = table_list.head; cell; cell = cell->next) + { + /* FIXME: bogus table quoting */ + appendStringInfo(&sql, "relid = '%s'::regclass", cell->val); + if (cell->next) + appendStringInfoString(&sql, " OR "); + } + appendStringInfoString(&sql, " )"); + res = execute_elevel(sql.data, 0, NULL, DEBUG2); } else { @@ -300,6 +301,7 @@ repack_one_database(const char *orderby, const char *table) if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)) { /* Schema repack does not exist. Skip the database. 
*/ + PQclear(res); ret = false; goto cleanup; } @@ -349,9 +351,12 @@ repack_one_database(const char *orderby, const char *table) { /* CLUSTER mode */ if (ckey == NULL) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("relation \"%s\" has no cluster key", table.target_name))); + continue; + } appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey); table.create_table = sql.data; } @@ -418,12 +423,13 @@ repack_one_table(const repack_table *table, const char *orderby) int num; int i; int num_waiting = 0; - char *vxid; - char *lock_conn_pid; + char *vxid = NULL; char buffer[12]; StringInfoData sql; + bool have_error = false; initStringInfo(&sql); + printf("Repack table: %s\n", table->target_name); elog(DEBUG2, "---- repack_one_table ----"); elog(DEBUG2, "target_name : %s", table->target_name); @@ -450,7 +456,12 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE); + if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE))) + { + elog(WARNING, "lock_exclusive() failed for %s", table->target_name); + have_error = true; + goto cleanup; + } /* * Check z_repack_trigger is the trigger executed at last so that @@ -460,10 +471,15 @@ repack_one_table(const repack_table *table, const char *orderby) res = execute("SELECT repack.conflicted_triggers($1)", 1, params); if (PQntuples(res) > 0) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("trigger %s conflicted for %s", PQgetvalue(res, 0, 0), table->target_name))); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); command(table->create_pktype, 0, NULL); @@ -478,7 +494,6 @@ repack_one_table(const repack_table *table, const char *orderby) * We want to submit this query in conn2 while connection's * transaction still holds its lock, so that no DDL may sneak in 
* between the time that connection commits and conn2 gets its lock. - * */ pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); @@ -490,9 +505,11 @@ repack_one_table(const repack_table *table, const char *orderby) { printf("%s", PQerrorMessage(conn2)); PQclear(res); - exit(1); + have_error = true; + goto cleanup; } - lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); + buffer[0] = '\0'; + strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1); PQclear(res); /* @@ -504,9 +521,10 @@ repack_one_table(const repack_table *table, const char *orderby) table->target_name); elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); if (!(PQsendQuery(conn2, sql.data))) { - printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); - /* XXX: better error handling */ - exit(1); + elog(WARNING, "Error sending async query: %s\n%s", sql.data, + PQerrorMessage(conn2)); + have_error = true; + goto cleanup; } /* Now that we've submitted the LOCK TABLE request through conn2, @@ -521,10 +539,11 @@ repack_one_table(const repack_table *table, const char *orderby) */ if (!(kill_ddl(connection, table->target_oid, true))) { - exit(1); + elog(WARNING, "kill_ddl() failed."); + have_error = true; + goto cleanup; } - /* We're finished killing off any unsafe DDL. COMMIT in our main * connection, so that conn2 may get its AccessShare lock. */ @@ -540,19 +559,12 @@ repack_one_table(const repack_table *table, const char *orderby) { printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2)); PQclear(res); - exit(1); + have_error = true; + goto cleanup; } PQclear(res); } - - /* - * Register the table to be dropped on error. We use pktype as - * an advisory lock. The registration should be done after - * the first command succeeds. - */ - pgut_atexit_push(&repack_cleanup, (void *) table); - /* * 2. Copy tuples into temp table. 
*/ @@ -566,9 +578,16 @@ repack_one_table(const repack_table *table, const char *orderby) /* Fetch an array of Virtual IDs of all transactions active right now. */ - params[0] = lock_conn_pid; + params[0] = buffer; res = execute(SQL_XID_SNAPSHOT, 1, params); - vxid = strdup(PQgetvalue(res, 0, 0)); + if (!(vxid = strdup(PQgetvalue(res, 0, 0)))) + { + elog(WARNING, "Unable to allocate vxid, length: %d\n", + PQgetlength(res, 0, 0)); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); command(table->delete_log, 0, NULL); @@ -585,7 +604,11 @@ repack_one_table(const repack_table *table, const char *orderby) * CREATE TABLE ... AS SELECT does not deadlock waiting for an * AccessShare lock. */ - lock_access_share(connection, table->target_oid, table->target_name); + if (!(lock_access_share(connection, table->target_oid, table->target_name))) + { + have_error = true; + goto cleanup; + } command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); @@ -684,8 +707,16 @@ repack_one_table(const repack_table *table, const char *orderby) */ elog(DEBUG2, "---- swap ----"); /* Bump our existing AccessShare lock to AccessExclusive */ - lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table, - FALSE); + + if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer), + table->lock_table, FALSE))) + { + elog(WARNING, "lock_exclusive() failed in conn2 for %s", + table->target_name); + have_error = true; + goto cleanup; + } + apply_log(conn2, table, 0); params[0] = utoa(table->target_oid, buffer); pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); @@ -701,10 +732,6 @@ repack_one_table(const repack_table *table, const char *orderby) command("SELECT repack.repack_drop($1)", 1, params); command("COMMIT", 0, NULL); - pgut_atexit_pop(&repack_cleanup, (void *) table); - free(vxid); - free(lock_conn_pid); - /* * 7. Analyze. 
* Note that cleanup hook has been already uninstalled here because analyze @@ -720,7 +747,16 @@ repack_one_table(const repack_table *table, const char *orderby) command("COMMIT", 0, NULL); } +cleanup: termStringInfo(&sql); + if (vxid) + free(vxid); + + /* XXX: distinguish between fatal and non-fatal errors via the first + * arg to repack_cleanup(). + */ + if (have_error) + repack_cleanup(false, table); } /* Kill off any concurrent DDL (or any transaction attempting to take @@ -743,19 +779,23 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) res = pgut_execute(conn, sql.data, 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - printf("Error canceling unsafe queries: %s", PQerrorMessage(conn)); + elog(WARNING, "Error canceling unsafe queries: %s", + PQerrorMessage(conn)); ret = false; } else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400) { - elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); + elog(WARNING, + "Canceled %d unsafe queries. Terminating any remaining PIDs.", + PQntuples(res)); PQclear(res); printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid); res = pgut_execute(conn, sql.data, 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - printf("Error killing unsafe queries: %s", PQerrorMessage(conn)); + elog(WARNING, "Error killing unsafe queries: %s", + PQerrorMessage(conn)); ret = false; } } @@ -764,7 +804,6 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) else elog(DEBUG2, "No competing DDL to cancel."); - PQclear(res); termStringInfo(&sql); @@ -783,12 +822,13 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) * relid: OID of relation * target_name: name of table */ -static void +static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name) { StringInfoData sql; time_t start = time(NULL); int i; + bool ret = true; initStringInfo(&sql); @@ -808,9 +848,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) * already. 
*/ if (duration > (wait_timeout * 2)) - kill_ddl(conn, relid, true); + ret = kill_ddl(conn, relid, true); else - kill_ddl(conn, relid, false); + ret = kill_ddl(conn, relid, false); + + if (!ret) + break; /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); @@ -837,14 +880,16 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) else { /* exit otherwise */ - printf("%s", PQerrorMessage(connection)); + elog(WARNING, "%s", PQerrorMessage(connection)); PQclear(res); - exit(1); + ret = false; + break; } } termStringInfo(&sql); pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; } @@ -858,11 +903,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed * start_xact: whether we need to issue a BEGIN; */ -static void +static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) { time_t start = time(NULL); int i; + bool ret = true; for (i = 1; ; i++) { @@ -925,11 +971,13 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta /* exit otherwise */ printf("%s", PQerrorMessage(connection)); PQclear(res); - exit(1); + ret = false; + break; } } pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; } /* @@ -937,10 +985,8 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta * objects before the program exits. */ static void -repack_cleanup(bool fatal, void *userdata) +repack_cleanup(bool fatal, const repack_table *table) { - const repack_table *table = (const repack_table *) userdata; - if (fatal) { fprintf(stderr, "!!!FATAL ERROR!!! 
Please refer to the manual.\n\n"); @@ -950,15 +996,13 @@ repack_cleanup(bool fatal, void *userdata) char buffer[12]; const char *params[1]; - /* Rollback current transaction */ - if (conn2) - pgut_command(conn2, "ROLLBACK", 0, NULL); - - if (connection) - command("ROLLBACK", 0, NULL); + /* Rollback current transactions */ + pgut_rollback(connection); + pgut_rollback(conn2); /* Try reconnection if not available. */ - if (PQstatus(connection) != CONNECTION_OK) + if (PQstatus(connection) != CONNECTION_OK || + PQstatus(conn2) != CONNECTION_OK) reconnect(ERROR); /* do cleanup */ From 34605aef2764aba303eedf13911aa04d371acf99 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Wed, 14 Nov 2012 15:27:54 -0500 Subject: [PATCH 07/12] Fix bogus use of table name parameters. Mimic the original code, which used execute_elevel() with params to pass in table names which are assumed to be quoted already by the user. --- bin/pg_repack.c | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 1b5cd2c..4ce4e6f 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -116,6 +116,7 @@ static Oid getoid(PGresult *res, int row, int col); static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); +static size_t simple_string_list_size(SimpleStringList string_list); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -247,6 +248,22 @@ getoid(PGresult *res, int row, int col) return (Oid)strtoul(PQgetvalue(res, row, col), NULL, 10); } +/* Returns the number of elements in the given SimpleStringList */ +static size_t +simple_string_list_size(SimpleStringList string_list) +{ + size_t i = 0; + SimpleStringListCell *cell = table_list.head; + + while (cell) + { + cell = cell->next; + i++; + 
} + + return i; +} + /* * Call repack_one_table for the target table or each table in a database. */ @@ -259,6 +276,11 @@ repack_one_database(const char *orderby) int num; StringInfoData sql; SimpleStringListCell *cell; + const char **params = NULL; + size_t num_params = simple_string_list_size(table_list); + + if (num_params) + params = pgut_malloc(num_params * sizeof(char *)); initStringInfo(&sql); @@ -275,18 +297,19 @@ repack_one_database(const char *orderby) /* acquire target tables */ appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE "); - if (table_list.head) + if (num_params) { - appendStringInfoString(&sql, "( "); - for (cell = table_list.head; cell; cell = cell->next) + appendStringInfoString(&sql, "("); + for (i = 0, cell = table_list.head; cell; cell = cell->next, i++) { - /* FIXME: bogus table quoting */ - appendStringInfo(&sql, "relid = '%s'::regclass", cell->val); + /* Construct table name placeholders to be used by PQexecParams */ + appendStringInfo(&sql, "relid = $%d::regclass", i + 1); + params[i] = cell->val; if (cell->next) appendStringInfoString(&sql, " OR "); } - appendStringInfoString(&sql, " )"); - res = execute_elevel(sql.data, 0, NULL, DEBUG2); + appendStringInfoString(&sql, ")"); + res = execute_elevel(sql.data, (int) num_params, params, DEBUG2); } else { From 40626769d8b0be98f8e85d26f76647f76a94d51e Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Wed, 14 Nov 2012 20:47:47 -0500 Subject: [PATCH 08/12] Since commit 742380f0429b6 included some of the same changes already in 6a0af679e14d1d, go ahead and include a few more of the error cleanup from 6a0af679e14d1d. 
--- bin/pg_repack.c | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 4ce4e6f..20e5756 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -331,7 +331,7 @@ repack_one_database(const char *orderby) else { /* exit otherwise */ - printf("%s", PQerrorMessage(connection)); + elog(ERROR, "%s", PQerrorMessage(connection)); PQclear(res); exit(1); } @@ -892,12 +892,9 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) } else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED)) { - /* XXX: does this ROLLBACK need any rethinking wrt. two connections - * now? - */ /* retry if lock conflicted */ PQclear(res); - pgut_command(conn, "ROLLBACK", 0, NULL); + pgut_rollback(conn); continue; } else @@ -981,12 +978,9 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta } else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED)) { - /* XXX: does this ROLLBACK need any rethinking wrt. two connections - * now? - */ /* retry if lock conflicted */ PQclear(res); - pgut_command(conn, "ROLLBACK", 0, NULL); + pgut_rollback(conn); continue; } else From decd8223937eceadcf5479a445c614548a3ac18b Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Wed, 14 Nov 2012 19:02:31 -0700 Subject: [PATCH 09/12] Move simple_string_list_size() into pgut.c, with the rest of these string functions. Also, fix an error with this function not actually using its string_list argument. 
--- bin/pg_repack.c | 16 ---------------- bin/pgut/pgut.c | 15 +++++++++++++++ bin/pgut/pgut.h | 1 + 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 20e5756..ec25251 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -116,7 +116,6 @@ static Oid getoid(PGresult *res, int row, int col); static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); -static size_t simple_string_list_size(SimpleStringList string_list); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -248,21 +247,6 @@ getoid(PGresult *res, int row, int col) return (Oid)strtoul(PQgetvalue(res, row, col), NULL, 10); } -/* Returns the number of elements in the given SimpleStringList */ -static size_t -simple_string_list_size(SimpleStringList string_list) -{ - size_t i = 0; - SimpleStringListCell *cell = table_list.head; - - while (cell) - { - cell = cell->next; - i++; - } - - return i; -} /* * Call repack_one_table for the target table or each table in a database. 
diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 5a86330..4661126 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -417,6 +417,21 @@ simple_string_list_member(SimpleStringList *list, const char *val) return false; } +/* Returns the number of elements in the given SimpleStringList */ +size_t +simple_string_list_size(SimpleStringList string_list) +{ + size_t i = 0; + SimpleStringListCell *cell = string_list.head; + + while (cell) + { + cell = cell->next; + i++; + } + + return i; +} static char * prompt_for_password(void) diff --git a/bin/pgut/pgut.h b/bin/pgut/pgut.h index f0b0f99..5abbe30 100644 --- a/bin/pgut/pgut.h +++ b/bin/pgut/pgut.h @@ -186,6 +186,7 @@ typedef struct SimpleStringList extern void simple_string_list_append(SimpleStringList *list, const char *val); extern bool simple_string_list_member(SimpleStringList *list, const char *val); +extern size_t simple_string_list_size(SimpleStringList string_list); /* From 8ba92a1f493c4affceebbe56a79008eb11330a68 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Wed, 14 Nov 2012 19:05:31 -0700 Subject: [PATCH 10/12] Rename argument to simple_string_list_size() for consistency. 
--- bin/pgut/pgut.c | 4 ++-- bin/pgut/pgut.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 4661126..0013236 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -419,10 +419,10 @@ simple_string_list_member(SimpleStringList *list, const char *val) /* Returns the number of elements in the given SimpleStringList */ size_t -simple_string_list_size(SimpleStringList string_list) +simple_string_list_size(SimpleStringList list) { size_t i = 0; - SimpleStringListCell *cell = string_list.head; + SimpleStringListCell *cell = list.head; while (cell) { diff --git a/bin/pgut/pgut.h b/bin/pgut/pgut.h index 5abbe30..93d7137 100644 --- a/bin/pgut/pgut.h +++ b/bin/pgut/pgut.h @@ -186,7 +186,7 @@ typedef struct SimpleStringList extern void simple_string_list_append(SimpleStringList *list, const char *val); extern bool simple_string_list_member(SimpleStringList *list, const char *val); -extern size_t simple_string_list_size(SimpleStringList string_list); +extern size_t simple_string_list_size(SimpleStringList list); /* From 3c73a0204af0336094677bce25655887b7b807c0 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Thu, 15 Nov 2012 19:28:23 -0700 Subject: [PATCH 11/12] More consistent error reporting This patch is a port of Daniele's commit 0be414ad10c32d from his own fork, "error_reporting" branch. reorg_all_database can return an error message: in case of any error different from "missing schema" return the error and keep processing the other databases instead of printing and stopping the program. The output of the program is now something like: $ pg_reorg --all pg_reorg: reorg database "contrib_regression" pg_reorg: reorg database "template1" ... 
skipped: pg_reorg is not installed in the database --- bin/pg_repack.c | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index ec25251..c264d77 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -107,7 +107,7 @@ typedef struct repack_index } repack_index; static void repack_all_databases(const char *order_by); -static bool repack_one_database(const char *order_by); +static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize); static void repack_one_table(const repack_table *table, const char *order_by); static void repack_cleanup(bool fatal, const repack_table *table); @@ -178,10 +178,11 @@ main(int argc, char *argv[]) } else { - if (!repack_one_database(orderby)) + char errbuf[256]; + if (!repack_one_database(orderby, errbuf, sizeof(errbuf))) ereport(ERROR, - (errcode(ENOENT), - errmsg("%s is not installed", PROGRAM_NAME))); + (errcode(ERROR), + errmsg("%s", errbuf))); } return 0; @@ -204,23 +205,24 @@ repack_all_databases(const char *orderby) for (i = 0; i < PQntuples(result); i++) { bool ret; + char errbuf[256]; dbname = PQgetvalue(result, i, 0); if (pgut_log_level >= INFO) { - printf("%s: repack database \"%s\"", PROGRAM_NAME, dbname); + printf("%s: repack database \"%s\"\n", PROGRAM_NAME, dbname); fflush(stdout); } - ret = repack_one_database(orderby); + ret = repack_one_database(orderby, errbuf, sizeof(errbuf)); if (pgut_log_level >= INFO) { if (ret) printf("\n"); else - printf(" ... skipped\n"); + printf(" ... skipped: %s\n", errbuf); fflush(stdout); } } @@ -252,7 +254,7 @@ getoid(PGresult *res, int row, int col) * Call repack_one_table for the target table or each table in a database. 
*/ static bool -repack_one_database(const char *orderby) +repack_one_database(const char *orderby, char *errbuf, size_t errsize) { bool ret = true; PGresult *res; @@ -303,22 +305,25 @@ repack_one_database(const char *orderby) res = execute_elevel(sql.data, 0, NULL, DEBUG2); } + /* on error skip the database */ if (PQresultStatus(res) != PGRES_TUPLES_OK) { if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)) { /* Schema repack does not exist. Skip the database. */ - PQclear(res); - ret = false; - goto cleanup; + if (errbuf) + snprintf(errbuf, errsize, + "%s is not installed in the database", PROGRAM_NAME); + } else { - /* exit otherwise */ - elog(ERROR, "%s", PQerrorMessage(connection)); - PQclear(res); - exit(1); + /* Return the error message otherwise */ + if (errbuf) + snprintf(errbuf, errsize, "%s", PQerrorMessage(connection)); } + ret = false; + goto cleanup; } num = PQntuples(res); From 3c13a0d05e83e23fd5846e66c8da2c0e87dbb88b Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sun, 18 Nov 2012 14:36:09 -0700 Subject: [PATCH 12/12] Don't forget to disconnect conn2 when disconnect() is called, otherwise we leave a connection hanging open for every database processed via pg_repack -a. --- bin/pgut/pgut-fe.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index bcea19f..c7f888c 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -77,6 +77,11 @@ disconnect(void) pgut_disconnect(connection); connection = NULL; } + if (conn2) + { + pgut_disconnect(conn2); + conn2 = NULL; + } } static void