From 0d984ed3e54bed36f3fe47ebfd3ef177c48b9dfa Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Mon, 10 Dec 2012 21:08:01 -0700 Subject: [PATCH 1/6] First pass at implementing concurrent index builds using multiple connections. Adds a new --jobs command-line argument to specify how many worker connections you want. These worker connections should stick around while processing table(s) in a single database. For each table, parcel out the indexes to be built among these worker conns, submitting each CREATE INDEX ... request using PQsendQuery() i.e. in non-blocking fashion. Most of this is still rather crude, in particular the while (num_active_workers) ... loop in rebuild_indexes(), but it seems to be working, so I'm committing here. --- bin/pg_repack.c | 256 ++++++++++++++++++++++++++++++++++++++------- bin/pgut/pgut-fe.c | 105 +++++++++++++++++++ bin/pgut/pgut-fe.h | 11 ++ 3 files changed, 332 insertions(+), 40 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index ec2d82d..fb204b0 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -137,6 +137,14 @@ typedef struct repack_table const char *sql_pop; /* SQL used in flush */ } repack_table; + +typedef enum +{ + UNPROCESSED, + INPROGRESS, + FINISHED +} index_status_t; + /* * per-index information */ @@ -144,6 +152,8 @@ typedef struct repack_index { Oid target_oid; /* target: OID */ const char *create_index; /* CREATE INDEX */ + index_status_t status; /* Track parallel build statuses. */ + int worker_idx; /* which worker conn is handling */ } repack_index; static bool is_superuser(void); @@ -151,6 +161,7 @@ static void repack_all_databases(const char *order_by); static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize); static void repack_one_table(const repack_table *table, const char *order_by); static void repack_cleanup(bool fatal, const repack_table *table); +static bool rebuild_indexes(const repack_table *table); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); @@ -172,6 +183,7 @@ static bool noorder = false; static SimpleStringList table_list = {NULL, NULL}; static char *orderby = NULL; static int wait_timeout = 60; /* in seconds */ +static int jobs = 0; /* number of concurrent worker conns. */ /* buffer should have at least 11 bytes */ static char * @@ -189,6 +201,7 @@ static pgut_option options[] = { 's', 'o', "order-by", &orderby }, { 'i', 'T', "wait-timeout", &wait_timeout }, { 'B', 'Z', "no-analyze", &analyze }, + { 'i', 'j', "jobs", &jobs }, { 0 }, }; @@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col) } /* - * Call repack_one_table for the target table or each table in a database. + * Call repack_one_table for the target tables or each table in a database. */ static bool repack_one_database(const char *orderby, char *errbuf, size_t errsize) @@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize) reconnect(ERROR); + /* No sense in setting up concurrent workers if --jobs=1 */ + if (jobs > 1) + setup_workers(jobs); + if (!is_superuser()) { if (errbuf) snprintf(errbuf, errsize, "You must be a superuser to use %s", @@ -403,6 +420,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize) } goto cleanup; } + PQclear(res); /* Disable statement timeout. */ command("SET statement_timeout = 0", 0, NULL); @@ -558,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count) return result; } +/* + * Create indexes on temp table, possibly using multiple worker connections + * concurrently if the user asked for --jobs=... + */ +static bool +rebuild_indexes(const repack_table *table) +{ + PGresult *res; + const char *params[1]; + int num_indexes; + int i; + int num_active_workers = 0; + repack_index *index_jobs; + char buffer[12]; + bool have_error = false; + + elog(DEBUG2, "---- create indexes ----"); + + params[0] = utoa(table->target_oid, buffer); + res = execute("SELECT indexrelid," + " repack.repack_indexdef(indexrelid, indrelid), " + " pg_get_indexdef(indexrelid)" + " FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params); + + num_indexes = PQntuples(res); + elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes, + workers.num_workers); + + index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes); + + for (i = 0; i < num_indexes; i++) + { + int c = 0; + const char *indexdef; + + index_jobs[i].target_oid = getoid(res, i, c++); + index_jobs[i].create_index = getstr(res, i, c++); + index_jobs[i].status = UNPROCESSED; + index_jobs[i].worker_idx = -1; /* Unassigned */ + + indexdef = getstr(res, i, c++); + + elog(DEBUG2, "set up index_jobs [%d]", i); + elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid); + elog(DEBUG2, "create_index : %s", index_jobs[i].create_index); + + if (workers.num_workers <= 1) { + /* Use primary connection if we are not setting up parallel + * index building, or if we only have one worker. + */ + command(index_jobs[i].create_index, 0, NULL); + + /* This bookkeeping isn't actually important in this no-workers + * case, but just for clarity. + */ + index_jobs[i].status = FINISHED; + } + } + PQclear(res); + + if (workers.num_workers > 1) + { + /* First time through, assign every available worker to build an index. + */ + for (i = 0; i < num_indexes && i < workers.num_workers; i++) + { + index_jobs[i].status = INPROGRESS; + index_jobs[i].worker_idx = i; + elog(DEBUG2, "Worker %d building index: %s", i, + index_jobs[i].create_index); + + /* Make sure each worker connection can work in non-blocking + * mode. + */ + if (PQsetnonblocking(workers.conns[i], 1)) + { + elog(WARNING, "Unable to set worker connection %d " + "non-blocking.", i); + have_error = true; + goto cleanup; + } + + if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) + { + elog(WARNING, "Error sending async query: %s\n%s", + index_jobs[i].create_index, + PQerrorMessage(workers.conns[i])); + have_error = true; + goto cleanup; + } + + } + num_active_workers = i; + + /* Now go through our index builds, and look for any which is + * reported complete. Reassign that worker to the next index to + * be built, if any. + */ + while (num_active_workers) + { + int freed_worker = -1; + + for (i = 0; i < num_indexes; i++) + { + if (index_jobs[i].status == INPROGRESS) + { + /* Must call PQconsumeInput before we can check PQisBusy */ + if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1) + { + elog(WARNING, "Error fetching async query status: %s", + PQerrorMessage(workers.conns[index_jobs[i].worker_idx])); + have_error = true; + goto cleanup; + } + if (!PQisBusy(workers.conns[index_jobs[i].worker_idx])) + { + elog(NOTICE, "Command finished in worker %d: %s", + index_jobs[i].worker_idx, + index_jobs[i].create_index); + + while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx]))) + { + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + elog(WARNING, "Error with create index: %s", + PQerrorMessage(workers.conns[index_jobs[i].worker_idx])); + PQclear(res); + have_error = true; + goto cleanup; + } + PQclear(res); + } + + freed_worker = index_jobs[i].worker_idx; + index_jobs[i].status = FINISHED; + num_active_workers--; + break; + } + } + } + if (freed_worker > -1) + { + for (i = 0; i < num_indexes; i++) + { + if (index_jobs[i].status == UNPROCESSED) + { + index_jobs[i].status = INPROGRESS; + index_jobs[i].worker_idx = freed_worker; + elog(NOTICE, "Assigning worker %d execute job %d: %s", + freed_worker, i, index_jobs[i].create_index); + + if (!(PQsendQuery(workers.conns[freed_worker], + index_jobs[i].create_index))) { + elog(WARNING, "Error sending async query: %s\n%s", + index_jobs[i].create_index, + PQerrorMessage(workers.conns[freed_worker])); + have_error = true; + goto cleanup; + } + num_active_workers++; + break; + } + } + freed_worker = -1; + } + sleep(1); + } + + } + +cleanup: + return (!have_error); +} + + /* * Re-organize one table. */ @@ -567,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby) PGresult *res; const char *params[2]; int num; - int i; int num_waiting = 0; + char *vxid = NULL; char buffer[12]; StringInfoData sql; @@ -665,7 +858,14 @@ repack_one_table(const repack_table *table, const char *orderby) printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name); - if (!(PQsendQuery(conn2, sql.data))) { + if (PQsetnonblocking(conn2, 1)) + { + elog(WARNING, "Unable to set conn2 nonblocking."); + have_error = true; + goto cleanup; + } + if (!(PQsendQuery(conn2, sql.data))) + { elog(WARNING, "Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2)); have_error = true; @@ -710,6 +910,14 @@ repack_one_table(const repack_table *table, const char *orderby) PQclear(res); } + /* Turn conn2 back into blocking mode for further non-async use. */ + if (PQsetnonblocking(conn2, 0)) + { + elog(WARNING, "Unable to set conn2 blocking."); + have_error = true; + goto cleanup; + } + /* * 2. Copy tuples into temp table. */ @@ -775,44 +983,11 @@ repack_one_table(const repack_table *table, const char *orderby) /* * 3. Create indexes on temp table. */ - elog(DEBUG2, "---- create indexes ----"); - - params[0] = utoa(table->target_oid, buffer); - res = execute("SELECT indexrelid," - " repack.repack_indexdef(indexrelid, indrelid)," - " indisvalid," - " pg_get_indexdef(indexrelid)" - " FROM pg_index WHERE indrelid = $1", 1, params); - - num = PQntuples(res); - for (i = 0; i < num; i++) - { - repack_index index; - int c = 0; - const char *isvalid; - const char *indexdef; - - index.target_oid = getoid(res, i, c++); - index.create_index = getstr(res, i, c++); - isvalid = getstr(res, i, c++); - indexdef = getstr(res, i, c++); - - if (isvalid && isvalid[0] == 'f') { - elog(WARNING, "skipping invalid index: %s", indexdef); - continue; - } - - elog(DEBUG2, "[%d]", i); - elog(DEBUG2, "target_oid : %u", index.target_oid); - elog(DEBUG2, "create_index : %s", index.create_index); - - /* - * NOTE: If we want to create multiple indexes in parallel, - * we need to call create_index in multiple connections. - */ - command(index.create_index, 0, NULL); + if (!rebuild_indexes(table)) { + have_error = true; + goto cleanup; } - PQclear(res); + /* * 4. Apply log to temp table until no tuples are left in the log @@ -1172,6 +1347,7 @@ pgut_help(bool details) printf("Options:\n"); printf(" -a, --all repack all databases\n"); + printf(" -j --jobs Use this many parallel jobs"); printf(" -n, --no-order do vacuum full instead of cluster\n"); printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n"); printf(" -t, --table=TABLE repack specific table only\n"); diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index c7f888c..49d66f9 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -26,9 +26,113 @@ YesNo prompt_password = DEFAULT; PGconn *connection = NULL; PGconn *conn2 = NULL; +worker_conns workers = { + .num_workers = 0, + .conns = NULL +}; + + static bool parse_pair(const char buffer[], char key[], char value[]); static char *get_username(void); + +/* + * Set up worker conns which will be used for concurrent index rebuilds. + * 'num_workers' is the desired number of worker connections, i.e. from + * --jobs flag. Due to max_connections we might not actually be able to + * set up that many workers, but don't treat that as a fatal error. + */ +void +setup_workers(int num_workers) +{ + StringInfoData buf; + int i; + PGconn *conn; + + elog(DEBUG2, "In setup_workers(), target num_workers = %d", num_workers); + + if (num_workers > 1 && num_workers > workers.num_workers) + { + initStringInfo(&buf); + if (dbname && dbname[0]) + appendStringInfo(&buf, "dbname=%s ", dbname); + if (host && host[0]) + appendStringInfo(&buf, "host=%s ", host); + if (port && port[0]) + appendStringInfo(&buf, "port=%s ", port); + if (username && username[0]) + appendStringInfo(&buf, "user=%s ", username); + if (password && password[0]) + appendStringInfo(&buf, "password=%s ", password); + + if (workers.conns == NULL) + { + elog(NOTICE, "Setting up workers.conns"); + workers.conns = (PGconn **) pgut_malloc(sizeof(PGconn *) * num_workers); + } + else + { + elog(ERROR, "TODO: Implement pool resizing."); + } + + for (i = 0; i < num_workers; i++) + { + /* Don't prompt for password again; we should have gotten + * it already from reconnect(). + */ + elog(DEBUG2, "Setting up worker conn %d", i); + + /* Don't confuse pgut_connections by using pgut_connect() */ + conn = PQconnectdb(buf.data); + if (PQstatus(conn) == CONNECTION_OK) + { + workers.conns[i] = conn; + } + else + { + elog(WARNING, "Unable to set up worker conn #%d: %s", i, + PQerrorMessage(conn)); + break; + } + } + /* In case we bailed out of setting up all workers, record + * how many successful worker conns we actually have. + */ + workers.num_workers = i; + + termStringInfo(&buf); + } +} + +/* Disconnect all our worker conns. */ +void disconnect_workers(void) +{ + int i; + + if (!(workers.num_workers)) + elog(DEBUG2, "No workers to disconnect."); + else + { + for (i = 0; i < workers.num_workers; i++) + { + if (workers.conns[i]) + { + elog(DEBUG2, "Disconnecting worker %d.", i); + PQfinish(workers.conns[i]); + workers.conns[i] = NULL; + } + else + { + elog(NOTICE, "Worker %d already disconnected?", i); + } + } + workers.num_workers = 0; + free(workers.conns); + workers.conns = NULL; + } +} + + /* * the result is also available with the global variable 'connection'. */ @@ -82,6 +186,7 @@ disconnect(void) pgut_disconnect(conn2); conn2 = NULL; } + disconnect_workers(); } static void diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h index 7529587..cdfef90 100644 --- a/bin/pgut/pgut-fe.h +++ b/bin/pgut/pgut-fe.h @@ -48,6 +48,14 @@ typedef struct pgut_option typedef void (*pgut_optfn) (pgut_option *opt, const char *arg); +typedef struct worker_conns +{ + int max_num_workers; + int num_workers; + PGconn **conns; +} worker_conns; + + extern char *dbname; extern char *host; @@ -58,12 +66,15 @@ extern YesNo prompt_password; extern PGconn *connection; extern PGconn *conn2; +extern worker_conns workers; extern void pgut_help(bool details); extern void help(bool details); extern void disconnect(void); extern void reconnect(int elevel); +extern void setup_workers(int num_workers); +extern void disconnect_workers(void); extern PGresult *execute(const char *query, int nParams, const char **params); extern PGresult *execute_elevel(const char *query, int nParams, const char **params, int elevel); extern ExecStatusType command(const char *query, int nParams, const char **params); From 8c2dd1660824904bd8dc657aaa487d21cc19fd12 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Tue, 11 Dec 2012 19:46:49 -0700 Subject: [PATCH 2/6] Small fixes related to the concurrent_indexes changes. Move PQsetnonblocking() call to setup_workers(), and make sure we're not forgetting any workers. --- bin/pg_repack.c | 40 ++++++++++++++++------------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index fb204b0..0884027 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -633,42 +633,33 @@ rebuild_indexes(const repack_table *table) */ index_jobs[i].status = FINISHED; } - } - PQclear(res); - - if (workers.num_workers > 1) - { - /* First time through, assign every available worker to build an index. - */ - for (i = 0; i < num_indexes && i < workers.num_workers; i++) - { + else if (i <= workers.num_workers) { + /* Assign available worker to build an index. */ index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = i; elog(DEBUG2, "Worker %d building index: %s", i, index_jobs[i].create_index); - /* Make sure each worker connection can work in non-blocking - * mode. - */ - if (PQsetnonblocking(workers.conns[i], 1)) - { - elog(WARNING, "Unable to set worker connection %d " - "non-blocking.", i); - have_error = true; - goto cleanup; - } - if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) { elog(WARNING, "Error sending async query: %s\n%s", index_jobs[i].create_index, PQerrorMessage(workers.conns[i])); + PQclear(res); have_error = true; goto cleanup; } - } - num_active_workers = i; + /* Else we have more indexes to be built than workers + * available. That's OK, we'll get to them later. + */ + } + PQclear(res); + + if (workers.num_workers > 1) + { + /* How many workers we kicked off earlier. */ + num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; /* Now go through our index builds, and look for any which is * reported complete. Reassign that worker to the next index to @@ -724,8 +715,9 @@ rebuild_indexes(const repack_table *table) { index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = freed_worker; - elog(NOTICE, "Assigning worker %d execute job %d: %s", - freed_worker, i, index_jobs[i].create_index); + elog(NOTICE, "Assigning worker %d to build index #%d: " + "%s", freed_worker, i, + index_jobs[i].create_index); if (!(PQsendQuery(workers.conns[freed_worker], index_jobs[i].create_index))) { From 42357353a7e4672513c3a1055760c4ab5fbffaa2 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Thu, 13 Dec 2012 19:12:05 -0700 Subject: [PATCH 3/6] Several fixes for concurrent index builds: * Use poll() if it is available, or select() otherwise, to efficiently wait on index builds in worker queries to finish. * fix off-by-one error when initially assigning workers * move PQsetnonblocking() calls to setup_workers() --- bin/pg_repack.c | 79 +++++++++++++++++++++++++++++++++++++++++----- bin/pgut/pgut-fe.c | 15 ++++++++- 2 files changed, 85 insertions(+), 9 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 0884027..50483d1 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown"; #include "pgut/pgut-fe.h" +#include #include #include #include #include + +#ifdef HAVE_POLL_H +#include +#endif +#ifdef HAVE_SYS_POLL_H +#include +#endif +#ifdef HAVE_SYS_SELECT_H +#include +#endif + + /* * APPLY_COUNT: Number of applied logs per transaction. Larger values * could be faster, but will be long transactions in the REDO phase. */ #define APPLY_COUNT 1000 +/* poll() or select() timeout, in seconds */ +#define POLL_TIMEOUT 3 /* Compile an array of existing transactions which are active during * pg_repack's setup. Some transactions we can safely ignore: @@ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table) */ index_jobs[i].status = FINISHED; } - else if (i <= workers.num_workers) { + else if (i < workers.num_workers) { /* Assign available worker to build an index. */ index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = i; @@ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table) } PQclear(res); + /* How many workers we kicked off earlier. */ + num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; + if (workers.num_workers > 1) { - /* How many workers we kicked off earlier. */ - num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; + int freed_worker = -1; + int ret; + +/* Prefer poll() over select(), following PostgreSQL custom. */ +#ifdef HAVE_POLL + struct pollfd *input_fds; + + input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers); + for (i = 0; i < num_active_workers; i++) + { + input_fds[i].fd = PQsocket(workers.conns[i]); + input_fds[i].events = POLLIN | POLLERR; + input_fds[i].revents = 0; + } +#else + fd_set input_mask; + struct timeval timeout; + /* select() needs the highest-numbered socket descriptor */ + int max_fd = 0; + + FD_ZERO(&input_mask); + for (i = 0; i < num_active_workers; i++) + { + FD_SET(PQsocket(workers.conns[i]), &input_mask); + if (PQsocket(workers.conns[i]) > max_fd) + max_fd = PQsocket(workers.conns[i]); + } +#endif /* Now go through our index builds, and look for any which is * reported complete. Reassign that worker to the next index to * be built, if any. */ - while (num_active_workers) + while (num_active_workers > 0) { - int freed_worker = -1; + elog(DEBUG2, "polling %d active workers", num_active_workers); + +#ifdef HAVE_POLL + ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000); +#else + /* re-initialize timeout before each invocation of select() + * just in case select() modifies timeout to indicate remaining + * time. + */ + timeout.tv_sec = POLL_TIMEOUT; + timeout.tv_usec = 0; + ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); +#endif + if (ret < 0 && errno != EINTR) + elog(ERROR, "poll() failed: %d, %d", ret, errno); for (i = 0; i < num_indexes; i++) { if (index_jobs[i].status == INPROGRESS) { + Assert(index_jobs[i].worker_idx >= 0); /* Must call PQconsumeInput before we can check PQisBusy */ if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1) { @@ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table) } PQclear(res); } - + + /* We are only going to re-queue one worker, even + * though more than one index build might be finished. + * Any other jobs which may be finished will + * just have to wait for the next pass through the + * poll()/select() loop. + */ freed_worker = index_jobs[i].worker_idx; index_jobs[i].status = FINISHED; num_active_workers--; @@ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table) } freed_worker = -1; } - sleep(1); } } @@ -980,7 +1044,6 @@ repack_one_table(const repack_table *table, const char *orderby) goto cleanup; } - /* * 4. Apply log to temp table until no tuples are left in the log * and all of the old transactions are finished. diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index 49d66f9..442ab93 100644 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -82,7 +82,11 @@ setup_workers(int num_workers) */ elog(DEBUG2, "Setting up worker conn %d", i); - /* Don't confuse pgut_connections by using pgut_connect() */ + /* Don't confuse pgut_connections by using pgut_connect() + * + * XXX: could use PQconnectStart() and PQconnectPoll() to + * open these connections in non-blocking manner. + */ conn = PQconnectdb(buf.data); if (PQstatus(conn) == CONNECTION_OK) { @@ -94,6 +98,15 @@ setup_workers(int num_workers) PQerrorMessage(conn)); break; } + + /* Make sure each worker connection can work in non-blocking + * mode. + */ + if (PQsetnonblocking(workers.conns[i], 1)) + { + elog(ERROR, "Unable to set worker connection %d " + "non-blocking.", i); + } } /* In case we bailed out of setting up all workers, record * how many successful worker conns we actually have. From 4c0c2f36183c34907d29b99e04f9089eedade209 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Thu, 13 Dec 2012 19:56:44 -0700 Subject: [PATCH 4/6] restore the warning message about invalid indexes, to match old behavior and pass installcheck. --- bin/pg_repack.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 50483d1..e93ebcb 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -603,13 +603,27 @@ rebuild_indexes(const repack_table *table) int num_indexes; int i; int num_active_workers = 0; - repack_index *index_jobs; + repack_index *index_jobs; char buffer[12]; bool have_error = false; elog(DEBUG2, "---- create indexes ----"); params[0] = utoa(table->target_oid, buffer); + + /* First, just display a warning message for any invalid indexes + * which may be on the table (mostly to match the behavior of 1.1.8). + */ + res = execute("SELECT pg_get_indexdef(indexrelid)" + " FROM pg_index WHERE indrelid = $1 AND NOT indisvalid", + 1, params); + for (i = 0; i < PQntuples(res); i++) + { + const char *indexdef; + indexdef = getstr(res, i, 0); + elog(WARNING, "skipping invalid index: %s", indexdef); + } + res = execute("SELECT indexrelid," " repack.repack_indexdef(indexrelid, indrelid), " " pg_get_indexdef(indexrelid)" From b9c7189fa9bdc25cef4459506ba6aa114e0fc73d Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Thu, 13 Dec 2012 21:10:59 -0700 Subject: [PATCH 5/6] Add description of --jobs to the docs. Also, add missing newline in --help output. --- bin/pg_repack.c | 2 +- doc/pg_repack.rst | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index e93ebcb..935bc5a 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -1416,7 +1416,7 @@ pgut_help(bool details) printf("Options:\n"); printf(" -a, --all repack all databases\n"); - printf(" -j --jobs Use this many parallel jobs"); + printf(" -j --jobs Use this many parallel jobs for each table\n"); printf(" -n, --no-order do vacuum full instead of cluster\n"); printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n"); printf(" -t, --table=TABLE repack specific table only\n"); diff --git a/doc/pg_repack.rst b/doc/pg_repack.rst index 756d1fc..95fb685 100644 --- a/doc/pg_repack.rst +++ b/doc/pg_repack.rst @@ -117,6 +117,7 @@ The following options can be specified in ``OPTIONS``. Options: -a, --all repack all databases + -j, --jobs Use this many parallel jobs for each table -n, --no-order do vacuum full instead of cluster -o, --order-by=COLUMNS order by columns instead of cluster keys -t, --table=TABLE repack specific table only @@ -145,6 +146,12 @@ Options to order rows. If not specified, pg_repack performs an online CLUSTER using cluster indexes. Only one option can be specified. You may also specify target tables or databases. +``-j``, ``--jobs`` + Create the specified number of extra connections to PostgreSQL, and + use these extra connections to parallelize the rebuild of indexes + on each table. If your PostgreSQL server has extra cores and disk + I/O available, this can be a useful way to speed up pg_repack. + ``-n``, ``--no-order`` Perform an online VACUUM FULL. From 962fdff1af0a4de66ee4969d0594f42c5b4d6337 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Fri, 14 Dec 2012 18:17:45 -0700 Subject: [PATCH 6/6] Fix up buggy initialization code for poll() and select(). Also some logging and variable name cleanup. --- bin/pg_repack.c | 62 ++++++++++++++++++++++++++--------------------- doc/pg_repack.rst | 2 +- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 935bc5a..9c49ad7 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table) const char *params[1]; int num_indexes; int i; - int num_active_workers = 0; + int num_active_workers; + int num_workers; repack_index *index_jobs; char buffer[12]; bool have_error = false; @@ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table) " FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params); num_indexes = PQntuples(res); + + /* We might have more actual worker connectionss than we need, + * if the number of workers exceeds the number of indexes to be + * built. In that case, ignore the extra workers. + */ + num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; + num_active_workers = num_workers; + elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes, - workers.num_workers); + num_workers); index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes); @@ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table) elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid); elog(DEBUG2, "create_index : %s", index_jobs[i].create_index); - if (workers.num_workers <= 1) { + if (num_workers <= 1) { /* Use primary connection if we are not setting up parallel * index building, or if we only have one worker. */ @@ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table) */ index_jobs[i].status = FINISHED; } - else if (i < workers.num_workers) { + else if (i < num_workers) { /* Assign available worker to build an index. */ index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = i; - elog(DEBUG2, "Worker %d building index: %s", i, - index_jobs[i].create_index); + elog(LOG, "Initial worker %d to build index: %s", + i, index_jobs[i].create_index); if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) { @@ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table) } PQclear(res); - /* How many workers we kicked off earlier. */ - num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; - - if (workers.num_workers > 1) + if (num_workers > 1) { int freed_worker = -1; int ret; /* Prefer poll() over select(), following PostgreSQL custom. */ +#undef HAVE_POLL #ifdef HAVE_POLL struct pollfd *input_fds; - input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers); - for (i = 0; i < num_active_workers; i++) + input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers); + for (i = 0; i < num_workers; i++) { input_fds[i].fd = PQsocket(workers.conns[i]); input_fds[i].events = POLLIN | POLLERR; @@ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table) fd_set input_mask; struct timeval timeout; /* select() needs the highest-numbered socket descriptor */ - int max_fd = 0; - - FD_ZERO(&input_mask); - for (i = 0; i < num_active_workers; i++) - { - FD_SET(PQsocket(workers.conns[i]), &input_mask); - if (PQsocket(workers.conns[i]) > max_fd) - max_fd = PQsocket(workers.conns[i]); - } + int max_fd; #endif /* Now go through our index builds, and look for any which is @@ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table) elog(DEBUG2, "polling %d active workers", num_active_workers); #ifdef HAVE_POLL - ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000); + ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000); #else - /* re-initialize timeout before each invocation of select() - * just in case select() modifies timeout to indicate remaining - * time. + /* re-initialize timeout and input_mask before each + * invocation of select(). I think this isn't + * necessary on many Unixen, but just in case. */ timeout.tv_sec = POLL_TIMEOUT; timeout.tv_usec = 0; + + FD_ZERO(&input_mask); + for (i = 0, max_fd = 0; i < num_workers; i++) + { + FD_SET(PQsocket(workers.conns[i]), &input_mask); + if (PQsocket(workers.conns[i]) > max_fd) + max_fd = PQsocket(workers.conns[i]); + } + ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); #endif if (ret < 0 && errno != EINTR) @@ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table) } if (!PQisBusy(workers.conns[index_jobs[i].worker_idx])) { - elog(NOTICE, "Command finished in worker %d: %s", + elog(LOG, "Command finished in worker %d: %s", index_jobs[i].worker_idx, index_jobs[i].create_index); @@ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table) { index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = freed_worker; - elog(NOTICE, "Assigning worker %d to build index #%d: " + elog(LOG, "Assigning worker %d to build index #%d: " "%s", freed_worker, i, index_jobs[i].create_index); diff --git a/doc/pg_repack.rst b/doc/pg_repack.rst index 95fb685..0e1e66d 100644 --- a/doc/pg_repack.rst +++ b/doc/pg_repack.rst @@ -117,7 +117,7 @@ The following options can be specified in ``OPTIONS``. Options: -a, --all repack all databases - -j, --jobs Use this many parallel jobs for each table + -j, --jobs Use this many parallel jobs for each table -n, --no-order do vacuum full instead of cluster -o, --order-by=COLUMNS order by columns instead of cluster keys -t, --table=TABLE repack specific table only