Fix ownership bug.

New toast table, toast index, and toast type should not have
been owned by the executor of pg_reorg, but by the original owner.
This commit is contained in:
Takahiro Itagaki
2009-05-14 08:19:25 +00:00
parent 0c659ed31f
commit 9a8f2e9c33
7 changed files with 424 additions and 246 deletions

View File

@ -3,7 +3,7 @@
#
# Copyright (c) 2008-2009, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
#
SRCS = reorg.c
SRCS = reorg.c pgut/pgut-be.c
OBJS = $(SRCS:.c=.o)
MODULE_big = pg_reorg
DATA_built = pg_reorg.sql

61
lib/pgut/pgut-be.c Executable file
View File

@ -0,0 +1,61 @@
/*-------------------------------------------------------------------------
*
* pgut-be.c
*
* Copyright (c) 2009, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "pgut-be.h"
#if PG_VERSION_NUM < 80400
char *
text_to_cstring(const text *t)
{
text *tunpacked = pg_detoast_datum_packed((struct varlena *) t);
int len = VARSIZE_ANY_EXHDR(tunpacked);
char *result;
result = (char *) palloc(len + 1);
memcpy(result, VARDATA_ANY(tunpacked), len);
result[len] = '\0';
if (tunpacked != t)
pfree(tunpacked);
return result;
}
text *
cstring_to_text(const char *s)
{
int len = strlen(s);
text *result = palloc(len + VARHDRSZ);
SET_VARSIZE(result, len + VARHDRSZ);
memcpy(VARDATA(result), s, len);
return result;
}
int
SPI_execute_with_args(const char *src,
int nargs, Oid *argtypes,
Datum *values, const char *nulls,
bool read_only, long tcount)
{
SPIPlanPtr plan;
int ret;
plan = SPI_prepare(src, nargs, argtypes);
if (plan == NULL)
return SPI_result;
ret = SPI_execute_plan(plan, values, nulls, read_only, tcount);
SPI_freeplan(plan);
return ret;
}
#endif

72
lib/pgut/pgut-be.h Executable file
View File

@ -0,0 +1,72 @@
/*-------------------------------------------------------------------------
*
* pgut-be.h
*
* Copyright (c) 2009, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*
*-------------------------------------------------------------------------
*/
#ifndef PGUT_BE_H
#define PGUT_BE_H
#include "executor/spi.h"
#if PG_VERSION_NUM < 80300
#define PGDLLIMPORT DLLIMPORT
#define SK_BT_DESC 0 /* Always ASC */
#define SK_BT_NULLS_FIRST 0 /* Always NULLS LAST */
#define MaxHeapTupleSize MaxTupleSize
#define PG_GETARG_TEXT_PP(n) PG_GETARG_TEXT_P(n)
#define VARSIZE_ANY_EXHDR(v) (VARSIZE(v) - VARHDRSZ)
#define VARDATA_ANY(v) VARDATA(v)
#define SET_VARSIZE(v, sz) (VARATT_SIZEP(v) = (sz))
#define pg_detoast_datum_packed(v) pg_detoast_datum(v)
#define DatumGetTextPP(v) DatumGetTextP(v)
#define ItemIdIsNormal(v) ItemIdIsUsed(v)
#define IndexBuildHeapScan(heap, index, info, sync, callback, state) \
IndexBuildHeapScan((heap), (index), (info), (callback), (state))
#define planner_rt_fetch(rti, root) \
rt_fetch(rti, (root)->parse->rtable)
#define heap_sync(rel) ((void)0)
#define ItemIdIsDead(itemId) ItemIdDeleted(itemId)
#define GetCurrentCommandId(used) GetCurrentCommandId()
#define stringToQualifiedNameList(str) \
stringToQualifiedNameList((str), "pg_bulkload")
#define setNewRelfilenode(rel, xid) \
setNewRelfilenode((rel))
#define PageAddItem(page, item, size, offnum, overwrite, is_heap) \
PageAddItem((page), (item), (size), (offnum), LP_USED)
typedef void *SPIPlanPtr;
#endif
#if PG_VERSION_NUM < 80400
#define MAIN_FORKNUM 0
#define HEAP_INSERT_SKIP_WAL 0x0001
#define HEAP_INSERT_SKIP_FSM 0x0002
#define relpath(rnode, forknum) relpath((rnode))
#define smgrimmedsync(reln, forknum) smgrimmedsync((reln))
#define smgrread(reln, forknum, blocknum, buffer) \
smgrread((reln), (blocknum), (buffer))
#define mdclose(reln, forknum) mdclose((reln))
#define heap_insert(relation, tup, cid, options, bistate) \
heap_insert((relation), (tup), (cid), true, true)
#define GetBulkInsertState() (NULL)
#define FreeBulkInsertState(bistate) ((void)0)
typedef void *BulkInsertState;
extern char *text_to_cstring(const text *t);
extern text *cstring_to_text(const char *s);
extern int SPI_execute_with_args(const char *src, int nargs, Oid *argtypes,
Datum *values, const char *nulls, bool read_only, long tcount);
#endif
#endif /* PGUT_BE_H */

View File

@ -21,20 +21,15 @@
#include "catalog/pg_type.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
PG_MODULE_MAGIC;
#include "pgut/pgut-be.h"
#if PG_VERSION_NUM < 80300
#define SET_VARSIZE(PTR, len) (VARATT_SIZEP((PTR)) = (len))
#define PGDLLIMPORT DLLIMPORT
typedef void *SPIPlanPtr;
#endif
PG_MODULE_MAGIC;
Datum reorg_trigger(PG_FUNCTION_ARGS);
Datum reorg_apply(PG_FUNCTION_ARGS);
@ -72,10 +67,6 @@ must_be_superuser(const char *func)
}
#if PG_VERSION_NUM < 80400
static int SPI_execute_with_args(const char *src, int nargs, Oid *argtypes,
Datum *values, const char *nulls,
bool read_only, long tcount);
static text *cstring_to_text(const char * s);
static void RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId);
#endif
@ -205,7 +196,7 @@ reorg_apply(PG_FUNCTION_ARGS)
values_peek[0] = Int32GetDatum(NUMBER_OF_PROCESSING);
else
values_peek[0] = Int32GetDatum(Min(count - n, NUMBER_OF_PROCESSING));
reorg_execp(plan_peek, values_peek, nulls_peek, SPI_OK_SELECT);
if (SPI_processed <= 0)
break;
@ -222,7 +213,7 @@ reorg_apply(PG_FUNCTION_ARGS)
{
HeapTuple tuple;
bool isnull;
tuple = tuptable->vals[i];
values[0] = SPI_getbinval(tuple, desc, 1, &isnull);
nulls[0] = ' ';
@ -230,7 +221,7 @@ reorg_apply(PG_FUNCTION_ARGS)
nulls[1] = (isnull ? 'n' : ' ');
values[2] = SPI_getbinval(tuple, desc, 3, &isnull);
nulls[2] = (isnull ? 'n' : ' ');
if (nulls[1] == 'n')
{
/* INSERT */
@ -500,6 +491,8 @@ reorg_swap(PG_FUNCTION_ARGS)
Oid oid2;
Oid reltoastrelid2;
Oid reltoastidxid2;
Oid owner1;
Oid owner2;
/* authority check */
must_be_superuser("reorg_swap");
@ -510,10 +503,12 @@ reorg_swap(PG_FUNCTION_ARGS)
/* swap relfilenode and dependencies for tables. */
values[0] = ObjectIdGetDatum(oid);
reorg_execd(
"SELECT X.oid, X.reltoastrelid, TX.reltoastidxid,"
" Y.oid, Y.reltoastrelid, TY.reltoastidxid"
" FROM pg_class X LEFT JOIN pg_class TX ON X.reltoastrelid = TX.oid,"
" pg_class Y LEFT JOIN pg_class TY ON Y.reltoastrelid = TY.oid"
"SELECT X.reltoastrelid, TX.reltoastidxid, X.relowner,"
" Y.oid, Y.reltoastrelid, TY.reltoastidxid, Y.relowner"
" FROM pg_catalog.pg_class X LEFT JOIN pg_catalog.pg_class TX"
" ON X.reltoastrelid = TX.oid,"
" pg_catalog.pg_class Y LEFT JOIN pg_catalog.pg_class TY"
" ON Y.reltoastrelid = TY.oid"
" WHERE X.oid = $1"
" AND Y.oid = ('reorg.table_' || X.oid)::regclass",
1, argtypes, values, nulls, SPI_OK_SELECT);
@ -527,11 +522,13 @@ reorg_swap(PG_FUNCTION_ARGS)
tuple = tuptable->vals[0];
reltoastrelid1 = getoid(tuple, desc, 2);
reltoastidxid1 = getoid(tuple, desc, 3);
reltoastrelid1 = getoid(tuple, desc, 1);
reltoastidxid1 = getoid(tuple, desc, 2);
owner1 = getoid(tuple, desc, 3);
oid2 = getoid(tuple, desc, 4);
reltoastrelid2 = getoid(tuple, desc, 5);
reltoastidxid2 = getoid(tuple, desc, 6);
owner2 = getoid(tuple, desc, 7);
/* should be all-or-nothing */
if ((reltoastrelid1 == InvalidOid || reltoastidxid1 == InvalidOid ||
@ -543,6 +540,14 @@ reorg_swap(PG_FUNCTION_ARGS)
reltoastrelid1, reltoastidxid1, reltoastrelid2, reltoastidxid2);
}
/* change owner of new relation to original owner */
if (owner1 != owner2)
{
ATExecChangeOwner(oid2, owner1, true);
CommandCounterIncrement();
}
/* swap heap and index files */
swap_heap_or_index_files(oid, oid2);
CommandCounterIncrement();
@ -550,7 +555,9 @@ reorg_swap(PG_FUNCTION_ARGS)
values[0] = ObjectIdGetDatum(oid);
reorg_execd(
"SELECT X.oid, Y.oid"
" FROM pg_index I, pg_class X, pg_class Y"
" FROM pg_catalog.pg_index I,"
" pg_catalog.pg_class X,"
" pg_catalog.pg_class Y"
" WHERE I.indrelid = $1"
" AND I.indexrelid = X.oid"
" AND Y.oid = ('reorg.index_' || X.oid)::regclass",
@ -897,35 +904,6 @@ swap_heap_or_index_files(Oid r1, Oid r2)
extern PGDLLIMPORT bool allowSystemTableMods;
static int
SPI_execute_with_args(const char *src,
int nargs, Oid *argtypes,
Datum *values, const char *nulls,
bool read_only, long tcount)
{
SPIPlanPtr plan;
int ret;
plan = SPI_prepare(src, nargs, argtypes);
if (plan == NULL)
return SPI_result;
ret = SPI_execute_plan(plan, values, nulls, read_only, tcount);
SPI_freeplan(plan);
return ret;
}
static text *
cstring_to_text(const char * s)
{
int len = strlen(s);
text *result = palloc(len + VARHDRSZ);
SET_VARSIZE(result, len + VARHDRSZ);
memcpy(VARDATA(result), s, len);
return result;
}
static void
RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
{