From e2c720b89c3a72a228c513c9b6fb3ddbb4b27757 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sun, 25 May 2014 22:04:13 -0400 Subject: [PATCH] Perform sql_pop delete in bulk instead of one-at-a-time. Prior to 506104686b these DELETEs had been done in large batches (of DEFAULT_PEEK_COUNT size), but that naive method of choosing rows to delete was unsafe. Here we continue to keep track of which rows must be deleted as we process them. --- lib/pg_repack.sql.in | 2 +- lib/repack.c | 40 ++++++++++++++++++++++++++++++---------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in index 8f7abea..37d7760 100644 --- a/lib/pg_repack.sql.in +++ b/lib/pg_repack.sql.in @@ -191,7 +191,7 @@ CREATE VIEW repack.tables AS 'INSERT INTO repack.table_' || R.oid || ' VALUES ($1.*)' AS sql_insert, 'DELETE FROM repack.table_' || R.oid || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_delete, 'UPDATE repack.table_' || R.oid || ' SET ' || repack.get_assign(R.oid, '$2') || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_update, - 'DELETE FROM repack.log_' || R.oid || ' WHERE id = $1' AS sql_pop + 'DELETE FROM repack.log_' || R.oid || ' WHERE id IN (' AS sql_pop FROM pg_class R LEFT JOIN pg_class T ON R.reltoastrelid = T.oid LEFT JOIN repack.primary_keys PK diff --git a/lib/repack.c b/lib/repack.c index eae361a..5dcfa7e 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -190,7 +190,7 @@ repack_trigger(PG_FUNCTION_ARGS) * @param sql_insert SQL to insert into temp table. * @param sql_delete SQL to delete from temp table. * @param sql_update SQL to update temp table. - * @param sql_pop SQL to delete tuple from log table. + * @param sql_pop SQL to bulk-delete tuples from log table. * @param count Max number of operations, or no count iff <=0. * @retval Number of performed operations. */ @@ -203,18 +203,20 @@ repack_apply(PG_FUNCTION_ARGS) const char *sql_insert = PG_GETARG_CSTRING(1); const char *sql_delete = PG_GETARG_CSTRING(2); const char *sql_update = PG_GETARG_CSTRING(3); - const char *sql_pop = PG_GETARG_CSTRING(4); + /* sql_pop, the fourth arg, will be used in the loop below */ int32 count = PG_GETARG_INT32(5); SPIPlanPtr plan_peek = NULL; SPIPlanPtr plan_insert = NULL; SPIPlanPtr plan_delete = NULL; SPIPlanPtr plan_update = NULL; - SPIPlanPtr plan_pop = NULL; uint32 n, i; Oid argtypes_peek[1] = { INT4OID }; Datum values_peek[1]; bool nulls_peek[1] = { 0 }; + StringInfoData sql_pop; + + initStringInfo(&sql_pop); /* authority check */ must_be_superuser("repack_apply"); @@ -252,15 +254,22 @@ repack_apply(PG_FUNCTION_ARGS) argtypes[1] = SPI_gettypeid(desc, 2); /* pk */ argtypes[2] = SPI_gettypeid(desc, 3); /* row */ + resetStringInfo(&sql_pop); + appendStringInfoString(&sql_pop, PG_GETARG_CSTRING(4)); + for (i = 0; i < ntuples; i++, n++) { HeapTuple tuple; + char *pkid; tuple = tuptable->vals[i]; values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]); values[1] = SPI_getbinval(tuple, desc, 2, &nulls[1]); values[2] = SPI_getbinval(tuple, desc, 3, &nulls[2]); + pkid = SPI_getvalue(tuple, desc, 1); + Assert(pkid != NULL); + if (nulls[1]) { /* INSERT */ @@ -282,15 +291,26 @@ repack_apply(PG_FUNCTION_ARGS) plan_update = repack_prepare(sql_update, 2, &argtypes[1]); execute_plan(SPI_OK_UPDATE, plan_update, &values[1], &nulls[1]); } - /* Delete tuple in log. - * XXX It would be a lot more efficient to perform - * this DELETE in bulk, but be careful to only - * delete log entries we have actually processed. + + /* Add the primary key ID of each row from the log + * table we have processed so far to this + * DELETE ... IN (...) query string, so we + * can delete all the rows we have processed at-once. */ - if (plan_pop == NULL) - plan_pop = repack_prepare(sql_pop, 1, argtypes); - execute_plan(SPI_OK_DELETE, plan_pop, values, nulls); + if (i == 0) + appendStringInfoString(&sql_pop, pkid); + else + appendStringInfo(&sql_pop, ",%s", pkid); + pfree(pkid); } + /* i must be > 0 (and hence we must have some rows to delete) + * since SPI_processed > 0 + */ + Assert(i > 0); + appendStringInfoString(&sql_pop, ");"); + + /* Bulk delete of processed rows from the log table */ + execute(SPI_OK_DELETE, sql_pop.data); SPI_freetuptable(tuptable); }