Fix up buggy initialization code for poll() and select().
Also some logging and variable name cleanup.
This commit is contained in:
		| @ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table) | ||||
| 	const char	   *params[1]; | ||||
| 	int			    num_indexes; | ||||
| 	int				i; | ||||
| 	int				num_active_workers = 0; | ||||
| 	int				num_active_workers; | ||||
| 	int				num_workers; | ||||
| 	repack_index   *index_jobs; | ||||
| 	char			buffer[12]; | ||||
| 	bool            have_error = false; | ||||
| @ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table) | ||||
| 		" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params); | ||||
|  | ||||
| 	num_indexes = PQntuples(res); | ||||
|  | ||||
| 	/* We might have more actual worker connectionss than we need, | ||||
| 	 * if the number of workers exceeds the number of indexes to be | ||||
| 	 * built. In that case, ignore the extra workers. | ||||
| 	 */ | ||||
| 	num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; | ||||
| 	num_active_workers = num_workers; | ||||
|  | ||||
| 	elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes, | ||||
| 		 workers.num_workers); | ||||
| 		 num_workers); | ||||
|  | ||||
| 	index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes); | ||||
|  | ||||
| @ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table) | ||||
| 		elog(DEBUG2, "target_oid   : %u", index_jobs[i].target_oid); | ||||
| 		elog(DEBUG2, "create_index : %s", index_jobs[i].create_index); | ||||
|  | ||||
| 		if (workers.num_workers <= 1) { | ||||
| 		if (num_workers <= 1) { | ||||
| 			/* Use primary connection if we are not setting up parallel | ||||
| 			 * index building, or if we only have one worker. | ||||
| 			 */ | ||||
| @ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table) | ||||
| 			 */ | ||||
| 			index_jobs[i].status = FINISHED; | ||||
| 		} | ||||
| 		else if (i < workers.num_workers) { | ||||
| 		else if (i < num_workers) { | ||||
| 			/* Assign available worker to build an index. */ | ||||
| 			index_jobs[i].status = INPROGRESS; | ||||
| 			index_jobs[i].worker_idx = i; | ||||
| 			elog(DEBUG2, "Worker %d building index: %s", i, | ||||
| 				 index_jobs[i].create_index); | ||||
| 			elog(LOG, "Initial worker %d to build index: %s", | ||||
| 				 i, index_jobs[i].create_index); | ||||
|  | ||||
| 			if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index))) | ||||
| 			{ | ||||
| @ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table) | ||||
| 	} | ||||
| 	PQclear(res); | ||||
|  | ||||
| 	/* How many workers we kicked off earlier. */ | ||||
| 	num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; | ||||
|  | ||||
| 	if (workers.num_workers > 1) | ||||
| 	if (num_workers > 1) | ||||
| 	{ | ||||
| 		int freed_worker = -1; | ||||
| 		int ret; | ||||
|  | ||||
| /* Prefer poll() over select(), following PostgreSQL custom. */ | ||||
| #undef HAVE_POLL | ||||
| #ifdef HAVE_POLL | ||||
| 		struct pollfd *input_fds; | ||||
|  | ||||
| 		input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers); | ||||
| 		for (i = 0; i < num_active_workers; i++) | ||||
| 		input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers); | ||||
| 		for (i = 0; i < num_workers; i++) | ||||
| 		{ | ||||
| 			input_fds[i].fd = PQsocket(workers.conns[i]); | ||||
| 			input_fds[i].events = POLLIN | POLLERR; | ||||
| @ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table) | ||||
| 		fd_set input_mask; | ||||
| 		struct timeval timeout; | ||||
| 		/* select() needs the highest-numbered socket descriptor */ | ||||
| 		int max_fd = 0; | ||||
|  | ||||
| 		FD_ZERO(&input_mask); | ||||
| 		for (i = 0; i < num_active_workers; i++) | ||||
| 		{ | ||||
| 			FD_SET(PQsocket(workers.conns[i]), &input_mask); | ||||
| 			if (PQsocket(workers.conns[i]) > max_fd) | ||||
| 				max_fd = PQsocket(workers.conns[i]); | ||||
| 		} | ||||
| 		int max_fd; | ||||
| #endif | ||||
|  | ||||
| 		/* Now go through our index builds, and look for any which is | ||||
| @ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table) | ||||
| 			elog(DEBUG2, "polling %d active workers", num_active_workers); | ||||
|  | ||||
| #ifdef HAVE_POLL | ||||
| 			ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000); | ||||
| 			ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000); | ||||
| #else | ||||
| 			/* re-initialize timeout before each invocation of select() | ||||
| 			 * just in case select() modifies timeout to indicate remaining | ||||
| 			 * time. | ||||
| 			/* re-initialize timeout and input_mask before each | ||||
| 			 * invocation of select(). I think this isn't | ||||
| 			 * necessary on many Unixen, but just in case. | ||||
| 			 */ | ||||
| 			timeout.tv_sec = POLL_TIMEOUT; | ||||
| 			timeout.tv_usec = 0; | ||||
|  | ||||
| 			FD_ZERO(&input_mask); | ||||
| 			for (i = 0, max_fd = 0; i < num_workers; i++) | ||||
| 			{ | ||||
| 				FD_SET(PQsocket(workers.conns[i]), &input_mask); | ||||
| 				if (PQsocket(workers.conns[i]) > max_fd) | ||||
| 					max_fd = PQsocket(workers.conns[i]); | ||||
| 			} | ||||
|  | ||||
| 			ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); | ||||
| #endif | ||||
| 			if (ret < 0 && errno != EINTR) | ||||
| @ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table) | ||||
| 					} | ||||
| 					if (!PQisBusy(workers.conns[index_jobs[i].worker_idx])) | ||||
| 					{ | ||||
| 						elog(NOTICE, "Command finished in worker %d: %s", | ||||
| 						elog(LOG, "Command finished in worker %d: %s", | ||||
| 							 index_jobs[i].worker_idx, | ||||
| 							 index_jobs[i].create_index); | ||||
|  | ||||
| @ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table) | ||||
| 					{ | ||||
| 						index_jobs[i].status = INPROGRESS; | ||||
| 						index_jobs[i].worker_idx = freed_worker; | ||||
| 						elog(NOTICE, "Assigning worker %d to build index #%d: " | ||||
| 						elog(LOG, "Assigning worker %d to build index #%d: " | ||||
| 							 "%s", freed_worker, i, | ||||
| 							 index_jobs[i].create_index); | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user