Several fixes for concurrent index builds:
* Use poll() if it is available, or select() otherwise, to efficiently wait on index builds in worker queries to finish. * fix off-by-one error when initially assigning workers * move PQsetnonblocking() calls to setup_workers()
This commit is contained in:
		| @ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown"; | ||||
|  | ||||
| #include "pgut/pgut-fe.h" | ||||
|  | ||||
| #include <errno.h> | ||||
| #include <string.h> | ||||
| #include <stdlib.h> | ||||
| #include <unistd.h> | ||||
| #include <time.h> | ||||
|  | ||||
|  | ||||
| #ifdef HAVE_POLL_H | ||||
| #include <poll.h> | ||||
| #endif | ||||
| #ifdef HAVE_SYS_POLL_H | ||||
| #include <sys/poll.h> | ||||
| #endif | ||||
| #ifdef HAVE_SYS_SELECT_H | ||||
| #include <sys/select.h> | ||||
| #endif | ||||
|  | ||||
|  | ||||
| /* | ||||
|  * APPLY_COUNT: Number of applied logs per transaction. Larger values | ||||
|  * could be faster, but will be long transactions in the REDO phase. | ||||
|  */ | ||||
| #define APPLY_COUNT		1000 | ||||
|  | ||||
| /* poll() or select() timeout, in seconds */ | ||||
| #define POLL_TIMEOUT    3 | ||||
|  | ||||
| /* Compile an array of existing transactions which are active during | ||||
|  * pg_repack's setup. Some transactions we can safely ignore: | ||||
| @ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table) | ||||
| 			 */ | ||||
| 			index_jobs[i].status = FINISHED; | ||||
| 		} | ||||
| 		else if (i <= workers.num_workers) { | ||||
| 		else if (i < workers.num_workers) { | ||||
| 			/* Assign available worker to build an index. */ | ||||
| 			index_jobs[i].status = INPROGRESS; | ||||
| 			index_jobs[i].worker_idx = i; | ||||
| @ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table) | ||||
| 	} | ||||
| 	PQclear(res); | ||||
|  | ||||
| 	/* How many workers we kicked off earlier. */ | ||||
| 	num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; | ||||
|  | ||||
| 	if (workers.num_workers > 1) | ||||
| 	{ | ||||
| 		/* How many workers we kicked off earlier. */ | ||||
| 		num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes; | ||||
| 		int freed_worker = -1; | ||||
| 		int ret; | ||||
|  | ||||
| /* Prefer poll() over select(), following PostgreSQL custom. */ | ||||
| #ifdef HAVE_POLL | ||||
| 		struct pollfd *input_fds; | ||||
|  | ||||
| 		input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers); | ||||
| 		for (i = 0; i < num_active_workers; i++) | ||||
| 		{ | ||||
| 			input_fds[i].fd = PQsocket(workers.conns[i]); | ||||
| 			input_fds[i].events = POLLIN | POLLERR; | ||||
| 			input_fds[i].revents = 0; | ||||
| 		} | ||||
| #else | ||||
| 		fd_set input_mask; | ||||
| 		struct timeval timeout; | ||||
| 		/* select() needs the highest-numbered socket descriptor */ | ||||
| 		int max_fd = 0; | ||||
|  | ||||
| 		FD_ZERO(&input_mask); | ||||
| 		for (i = 0; i < num_active_workers; i++) | ||||
| 		{ | ||||
| 			FD_SET(PQsocket(workers.conns[i]), &input_mask); | ||||
| 			if (PQsocket(workers.conns[i]) > max_fd) | ||||
| 				max_fd = PQsocket(workers.conns[i]); | ||||
| 		} | ||||
| #endif | ||||
|  | ||||
| 		/* Now go through our index builds, and look for any which is | ||||
| 		 * reported complete. Reassign that worker to the next index to | ||||
| 		 * be built, if any. | ||||
| 		 */ | ||||
| 		while (num_active_workers) | ||||
| 		while (num_active_workers > 0) | ||||
| 		{ | ||||
| 			int freed_worker = -1; | ||||
| 			elog(DEBUG2, "polling %d active workers", num_active_workers); | ||||
|  | ||||
| #ifdef HAVE_POLL | ||||
| 			ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000); | ||||
| #else | ||||
| 			/* re-initialize timeout before each invocation of select() | ||||
| 			 * just in case select() modifies timeout to indicate remaining | ||||
| 			 * time. | ||||
| 			 */ | ||||
| 			timeout.tv_sec = POLL_TIMEOUT; | ||||
| 			timeout.tv_usec = 0; | ||||
| 			ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout); | ||||
| #endif | ||||
| 			if (ret < 0 && errno != EINTR) | ||||
| 				elog(ERROR, "poll() failed: %d, %d", ret, errno); | ||||
|  | ||||
| 			for (i = 0; i < num_indexes; i++) | ||||
| 			{ | ||||
| 				if (index_jobs[i].status == INPROGRESS) | ||||
| 				{ | ||||
| 					Assert(index_jobs[i].worker_idx >= 0); | ||||
| 					/* Must call PQconsumeInput before we can check PQisBusy */ | ||||
| 					if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1) | ||||
| 					{ | ||||
| @ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table) | ||||
| 							} | ||||
| 							PQclear(res); | ||||
| 						} | ||||
|  | ||||
| 						 | ||||
| 						/* We are only going to re-queue one worker, even | ||||
| 						 * though more than one index build might be finished. | ||||
| 						 * Any other jobs which may be finished will | ||||
| 						 * just have to wait for the next pass through the | ||||
| 						 * poll()/select() loop. | ||||
| 						 */ | ||||
| 						freed_worker = index_jobs[i].worker_idx; | ||||
| 						index_jobs[i].status = FINISHED; | ||||
| 						num_active_workers--; | ||||
| @ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table) | ||||
| 				} | ||||
| 				freed_worker = -1; | ||||
| 			} | ||||
| 			sleep(1); | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| @ -980,7 +1044,6 @@ repack_one_table(const repack_table *table, const char *orderby) | ||||
| 		goto cleanup; | ||||
| 	} | ||||
|  | ||||
|  | ||||
| 	/* | ||||
| 	 * 4. Apply log to temp table until no tuples are left in the log | ||||
| 	 * and all of the old transactions are finished. | ||||
|  | ||||
| @ -82,7 +82,11 @@ setup_workers(int num_workers) | ||||
|  			 */ | ||||
|  			elog(DEBUG2, "Setting up worker conn %d", i); | ||||
|  | ||||
|  			/* Don't confuse pgut_connections by using pgut_connect() */ | ||||
|  			/* Don't confuse pgut_connections by using pgut_connect() | ||||
| 			 * | ||||
| 			 * XXX: could use PQconnectStart() and PQconnectPoll() to | ||||
| 			 * open these connections in non-blocking manner. | ||||
| 			 */ | ||||
|  			conn = PQconnectdb(buf.data); | ||||
|  			if (PQstatus(conn) == CONNECTION_OK) | ||||
|  			{ | ||||
| @ -94,6 +98,15 @@ setup_workers(int num_workers) | ||||
| 					 PQerrorMessage(conn)); | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
|             /* Make sure each worker connection can work in non-blocking | ||||
|              * mode. | ||||
|              */ | ||||
|             if (PQsetnonblocking(workers.conns[i], 1)) | ||||
| 			{ | ||||
| 				elog(ERROR, "Unable to set worker connection %d " | ||||
| 					 "non-blocking.", i); | ||||
| 			} | ||||
|  		} | ||||
| 		/* In case we bailed out of setting up all workers, record | ||||
| 		 * how many successful worker conns we actually have. | ||||
|  | ||||
		Reference in New Issue
	
	Block a user