diff --git a/m2g-poller.py b/m2g-poller.py index 516c63d..8456011 100755 --- a/m2g-poller.py +++ b/m2g-poller.py @@ -1,9 +1,9 @@ #!/usr/bin/python """Gather Munin statistics and deliver to Carbon for Graphite display.""" -#from multiprocessing import Pool import argparse import logging +import multiprocessing import pickle import re import socket @@ -12,9 +12,10 @@ import sys import time LOGGING_FORMAT = "%(asctime)s : %(levelname)s : %(message)s" -HOSTLIST = ["localhost"] +HOSTLIST = ["localhost", "127.0.0.1"] RE_LEFTRIGHT = re.compile(r"^(?P\S+)\s+(?P\S+)$") + class Munin(): """Munin host object with querying getter functions.""" def __init__(self, hostname="localhost", port=4949, args=None): @@ -22,6 +23,11 @@ class Munin(): self.port = port self.args = args + def go(self): + """Bootstrap method to start processing hosts's Munin stats.""" + self.connect() + self.process_host_stats() + def connect(self): """Initial connection to Munin host.""" try: @@ -96,9 +102,11 @@ class Munin(): def process_host_stats(self): """Process Munin node data, potentially sending to Carbon.""" + start_timestamp = time.time() + logging.info("Querying host %s", self.hostname) plugins = self.list_plugins() logging.debug("Plugin List: %s", plugins) - timestamp = int(time.time()) + epoch_timestamp = int(start_timestamp) for current_plugin in plugins: logging.info("Fetching plugin: %s (Host: %s)", @@ -110,13 +118,21 @@ class Munin(): plugin_data = self.fetch(current_plugin) logging.debug("Plugin Data: %s", plugin_data) if self.args.carbon: - self.send_to_carbon(timestamp, current_plugin, plugin_config, plugin_data) + self.send_to_carbon(epoch_timestamp, + current_plugin, + plugin_config, + plugin_data) + end_timestamp = time.time() - start_timestamp + self.close_connection() + logging.info("Finished querying host %s (Execution Time: %.2f sec).", + self.hostname, end_timestamp) + def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data): """Send plugin data to Carbon over Pickle format.""" carbon_host, carbon_port = self.args.carbon.split(":") data_list = [] - logging.info("Creating metrics for plugin %s, timestamp: %d", + logging.info("Creating metric for plugin %s, timestamp: %d", plugin_name, timestamp) short_hostname = self.hostname.split(".")[0] for data_key in plugin_data: @@ -126,15 +142,15 @@ class Munin(): logging.debug("Creating metric %s, value: %s", metric, value) data_list.append((metric, (timestamp, value))) - payload = pickle.dumps(data_list) - header = struct.pack("!L", len(payload)) - message = header + payload if self.args.noop: logging.info("NOOP: Not sending data to Carbon") return logging.info("Sending plugin %s data to Carbon for host %s.", plugin_name, self.hostname) + payload = pickle.dumps(data_list) + header = struct.pack("!L", len(payload)) + message = header + payload carbon_sock = socket.create_connection((carbon_host, carbon_port), 10) carbon_sock.sendall(message) carbon_sock.close() @@ -155,30 +171,34 @@ def parse_args(): default=2, type=int, help="Verbosity level. 1:ERROR, 2:INFO/Default, 3:DEBUG.") + parser.add_argument("--poolsize", type=int, default=1, + help="Pool size of simultaneous connections for polling.") args = parser.parse_args() return args +def worker_bootstrap(host, args): + """Handles pool process allocation.""" + munin_obj = Munin(hostname=host, args=args) + return munin_obj.go() + +def worker_return(retval): + """Outputs any return values from each pool iteration.""" + logging.debug("Iteration Return Value: %s", retval) + if __name__ == '__main__': args = parse_args() - if args.verbose == 3: - LOGGING_LEVEL = logging.DEBUG - elif args.verbose == 1: + if args.verbose == 1: LOGGING_LEVEL = logging.ERROR + elif args.verbose == 3: + LOGGING_LEVEL = logging.DEBUG else: LOGGING_LEVEL = logging.INFO logging.basicConfig(format=LOGGING_FORMAT, level=LOGGING_LEVEL) - while True: - for current_host in HOSTLIST: - start_time = time.time() - logging.info("Querying host: %s", current_host) - munin_host = Munin(hostname=current_host, args=args) - munin_host.connect() - munin_host.process_host_stats() - munin_host.close_connection() - end_time = time.time() - elapsed_time = end_time - start_time - logging.info("Finished querying host %s (Execution Time: %.2f sec)", - current_host, elapsed_time) - time.sleep(60) + pool = multiprocessing.Pool(args.poolsize) + for host in HOSTLIST: + logging.debug("Adding host %s to the pool.", host) + pool.apply_async(worker_bootstrap, args = (host, args,), callback = worker_return) + pool.close() + pool.join()