Merge branch 'master' of https://github.com/kotsachin/pg_repack into kotsachin-master

This commit is contained in:
Josh Kupershmidt 2015-05-01 15:30:34 -04:00
commit 4e69428700
4 changed files with 79 additions and 28 deletions

View File

@ -204,6 +204,7 @@ static void repack_one_table(repack_table *table, const char *order_by);
static bool repack_table_indexes(PGresult *index_details); static bool repack_table_indexes(PGresult *index_details);
static bool repack_all_indexes(char *errbuf, size_t errsize); static bool repack_all_indexes(char *errbuf, size_t errsize);
static void repack_cleanup(bool fatal, const repack_table *table); static void repack_cleanup(bool fatal, const repack_table *table);
static void repack_cleanup_callback(bool fatal, void *userdata);
static bool rebuild_indexes(const repack_table *table); static bool rebuild_indexes(const repack_table *table);
static char *getstr(PGresult *res, int row, int col); static char *getstr(PGresult *res, int row, int col);
@ -235,13 +236,14 @@ static bool only_indexes = false;
static int wait_timeout = 60; /* in seconds */ static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */ static int jobs = 0; /* number of concurrent worker conns. */
static bool dryrun = false; static bool dryrun = false;
static unsigned int temp_obj_num = 0; /* temporary objects counter */
/* buffer should have at least 11 bytes */ /* buffer should have at least 11 bytes */
static char * static char *
utoa(unsigned int value, char *buffer) utoa(unsigned int value, char *buffer)
{ {
sprintf(buffer, "%u", value); sprintf(buffer, "%u", value);
return buffer; return strdup(buffer);
} }
static pgut_option options[] = static pgut_option options[] =
@ -990,7 +992,7 @@ static void
repack_one_table(repack_table *table, const char *orderby) repack_one_table(repack_table *table, const char *orderby)
{ {
PGresult *res = NULL; PGresult *res = NULL;
const char *params[2]; const char *params[3];
int num; int num;
char *vxid = NULL; char *vxid = NULL;
char buffer[12]; char buffer[12];
@ -1042,6 +1044,9 @@ repack_one_table(repack_table *table, const char *orderby)
if (dryrun) if (dryrun)
return; return;
/* push repack_cleanup_callback() on stack to clean temporary objects */
pgut_atexit_push(repack_cleanup_callback, &table->target_oid);
/* /*
* 1. Setup advisory lock and trigger on main table. * 1. Setup advisory lock and trigger on main table.
*/ */
@ -1146,8 +1151,11 @@ repack_one_table(repack_table *table, const char *orderby)
CLEARPGRES(res); CLEARPGRES(res);
command(table->create_pktype, 0, NULL); command(table->create_pktype, 0, NULL);
temp_obj_num++;
command(table->create_log, 0, NULL); command(table->create_log, 0, NULL);
temp_obj_num++;
command(table->create_trigger, 0, NULL); command(table->create_trigger, 0, NULL);
temp_obj_num++;
command(table->enable_trigger, 0, NULL); command(table->enable_trigger, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
command(sql.data, 0, NULL); command(sql.data, 0, NULL);
@ -1287,6 +1295,7 @@ repack_one_table(repack_table *table, const char *orderby)
goto cleanup; goto cleanup;
command(table->create_table, 0, NULL); command(table->create_table, 0, NULL);
temp_obj_num++;
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid); printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns) if (table->drop_columns)
command(table->drop_columns, 0, NULL); command(table->drop_columns, 0, NULL);
@ -1379,7 +1388,8 @@ repack_one_table(repack_table *table, const char *orderby)
elog(DEBUG2, "---- drop ----"); elog(DEBUG2, "---- drop ----");
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL); command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
command("SELECT repack.repack_drop($1)", 1, params); params[1] = utoa(temp_obj_num, buffer);
command("SELECT repack.repack_drop($1, $2)", 2, params);
command("COMMIT", 0, NULL); command("COMMIT", 0, NULL);
/* /*
@ -1399,7 +1409,7 @@ repack_one_table(repack_table *table, const char *orderby)
/* Release advisory lock on table. */ /* Release advisory lock on table. */
params[0] = REPACK_LOCK_PREFIX_STR; params[0] = REPACK_LOCK_PREFIX_STR;
params[1] = buffer; params[1] = utoa(table->target_oid, buffer);
res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))", res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))",
2, params); 2, params);
@ -1679,6 +1689,26 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
return ret; return ret;
} }
/* This function calls to repack_drop() to clean temporary objects on error
* in creation of temporary objects.
*/
void
repack_cleanup_callback(bool fatal, void *userdata)
{
Oid target_table = *(Oid *) userdata;
const char *params[2];
char buffer[12];
if(fatal)
{
params[0] = utoa(target_table, buffer);
params[1] = utoa(temp_obj_num, buffer);
reconnect(ERROR);
command("SELECT repack.repack_drop($1, $2)", 2, params);
}
}
/* /*
* The userdata pointing a table being re-organized. We need to cleanup temp * The userdata pointing a table being re-organized. We need to cleanup temp
* objects before the program exits. * objects before the program exits.
@ -1693,7 +1723,7 @@ repack_cleanup(bool fatal, const repack_table *table)
else else
{ {
char buffer[12]; char buffer[12];
const char *params[1]; const char *params[2];
/* Try reconnection if not available. */ /* Try reconnection if not available. */
if (PQstatus(connection) != CONNECTION_OK || if (PQstatus(connection) != CONNECTION_OK ||
@ -1702,7 +1732,8 @@ repack_cleanup(bool fatal, const repack_table *table)
/* do cleanup */ /* do cleanup */
params[0] = utoa(table->target_oid, buffer); params[0] = utoa(table->target_oid, buffer);
command("SELECT repack.repack_drop($1)", 1, params); params[1] = utoa(temp_obj_num, buffer);
command("SELECT repack.repack_drop($1, $2)", 2, params);
} }
} }

View File

@ -1180,7 +1180,9 @@ call_atexit_callbacks(bool fatal)
pgut_atexit_item *item; pgut_atexit_item *item;
for (item = pgut_atexit_stack; item; item = item->next) for (item = pgut_atexit_stack; item; item = item->next)
{
item->callback(fatal, item->userdata); item->callback(fatal, item->userdata);
}
} }
static void static void
@ -1195,10 +1197,11 @@ on_cleanup(void)
static void static void
exit_or_abort(int exitcode) exit_or_abort(int exitcode)
{ {
call_atexit_callbacks(true);
if (in_cleanup) if (in_cleanup)
{ {
/* oops, error in cleanup*/ /* oops, error in cleanup*/
call_atexit_callbacks(true); in_cleanup = false;
abort(); abort();
} }
else else

View File

@ -247,7 +247,7 @@ CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_swap' 'MODULE_PATHNAME', 'repack_swap'
LANGUAGE C VOLATILE STRICT; LANGUAGE C VOLATILE STRICT;
CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS
'MODULE_PATHNAME', 'repack_drop' 'MODULE_PATHNAME', 'repack_drop'
LANGUAGE C VOLATILE STRICT; LANGUAGE C VOLATILE STRICT;

View File

@ -928,6 +928,7 @@ Datum
repack_drop(PG_FUNCTION_ARGS) repack_drop(PG_FUNCTION_ARGS)
{ {
Oid oid = PG_GETARG_OID(0); Oid oid = PG_GETARG_OID(0);
int numobj = PG_GETARG_INT32(1);
const char *relname = get_quoted_relname(oid); const char *relname = get_quoted_relname(oid);
const char *nspname = get_quoted_nspname(oid); const char *nspname = get_quoted_nspname(oid);
@ -943,14 +944,38 @@ repack_drop(PG_FUNCTION_ARGS)
/* connect to SPI manager */ /* connect to SPI manager */
repack_init(); repack_init();
/* drop log table */
if(numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
oid);
--numobj;
}
/* drop type for pk type */
if(numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP TYPE IF EXISTS repack.pk_%u",
oid);
--numobj;
}
/* /*
* drop repack trigger: We have already dropped the trigger in normal * drop repack trigger: We have already dropped the trigger in normal
* cases, but it can be left on error. * cases, but it can be left on error.
*/ */
if(numobj > 0)
{
execute_with_format( execute_with_format(
SPI_OK_UTILITY, SPI_OK_UTILITY,
"DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE", "DROP TRIGGER IF EXISTS z_repack_trigger ON %s.%s CASCADE",
nspname, relname); nspname, relname);
--numobj;
}
#if PG_VERSION_NUM < 80400 #if PG_VERSION_NUM < 80400
/* delete autovacuum settings */ /* delete autovacuum settings */
@ -965,23 +990,15 @@ repack_drop(PG_FUNCTION_ARGS)
oid, oid); oid, oid);
#endif #endif
/* drop log table */
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.log_%u CASCADE",
oid);
/* drop temp table */ /* drop temp table */
if(numobj > 0)
{
execute_with_format( execute_with_format(
SPI_OK_UTILITY, SPI_OK_UTILITY,
"DROP TABLE IF EXISTS repack.table_%u CASCADE", "DROP TABLE IF EXISTS repack.table_%u CASCADE",
oid); oid);
--numobj;
/* drop type for log table */ }
execute_with_format(
SPI_OK_UTILITY,
"DROP TYPE IF EXISTS repack.pk_%u CASCADE",
oid);
SPI_finish(); SPI_finish();