diff --git a/bin/pg_repack.c b/bin/pg_repack.c index ea3b732..1b5cd2c 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -107,15 +107,15 @@ typedef struct repack_index } repack_index; static void repack_all_databases(const char *order_by); -static bool repack_one_database(const char *order_by, const char *table); +static bool repack_one_database(const char *order_by); static void repack_one_table(const repack_table *table, const char *order_by); -static void repack_cleanup(bool fatal, void *userdata); +static void repack_cleanup(bool fatal, const repack_table *table); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); +static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); -static void lock_access_share(PGconn *conn, Oid relid, const char *target_name); +static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -155,7 +155,6 @@ int main(int argc, char *argv[]) { int i; - SimpleStringListCell *cell; i = pgut_getopt(argc, argv, options); @@ -171,22 +170,15 @@ main(int argc, char *argv[]) if (alldb) { - if (table_list.head != NULL) + if (table_list.head) ereport(ERROR, (errcode(EINVAL), - errmsg("cannot repack a specific table in all databases"))); + errmsg("cannot repack specific table(s) in all databases"))); repack_all_databases(orderby); } else { - if (table_list.head != NULL) - { - for (cell = table_list.head; cell; cell = cell->next) - { - repack_one_database(orderby, cell->val); - } - } - else if (!repack_one_database(orderby, NULL)) + if (!repack_one_database(orderby)) ereport(ERROR, (errcode(ENOENT), errmsg("%s is not installed", PROGRAM_NAME))); @@ -221,7 +213,7 @@ repack_all_databases(const char *orderby) fflush(stdout); } - ret = repack_one_database(orderby, NULL); + ret = repack_one_database(orderby); if (pgut_log_level >= INFO) { @@ -259,13 +251,14 @@ getoid(PGresult *res, int row, int col) * Call repack_one_table for the target table or each table in a database. */ static bool -repack_one_database(const char *orderby, const char *table) +repack_one_database(const char *orderby) { - bool ret = true; - PGresult *res; - int i; - int num; - StringInfoData sql; + bool ret = true; + PGresult *res; + int i; + int num; + StringInfoData sql; + SimpleStringListCell *cell; initStringInfo(&sql); @@ -282,10 +275,18 @@ repack_one_database(const char *orderby, const char *table) /* acquire target tables */ appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE "); - if (table) + if (table_list.head) { - appendStringInfoString(&sql, "relid = $1::regclass"); - res = execute_elevel(sql.data, 1, &table, DEBUG2); + appendStringInfoString(&sql, "( "); + for (cell = table_list.head; cell; cell = cell->next) + { + /* FIXME: bogus table quoting */ + appendStringInfo(&sql, "relid = '%s'::regclass", cell->val); + if (cell->next) + appendStringInfoString(&sql, " OR "); + } + appendStringInfoString(&sql, " )"); + res = execute_elevel(sql.data, 0, NULL, DEBUG2); } else { @@ -300,6 +301,7 @@ repack_one_database(const char *orderby, const char *table) if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)) { /* Schema repack does not exist. Skip the database. */ + PQclear(res); ret = false; goto cleanup; } @@ -349,9 +351,12 @@ repack_one_database(const char *orderby, const char *table) { /* CLUSTER mode */ if (ckey == NULL) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("relation \"%s\" has no cluster key", table.target_name))); + continue; + } appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey); table.create_table = sql.data; } @@ -418,12 +423,13 @@ repack_one_table(const repack_table *table, const char *orderby) int num; int i; int num_waiting = 0; - char *vxid; - char *lock_conn_pid; + char *vxid = NULL; char buffer[12]; StringInfoData sql; + bool have_error = false; initStringInfo(&sql); + printf("Repack table: %s\n", table->target_name); elog(DEBUG2, "---- repack_one_table ----"); elog(DEBUG2, "target_name : %s", table->target_name); @@ -450,7 +456,12 @@ repack_one_table(const repack_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE); + if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE))) + { + elog(WARNING, "lock_exclusive() failed for %s", table->target_name); + have_error = true; + goto cleanup; + } /* * Check z_repack_trigger is the trigger executed at last so that @@ -460,10 +471,15 @@ repack_one_table(const repack_table *table, const char *orderby) res = execute("SELECT repack.conflicted_triggers($1)", 1, params); if (PQntuples(res) > 0) - ereport(ERROR, + { + ereport(WARNING, (errcode(E_PG_COMMAND), errmsg("trigger %s conflicted for %s", PQgetvalue(res, 0, 0), table->target_name))); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); command(table->create_pktype, 0, NULL); @@ -478,7 +494,6 @@ repack_one_table(const repack_table *table, const char *orderby) * We want to submit this query in conn2 while connection's * transaction still holds its lock, so that no DDL may sneak in * between the time that connection commits and conn2 gets its lock. - * */ pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); @@ -490,9 +505,11 @@ repack_one_table(const repack_table *table, const char *orderby) { printf("%s", PQerrorMessage(conn2)); PQclear(res); - exit(1); + have_error = true; + goto cleanup; } - lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); + buffer[0] = '\0'; + strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1); PQclear(res); /* @@ -504,9 +521,10 @@ repack_one_table(const repack_table *table, const char *orderby) table->target_name); elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); if (!(PQsendQuery(conn2, sql.data))) { - printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); - /* XXX: better error handling */ - exit(1); + elog(WARNING, "Error sending async query: %s\n%s", sql.data, + PQerrorMessage(conn2)); + have_error = true; + goto cleanup; } /* Now that we've submitted the LOCK TABLE request through conn2, @@ -521,10 +539,11 @@ repack_one_table(const repack_table *table, const char *orderby) */ if (!(kill_ddl(connection, table->target_oid, true))) { - exit(1); + elog(WARNING, "kill_ddl() failed."); + have_error = true; + goto cleanup; } - /* We're finished killing off any unsafe DDL. COMMIT in our main * connection, so that conn2 may get its AccessShare lock. */ @@ -540,19 +559,12 @@ repack_one_table(const repack_table *table, const char *orderby) { printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2)); PQclear(res); - exit(1); + have_error = true; + goto cleanup; } PQclear(res); } - - /* - * Register the table to be dropped on error. We use pktype as - * an advisory lock. The registration should be done after - * the first command succeeds. - */ - pgut_atexit_push(&repack_cleanup, (void *) table); - /* * 2. Copy tuples into temp table. */ @@ -566,9 +578,16 @@ repack_one_table(const repack_table *table, const char *orderby) /* Fetch an array of Virtual IDs of all transactions active right now. */ - params[0] = lock_conn_pid; + params[0] = buffer; res = execute(SQL_XID_SNAPSHOT, 1, params); - vxid = strdup(PQgetvalue(res, 0, 0)); + if (!(vxid = strdup(PQgetvalue(res, 0, 0)))) + { + elog(WARNING, "Unable to allocate vxid, length: %d\n", + PQgetlength(res, 0, 0)); + PQclear(res); + have_error = true; + goto cleanup; + } PQclear(res); command(table->delete_log, 0, NULL); @@ -585,7 +604,11 @@ repack_one_table(const repack_table *table, const char *orderby) * CREATE TABLE ... AS SELECT does not deadlock waiting for an * AccessShare lock. */ - lock_access_share(connection, table->target_oid, table->target_name); + if (!(lock_access_share(connection, table->target_oid, table->target_name))) + { + have_error = true; + goto cleanup; + } command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); @@ -684,8 +707,16 @@ repack_one_table(const repack_table *table, const char *orderby) */ elog(DEBUG2, "---- swap ----"); /* Bump our existing AccessShare lock to AccessExclusive */ - lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table, - FALSE); + + if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer), + table->lock_table, FALSE))) + { + elog(WARNING, "lock_exclusive() failed in conn2 for %s", + table->target_name); + have_error = true; + goto cleanup; + } + apply_log(conn2, table, 0); params[0] = utoa(table->target_oid, buffer); pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); @@ -701,10 +732,6 @@ repack_one_table(const repack_table *table, const char *orderby) command("SELECT repack.repack_drop($1)", 1, params); command("COMMIT", 0, NULL); - pgut_atexit_pop(&repack_cleanup, (void *) table); - free(vxid); - free(lock_conn_pid); - /* * 7. Analyze. * Note that cleanup hook has been already uninstalled here because analyze @@ -720,7 +747,16 @@ repack_one_table(const repack_table *table, const char *orderby) command("COMMIT", 0, NULL); } +cleanup: termStringInfo(&sql); + if (vxid) + free(vxid); + + /* XXX: distinguish between fatal and non-fatal errors via the first + * arg to repack_cleanup(). + */ + if (have_error) + repack_cleanup(false, table); } /* Kill off any concurrent DDL (or any transaction attempting to take @@ -743,19 +779,23 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) res = pgut_execute(conn, sql.data, 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - printf("Error canceling unsafe queries: %s", PQerrorMessage(conn)); + elog(WARNING, "Error canceling unsafe queries: %s", + PQerrorMessage(conn)); ret = false; } else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400) { - elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); + elog(WARNING, + "Canceled %d unsafe queries. Terminating any remaining PIDs.", + PQntuples(res)); PQclear(res); printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid); res = pgut_execute(conn, sql.data, 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - printf("Error killing unsafe queries: %s", PQerrorMessage(conn)); + elog(WARNING, "Error killing unsafe queries: %s", + PQerrorMessage(conn)); ret = false; } } @@ -764,7 +804,6 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) else elog(DEBUG2, "No competing DDL to cancel."); - PQclear(res); termStringInfo(&sql); @@ -783,12 +822,13 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate) * relid: OID of relation * target_name: name of table */ -static void +static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name) { StringInfoData sql; time_t start = time(NULL); int i; + bool ret = true; initStringInfo(&sql); @@ -808,9 +848,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) * already. */ if (duration > (wait_timeout * 2)) - kill_ddl(conn, relid, true); + ret = kill_ddl(conn, relid, true); else - kill_ddl(conn, relid, false); + ret = kill_ddl(conn, relid, false); + + if (!ret) + break; /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); @@ -837,14 +880,16 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) else { /* exit otherwise */ - printf("%s", PQerrorMessage(connection)); + elog(WARNING, "%s", PQerrorMessage(connection)); PQclear(res); - exit(1); + ret = false; + break; } } termStringInfo(&sql); pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; } @@ -858,11 +903,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed * start_xact: whether we need to issue a BEGIN; */ -static void +static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) { time_t start = time(NULL); int i; + bool ret = true; for (i = 1; ; i++) { @@ -925,11 +971,13 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta /* exit otherwise */ printf("%s", PQerrorMessage(connection)); PQclear(res); - exit(1); + ret = false; + break; } } pgut_command(conn, "RESET statement_timeout", 0, NULL); + return ret; } /* @@ -937,10 +985,8 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta * objects before the program exits. */ static void -repack_cleanup(bool fatal, void *userdata) +repack_cleanup(bool fatal, const repack_table *table) { - const repack_table *table = (const repack_table *) userdata; - if (fatal) { fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n"); @@ -950,15 +996,13 @@ repack_cleanup(bool fatal, void *userdata) char buffer[12]; const char *params[1]; - /* Rollback current transaction */ - if (conn2) - pgut_command(conn2, "ROLLBACK", 0, NULL); - - if (connection) - command("ROLLBACK", 0, NULL); + /* Rollback current transactions */ + pgut_rollback(connection); + pgut_rollback(conn2); /* Try reconnection if not available. */ - if (PQstatus(connection) != CONNECTION_OK) + if (PQstatus(connection) != CONNECTION_OK || + PQstatus(conn2) != CONNECTION_OK) reconnect(ERROR); /* do cleanup */