From ad109edb5b23f07a8f15917f23d431b54d6cdf8c Mon Sep 17 00:00:00 2001 From: kotsachin Date: Fri, 17 Apr 2015 13:07:01 +0900 Subject: [PATCH 1/3] This patch contains repack_cleanup_callback() which calls to repack_drop() for cleaning temporary objects. repack_cleanup_callback() will be pushed on stack using pgut_atexit_push() at beginning so that it will pop on abort or exit of program. This patch includes one global counter (temp_obj_num) which counts number of temporary objects created by pg_repack. Correct order of deletion of temporary object as per count avoids unintentional error messages. --- bin/pg_repack.c | 43 ++++++++++++++++++++++++++++----- bin/pgut/pgut.c | 5 +++- lib/pg_repack.sql.in | 2 +- lib/repack.c | 57 ++++++++++++++++++++++++++++---------------- 4 files changed, 79 insertions(+), 28 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index d8b2962..7105f53 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by); static bool repack_table_indexes(PGresult *index_details); static bool repack_all_indexes(char *errbuf, size_t errsize); static void repack_cleanup(bool fatal, const repack_table *table); +static void repack_cleanup_callback(bool fatal, void *userdata); static bool rebuild_indexes(const repack_table *table); static char *getstr(PGresult *res, int row, int col); @@ -235,13 +236,14 @@ static bool only_indexes = false; static int wait_timeout = 60; /* in seconds */ static int jobs = 0; /* number of concurrent worker conns. */ static bool dryrun = false; +static unsigned int temp_obj_num = 0; /* temporary objects counter */ /* buffer should have at least 11 bytes */ static char * utoa(unsigned int value, char *buffer) { sprintf(buffer, "%u", value); - return buffer; + return strdup(buffer); } static pgut_option options[] = @@ -990,7 +992,7 @@ static void repack_one_table(repack_table *table, const char *orderby) { PGresult *res = NULL; - const char *params[2]; + const char *params[3]; int num; char *vxid = NULL; char buffer[12]; @@ -1041,6 +1043,9 @@ repack_one_table(repack_table *table, const char *orderby) if (dryrun) return; + + /* push repack_cleanup_callback() on stack to clean temporary objects */ + pgut_atexit_push(repack_cleanup_callback, &table->target_oid); /* * 1. Setup advisory lock and trigger on main table. @@ -1146,8 +1151,11 @@ repack_one_table(repack_table *table, const char *orderby) CLEARPGRES(res); command(table->create_pktype, 0, NULL); + temp_obj_num++; command(table->create_log, 0, NULL); + temp_obj_num++; command(table->create_trigger, 0, NULL); + temp_obj_num++; command(table->enable_trigger, 0, NULL); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); command(sql.data, 0, NULL); @@ -1287,6 +1295,7 @@ repack_one_table(repack_table *table, const char *orderby) goto cleanup; command(table->create_table, 0, NULL); + temp_obj_num++; printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); if (table->drop_columns) command(table->drop_columns, 0, NULL); @@ -1379,7 +1388,8 @@ repack_one_table(repack_table *table, const char *orderby) elog(DEBUG2, "---- drop ----"); command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); command("COMMIT", 0, NULL); /* @@ -1399,7 +1409,7 @@ repack_one_table(repack_table *table, const char *orderby) /* Release advisory lock on table. */ params[0] = REPACK_LOCK_PREFIX_STR; - params[1] = buffer; + params[1] = utoa(table->target_oid, buffer); res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))", 2, params); @@ -1679,6 +1689,26 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta return ret; } +/* This function calls to repack_drop() to clean temporary objects on error + * in creation of temporary objects. + */ +void +repack_cleanup_callback(bool fatal, void *userdata) +{ + Oid target_table = *(Oid *) userdata; + const char *params[2]; + char buffer[12]; + + if(fatal) + { + params[0] = utoa(target_table, buffer); + params[1] = utoa(temp_obj_num, buffer); + + reconnect(ERROR); + command("SELECT repack.repack_drop($1, $2)", 2, params); + } +} + /* * The userdata pointing a table being re-organized. We need to cleanup temp * objects before the program exits. @@ -1693,7 +1723,7 @@ repack_cleanup(bool fatal, const repack_table *table) else { char buffer[12]; - const char *params[1]; + const char *params[2]; /* Try reconnection if not available. */ if (PQstatus(connection) != CONNECTION_OK || @@ -1702,7 +1732,8 @@ repack_cleanup(bool fatal, const repack_table *table) /* do cleanup */ params[0] = utoa(table->target_oid, buffer); - command("SELECT repack.repack_drop($1)", 1, params); + params[1] = utoa(temp_obj_num, buffer); + command("SELECT repack.repack_drop($1, $2)", 2, params); } } diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 52a6e21..37cc301 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -1180,7 +1180,9 @@ call_atexit_callbacks(bool fatal) pgut_atexit_item *item; for (item = pgut_atexit_stack; item; item = item->next) + { item->callback(fatal, item->userdata); + } } static void @@ -1195,10 +1197,11 @@ on_cleanup(void) static void exit_or_abort(int exitcode) { + call_atexit_callbacks(true); if (in_cleanup) { /* oops, error in cleanup*/ - call_atexit_callbacks(true); + in_cleanup = false; abort(); } else diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in index 574fb0e..2c34013 100644 --- a/lib/pg_repack.sql.in +++ b/lib/pg_repack.sql.in @@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS 'MODULE_PATHNAME', 'repack_swap' LANGUAGE C VOLATILE STRICT; -CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS +CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS 'MODULE_PATHNAME', 'repack_drop' LANGUAGE C VOLATILE STRICT; diff --git a/lib/repack.c b/lib/repack.c index f5f5c61..070fd3a 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -928,6 +928,7 @@ Datum repack_drop(PG_FUNCTION_ARGS) { Oid oid = PG_GETARG_OID(0); + int numobj = PG_GETARG_INT32(1); const char *relname = get_quoted_relname(oid); const char *nspname = get_quoted_nspname(oid); @@ -943,14 +944,38 @@ repack_drop(PG_FUNCTION_ARGS) /* connect to SPI manager */ repack_init(); + /* drop log table */ + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.log_%u CASCADE", + oid); + --numobj; + } + + /* drop type for pk type */ + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TYPE IF EXISTS repack.pk_%u", + oid); + --numobj; + } + /* * drop repack trigger: We have already dropped the trigger in normal * cases, but it can be left on error. */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", - nspname, relname); + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", + nspname, relname); + --numobj; + } #if PG_VERSION_NUM < 80400 /* delete autovacuum settings */ @@ -965,23 +990,15 @@ repack_drop(PG_FUNCTION_ARGS) oid, oid); #endif - /* drop log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.log_%u CASCADE", - oid); - /* drop temp table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TABLE IF EXISTS repack.table_%u CASCADE", - oid); - - /* drop type for log table */ - execute_with_format( - SPI_OK_UTILITY, - "DROP TYPE IF EXISTS repack.pk_%u CASCADE", - oid); + if(numobj > 0) + { + execute_with_format( + SPI_OK_UTILITY, + "DROP TABLE IF EXISTS repack.table_%u CASCADE", + oid); + --numobj; + } SPI_finish(); From 8a0466e4c2d45521dc4b0ffabe07c89cec869bf7 Mon Sep 17 00:00:00 2001 From: kotsachin Date: Mon, 25 May 2015 18:14:47 +0900 Subject: [PATCH 2/3] Some improvements and fixes to previously submitted pull request for cleaning temporary objects --- bin/pg_repack.c | 5 ++++- bin/pgut/pgut.c | 21 +++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 7105f53..84f5dac 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -243,7 +243,7 @@ static char * utoa(unsigned int value, char *buffer) { sprintf(buffer, "%u", value); - return strdup(buffer); + return pgut_strdup(buffer); } static pgut_option options[] = @@ -1391,6 +1391,7 @@ repack_one_table(repack_table *table, const char *orderby) params[1] = utoa(temp_obj_num, buffer); command("SELECT repack.repack_drop($1, $2)", 2, params); command("COMMIT", 0, NULL); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ /* * 7. Analyze. @@ -1706,6 +1707,7 @@ repack_cleanup_callback(bool fatal, void *userdata) reconnect(ERROR); command("SELECT repack.repack_drop($1, $2)", 2, params); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ } } @@ -1734,6 +1736,7 @@ repack_cleanup(bool fatal, const repack_table *table) params[0] = utoa(table->target_oid, buffer); params[1] = utoa(temp_obj_num, buffer); command("SELECT repack.repack_drop($1, $2)", 2, params); + temp_obj_num = 0; /* reset temporary object counter after cleanup */ } } diff --git a/bin/pgut/pgut.c b/bin/pgut/pgut.c index 37cc301..e533bef 100644 --- a/bin/pgut/pgut.c +++ b/bin/pgut/pgut.c @@ -64,7 +64,7 @@ static void on_before_exec(pgutConn *conn); static void on_after_exec(pgutConn *conn); static void on_interrupt(void); static void on_cleanup(void); -static void exit_or_abort(int exitcode); +static void exit_or_abort(int exitcode, int elevel); void pgut_init(int argc, char **argv) @@ -872,7 +872,10 @@ pgut_errfinish(int dummy, ...) edata->detail.data); if (pgut_abort_level <= edata->elevel && edata->elevel <= PANIC) - exit_or_abort(edata->code); + { + in_cleanup = true; /* need to be set for cleaning temporary objects on error */ + exit_or_abort(edata->code, edata->elevel); + } } #ifndef PGUT_OVERRIDE_ELOG @@ -1195,13 +1198,19 @@ on_cleanup(void) } static void -exit_or_abort(int exitcode) +exit_or_abort(int exitcode, int elevel) { - call_atexit_callbacks(true); - if (in_cleanup) + + if (in_cleanup && FATAL > elevel) { /* oops, error in cleanup*/ - in_cleanup = false; + call_atexit_callbacks(true); + exit(exitcode); + } + else if (FATAL <= elevel <= PANIC) + { + /* on FATAL or PANIC */ + call_atexit_callbacks(true); abort(); } else From 8fc8b656a2bfc784ba60c15c559cead27b67a2d9 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Mon, 25 May 2015 16:56:44 -0400 Subject: [PATCH 3/3] Some comments about our new atexit handling. --- bin/pg_repack.c | 19 ++++++++++++++++++- lib/repack.c | 13 ++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index b275e27..f1293fe 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -243,6 +243,12 @@ static char * utoa(unsigned int value, char *buffer) { sprintf(buffer, "%u", value); + /* XXX: originally, we would just return buffer here without + * the pgut_strdup(). But repack_cleanup_callback() seems to + * depend on getting back a freshly strdup'd copy of buffer, + * not sure why. So now we are leaking a tiny bit of memory + * with each utoa() call. + */ return pgut_strdup(buffer); } @@ -900,6 +906,13 @@ rebuild_indexes(const repack_table *table) ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); #endif + /* XXX: the errno != EINTR check means we won't bail + * out on SIGINT. We should probably just remove this + * check, though it seems we also need to fix up + * the on_interrupt handling for workers' index + * builds (those PGconns don't seem to have c->cancel + * set, so we don't cancel the in-progress builds). + */ if (ret < 0 && errno != EINTR) elog(ERROR, "poll() failed: %d, %d", ret, errno); @@ -1703,8 +1716,12 @@ repack_cleanup_callback(bool fatal, void *userdata) if(fatal) { params[0] = utoa(target_table, buffer); - params[1] = utoa(temp_obj_num, buffer); + params[1] = utoa(temp_obj_num, buffer); + /* testing PQstatus() of connection and conn2, as we do + * in repack_cleanup(), doesn't seem to work here, + * so just use an unconditional reconnect(). + */ reconnect(ERROR); command("SELECT repack.repack_drop($1, $2)", 2, params); temp_obj_num = 0; /* reset temporary object counter after cleanup */ diff --git a/lib/repack.c b/lib/repack.c index 070fd3a..cce90ea 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -944,8 +944,11 @@ repack_drop(PG_FUNCTION_ARGS) /* connect to SPI manager */ repack_init(); - /* drop log table */ - if(numobj > 0) + /* drop log table: must be done before dropping the pk type, + * since the log table is dependent on the pk type. (That's + * why we check numobj > 1 here.) + */ + if (numobj > 1) { execute_with_format( SPI_OK_UTILITY, @@ -955,7 +958,7 @@ repack_drop(PG_FUNCTION_ARGS) } /* drop type for pk type */ - if(numobj > 0) + if (numobj > 0) { execute_with_format( SPI_OK_UTILITY, @@ -968,7 +971,7 @@ repack_drop(PG_FUNCTION_ARGS) * drop repack trigger: We have already dropped the trigger in normal * cases, but it can be left on error. */ - if(numobj > 0) + if (numobj > 0) { execute_with_format( SPI_OK_UTILITY, @@ -991,7 +994,7 @@ repack_drop(PG_FUNCTION_ARGS) #endif /* drop temp table */ - if(numobj > 0) + if (numobj > 0) { execute_with_format( SPI_OK_UTILITY,