Take an ACCESS SHARE LOCK on the target table, in an initial attempt to prevent concurrent DDL.
This is a first pass at Daniele's suggestion in Issue #8, although it is definitely still buggy -- it is still possible for another transaction to get in an AccessExclusive lock and perform DDL either before the ACCESS SHARE lock is acquired or immediately after it is released.
This commit is contained in:
parent
e02811689a
commit
78bae38718
@ -35,14 +35,20 @@ const char *PROGRAM_VERSION = "unknown";
|
||||
*/
|
||||
#define APPLY_COUNT 1000
|
||||
|
||||
/* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
|
||||
/* Record the PIDs of any possibly-conflicting transactions. Ignore the PID
|
||||
* of our primary connection, and our second connection holding an
|
||||
* ACCESS SHARE table lock.
|
||||
* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
|
||||
* servers. See GH ticket #1.
|
||||
*/
|
||||
#define SQL_XID_SNAPSHOT \
|
||||
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks"\
|
||||
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"\
|
||||
" WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)"\
|
||||
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0')"
|
||||
|
||||
/* Later, check whether any of the transactions we saw before are still
|
||||
* alive, and wait for them to go away.
|
||||
*/
|
||||
#define SQL_XID_ALIVE \
|
||||
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
|
||||
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
|
||||
@ -89,7 +95,7 @@ static void repack_cleanup(bool fatal, void *userdata);
|
||||
|
||||
static char *getstr(PGresult *res, int row, int col);
|
||||
static Oid getoid(PGresult *res, int row, int col);
|
||||
static void lock_exclusive(const char *relid, const char *lock_query);
|
||||
static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);
|
||||
|
||||
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
|
||||
#define SQLSTATE_QUERY_CANCELED "57014"
|
||||
@ -384,6 +390,7 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
int i;
|
||||
int num_waiting = 0;
|
||||
char *vxid;
|
||||
char *lock_conn_pid;
|
||||
char buffer[12];
|
||||
StringInfoData sql;
|
||||
|
||||
@ -414,7 +421,8 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
* 1. Setup workspaces and a trigger.
|
||||
*/
|
||||
elog(DEBUG2, "---- setup ----");
|
||||
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
|
||||
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);
|
||||
|
||||
|
||||
/*
|
||||
* Check z_repack_trigger is the trigger executed at last so that
|
||||
@ -438,6 +446,37 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
command(sql.data, 0, NULL);
|
||||
command("COMMIT", 0, NULL);
|
||||
|
||||
PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED"));
|
||||
elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);
|
||||
|
||||
/* XXX: table name escaping? */
|
||||
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
|
||||
table->target_name);
|
||||
res = PQexec(conn2, sql.data);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
{
|
||||
printf("%s", PQerrorMessage(conn2));
|
||||
PQclear(res);
|
||||
exit(1);
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);
|
||||
|
||||
/* store the backend PID of our connection keeping an ACCESS SHARE
|
||||
lock on the target table.
|
||||
*/
|
||||
res = PQexec(conn2, "SELECT pg_backend_pid()");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
{
|
||||
printf("%s", PQerrorMessage(conn2));
|
||||
PQclear(res);
|
||||
exit(1);
|
||||
}
|
||||
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
|
||||
elog(WARNING, "Have backend PID: %s", lock_conn_pid);
|
||||
PQclear(res);
|
||||
|
||||
/*
|
||||
* Register the table to be dropped on error. We use pktype as
|
||||
* an advisory lock. The registration should be done after
|
||||
@ -455,9 +494,14 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
|
||||
if (orderby && !orderby[0])
|
||||
command("SET LOCAL synchronize_seqscans = off", 0, NULL);
|
||||
res = execute(SQL_XID_SNAPSHOT, 0, NULL);
|
||||
|
||||
/* Fetch an array of Virtual IDs of all transactions active right now.
|
||||
*/
|
||||
params[0] = lock_conn_pid;
|
||||
res = execute(SQL_XID_SNAPSHOT, 1, params);
|
||||
vxid = strdup(PQgetvalue(res, 0, 0));
|
||||
PQclear(res);
|
||||
|
||||
command(table->delete_log, 0, NULL);
|
||||
command(table->create_table, 0, NULL);
|
||||
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
|
||||
@ -554,7 +598,7 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
* 5. Swap.
|
||||
*/
|
||||
elog(DEBUG2, "---- swap ----");
|
||||
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
|
||||
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
|
||||
apply_log(table, 0);
|
||||
params[0] = utoa(table->target_oid, buffer);
|
||||
command("SELECT repack.repack_swap($1)", 1, params);
|
||||
@ -593,9 +637,15 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
|
||||
/*
|
||||
* Try acquire a table lock but avoid long time locks when conflict.
|
||||
* Arguments:
|
||||
*
|
||||
* relid: OID of relation
|
||||
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
|
||||
* release_conn2: whether we should issue a COMMIT in conn2 to release
|
||||
* its lock.
|
||||
*/
|
||||
static void
|
||||
lock_exclusive(const char *relid, const char *lock_query)
|
||||
lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
|
||||
{
|
||||
time_t start = time(NULL);
|
||||
int i;
|
||||
@ -634,6 +684,12 @@ lock_exclusive(const char *relid, const char *lock_query)
|
||||
command(cancel_query, 1, &relid);
|
||||
}
|
||||
|
||||
/* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
|
||||
* lock.
|
||||
*/
|
||||
if (release_conn2)
|
||||
PQclear(PQexec(conn2, "COMMIT"));
|
||||
|
||||
/* wait for a while to lock the table. */
|
||||
wait_msec = Min(1000, i * 100);
|
||||
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
|
||||
@ -683,6 +739,9 @@ repack_cleanup(bool fatal, void *userdata)
|
||||
const char *params[1];
|
||||
|
||||
/* Rollback current transaction */
|
||||
if (conn2)
|
||||
PQclear(PQexec(conn2, "ROLLBACK"));
|
||||
|
||||
if (connection)
|
||||
command("ROLLBACK", 0, NULL);
|
||||
|
||||
|
@ -24,6 +24,7 @@ char *password = NULL;
|
||||
YesNo prompt_password = DEFAULT;
|
||||
|
||||
PGconn *connection = NULL;
|
||||
PGconn *conn2 = NULL;
|
||||
|
||||
static bool parse_pair(const char buffer[], char key[], char value[]);
|
||||
static char *get_username(void);
|
||||
@ -51,6 +52,7 @@ reconnect(int elevel)
|
||||
appendStringInfo(&buf, "password=%s ", password);
|
||||
|
||||
connection = pgut_connect(buf.data, prompt_password, elevel);
|
||||
conn2 = pgut_connect(buf.data, prompt_password, elevel);
|
||||
|
||||
/* update password */
|
||||
if (connection)
|
||||
|
@ -56,6 +56,7 @@ extern char *password;
|
||||
extern YesNo prompt_password;
|
||||
|
||||
extern PGconn *connection;
|
||||
extern PGconn *conn2;
|
||||
|
||||
extern void pgut_help(bool details);
|
||||
extern void help(bool details);
|
||||
|
Loading…
x
Reference in New Issue
Block a user