Ignore other pg_repack clients which may be running concurrently for the purposes of SQL_XID_SNAPSHOT.

Use application_name from pg_stat_activity, if available, to identify
other pg_repack clients. Fixes Issue #1.
This commit is contained in:
Josh Kupershmidt 2012-12-06 16:11:55 -07:00
parent 1a0a28d3f8
commit a072cc9812

View File

@ -35,13 +35,52 @@ const char *PROGRAM_VERSION = "unknown";
*/
#define APPLY_COUNT 1000
/* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
* servers. See GH ticket #1.
/* Compile an array of existing transactions which are active during
* pg_repack's setup. Some transactions we can safely ignore:
* a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
* servers. See https://github.com/reorg/pg_reorg/issues/1
* b. Our own database connection
* c. Other pg_repack clients, as distinguished by application_name, which
* may be operating on other tables at the same time. See
* https://github.com/reorg/pg_repack/issues/1
*
* Note, there is some redundancy in how the filtering is done (e.g. excluding
* based on pg_backend_pid() and application_name), but that shouldn't hurt
* anything.
*/
#define SQL_XID_SNAPSHOT_90200 \
"SELECT repack.array_accum(l.virtualtransaction) " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.pid " \
" WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \
"AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
"AND (a.application_name IS NULL OR a.application_name <> $1)"
#define SQL_XID_SNAPSHOT_90000 \
"SELECT repack.array_accum(l.virtualtransaction) " \
" FROM pg_locks AS l " \
" LEFT JOIN pg_stat_activity AS a " \
" ON l.pid = a.procpid " \
" WHERE l.locktype = 'virtualxid' AND l.pid <> pg_backend_pid() " \
"AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
"AND (a.application_name IS NULL OR a.application_name <> $1)"
/* application_name is not available before 9.0. The last clause of
* the WHERE clause is just to eat the $1 parameter (application name).
*/
#define SQL_XID_SNAPSHOT_80300 \
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks" \
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()" \
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0') " \
" AND ($1 IS NOT NULL)"
#define SQL_XID_SNAPSHOT \
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks"\
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"\
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0')"
(PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \
(PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \
SQL_XID_SNAPSHOT_80300))
#define SQL_XID_ALIVE \
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
@ -554,7 +593,8 @@ repack_one_table(const repack_table *table, const char *orderby)
command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
if (orderby && !orderby[0])
command("SET LOCAL synchronize_seqscans = off", 0, NULL);
res = execute(SQL_XID_SNAPSHOT, 0, NULL);
params[0] = PROGRAM_NAME;
res = execute(SQL_XID_SNAPSHOT, 1, params);
vxid = strdup(PQgetvalue(res, 0, 0));
PQclear(res);