First pass at implementing concurrent index builds using multiple connections.

Adds a new --jobs command-line argument to specify how many worker
connections you want. These worker connections should stick around
while processing table(s) in a single database. For each table,
parcel out the indexes to be built among these worker conns,
submitting each CREATE INDEX ... request using PQsendQuery(), i.e.
in non-blocking fashion.

Most of this is still rather crude, in particular the
while (num_active_workers) ... loop in rebuild_indexes(), but
it seems to be working, so I'm committing here.
This commit is contained in:
Josh Kupershmidt 2012-12-10 21:08:01 -07:00
parent b4d8a90437
commit 509e568c52
3 changed files with 331 additions and 40 deletions

View File

@ -137,6 +137,14 @@ typedef struct repack_table
const char *sql_pop; /* SQL used in flush */
} repack_table;
/*
 * Build state of a single index while rebuild_indexes() is running.
 */
typedef enum
{
	UNPROCESSED = 0,	/* not yet handed to any connection */
	INPROGRESS = 1,		/* CREATE INDEX has been sent to a worker */
	FINISHED = 2		/* CREATE INDEX has completed */
} index_status_t;
/*
* per-index information
*/
@ -144,6 +152,8 @@ typedef struct repack_index
{
Oid target_oid; /* target: OID */
const char *create_index; /* CREATE INDEX */
index_status_t status; /* Track parallel build statuses. */
int worker_idx; /* which worker conn is handling */
} repack_index;
static bool is_superuser(void);
@ -151,6 +161,7 @@ static void repack_all_databases(const char *order_by);
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
static void repack_one_table(const repack_table *table, const char *order_by);
static void repack_cleanup(bool fatal, const repack_table *table);
static bool rebuild_indexes(const repack_table *table);
static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
@ -172,6 +183,7 @@ static bool noorder = false;
static SimpleStringList table_list = {NULL, NULL};
static char *orderby = NULL;
static int wait_timeout = 60; /* in seconds */
static int jobs = 0; /* number of concurrent worker conns. */
/* buffer should have at least 11 bytes */
static char *
@ -189,6 +201,7 @@ static pgut_option options[] =
{ 's', 'o', "order-by", &orderby },
{ 'i', 'T', "wait-timeout", &wait_timeout },
{ 'B', 'Z', "no-analyze", &analyze },
{ 'i', 'j', "jobs", &jobs },
{ 0 },
};
@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
}
/*
* Call repack_one_table for the target table or each table in a database.
* Call repack_one_table for the target tables or each table in a database.
*/
static bool
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
reconnect(ERROR);
/* No sense in setting up concurrent workers if --jobs=1 */
if (jobs > 1)
setup_workers(jobs);
if (!is_superuser()) {
if (errbuf)
snprintf(errbuf, errsize, "You must be a superuser to use %s",
@ -559,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
return result;
}
/*
* Create indexes on temp table, possibly using multiple worker connections
* concurrently if the user asked for --jobs=...
*/
static bool
rebuild_indexes(const repack_table *table)
{
PGresult *res;
const char *params[1];
int num_indexes;
int i;
int num_active_workers = 0;
repack_index *index_jobs;
char buffer[12];
bool have_error = false;
elog(DEBUG2, "---- create indexes ----");
params[0] = utoa(table->target_oid, buffer);
res = execute("SELECT indexrelid,"
" repack.repack_indexdef(indexrelid, indrelid), "
" pg_get_indexdef(indexrelid)"
" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
num_indexes = PQntuples(res);
elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
workers.num_workers);
index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
for (i = 0; i < num_indexes; i++)
{
int c = 0;
const char *indexdef;
index_jobs[i].target_oid = getoid(res, i, c++);
index_jobs[i].create_index = getstr(res, i, c++);
index_jobs[i].status = UNPROCESSED;
index_jobs[i].worker_idx = -1; /* Unassigned */
indexdef = getstr(res, i, c++);
elog(DEBUG2, "set up index_jobs [%d]", i);
elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
if (workers.num_workers <= 1) {
/* Use primary connection if we are not setting up parallel
* index building, or if we only have one worker.
*/
command(index_jobs[i].create_index, 0, NULL);
/* This bookkeeping isn't actually important in this no-workers
* case, but just for clarity.
*/
index_jobs[i].status = FINISHED;
}
}
PQclear(res);
if (workers.num_workers > 1)
{
/* First time through, assign every available worker to build an index.
*/
for (i = 0; i < num_indexes && i < workers.num_workers; i++)
{
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = i;
elog(DEBUG2, "Worker %d building index: %s", i,
index_jobs[i].create_index);
/* Make sure each worker connection can work in non-blocking
* mode.
*/
if (PQsetnonblocking(workers.conns[i], 1))
{
elog(WARNING, "Unable to set worker connection %d "
"non-blocking.", i);
have_error = true;
goto cleanup;
}
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
{
elog(WARNING, "Error sending async query: %s\n%s",
index_jobs[i].create_index,
PQerrorMessage(workers.conns[i]));
have_error = true;
goto cleanup;
}
}
num_active_workers = i;
/* Now go through our index builds, and look for any which is
* reported complete. Reassign that worker to the next index to
* be built, if any.
*/
while (num_active_workers)
{
int freed_worker = -1;
for (i = 0; i < num_indexes; i++)
{
if (index_jobs[i].status == INPROGRESS)
{
/* Must call PQconsumeInput before we can check PQisBusy */
if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
{
elog(WARNING, "Error fetching async query status: %s",
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
have_error = true;
goto cleanup;
}
if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
{
elog(NOTICE, "Command finished in worker %d: %s",
index_jobs[i].worker_idx,
index_jobs[i].create_index);
while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
{
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(WARNING, "Error with create index: %s",
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
PQclear(res);
have_error = true;
goto cleanup;
}
PQclear(res);
}
freed_worker = index_jobs[i].worker_idx;
index_jobs[i].status = FINISHED;
num_active_workers--;
break;
}
}
}
if (freed_worker > -1)
{
for (i = 0; i < num_indexes; i++)
{
if (index_jobs[i].status == UNPROCESSED)
{
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = freed_worker;
elog(NOTICE, "Assigning worker %d execute job %d: %s",
freed_worker, i, index_jobs[i].create_index);
if (!(PQsendQuery(workers.conns[freed_worker],
index_jobs[i].create_index))) {
elog(WARNING, "Error sending async query: %s\n%s",
index_jobs[i].create_index,
PQerrorMessage(workers.conns[freed_worker]));
have_error = true;
goto cleanup;
}
num_active_workers++;
break;
}
}
freed_worker = -1;
}
sleep(1);
}
}
cleanup:
return (!have_error);
}
/*
* Re-organize one table.
*/
@ -568,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
PGresult *res;
const char *params[2];
int num;
int i;
int num_waiting = 0;
char *vxid = NULL;
char buffer[12];
StringInfoData sql;
@ -674,7 +866,14 @@ repack_one_table(const repack_table *table, const char *orderby)
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
table->target_name);
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
if (!(PQsendQuery(conn2, sql.data))) {
if (PQsetnonblocking(conn2, 1))
{
elog(WARNING, "Unable to set conn2 nonblocking.");
have_error = true;
goto cleanup;
}
if (!(PQsendQuery(conn2, sql.data)))
{
elog(WARNING, "Error sending async query: %s\n%s", sql.data,
PQerrorMessage(conn2));
have_error = true;
@ -725,6 +924,14 @@ repack_one_table(const repack_table *table, const char *orderby)
PQclear(res);
}
/* Turn conn2 back into blocking mode for further non-async use. */
if (PQsetnonblocking(conn2, 0))
{
elog(WARNING, "Unable to set conn2 blocking.");
have_error = true;
goto cleanup;
}
/*
* 2. Copy tuples into temp table.
*/
@ -790,44 +997,11 @@ repack_one_table(const repack_table *table, const char *orderby)
/*
* 3. Create indexes on temp table.
*/
elog(DEBUG2, "---- create indexes ----");
params[0] = utoa(table->target_oid, buffer);
res = execute("SELECT indexrelid,"
" repack.repack_indexdef(indexrelid, indrelid),"
" indisvalid,"
" pg_get_indexdef(indexrelid)"
" FROM pg_index WHERE indrelid = $1", 1, params);
num = PQntuples(res);
for (i = 0; i < num; i++)
{
repack_index index;
int c = 0;
const char *isvalid;
const char *indexdef;
index.target_oid = getoid(res, i, c++);
index.create_index = getstr(res, i, c++);
isvalid = getstr(res, i, c++);
indexdef = getstr(res, i, c++);
if (isvalid && isvalid[0] == 'f') {
elog(WARNING, "skipping invalid index: %s", indexdef);
continue;
}
elog(DEBUG2, "[%d]", i);
elog(DEBUG2, "target_oid : %u", index.target_oid);
elog(DEBUG2, "create_index : %s", index.create_index);
/*
* NOTE: If we want to create multiple indexes in parallel,
* we need to call create_index in multiple connections.
*/
command(index.create_index, 0, NULL);
if (!rebuild_indexes(table)) {
have_error = true;
goto cleanup;
}
PQclear(res);
/*
* 4. Apply log to temp table until no tuples are left in the log
@ -1187,6 +1361,7 @@ pgut_help(bool details)
printf("Options:\n");
printf(" -a, --all repack all databases\n");
printf(" -j --jobs Use this many parallel jobs");
printf(" -n, --no-order do vacuum full instead of cluster\n");
printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n");
printf(" -t, --table=TABLE repack specific table only\n");

View File

@ -26,9 +26,113 @@ YesNo prompt_password = DEFAULT;
PGconn *connection = NULL;
PGconn *conn2 = NULL;
/* Worker connection pool; starts out empty with no array allocated. */
worker_conns workers = {
	.max_num_workers = 0,
	.num_workers = 0,
	.conns = NULL
};
static bool parse_pair(const char buffer[], char key[], char value[]);
static char *get_username(void);
/*
 * Set up worker conns which will be used for concurrent index rebuilds.
 * 'num_workers' is the desired number of worker connections, i.e. from
 * --jobs flag. Due to max_connections we might not actually be able to
 * set up that many workers, but don't treat that as a fatal error.
 *
 * Does nothing unless num_workers is greater than 1 and greater than the
 * number of workers already set up.  On return workers.num_workers holds
 * the number of connections actually opened; if none could be opened,
 * workers.conns is released and reset to NULL.
 *
 * NOTE(review): the connection string is assembled from the global
 * dbname/host/port/username/password values without any quoting; values
 * containing spaces or quotes would presumably be mis-parsed -- confirm.
 */
void
setup_workers(int num_workers)
{
	StringInfoData	buf;
	int				i;
	PGconn		   *conn;

	elog(DEBUG2, "In setup_workers(), target num_workers = %d", num_workers);

	if (num_workers <= 1 || num_workers <= workers.num_workers)
		return;

	initStringInfo(&buf);
	if (dbname && dbname[0])
		appendStringInfo(&buf, "dbname=%s ", dbname);
	if (host && host[0])
		appendStringInfo(&buf, "host=%s ", host);
	if (port && port[0])
		appendStringInfo(&buf, "port=%s ", port);
	if (username && username[0])
		appendStringInfo(&buf, "user=%s ", username);
	if (password && password[0])
		appendStringInfo(&buf, "password=%s ", password);

	if (workers.conns == NULL)
	{
		elog(NOTICE, "Setting up workers.conns");
		workers.conns = (PGconn **) pgut_malloc(sizeof(PGconn *) * num_workers);
	}
	else
	{
		elog(ERROR, "TODO: Implement pool resizing.");
	}

	for (i = 0; i < num_workers; i++)
	{
		/* Don't prompt for password again; we should have gotten
		 * it already from reconnect().
		 */
		elog(DEBUG2, "Setting up worker conn %d", i);

		/* Don't confuse pgut_connections by using pgut_connect() */
		conn = PQconnectdb(buf.data);
		if (PQstatus(conn) != CONNECTION_OK)
		{
			elog(WARNING, "Unable to set up worker conn #%d: %s", i,
				 PQerrorMessage(conn));
			/* A failed connection attempt still owns memory; release it
			 * (after reading its error message above).
			 */
			PQfinish(conn);
			break;
		}
		workers.conns[i] = conn;
	}

	/* In case we bailed out of setting up all workers, record
	 * how many successful worker conns we actually have.
	 */
	workers.num_workers = i;

	/* With no usable worker at all, drop the empty array so that the pool
	 * is back in its initial state and a later call can allocate afresh
	 * rather than hitting the "pool resizing" branch.
	 */
	if (workers.num_workers == 0)
	{
		free(workers.conns);
		workers.conns = NULL;
	}

	termStringInfo(&buf);
}
/*
 * Disconnect all our worker conns.
 *
 * Closes every connection counted by workers.num_workers, then releases the
 * connection array and resets the pool to its empty state.  The array is
 * released even when num_workers is 0, because setup_workers() may have
 * allocated it without managing to open a single connection.
 */
void
disconnect_workers(void)
{
	int			i;

	if (!(workers.num_workers))
		elog(DEBUG2, "No workers to disconnect.");

	for (i = 0; i < workers.num_workers; i++)
	{
		if (workers.conns[i])
		{
			elog(DEBUG2, "Disconnecting worker %d.", i);
			PQfinish(workers.conns[i]);
			workers.conns[i] = NULL;
		}
		else
		{
			elog(NOTICE, "Worker %d already disconnected?", i);
		}
	}

	workers.num_workers = 0;
	/* free(NULL) is a no-op, so this is safe when nothing was allocated. */
	free(workers.conns);
	workers.conns = NULL;
}
/*
* the result is also available with the global variable 'connection'.
*/
@ -82,6 +186,7 @@ disconnect(void)
pgut_disconnect(conn2);
conn2 = NULL;
}
disconnect_workers();
}
static void

View File

@ -48,6 +48,14 @@ typedef struct pgut_option
typedef void (*pgut_optfn) (pgut_option *opt, const char *arg);
/*
 * Set of extra libpq connections used as workers for concurrent index
 * builds.
 */
typedef struct worker_conns
{
/* NOTE(review): not read or written by any code visible here; presumably
 * the requested pool size -- confirm.
 */
int max_num_workers;
/* Number of worker connections successfully opened by setup_workers();
 * reset to 0 by disconnect_workers().
 */
int num_workers;
/* Array of worker connections allocated by setup_workers(); NULL when no
 * array has been allocated.
 */
PGconn **conns;
} worker_conns;
extern char *dbname;
extern char *host;
@ -58,12 +66,15 @@ extern YesNo prompt_password;
extern PGconn *connection;
extern PGconn *conn2;
extern worker_conns workers;
extern void pgut_help(bool details);
extern void help(bool details);
extern void disconnect(void);
extern void reconnect(int elevel);
extern void setup_workers(int num_workers);
extern void disconnect_workers(void);
extern PGresult *execute(const char *query, int nParams, const char **params);
extern PGresult *execute_elevel(const char *query, int nParams, const char **params, int elevel);
extern ExecStatusType command(const char *query, int nParams, const char **params);