diff --git a/bin/pg_repack.c b/bin/pg_repack.c
index c6ec6ef..c7746b7 100644
--- a/bin/pg_repack.c
+++ b/bin/pg_repack.c
@@ -137,6 +137,14 @@ typedef struct repack_table
     const char     *sql_pop;       /* SQL used in flush */
 } repack_table;
 
+
+typedef enum
+{
+    UNPROCESSED,
+    INPROGRESS,
+    FINISHED
+} index_status_t;
+
 /*
  * per-index information
  */
@@ -144,6 +152,8 @@ typedef struct repack_index
 {
     Oid             target_oid;    /* target: OID */
     const char     *create_index;  /* CREATE INDEX */
+    index_status_t  status;        /* Track parallel build statuses. */
+    int             worker_idx;    /* which worker conn is handling this build */
 } repack_index;
 
 static bool is_superuser(void);
@@ -151,6 +161,7 @@ static void repack_all_databases(const char *order_by);
 static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
 static void repack_one_table(const repack_table *table, const char *order_by);
 static void repack_cleanup(bool fatal, const repack_table *table);
+static bool rebuild_indexes(const repack_table *table);
 
 static char *getstr(PGresult *res, int row, int col);
 static Oid getoid(PGresult *res, int row, int col);
@@ -172,6 +183,7 @@ static bool     noorder = false;
 static SimpleStringList table_list = {NULL, NULL};
 static char    *orderby = NULL;
 static int      wait_timeout = 60;     /* in seconds */
+static int      jobs = 0;              /* number of concurrent worker conns. */
 
 /* buffer should have at least 11 bytes */
 static char *
@@ -189,6 +201,7 @@ static pgut_option options[] =
     { 's', 'o', "order-by", &orderby },
     { 'i', 'T', "wait-timeout", &wait_timeout },
     { 'B', 'Z', "no-analyze", &analyze },
+    { 'i', 'j', "jobs", &jobs },
     { 0 },
 };
 
@@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
 }
 
 /*
- * Call repack_one_table for the target table or each table in a database.
+ * Call repack_one_table for the target tables or each table in a database.
  */
 static bool
 repack_one_database(const char *orderby, char *errbuf, size_t errsize)
@@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
 
     reconnect(ERROR);
 
+    /* No sense in setting up concurrent workers unless --jobs > 1 */
+    if (jobs > 1)
+        setup_workers(jobs);
+
     if (!is_superuser()) {
         if (errbuf)
             snprintf(errbuf, errsize, "You must be a superuser to use %s",
@@ -559,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
     return result;
 }
 
+/*
+ * Create indexes on temp table, possibly using multiple worker connections
+ * concurrently if the user asked for --jobs=...
+ */
+static bool
+rebuild_indexes(const repack_table *table)
+{
+    PGresult       *res;
+    const char     *params[1];
+    int             num_indexes;
+    int             i;
+    int             num_active_workers = 0;
+    repack_index   *index_jobs;
+    char            buffer[12];
+    bool            have_error = false;
+
+    elog(DEBUG2, "---- create indexes ----");
+
+    params[0] = utoa(table->target_oid, buffer);
+    res = execute("SELECT indexrelid,"
+                  " repack.repack_indexdef(indexrelid, indrelid), "
+                  " pg_get_indexdef(indexrelid)"
+                  " FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
+
+    num_indexes = PQntuples(res);
+    elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
+         workers.num_workers);
+
+    index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
+
+    for (i = 0; i < num_indexes; i++)
+    {
+        int         c = 0;
+        const char *indexdef;
+
+        index_jobs[i].target_oid = getoid(res, i, c++);
+        index_jobs[i].create_index = getstr(res, i, c++);
+        index_jobs[i].status = UNPROCESSED;
+        index_jobs[i].worker_idx = -1;  /* Unassigned */
+
+        indexdef = getstr(res, i, c++);
+
+        elog(DEBUG2, "set up index_jobs [%d]", i);
+        elog(DEBUG2, "target_oid   : %u", index_jobs[i].target_oid);
+        elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
+
+        if (workers.num_workers <= 1) {
+            /* Use the primary connection if we are not doing parallel
+             * index builds, or if we only have one worker.
+             */
+            command(index_jobs[i].create_index, 0, NULL);
+
+            /* This bookkeeping isn't actually needed in the no-workers
+             * case, but keep it for clarity.
+             */
+            index_jobs[i].status = FINISHED;
+        }
+    }
+    PQclear(res);
+
+    if (workers.num_workers > 1)
+    {
+        /* First time through, assign every available worker to build an
+         * index.
+         */
+        for (i = 0; i < num_indexes && i < workers.num_workers; i++)
+        {
+            index_jobs[i].status = INPROGRESS;
+            index_jobs[i].worker_idx = i;
+            elog(DEBUG2, "Worker %d building index: %s", i,
+                 index_jobs[i].create_index);
+
+            /* Make sure each worker connection can work in non-blocking
+             * mode.
+             */
+            if (PQsetnonblocking(workers.conns[i], 1))
+            {
+                elog(WARNING, "Unable to set worker connection %d "
+                     "non-blocking.", i);
+                have_error = true;
+                goto cleanup;
+            }
+
+            if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
+            {
+                elog(WARNING, "Error sending async query: %s\n%s",
+                     index_jobs[i].create_index,
+                     PQerrorMessage(workers.conns[i]));
+                have_error = true;
+                goto cleanup;
+            }
+        }
+        num_active_workers = i;
+
+        /* Now go through our index builds, and look for any that are
+         * reported complete. Reassign that worker to the next index to
+         * be built, if any.
+         */
+        while (num_active_workers)
+        {
+            int freed_worker = -1;
+
+            for (i = 0; i < num_indexes; i++)
+            {
+                if (index_jobs[i].status == INPROGRESS)
+                {
+                    /* Must call PQconsumeInput before we can check PQisBusy */
+                    if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
+                    {
+                        elog(WARNING, "Error fetching async query status: %s",
+                             PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
+                        have_error = true;
+                        goto cleanup;
+                    }
+                    if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
+                    {
+                        elog(NOTICE, "Command finished in worker %d: %s",
+                             index_jobs[i].worker_idx,
+                             index_jobs[i].create_index);
+
+                        while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
+                        {
+                            if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                            {
+                                elog(WARNING, "Error with create index: %s",
+                                     PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
+                                PQclear(res);
+                                have_error = true;
+                                goto cleanup;
+                            }
+                            PQclear(res);
+                        }
+
+                        freed_worker = index_jobs[i].worker_idx;
+                        index_jobs[i].status = FINISHED;
+                        num_active_workers--;
+                        break;
+                    }
+                }
+            }
+            if (freed_worker > -1)
+            {
+                for (i = 0; i < num_indexes; i++)
+                {
+                    if (index_jobs[i].status == UNPROCESSED)
+                    {
+                        index_jobs[i].status = INPROGRESS;
+                        index_jobs[i].worker_idx = freed_worker;
+                        elog(NOTICE, "Assigning worker %d to execute job %d: %s",
+                             freed_worker, i, index_jobs[i].create_index);
+
+                        if (!(PQsendQuery(workers.conns[freed_worker],
+                                          index_jobs[i].create_index))) {
+                            elog(WARNING, "Error sending async query: %s\n%s",
+                                 index_jobs[i].create_index,
+                                 PQerrorMessage(workers.conns[freed_worker]));
+                            have_error = true;
+                            goto cleanup;
+                        }
+                        num_active_workers++;
+                        break;
+                    }
+                }
+                freed_worker = -1;
+            }
+            sleep(1);
+        }
+    }
+
+cleanup:
+    return (!have_error);
+}
+
+
 /*
  * Re-organize one table.
  */
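The function above drives several CREATE INDEX statements from one controlling process by using libpq's asynchronous query API. For reference, the underlying pattern looks roughly like the following minimal, self-contained sketch; it is illustrative only and not part of the patch (the connection string, table, and index names are placeholders):

```c
/* Minimal sketch of the non-blocking libpq pattern used by rebuild_indexes().
 * Connection string and SQL text are placeholders, not part of pg_repack.
 */
#include <stdio.h>
#include <unistd.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");
    PGresult   *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Allow PQsendQuery() and friends to operate without blocking. */
    if (PQsetnonblocking(conn, 1) != 0)
    {
        fprintf(stderr, "could not set non-blocking: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Fire off the command; control returns immediately. */
    if (!PQsendQuery(conn, "CREATE INDEX idx_demo ON some_table (some_col)"))
    {
        fprintf(stderr, "send failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Poll until the backend is done.  PQconsumeInput() must be called
     * before PQisBusy() gives an up-to-date answer.
     */
    for (;;)
    {
        if (!PQconsumeInput(conn))
        {
            fprintf(stderr, "consume failed: %s", PQerrorMessage(conn));
            PQfinish(conn);
            return 1;
        }
        if (!PQisBusy(conn))
            break;
        sleep(1);               /* or select()/poll() on PQsocket(conn) */
    }

    /* Drain all results; a NULL result means the command is complete. */
    while ((res = PQgetResult(conn)) != NULL)
    {
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            fprintf(stderr, "command failed: %s", PQerrorMessage(conn));
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}
```

Polling with PQconsumeInput()/PQisBusy() rather than calling a blocking PQexec() is what lets a single process keep several worker connections busy at once; blocking on any one connection would serialize the index builds.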
@@ -568,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
 {
     PGresult       *res;
     const char     *params[2];
     int             num;
-    int             i;
     int             num_waiting = 0;
+    char           *vxid = NULL;
     char            buffer[12];
     StringInfoData  sql;
@@ -674,7 +866,14 @@ repack_one_table(const repack_table *table, const char *orderby)
     printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
     elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
-    if (!(PQsendQuery(conn2, sql.data))) {
+    if (PQsetnonblocking(conn2, 1))
+    {
+        elog(WARNING, "Unable to set conn2 nonblocking.");
+        have_error = true;
+        goto cleanup;
+    }
+    if (!(PQsendQuery(conn2, sql.data)))
+    {
         elog(WARNING, "Error sending async query: %s\n%s", sql.data,
              PQerrorMessage(conn2));
         have_error = true;
         goto cleanup;
@@ -725,6 +924,14 @@
         PQclear(res);
     }
 
+    /* Turn conn2 back into blocking mode for further non-async use. */
+    if (PQsetnonblocking(conn2, 0))
+    {
+        elog(WARNING, "Unable to set conn2 blocking.");
+        have_error = true;
+        goto cleanup;
+    }
+
     /*
      * 2. Copy tuples into temp table.
      */
@@ -790,44 +997,11 @@
     /*
      * 3. Create indexes on temp table.
      */
-    elog(DEBUG2, "---- create indexes ----");
-
-    params[0] = utoa(table->target_oid, buffer);
-    res = execute("SELECT indexrelid,"
-        " repack.repack_indexdef(indexrelid, indrelid),"
-        " indisvalid,"
-        " pg_get_indexdef(indexrelid)"
-        " FROM pg_index WHERE indrelid = $1", 1, params);
-
-    num = PQntuples(res);
-    for (i = 0; i < num; i++)
-    {
-        repack_index    index;
-        int             c = 0;
-        const char     *isvalid;
-        const char     *indexdef;
-
-        index.target_oid = getoid(res, i, c++);
-        index.create_index = getstr(res, i, c++);
-        isvalid = getstr(res, i, c++);
-        indexdef = getstr(res, i, c++);
-
-        if (isvalid && isvalid[0] == 'f') {
-            elog(WARNING, "skipping invalid index: %s", indexdef);
-            continue;
-        }
-
-        elog(DEBUG2, "[%d]", i);
-        elog(DEBUG2, "target_oid   : %u", index.target_oid);
-        elog(DEBUG2, "create_index : %s", index.create_index);
-
-        /*
-         * NOTE: If we want to create multiple indexes in parallel,
-         * we need to call create_index in multiple connections.
-         */
-        command(index.create_index, 0, NULL);
+    if (!rebuild_indexes(table)) {
+        have_error = true;
+        goto cleanup;
     }
-    PQclear(res);
+
 
     /*
      * 4. Apply log to temp table until no tuples are left in the log
@@ -1187,6 +1361,7 @@ pgut_help(bool details)
 
     printf("Options:\n");
     printf("  -a, --all                 repack all databases\n");
+    printf("  -j, --jobs=NUM            use this many parallel jobs\n");
     printf("  -n, --no-order            do vacuum full instead of cluster\n");
     printf("  -o, --order-by=COLUMNS    order by columns instead of cluster keys\n");
     printf("  -t, --table=TABLE         repack specific table only\n");
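One point worth noting about the conn2 changes above: the connection is switched into non-blocking mode only around the asynchronous LOCK TABLE, then switched back for ordinary blocking use. The hunks shown here do not include the wait loop itself; purely as an illustration (this helper and its name are not part of the patch), a timeout-bounded wait for an async command can be built from select() and PQsocket() like this:

```c
/* Illustrative only: wait up to timeout_secs for an async command sent on
 * 'conn' to finish, using select() on the connection's socket instead of
 * sleeping in a loop.  Returns true once PQgetResult() will not block.
 * Simplification: the timeout restarts each time the socket becomes readable.
 */
#include <stdbool.h>
#include <sys/select.h>
#include <libpq-fe.h>

static bool
wait_for_result(PGconn *conn, int timeout_secs)
{
    int     sock = PQsocket(conn);

    if (sock < 0)
        return false;           /* no usable connection */

    while (PQisBusy(conn))
    {
        fd_set          input_mask;
        struct timeval  timeout = { timeout_secs, 0 };

        FD_ZERO(&input_mask);
        FD_SET(sock, &input_mask);

        /* Block until the backend sends something, or we time out. */
        if (select(sock + 1, &input_mask, NULL, NULL, &timeout) <= 0)
            return false;       /* timed out or select() error */

        if (!PQconsumeInput(conn))
            return false;       /* connection trouble */
    }
    return true;
}
```

Anything real would also need to honor --wait-timeout and decide what to do when the lock is not granted in time; the sketch only shows the waiting mechanics.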
diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c
index c7f888c..49d66f9 100644
--- a/bin/pgut/pgut-fe.c
+++ b/bin/pgut/pgut-fe.c
@@ -26,9 +26,113 @@ YesNo  prompt_password = DEFAULT;
 
 PGconn         *connection = NULL;
 PGconn         *conn2      = NULL;
+worker_conns    workers    = {
+    .num_workers = 0,
+    .conns       = NULL
+};
+
+
 static bool parse_pair(const char buffer[], char key[], char value[]);
 static char *get_username(void);
 
+
+/*
+ * Set up worker conns which will be used for concurrent index rebuilds.
+ * 'num_workers' is the desired number of worker connections, i.e., from
+ * the --jobs flag. Due to max_connections we might not actually be able
+ * to set up that many workers, but don't treat that as a fatal error.
+ */
+void
+setup_workers(int num_workers)
+{
+    StringInfoData  buf;
+    int             i;
+    PGconn         *conn;
+
+    elog(DEBUG2, "In setup_workers(), target num_workers = %d", num_workers);
+
+    if (num_workers > 1 && num_workers > workers.num_workers)
+    {
+        initStringInfo(&buf);
+        if (dbname && dbname[0])
+            appendStringInfo(&buf, "dbname=%s ", dbname);
+        if (host && host[0])
+            appendStringInfo(&buf, "host=%s ", host);
+        if (port && port[0])
+            appendStringInfo(&buf, "port=%s ", port);
+        if (username && username[0])
+            appendStringInfo(&buf, "user=%s ", username);
+        if (password && password[0])
+            appendStringInfo(&buf, "password=%s ", password);
+
+        if (workers.conns == NULL)
+        {
+            elog(NOTICE, "Setting up workers.conns");
+            workers.conns = (PGconn **) pgut_malloc(sizeof(PGconn *) * num_workers);
+        }
+        else
+        {
+            elog(ERROR, "TODO: Implement pool resizing.");
+        }
+
+        for (i = 0; i < num_workers; i++)
+        {
+            /* Don't prompt for password again; we should have gotten
+             * it already from reconnect().
+             */
+            elog(DEBUG2, "Setting up worker conn %d", i);
+
+            /* Use PQconnectdb() directly rather than pgut_connect(), so
+             * we don't confuse pgut's connection tracking.
+             */
+            conn = PQconnectdb(buf.data);
+            if (PQstatus(conn) == CONNECTION_OK)
+            {
+                workers.conns[i] = conn;
+            }
+            else
+            {
+                elog(WARNING, "Unable to set up worker conn #%d: %s", i,
+                     PQerrorMessage(conn));
+                break;
+            }
+        }
+        /* In case we bailed out of setting up all workers, record
+         * how many successful worker conns we actually have.
+         */
+        workers.num_workers = i;
+
+        termStringInfo(&buf);
+    }
+}
+
+/* Disconnect all our worker conns. */
+void
+disconnect_workers(void)
+{
+    int     i;
+
+    if (!(workers.num_workers))
+        elog(DEBUG2, "No workers to disconnect.");
+    else
+    {
+        for (i = 0; i < workers.num_workers; i++)
+        {
+            if (workers.conns[i])
+            {
+                elog(DEBUG2, "Disconnecting worker %d.", i);
+                PQfinish(workers.conns[i]);
+                workers.conns[i] = NULL;
+            }
+            else
+            {
+                elog(NOTICE, "Worker %d already disconnected?", i);
+            }
+        }
+        workers.num_workers = 0;
+        free(workers.conns);
+        workers.conns = NULL;
+    }
+}
+
+
 /*
  * the result is also available with the global variable 'connection'.
  */
@@ -82,6 +186,7 @@ disconnect(void)
         pgut_disconnect(conn2);
         conn2 = NULL;
     }
+    disconnect_workers();
 }
 
 static void
diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h
index 7529587..cdfef90 100644
--- a/bin/pgut/pgut-fe.h
+++ b/bin/pgut/pgut-fe.h
@@ -48,6 +48,14 @@ typedef struct pgut_option
 
 typedef void (*pgut_optfn) (pgut_option *opt, const char *arg);
 
+typedef struct worker_conns
+{
+    int          max_num_workers;
+    int          num_workers;
+    PGconn     **conns;
+} worker_conns;
+
+
 extern char    *dbname;
 extern char    *host;
@@ -58,12 +66,15 @@ extern YesNo   prompt_password;
 
 extern PGconn  *connection;
 extern PGconn  *conn2;
+extern worker_conns workers;
 
 extern void pgut_help(bool details);
 extern void help(bool details);
 
 extern void disconnect(void);
 extern void reconnect(int elevel);
+extern void setup_workers(int num_workers);
+extern void disconnect_workers(void);
 extern PGresult *execute(const char *query, int nParams, const char **params);
 extern PGresult *execute_elevel(const char *query, int nParams, const char **params, int elevel);
 extern ExecStatusType command(const char *query, int nParams, const char **params);
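Taken together, the pgut-fe additions give callers a small worker-pool API: setup_workers() fills the global workers struct (possibly with fewer connections than requested), code such as rebuild_indexes() issues asynchronous queries on workers.conns[i], and disconnect() tears everything down via disconnect_workers(). A hypothetical caller inside the same codebase (run_with_workers() is not part of the patch, only an illustration of the intended call sequence) would look something like:

```c
/* Hypothetical caller, illustrating the life cycle of the worker pool
 * added to pgut-fe: set it up once, hand out workers.conns[i] for async
 * work, and let disconnect() / disconnect_workers() tear it down.
 */
#include "pgut-fe.h"

static void
run_with_workers(int jobs)
{
    int     i;

    reconnect(ERROR);           /* primary connection; prompts for password */

    if (jobs > 1)
        setup_workers(jobs);    /* may end up with fewer than 'jobs' conns */

    for (i = 0; i < workers.num_workers; i++)
        elog(DEBUG2, "worker %d ready: %s", i, PQdb(workers.conns[i]));

    /* ... issue PQsendQuery() on workers.conns[i], as rebuild_indexes() does ... */

    disconnect();               /* also calls disconnect_workers() */
}
```

The struct also carries max_num_workers, which setup_workers() does not yet consult; it presumably becomes relevant once the pool-resizing TODO is addressed.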