diff --git a/bin/pg_repack.c b/bin/pg_repack.c index e1c7bda..d2de3d0 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -40,52 +40,77 @@ const char *PROGRAM_VERSION = "unknown"; * pg_repack's setup. Some transactions we can safely ignore: * a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted * servers. See https://github.com/reorg/pg_reorg/issues/1 - * b. Our own database connection + * b. Our own database connections * c. Other pg_repack clients, as distinguished by application_name, which * may be operating on other tables at the same time. See * https://github.com/reorg/pg_repack/issues/1 * * Note, there is some redundancy in how the filtering is done (e.g. excluding * based on pg_backend_pid() and application_name), but that shouldn't hurt - * anything. + * anything. Also, the test of application_name is not bulletproof -- for + * instance, the application name when running installcheck will be + * pg_regress. */ #define SQL_XID_SNAPSHOT_90200 \ "SELECT repack.array_accum(l.virtualtransaction) " \ " FROM pg_locks AS l " \ " LEFT JOIN pg_stat_activity AS a " \ " ON l.pid = a.pid " \ - " WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \ - "AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \ - "AND (a.application_name IS NULL OR a.application_name <> $1)" + " WHERE l.locktype = 'virtualxid' " \ + " AND l.pid NOT IN (pg_backend_pid(), $1) " \ + " AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \ + " AND (a.application_name IS NULL OR a.application_name <> $2)" #define SQL_XID_SNAPSHOT_90000 \ "SELECT repack.array_accum(l.virtualtransaction) " \ " FROM pg_locks AS l " \ " LEFT JOIN pg_stat_activity AS a " \ " ON l.pid = a.procpid " \ - " WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \ - "AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \ - "AND (a.application_name IS NULL OR a.application_name <> $1)" + " WHERE l.locktype = 'virtualxid' " \ + " AND l.pid NOT IN (pg_backend_pid(), $1) " \ + " AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \ + " AND (a.application_name IS NULL OR a.application_name <> $2)" /* application_name is not available before 9.0. The last clause of - * the WHERE clause is just to eat the $1 parameter (application name). + * the WHERE clause is just to eat the $2 parameter (application name). */ #define SQL_XID_SNAPSHOT_80300 \ "SELECT repack.array_accum(virtualtransaction) FROM pg_locks" \ - " WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()" \ + " WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)" \ " AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0') " \ - " AND ($1 IS NOT NULL)" + " AND ($2 IS NOT NULL)" #define SQL_XID_SNAPSHOT \ (PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \ (PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \ SQL_XID_SNAPSHOT_80300)) - + +/* Later, check whether any of the transactions we saw before are still + * alive, and wait for them to go away. + */ #define SQL_XID_ALIVE \ "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\ " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)" +/* To be run while our main connection holds an AccessExclusive lock on the + * target table, and our secondary conn is attempting to grab an AccessShare + * lock. We know that "granted" must be false for these queries because + * we already hold the AccessExclusive lock. Also, we only care about other + * transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only + * trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE. + */ +#define CANCEL_COMPETING_LOCKS \ + "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + +#define KILL_COMPETING_LOCKS \ + "SELECT pg_terminate_backend(pid) "\ + "FROM pg_locks WHERE locktype = 'relation'"\ + " AND granted = false AND relation = %u"\ + " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()" + /* * per-table information */ @@ -123,13 +148,15 @@ typedef struct repack_index static bool is_superuser(void); static void repack_all_databases(const char *order_by); -static bool repack_one_database(const char *order_by, const char *table, char *errbuf, size_t errsize); +static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize); static void repack_one_table(const repack_table *table, const char *order_by); -static void repack_cleanup(bool fatal, void *userdata); +static void repack_cleanup(bool fatal, const repack_table *table); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(const char *relid, const char *lock_query); +static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); +static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); +static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -139,12 +166,12 @@ static bool sqlstate_equals(PGresult *res, const char *state) return strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), state) == 0; } -static bool analyze = true; -static bool alldb = false; -static bool noorder = false; -static char *table = NULL; -static char *orderby = NULL; -static int wait_timeout = 60; /* in seconds */ +static bool analyze = true; +static bool alldb = false; +static bool noorder = false; +static SimpleStringList table_list = {NULL, NULL}; +static char *orderby = NULL; +static int wait_timeout = 60; /* in seconds */ /* buffer should have at least 11 bytes */ static char * @@ -157,7 +184,7 @@ utoa(unsigned int value, char *buffer) static pgut_option options[] = { { 'b', 'a', "all", &alldb }, - { 's', 't', "table", &table }, + { 'l', 't', "table", &table_list }, { 'b', 'n', "no-order", &noorder }, { 's', 'o', "order-by", &orderby }, { 'i', 'T', "wait-timeout", &wait_timeout }, @@ -168,7 +195,7 @@ static pgut_option options[] = int main(int argc, char *argv[]) { - int i; + int i; i = pgut_getopt(argc, argv, options); @@ -184,19 +211,19 @@ main(int argc, char *argv[]) if (alldb) { - if (table) + if (table_list.head) ereport(ERROR, (errcode(EINVAL), - errmsg("cannot repack a specific table in all databases"))); + errmsg("cannot repack specific table(s) in all databases"))); repack_all_databases(orderby); } else { char errbuf[256]; - if (!repack_one_database(orderby, table, errbuf, sizeof(errbuf))) + if (!repack_one_database(orderby, errbuf, sizeof(errbuf))) ereport(ERROR, - (errcode(ERROR), - errmsg("%s", errbuf))); + (errcode(ERROR), + errmsg("%s", errbuf))); } return 0; @@ -219,7 +246,7 @@ is_superuser(void) return false; val = PQparameterStatus(connection, "is_superuser"); - + if (val && strcmp(val, "on") == 0) return true; @@ -254,11 +281,11 @@ repack_all_databases(const char *orderby) if (pgut_log_level >= INFO) { - printf("%s: repack database \"%s\"", PROGRAM_NAME, dbname); + printf("%s: repack database \"%s\"\n", PROGRAM_NAME, dbname); fflush(stdout); } - ret = repack_one_database(orderby, NULL, errbuf, sizeof(errbuf)); + ret = repack_one_database(orderby, errbuf, sizeof(errbuf)); if (pgut_log_level >= INFO) { @@ -296,13 +323,24 @@ getoid(PGresult *res, int row, int col) * Call repack_one_table for the target table or each table in a database. */ static bool -repack_one_database(const char *orderby, const char *table, char *errbuf, size_t errsize) +repack_one_database(const char *orderby, char *errbuf, size_t errsize) { - bool ret = false; - PGresult *res = NULL; - int i; - int num; - StringInfoData sql; + bool ret = false; + PGresult *res = NULL; + int i; + int num; + StringInfoData sql; + SimpleStringListCell *cell; + const char **params = NULL; + size_t num_params = simple_string_list_size(table_list); + + /* We need to be able to support at least two params, or more + * if we have multiple --tables specified. + */ + if (num_params && num_params > 2) + params = pgut_malloc(num_params * sizeof(char *)); + else + params = pgut_malloc(2 * sizeof(char *)); initStringInfo(&sql); @@ -355,7 +393,7 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t /* Schema repack does not exist. Skip the database. */ if (errbuf) snprintf(errbuf, errsize, - "%s is not installed in the database", PROGRAM_NAME); + "%s is not installed in the database", PROGRAM_NAME); } else { @@ -377,10 +415,19 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t /* acquire target tables */ appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE "); - if (table) + if (num_params) { - appendStringInfoString(&sql, "relid = $1::regclass"); - res = execute_elevel(sql.data, 1, &table, DEBUG2); + appendStringInfoString(&sql, "("); + for (i = 0, cell = table_list.head; cell; cell = cell->next, i++) + { + /* Construct table name placeholders to be used by PQexecParams */ + appendStringInfo(&sql, "relid = $%d::regclass", i + 1); + params[i] = cell->val; + if (cell->next) + appendStringInfoString(&sql, " OR "); + } + appendStringInfoString(&sql, ")"); + res = execute_elevel(sql.data, (int) num_params, params, DEBUG2); } else { @@ -398,7 +445,7 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t /* Schema repack does not exist. Skip the database. */ if (errbuf) snprintf(errbuf, errsize, - "%s is not installed in the database", PROGRAM_NAME); + "%s is not installed in the database", PROGRAM_NAME); } else { @@ -406,7 +453,6 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t if (errbuf) snprintf(errbuf, errsize, "%s", PQerrorMessage(connection)); } - ret = false; goto cleanup; } @@ -447,9 +493,12 @@ repack_one_database(const char *orderby, const char *table, char *errbuf, size_t { /* CLUSTER mode */ if (ckey == NULL) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("relation \"%s\" has no cluster key", table.target_name))); + continue; + } appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey); table.create_table = sql.data; } @@ -484,7 +533,7 @@ cleanup: } static int -apply_log(const repack_table *table, int count) +apply_log(PGconn *conn, const repack_table *table, int count) { int result; PGresult *res; @@ -498,8 +547,9 @@ apply_log(const repack_table *table, int count) params[4] = table->sql_pop; params[5] = utoa(count, buffer); - res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", - 6, params); + res = pgut_execute(conn, + "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)", + 6, params); result = atoi(PQgetvalue(res, 0, 0)); PQclear(res); @@ -517,9 +567,10 @@ repack_one_table(const repack_table *table, const char *orderby) int num; int i; int num_waiting = 0; - char *vxid; + char *vxid = NULL; char buffer[12]; StringInfoData sql; + bool have_error = false; initStringInfo(&sql); @@ -548,7 +599,12 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); + if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE))) + { + elog(WARNING, "lock_exclusive() failed for %s", table->target_name); + have_error = true; + goto cleanup; + } /* * Check z_repack_trigger is the trigger executed at last so that @@ -558,10 +614,15 @@ repack_one_table(const repack_table *table, const char *orderby) res = execute("SELECT repack.conflicted_triggers($1)", 1, params); if (PQntuples(res) > 0) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("trigger %s conflicted for %s", PQgetvalue(res, 0, 0), table->target_name))); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); command(table->create_pktype, 0, NULL); @@ -570,14 +631,82 @@ repack_one_table(const repack_table *table, const char *orderby) command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); - command("COMMIT", 0, NULL); + + /* While we are still holding an AccessExclusive lock on the table, submit + * the request for an AccessShare lock asynchronously from conn2. + * We want to submit this query in conn2 while connection's + * transaction still holds its lock, so that no DDL may sneak in + * between the time that connection commits and conn2 gets its lock. + */ + pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); + + /* grab the backend PID of conn2; we'll need this when querying + * pg_locks momentarily. + */ + res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("%s", PQerrorMessage(conn2)); + PQclear(res); + have_error = true; + goto cleanup; + } + buffer[0] = '\0'; + strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1); + PQclear(res); /* - * Register the table to be dropped on error. We use pktype as - * an advisory lock. The registration should be done after - * the first command succeeds. + * Not using lock_access_share() here since we know that + * it's not possible to obtain the ACCESS SHARE lock right now + * in conn2, since the primary connection holds ACCESS EXCLUSIVE. */ - pgut_atexit_push(&repack_cleanup, (void *) table); + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", + table->target_name); + elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); + if (!(PQsendQuery(conn2, sql.data))) { + elog(WARNING, "Error sending async query: %s\n%s", sql.data, + PQerrorMessage(conn2)); + have_error = true; + goto cleanup; + } + + /* Now that we've submitted the LOCK TABLE request through conn2, + * look for and cancel any (potentially dangerous) DDL commands which + * might also be waiting on our table lock at this point -- + * it's not safe to let them wait, because they may grab their + * AccessExclusive lock before conn2 gets its AccessShare lock, + * and perform unsafe DDL on the table. + * + * Normally, lock_access_share() would take care of this for us, + * but we're not able to use it here. + */ + if (!(kill_ddl(connection, table->target_oid, true))) + { + elog(WARNING, "kill_ddl() failed."); + have_error = true; + goto cleanup; + } + + /* We're finished killing off any unsafe DDL. COMMIT in our main + * connection, so that conn2 may get its AccessShare lock. + */ + command("COMMIT", 0, NULL); + + /* Keep looping PQgetResult() calls until it returns NULL, indicating the + * command is done and we have obtained our lock. + */ + while ((res = PQgetResult(conn2))) + { + elog(DEBUG2, "Waiting on ACCESS SHARE lock..."); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + elog(WARNING, "Error with LOCK TABLE: %s", PQerrorMessage(conn2)); + PQclear(res); + have_error = true; + goto cleanup; + } + PQclear(res); + } /* * 2. Copy tuples into temp table. @@ -593,9 +722,20 @@ repack_one_table(const repack_table *table, const char *orderby) command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL); if (orderby && !orderby[0]) command("SET LOCAL synchronize_seqscans = off", 0, NULL); - params[0] = PROGRAM_NAME; - res = execute(SQL_XID_SNAPSHOT, 1, params); - vxid = strdup(PQgetvalue(res, 0, 0)); + + /* Fetch an array of Virtual IDs of all transactions active right now. + */ + params[0] = buffer; /* backend PID of conn2 */ + params[1] = PROGRAM_NAME; + res = execute(SQL_XID_SNAPSHOT, 2, params); + if (!(vxid = strdup(PQgetvalue(res, 0, 0)))) + { + elog(WARNING, "Unable to allocate vxid, length: %d\n", + PQgetlength(res, 0, 0)); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); /* Delete any existing entries in the log table now, since we have not @@ -604,6 +744,25 @@ repack_one_table(const repack_table *table, const char *orderby) * log we could wind up with duplicates. */ command(table->delete_log, 0, NULL); + + /* We need to be able to obtain an AccessShare lock on the target table + * for the create_table command to go through, so go ahead and obtain + * the lock explicitly. + * + * Since conn2 has been diligently holding its AccessShare lock, it + * is possible that another transaction has been waiting to acquire + * an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE + * or TRUNCATE which we must not allow). If there are any such + * transactions, lock_access_share() will kill them so that our + * CREATE TABLE ... AS SELECT does not deadlock waiting for an + * AccessShare lock. + */ + if (!(lock_access_share(connection, table->target_oid, table->target_name))) + { + have_error = true; + goto cleanup; + } + command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); if (table->drop_columns) @@ -659,7 +818,7 @@ repack_one_table(const repack_table *table, const char *orderby) */ for (;;) { - num = apply_log(table, APPLY_COUNT); + num = apply_log(connection, table, APPLY_COUNT); if (num > 0) continue; /* there might be still some tuples, repeat. */ @@ -696,14 +855,25 @@ repack_one_table(const repack_table *table, const char *orderby) } /* - * 5. Swap. + * 5. Swap: will be done with conn2, since it already holds an + * AccessShare lock. */ elog(DEBUG2, "---- swap ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); - apply_log(table, 0); + /* Bump our existing AccessShare lock to AccessExclusive */ + + if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer), + table->lock_table, FALSE))) + { + elog(WARNING, "lock_exclusive() failed in conn2 for %s", + table->target_name); + have_error = true; + goto cleanup; + } + + apply_log(conn2, table, 0); params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_swap($1)", 1, params); - command("COMMIT", 0, NULL); + pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); + pgut_command(conn2, "COMMIT", 0, NULL); /* * 6. Drop. @@ -715,9 +885,6 @@ repack_one_table(const repack_table *table, const char *orderby) command("SELECT repack.repack_drop($1)", 1, params); command("COMMIT", 0, NULL); - pgut_atexit_pop(&repack_cleanup, (void *) table); - free(vxid); - /* * 7. Analyze. * Note that cleanup hook has been already uninstalled here because analyze @@ -733,18 +900,166 @@ repack_one_table(const repack_table *table, const char *orderby) command("COMMIT", 0, NULL); } +cleanup: termStringInfo(&sql); + if (vxid) + free(vxid); + + /* XXX: distinguish between fatal and non-fatal errors via the first + * arg to repack_cleanup(). + */ + if (have_error) + repack_cleanup(false, table); } -/* - * Try acquire a table lock but avoid long time locks when conflict. +/* Kill off any concurrent DDL (or any transaction attempting to take + * an AccessExclusive lock) trying to run against our table. Note, we're + * killing these queries off *before* they are granted an AccessExclusive + * lock on our table. + * + * Returns true if no problems encountered, false otherwise. */ -static void -lock_exclusive(const char *relid, const char *lock_query) +static bool +kill_ddl(PGconn *conn, Oid relid, bool terminate) +{ + bool ret = true; + PGresult *res; + StringInfoData sql; + + initStringInfo(&sql); + + printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid); + res = pgut_execute(conn, sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + elog(WARNING, "Error canceling unsafe queries: %s", + PQerrorMessage(conn)); + ret = false; + } + else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400) + { + elog(WARNING, + "Canceled %d unsafe queries. Terminating any remaining PIDs.", + PQntuples(res)); + + PQclear(res); + printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid); + res = pgut_execute(conn, sql.data, 0, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + elog(WARNING, "Error killing unsafe queries: %s", + PQerrorMessage(conn)); + ret = false; + } + } + else if (PQntuples(res) > 0) + elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res)); + else + elog(DEBUG2, "No competing DDL to cancel."); + + PQclear(res); + termStringInfo(&sql); + + return ret; +} + + +/* + * Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long + * waits by killing off other sessions which may be stuck trying to obtain + * an ACCESS EXCLUSIVE lock. + * + * Arguments: + * + * conn: connection to use + * relid: OID of relation + * target_name: name of table + */ +static bool +lock_access_share(PGconn *conn, Oid relid, const char *target_name) +{ + StringInfoData sql; + time_t start = time(NULL); + int i; + bool ret = true; + + initStringInfo(&sql); + + for (i = 1; ; i++) + { + time_t duration; + PGresult *res; + int wait_msec; + + duration = time(NULL) - start; + + /* Cancel queries unconditionally, i.e. don't bother waiting + * wait_timeout as lock_exclusive() does -- the only queries we + * should be killing are disallowed DDL commands hanging around + * for an AccessExclusive lock, which must be deadlocked at + * this point anyway since conn2 holds its AccessShare lock + * already. + */ + if (duration > (wait_timeout * 2)) + ret = kill_ddl(conn, relid, true); + else + ret = kill_ddl(conn, relid, false); + + if (!ret) + break; + + /* wait for a while to lock the table. */ + wait_msec = Min(1000, i * 100); + printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec); + pgut_command(conn, sql.data, 0, NULL); + + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name); + res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + break; + } + else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED)) + { + /* retry if lock conflicted */ + PQclear(res); + pgut_rollback(conn); + continue; + } + else + { + /* exit otherwise */ + elog(WARNING, "%s", PQerrorMessage(connection)); + PQclear(res); + ret = false; + break; + } + } + + termStringInfo(&sql); + pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; +} + + +/* + * Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long + * waits by killing off other sessions. + * Arguments: + * + * conn: connection to use + * relid: OID of relation + * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed + * start_xact: whether we need to issue a BEGIN; + */ +static bool +lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) { time_t start = time(NULL); int i; - + bool ret = true; + for (i = 1; ; i++) { time_t duration; @@ -752,13 +1067,14 @@ lock_exclusive(const char *relid, const char *lock_query) PGresult *res; int wait_msec; - command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); + if (start_xact) + pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); duration = time(NULL) - start; if (duration > wait_timeout) { const char *cancel_query; - if (PQserverVersion(connection) >= 80400 && + if (PQserverVersion(conn) >= 80400 && duration > wait_timeout * 2) { elog(WARNING, "terminating conflicted backends"); @@ -776,15 +1092,15 @@ lock_exclusive(const char *relid, const char *lock_query) " AND relation = $1 AND pid <> pg_backend_pid()"; } - command(cancel_query, 1, &relid); + pgut_command(conn, cancel_query, 1, &relid); } /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); - command(sql, 0, NULL); + pgut_command(conn, sql, 0, NULL); - res = execute_elevel(lock_query, 0, NULL, DEBUG2); + res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); @@ -794,7 +1110,7 @@ lock_exclusive(const char *relid, const char *lock_query) { /* retry if lock conflicted */ PQclear(res); - command("ROLLBACK", 0, NULL); + pgut_rollback(conn); continue; } else @@ -802,11 +1118,13 @@ lock_exclusive(const char *relid, const char *lock_query) /* exit otherwise */ printf("%s", PQerrorMessage(connection)); PQclear(res); - exit(1); + ret = false; + break; } } - command("RESET statement_timeout", 0, NULL); + pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; } /* @@ -814,10 +1132,8 @@ lock_exclusive(const char *relid, const char *lock_query) * objects before the program exits. */ static void -repack_cleanup(bool fatal, void *userdata) +repack_cleanup(bool fatal, const repack_table *table) { - const repack_table *table = (const repack_table *) userdata; - if (fatal) { fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n"); @@ -827,12 +1143,13 @@ repack_cleanup(bool fatal, void *userdata) char buffer[12]; const char *params[1]; - /* Rollback current transaction */ - if (connection) - command("ROLLBACK", 0, NULL); + /* Rollback current transactions */ + pgut_rollback(connection); + pgut_rollback(conn2); /* Try reconnection if not available. */ - if (PQstatus(connection) != CONNECTION_OK) + if (PQstatus(connection) != CONNECTION_OK || + PQstatus(conn2) != CONNECTION_OK) reconnect(ERROR); /* do cleanup */ diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index c6af0f2..c7f888c 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -24,6 +24,7 @@ char *password = NULL; YesNo prompt_password = DEFAULT; PGconn *connection = NULL; +PGconn *conn2 = NULL; static bool parse_pair(const char buffer[], char key[], char value[]); static char *get_username(void); @@ -51,6 +52,7 @@ reconnect(int elevel) appendStringInfo(&buf, "password=%s ", password); connection = pgut_connect(buf.data, prompt_password, elevel); + conn2 = pgut_connect(buf.data, prompt_password, elevel); /* update password */ if (connection) @@ -75,6 +77,11 @@ disconnect(void) pgut_disconnect(connection); connection = NULL; } + if (conn2) + { + pgut_disconnect(conn2); + conn2 = NULL; + } } static void @@ -142,9 +149,12 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src) /* high prior value has been set already. */ return; } - else if (src >= SOURCE_CMDLINE && opt->source >= src) + else if (src >= SOURCE_CMDLINE && opt->source >= src && opt->type != 'l') { - /* duplicated option in command line */ + /* duplicated option in command line -- don't worry if the option + * type is 'l' i.e. SimpleStringList, since we are allowed to have + * multiples of these. + */ message = "specified only once"; } else @@ -175,6 +185,10 @@ pgut_setopt(pgut_option *opt, const char *optarg, pgut_optsrc src) return; message = "a 32bit signed integer"; break; + case 'l': + message = "a List"; + simple_string_list_append(opt->var, optarg); + return; case 'u': if (parse_uint32(optarg, opt->var)) return; diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h index 4e3ab31..7529587 100644 --- a/bin/pgut/pgut-fe.h +++ b/bin/pgut/pgut-fe.h @@ -25,13 +25,14 @@ typedef enum pgut_optsrc * type: * b: bool (true) * B: bool (false) - * f: pgut_optfn + * f: pgut_optfn * i: 32bit signed integer + * l: StringList * u: 32bit unsigned integer * I: 64bit signed integer * U: 64bit unsigned integer * s: string - * t: time_t + * t: time_t * y: YesNo (YES) * Y: YesNo (NO) */ @@ -56,6 +57,7 @@ extern char *password; extern YesNo prompt_password; extern PGconn *connection; +extern PGconn *conn2; extern void pgut_help(bool details); extern void help(bool details); diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index dca9853..0013236 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -384,6 +384,55 @@ parse_time(const char *value, time_t *time) return true; } +/* Append the given string `val` to the `list` */ +void +simple_string_list_append(SimpleStringList *list, const char *val) +{ + SimpleStringListCell *cell; + + /* this calculation correctly accounts for the null trailing byte */ + cell = (SimpleStringListCell *) + pgut_malloc(sizeof(SimpleStringListCell) + strlen(val)); + cell->next = NULL; + strcpy(cell->val, val); + + if (list->tail) + list->tail->next = cell; + else + list->head = cell; + list->tail = cell; +} + +/* Test whether `val` is in the given `list` */ +bool +simple_string_list_member(SimpleStringList *list, const char *val) +{ + SimpleStringListCell *cell; + + for (cell = list->head; cell; cell = cell->next) + { + if (strcmp(cell->val, val) == 0) + return true; + } + return false; +} + +/* Returns the number of elements in the given SimpleStringList */ +size_t +simple_string_list_size(SimpleStringList list) +{ + size_t i = 0; + SimpleStringListCell *cell = list.head; + + while (cell) + { + cell = cell->next; + i++; + } + + return i; +} + static char * prompt_for_password(void) { diff --git a/bin/pgut/pgut.h b/bin/pgut/pgut.h index f5217e6..93d7137 100644 --- a/bin/pgut/pgut.h +++ b/bin/pgut/pgut.h @@ -171,6 +171,24 @@ extern bool parse_time(const char *value, time_t *time); #define ToLower(c) (tolower((unsigned char)(c))) #define ToUpper(c) (toupper((unsigned char)(c))) +/* linked list of string values and helper functions, stolen from pg_dump. */ +typedef struct SimpleStringListCell +{ + struct SimpleStringListCell *next; + char val[1]; /* VARIABLE LENGTH FIELD */ +} SimpleStringListCell; + +typedef struct SimpleStringList +{ + SimpleStringListCell *head; + SimpleStringListCell *tail; +} SimpleStringList; + +extern void simple_string_list_append(SimpleStringList *list, const char *val); +extern bool simple_string_list_member(SimpleStringList *list, const char *val); +extern size_t simple_string_list_size(SimpleStringList list); + + /* * socket operations */