resolve conflicts

This commit is contained in:
Dmitry Ivanov
2017-03-23 14:14:18 +03:00
9 changed files with 440 additions and 145 deletions

View File

@ -152,6 +152,11 @@ const char *PROGRAM_VERSION = "unknown";
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
/*
 * Query listing the PIDs of other backends that are waiting (granted = false)
 * for an AccessExclusiveLock on the relation whose OID is substituted for %u.
 * The caller counts the returned rows; the query itself neither cancels nor
 * terminates anything.
 */
#define COUNT_COMPETING_LOCKS \
"SELECT pid FROM pg_locks WHERE locktype = 'relation'" \
" AND granted = false AND relation = %u" \
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
/* Will be used as a unique prefix for advisory locks. */
#define REPACK_LOCK_PREFIX_STR "16185446"
@ -186,8 +191,8 @@ typedef struct repack_table
Oid ckid; /* target: CK OID */
const char *create_pktype; /* CREATE TYPE pk */
const char *create_log; /* CREATE TABLE log */
const char *create_trigger; /* CREATE TRIGGER z_repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER z_repack_trigger */
const char *create_trigger; /* CREATE TRIGGER repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
const char *create_table; /* CREATE TABLE table AS SELECT */
const char *drop_columns; /* ALTER TABLE DROP COLUMNs */
const char *delete_log; /* DELETE FROM log */
@ -244,6 +249,8 @@ static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */
static bool dryrun = false;
static unsigned int temp_obj_num = 0; /* temporary objects counter */
static bool no_kill_backend = false; /* abandon when timed-out */
static bool no_superuser_check = false;
static bool include_extensions = false; /* repack tables of extensions */
/* buffer should have at least 11 bytes */
@ -270,6 +277,8 @@ static pgut_option options[] =
{ 'i', 'T', "wait-timeout", &wait_timeout },
{ 'B', 'Z', "no-analyze", &analyze },
{ 'i', 'j', "jobs", &jobs },
{ 'b', 'D', "no-kill-backend", &no_kill_backend },
{ 'b', 'k', "no-superuser-check", &no_superuser_check },
{ 'b', 'C', "include-extensions", &include_extensions },
{ 0 },
};
@ -373,6 +382,9 @@ is_superuser(void)
{
const char *val;
if (no_superuser_check)
return true;
if (!connection)
return false;
@ -1042,7 +1054,7 @@ repack_one_table(repack_table *table, const char *orderby)
const char *appname = getenv("PGAPPNAME");
/* Keep track of whether we have gotten through setup to install
* the z_repack_trigger, log table, etc. ourselves. We don't want to
* the repack_trigger, log table, etc. ourselves. We don't want to
* go through repack_cleanup() if we didn't actually set up the
* trigger ourselves, lest we be cleaning up another pg_repack's mess,
* or worse, interfering with a still-running pg_repack.
@ -1092,7 +1104,10 @@ repack_one_table(repack_table *table, const char *orderby)
if (!(lock_exclusive(connection, buffer, table->lock_table, TRUE)))
{
elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
if (no_kill_backend)
elog(INFO, "Skipping repack %s due to timeout", table->target_name);
else
elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
goto cleanup;
}
@ -1144,43 +1159,28 @@ repack_one_table(repack_table *table, const char *orderby)
/*
* Check z_repack_trigger is the trigger executed last so that
* other before triggers cannot modify triggered tuples.
* Check that repack_trigger does not conflict with an existing trigger. We
* would find this out later anyway, but we check in advance and go to cleanup
* if needed. In an AFTER trigger context the triggered tuple is not changed
* by other triggers, so we don't care about the firing order.
*/
res = execute("SELECT repack.conflicted_triggers($1)", 1, params);
if (PQntuples(res) > 0)
{
if (0 == strcmp("z_repack_trigger", PQgetvalue(res, 0, 0)))
{
ereport(WARNING,
ereport(WARNING,
(errcode(E_PG_COMMAND),
errmsg("the table \"%s\" already has a trigger called \"%s\"",
table->target_name, PQgetvalue(res, 0, 0)),
table->target_name, "repack_trigger"),
errdetail(
"The trigger was probably installed during a previous"
" attempt to run pg_repack on the table which was"
" interrupted and for some reason failed to clean up"
" the temporary objects. Please drop the trigger or drop"
"The trigger was probably installed during a previous"
" attempt to run pg_repack on the table which was"
" interrupted and for some reason failed to clean up"
" the temporary objects. Please drop the trigger or drop"
" and recreate the pg_repack extension altogether"
" to remove all the temporary objects left over.")));
}
else
{
ereport(WARNING,
(errcode(E_PG_COMMAND),
errmsg("trigger \"%s\" conflicting on table \"%s\"",
PQgetvalue(res, 0, 0), table->target_name),
errdetail(
"The trigger \"z_repack_trigger\" must be the last of the"
" BEFORE triggers to fire on the table (triggers fire in"
" alphabetical order). Please rename the trigger so that"
" it sorts before \"z_repack_trigger\": you can use"
" \"ALTER TRIGGER %s ON %s RENAME TO newname\".",
PQgetvalue(res, 0, 0), table->target_name)));
}
" to remove all the temporary objects left over.")));
goto cleanup;
}
CLEARPGRES(res);
command(table->create_pktype, 0, NULL);
@ -1241,7 +1241,10 @@ repack_one_table(repack_table *table, const char *orderby)
*/
if (!(kill_ddl(connection, table->target_oid, true)))
{
elog(WARNING, "kill_ddl() failed.");
if (no_kill_backend)
elog(INFO, "Skipping repack %s due to timeout.", table->target_name);
else
elog(WARNING, "kill_ddl() failed.");
goto cleanup;
}
@ -1250,7 +1253,7 @@ repack_one_table(repack_table *table, const char *orderby)
*/
command("COMMIT", 0, NULL);
/* The main connection has now committed its z_repack_trigger,
/* The main connection has now committed its repack_trigger,
* log table, and temp. table. If any error occurs from this point
* on and we bail out, we should try to clean those up.
*/
@ -1471,9 +1474,9 @@ cleanup:
}
/* Kill off any concurrent DDL (or any transaction attempting to take
* an AccessExclusive lock) trying to run against our table. Note, we're
* killing these queries off *before* they are granted an AccessExclusive
* lock on our table.
* an AccessExclusive lock) trying to run against our table, if we want to
* do so. Note, we're killing these queries off *before* they are granted
* an AccessExclusive lock on our table.
*
* Returns true if no problems encountered, false otherwise.
*/
@ -1483,35 +1486,57 @@ kill_ddl(PGconn *conn, Oid relid, bool terminate)
bool ret = true;
PGresult *res;
StringInfoData sql;
int n_tuples;
initStringInfo(&sql);
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
/* Check the number of backends competing AccessExclusiveLock */
printfStringInfo(&sql, COUNT_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(WARNING, "Error canceling unsafe queries: %s",
PQerrorMessage(conn));
ret = false;
}
else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
{
elog(WARNING,
"Canceled %d unsafe queries. Terminating any remaining PIDs.",
PQntuples(res));
n_tuples = PQntuples(res);
CLEARPGRES(res);
printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
if (n_tuples != 0)
{
/* Competing backends exist, but if we do not want to cancel/terminate
* any backend, do nothing.
*/
if (no_kill_backend)
{
elog(WARNING, "Error killing unsafe queries: %s",
PQerrorMessage(conn));
elog(WARNING, "%d unsafe queries remain but do not cancel them and skip to repack it",
n_tuples);
ret = false;
}
else
{
resetStringInfo(&sql);
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(WARNING, "Error canceling unsafe queries: %s",
PQerrorMessage(conn));
ret = false;
}
else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
{
elog(WARNING,
"Canceled %d unsafe queries. Terminating any remaining PIDs.",
PQntuples(res));
CLEARPGRES(res);
printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
res = pgut_execute(conn, sql.data, 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(WARNING, "Error killing unsafe queries: %s",
PQerrorMessage(conn));
ret = false;
}
}
else if (PQntuples(res) > 0)
elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
}
}
else if (PQntuples(res) > 0)
elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
else
elog(DEBUG2, "No competing DDL to cancel.");
@ -1670,26 +1695,35 @@ lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool sta
duration = time(NULL) - start;
if (duration > wait_timeout)
{
const char *cancel_query;
if (PQserverVersion(conn) >= 80400 &&
duration > wait_timeout * 2)
if (no_kill_backend)
{
elog(WARNING, "terminating conflicted backends");
cancel_query =
"SELECT pg_terminate_backend(pid) FROM pg_locks"
" WHERE locktype = 'relation'"
" AND relation = $1 AND pid <> pg_backend_pid()";
elog(WARNING, "timed out, do not cancel conflicting backends");
ret = false;
break;
}
else
{
elog(WARNING, "canceling conflicted backends");
cancel_query =
"SELECT pg_cancel_backend(pid) FROM pg_locks"
" WHERE locktype = 'relation'"
" AND relation = $1 AND pid <> pg_backend_pid()";
}
const char *cancel_query;
if (PQserverVersion(conn) >= 80400 &&
duration > wait_timeout * 2)
{
elog(WARNING, "terminating conflicted backends");
cancel_query =
"SELECT pg_terminate_backend(pid) FROM pg_locks"
" WHERE locktype = 'relation'"
" AND relation = $1 AND pid <> pg_backend_pid()";
}
else
{
elog(WARNING, "canceling conflicted backends");
cancel_query =
"SELECT pg_cancel_backend(pid) FROM pg_locks"
" WHERE locktype = 'relation'"
" AND relation = $1 AND pid <> pg_backend_pid()";
}
pgut_command(conn, cancel_query, 1, &relid);
pgut_command(conn, cancel_query, 1, &relid);
}
}
/* wait for a while to lock the table. */
@ -2081,6 +2115,8 @@ pgut_help(bool details)
printf(" -i, --index=INDEX move only the specified index\n");
printf(" -x, --only-indexes move only indexes of the specified table\n");
printf(" -T, --wait-timeout=SECS timeout to cancel other backends on conflict\n");
printf(" -D, --no-kill-backend don't kill other backends when timed out\n");
printf(" -Z, --no-analyze don't analyze at end\n");
printf(" -k, --no-superuser-check skip superuser checks in client\n");
printf(" -C, --include-extensions repack tables which belong to extensions\n");
}

View File

@ -1307,7 +1307,7 @@ pgut_malloc(size_t size)
if ((ret = malloc(size)) == NULL)
ereport(FATAL,
(errcode_errno(),
errmsg("could not allocate memory (%lu bytes): ",
errmsg("could not allocate memory (" UINT64_FORMAT " bytes): ",
(unsigned long) size)));
return ret;
}
@ -1320,7 +1320,7 @@ pgut_realloc(void *p, size_t size)
if ((ret = realloc(p, size)) == NULL)
ereport(FATAL,
(errcode_errno(),
errmsg("could not re-allocate memory (%lu bytes): ",
errmsg("could not re-allocate memory (" UINT64_FORMAT " bytes): ",
(unsigned long) size)));
return ret;
}