Fix up buggy initialization code for poll() and select().

Also some logging and variable name cleanup.
This commit is contained in:
Josh Kupershmidt 2012-12-14 18:17:45 -07:00
parent b9c7189fa9
commit 962fdff1af
2 changed files with 36 additions and 28 deletions

View File

@ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table)
const char *params[1];
int num_indexes;
int i;
int num_active_workers = 0;
int num_active_workers;
int num_workers;
repack_index *index_jobs;
char buffer[12];
bool have_error = false;
@ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table)
" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
num_indexes = PQntuples(res);
/* We might have more actual worker connectionss than we need,
* if the number of workers exceeds the number of indexes to be
* built. In that case, ignore the extra workers.
*/
num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
num_active_workers = num_workers;
elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
workers.num_workers);
num_workers);
index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
@ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table)
elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
if (workers.num_workers <= 1) {
if (num_workers <= 1) {
/* Use primary connection if we are not setting up parallel
* index building, or if we only have one worker.
*/
@ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table)
*/
index_jobs[i].status = FINISHED;
}
else if (i < workers.num_workers) {
else if (i < num_workers) {
/* Assign available worker to build an index. */
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = i;
elog(DEBUG2, "Worker %d building index: %s", i,
index_jobs[i].create_index);
elog(LOG, "Initial worker %d to build index: %s",
i, index_jobs[i].create_index);
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
{
@ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table)
}
PQclear(res);
/* How many workers we kicked off earlier. */
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
if (workers.num_workers > 1)
if (num_workers > 1)
{
int freed_worker = -1;
int ret;
/* Prefer poll() over select(), following PostgreSQL custom. */
#undef HAVE_POLL
#ifdef HAVE_POLL
struct pollfd *input_fds;
input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers);
for (i = 0; i < num_active_workers; i++)
input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers);
for (i = 0; i < num_workers; i++)
{
input_fds[i].fd = PQsocket(workers.conns[i]);
input_fds[i].events = POLLIN | POLLERR;
@ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table)
fd_set input_mask;
struct timeval timeout;
/* select() needs the highest-numbered socket descriptor */
int max_fd = 0;
FD_ZERO(&input_mask);
for (i = 0; i < num_active_workers; i++)
{
FD_SET(PQsocket(workers.conns[i]), &input_mask);
if (PQsocket(workers.conns[i]) > max_fd)
max_fd = PQsocket(workers.conns[i]);
}
int max_fd;
#endif
/* Now go through our index builds, and look for any which is
@ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table)
elog(DEBUG2, "polling %d active workers", num_active_workers);
#ifdef HAVE_POLL
ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000);
ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000);
#else
/* re-initialize timeout before each invocation of select()
* just in case select() modifies timeout to indicate remaining
* time.
/* re-initialize timeout and input_mask before each
* invocation of select(). I think this isn't
* necessary on many Unixen, but just in case.
*/
timeout.tv_sec = POLL_TIMEOUT;
timeout.tv_usec = 0;
FD_ZERO(&input_mask);
for (i = 0, max_fd = 0; i < num_workers; i++)
{
FD_SET(PQsocket(workers.conns[i]), &input_mask);
if (PQsocket(workers.conns[i]) > max_fd)
max_fd = PQsocket(workers.conns[i]);
}
ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
#endif
if (ret < 0 && errno != EINTR)
@ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table)
}
if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
{
elog(NOTICE, "Command finished in worker %d: %s",
elog(LOG, "Command finished in worker %d: %s",
index_jobs[i].worker_idx,
index_jobs[i].create_index);
@ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table)
{
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = freed_worker;
elog(NOTICE, "Assigning worker %d to build index #%d: "
elog(LOG, "Assigning worker %d to build index #%d: "
"%s", freed_worker, i,
index_jobs[i].create_index);

View File

@ -117,7 +117,7 @@ The following options can be specified in ``OPTIONS``.
Options:
-a, --all repack all databases
-j, --jobs Use this many parallel jobs for each table
-j, --jobs Use this many parallel jobs for each table
-n, --no-order do vacuum full instead of cluster
-o, --order-by=COLUMNS order by columns instead of cluster keys
-t, --table=TABLE repack specific table only