Small fixes related to the concurrent_indexes changes.
Move PQsetnonblocking() call to setup_workers(), and make sure we're not forgetting any workers.
This commit is contained in:
parent
0d984ed3e5
commit
8c2dd16608
@ -633,42 +633,33 @@ rebuild_indexes(const repack_table *table)
|
|||||||
*/
|
*/
|
||||||
index_jobs[i].status = FINISHED;
|
index_jobs[i].status = FINISHED;
|
||||||
}
|
}
|
||||||
}
|
else if (i <= workers.num_workers) {
|
||||||
PQclear(res);
|
/* Assign available worker to build an index. */
|
||||||
|
|
||||||
if (workers.num_workers > 1)
|
|
||||||
{
|
|
||||||
/* First time through, assign every available worker to build an index.
|
|
||||||
*/
|
|
||||||
for (i = 0; i < num_indexes && i < workers.num_workers; i++)
|
|
||||||
{
|
|
||||||
index_jobs[i].status = INPROGRESS;
|
index_jobs[i].status = INPROGRESS;
|
||||||
index_jobs[i].worker_idx = i;
|
index_jobs[i].worker_idx = i;
|
||||||
elog(DEBUG2, "Worker %d building index: %s", i,
|
elog(DEBUG2, "Worker %d building index: %s", i,
|
||||||
index_jobs[i].create_index);
|
index_jobs[i].create_index);
|
||||||
|
|
||||||
/* Make sure each worker connection can work in non-blocking
|
|
||||||
* mode.
|
|
||||||
*/
|
|
||||||
if (PQsetnonblocking(workers.conns[i], 1))
|
|
||||||
{
|
|
||||||
elog(WARNING, "Unable to set worker connection %d "
|
|
||||||
"non-blocking.", i);
|
|
||||||
have_error = true;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
|
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
|
||||||
{
|
{
|
||||||
elog(WARNING, "Error sending async query: %s\n%s",
|
elog(WARNING, "Error sending async query: %s\n%s",
|
||||||
index_jobs[i].create_index,
|
index_jobs[i].create_index,
|
||||||
PQerrorMessage(workers.conns[i]));
|
PQerrorMessage(workers.conns[i]));
|
||||||
|
PQclear(res);
|
||||||
have_error = true;
|
have_error = true;
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
num_active_workers = i;
|
/* Else we have more indexes to be built than workers
|
||||||
|
* available. That's OK, we'll get to them later.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
if (workers.num_workers > 1)
|
||||||
|
{
|
||||||
|
/* How many workers we kicked off earlier. */
|
||||||
|
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
|
||||||
|
|
||||||
/* Now go through our index builds, and look for any which is
|
/* Now go through our index builds, and look for any which is
|
||||||
* reported complete. Reassign that worker to the next index to
|
* reported complete. Reassign that worker to the next index to
|
||||||
@ -724,8 +715,9 @@ rebuild_indexes(const repack_table *table)
|
|||||||
{
|
{
|
||||||
index_jobs[i].status = INPROGRESS;
|
index_jobs[i].status = INPROGRESS;
|
||||||
index_jobs[i].worker_idx = freed_worker;
|
index_jobs[i].worker_idx = freed_worker;
|
||||||
elog(NOTICE, "Assigning worker %d execute job %d: %s",
|
elog(NOTICE, "Assigning worker %d to build index #%d: "
|
||||||
freed_worker, i, index_jobs[i].create_index);
|
"%s", freed_worker, i,
|
||||||
|
index_jobs[i].create_index);
|
||||||
|
|
||||||
if (!(PQsendQuery(workers.conns[freed_worker],
|
if (!(PQsendQuery(workers.conns[freed_worker],
|
||||||
index_jobs[i].create_index))) {
|
index_jobs[i].create_index))) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user