From 8ab54cc8038e223959e92ffeb44e8fbfd6e6ee9f Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Tue, 11 Dec 2012 19:46:49 -0700 Subject: [PATCH] Small fixes related to the concurrent_indexes changes. Move PQsetnonblocking() call to setup_workers(), and make sure we're not forgetting any workers. --- bin/pg_repack.c | 40 ++++++++++++++++------------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/bin/pg_repack.c b/bin/pg_repack.c index c7746b7..ba62bb2 100644 --- a/bin/pg_repack.c +++ b/bin/pg_repack.c @@ -633,42 +633,33 @@ rebuild_indexes(const repack_table *table) */ index_jobs[i].status = FINISHED; } - } - PQclear(res); - - if (workers.num_workers > 1) - { - /* First time through, assign every available worker to build an index. - */ - for (i = 0; i < num_indexes && i < workers.num_workers; i++) - { + else if (i <= workers.num_workers) { + /* Assign available worker to build an index. */ index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = i; elog(DEBUG2, "Worker %d building index: %s", i, index_jobs[i].create_index); - /* Make sure each worker connection can work in non-blocking - * mode. - */ - if (PQsetnonblocking(workers.conns[i], 1)) - { - elog(WARNING, "Unable to set worker connection %d " - "non-blocking.", i); - have_error = true; - goto cleanup; - } - if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) { elog(WARNING, "Error sending async query: %s\n%s", index_jobs[i].create_index, PQerrorMessage(workers.conns[i])); + PQclear(res); have_error = true; goto cleanup; } - } - num_active_workers = i; + /* Else we have more indexes to be built than workers + * available. That's OK, we'll get to them later. + */ + } + PQclear(res); + + if (workers.num_workers > 1) + { + /* How many workers we kicked off earlier. */ + num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; /* Now go through our index builds, and look for any which is * reported complete. Reassign that worker to the next index to @@ -724,8 +715,9 @@ rebuild_indexes(const repack_table *table) { index_jobs[i].status = INPROGRESS; index_jobs[i].worker_idx = freed_worker; - elog(NOTICE, "Assigning worker %d execute job %d: %s", - freed_worker, i, index_jobs[i].create_index); + elog(NOTICE, "Assigning worker %d to build index #%d: " + "%s", freed_worker, i, + index_jobs[i].create_index); if (!(PQsendQuery(workers.conns[freed_worker], index_jobs[i].create_index))) {