Support reorganizing tables with non-default operator class.

This commit is contained in:
Takahiro Itagaki
2009-12-28 08:25:00 +00:00
parent 27e6839132
commit 038c07523a
10 changed files with 1083 additions and 957 deletions

View File

@ -8,12 +8,14 @@
#include <unistd.h>
#include "access/genam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_type.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
@ -55,6 +57,9 @@ static void swap_heap_or_index_files(Oid r1, Oid r2);
#define copy_tuple(tuple, desc) \
PointerGetDatum(SPI_returntuple((tuple), (desc)))
#define IsToken(c) \
(IS_HIGHBIT_SET((c)) || isalnum((unsigned char) (c)) || (c) == '_')
/* check access authority */
static void
must_be_superuser(const char *func)
@ -313,11 +318,32 @@ skip_const(Oid index, char *sql, const char *arg1, const char *arg2)
static char *
skip_ident(Oid index, char *sql)
{
sql = strchr(sql, ' ');
if (sql)
while (*sql && isspace((unsigned char) *sql))
sql++;
if (*sql == '"')
{
sql++;
for (;;)
{
char *end = strchr(sql, '"');
if (end == NULL)
return parse_error(index);
else if (end[1] != '"')
{
end[1] = '\0';
return end + 2;
}
else /* escaped quote ("") */
sql = end + 2;
}
}
else
{
while (*sql && IsToken(*sql))
sql++;
*sql = '\0';
return sql + 2;
return sql + 1;
}
/* error */
@ -325,12 +351,13 @@ skip_ident(Oid index, char *sql)
}
static char *
skip_columns(Oid index, char *sql)
skip_until(Oid index, char *sql, char end)
{
char instr = 0;
int nopen = 1;
int nopen = 0;
for (; *sql && nopen > 0; sql++)
sql++;
for (; *sql && (nopen > 0 || instr != 0 || *sql != end); sql++)
{
if (instr)
{
@ -342,9 +369,7 @@ skip_columns(Oid index, char *sql)
instr = 0;
}
else if (sql[0] == '\\')
{
sql++; // next char is always string
}
sql++; /* next char is always string */
}
else
{
@ -364,10 +389,10 @@ skip_columns(Oid index, char *sql)
}
}
if (nopen == 0)
if (nopen == 0 && instr == 0)
{
sql[-1] = '\0';
return sql;
*sql = '\0';
return sql + 1;
}
/* error */
@ -395,11 +420,14 @@ parse_indexdef(IndexDef *stmt, Oid index, Oid table)
/* USING */
sql = skip_const(index, sql, "USING", NULL);
/* type */
stmt->type= sql;
stmt->type = sql;
sql = skip_ident(index, sql);
/* (columns) */
if ((sql = strchr(sql, '(')) == NULL)
parse_error(index);
sql++;
stmt->columns = sql;
sql = skip_columns(index, sql);
sql = skip_until(index, sql, ')');
/* options */
stmt->options = sql;
}
@ -413,6 +441,9 @@ parse_indexdef(IndexDef *stmt, Oid index, Oid table)
* @param index Oid of target index.
* @param table Oid of table of the index.
* @retval Create index DDL for temp table.
*
* FIXME: this function is named get_index_keys, but actually returns
* an expression for ORDER BY clause. get_order_by() might be a better name.
*/
Datum
reorg_get_index_keys(PG_FUNCTION_ARGS)
@ -420,10 +451,107 @@ reorg_get_index_keys(PG_FUNCTION_ARGS)
Oid index = PG_GETARG_OID(0);
Oid table = PG_GETARG_OID(1);
IndexDef stmt;
char *token;
char *next;
StringInfoData str;
Relation indexRel = NULL;
int nattr;
parse_indexdef(&stmt, index, table);
PG_RETURN_TEXT_P(cstring_to_text(stmt.columns));
/*
* FIXME: this is very unreliable implementation but I don't want to
* re-implement customized versions of pg_get_indexdef_string...
*
* TODO: Support ASC/DESC and NULL FIRST/LAST.
*/
initStringInfo(&str);
for (nattr = 0, next = stmt.columns; *next; nattr++)
{
char *opcname;
token = next;
next = skip_until(index, next, ',');
opcname = token + strlen(token);
if (opcname[-1] == '"')
{
opcname--;
for (;;)
{
char *beg = strrchr(opcname, '"');
if (beg == NULL)
parse_error(index);
else if (beg[-1] != '"')
break;
else /* escaped quote ("") */
opcname = beg - 1;
}
}
else
{
while (opcname > token && IsToken(opcname[-1]))
opcname--;
}
if (opcname > token && *opcname)
{
/* lookup default operator name from operator class */
Oid opclass;
Oid oprid;
int16 strategy = BTLessStrategyNumber;
#if PG_VERSION_NUM >= 80300
Oid opcintype;
Oid opfamily;
HeapTuple tp;
Form_pg_opclass opclassTup;
#endif
opclass = OpclassnameGetOpcid(BTREE_AM_OID, opcname);
#if PG_VERSION_NUM >= 80300
/* Retrieve operator information. */
tp = SearchSysCache(CLAOID, ObjectIdGetDatum(opclass), 0, 0, 0);
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for opclass %u", opclass);
opclassTup = (Form_pg_opclass) GETSTRUCT(tp);
opfamily = opclassTup->opcfamily;
opcintype = opclassTup->opcintype;
ReleaseSysCache(tp);
if (!OidIsValid(opcintype))
{
if (indexRel == NULL)
indexRel = index_open(index, NoLock);
opcintype = RelationGetDescr(indexRel)->attrs[nattr]->atttypid;
}
oprid = get_opfamily_member(opfamily, opcintype, opcintype, strategy);
if (!OidIsValid(oprid))
elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
strategy, opcintype, opcintype, opfamily);
#else
oprid = get_opclass_member(opclass, 0, strategy);
if (!OidIsValid(oprid))
elog(ERROR, "missing operator %d for %s", strategy, opcname);
#endif
opcname[-1] = '\0';
appendStringInfo(&str, "%s USING %s", token, get_opname(oprid));
}
else
appendStringInfoString(&str, token);
if (*next)
appendStringInfoChar(&str, ',');
}
if (indexRel != NULL)
index_close(indexRel, NoLock);
PG_RETURN_TEXT_P(cstring_to_text(str.data));
}
/**
@ -481,7 +609,19 @@ remove_dropped_columns_and_adjust_attnum(Oid oid, int16 natts1, int16 natts2)
natts2 - natts1, SPI_processed);
/* renumber attnum */
#if PG_VERSION_NUM >= 80300
#if PG_VERSION_NUM >= 80500
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_attribute m"
" SET attnum = (SELECT count(*) FROM pg_attribute a"
" WHERE m.attrelid = a.attrelid"
" AND m.attnum >= a.attnum"
" AND a.attnum > 0 AND NOT a.attisdropped)"
" WHERE attrelid = %u AND attnum > 0 AND NOT attisdropped",
oid);
if (SPI_processed != natts2)
elog(ERROR, "cannot update %d columns (%u columns updated)",
natts2, SPI_processed);
#elif PG_VERSION_NUM >= 80300
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_attribute"
" SET attnum = (SELECT count(*) FROM pg_attribute a"
@ -676,12 +816,21 @@ reorg_swap(PG_FUNCTION_ARGS)
/* adjust key attnum if the target table has dropped columns */
if (natts1 != natts2)
{
#if PG_VERSION_NUM >= 80500
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_index m SET indkey = n.indkey"
" FROM pg_catalog.pg_index n"
" WHERE m.indexrelid = %u"
" AND n.indexrelid = 'reorg.index_%u'::regclass",
idx1, idx1);
#else
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_index SET indkey = n.indkey"
" FROM pg_catalog.pg_index n"
" WHERE pg_catalog.pg_index.indexrelid = %u"
" AND n.indexrelid = 'reorg.index_%u'::regclass",
idx1, idx1);
#endif
if (SPI_processed != 1)
elog(ERROR, "failed to update pg_index.indkey (%u rows updated)", SPI_processed);
}