Perform sql_pop delete in bulk instead of one-at-a-time.
Prior to 506104686b these DELETEs had been done in large batches (of DEFAULT_PEEK_COUNT size), but that naive method of choosing rows to delete was unsafe. Here we continue to keep track of which rows must be deleted as we process them.
This commit is contained in:
parent
f0cd787bf5
commit
e2c720b89c
@ -191,7 +191,7 @@ CREATE VIEW repack.tables AS
|
||||
'INSERT INTO repack.table_' || R.oid || ' VALUES ($1.*)' AS sql_insert,
|
||||
'DELETE FROM repack.table_' || R.oid || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_delete,
|
||||
'UPDATE repack.table_' || R.oid || ' SET ' || repack.get_assign(R.oid, '$2') || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_update,
|
||||
'DELETE FROM repack.log_' || R.oid || ' WHERE id = $1' AS sql_pop
|
||||
'DELETE FROM repack.log_' || R.oid || ' WHERE id IN (' AS sql_pop
|
||||
FROM pg_class R
|
||||
LEFT JOIN pg_class T ON R.reltoastrelid = T.oid
|
||||
LEFT JOIN repack.primary_keys PK
|
||||
|
40
lib/repack.c
40
lib/repack.c
@ -190,7 +190,7 @@ repack_trigger(PG_FUNCTION_ARGS)
|
||||
* @param sql_insert SQL to insert into temp table.
|
||||
* @param sql_delete SQL to delete from temp table.
|
||||
* @param sql_update SQL to update temp table.
|
||||
* @param sql_pop SQL to delete tuple from log table.
|
||||
* @param sql_pop SQL to bulk-delete tuples from log table.
|
||||
* @param count Max number of operations, or no count iff <=0.
|
||||
* @retval Number of performed operations.
|
||||
*/
|
||||
@ -203,18 +203,20 @@ repack_apply(PG_FUNCTION_ARGS)
|
||||
const char *sql_insert = PG_GETARG_CSTRING(1);
|
||||
const char *sql_delete = PG_GETARG_CSTRING(2);
|
||||
const char *sql_update = PG_GETARG_CSTRING(3);
|
||||
const char *sql_pop = PG_GETARG_CSTRING(4);
|
||||
/* sql_pop, the fourth arg, will be used in the loop below */
|
||||
int32 count = PG_GETARG_INT32(5);
|
||||
|
||||
SPIPlanPtr plan_peek = NULL;
|
||||
SPIPlanPtr plan_insert = NULL;
|
||||
SPIPlanPtr plan_delete = NULL;
|
||||
SPIPlanPtr plan_update = NULL;
|
||||
SPIPlanPtr plan_pop = NULL;
|
||||
uint32 n, i;
|
||||
Oid argtypes_peek[1] = { INT4OID };
|
||||
Datum values_peek[1];
|
||||
bool nulls_peek[1] = { 0 };
|
||||
StringInfoData sql_pop;
|
||||
|
||||
initStringInfo(&sql_pop);
|
||||
|
||||
/* authority check */
|
||||
must_be_superuser("repack_apply");
|
||||
@ -252,15 +254,22 @@ repack_apply(PG_FUNCTION_ARGS)
|
||||
argtypes[1] = SPI_gettypeid(desc, 2); /* pk */
|
||||
argtypes[2] = SPI_gettypeid(desc, 3); /* row */
|
||||
|
||||
resetStringInfo(&sql_pop);
|
||||
appendStringInfoString(&sql_pop, PG_GETARG_CSTRING(4));
|
||||
|
||||
for (i = 0; i < ntuples; i++, n++)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
char *pkid;
|
||||
|
||||
tuple = tuptable->vals[i];
|
||||
values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]);
|
||||
values[1] = SPI_getbinval(tuple, desc, 2, &nulls[1]);
|
||||
values[2] = SPI_getbinval(tuple, desc, 3, &nulls[2]);
|
||||
|
||||
pkid = SPI_getvalue(tuple, desc, 1);
|
||||
Assert(pkid != NULL);
|
||||
|
||||
if (nulls[1])
|
||||
{
|
||||
/* INSERT */
|
||||
@ -282,15 +291,26 @@ repack_apply(PG_FUNCTION_ARGS)
|
||||
plan_update = repack_prepare(sql_update, 2, &argtypes[1]);
|
||||
execute_plan(SPI_OK_UPDATE, plan_update, &values[1], &nulls[1]);
|
||||
}
|
||||
/* Delete tuple in log.
|
||||
* XXX It would be a lot more efficient to perform
|
||||
* this DELETE in bulk, but be careful to only
|
||||
* delete log entries we have actually processed.
|
||||
|
||||
/* Add the primary key ID of each row from the log
|
||||
* table we have processed so far to this
|
||||
* DELETE ... IN (...) query string, so we
|
||||
* can delete all the rows we have processed at-once.
|
||||
*/
|
||||
if (plan_pop == NULL)
|
||||
plan_pop = repack_prepare(sql_pop, 1, argtypes);
|
||||
execute_plan(SPI_OK_DELETE, plan_pop, values, nulls);
|
||||
if (i == 0)
|
||||
appendStringInfoString(&sql_pop, pkid);
|
||||
else
|
||||
appendStringInfo(&sql_pop, ",%s", pkid);
|
||||
pfree(pkid);
|
||||
}
|
||||
/* i must be > 0 (and hence we must have some rows to delete)
|
||||
* since SPI_processed > 0
|
||||
*/
|
||||
Assert(i > 0);
|
||||
appendStringInfoString(&sql_pop, ");");
|
||||
|
||||
/* Bulk delete of processed rows from the log table */
|
||||
execute(SPI_OK_DELETE, sql_pop.data);
|
||||
|
||||
SPI_freetuptable(tuptable);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user