Improved error handling, particularly when processing multiple tables.

Previously, an error while processing any single table would cause
pg_reorg to call exit() and bail out. Quick summary of fixes:
 * get rid of pgut_atexit_push() and pgut_atexit_pop() use, since
   we are no longer relying on calling exit() to handle mundane errors
 * remove lock_conn_pid variable; we can just use buffer instead
 * lock_exclusive() and lock_access_share() now return bool instead of
   bailing out on any error
 * ERROR-level ereport() or elog() calls are now issued at WARNING
   level instead, to avoid bailing out unnecessarily
 * signature of reorg_cleanup() changed; it no longer needs to take a
   void pointer
 * check return of strdup() for vxid
 * Use pgut_rollback() instead of sending a ROLLBACK command directly

There are still one or two FIXMEs left, including fixing table name
escaping, but I'm committing this much.
This commit is contained in:
Josh Kupershmidt 2012-11-11 20:20:48 -05:00 committed by Daniele Varrazzo
parent ad75dcfbb1
commit 00ddb1edf9

View File

@ -107,15 +107,15 @@ typedef struct repack_index
} repack_index; } repack_index;
static void repack_all_databases(const char *order_by); static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, const char *table); static bool repack_one_database(const char *order_by);
static void repack_one_table(const repack_table *table, const char *order_by); static void repack_one_table(const repack_table *table, const char *order_by);
static void repack_cleanup(bool fatal, void *userdata); static void repack_cleanup(bool fatal, const repack_table *table);
static char *getstr(PGresult *res, int row, int col); static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col);
static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
static void lock_access_share(PGconn *conn, Oid relid, const char *target_name); static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_QUERY_CANCELED "57014" #define SQLSTATE_QUERY_CANCELED "57014"
@ -155,7 +155,6 @@ int
main(int argc, char *argv[]) main(int argc, char *argv[])
{ {
int i; int i;
SimpleStringListCell *cell;
i = pgut_getopt(argc, argv, options); i = pgut_getopt(argc, argv, options);
@ -171,22 +170,15 @@ main(int argc, char *argv[])
if (alldb) if (alldb)
{ {
if (table_list.head != NULL) if (table_list.head)
ereport(ERROR, ereport(ERROR,
(errcode(EINVAL), (errcode(EINVAL),
errmsg("cannot repack a specific table in all databases"))); errmsg("cannot repack specific table(s) in all databases")));
repack_all_databases(orderby); repack_all_databases(orderby);
} }
else else
{ {
if (table_list.head != NULL) if (!repack_one_database(orderby))
{
for (cell = table_list.head; cell; cell = cell->next)
{
repack_one_database(orderby, cell->val);
}
}
else if (!repack_one_database(orderby, NULL))
ereport(ERROR, ereport(ERROR,
(errcode(ENOENT), (errcode(ENOENT),
errmsg("%s is not installed", PROGRAM_NAME))); errmsg("%s is not installed", PROGRAM_NAME)));
@ -221,7 +213,7 @@ repack_all_databases(const char *orderby)
fflush(stdout); fflush(stdout);
} }
ret = repack_one_database(orderby, NULL); ret = repack_one_database(orderby);
if (pgut_log_level >= INFO) if (pgut_log_level >= INFO)
{ {
@ -259,13 +251,14 @@ getoid(PGresult *res, int row, int col)
* Call repack_one_table for the target table or each table in a database. * Call repack_one_table for the target table or each table in a database.
*/ */
static bool static bool
repack_one_database(const char *orderby, const char *table) repack_one_database(const char *orderby)
{ {
bool ret = true; bool ret = true;
PGresult *res; PGresult *res;
int i; int i;
int num; int num;
StringInfoData sql; StringInfoData sql;
SimpleStringListCell *cell;
initStringInfo(&sql); initStringInfo(&sql);
@ -282,10 +275,18 @@ repack_one_database(const char *orderby, const char *table)
/* acquire target tables */ /* acquire target tables */
appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE "); appendStringInfoString(&sql, "SELECT * FROM repack.tables WHERE ");
if (table) if (table_list.head)
{ {
appendStringInfoString(&sql, "relid = $1::regclass"); appendStringInfoString(&sql, "( ");
res = execute_elevel(sql.data, 1, &table, DEBUG2); for (cell = table_list.head; cell; cell = cell->next)
{
/* FIXME: bogus table quoting */
appendStringInfo(&sql, "relid = '%s'::regclass", cell->val);
if (cell->next)
appendStringInfoString(&sql, " OR ");
}
appendStringInfoString(&sql, " )");
res = execute_elevel(sql.data, 0, NULL, DEBUG2);
} }
else else
{ {
@ -300,6 +301,7 @@ repack_one_database(const char *orderby, const char *table)
if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)) if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME))
{ {
/* Schema repack does not exist. Skip the database. */ /* Schema repack does not exist. Skip the database. */
PQclear(res);
ret = false; ret = false;
goto cleanup; goto cleanup;
} }
@ -349,9 +351,12 @@ repack_one_database(const char *orderby, const char *table)
{ {
/* CLUSTER mode */ /* CLUSTER mode */
if (ckey == NULL) if (ckey == NULL)
ereport(ERROR, {
ereport(WARNING,
(errcode(E_PG_COMMAND), (errcode(E_PG_COMMAND),
errmsg("relation \"%s\" has no cluster key", table.target_name))); errmsg("relation \"%s\" has no cluster key", table.target_name)));
continue;
}
appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey); appendStringInfo(&sql, "%s ORDER BY %s", create_table, ckey);
table.create_table = sql.data; table.create_table = sql.data;
} }
@ -418,12 +423,13 @@ repack_one_table(const repack_table *table, const char *orderby)
int num; int num;
int i; int i;
int num_waiting = 0; int num_waiting = 0;
char *vxid; char *vxid = NULL;
char *lock_conn_pid;
char buffer[12]; char buffer[12];
StringInfoData sql; StringInfoData sql;
bool have_error = false;
initStringInfo(&sql); initStringInfo(&sql);
printf("Repack table: %s\n", table->target_name);
elog(DEBUG2, "---- repack_one_table ----"); elog(DEBUG2, "---- repack_one_table ----");
elog(DEBUG2, "target_name : %s", table->target_name); elog(DEBUG2, "target_name : %s", table->target_name);
@ -450,7 +456,12 @@ repack_one_table(const repack_table *table, const char *orderby)
* 1. Setup workspaces and a trigger. * 1. Setup workspaces and a trigger.
*/ */
elog(DEBUG2, "---- setup ----"); elog(DEBUG2, "---- setup ----");
lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE); if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE)))
{
elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
have_error = true;
goto cleanup;
}
/* /*
* Check z_repack_trigger is the trigger executed at last so that * Check z_repack_trigger is the trigger executed at last so that
@ -460,10 +471,15 @@ repack_one_table(const repack_table *table, const char *orderby)
res = execute("SELECT repack.conflicted_triggers($1)", 1, params); res = execute("SELECT repack.conflicted_triggers($1)", 1, params);
if (PQntuples(res) > 0) if (PQntuples(res) > 0)
ereport(ERROR, {
ereport(WARNING,
(errcode(E_PG_COMMAND), (errcode(E_PG_COMMAND),
errmsg("trigger %s conflicted for %s", errmsg("trigger %s conflicted for %s",
PQgetvalue(res, 0, 0), table->target_name))); PQgetvalue(res, 0, 0), table->target_name)));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res); PQclear(res);
command(table->create_pktype, 0, NULL); command(table->create_pktype, 0, NULL);
@ -478,7 +494,6 @@ repack_one_table(const repack_table *table, const char *orderby)
* We want to submit this query in conn2 while connection's * We want to submit this query in conn2 while connection's
* transaction still holds its lock, so that no DDL may sneak in * transaction still holds its lock, so that no DDL may sneak in
* between the time that connection commits and conn2 gets its lock. * between the time that connection commits and conn2 gets its lock.
*
*/ */
pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
@ -490,9 +505,11 @@ repack_one_table(const repack_table *table, const char *orderby)
{ {
printf("%s", PQerrorMessage(conn2)); printf("%s", PQerrorMessage(conn2));
PQclear(res); PQclear(res);
exit(1); have_error = true;
goto cleanup;
} }
lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); buffer[0] = '\0';
strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1);
PQclear(res); PQclear(res);
/* /*
@ -504,9 +521,10 @@ repack_one_table(const repack_table *table, const char *orderby)
table->target_name); table->target_name);
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
if (!(PQsendQuery(conn2, sql.data))) { if (!(PQsendQuery(conn2, sql.data))) {
printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); elog(WARNING, "Error sending async query: %s\n%s", sql.data,
/* XXX: better error handling */ PQerrorMessage(conn2));
exit(1); have_error = true;
goto cleanup;
} }
/* Now that we've submitted the LOCK TABLE request through conn2, /* Now that we've submitted the LOCK TABLE request through conn2,
@ -521,10 +539,11 @@ repack_one_table(const repack_table *table, const char *orderby)
*/ */
if (!(kill_ddl(connection, table->target_oid, true))) if (!(kill_ddl(connection, table->target_oid, true)))
{ {
exit(1); elog(WARNING, "kill_ddl() failed.");
have_error = true;
goto cleanup;
} }
/* We're finished killing off any unsafe DDL. COMMIT in our main /* We're finished killing off any unsafe DDL. COMMIT in our main
* connection, so that conn2 may get its AccessShare lock. * connection, so that conn2 may get its AccessShare lock.
*/ */
@ -540,19 +559,12 @@ repack_one_table(const repack_table *table, const char *orderby)
{ {
printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2)); printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2));
PQclear(res); PQclear(res);
exit(1); have_error = true;
goto cleanup;
} }
PQclear(res); PQclear(res);
} }
/*
* Register the table to be dropped on error. We use pktype as
* an advisory lock. The registration should be done after
* the first command succeeds.
*/
pgut_atexit_push(&repack_cleanup, (void *) table);
/* /*
* 2. Copy tuples into temp table. * 2. Copy tuples into temp table.
*/ */
@ -566,9 +578,16 @@ repack_one_table(const repack_table *table, const char *orderby)
/* Fetch an array of Virtual IDs of all transactions active right now. /* Fetch an array of Virtual IDs of all transactions active right now.
*/ */
params[0] = lock_conn_pid; params[0] = buffer;
res = execute(SQL_XID_SNAPSHOT, 1, params); res = execute(SQL_XID_SNAPSHOT, 1, params);
vxid = strdup(PQgetvalue(res, 0, 0)); if (!(vxid = strdup(PQgetvalue(res, 0, 0))))
{
elog(WARNING, "Unable to allocate vxid, length: %d\n",
PQgetlength(res, 0, 0));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res); PQclear(res);
command(table->delete_log, 0, NULL); command(table->delete_log, 0, NULL);
@ -585,7 +604,11 @@ repack_one_table(const repack_table *table, const char *orderby)
* CREATE TABLE ... AS SELECT does not deadlock waiting for an * CREATE TABLE ... AS SELECT does not deadlock waiting for an
* AccessShare lock. * AccessShare lock.
*/ */
lock_access_share(connection, table->target_oid, table->target_name); if (!(lock_access_share(connection, table->target_oid, table->target_name)))
{
have_error = true;
goto cleanup;
}
command(table->create_table, 0, NULL); command(table->create_table, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
@ -684,8 +707,16 @@ repack_one_table(const repack_table *table, const char *orderby)
*/ */
elog(DEBUG2, "---- swap ----"); elog(DEBUG2, "---- swap ----");
/* Bump our existing AccessShare lock to AccessExclusive */ /* Bump our existing AccessShare lock to AccessExclusive */
lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table,
FALSE); if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer),
table->lock_table, FALSE)))
{
elog(WARNING, "lock_exclusive() failed in conn2 for %s",
table->target_name);
have_error = true;
goto cleanup;
}
apply_log(conn2, table, 0); apply_log(conn2, table, 0);
params[0] = utoa(table->target_oid, buffer); params[0] = utoa(table->target_oid, buffer);
pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params); pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
@ -701,10 +732,6 @@ repack_one_table(const repack_table *table, const char *orderby)
command("SELECT repack.repack_drop($1)", 1, params); command("SELECT repack.repack_drop($1)", 1, params);
command("COMMIT", 0, NULL); command("COMMIT", 0, NULL);
pgut_atexit_pop(&repack_cleanup, (void *) table);
free(vxid);
free(lock_conn_pid);
/* /*
* 7. Analyze. * 7. Analyze.
* Note that cleanup hook has been already uninstalled here because analyze * Note that cleanup hook has been already uninstalled here because analyze
@ -720,7 +747,16 @@ repack_one_table(const repack_table *table, const char *orderby)
command("COMMIT", 0, NULL); command("COMMIT", 0, NULL);
} }
cleanup:
termStringInfo(&sql); termStringInfo(&sql);
if (vxid)
free(vxid);
/* XXX: distinguish between fatal and non-fatal errors via the first
* arg to repack_cleanup().
*/
if (have_error)
repack_cleanup(false, table);
} }
/* Kill off any concurrent DDL (or any transaction attempting to take /* Kill off any concurrent DDL (or any transaction attempting to take
@ -743,19 +779,23 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate)
res = pgut_execute(conn, sql.data, 0, NULL); res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
printf("Error canceling unsafe queries: %s", PQerrorMessage(conn)); elog(WARNING, "Error canceling unsafe queries: %s",
PQerrorMessage(conn));
ret = false; ret = false;
} }
else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400) else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
{ {
elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res)); elog(WARNING,
"Canceled %d unsafe queries. Terminating any remaining PIDs.",
PQntuples(res));
PQclear(res); PQclear(res);
printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid); printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL); res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
printf("Error killing unsafe queries: %s", PQerrorMessage(conn)); elog(WARNING, "Error killing unsafe queries: %s",
PQerrorMessage(conn));
ret = false; ret = false;
} }
} }
@ -764,7 +804,6 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate)
else else
elog(DEBUG2, "No competing DDL to cancel."); elog(DEBUG2, "No competing DDL to cancel.");
PQclear(res); PQclear(res);
termStringInfo(&sql); termStringInfo(&sql);
@ -783,12 +822,13 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate)
* relid: OID of relation * relid: OID of relation
* target_name: name of table * target_name: name of table
*/ */
static void static bool
lock_access_share(PGconn *conn, Oid relid, const char *target_name) lock_access_share(PGconn *conn, Oid relid, const char *target_name)
{ {
StringInfoData sql; StringInfoData sql;
time_t start = time(NULL); time_t start = time(NULL);
int i; int i;
bool ret = true;
initStringInfo(&sql); initStringInfo(&sql);
@ -808,9 +848,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name)
* already. * already.
*/ */
if (duration > (wait_timeout * 2)) if (duration > (wait_timeout * 2))
kill_ddl(conn, relid, true); ret = kill_ddl(conn, relid, true);
else else
kill_ddl(conn, relid, false); ret = kill_ddl(conn, relid, false);
if (!ret)
break;
/* wait for a while to lock the table. */ /* wait for a while to lock the table. */
wait_msec = Min(1000, i * 100); wait_msec = Min(1000, i * 100);
@ -837,14 +880,16 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name)
else else
{ {
/* exit otherwise */ /* exit otherwise */
printf("%s", PQerrorMessage(connection)); elog(WARNING, "%s", PQerrorMessage(connection));
PQclear(res); PQclear(res);
exit(1); ret = false;
break;
} }
} }
termStringInfo(&sql); termStringInfo(&sql);
pgut_command(conn, "RESET statement_timeout", 0, NULL); pgut_command(conn, "RESET statement_timeout", 0, NULL);
return ret;
} }
@ -858,11 +903,12 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name)
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
* start_xact: whether we need to issue a BEGIN; * start_xact: whether we need to issue a BEGIN;
*/ */
static void static bool
lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact) lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
{ {
time_t start = time(NULL); time_t start = time(NULL);
int i; int i;
bool ret = true;
for (i = 1; ; i++) for (i = 1; ; i++)
{ {
@ -925,11 +971,13 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
/* exit otherwise */ /* exit otherwise */
printf("%s", PQerrorMessage(connection)); printf("%s", PQerrorMessage(connection));
PQclear(res); PQclear(res);
exit(1); ret = false;
break;
} }
} }
pgut_command(conn, "RESET statement_timeout", 0, NULL); pgut_command(conn, "RESET statement_timeout", 0, NULL);
return ret;
} }
/* /*
@ -937,10 +985,8 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
* objects before the program exits. * objects before the program exits.
*/ */
static void static void
repack_cleanup(bool fatal, void *userdata) repack_cleanup(bool fatal, const repack_table *table)
{ {
const repack_table *table = (const repack_table *) userdata;
if (fatal) if (fatal)
{ {
fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n"); fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n");
@ -950,15 +996,13 @@ repack_cleanup(bool fatal, void *userdata)
char buffer[12]; char buffer[12];
const char *params[1]; const char *params[1];
/* Rollback current transaction */ /* Rollback current transactions */
if (conn2) pgut_rollback(connection);
pgut_command(conn2, "ROLLBACK", 0, NULL); pgut_rollback(conn2);
if (connection)
command("ROLLBACK", 0, NULL);
/* Try reconnection if not available. */ /* Try reconnection if not available. */
if (PQstatus(connection) != CONNECTION_OK) if (PQstatus(connection) != CONNECTION_OK ||
PQstatus(conn2) != CONNECTION_OK)
reconnect(ERROR); reconnect(ERROR);
/* do cleanup */ /* do cleanup */