From ee23ec8ffd25a67699d2195f1a198c37a6c3fdb4 Mon Sep 17 00:00:00 2001
From: Josh Kupershmidt
Date: Fri, 21 Jun 2013 18:41:09 -0400
Subject: [PATCH] Initial import of indexes-only building patch.

Patch from Beena Emerson.
---
 bin/pg_repack.c      | 321 ++++++++++++++++++++++++++++++++++++++++---
 lib/pg_repack.sql.in |   7 +-
 lib/repack.c         |  51 +++++-
 3 files changed, 361 insertions(+), 18 deletions(-)

diff --git a/bin/pg_repack.c b/bin/pg_repack.c
index 47759f0..70ae380 100644
--- a/bin/pg_repack.c
+++ b/bin/pg_repack.c
@@ -177,6 +177,8 @@ static bool preliminary_checks(char *errbuf, size_t errsize);
 static void repack_all_databases(const char *order_by);
 static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
 static void repack_one_table(const repack_table *table, const char *order_by);
+static bool repack_one_index(Oid table, const char *table_name, Oid index, const char *schema_name);
+static bool repack_all_indexes(char *errbuf, size_t errsize);
 static void repack_cleanup(bool fatal, const repack_table *table);
 static bool rebuild_indexes(const repack_table *table);

@@ -202,6 +204,8 @@ static SimpleStringList table_list = {NULL, NULL};
 static char *orderby = NULL;
 static char *tablespace = NULL;
 static bool moveidx = false;
+static char *r_index = NULL;
+static bool only_indexes = false;
 static int wait_timeout = 60; /* in seconds */
 static int jobs = 0; /* number of concurrent worker conns. */

@@ -221,6 +225,8 @@ static pgut_option options[] =
 	{ 's', 'o', "order-by", &orderby },
 	{ 's', 's', "tablespace", &tablespace },
 	{ 'b', 'S', "moveidx", &moveidx },
+	{ 's', 'i', "index", &r_index },
+	{ 'b', 'x', "only-index", &only_indexes },
 	{ 'i', 'T', "wait-timeout", &wait_timeout },
 	{ 'B', 'Z', "no-analyze", &analyze },
 	{ 'i', 'j', "jobs", &jobs },
@@ -231,6 +237,7 @@ int
 main(int argc, char *argv[])
 {
 	int i;
+	char errbuf[256];

 	i = pgut_getopt(argc, argv, options);

@@ -243,24 +250,60 @@ main(int argc, char *argv[])

 	check_tablespace();

-	if (noorder)
-		orderby = "";
-
-	if (alldb)
+	/* index-only modes (--index / --only-index) take their own path */
+	if (r_index || only_indexes)
 	{
-		if (table_list.head)
-			ereport(ERROR,
-				(errcode(EINVAL),
-				 errmsg("cannot repack specific table(s) in all databases")));
-		repack_all_databases(orderby);
+		if (r_index && table_list.head)
+			ereport(ERROR, (errcode(EINVAL),
+				errmsg("cannot specify --index (-i) and --table (-t)")));
+		else if (r_index && only_indexes)
+			ereport(ERROR, (errcode(EINVAL),
+				errmsg("cannot specify --index (-i) and --only-index (-x)")));
+		else if (only_indexes && !table_list.head)
+			ereport(ERROR, (errcode(EINVAL),
+				errmsg("cannot repack all indexes of database, specify the table with -t option")));
+		else if (alldb)
+			ereport(ERROR, (errcode(EINVAL),
+				errmsg("cannot repack specific index(es) in all databases")));
+		else
+		{
+			/* warn about every ignored option that was given, not only the first */
+			if (orderby)
+				ereport(WARNING, (errcode(EINVAL),
+					errmsg("option -o (--order-by) has no effect while repacking indexes")));
+			if (noorder)
+				ereport(WARNING, (errcode(EINVAL),
+					errmsg("option -n (--no-order) has no effect while repacking indexes")));
+			if (!analyze)
+				ereport(WARNING, (errcode(EINVAL),
+					errmsg("ANALYZE is not performed after repacking indexes, -Z (--no-analyze) has no effect")));
+			if (jobs)
+				ereport(WARNING, (errcode(EINVAL),
+					errmsg("option -j (--jobs) has no effect, repacking indexes doesnot use parallel jobs")));
+			if (!repack_all_indexes(errbuf, sizeof(errbuf)))
+				ereport(ERROR,
+					(errcode(ERROR), errmsg("%s", errbuf)));
+		}
 	}
 	else
 	{
-		char errbuf[256];
-		if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
-			ereport(ERROR,
-				(errcode(ERROR),
-				 errmsg("%s", errbuf)));
+		if (noorder)
+			orderby = "";
+
+		if (alldb)
+		{
+			if (table_list.head)
+				ereport(ERROR,
+					(errcode(EINVAL),
+					 errmsg("cannot repack specific table(s) in all databases")));
+			repack_all_databases(orderby);
+		}
+		else
+		{
+			if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
+				ereport(ERROR,
+					(errcode(ERROR), errmsg("%s", errbuf)));
+		}
 	}

 	return 0;
@@ -710,7 +753,7 @@ rebuild_indexes(const repack_table *table)
 	}

 	res = execute("SELECT indexrelid,"
-		" repack.repack_indexdef(indexrelid, indrelid, $2) "
+		" repack.repack_indexdef(indexrelid, indrelid, $2, FALSE) "
 		" FROM pg_index WHERE indrelid = $1 AND indisvalid",
 		2, params);

@@ -1533,6 +1576,252 @@ repack_cleanup(bool fatal, const repack_table *table)
 	}
 }

+/*
+ * Rebuild one index without locking its table for the whole build.
+ *
+ * A copy named index_<oid> is created with the CREATE INDEX CONCURRENTLY
+ * statement returned by repack.repack_indexdef(); the table is then locked
+ * in ACCESS EXCLUSIVE mode while repack.repack_index_swap() swaps the two
+ * indexes, and index_<oid> is dropped afterwards.
+ *
+ * table and index are the OIDs of the table and of the index to rebuild.
+ * table_name and schema_name (may be NULL) are only used to spell the
+ * LOCK TABLE and DROP INDEX statements.
+ *
+ * Returns true once the swap and COMMIT have been sent, false if the table
+ * lock could not be taken.
+ */
+static bool
+repack_one_index(Oid table, const char *table_name, Oid index, const char *schema_name){
+	bool ret = false;
+	PGresult *res = NULL, *res2 = NULL;
+	StringInfoData sql, temp_index;
+	char buffer[2][12];
+	char *create_idx;
+	const char *params[3];
+
+	/* set up both buffers first so that cleanup can always free them */
+	initStringInfo(&sql);
+	initStringInfo(&temp_index);
+	if (schema_name)
+		appendStringInfo(&temp_index, "%s.index_%u", schema_name, index);
+	else
+		appendStringInfo(&temp_index, "index_%u", index);
+
+	params[0] = utoa(index, buffer[0]);
+	params[1] = utoa(table, buffer[1]);
+	params[2] = tablespace;
+	res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3, params);
+	if (PQntuples(res) < 1)
+	{
+		ereport(ERROR, (errcode(EINVAL),
+			errmsg("unable to generate SQL to CREATE new index")));
+		goto cleanup;
+	}
+	create_idx = getstr(res, 0, 0);
+
+	/* create_idx points into res, so res must stay alive while it is executed */
+	res2 = execute_elevel(create_idx, 0, NULL, DEBUG2);
+	if (PQresultStatus(res2) != PGRES_COMMAND_OK)
+	{
+		ereport(ERROR,
+			(errcode(E_PG_COMMAND),
+			 errmsg("%s", PQerrorMessage(connection)),
+			 errdetail("The temporary index may be left behind "
+				" by a pg_repack command on the table which"
+				" was interrupted and failed to clean up"
+				" the temporary objects. Please use the \"DROP INDEX %s\""
+				" to remove the temporary index.", temp_index.data)));
+		goto cleanup;
+	}
+	CLEARPGRES(res2);
+	CLEARPGRES(res);
+
+	/* take exclusive lock on table before calling repack_index_swap() */
+	if (schema_name)
+		appendStringInfo(&sql, "LOCK TABLE %s.%s IN ACCESS EXCLUSIVE MODE", schema_name, table_name);
+	else
+		appendStringInfo(&sql, "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", table_name);
+	if (!(lock_exclusive(connection, params[1], sql.data, TRUE)))
+	{
+		elog(WARNING, "lock_exclusive() failed in connection for %s",
+			table_name);
+		goto drop_idx;
+	}
+	pgut_command(connection, "SELECT repack.repack_index_swap($1)", 1, params);
+	pgut_command(connection, "COMMIT", 0, NULL);
+	/* only report success once the swap has actually been issued */
+	ret = true;
+
+drop_idx:
+	/* drop index_<oid> whether or not the swap happened; start from an empty buffer */
+	termStringInfo(&sql);
+	initStringInfo(&sql);
+#if PG_VERSION_NUM < 90200
+	appendStringInfoString(&sql, "DROP INDEX ");
+#else
+	appendStringInfoString(&sql, "DROP INDEX CONCURRENTLY ");
+#endif
+	appendStringInfo(&sql, "%s", temp_index.data);
+	command(sql.data, 0, NULL);
+cleanup:
+	CLEARPGRES(res);
+	CLEARPGRES(res2);
+	termStringInfo(&sql);
+	termStringInfo(&temp_index);
+	return ret;
+}
+
+/*
+ * Repack either the single index named by --index, or every valid index
+ * of the first table given with --table when --only-index is used.
+ *
+ * Returns true on success. The failure paths that return without going
+ * through ereport/elog(ERROR) leave a message in errbuf (errsize bytes).
+ */
+static bool
+repack_all_indexes(char *errbuf, size_t errsize){
+	bool ret = false;
+	PGresult *res = NULL, *res2 = NULL;
+	int i;
+	int num;
+	StringInfoData sql;
+	const char *params[1];
+	const char *table_name = NULL;
+	const char *schema_name = NULL;
+	char *pos;
+
+	initStringInfo(&sql);
+	reconnect(ERROR);
+
+	if (!preliminary_checks(errbuf, errsize))
+		goto cleanup;
+
+	/* If only one index is specified, append the appropriate data to the sql and check if the index exists */
+	if (r_index)
+	{
+		appendStringInfoString(&sql, "SELECT i.relname, idx.indexrelid, idx.indisvalid, tbl.oid, tbl.relname"
+			" FROM pg_class tbl JOIN pg_index idx ON tbl.oid = idx.indrelid"
+			" JOIN pg_class i ON i.oid = idx.indexrelid"
+			" WHERE idx.indexrelid = $1::regclass");
+		params[0] = r_index;
+
+		res = execute_elevel(sql.data, 1, params, DEBUG2);
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
+			goto cleanup;
+		}
+		else
+		{
+			num = PQntuples(res);
+			if (num == 0)
+			{
+				ereport(ERROR,
+					(errcode(EINVAL),
+					 errmsg("index \"%s\" doesnot exist.\n", r_index)));
+				goto cleanup;
+			}
+		}
+
+		/* separate schema name and index name */
+		pos = strchr(params[0], '.');
+		if (pos)
+		{
+			pos[0] = '\0';
+			schema_name = params[0];
+			r_index = pos + 1;
+		}
+		table_name = getstr(res, 0, 4);
+	}
+	/* To repack all indexes, append appropriate data to the sql and run the query */
+	else {
+		params[0] = table_list.head->val;
+
+		appendStringInfoString(&sql, "SELECT i.relname, idx.indexrelid, idx.indisvalid, idx.indrelid"
+			" FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
+			" WHERE idx.indrelid = $1::regclass");
+
+		res = execute_elevel(sql.data, 1, params, DEBUG2);
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
+			goto cleanup;
+		}
+		else
+		{
+			num = PQntuples(res);
+			if (num == 0)
+			{
+				elog(WARNING, "\"%s\" doesnot have any indexes", table_list.head->val);
+				ret = true;
+				goto cleanup;
+			}
+		}
+
+		/* separate schema name and table name */
+		pos = strchr(params[0], '.');
+		if (pos)
+		{
+			pos[0] = '\0';
+			schema_name = params[0];
+			table_name = pos + 1;
+		}
+		else
+			table_name = params[0];
+	}
+
+	/* Check if any concurrent pg_repack command is being run on the same table */
+	termStringInfo(&sql);
+	initStringInfo(&sql);
+	appendStringInfo(&sql, "SELECT pg_try_advisory_lock(%u)", getoid(res, 0, 3));
+
+	res2 = execute_elevel(sql.data, 0, NULL, DEBUG2);
+	if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+	{
+		elog(ERROR, "%s", PQerrorMessage(connection));
+		goto cleanup;
+	}
+	else if (strcmp(getstr(res2, 0, 0), "f") == 0)
+	{
+		snprintf(errbuf, errsize, "Another pg_repack command may be running on the table. Please try again later.");
+		goto cleanup;
+	}
+
+	for (i = 0; i < num; i++)
+	{
+		char *isvalid = getstr(res, i, 2);
+		if (isvalid[0] == 't')
+		{
+			if (schema_name)
+				elog(INFO, "repacking index \"%s.%s\"", schema_name, getstr(res, i, 0));
+			else
+				elog(INFO, "repacking index \"%s\"", getstr(res, i, 0));
+
+			if (!(repack_one_index(getoid(res, i, 3), table_name, getoid(res, i, 1), schema_name)))
+			{
+				/* the caller reports errbuf, so it must be filled in on every failure */
+				snprintf(errbuf, errsize, "failed to repack index \"%s\"", getstr(res, i, 0));
+				goto cleanup;
+			}
+		}
+		else
+			if (schema_name)
+				elog(WARNING, "skipping invalid index: %s.%s", schema_name, getstr(res, i, 0));
+			else
+				elog(WARNING, "skipping invalid index: %s", getstr(res, i, 0));
+	}
+	ret = true;
+cleanup:
+	CLEARPGRES(res);
+	CLEARPGRES(res2);
+	disconnect();
+	termStringInfo(&sql);
+	return ret;
+}
+
 void
 pgut_help(bool details)
 {
@@ -1551,6 +1840,8 @@ pgut_help(bool details)
 	printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n");
 	printf(" -n, --no-order do vacuum full instead of cluster\n");
 	printf(" -j, --jobs=NUM Use this many parallel jobs for each table\n");
+	printf(" -i, --index=INDEX move only the specified index\n");
+	printf(" -x, --only-index move only indexes of the specified table\n");
 	printf(" -T, --wait-timeout=SECS timeout to cancel other backends on conflict\n");
 	printf(" -Z, --no-analyze don't analyze at end\n");
 }
diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in
index d002872..eb42436 100644
--- a/lib/pg_repack.sql.in
+++ b/lib/pg_repack.sql.in
@@ -207,7 +207,7 @@ CREATE VIEW repack.tables AS
 	AND N.nspname NOT IN ('pg_catalog', 'information_schema')
 	AND N.nspname NOT LIKE E'pg\\_temp\\_%';

-CREATE FUNCTION repack.repack_indexdef(oid, oid, name) RETURNS text AS
+CREATE FUNCTION repack.repack_indexdef(oid, oid, name, bool) RETURNS text AS
 'MODULE_PATHNAME', 'repack_indexdef'
 LANGUAGE C STABLE;

@@ -246,3 +246,8 @@ LANGUAGE C VOLATILE STRICT;
 CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS
 'MODULE_PATHNAME', 'repack_drop'
 LANGUAGE C VOLATILE STRICT;
+
+-- VOLATILE because it changes the database (swaps two indexes); STABLE functions must not
+CREATE FUNCTION repack.repack_index_swap(oid) RETURNS void AS
+'MODULE_PATHNAME', 'repack_index_swap'
+LANGUAGE C VOLATILE STRICT;
diff --git a/lib/repack.c b/lib/repack.c
index 165aa88..2168c4f 100644
--- a/lib/repack.c
+++ b/lib/repack.c
@@ -46,6 +46,7 @@ extern Datum PGUT_EXPORT repack_indexdef(PG_FUNCTION_ARGS);
 extern Datum PGUT_EXPORT repack_swap(PG_FUNCTION_ARGS);
 extern Datum PGUT_EXPORT repack_drop(PG_FUNCTION_ARGS);
 extern Datum PGUT_EXPORT repack_disable_autovacuum(PG_FUNCTION_ARGS);
+extern Datum PGUT_EXPORT repack_index_swap(PG_FUNCTION_ARGS);

 PG_FUNCTION_INFO_V1(repack_version);
 PG_FUNCTION_INFO_V1(repack_trigger);
@@ -55,6 +56,7 @@ PG_FUNCTION_INFO_V1(repack_indexdef);
 PG_FUNCTION_INFO_V1(repack_swap);
 PG_FUNCTION_INFO_V1(repack_drop);
 PG_FUNCTION_INFO_V1(repack_disable_autovacuum);
+PG_FUNCTION_INFO_V1(repack_index_swap);

 static void repack_init(void);
 static SPIPlanPtr repack_prepare(const char *src, int nargs, Oid *argtypes);
@@ -674,6 +676,7 @@ repack_indexdef(PG_FUNCTION_ARGS)
 	Name tablespace = NULL;
 	IndexDef stmt;
 	StringInfoData str;
+	bool concurrent_index = PG_GETARG_BOOL(3);

 	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
 		PG_RETURN_NULL();
@@ -687,8 +690,12 @@ repack_indexdef(PG_FUNCTION_ARGS)
 	parse_indexdef(&stmt, index, table);

 	initStringInfo(&str);
-	appendStringInfo(&str, "%s index_%u ON repack.table_%u USING %s (%s)%s",
-		stmt.create, index, table, stmt.type, stmt.columns, stmt.options);
+	if(concurrent_index)
+		appendStringInfo(&str, "%s CONCURRENTLY index_%u ON %s USING %s (%s)%s",
+			stmt.create, index, stmt.table, stmt.type, stmt.columns, stmt.options);
+	else
+		appendStringInfo(&str, "%s index_%u ON repack.table_%u USING %s (%s)%s",
+			stmt.create, index, table, stmt.type, stmt.columns, stmt.options);

 	/* specify the new tablespace or the original one if any */
 	if (tablespace || stmt.tablespace)
@@ -1186,6 +1193,46 @@ swap_heap_or_index_files(Oid r1, Oid r2)
 	heap_close(relRelation, RowExclusiveLock);
 }

+/*
+ * repack_index_swap(oid) - swap an index with its rebuilt copy
+ *
+ * Finds the index named index_<oid> in pg_class and passes it, together
+ * with the index whose OID was given, to swap_heap_or_index_files().
+ * Raises an error if no index_<oid> exists.
+ */
+Datum
+repack_index_swap(PG_FUNCTION_ARGS)
+{
+	Oid oid = PG_GETARG_OID(0);
+	Oid idx1, idx2;
+	StringInfoData str;
+	SPITupleTable *tuptable;
+	TupleDesc desc;
+	HeapTuple tuple;
+
+	/* authority check */
+	must_be_superuser("repack_index_swap");
+
+	/* connect to SPI manager */
+	repack_init();
+
+	idx1 = oid;
+	initStringInfo(&str);
+	appendStringInfo(&str,"SELECT oid FROM pg_class WHERE relname = 'index_%u'",idx1);
+	execute(SPI_OK_SELECT, str.data);
+	/* vals[0] is only valid when the query returned a row */
+	if (SPI_processed < 1)
+		elog(ERROR, "temporary index index_%u not found", idx1);
+	tuptable = SPI_tuptable;
+	desc = tuptable->tupdesc;
+	tuple = tuptable->vals[0];
+	idx2 = getoid(tuple, desc, 1);
+	swap_heap_or_index_files(idx1, idx2);
+	CommandCounterIncrement();
+	SPI_finish();
+	PG_RETURN_VOID();
+}
+
 #if PG_VERSION_NUM < 80400
 /* XXX: You might need to add PGDLLIMPORT into your miscadmin.h. */