A few fixes to the indexes-only building.

Improve locking behavior in indexes-only mode so we only need one
ACCESS EXCLUSIVE lock per table instead of one per index. Support
multiple --index options. Fix for schema-name parsing.

Patch from Beena Emerson.
This commit is contained in:
Josh Kupershmidt 2013-06-29 09:32:18 -04:00
parent df53964529
commit f0f36688f0

View File

@ -180,7 +180,7 @@ static bool preliminary_checks(char *errbuf, size_t errsize);
static void repack_all_databases(const char *order_by); static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize); static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
static void repack_one_table(const repack_table *table, const char *order_by); static void repack_one_table(const repack_table *table, const char *order_by);
static bool repack_one_index(Oid table, const char *table_name, Oid index, const char *schema_name); static bool repack_table_indexes(PGresult *index_details);
static bool repack_all_indexes(char *errbuf, size_t errsize); static bool repack_all_indexes(char *errbuf, size_t errsize);
static void repack_cleanup(bool fatal, const repack_table *table); static void repack_cleanup(bool fatal, const repack_table *table);
static bool rebuild_indexes(const repack_table *table); static bool rebuild_indexes(const repack_table *table);
@ -208,7 +208,7 @@ static SimpleStringList table_list = {NULL, NULL};
static char *orderby = NULL; static char *orderby = NULL;
static char *tablespace = NULL; static char *tablespace = NULL;
static bool moveidx = false; static bool moveidx = false;
static char *r_index = NULL; static SimpleStringList r_index = {NULL, NULL};
static bool only_indexes = false; static bool only_indexes = false;
static int wait_timeout = 60; /* in seconds */ static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */ static int jobs = 0; /* number of concurrent worker conns. */
@ -229,7 +229,7 @@ static pgut_option options[] =
{ 's', 'o', "order-by", &orderby }, { 's', 'o', "order-by", &orderby },
{ 's', 's', "tablespace", &tablespace }, { 's', 's', "tablespace", &tablespace },
{ 'b', 'S', "moveidx", &moveidx }, { 'b', 'S', "moveidx", &moveidx },
{ 's', 'i', "index", &r_index }, { 'l', 'i', "index", &r_index },
{ 'b', 'x', "only-indexes", &only_indexes }, { 'b', 'x', "only-indexes", &only_indexes },
{ 'i', 'T', "wait-timeout", &wait_timeout }, { 'i', 'T', "wait-timeout", &wait_timeout },
{ 'B', 'Z', "no-analyze", &analyze }, { 'B', 'Z', "no-analyze", &analyze },
@ -254,12 +254,12 @@ main(int argc, char *argv[])
check_tablespace(); check_tablespace();
if (r_index || only_indexes) if (r_index.head || only_indexes)
{ {
if (r_index && table_list.head) if (r_index.head && table_list.head)
ereport(ERROR, (errcode(EINVAL), ereport(ERROR, (errcode(EINVAL),
errmsg("cannot specify --index (-i) and --table (-t)"))); errmsg("cannot specify --index (-i) and --table (-t)")));
else if (r_index && only_indexes) else if (r_index.head && only_indexes)
ereport(ERROR, (errcode(EINVAL), ereport(ERROR, (errcode(EINVAL),
errmsg("cannot specify --index (-i) and --only-indexes (-x)"))); errmsg("cannot specify --index (-i) and --only-indexes (-x)")));
else if (only_indexes && !table_list.head) else if (only_indexes && !table_list.head)
@ -1606,56 +1606,87 @@ repack_cleanup(bool fatal, const repack_table *table)
} }
/* /*
* repack one index * Indexes of a table are repacked.
*/ */
static bool static bool
repack_one_index(Oid table, const char *table_name, Oid index, const char *schema_name) repack_table_indexes(PGresult *index_details)
{ {
bool ret = false; bool ret = false;
PGresult *res = NULL; PGresult *res = NULL;
StringInfoData sql, temp_index; StringInfoData sql, sql_drop;
char buffer[2][12]; char buffer[2][12];
char *create_idx; char *create_idx;
const char *params[3]; const char *schema_name, *table_name, *params[3];
Oid table, index;
int i, num, num_valid = -1;
params[0] = utoa(index, buffer[0]); num = PQntuples(index_details);
table = getoid(index_details, 0, 3);
params[1] = utoa(table, buffer[1]); params[1] = utoa(table, buffer[1]);
params[2] = tablespace; params[2] = tablespace;
schema_name = getstr(index_details, 0, 5);
table_name = getstr(index_details, 0, 4);
/* Check if any concurrent pg_repack command is being run on the same
* table.
*/
if (!advisory_lock(connection, params[1]))
ereport(ERROR, (errcode(EINVAL),
errmsg("Unable to obtain advisory lock on \"%s\"", table_name)));
for (i = 0; i < num; i++)
{
char *isvalid = getstr(index_details, i, 2);
if (isvalid[0] == 't')
{
index = getoid(index_details, i, 1);
params[0] = utoa(index, buffer[0]);
res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3, params); res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3, params);
if (PQntuples(res) < 1) if (PQntuples(res) < 1)
{ {
ereport(ERROR, (errcode(EINVAL), elog(WARNING,
errmsg("unable to generate SQL to CREATE new index"))); "unable to generate SQL to CREATE work index for %s.%s",
schema_name, getstr(index_details, i, 0));
num_valid = i;
goto drop_idx;
} }
create_idx = getstr(res, 0, 0); create_idx = getstr(res, 0, 0);
CLEARPGRES(res); CLEARPGRES(res);
res = execute_elevel(create_idx, 0, NULL, DEBUG2); res = execute_elevel(create_idx, 0, NULL, DEBUG2);
initStringInfo(&temp_index);
if (schema_name)
appendStringInfo(&temp_index, "%s.index_%u", schema_name, index);
else
appendStringInfo(&temp_index, "index_%u", index);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ {
ereport(ERROR, ereport(WARNING,
(errcode(E_PG_COMMAND), (errcode(E_PG_COMMAND),
errmsg("%s", PQerrorMessage(connection)), errmsg("SQL failed with message- %s",
errdetail("An invalid index may have been left behind " PQerrorMessage(connection)),
errdetail("An invalid index may have been left behind"
" by a pg_repack command on the table which" " by a pg_repack command on the table which"
" was interrupted and failed to clean up" " was interrupted and failed to clean up"
" the temporary objects. Please use \"DROP INDEX %s\"" " the temporary objects. Please use \"DROP INDEX %s.index_%u\""
" to remove this index.", temp_index.data))); " to remove this index and try again.", schema_name, index)));
num_valid = i + 1;
ret = false;
goto drop_idx;
} }
CLEARPGRES(res); CLEARPGRES(res);
}
else
{
if (num_valid == -1)
num_valid = i;
elog(WARNING, "skipping invalid index: %s.%s", schema_name, getstr(index_details, i, 0));
}
}
if (num_valid == -1)
num_valid = i;
/* take exclusive lock on table before calling repack_index_swap() */ /* take exclusive lock on table before calling repack_index_swap() */
initStringInfo(&sql); initStringInfo(&sql);
if (schema_name)
appendStringInfo(&sql, "LOCK TABLE %s.%s IN ACCESS EXCLUSIVE MODE",
schema_name, table_name);
else
appendStringInfo(&sql, "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", appendStringInfo(&sql, "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE",
table_name); table_name);
if (!(lock_exclusive(connection, params[1], sql.data, TRUE))) if (!(lock_exclusive(connection, params[1], sql.data, TRUE)))
@ -1664,155 +1695,109 @@ repack_one_index(Oid table, const char *table_name, Oid index, const char *schem
table_name); table_name);
goto drop_idx; goto drop_idx;
} }
pgut_command(connection, "SELECT repack.repack_index_swap($1)", 1, params);
pgut_command(connection, "COMMIT", 0, NULL);
for (i = 0; i < num_valid; i++)
{
index = getoid(index_details, i, 1);
params[0] = utoa(index, buffer[0]);
pgut_command(connection, "SELECT repack.repack_index_swap($1)", 1, params);
}
pgut_command(connection, "COMMIT", 0, NULL);
ret = true; ret = true;
drop_idx: drop_idx:
initStringInfo(&sql); initStringInfo(&sql);
initStringInfo(&sql_drop);
#if PG_VERSION_NUM < 90200 #if PG_VERSION_NUM < 90200
appendStringInfoString(&sql, "DROP INDEX "); appendStringInfoString(&sql, "DROP INDEX IF EXISTS ");
#else #else
appendStringInfoString(&sql, "DROP INDEX CONCURRENTLY "); appendStringInfoString(&sql, "DROP INDEX CONCURRENTLY IF EXISTS ");
#endif #endif
appendStringInfo(&sql, "%s", temp_index.data); appendStringInfo(&sql, "%s.", schema_name);
command(sql.data, 0, NULL);
CLEARPGRES(res); for (i = 0; i < num_valid; i++)
{
initStringInfo(&sql_drop);
appendStringInfo(&sql_drop, "%sindex_%u", sql.data, getoid(index_details, i, 1));
command(sql_drop.data, 0, NULL);
}
termStringInfo(&sql_drop);
termStringInfo(&sql); termStringInfo(&sql);
return ret; return ret;
} }
/* /*
* Call repack_one_index for each of the indexes * Call repack_table_indexes for each of the table
*/ */
static bool static bool
repack_all_indexes(char *errbuf, size_t errsize) repack_all_indexes(char *errbuf, size_t errsize)
{ {
bool ret = false; bool ret = false;
PGresult *res = NULL; PGresult *res = NULL;
int i;
int num;
StringInfoData sql; StringInfoData sql;
char buffer[12]; SimpleStringListCell *cell = NULL;
const char *params[1]; const char *params[1];
const char *table_name = NULL;
const char *schema_name = NULL;
char *pos;
initStringInfo(&sql); initStringInfo(&sql);
reconnect(ERROR); reconnect(ERROR);
assert(r_index.head || table_list.head);
if (!preliminary_checks(errbuf, errsize)) if (!preliminary_checks(errbuf, errsize))
goto cleanup; goto cleanup;
/* If only one index is specified, append the appropriate data to the sql if (r_index.head)
* and check if the index exists
*/
if (r_index)
{ {
appendStringInfoString(&sql, "SELECT i.relname, idx.indexrelid, idx.indisvalid, tbl.oid, tbl.relname" appendStringInfoString(&sql,
" FROM pg_class tbl JOIN pg_index idx ON tbl.oid = idx.indrelid" "SELECT i.relname, idx.indexrelid, idx.indisvalid, idx.indrelid, idx.indrelid::regclass, n.nspname"
" JOIN pg_class i ON i.oid = idx.indexrelid"
" WHERE idx.indexrelid = $1::regclass");
params[0] = r_index;
res = execute_elevel(sql.data, 1, params, DEBUG2);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
goto cleanup;
}
else
{
num = PQntuples(res);
if (num == 0)
{
ereport(ERROR,
(errcode(EINVAL),
errmsg("index \"%s\" does not exist.\n", r_index)));
}
}
// separate schema name and index name. FIXME: kludge
pos = strchr(params[0], '.');
if (pos)
{
pos[0] = '\0';
schema_name = params[0];
r_index = pos + 1;
}
table_name = getstr(res, 0, 4);
}
/* To repack all indexes, append appropriate data to the sql and run the query */
else {
params[0] = table_list.head->val;
appendStringInfoString(&sql, "SELECT i.relname, idx.indexrelid, idx.indisvalid, idx.indrelid"
" FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid" " FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
" WHERE idx.indrelid = $1::regclass"); " JOIN pg_namespace n ON n.oid = i.relnamespace"
" WHERE idx.indexrelid = $1::regclass ORDER BY indisvalid DESC");
cell = r_index.head;
}
else if(table_list.head)
{
appendStringInfoString(&sql,
"SELECT i.relname, idx.indexrelid, idx.indisvalid, idx.indrelid, $1::text, n.nspname"
" FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
" JOIN pg_namespace n ON n.oid = i.relnamespace"
" WHERE idx.indrelid = $1::regclass ORDER BY indisvalid DESC");
cell = table_list.head;
}
for (; cell; cell = cell->next)
{
params[0] = cell->val;
res = execute_elevel(sql.data, 1, params, DEBUG2); res = execute_elevel(sql.data, 1, params, DEBUG2);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
snprintf(errbuf, errsize, "%s", PQerrorMessage(connection)); elog(WARNING, "SQL failed with message- %s",
goto cleanup; PQerrorMessage(connection));
continue;
} }
else
{ if (PQntuples(res) == 0)
num = PQntuples(res);
if (num == 0)
{ {
if(table_list.head)
elog(WARNING, "\"%s\" does not have any indexes", elog(WARNING, "\"%s\" does not have any indexes",
table_list.head->val); cell->val);
ret = true; else if(r_index.head)
goto cleanup; elog(WARNING, "\"%s\" is not a valid index",
} cell->val);
continue;
} }
// separate schema name and table name. FIXME: kludge if(table_list.head)
pos = strchr(params[0], '.'); elog(INFO, "repacking indexes of \"%s\"", cell->val);
if (pos)
{
pos[0] = '\0';
schema_name = params[0];
table_name = pos + 1;
}
else else
table_name = params[0]; elog(INFO, "repacking \"%s\"", cell->val);
}
/* Check if any concurrent pg_repack command is being run on the same if (!repack_table_indexes(res))
* table. elog(WARNING, "repack failed for \"%s\"", cell->val);
*/
if (!advisory_lock(connection, utoa(getoid(res, 0, 3), buffer))) {
snprintf(errbuf, errsize, "Unable to obtain advisory lock on \"%s\"",
table_name);
goto cleanup;
}
for (i = 0; i < num; i++)
{
char *isvalid = getstr(res, i, 2);
if (isvalid[0] == 't')
{
if (schema_name)
elog(INFO, "repacking index \"%s.%s\"", schema_name, getstr(res, i, 0));
else
elog(INFO, "repacking index \"%s\"", getstr(res, i, 0));
if (!(repack_one_index(getoid(res, i, 3), table_name, getoid(res, i, 1), schema_name))) {
/* FIXME: fill in errbuf here */
goto cleanup;
}
}
else
if (schema_name)
elog(WARNING, "skipping invalid index: %s.%s", schema_name, getstr(res, i, 0));
else
elog(WARNING, "skipping invalid index: %s", getstr(res, i, 0));
} }
ret = true; ret = true;