Factor out advisory locking code into advisory_lock()
Also, some further small fixes of error messages, comments, marking dubious kludges (e.g. strchr() to determine schema name).
This commit is contained in:
		| @ -184,6 +184,7 @@ static bool rebuild_indexes(const repack_table *table); | ||||
|  | ||||
| static char *getstr(PGresult *res, int row, int col); | ||||
| static Oid getoid(PGresult *res, int row, int col); | ||||
| static bool advisory_lock(PGconn *conn, const char *relid); | ||||
| static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact); | ||||
| static bool kill_ddl(PGconn *conn, Oid relid, bool terminate); | ||||
| static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name); | ||||
| @ -1005,26 +1006,12 @@ repack_one_table(const repack_table *table, const char *orderby) | ||||
| 	 */ | ||||
| 	elog(DEBUG2, "---- setup ----"); | ||||
|  | ||||
| 	/* Obtain an advisory lock on the table's OID, to make sure no other | ||||
| 	 * pg_repack is working on the table. (Not a real concern with only | ||||
| 	 * full-table repacks, but mainly for index-only repacks.) | ||||
| 	 */ | ||||
| 	params[0] = utoa(table->target_oid, buffer); | ||||
| 	res = pgut_execute(connection, "SELECT pg_try_advisory_lock($1::bigint)", | ||||
| 					   1, params); | ||||
| 	if (PQresultStatus(res) != PGRES_TUPLES_OK) | ||||
| 	{ | ||||
| 		elog(ERROR, "%s",  PQerrorMessage(connection)); | ||||
| 		goto cleanup; | ||||
| 	} | ||||
| 	else if (strcmp(getstr(res, 0, 0), "t") != 0) | ||||
| 	{ | ||||
| 		elog(WARNING, "Another pg_repack command may be running on the table. Please try again later."); | ||||
| 		goto cleanup; | ||||
| 	} | ||||
| 	CLEARPGRES(res); | ||||
|  | ||||
| 	if (!(lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE))) | ||||
| 	if (!advisory_lock(connection, buffer)) | ||||
| 		goto cleanup; | ||||
|  | ||||
| 	if (!(lock_exclusive(connection, buffer, table->lock_table, TRUE))) | ||||
| 	{ | ||||
| 		elog(WARNING, "lock_exclusive() failed for %s", table->target_name); | ||||
| 		goto cleanup; | ||||
| @ -1465,6 +1452,40 @@ lock_access_share(PGconn *conn, Oid relid, const char *target_name) | ||||
| } | ||||
|  | ||||
|  | ||||
| /* XXX: Make sure that repack_one_table() also obtains an advisory | ||||
|  * lock on the table, so that we can't have a table-wide repack running | ||||
|  * along with an indexes-only repack. Also, since advisory locks are | ||||
|  * 8 bytes wide and OIDs are only 4 bytes, consider using our own prefix | ||||
|  * rather than just the table OID, to avoid inadvertent conflict with | ||||
|  * other applications using advisory locks. | ||||
|  */ | ||||
|  | ||||
| /* Obtain an advisory lock on the table's OID, to make sure no other | ||||
|  * pg_repack is working on the table. This is not so much a concern with | ||||
|  * full-table repacks, but mainly so that index-only repacks don't interfere | ||||
|  * with each other or a full-table repack. | ||||
|  */ | ||||
| static bool advisory_lock(PGconn *conn, const char *relid) | ||||
| { | ||||
| 	PGresult       *res = NULL; | ||||
| 	bool            ret = false; | ||||
|  | ||||
| 	res = pgut_execute(conn, "SELECT pg_try_advisory_lock($1::bigint)", | ||||
| 					   1, &relid); | ||||
|  | ||||
| 	if (PQresultStatus(res) != PGRES_TUPLES_OK) { | ||||
| 		elog(ERROR, "%s",  PQerrorMessage(connection)); | ||||
| 	} | ||||
| 	else if (strcmp(getstr(res, 0, 0), "t") != 0) { | ||||
| 		elog(WARNING, "Another pg_repack command may be running on the table. Please try again later."); | ||||
| 	} | ||||
| 	else { | ||||
| 		ret = true; | ||||
| 	} | ||||
| 	CLEARPGRES(res); | ||||
| 	return ret; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long | ||||
|  * waits by killing off other sessions. | ||||
| @ -1668,12 +1689,14 @@ cleanup: | ||||
|  * Call repack_one_index for each of the indexes | ||||
|  */ | ||||
| static bool | ||||
| repack_all_indexes(char *errbuf, size_t errsize){ | ||||
| repack_all_indexes(char *errbuf, size_t errsize) | ||||
| { | ||||
| 	bool				ret = false; | ||||
| 	PGresult		   *res = NULL, *res2 = NULL; | ||||
| 	PGresult		   *res = NULL; | ||||
| 	int					i; | ||||
| 	int					num; | ||||
| 	StringInfoData		sql; | ||||
| 	char				buffer[12]; | ||||
| 	const char			*params[1]; | ||||
| 	const char			*table_name = NULL; | ||||
| 	const char			*schema_name = NULL; | ||||
| @ -1715,7 +1738,7 @@ repack_all_indexes(char *errbuf, size_t errsize){ | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// seperate schema name and index name | ||||
| 		// separate schema name and index name. FIXME: kludge | ||||
| 		pos = strchr(params[0], '.'); | ||||
| 		if (pos) | ||||
| 		{ | ||||
| @ -1751,7 +1774,7 @@ repack_all_indexes(char *errbuf, size_t errsize){ | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// seperate schema name and table name | ||||
| 		// separate schema name and table name. FIXME: kludge | ||||
| 		pos = strchr(params[0], '.'); | ||||
| 		if (pos) | ||||
| 		{ | ||||
| @ -1763,30 +1786,14 @@ repack_all_indexes(char *errbuf, size_t errsize){ | ||||
| 			table_name = params[0]; | ||||
| 	} | ||||
|  | ||||
| 	/* XXX: Make sure that repack_one_table() also obtains an advisory | ||||
| 	 * lock on the table, so that we can't have a table-wide repack running | ||||
| 	 * along with an indexes-only repack. Also, since advisory locks are | ||||
| 	 * 8 bytes wide and OIDs are only 4 bytes, consider using our own prefix | ||||
| 	 * rather than just the table OID, to avoid inadvertent conflict with | ||||
| 	 * other applications using advisory locks. | ||||
| 	/* Check if any concurrent pg_repack command is being run on the same | ||||
| 	 * table. | ||||
| 	 */ | ||||
|  | ||||
| 	/* Check if any concurrent pg_repack command is being run on the same table */ | ||||
| 	initStringInfo(&sql); | ||||
| 	appendStringInfo(&sql, "SELECT pg_try_advisory_lock(%u)", getoid(res, 0, 3)); | ||||
|  | ||||
| 	res2 = execute_elevel(sql.data, 0, NULL, DEBUG2); | ||||
| 	if (PQresultStatus(res2) != PGRES_TUPLES_OK) | ||||
| 	{ | ||||
| 		elog(ERROR, "%s",  PQerrorMessage(connection)); | ||||
| 	if (!advisory_lock(connection, utoa(getoid(res, 0, 3), buffer))) { | ||||
| 		snprintf(errbuf, errsize, "Unable to obtain advisory lock on %s", | ||||
| 				 table_name); | ||||
| 		goto cleanup; | ||||
| 	} | ||||
| 	else if (strcmp(getstr(res2, 0, 0), "t") != 0) | ||||
| 	{ | ||||
| 		snprintf(errbuf, errsize, "Another pg_repack command may be running on the table. Please try again later."); | ||||
| 		goto cleanup; | ||||
| 	} | ||||
|  | ||||
| 	for (i = 0; i < num; i++) | ||||
| 	{ | ||||
| 		char *isvalid = getstr(res, i, 2); | ||||
| @ -1797,9 +1804,11 @@ repack_all_indexes(char *errbuf, size_t errsize){ | ||||
| 			else | ||||
| 				elog(INFO, "repacking index \"%s\"", getstr(res, i, 0)); | ||||
|  | ||||
| 			if (!(repack_one_index(getoid(res, i, 3), table_name, getoid(res, i, 1), schema_name))) | ||||
| 			if (!(repack_one_index(getoid(res, i, 3), table_name, getoid(res, i, 1), schema_name))) { | ||||
| 				/* FIXME: fill in errbuf here */ | ||||
| 				goto cleanup; | ||||
| 			} | ||||
| 		} | ||||
| 		else | ||||
| 			if (schema_name) | ||||
| 				elog(WARNING, "skipping invalid index: %s.%s", schema_name, getstr(res, i, 0)); | ||||
| @ -1810,7 +1819,6 @@ repack_all_indexes(char *errbuf, size_t errsize){ | ||||
|  | ||||
| cleanup: | ||||
| 	CLEARPGRES(res); | ||||
| 	CLEARPGRES(res2); | ||||
| 	disconnect(); | ||||
| 	termStringInfo(&sql); | ||||
| 	return ret; | ||||
|  | ||||
		Reference in New Issue
	
	Block a user