From 42357353a7e4672513c3a1055760c4ab5fbffaa2 Mon Sep 17 00:00:00 2001
From: Josh Kupershmidt <schmiddy@gmail.com>
Date: Thu, 13 Dec 2012 19:12:05 -0700
Subject: [PATCH] Several fixes for concurrent index builds:

 * Use poll() if it is available, or select() otherwise, to efficiently
   wait on index builds in worker queries to finish.
 * fix off-by-one error when initially assigning workers
 * move PQsetnonblocking() calls to setup_workers()
---
 bin/pg_repack.c    | 79 +++++++++++++++++++++++++++++++++++++++++-----
 bin/pgut/pgut-fe.c | 15 ++++++++-
 2 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/bin/pg_repack.c b/bin/pg_repack.c
index 0884027..50483d1 100644
--- a/bin/pg_repack.c
+++ b/bin/pg_repack.c
@@ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown";
 
 #include "pgut/pgut-fe.h"
 
+#include <errno.h>
 #include <string.h>
 #include <stdlib.h>
 #include <time.h>
 #include <unistd.h>
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
 
 /*
  * APPLY_COUNT: Number of applied logs per transaction. Larger values
  * could be faster, but will be long transactions in the REDO phase.
  */
 #define APPLY_COUNT		1000
 
+/* poll() or select() timeout, in seconds */
+#define POLL_TIMEOUT	3
 
 /* Compile an array of existing transactions which are active during
  * pg_repack's setup. Some transactions we can safely ignore:
@@ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table)
 			 */
 			index_jobs[i].status = FINISHED;
 		}
-		else if (i <= workers.num_workers) {
+		else if (i < workers.num_workers) {
 			/* Assign available worker to build an index. */
 			index_jobs[i].status = INPROGRESS;
 			index_jobs[i].worker_idx = i;
@@ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table)
 	}
 	PQclear(res);
 
+	/* How many workers we kicked off earlier. */
+	num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
+
 	if (workers.num_workers > 1)
 	{
-		/* How many workers we kicked off earlier. */
-		num_active_workers = num_indexes > workers.num_workers ?
-			workers.num_workers : num_indexes;
+		int freed_worker = -1;
+		int ret;
+
+/* Prefer poll() over select(), following PostgreSQL custom. */
+#ifdef HAVE_POLL
+		struct pollfd *input_fds;
+
+		input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers);
+		for (i = 0; i < num_active_workers; i++)
+		{
+			input_fds[i].fd = PQsocket(workers.conns[i]);
+			input_fds[i].events = POLLIN | POLLERR;
+			input_fds[i].revents = 0;
+		}
+#else
+		fd_set input_mask;
+		struct timeval timeout;
+		/* select() needs the highest-numbered socket descriptor */
+		int max_fd = 0;
+
+		FD_ZERO(&input_mask);
+		for (i = 0; i < num_active_workers; i++)
+		{
+			FD_SET(PQsocket(workers.conns[i]), &input_mask);
+			if (PQsocket(workers.conns[i]) > max_fd)
+				max_fd = PQsocket(workers.conns[i]);
+		}
+#endif
 
 		/* Now go through our index builds, and look for any which is
 		 * reported complete. Reassign that worker to the next index to
 		 * be built, if any.
 		 */
-		while (num_active_workers)
+		while (num_active_workers > 0)
 		{
-			int freed_worker = -1;
+			elog(DEBUG2, "polling %d active workers", num_active_workers);
+
+#ifdef HAVE_POLL
+			ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000);
+#else
+			/* re-initialize timeout before each invocation of select()
+			 * just in case select() modifies timeout to indicate remaining
+			 * time.
+			 */
+			timeout.tv_sec = POLL_TIMEOUT;
+			timeout.tv_usec = 0;
+			ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
+#endif
+			if (ret < 0 && errno != EINTR)
+				elog(ERROR, "poll() failed: %d, %d", ret, errno);
 
 			for (i = 0; i < num_indexes; i++)
 			{
 				if (index_jobs[i].status == INPROGRESS)
 				{
+					Assert(index_jobs[i].worker_idx >= 0);
 					/* Must call PQconsumeInput before we can check PQisBusy */
 					if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
 					{
@@ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table)
 					}
 					PQclear(res);
 				}
-
+
+				/* We are only going to re-queue one worker, even
+				 * though more than one index build might be finished.
+				 * Any other jobs which may be finished will
+				 * just have to wait for the next pass through the
+				 * poll()/select() loop.
+				 */
 				freed_worker = index_jobs[i].worker_idx;
 				index_jobs[i].status = FINISHED;
 				num_active_workers--;
@@ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table)
 				}
 				freed_worker = -1;
 			}
-			sleep(1);
 		}
 	}
 
@@ -980,7 +1044,6 @@ repack_one_table(const repack_table *table, const char *orderby)
 		goto cleanup;
 	}
 
-
 	/*
 	 * 4. Apply log to temp table until no tuples are left in the log
 	 * and all of the old transactions are finished.
diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c
index 49d66f9..442ab93 100644
--- a/bin/pgut/pgut-fe.c
+++ b/bin/pgut/pgut-fe.c
@@ -82,7 +82,11 @@ setup_workers(int num_workers)
 			 */
 			elog(DEBUG2, "Setting up worker conn %d", i);
 
-			/* Don't confuse pgut_connections by using pgut_connect() */
+			/* Don't confuse pgut_connections by using pgut_connect()
+			 *
+			 * XXX: could use PQconnectStart() and PQconnectPoll() to
+			 * open these connections in non-blocking manner.
+			 */
 			conn = PQconnectdb(buf.data);
 			if (PQstatus(conn) == CONNECTION_OK)
 			{
@@ -94,6 +98,15 @@ setup_workers(int num_workers)
 					 PQerrorMessage(conn));
 				break;
 			}
+
+			/* Make sure each worker connection can work in non-blocking
+			 * mode.
+			 */
+			if (PQsetnonblocking(workers.conns[i], 1))
+			{
+				elog(ERROR, "Unable to set worker connection %d "
+					 "non-blocking.", i);
+			}
 		}
 
 		/* In case we bailed out of setting up all workers, record
 		 * how many successful worker conns we actually have.