Fix data corruption bug reported by robjderr (#1010664).

pg_reorg broke catalog definition if the target table had any dropped columns.
Now pg_reorg removes dropped columns and renumbers valid columns.
You can use pg_reorg to shrink column definitions if you have many dropped
columns. (without pg_reorg, dropped columns are filled with zero forever)
This commit is contained in:
Takahiro Itagaki 2009-07-02 09:50:58 +00:00
parent 5fe3f037be
commit 6155932b60
10 changed files with 251 additions and 136 deletions

View File

@ -6,7 +6,7 @@ CREATE TABLE tbl_cluster (
col1 int,
col2 timestamp,
":-)" text,
primary key(":-)", col1)
PRIMARY KEY (":-)", col1)
) WITH (fillfactor = 70);
CREATE INDEX cidx_cluster ON tbl_cluster (col2, length(":-)"));
ALTER TABLE tbl_cluster CLUSTER ON cidx_cluster;
@ -27,6 +27,17 @@ CREATE TABLE tbl_gistkey (
);
CREATE INDEX cidx_circle ON tbl_gistkey USING gist (c);
ALTER TABLE tbl_gistkey CLUSTER ON cidx_circle;
CREATE TABLE tbl_with_dropped_column (
d1 text,
c1 text,
id integer PRIMARY KEY,
d2 text,
c2 text,
d3 text
);
ALTER TABLE tbl_with_dropped_column CLUSTER ON tbl_with_dropped_column_pkey;
CREATE INDEX idx_c1c2 ON tbl_with_dropped_column (c1, c2);
CREATE INDEX idx_c2c1 ON tbl_with_dropped_column (c2, c1);
--
-- insert data
--
@ -41,6 +52,22 @@ INSERT INTO tbl_only_ckey VALUES(1, '2008-01-01 00:00:00', 'abc');
INSERT INTO tbl_only_ckey VALUES(2, '2008-02-01 00:00:00', 'def');
INSERT INTO tbl_gistkey VALUES(1, '<(1,2),3>');
INSERT INTO tbl_gistkey VALUES(2, '<(4,5),6>');
INSERT INTO tbl_with_dropped_column VALUES('d1', 'c1', 2, 'd2', 'c2', 'd3');
INSERT INTO tbl_with_dropped_column VALUES('d1', 'c1', 1, 'd2', 'c2', 'd3');
ALTER TABLE tbl_with_dropped_column DROP COLUMN d1;
ALTER TABLE tbl_with_dropped_column DROP COLUMN d2;
ALTER TABLE tbl_with_dropped_column DROP COLUMN d3;
ALTER TABLE tbl_with_dropped_column ADD COLUMN c3 text;
--
-- before
--
SELECT * FROM tbl_with_dropped_column;
c1 | id | c2 | c3
----+----+----+----
c1 | 2 | c2 |
c1 | 1 | c2 |
(2 rows)
--
-- do reorg
--
@ -48,7 +75,7 @@ INSERT INTO tbl_gistkey VALUES(2, '<(4,5),6>');
\! pg_reorg --dbname=contrib_regression
\! pg_reorg --dbname=contrib_regression --table=tbl_cluster
--
-- results
-- after
--
\d tbl_cluster
Table "public.tbl_cluster"
@ -90,6 +117,19 @@ Indexes:
Indexes:
"tbl_only_pkey_pkey" PRIMARY KEY, btree (col1)
\d tbl_with_dropped_column
Table "public.tbl_with_dropped_column"
Column | Type | Modifiers
--------+---------+-----------
c1 | text |
id | integer | not null
c2 | text |
c3 | text |
Indexes:
"tbl_with_dropped_column_pkey" PRIMARY KEY, btree (id) CLUSTER
"idx_c1c2" btree (c1, c2)
"idx_c2c1" btree (c2, c1)
SELECT col1, to_char(col2, 'YYYY-MM-DD HH24:MI:SS'), ":-)" FROM tbl_cluster;
col1 | to_char | :-)
------+---------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@ -121,6 +161,13 @@ SELECT * FROM tbl_gistkey ORDER BY 1;
2 | <(4,5),6>
(2 rows)
SELECT * FROM tbl_with_dropped_column;
c1 | id | c2 | c3
----+----+----+----
c1 | 1 | c2 |
c1 | 2 | c2 |
(2 rows)
--
-- clean up
--
@ -128,4 +175,5 @@ DROP TABLE tbl_cluster;
DROP TABLE tbl_only_pkey;
DROP TABLE tbl_only_ckey;
DROP TABLE tbl_gistkey;
DROP TABLE tbl_with_dropped_column;
RESET client_min_messages;

View File

@ -8,7 +8,7 @@
* @brief Client Modules
*/
const char *PROGRAM_VERSION = "1.0.5";
const char *PROGRAM_VERSION = "1.0.6";
const char *PROGRAM_URL = "http://reorg.projects.postgresql.org/";
const char *PROGRAM_EMAIL = "reorg-general@lists.pgfoundry.org";
@ -93,7 +93,6 @@ static bool sqlstate_equals(PGresult *res, const char *state)
}
static bool verbose = false;
static bool quiet = false;
static bool analyze = true;
/*
@ -111,7 +110,6 @@ utoa(unsigned int value, char *buffer)
}
const struct option pgut_options[] = {
{"quiet", no_argument, NULL, 'q'},
{"verbose", no_argument, NULL, 'v'},
{"all", no_argument, NULL, 'a'},
{"table", required_argument, NULL, 't'},
@ -130,9 +128,6 @@ pgut_argument(int c, const char *arg)
{
switch (c)
{
case 'q':
quiet = true;
break;
case 'v':
verbose = true;
break;
@ -587,14 +582,17 @@ reorg_one_table(const reorg_table *table, const char *orderby)
* Note that current_table is already set to NULL here because analyze
* is an unimportant operation; No clean up even if failed.
*/
if (verbose)
fprintf(stderr, "---- analyze ----\n");
if (analyze)
{
if (verbose)
fprintf(stderr, "---- analyze ----\n");
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
printfStringInfo(&sql, "ANALYZE %s%s",
(verbose ? "VERBOSE " : ""), table->target_name);
command(sql.data, 0, NULL);
command("COMMIT", 0, NULL);
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
printfStringInfo(&sql, "ANALYZE %s%s",
(verbose ? "VERBOSE " : ""), table->target_name);
command(sql.data, 0, NULL);
command("COMMIT", 0, NULL);
}
termStringInfo(&sql);
}
@ -643,7 +641,6 @@ pgut_help(void)
" -n, --no-order do vacuum full instead of cluster\n"
" -o, --order-by=columns order by columns instead of cluster keys\n"
" -Z, --no-analyze don't analyze at end\n"
" -q, --quiet don't write any messages\n"
" -v, --verbose display detailed information during processing\n",
PROGRAM_NAME, PROGRAM_NAME);
}

View File

@ -22,6 +22,7 @@ const char *port = NULL;
const char *username = NULL;
bool password = false;
bool debug = false;
bool quiet = false;
/* Database connections */
PGconn *connection = NULL;
@ -45,6 +46,7 @@ const struct option default_options[] =
{"dbname", required_argument, NULL, 'd'},
{"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"quiet", no_argument, NULL, 'q'},
{"username", required_argument, NULL, 'U'},
{"password", no_argument, NULL, 'W'},
{"debug", no_argument, NULL, '!'},
@ -136,6 +138,9 @@ parse_options(int argc, char **argv)
case 'p':
assign_option(&port, c, optarg);
break;
case 'q':
quiet = true;
break;
case 'U':
assign_option(&username, c, optarg);
break;
@ -196,13 +201,22 @@ assign_option(const char **value, int c, const char *arg)
return true;
}
void
reconnect(void)
/*
* the result is also available with the global variable 'connection'.
*/
PGconn *
reconnect_elevel(int elevel)
{
PGconn *conn;
char *pwd = NULL;
bool new_pass;
if (interrupted)
{
interrupted = false;
elog(ERROR, "%s: interrupted", PROGRAM_NAME);
}
disconnect();
if (password)
@ -218,7 +232,10 @@ reconnect(void)
conn = PQsetdbLogin(host, port, NULL, NULL, dbname, username, pwd);
if (!conn)
elog(ERROR, "could not connect to database %s", dbname);
{
elog(elevel, "could not connect to database %s", dbname);
return NULL;
}
if (PQstatus(conn) == CONNECTION_BAD &&
#if PG_VERSION_NUM >= 80300
@ -239,10 +256,17 @@ reconnect(void)
/* check to see that the backend connection was successfully made */
if (PQstatus(conn) == CONNECTION_BAD)
elog(ERROR, "could not connect to database %s: %s",
elog(elevel, "could not connect to database %s: %s",
dbname, PQerrorMessage(conn));
connection = conn;
return conn;
}
void
reconnect(void)
{
reconnect_elevel(ERROR);
}
void
@ -290,6 +314,7 @@ execute_elevel(const char *query, int nParams, const char **params, int elevel)
{
case PGRES_TUPLES_OK:
case PGRES_COMMAND_OK:
case PGRES_COPY_IN:
break;
default:
elog(elevel, "query failed: %squery was: %s",
@ -329,6 +354,8 @@ elog(int elevel, const char *fmt, ...)
if (!debug && elevel <= LOG)
return;
if (quiet && elevel <= WARNING)
return;
switch (elevel)
{
@ -481,6 +508,7 @@ static void help(void)
fprintf(stderr, " -U, --username=USERNAME user name to connect as\n");
fprintf(stderr, " -W, --password force password prompt\n");
fprintf(stderr, "\nGeneric options:\n");
fprintf(stderr, " -q, --quiet don't write any messages\n");
fprintf(stderr, " --debug debug mode\n");
fprintf(stderr, " --help show this help, then exit\n");
fprintf(stderr, " --version output version information, then exit\n\n");
@ -566,3 +594,4 @@ sleep(unsigned int seconds)
}
#endif /* WIN32 */

View File

@ -13,6 +13,7 @@
#include "libpq-fe.h"
#include "pqexpbuffer.h"
#include <assert.h>
#include <getopt.h>
#if !defined(C_H) && !defined(__cplusplus)
@ -51,6 +52,7 @@ extern const char *port;
extern const char *username;
extern bool password;
extern bool debug;
extern bool quiet;
extern PGconn *connection;
extern bool interrupted;
@ -58,6 +60,8 @@ extern bool interrupted;
extern void parse_options(int argc, char **argv);
extern bool assign_option(const char **value, int c, const char *arg);
extern PGconn *reconnect_elevel(int elevel);
extern void reconnect(void);
extern void disconnect(void);
extern PGresult *execute_elevel(const char *query, int nParams, const char **params, int elevel);
@ -68,6 +72,15 @@ extern void command(const char *query, int nParams, const char **params);
extern unsigned int sleep(unsigned int seconds);
#endif
/*
* IsXXX
*/
#define IsSpace(c) (isspace((unsigned char)(c)))
#define IsAlpha(c) (isalpha((unsigned char)(c)))
#define IsAlnum(c) (isalnum((unsigned char)(c)))
#define IsIdentHead(c) (IsAlpha(c) || (c) == '_')
#define IsIdentBody(c) (IsAlnum(c) || (c) == '_')
/*
* elog
*/
@ -104,6 +117,20 @@ __attribute__((format(printf, 2, 3)));
#define appendStringInfoChar appendPQExpBufferChar
#define appendBinaryStringInfo appendBinaryPQExpBuffer
/*
* Assert
*/
#undef Assert
#undef AssertMacro
#ifdef USE_ASSERT_CHECKING
#define Assert(x) assert(x)
#define AssertMacro(x) assert(x)
#else
#define Assert(x) ((void) 0)
#define AssertMacro(x) ((void) 0)
#endif
/*
* import from postgres.h and catalog/genbki.h in 8.4
*/
@ -124,3 +151,4 @@ typedef int aclitem;
#endif
#endif /* PGUT_H */

View File

@ -6,7 +6,7 @@ CREATE TABLE tbl_cluster (
col1 int,
col2 timestamp,
":-)" text,
primary key(":-)", col1)
PRIMARY KEY (":-)", col1)
) WITH (fillfactor = 70);
CREATE INDEX cidx_cluster ON tbl_cluster (col2, length(":-)"));
@ -34,6 +34,18 @@ CREATE TABLE tbl_gistkey (
CREATE INDEX cidx_circle ON tbl_gistkey USING gist (c);
ALTER TABLE tbl_gistkey CLUSTER ON cidx_circle;
CREATE TABLE tbl_with_dropped_column (
d1 text,
c1 text,
id integer PRIMARY KEY,
d2 text,
c2 text,
d3 text
);
ALTER TABLE tbl_with_dropped_column CLUSTER ON tbl_with_dropped_column_pkey;
CREATE INDEX idx_c1c2 ON tbl_with_dropped_column (c1, c2);
CREATE INDEX idx_c2c1 ON tbl_with_dropped_column (c2, c1);
--
-- insert data
--
@ -53,6 +65,18 @@ INSERT INTO tbl_only_ckey VALUES(2, '2008-02-01 00:00:00', 'def');
INSERT INTO tbl_gistkey VALUES(1, '<(1,2),3>');
INSERT INTO tbl_gistkey VALUES(2, '<(4,5),6>');
INSERT INTO tbl_with_dropped_column VALUES('d1', 'c1', 2, 'd2', 'c2', 'd3');
INSERT INTO tbl_with_dropped_column VALUES('d1', 'c1', 1, 'd2', 'c2', 'd3');
ALTER TABLE tbl_with_dropped_column DROP COLUMN d1;
ALTER TABLE tbl_with_dropped_column DROP COLUMN d2;
ALTER TABLE tbl_with_dropped_column DROP COLUMN d3;
ALTER TABLE tbl_with_dropped_column ADD COLUMN c3 text;
--
-- before
--
SELECT * FROM tbl_with_dropped_column;
--
-- do reorg
--
@ -62,18 +86,20 @@ INSERT INTO tbl_gistkey VALUES(2, '<(4,5),6>');
\! pg_reorg --dbname=contrib_regression --table=tbl_cluster
--
-- results
-- after
--
\d tbl_cluster
\d tbl_gistkey
\d tbl_only_ckey
\d tbl_only_pkey
\d tbl_with_dropped_column
SELECT col1, to_char(col2, 'YYYY-MM-DD HH24:MI:SS'), ":-)" FROM tbl_cluster;
SELECT * FROM tbl_only_ckey ORDER BY 1;
SELECT * FROM tbl_only_pkey ORDER BY 1;
SELECT * FROM tbl_gistkey ORDER BY 1;
SELECT * FROM tbl_with_dropped_column;
--
-- clean up
@ -83,4 +109,5 @@ DROP TABLE tbl_cluster;
DROP TABLE tbl_only_pkey;
DROP TABLE tbl_only_ckey;
DROP TABLE tbl_gistkey;
DROP TABLE tbl_with_dropped_column;
RESET client_min_messages;

View File

@ -3,7 +3,7 @@
#
# Copyright (c) 2008-2009, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
#
SRCS = reorg.c pgut/pgut-be.c
SRCS = reorg.c pgut/pgut-be.c pgut/pgut-spi.c
OBJS = $(SRCS:.c=.o)
MODULE_big = pg_reorg
DATA_built = pg_reorg.sql

View File

@ -83,7 +83,7 @@ $$
') = (' || $2 || '.' ||
array_to_string(reorg.array_accum(quote_ident(attname)), ', ' || $2 || '.') || ')'
FROM (SELECT attname FROM pg_attribute
WHERE attrelid = $1 AND attnum > 0
WHERE attrelid = $1 AND attnum > 0 AND NOT attisdropped
ORDER BY attnum) tmp;
$$
LANGUAGE sql STABLE STRICT;

View File

@ -41,21 +41,4 @@ cstring_to_text(const char *s)
return result;
}
int
SPI_execute_with_args(const char *src,
int nargs, Oid *argtypes,
Datum *values, const char *nulls,
bool read_only, long tcount)
{
SPIPlanPtr plan;
int ret;
plan = SPI_prepare(src, nargs, argtypes);
if (plan == NULL)
return SPI_result;
ret = SPI_execute_plan(plan, values, nulls, read_only, tcount);
SPI_freeplan(plan);
return ret;
}
#endif

View File

@ -10,8 +10,6 @@
#ifndef PGUT_BE_H
#define PGUT_BE_H
#include "executor/spi.h"
#if PG_VERSION_NUM < 80300
#define PGDLLIMPORT DLLIMPORT
@ -64,8 +62,6 @@ typedef void *BulkInsertState;
extern char *text_to_cstring(const text *t);
extern text *cstring_to_text(const char *s);
extern int SPI_execute_with_args(const char *src, int nargs, Oid *argtypes,
Datum *values, const char *nulls, bool read_only, long tcount);
#define CStringGetTextDatum(s) PointerGetDatum(cstring_to_text(s))
#define TextDatumGetCString(d) text_to_cstring((text *) DatumGetPointer(d))

View File

@ -4,10 +4,6 @@
* Copyright (c) 2008-2009, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*/
/**
* @brief Core Modules
*/
#include "postgres.h"
#include <unistd.h>
@ -28,6 +24,7 @@
#include "utils/syscache.h"
#include "pgut/pgut-be.h"
#include "pgut/pgut-spi.h"
PG_MODULE_MAGIC;
@ -51,10 +48,6 @@ PG_FUNCTION_INFO_V1(reorg_disable_autovacuum);
static void reorg_init(void);
static SPIPlanPtr reorg_prepare(const char *src, int nargs, Oid *argtypes);
static void reorg_execp(SPIPlanPtr plan, Datum *values, const char *nulls, int expected);
static void reorg_execf(int expexted, const char *format, ...)
__attribute__((format(printf, 2, 3)));
static void reorg_execd(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls, int expected);
static const char *get_quoted_relname(Oid oid);
static const char *get_quoted_nspname(Oid oid);
static void swap_heap_or_index_files(Oid r1, Oid r2);
@ -77,7 +70,7 @@ static void RenameRelationInternal(Oid myrelid, const char *newrelname, Oid name
Datum
reorg_version(PG_FUNCTION_ARGS)
{
return CStringGetTextDatum("pg_reorg 1.0.5");
return CStringGetTextDatum("pg_reorg 1.0.6");
}
/**
@ -95,7 +88,7 @@ reorg_trigger(PG_FUNCTION_ARGS)
TupleDesc desc;
HeapTuple tuple;
Datum values[2];
char nulls[2] = { ' ', ' ' };
bool nulls[2] = { 0, 0 };
Oid argtypes[2];
const char *sql;
@ -121,7 +114,7 @@ reorg_trigger(PG_FUNCTION_ARGS)
{
/* INSERT: (NULL, newtup) */
tuple = trigdata->tg_trigtuple;
nulls[0] = 'n';
nulls[0] = true;
values[1] = copy_tuple(tuple, desc);
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
@ -129,7 +122,7 @@ reorg_trigger(PG_FUNCTION_ARGS)
/* DELETE: (oldtup, NULL) */
tuple = trigdata->tg_trigtuple;
values[0] = copy_tuple(tuple, desc);
nulls[1] = 'n';
nulls[1] = true;
}
else
{
@ -140,7 +133,7 @@ reorg_trigger(PG_FUNCTION_ARGS)
}
/* INSERT INTO reorg.log VALUES ($1, $2) */
reorg_execd(sql, 2, argtypes, values, nulls, SPI_OK_INSERT);
execute_with_args(SPI_OK_INSERT, sql, 2, argtypes, values, nulls);
SPI_finish();
@ -164,7 +157,7 @@ reorg_trigger(PG_FUNCTION_ARGS)
Datum
reorg_apply(PG_FUNCTION_ARGS)
{
#define NUMBER_OF_PROCESSING 1000
#define DEFAULT_PEEK_COUNT 1000
const char *sql_peek = PG_GETARG_CSTRING(0);
const char *sql_insert = PG_GETARG_CSTRING(1);
@ -179,13 +172,9 @@ reorg_apply(PG_FUNCTION_ARGS)
SPIPlanPtr plan_update = NULL;
SPIPlanPtr plan_pop = NULL;
uint32 n, i;
TupleDesc desc;
Oid argtypes[3]; /* id, pk, row */
Datum values[3]; /* id, pk, row */
char nulls[3]; /* id, pk, row */
Oid argtypes_peek[1] = { INT4OID };
Datum values_peek[1];
char nulls_peek[1] = { ' ' };
bool nulls_peek[1] = { 0 };
/* authority check */
must_be_superuser("reorg_apply");
@ -200,14 +189,18 @@ reorg_apply(PG_FUNCTION_ARGS)
{
int ntuples;
SPITupleTable *tuptable;
TupleDesc desc;
Oid argtypes[3]; /* id, pk, row */
Datum values[3]; /* id, pk, row */
bool nulls[3]; /* id, pk, row */
/* peek tuple in log */
if (count == 0)
values_peek[0] = Int32GetDatum(NUMBER_OF_PROCESSING);
values_peek[0] = Int32GetDatum(DEFAULT_PEEK_COUNT);
else
values_peek[0] = Int32GetDatum(Min(count - n, NUMBER_OF_PROCESSING));
values_peek[0] = Int32GetDatum(Min(count - n, DEFAULT_PEEK_COUNT));
reorg_execp(plan_peek, values_peek, nulls_peek, SPI_OK_SELECT);
execute_plan(SPI_OK_SELECT, plan_peek, values_peek, nulls_peek);
if (SPI_processed <= 0)
break;
@ -222,43 +215,39 @@ reorg_apply(PG_FUNCTION_ARGS)
for (i = 0; i < ntuples; i++, n++)
{
HeapTuple tuple;
bool isnull;
tuple = tuptable->vals[i];
values[0] = SPI_getbinval(tuple, desc, 1, &isnull);
nulls[0] = ' ';
values[1] = SPI_getbinval(tuple, desc, 2, &isnull);
nulls[1] = (isnull ? 'n' : ' ');
values[2] = SPI_getbinval(tuple, desc, 3, &isnull);
nulls[2] = (isnull ? 'n' : ' ');
values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]);
values[1] = SPI_getbinval(tuple, desc, 2, &nulls[1]);
values[2] = SPI_getbinval(tuple, desc, 3, &nulls[2]);
if (nulls[1] == 'n')
if (nulls[1])
{
/* INSERT */
if (plan_insert == NULL)
plan_insert = reorg_prepare(sql_insert, 1, &argtypes[2]);
reorg_execp(plan_insert, &values[2], &nulls[2], SPI_OK_INSERT);
execute_plan(SPI_OK_INSERT, plan_insert, &values[2], &nulls[2]);
}
else if (nulls[2] == 'n')
else if (nulls[2])
{
/* DELETE */
if (plan_delete == NULL)
plan_delete = reorg_prepare(sql_delete, 1, &argtypes[1]);
reorg_execp(plan_delete, &values[1], &nulls[1], SPI_OK_DELETE);
execute_plan(SPI_OK_DELETE, plan_delete, &values[1], &nulls[1]);
}
else
{
/* UPDATE */
if (plan_update == NULL)
plan_update = reorg_prepare(sql_update, 2, &argtypes[1]);
reorg_execp(plan_update, &values[1], &nulls[1], SPI_OK_UPDATE);
execute_plan(SPI_OK_UPDATE, plan_update, &values[1], &nulls[1]);
}
}
/* delete tuple in log */
if (plan_pop == NULL)
plan_pop = reorg_prepare(sql_pop, 1, argtypes);
reorg_execp(plan_pop, values, nulls, SPI_OK_DELETE);
execute_plan(SPI_OK_DELETE, plan_pop, values, nulls);
SPI_freetuptable(tuptable);
}
@ -269,7 +258,7 @@ reorg_apply(PG_FUNCTION_ARGS)
}
/*
* Deparsed create index sql. You can rebuild sql using
* Parsed CREATE INDEX statement. You can rebuild sql using
* sprintf(buf, "%s %s ON %s USING %s (%s)%s",
* create, index, table type, columns, options)
*/
@ -301,7 +290,7 @@ get_relation_name(Oid relid)
static char *
parse_error(Oid index)
{
elog(ERROR, "Unexpected indexdef: %s", pg_get_indexdef_string(index));
elog(ERROR, "unexpected index definition: %s", pg_get_indexdef_string(index));
return NULL;
}
@ -471,6 +460,14 @@ getoid(HeapTuple tuple, TupleDesc desc, int column)
return isnull ? InvalidOid : DatumGetObjectId(datum);
}
static int16
getint16(HeapTuple tuple, TupleDesc desc, int column)
{
bool isnull;
Datum datum = SPI_getbinval(tuple, desc, column, &isnull);
return isnull ? 0 : DatumGetInt16(datum);
}
/**
* @fn Datum reorg_swap(PG_FUNCTION_ARGS)
* @brief Swapping relfilenode of tables and relation ids of toast tables
@ -478,6 +475,8 @@ getoid(HeapTuple tuple, TupleDesc desc, int column)
*
* reorg_swap(oid, relname)
*
* TODO: remove useless CommandCounterIncrement().
*
* @param oid Oid of table of target.
* @retval None.
*/
@ -488,7 +487,7 @@ reorg_swap(PG_FUNCTION_ARGS)
const char *relname = get_quoted_relname(oid);
const char *nspname = get_quoted_nspname(oid);
Oid argtypes[1] = { OIDOID };
char nulls[1] = { ' ' };
bool nulls[1] = { 0 };
Datum values[1];
SPITupleTable *tuptable;
TupleDesc desc;
@ -503,6 +502,8 @@ reorg_swap(PG_FUNCTION_ARGS)
Oid reltoastidxid2;
Oid owner1;
Oid owner2;
int16 natts1;
int16 natts2;
/* authority check */
must_be_superuser("reorg_swap");
@ -512,16 +513,17 @@ reorg_swap(PG_FUNCTION_ARGS)
/* swap relfilenode and dependencies for tables. */
values[0] = ObjectIdGetDatum(oid);
reorg_execd(
execute_with_args(SPI_OK_SELECT,
"SELECT X.reltoastrelid, TX.reltoastidxid, X.relowner,"
" Y.oid, Y.reltoastrelid, TY.reltoastidxid, Y.relowner"
" Y.oid, Y.reltoastrelid, TY.reltoastidxid, Y.relowner,"
" X.relnatts, Y.relnatts"
" FROM pg_catalog.pg_class X LEFT JOIN pg_catalog.pg_class TX"
" ON X.reltoastrelid = TX.oid,"
" pg_catalog.pg_class Y LEFT JOIN pg_catalog.pg_class TY"
" ON Y.reltoastrelid = TY.oid"
" WHERE X.oid = $1"
" AND Y.oid = ('reorg.table_' || X.oid)::regclass",
1, argtypes, values, nulls, SPI_OK_SELECT);
1, argtypes, values, nulls);
tuptable = SPI_tuptable;
desc = tuptable->tupdesc;
@ -539,6 +541,8 @@ reorg_swap(PG_FUNCTION_ARGS)
reltoastrelid2 = getoid(tuple, desc, 5);
reltoastidxid2 = getoid(tuple, desc, 6);
owner2 = getoid(tuple, desc, 7);
natts1 = getint16(tuple, desc, 8);
natts2 = getint16(tuple, desc, 9);;
/* should be all-or-nothing */
if ((reltoastrelid1 == InvalidOid || reltoastidxid1 == InvalidOid ||
@ -557,13 +561,13 @@ reorg_swap(PG_FUNCTION_ARGS)
CommandCounterIncrement();
}
/* swap heap and index files */
/* swap tables. */
swap_heap_or_index_files(oid, oid2);
CommandCounterIncrement();
/* swap relfilenode and dependencies for indxes. */
/* swap indexes. */
values[0] = ObjectIdGetDatum(oid);
reorg_execd(
execute_with_args(SPI_OK_SELECT,
"SELECT X.oid, Y.oid"
" FROM pg_catalog.pg_index I,"
" pg_catalog.pg_class X,"
@ -571,7 +575,7 @@ reorg_swap(PG_FUNCTION_ARGS)
" WHERE I.indrelid = $1"
" AND I.indexrelid = X.oid"
" AND Y.oid = ('reorg.index_' || X.oid)::regclass",
1, argtypes, values, nulls, SPI_OK_SELECT);
1, argtypes, values, nulls);
tuptable = SPI_tuptable;
desc = tuptable->tupdesc;
@ -579,10 +583,25 @@ reorg_swap(PG_FUNCTION_ARGS)
for (i = 0; i < records; i++)
{
Oid idx1, idx2;
tuple = tuptable->vals[i];
swap_heap_or_index_files(
getoid(tuple, desc, 1),
getoid(tuple, desc, 2));
idx1 = getoid(tuple, desc, 1);
idx2 = getoid(tuple, desc, 2);
swap_heap_or_index_files(idx1, idx2);
/* adjust key attnum if the target table has dropped columns */
if (natts1 != natts2)
{
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_index SET indkey = n.indkey"
" FROM pg_catalog.pg_index n"
" WHERE pg_catalog.pg_index.indexrelid = %u"
" AND n.indexrelid = 'reorg.index_%u'::regclass",
idx1, idx1);
if (SPI_processed != 1)
elog(ERROR, "failed to update pg_index.indkey (%u rows updated)", SPI_processed);
}
CommandCounterIncrement();
}
@ -614,8 +633,31 @@ reorg_swap(PG_FUNCTION_ARGS)
CommandCounterIncrement();
}
/* adjust attribute numbers if the target table has dropped columns */
if (natts1 != natts2)
{
/* delete dropped columns */
execute_with_format(SPI_OK_DELETE,
"DELETE FROM pg_catalog.pg_attribute"
" WHERE attrelid = %u AND attisdropped",
oid);
/* renumber attnum */
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_attribute"
" SET attnum = (SELECT count(*) FROM pg_attribute a"
" WHERE pg_catalog.pg_attribute.attrelid = a.attrelid"
" AND pg_catalog.pg_attribute.attnum >= a.attnum"
" AND a.attnum > 0 AND NOT a.attisdropped)"
" WHERE attrelid = %u AND attnum > 0 AND NOT attisdropped",
oid);
/* adjust attribute number of the table */
execute_with_format(SPI_OK_UPDATE,
"UPDATE pg_catalog.pg_class SET relnatts = %d WHERE oid = %u",
natts2, oid);
}
/* drop reorg trigger */
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"DROP TRIGGER IF EXISTS z_reorg_trigger ON %s.%s CASCADE",
nspname, relname);
@ -651,14 +693,14 @@ reorg_drop(PG_FUNCTION_ARGS)
* drop reorg trigger: We have already dropped the trigger in normal
* cases, but it can be left on error.
*/
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"DROP TRIGGER IF EXISTS z_reorg_trigger ON %s.%s CASCADE",
nspname, relname);
#if PG_VERSION_NUM < 80400
/* delete autovacuum settings */
reorg_execf(
execute_with_format(
SPI_OK_DELETE,
"DELETE FROM pg_catalog.pg_autovacuum v"
" USING pg_class c, pg_namespace n"
@ -670,19 +712,19 @@ reorg_drop(PG_FUNCTION_ARGS)
#endif
/* drop log table */
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS reorg.log_%u CASCADE",
oid);
/* drop temp table */
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"DROP TABLE IF EXISTS reorg.table_%u CASCADE",
oid);
/* drop type for log table */
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"DROP TYPE IF EXISTS reorg.pk_%u CASCADE",
oid);
@ -701,12 +743,12 @@ reorg_disable_autovacuum(PG_FUNCTION_ARGS)
reorg_init();
#if PG_VERSION_NUM >= 80400
reorg_execf(
execute_with_format(
SPI_OK_UTILITY,
"ALTER TABLE %s SET (autovacuum_enabled = off)",
get_relation_name(oid));
#else
reorg_execf(
execute_with_format(
SPI_OK_INSERT,
"INSERT INTO pg_catalog.pg_autovacuum VALUES (%u, false, -1, -1, -1, -1, -1, -1, -1, -1)",
oid);
@ -736,41 +778,6 @@ reorg_prepare(const char *src, int nargs, Oid *argtypes)
return plan;
}
/* execute prepared plan */
static void
reorg_execp(SPIPlanPtr plan, Datum *values, const char *nulls, int expected)
{
int ret = SPI_execute_plan(plan, values, nulls, false, 0);
if (ret != expected)
elog(ERROR, "pg_reorg: reorg_execp failed (code=%d, expected=%d)", ret, expected);
}
/* execute sql with format */
static void
reorg_execf(int expected, const char *format, ...)
{
va_list ap;
StringInfoData sql;
int ret;
initStringInfo(&sql);
va_start(ap, format);
appendStringInfoVA(&sql, format, ap);
va_end(ap);
if ((ret = SPI_exec(sql.data, 0)) != expected)
elog(ERROR, "pg_reorg: reorg_execf failed (sql=%s, code=%d, expected=%d)", sql.data, ret, expected);
}
/* execute a query */
static void
reorg_execd(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls, int expected)
{
int ret = SPI_execute_with_args(src, nargs, argtypes, values, nulls, expected == SPI_OK_SELECT, 0);
if (ret != expected)
elog(ERROR, "pg_reorg: reorg_execd failed (sql=%s, code=%d, expected=%d)", src, ret, expected);
}
static const char *
get_quoted_relname(Oid oid)
{