Small fixes related to the concurrent_indexes changes.

Move PQsetnonblocking() call to setup_workers(), and make
sure we're not forgetting any workers.
This commit is contained in:
Josh Kupershmidt 2012-12-11 19:46:49 -07:00
parent 509e568c52
commit 8ab54cc803

View File

@ -633,42 +633,33 @@ rebuild_indexes(const repack_table *table)
*/
index_jobs[i].status = FINISHED;
}
}
PQclear(res);
if (workers.num_workers > 1)
{
/* First time through, assign every available worker to build an index.
*/
for (i = 0; i < num_indexes && i < workers.num_workers; i++)
{
else if (i <= workers.num_workers) {
/* Assign available worker to build an index. */
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = i;
elog(DEBUG2, "Worker %d building index: %s", i,
index_jobs[i].create_index);
/* Make sure each worker connection can work in non-blocking
* mode.
*/
if (PQsetnonblocking(workers.conns[i], 1))
{
elog(WARNING, "Unable to set worker connection %d "
"non-blocking.", i);
have_error = true;
goto cleanup;
}
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
{
elog(WARNING, "Error sending async query: %s\n%s",
index_jobs[i].create_index,
PQerrorMessage(workers.conns[i]));
PQclear(res);
have_error = true;
goto cleanup;
}
}
num_active_workers = i;
/* Else we have more indexes to be built than workers
* available. That's OK, we'll get to them later.
*/
}
PQclear(res);
if (workers.num_workers > 1)
{
/* How many workers we kicked off earlier. */
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
/* Now go through our index builds, and look for any which is
* reported complete. Reassign that worker to the next index to
@ -724,8 +715,9 @@ rebuild_indexes(const repack_table *table)
{
index_jobs[i].status = INPROGRESS;
index_jobs[i].worker_idx = freed_worker;
elog(NOTICE, "Assigning worker %d execute job %d: %s",
freed_worker, i, index_jobs[i].create_index);
elog(NOTICE, "Assigning worker %d to build index #%d: "
"%s", freed_worker, i,
index_jobs[i].create_index);
if (!(PQsendQuery(workers.conns[freed_worker],
index_jobs[i].create_index))) {