First pass at implementing concurrent index builds using multiple connections.
Adds a new --jobs command-line argument to specify how many worker connections to use. These worker connections persist while processing the table(s) in a single database. For each table, the indexes to be built are parceled out among the worker connections, and each CREATE INDEX request is submitted via PQsendQuery(), i.e. in non-blocking fashion. Most of this is still rather crude — in particular the while (num_active_workers) loop in rebuild_indexes() — but it seems to be working, so I'm committing here.
This commit is contained in:
parent
9d776b3980
commit
0d984ed3e5
256
bin/pg_repack.c
256
bin/pg_repack.c
@ -137,6 +137,14 @@ typedef struct repack_table
|
||||
const char *sql_pop; /* SQL used in flush */
|
||||
} repack_table;
|
||||
|
||||
|
||||
/* Lifecycle of a single CREATE INDEX job during a (possibly parallel)
 * index rebuild; see rebuild_indexes().
 */
typedef enum
{
	UNPROCESSED,	/* not yet assigned to any worker connection */
	INPROGRESS,		/* CREATE INDEX sent asynchronously; awaiting result */
	FINISHED		/* build complete (or run synchronously on the primary conn) */
} index_status_t;
|
||||
|
||||
/*
 * per-index information
 */
typedef struct repack_index
{
	Oid				target_oid;		/* target: OID */
	const char	   *create_index;	/* CREATE INDEX */
	index_status_t	status;			/* Track parallel build statuses. */
	int				worker_idx;		/* which worker conn is handling;
									 * -1 while unassigned */
} repack_index;
|
||||
|
||||
static bool is_superuser(void);
|
||||
@ -151,6 +161,7 @@ static void repack_all_databases(const char *order_by);
|
||||
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
|
||||
static void repack_one_table(const repack_table *table, const char *order_by);
|
||||
static void repack_cleanup(bool fatal, const repack_table *table);
|
||||
static bool rebuild_indexes(const repack_table *table);
|
||||
|
||||
static char *getstr(PGresult *res, int row, int col);
|
||||
static Oid getoid(PGresult *res, int row, int col);
|
||||
@ -172,6 +183,7 @@ static bool noorder = false;
|
||||
static SimpleStringList table_list = {NULL, NULL};
|
||||
static char *orderby = NULL;
|
||||
static int wait_timeout = 60; /* in seconds */
|
||||
static int jobs = 0; /* number of concurrent worker conns. */
|
||||
|
||||
/* buffer should have at least 11 bytes */
|
||||
static char *
|
||||
@ -189,6 +201,7 @@ static pgut_option options[] =
|
||||
{ 's', 'o', "order-by", &orderby },
|
||||
{ 'i', 'T', "wait-timeout", &wait_timeout },
|
||||
{ 'B', 'Z', "no-analyze", &analyze },
|
||||
{ 'i', 'j', "jobs", &jobs },
|
||||
{ 0 },
|
||||
};
|
||||
|
||||
@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
|
||||
}
|
||||
|
||||
/*
|
||||
* Call repack_one_table for the target table or each table in a database.
|
||||
* Call repack_one_table for the target tables or each table in a database.
|
||||
*/
|
||||
static bool
|
||||
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
|
||||
@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
|
||||
|
||||
reconnect(ERROR);
|
||||
|
||||
/* No sense in setting up concurrent workers if --jobs=1 */
|
||||
if (jobs > 1)
|
||||
setup_workers(jobs);
|
||||
|
||||
if (!is_superuser()) {
|
||||
if (errbuf)
|
||||
snprintf(errbuf, errsize, "You must be a superuser to use %s",
|
||||
@ -403,6 +420,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
/* Disable statement timeout. */
|
||||
command("SET statement_timeout = 0", 0, NULL);
|
||||
@ -558,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create indexes on temp table, possibly using multiple worker connections
|
||||
* concurrently if the user asked for --jobs=...
|
||||
*/
|
||||
static bool
|
||||
rebuild_indexes(const repack_table *table)
|
||||
{
|
||||
PGresult *res;
|
||||
const char *params[1];
|
||||
int num_indexes;
|
||||
int i;
|
||||
int num_active_workers = 0;
|
||||
repack_index *index_jobs;
|
||||
char buffer[12];
|
||||
bool have_error = false;
|
||||
|
||||
elog(DEBUG2, "---- create indexes ----");
|
||||
|
||||
params[0] = utoa(table->target_oid, buffer);
|
||||
res = execute("SELECT indexrelid,"
|
||||
" repack.repack_indexdef(indexrelid, indrelid), "
|
||||
" pg_get_indexdef(indexrelid)"
|
||||
" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
|
||||
|
||||
num_indexes = PQntuples(res);
|
||||
elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
|
||||
workers.num_workers);
|
||||
|
||||
index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
|
||||
|
||||
for (i = 0; i < num_indexes; i++)
|
||||
{
|
||||
int c = 0;
|
||||
const char *indexdef;
|
||||
|
||||
index_jobs[i].target_oid = getoid(res, i, c++);
|
||||
index_jobs[i].create_index = getstr(res, i, c++);
|
||||
index_jobs[i].status = UNPROCESSED;
|
||||
index_jobs[i].worker_idx = -1; /* Unassigned */
|
||||
|
||||
indexdef = getstr(res, i, c++);
|
||||
|
||||
elog(DEBUG2, "set up index_jobs [%d]", i);
|
||||
elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
|
||||
elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
|
||||
|
||||
if (workers.num_workers <= 1) {
|
||||
/* Use primary connection if we are not setting up parallel
|
||||
* index building, or if we only have one worker.
|
||||
*/
|
||||
command(index_jobs[i].create_index, 0, NULL);
|
||||
|
||||
/* This bookkeeping isn't actually important in this no-workers
|
||||
* case, but just for clarity.
|
||||
*/
|
||||
index_jobs[i].status = FINISHED;
|
||||
}
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
if (workers.num_workers > 1)
|
||||
{
|
||||
/* First time through, assign every available worker to build an index.
|
||||
*/
|
||||
for (i = 0; i < num_indexes && i < workers.num_workers; i++)
|
||||
{
|
||||
index_jobs[i].status = INPROGRESS;
|
||||
index_jobs[i].worker_idx = i;
|
||||
elog(DEBUG2, "Worker %d building index: %s", i,
|
||||
index_jobs[i].create_index);
|
||||
|
||||
/* Make sure each worker connection can work in non-blocking
|
||||
* mode.
|
||||
*/
|
||||
if (PQsetnonblocking(workers.conns[i], 1))
|
||||
{
|
||||
elog(WARNING, "Unable to set worker connection %d "
|
||||
"non-blocking.", i);
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
|
||||
{
|
||||
elog(WARNING, "Error sending async query: %s\n%s",
|
||||
index_jobs[i].create_index,
|
||||
PQerrorMessage(workers.conns[i]));
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
}
|
||||
num_active_workers = i;
|
||||
|
||||
/* Now go through our index builds, and look for any which is
|
||||
* reported complete. Reassign that worker to the next index to
|
||||
* be built, if any.
|
||||
*/
|
||||
while (num_active_workers)
|
||||
{
|
||||
int freed_worker = -1;
|
||||
|
||||
for (i = 0; i < num_indexes; i++)
|
||||
{
|
||||
if (index_jobs[i].status == INPROGRESS)
|
||||
{
|
||||
/* Must call PQconsumeInput before we can check PQisBusy */
|
||||
if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
|
||||
{
|
||||
elog(WARNING, "Error fetching async query status: %s",
|
||||
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
|
||||
{
|
||||
elog(NOTICE, "Command finished in worker %d: %s",
|
||||
index_jobs[i].worker_idx,
|
||||
index_jobs[i].create_index);
|
||||
|
||||
while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
|
||||
{
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
{
|
||||
elog(WARNING, "Error with create index: %s",
|
||||
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
|
||||
PQclear(res);
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
freed_worker = index_jobs[i].worker_idx;
|
||||
index_jobs[i].status = FINISHED;
|
||||
num_active_workers--;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (freed_worker > -1)
|
||||
{
|
||||
for (i = 0; i < num_indexes; i++)
|
||||
{
|
||||
if (index_jobs[i].status == UNPROCESSED)
|
||||
{
|
||||
index_jobs[i].status = INPROGRESS;
|
||||
index_jobs[i].worker_idx = freed_worker;
|
||||
elog(NOTICE, "Assigning worker %d execute job %d: %s",
|
||||
freed_worker, i, index_jobs[i].create_index);
|
||||
|
||||
if (!(PQsendQuery(workers.conns[freed_worker],
|
||||
index_jobs[i].create_index))) {
|
||||
elog(WARNING, "Error sending async query: %s\n%s",
|
||||
index_jobs[i].create_index,
|
||||
PQerrorMessage(workers.conns[freed_worker]));
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
num_active_workers++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
freed_worker = -1;
|
||||
}
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cleanup:
|
||||
return (!have_error);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Re-organize one table.
|
||||
*/
|
||||
@ -567,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
PGresult *res;
|
||||
const char *params[2];
|
||||
int num;
|
||||
int i;
|
||||
int num_waiting = 0;
|
||||
|
||||
char *vxid = NULL;
|
||||
char buffer[12];
|
||||
StringInfoData sql;
|
||||
@ -665,7 +858,14 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
|
||||
table->target_name);
|
||||
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
|
||||
if (!(PQsendQuery(conn2, sql.data))) {
|
||||
if (PQsetnonblocking(conn2, 1))
|
||||
{
|
||||
elog(WARNING, "Unable to set conn2 nonblocking.");
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
if (!(PQsendQuery(conn2, sql.data)))
|
||||
{
|
||||
elog(WARNING, "Error sending async query: %s\n%s", sql.data,
|
||||
PQerrorMessage(conn2));
|
||||
have_error = true;
|
||||
@ -710,6 +910,14 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/* Turn conn2 back into blocking mode for further non-async use. */
|
||||
if (PQsetnonblocking(conn2, 0))
|
||||
{
|
||||
elog(WARNING, "Unable to set conn2 blocking.");
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* 2. Copy tuples into temp table.
|
||||
*/
|
||||
@ -775,44 +983,11 @@ repack_one_table(const repack_table *table, const char *orderby)
|
||||
/*
|
||||
* 3. Create indexes on temp table.
|
||||
*/
|
||||
elog(DEBUG2, "---- create indexes ----");
|
||||
|
||||
params[0] = utoa(table->target_oid, buffer);
|
||||
res = execute("SELECT indexrelid,"
|
||||
" repack.repack_indexdef(indexrelid, indrelid),"
|
||||
" indisvalid,"
|
||||
" pg_get_indexdef(indexrelid)"
|
||||
" FROM pg_index WHERE indrelid = $1", 1, params);
|
||||
|
||||
num = PQntuples(res);
|
||||
for (i = 0; i < num; i++)
|
||||
{
|
||||
repack_index index;
|
||||
int c = 0;
|
||||
const char *isvalid;
|
||||
const char *indexdef;
|
||||
|
||||
index.target_oid = getoid(res, i, c++);
|
||||
index.create_index = getstr(res, i, c++);
|
||||
isvalid = getstr(res, i, c++);
|
||||
indexdef = getstr(res, i, c++);
|
||||
|
||||
if (isvalid && isvalid[0] == 'f') {
|
||||
elog(WARNING, "skipping invalid index: %s", indexdef);
|
||||
continue;
|
||||
}
|
||||
|
||||
elog(DEBUG2, "[%d]", i);
|
||||
elog(DEBUG2, "target_oid : %u", index.target_oid);
|
||||
elog(DEBUG2, "create_index : %s", index.create_index);
|
||||
|
||||
/*
|
||||
* NOTE: If we want to create multiple indexes in parallel,
|
||||
* we need to call create_index in multiple connections.
|
||||
*/
|
||||
command(index.create_index, 0, NULL);
|
||||
if (!rebuild_indexes(table)) {
|
||||
have_error = true;
|
||||
goto cleanup;
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
|
||||
/*
|
||||
* 4. Apply log to temp table until no tuples are left in the log
|
||||
@ -1172,6 +1347,7 @@ pgut_help(bool details)
|
||||
|
||||
printf("Options:\n");
|
||||
printf(" -a, --all repack all databases\n");
|
||||
printf(" -j --jobs Use this many parallel jobs");
|
||||
printf(" -n, --no-order do vacuum full instead of cluster\n");
|
||||
printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n");
|
||||
printf(" -t, --table=TABLE repack specific table only\n");
|
||||
|
@ -26,9 +26,113 @@ YesNo prompt_password = DEFAULT;
|
||||
PGconn *connection = NULL;
|
||||
PGconn *conn2 = NULL;
|
||||
|
||||
worker_conns workers = {
|
||||
.num_workers = 0,
|
||||
.conns = NULL
|
||||
};
|
||||
|
||||
|
||||
static bool parse_pair(const char buffer[], char key[], char value[]);
|
||||
static char *get_username(void);
|
||||
|
||||
|
||||
/*
|
||||
* Set up worker conns which will be used for concurrent index rebuilds.
|
||||
* 'num_workers' is the desired number of worker connections, i.e. from
|
||||
* --jobs flag. Due to max_connections we might not actually be able to
|
||||
* set up that many workers, but don't treat that as a fatal error.
|
||||
*/
|
||||
void
|
||||
setup_workers(int num_workers)
|
||||
{
|
||||
StringInfoData buf;
|
||||
int i;
|
||||
PGconn *conn;
|
||||
|
||||
elog(DEBUG2, "In setup_workers(), target num_workers = %d", num_workers);
|
||||
|
||||
if (num_workers > 1 && num_workers > workers.num_workers)
|
||||
{
|
||||
initStringInfo(&buf);
|
||||
if (dbname && dbname[0])
|
||||
appendStringInfo(&buf, "dbname=%s ", dbname);
|
||||
if (host && host[0])
|
||||
appendStringInfo(&buf, "host=%s ", host);
|
||||
if (port && port[0])
|
||||
appendStringInfo(&buf, "port=%s ", port);
|
||||
if (username && username[0])
|
||||
appendStringInfo(&buf, "user=%s ", username);
|
||||
if (password && password[0])
|
||||
appendStringInfo(&buf, "password=%s ", password);
|
||||
|
||||
if (workers.conns == NULL)
|
||||
{
|
||||
elog(NOTICE, "Setting up workers.conns");
|
||||
workers.conns = (PGconn **) pgut_malloc(sizeof(PGconn *) * num_workers);
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR, "TODO: Implement pool resizing.");
|
||||
}
|
||||
|
||||
for (i = 0; i < num_workers; i++)
|
||||
{
|
||||
/* Don't prompt for password again; we should have gotten
|
||||
* it already from reconnect().
|
||||
*/
|
||||
elog(DEBUG2, "Setting up worker conn %d", i);
|
||||
|
||||
/* Don't confuse pgut_connections by using pgut_connect() */
|
||||
conn = PQconnectdb(buf.data);
|
||||
if (PQstatus(conn) == CONNECTION_OK)
|
||||
{
|
||||
workers.conns[i] = conn;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(WARNING, "Unable to set up worker conn #%d: %s", i,
|
||||
PQerrorMessage(conn));
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* In case we bailed out of setting up all workers, record
|
||||
* how many successful worker conns we actually have.
|
||||
*/
|
||||
workers.num_workers = i;
|
||||
|
||||
termStringInfo(&buf);
|
||||
}
|
||||
}
|
||||
|
||||
/* Disconnect all our worker conns. */
|
||||
void disconnect_workers(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
if (!(workers.num_workers))
|
||||
elog(DEBUG2, "No workers to disconnect.");
|
||||
else
|
||||
{
|
||||
for (i = 0; i < workers.num_workers; i++)
|
||||
{
|
||||
if (workers.conns[i])
|
||||
{
|
||||
elog(DEBUG2, "Disconnecting worker %d.", i);
|
||||
PQfinish(workers.conns[i]);
|
||||
workers.conns[i] = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(NOTICE, "Worker %d already disconnected?", i);
|
||||
}
|
||||
}
|
||||
workers.num_workers = 0;
|
||||
free(workers.conns);
|
||||
workers.conns = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* the result is also available with the global variable 'connection'.
|
||||
*/
|
||||
@ -82,6 +186,7 @@ disconnect(void)
|
||||
pgut_disconnect(conn2);
|
||||
conn2 = NULL;
|
||||
}
|
||||
disconnect_workers();
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -48,6 +48,14 @@ typedef struct pgut_option
|
||||
|
||||
typedef void (*pgut_optfn) (pgut_option *opt, const char *arg);
|
||||
|
||||
/* Pool of extra libpq connections used for concurrent index rebuilds;
 * managed by setup_workers() / disconnect_workers().
 */
typedef struct worker_conns
{
	int			max_num_workers;	/* NOTE(review): not assigned anywhere
									 * visible in this chunk -- presumably the
									 * allocated capacity of conns[]; confirm */
	int			num_workers;		/* number of live connections in conns[] */
	PGconn	  **conns;				/* worker connections; NULL when no pool */
} worker_conns;
|
||||
|
||||
|
||||
|
||||
extern char *dbname;
|
||||
extern char *host;
|
||||
@ -58,12 +66,15 @@ extern YesNo prompt_password;
|
||||
|
||||
extern PGconn *connection;
|
||||
extern PGconn *conn2;
|
||||
extern worker_conns workers;
|
||||
|
||||
extern void pgut_help(bool details);
|
||||
extern void help(bool details);
|
||||
|
||||
extern void disconnect(void);
|
||||
extern void reconnect(int elevel);
|
||||
extern void setup_workers(int num_workers);
|
||||
extern void disconnect_workers(void);
|
||||
extern PGresult *execute(const char *query, int nParams, const char **params);
|
||||
extern PGresult *execute_elevel(const char *query, int nParams, const char **params, int elevel);
|
||||
extern ExecStatusType command(const char *query, int nParams, const char **params);
|
||||
|
Loading…
x
Reference in New Issue
Block a user