From 962fdff1af0a4de66ee4969d0594f42c5b4d6337 Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Fri, 14 Dec 2012 18:17:45 -0700 Subject: [PATCH] Fix up buggy initialization code for poll() and select(). Also some logging and variable name cleanup. --- bin/pg_repack.c | 62 ++++++++++++++++++++++++++--------------------- doc/pg_repack.rst | 2 +- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index 935bc5a..9c49ad7 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table) const char *params[1]; int num_indexes; int i; - int num_active_workers = 0; + int num_active_workers; + int num_workers; repack_index *index_jobs; char buffer[12]; bool have_error = false; @@ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table) " FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params); num_indexes = PQntuples(res); + + /* We might have more actual worker connections than we need, + * if the number of workers exceeds the number of indexes to be + * built. In that case, ignore the extra workers. + */ + num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; + num_active_workers = num_workers; + elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes, - workers.num_workers); + num_workers); index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes); @@ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table) elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid); elog(DEBUG2, "create_index : %s", index_jobs[i].create_index); - if (workers.num_workers <= 1) { + if (num_workers <= 1) { /* Use primary connection if we are not setting up parallel * index building, or if we only have one worker. */ @@ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table) */ index_jobs[i].status = FINISHED; } - else if (i < workers.num_workers) { + else if (i < num_workers) { /* Assign available worker to build an index. 
*/ index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = i; - elog(DEBUG2, "Worker %d building index: %s", i, - index_jobs[i].create_index); + elog(LOG, "Initial worker %d to build index: %s", + i, index_jobs[i].create_index); if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) { @@ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table) } PQclear(res); - /* How many workers we kicked off earlier. */ - num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; - - if (workers.num_workers > 1) + if (num_workers > 1) { int freed_worker = -1; int ret; /* Prefer poll() over select(), following PostgreSQL custom. */ +#undef HAVE_POLL #ifdef HAVE_POLL struct pollfd *input_fds; - input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers); - for (i = 0; i < num_active_workers; i++) + input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers); + for (i = 0; i < num_workers; i++) { input_fds[i].fd = PQsocket(workers.conns[i]); input_fds[i].events = POLLIN | POLLERR; @@ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table) fd_set input_mask; struct timeval timeout; /* select() needs the highest-numbered socket descriptor */ - int max_fd = 0; - - FD_ZERO(&input_mask); - for (i = 0; i < num_active_workers; i++) - { - FD_SET(PQsocket(workers.conns[i]), &input_mask); - if (PQsocket(workers.conns[i]) > max_fd) - max_fd = PQsocket(workers.conns[i]); - } + int max_fd; #endif /* Now go through our index builds, and look for any which is @@ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table) elog(DEBUG2, "polling %d active workers", num_active_workers); #ifdef HAVE_POLL - ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000); + ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000); #else - /* re-initialize timeout before each invocation of select() - * just in case select() modifies timeout to indicate remaining - * time. 
+ /* re-initialize timeout and input_mask before each + * invocation of select(): select() overwrites the fd sets + * in place and may also modify timeout. */ timeout.tv_sec = POLL_TIMEOUT; timeout.tv_usec = 0; + + FD_ZERO(&input_mask); + for (i = 0, max_fd = 0; i < num_workers; i++) + { + FD_SET(PQsocket(workers.conns[i]), &input_mask); + if (PQsocket(workers.conns[i]) > max_fd) + max_fd = PQsocket(workers.conns[i]); + } + ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); #endif if (ret < 0 && errno != EINTR) @@ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table) } if (!PQisBusy(workers.conns[index_jobs[i].worker_idx])) { - elog(NOTICE, "Command finished in worker %d: %s", + elog(LOG, "Command finished in worker %d: %s", index_jobs[i].worker_idx, index_jobs[i].create_index); @@ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table) { index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = freed_worker; - elog(NOTICE, "Assigning worker %d to build index #%d: " + elog(LOG, "Assigning worker %d to build index #%d: " "%s", freed_worker, i, index_jobs[i].create_index); diff --git a/doc/pg_repack.rst b/doc/pg_repack.rst index 95fb685..0e1e66d 100644 --- a/doc/pg_repack.rst +++ b/doc/pg_repack.rst @@ -117,7 +117,7 @@ The following options can be specified in ``OPTIONS``. Options: -a, --all repack all databases - -j, --jobs Use this many parallel jobs for each table + -j, --jobs Use this many parallel jobs for each table -n, --no-order do vacuum full instead of cluster -o, --order-by=COLUMNS order by columns instead of cluster keys -t, --table=TABLE repack specific table only