pg_repack/lib/pg_repack.sql.in
Josh Kupershmidt 506104686b Fix a nasty data loss/corruption bug from the sql_pop query.
It is not safe to assume that we can bulk-delete all entries from the
log table based on their "id" being less than the largest "id" we have
processed so far, as we may unintentionally delete some tuples from the
log which we haven't actually processed yet in the case of concurrent
transactions adding tuples into the log table.
2014-04-22 10:56:24 -04:00

253 lines
9.0 KiB
MySQL

/*
* pg_repack: lib/pg_repack.sql.in
*
* Portions Copyright (c) 2008-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2011, Itagaki Takahiro
* Portions Copyright (c) 2012, The Reorg Development Team
*/
CREATE SCHEMA repack;
CREATE FUNCTION repack.version() RETURNS text AS
'MODULE_PATHNAME', 'repack_version'
LANGUAGE C IMMUTABLE STRICT;
CREATE FUNCTION repack.version_sql() RETURNS text AS
$$SELECT 'pg_repack REPACK_VERSION'::text$$
LANGUAGE SQL IMMUTABLE STRICT;
CREATE AGGREGATE repack.array_accum (
sfunc = array_append,
basetype = anyelement,
stype = anyarray,
initcond = '{}'
);
CREATE FUNCTION repack.oid2text(oid) RETURNS text AS
$$
SELECT textin(regclassout($1));
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_index_columns(oid, text) RETURNS text AS
$$
SELECT array_to_string(repack.array_accum(quote_ident(attname)), $2)
FROM pg_attribute,
(SELECT indrelid,
indkey,
generate_series(0, indnatts-1) AS i
FROM pg_index
WHERE indexrelid = $1
) AS keys
WHERE attrelid = indrelid
AND attnum = indkey[i];
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_order_by(oid, oid) RETURNS text AS
'MODULE_PATHNAME', 'repack_get_order_by'
LANGUAGE C STABLE STRICT;
CREATE FUNCTION repack.get_create_index_type(oid, name) RETURNS text AS
$$
SELECT 'CREATE TYPE ' || $2 || ' AS (' ||
array_to_string(repack.array_accum(quote_ident(attname) || ' ' ||
pg_catalog.format_type(atttypid, atttypmod)), ', ') || ')'
FROM pg_attribute,
(SELECT indrelid,
indkey,
generate_series(0, indnatts-1) AS i
FROM pg_index
WHERE indexrelid = $1
) AS keys
WHERE attrelid = indrelid
AND attnum = indkey[i];
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_create_trigger(relid oid, pkid oid)
RETURNS text AS
$$
SELECT 'CREATE TRIGGER z_repack_trigger' ||
' BEFORE INSERT OR DELETE OR UPDATE ON ' || repack.oid2text($1) ||
' FOR EACH ROW EXECUTE PROCEDURE repack.repack_trigger(' ||
'''INSERT INTO repack.log_' || $1 || '(pk, row) VALUES(' ||
' CASE WHEN $1 IS NULL THEN NULL ELSE (ROW($1.' ||
repack.get_index_columns($2, ', $1.') || ')::repack.pk_' ||
$1 || ') END, $2)'')';
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_enable_trigger(relid oid)
RETURNS text AS
$$
SELECT 'ALTER TABLE ' || repack.oid2text($1) ||
' ENABLE ALWAYS TRIGGER z_repack_trigger';
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_assign(oid, text) RETURNS text AS
$$
SELECT '(' || array_to_string(repack.array_accum(quote_ident(attname)), ', ') ||
') = (' || $2 || '.' ||
array_to_string(repack.array_accum(quote_ident(attname)), ', ' || $2 || '.') || ')'
FROM (SELECT attname FROM pg_attribute
WHERE attrelid = $1 AND attnum > 0 AND NOT attisdropped
ORDER BY attnum) tmp;
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.get_compare_pkey(oid, text)
RETURNS text AS
$$
SELECT '(' || array_to_string(repack.array_accum(quote_ident(attname)), ', ') ||
') = (' || $2 || '.' ||
array_to_string(repack.array_accum(quote_ident(attname)), ', ' || $2 || '.') || ')'
FROM pg_attribute,
(SELECT indrelid,
indkey,
generate_series(0, indnatts-1) AS i
FROM pg_index
WHERE indexrelid = $1
) AS keys
WHERE attrelid = indrelid
AND attnum = indkey[i];
$$
LANGUAGE sql STABLE STRICT;
-- Get a column list for SELECT all columns including dropped ones.
-- We use NULLs of integer types for dropped columns (types are not important).
CREATE FUNCTION repack.get_columns_for_create_as(oid)
RETURNS text AS
$$
SELECT array_to_string(repack.array_accum(c), ',') FROM (SELECT
CASE WHEN attisdropped
THEN 'NULL::integer AS ' || quote_ident(attname)
ELSE quote_ident(attname)
END AS c
FROM pg_attribute
WHERE attrelid = $1 AND attnum > 0 ORDER BY attnum
) AS COL
$$
LANGUAGE sql STABLE STRICT;
-- Get a SQL text to DROP dropped columns for the table,
-- or NULL if it has no dropped columns.
CREATE FUNCTION repack.get_drop_columns(oid, text)
RETURNS text AS
$$
SELECT
'ALTER TABLE ' || $2 || ' ' || array_to_string(dropped_columns, ', ')
FROM (
SELECT
repack.array_accum('DROP COLUMN ' || quote_ident(attname)) AS dropped_columns
FROM (
SELECT * FROM pg_attribute
WHERE attrelid = $1 AND attnum > 0 AND attisdropped
ORDER BY attnum
) T
) T
WHERE
array_upper(dropped_columns, 1) > 0
$$
LANGUAGE sql STABLE STRICT;
-- includes not only PRIMARY KEYS but also UNIQUE NOT NULL keys
CREATE VIEW repack.primary_keys AS
SELECT indrelid, (repack.array_accum(indexrelid))[1] AS indexrelid
FROM (SELECT indrelid, indexrelid FROM pg_index
WHERE indisunique
AND indisvalid
AND indpred IS NULL
AND 0 <> ALL(indkey)
AND NOT EXISTS(
SELECT 1 FROM pg_attribute
WHERE attrelid = indrelid
AND attnum = ANY(indkey)
AND NOT attnotnull)
ORDER BY indrelid, indisprimary DESC, indnatts, indkey) tmp
GROUP BY indrelid;
CREATE VIEW repack.tables AS
SELECT R.oid::regclass AS relname,
R.oid AS relid,
R.reltoastrelid AS reltoastrelid,
CASE WHEN R.reltoastrelid = 0 THEN 0 ELSE (SELECT reltoastidxid FROM pg_class WHERE oid = R.reltoastrelid) END AS reltoastidxid,
PK.indexrelid AS pkid,
CK.indexrelid AS ckid,
repack.get_create_index_type(PK.indexrelid, 'repack.pk_' || R.oid) AS create_pktype,
'CREATE TABLE repack.log_' || R.oid || ' (id bigserial PRIMARY KEY, pk repack.pk_' || R.oid || ', row ' || repack.oid2text(R.oid) || ')' AS create_log,
repack.get_create_trigger(R.oid, PK.indexrelid) AS create_trigger,
repack.get_enable_trigger(R.oid) as enable_trigger,
'CREATE TABLE repack.table_' || R.oid || ' WITH (' || array_to_string(array_append(R.reloptions, 'oids=' || CASE WHEN R.relhasoids THEN 'true' ELSE 'false' END), ',') || ') TABLESPACE ' AS create_table_1,
coalesce(quote_ident(S.spcname), 'pg_default') as tablespace_orig,
' AS SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS create_table_2,
repack.get_drop_columns(R.oid, 'repack.table_' || R.oid) AS drop_columns,
'DELETE FROM repack.log_' || R.oid AS delete_log,
'LOCK TABLE ' || repack.oid2text(R.oid) || ' IN ACCESS EXCLUSIVE MODE' AS lock_table,
repack.get_order_by(CK.indexrelid, R.oid) AS ckey,
'SELECT * FROM repack.log_' || R.oid || ' ORDER BY id LIMIT $1' AS sql_peek,
'INSERT INTO repack.table_' || R.oid || ' VALUES ($1.*)' AS sql_insert,
'DELETE FROM repack.table_' || R.oid || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_delete,
'UPDATE repack.table_' || R.oid || ' SET ' || repack.get_assign(R.oid, '$2') || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_update,
'DELETE FROM repack.log_' || R.oid || ' WHERE id = $1' AS sql_pop
FROM pg_class R
LEFT JOIN pg_class T ON R.reltoastrelid = T.oid
LEFT JOIN repack.primary_keys PK
ON R.oid = PK.indrelid
LEFT JOIN (SELECT CKI.* FROM pg_index CKI, pg_class CKT
WHERE CKI.indisvalid
AND CKI.indexrelid = CKT.oid
AND CKI.indisclustered
AND CKT.relam = 403) CK
ON R.oid = CK.indrelid
LEFT JOIN pg_namespace N ON N.oid = R.relnamespace
LEFT JOIN pg_tablespace S ON S.oid = R.reltablespace
WHERE R.relkind = 'r'
AND N.nspname NOT IN ('pg_catalog', 'information_schema')
AND N.nspname NOT LIKE E'pg\\_temp\\_%';
CREATE FUNCTION repack.repack_indexdef(oid, oid, name, bool) RETURNS text AS
'MODULE_PATHNAME', 'repack_indexdef'
LANGUAGE C STABLE;
CREATE FUNCTION repack.repack_trigger() RETURNS trigger AS
'MODULE_PATHNAME', 'repack_trigger'
LANGUAGE C VOLATILE STRICT SECURITY DEFINER;
CREATE FUNCTION repack.conflicted_triggers(oid) RETURNS SETOF name AS
$$
SELECT tgname FROM pg_trigger
WHERE tgrelid = $1 AND tgname >= 'z_repack_trigger'
AND (tgtype & 2) = 2 -- BEFORE trigger
ORDER BY tgname;
$$
LANGUAGE sql STABLE STRICT;
CREATE FUNCTION repack.disable_autovacuum(regclass) RETURNS void AS
'MODULE_PATHNAME', 'repack_disable_autovacuum'
LANGUAGE C VOLATILE STRICT;
CREATE FUNCTION repack.repack_apply(
sql_peek cstring,
sql_insert cstring,
sql_delete cstring,
sql_update cstring,
sql_pop cstring,
count integer)
RETURNS integer AS
'MODULE_PATHNAME', 'repack_apply'
LANGUAGE C VOLATILE;
CREATE FUNCTION repack.repack_swap(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_swap'
LANGUAGE C VOLATILE STRICT;
CREATE FUNCTION repack.repack_drop(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_drop'
LANGUAGE C VOLATILE STRICT;
CREATE FUNCTION repack.repack_index_swap(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_index_swap'
LANGUAGE C STABLE STRICT;