diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 0f78542..fa8138f 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by); static bool repack_table_indexes(PGresult *index_details); static bool repack_all_indexes(char *errbuf, size_t errsize); static void repack_cleanup(bool fatal, const repack_table *table); +static void repack_cleanup_callback(bool fatal, void *userdata); static bool rebuild_indexes(const repack_table *table); static char *getstr(PGresult *res, int row, int col); @@ -235,13 +236,20 @@ static bool only_indexes = false; static int wait_timeout = 60; /* in seconds */ static int jobs = 0; /* number of concurrent worker conns. */ static bool dryrun = false; +static unsigned int temp_obj_num = 0; /* temporary objects counter */ /* buffer should have at least 11 bytes */ static char * utoa(unsigned int value, char *buffer) { sprintf(buffer, "%u", value); - return buffer; + /* XXX: originally, we would just return buffer here without + * the pgut_strdup(). But repack_cleanup_callback() seems to + * depend on getting back a freshly strdup'd copy of buffer, + * not sure why. So now we are leaking a tiny bit of memory + * with each utoa() call. + */ + return pgut_strdup(buffer); } static pgut_option options[] = @@ -898,6 +906,13 @@ rebuild_indexes(const repack_table *table) ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); #endif + /* XXX: the errno != EINTR check means we won't bail + * out on SIGINT. We should probably just remove this + * check, though it seems we also need to fix up + * the on_interrupt handling for workers' index + * builds (those PGconns don't seem to have c->cancel + * set, so we don't cancel the in-progress builds). + */ if (ret < 0 && errno != EINTR) elog(ERROR, "poll() failed: %d, %d", ret, errno); @@ -990,7 +1005,7 @@ static void repack_one_table(repack_table *table, const char *orderby) { PGresult *res = NULL; - const char *params[2]; + const char *params[3]; int num; char *vxid = NULL; char buffer[12]; @@ -1041,6 +1056,9 @@ repack_one_table(repack_table *table, const char *orderby) if (dryrun) return; + + /* push repack_cleanup_callback() on stack to clean temporary objects */ + pgut_atexit_push(repack_cleanup_callback, &table->target_oid); /* * 1. Setup advisory lock and trigger on main table. @@ -1146,8 +1164,11 @@ repack_one_table(repack_table *table, const char *orderby) CLEARPGRES(res); command(table->create_pktype, 0, NULL); + temp_obj_num++; command(table->create_log, 0, NULL); + temp_obj_num++; command(table->create_trigger, 0, NULL); + temp_obj_num++; command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); @@ -1283,6 +1304,7 @@ repack_one_table(repack_table *table, const char *orderby) goto cleanup; command(table->create_table, 0, NULL); + temp_obj_num++; printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); if (table->drop_columns) command(table->drop_columns, 0, NULL); @@ -1375,8 +1397,10 @@ repack_one_table(repack_table *table, const char *orderby) elog(DEBUG2, "---- drop ----"); command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); command("COMMIT", 0, NULL); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ /* * 7. Analyze. @@ -1395,7 +1419,7 @@ repack_one_table(repack_table *table, const char *orderby) /* Release advisory lock on table. */ params[0] = REPACK_LOCK_PREFIX_STR; - params[1] = buffer; + params[1] = utoa(table->target_oid, buffer); res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))", 2, params); @@ -1675,6 +1699,31 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta return ret; } +/* This function calls to repack_drop() to clean temporary objects on error + * in creation of temporary objects. + */ +void +repack_cleanup_callback(bool fatal, void *userdata) +{ + Oid target_table = *(Oid *) userdata; + const char *params[2]; + char buffer[12]; + + if(fatal) + { + params[0] = utoa(target_table, buffer); + params[1] = utoa(temp_obj_num, buffer); + + /* testing PQstatus() of connection and conn2, as we do + * in repack_cleanup(), doesn't seem to work here, + * so just use an unconditional reconnect(). + */ + reconnect(ERROR); + command("SELECT repack.repack_drop($1, $2)", 2, params); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ + } +} + /* * The userdata pointing a table being re-organized. We need to cleanup temp * objects before the program exits. @@ -1689,7 +1738,7 @@ repack_cleanup(bool fatal, const repack_table *table) else { char buffer[12]; - const char *params[1]; + const char *params[2]; /* Try reconnection if not available. */ if (PQstatus(connection) != CONNECTION_OK || @@ -1698,7 +1747,9 @@ repack_cleanup(bool fatal, const repack_table *table) /* do cleanup */ params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ } } diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 52a6e21..e533bef 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -64,7 +64,7 @@ static void on_before_exec(pgutConn *conn); static void on_after_exec(pgutConn *conn); static void on_interrupt(void); static void on_cleanup(void); -static void exit_or_abort(int exitcode); +static void exit_or_abort(int exitcode, int elevel); void pgut_init(int argc, char **argv) @@ -872,7 +872,10 @@ pgut_errfinish(int dummy, ...) edata->detail.data); if (pgut_abort_level <= edata->elevel && edata->elevel <= PANIC) - exit_or_abort(edata->code); + { + in_cleanup = true; /* need to be set for cleaning temporary objects on error */ + exit_or_abort(edata->code, edata->elevel); + } } #ifndef PGUT_OVERRIDE_ELOG @@ -1180,7 +1183,9 @@ call_atexit_callbacks(bool fatal) pgut_atexit_item *item; for (item = pgut_atexit_stack; item; item = item->next) + { item->callback(fatal, item->userdata); + } } static void @@ -1193,12 +1198,19 @@ on_cleanup(void) } static void -exit_or_abort(int exitcode) +exit_or_abort(int exitcode, int elevel) { - if (in_cleanup) + + if (in_cleanup && FATAL > elevel) { /* oops, error in cleanup*/ call_atexit_callbacks(true); + exit(exitcode); + } + else if (FATAL <= elevel <= PANIC) + { + /* on FATAL or PANIC */ + call_atexit_callbacks(true); abort(); } else diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in index 574fb0e..2c34013 100644 --- a/lib/pg_repack.sql.in +++ b/lib/pg_repack.sql.in @@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS 'MODULE_PATHNAME', 'repack_swap' LANGUAGE C VOLATILE STRICT; -CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS +CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS 'MODULE_PATHNAME', 'repack_drop' LANGUAGE C VOLATILE STRICT; diff --git a/lib/repack.c b/lib/repack.c index f5f5c61..cce90ea 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -928,6 +928,7 @@ Datum repack_drop(PG_FUNCTION_ARGS) { Oid oid = PG_GETARG_OID(0); + int numobj = PG_GETARG_INT32(1); const char *relname = get_quoted_relname(oid); const char *nspname = get_quoted_nspname(oid); @@ -943,14 +944,41 @@ repack_drop(PG_FUNCTION_ARGS) /* connect to SPI manager */ repack_init(); + /* drop log table: must be done before dropping the pk type, + * since the log table is dependent on the pk type. (That's + * why we check numobj > 1 here.) + */ + if (numobj > 1) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.log_%u CASCADE", + oid); + --numobj; + } + + /* drop type for pk type */ + if (numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TYPE IF EXISTS repack.pk_%u", + oid); + --numobj; + } + /* * drop repack trigger: We have already dropped the trigger in normal * cases, but it can be left on error. */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", - nspname, relname); + if (numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", + nspname, relname); + --numobj; + } #if PG_VERSION_NUM < 80400 /* delete autovacuum settings */ @@ -965,23 +993,15 @@ repack_drop(PG_FUNCTION_ARGS) oid, oid); #endif - /* drop log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.log_%u CASCADE", - oid); - /* drop temp table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.table_%u CASCADE", - oid); - - /* drop type for log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TYPE IF EXISTS repack.pk_%u CASCADE", - oid); + if (numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.table_%u CASCADE", + oid); + --numobj; + } SPI_finish();