Several fixes for concurrent index builds:

* Use poll() if it is available, or select() otherwise, to
   efficiently wait on index builds in worker queries to finish.
 * fix off-by-one error when initially assigning workers
 * move PQsetnonblocking() calls to setup_workers()
This commit is contained in:
Josh Kupershmidt 2012-12-13 19:12:05 -07:00
parent 8ab54cc803
commit a1821e3dcb
2 changed files with 85 additions and 9 deletions

View File

@ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown";
#include "pgut/pgut-fe.h"
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
/*
* APPLY_COUNT: Number of applied logs per transaction. Larger values
* could be faster, but will be long transactions in the REDO phase.
*/
#define APPLY_COUNT 1000
/* poll() or select() timeout, in seconds */
#define POLL_TIMEOUT 3
/* Compile an array of existing transactions which are active during
* pg_repack's setup. Some transactions we can safely ignore:
@ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table)
*/
index_jobs[i].status = FINISHED;
}
else if (i <= workers.num_workers) {
else if (i < workers.num_workers) {
/* Assign available worker to build an index. */
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = i;
@ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table)
}
PQclear(res);
/* How many workers we kicked off earlier. */
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
if (workers.num_workers > 1)
{
/* How many workers we kicked off earlier. */
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
int freed_worker = -1;
int ret;
/* Prefer poll() over select(), following PostgreSQL custom. */
#ifdef HAVE_POLL
struct pollfd *input_fds;
input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers);
for (i = 0; i < num_active_workers; i++)
{
input_fds[i].fd = PQsocket(workers.conns[i]);
input_fds[i].events = POLLIN | POLLERR;
input_fds[i].revents = 0;
}
#else
fd_set input_mask;
struct timeval timeout;
/* select() needs the highest-numbered socket descriptor */
int max_fd = 0;
FD_ZERO(&input_mask);
for (i = 0; i < num_active_workers; i++)
{
FD_SET(PQsocket(workers.conns[i]), &input_mask);
if (PQsocket(workers.conns[i]) > max_fd)
max_fd = PQsocket(workers.conns[i]);
}
#endif
/* Now go through our index builds, and look for any which is
* reported complete. Reassign that worker to the next index to
* be built, if any.
*/
while (num_active_workers)
while (num_active_workers > 0)
{
int freed_worker = -1;
elog(DEBUG2, "polling %d active workers", num_active_workers);
#ifdef HAVE_POLL
ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000);
#else
/* re-initialize timeout before each invocation of select()
* just in case select() modifies timeout to indicate remaining
* time.
*/
timeout.tv_sec = POLL_TIMEOUT;
timeout.tv_usec = 0;
ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
#endif
if (ret < 0 && errno != EINTR)
elog(ERROR, "poll() failed: %d, %d", ret, errno);
for (i = 0; i < num_indexes; i++)
{
if (index_jobs[i].status == INPROGRESS)
{
Assert(index_jobs[i].worker_idx >= 0);
/* Must call PQconsumeInput before we can check PQisBusy */
if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
{
@ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table)
}
PQclear(res);
}
/* We are only going to re-queue one worker, even
* though more than one index build might be finished.
* Any other jobs which may be finished will
* just have to wait for the next pass through the
* poll()/select() loop.
*/
freed_worker = index_jobs[i].worker_idx;
index_jobs[i].status = FINISHED;
num_active_workers--;
@ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table)
}
freed_worker = -1;
}
sleep(1);
}
}
@ -994,7 +1058,6 @@ repack_one_table(const repack_table *table, const char *orderby)
goto cleanup;
}
/*
* 4. Apply log to temp table until no tuples are left in the log
* and all of the old transactions are finished.

View File

@ -82,7 +82,11 @@ setup_workers(int num_workers)
*/
elog(DEBUG2, "Setting up worker conn %d", i);
/* Don't confuse pgut_connections by using pgut_connect() */
/* Don't confuse pgut_connections by using pgut_connect()
*
* XXX: could use PQconnectStart() and PQconnectPoll() to
* open these connections in non-blocking manner.
*/
conn = PQconnectdb(buf.data);
if (PQstatus(conn) == CONNECTION_OK)
{
@ -94,6 +98,15 @@ setup_workers(int num_workers)
PQerrorMessage(conn));
break;
}
/* Make sure each worker connection can work in non-blocking
* mode.
*/
if (PQsetnonblocking(workers.conns[i], 1))
{
elog(ERROR, "Unable to set worker connection %d "
"non-blocking.", i);
}
}
/* In case we bailed out of setting up all workers, record
* how many successful worker conns we actually have.