diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 0458d67..c4e10d2 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by); static bool repack_table_indexes(PGresult *index_details); static bool repack_all_indexes(char *errbuf, size_t errsize); static void repack_cleanup(bool fatal, const repack_table *table); +static void repack_cleanup_callback(bool fatal, void *userdata); static bool rebuild_indexes(const repack_table *table); static char *getstr(PGresult *res, int row, int col); @@ -235,13 +236,14 @@ static bool only_indexes = false; static int wait_timeout = 60; /* in seconds */ static int jobs = 0; /* number of concurrent worker conns. */ static bool dryrun = false; +static unsigned int temp_obj_num = 0; /* temporary objects counter */ /* buffer should have at least 11 bytes */ static char * utoa(unsigned int value, char *buffer) { sprintf(buffer, "%u", value); - return buffer; + return strdup(buffer); } static pgut_option options[] = @@ -990,7 +992,7 @@ static void repack_one_table(repack_table *table, const char *orderby) { PGresult *res = NULL; - const char *params[2]; + const char *params[3]; int num; char *vxid = NULL; char buffer[12]; @@ -1041,6 +1043,9 @@ repack_one_table(repack_table *table, const char *orderby) if (dryrun) return; + + /* push repack_cleanup_callback() on stack to clean temporary objects */ + pgut_atexit_push(repack_cleanup_callback, &table->target_oid); /* * 1. Setup advisory lock and trigger on main table. @@ -1146,8 +1151,11 @@ repack_one_table(repack_table *table, const char *orderby) CLEARPGRES(res); command(table->create_pktype, 0, NULL); + temp_obj_num++; command(table->create_log, 0, NULL); + temp_obj_num++; command(table->create_trigger, 0, NULL); + temp_obj_num++; command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); @@ -1287,6 +1295,7 @@ repack_one_table(repack_table *table, const char *orderby) goto cleanup; command(table->create_table, 0, NULL); + temp_obj_num++; printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); if (table->drop_columns) command(table->drop_columns, 0, NULL); @@ -1379,7 +1388,8 @@ repack_one_table(repack_table *table, const char *orderby) elog(DEBUG2, "---- drop ----"); command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); command("COMMIT", 0, NULL); /* @@ -1399,7 +1409,7 @@ repack_one_table(repack_table *table, const char *orderby) /* Release advisory lock on table. */ params[0] = REPACK_LOCK_PREFIX_STR; - params[1] = buffer; + params[1] = utoa(table->target_oid, buffer); res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))", 2, params); @@ -1679,6 +1689,26 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta return ret; } +/* This function calls to repack_drop() to clean temporary objects on error + * in creation of temporary objects. + */ +void +repack_cleanup_callback(bool fatal, void *userdata) +{ + Oid target_table = *(Oid *) userdata; + const char *params[2]; + char buffer[12]; + + if(fatal) + { + params[0] = utoa(target_table, buffer); + params[1] = utoa(temp_obj_num, buffer); + + reconnect(ERROR); + command("SELECT repack.repack_drop($1, $2)", 2, params); + } +} + /* * The userdata pointing a table being re-organized. We need to cleanup temp * objects before the program exits. @@ -1693,7 +1723,7 @@ repack_cleanup(bool fatal, const repack_table *table) else { char buffer[12]; - const char *params[1]; + const char *params[2]; /* Try reconnection if not available. */ if (PQstatus(connection) != CONNECTION_OK || @@ -1702,7 +1732,8 @@ repack_cleanup(bool fatal, const repack_table *table) /* do cleanup */ params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); } } diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 52a6e21..37cc301 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -1180,7 +1180,9 @@ call_atexit_callbacks(bool fatal) pgut_atexit_item *item; for (item = pgut_atexit_stack; item; item = item->next) + { item->callback(fatal, item->userdata); + } } static void @@ -1195,10 +1197,11 @@ on_cleanup(void) static void exit_or_abort(int exitcode) { + call_atexit_callbacks(true); if (in_cleanup) { /* oops, error in cleanup*/ - call_atexit_callbacks(true); + in_cleanup = false; abort(); } else diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in index 574fb0e..2c34013 100644 --- a/lib/pg_repack.sql.in +++ b/lib/pg_repack.sql.in @@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS 'MODULE_PATHNAME', 'repack_swap' LANGUAGE C VOLATILE STRICT; -CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS +CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS 'MODULE_PATHNAME', 'repack_drop' LANGUAGE C VOLATILE STRICT; diff --git a/lib/repack.c b/lib/repack.c index f5f5c61..070fd3a 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -928,6 +928,7 @@ Datum repack_drop(PG_FUNCTION_ARGS) { Oid oid = PG_GETARG_OID(0); + int numobj = PG_GETARG_INT32(1); const char *relname = get_quoted_relname(oid); const char *nspname = get_quoted_nspname(oid); @@ -943,14 +944,38 @@ repack_drop(PG_FUNCTION_ARGS) /* connect to SPI manager */ repack_init(); + /* drop log table */ + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.log_%u CASCADE", + oid); + --numobj; + } + + /* drop type for pk type */ + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TYPE IF EXISTS repack.pk_%u", + oid); + --numobj; + } + /* * drop repack trigger: We have already dropped the trigger in normal * cases, but it can be left on error. */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", - nspname, relname); + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", + nspname, relname); + --numobj; + } #if PG_VERSION_NUM < 80400 /* delete autovacuum settings */ @@ -965,23 +990,15 @@ repack_drop(PG_FUNCTION_ARGS) oid, oid); #endif - /* drop log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.log_%u CASCADE", - oid); - /* drop temp table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.table_%u CASCADE", - oid); - - /* drop type for log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TYPE IF EXISTS repack.pk_%u CASCADE", - oid); + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.table_%u CASCADE", + oid); + --numobj; + } SPI_finish();