Multiprocessing support: uses a pool (default size: 1) to query multiple Munin nodes at once.

Lots of code cleanup: shows elapsed query time; reworks logging levels.
This commit is contained in:
Jeffrey Forman 2013-01-03 22:42:49 -05:00
parent 5d73640841
commit 9ed5a2ad13

View File

@ -1,9 +1,9 @@
#!/usr/bin/python #!/usr/bin/python
"""Gather Munin statistics and deliver to Carbon for Graphite display.""" """Gather Munin statistics and deliver to Carbon for Graphite display."""
#from multiprocessing import Pool
import argparse import argparse
import logging import logging
import multiprocessing
import pickle import pickle
import re import re
import socket import socket
@ -12,9 +12,10 @@ import sys
import time import time
LOGGING_FORMAT = "%(asctime)s : %(levelname)s : %(message)s" LOGGING_FORMAT = "%(asctime)s : %(levelname)s : %(message)s"
HOSTLIST = ["localhost"] HOSTLIST = ["localhost", "127.0.0.1"]
RE_LEFTRIGHT = re.compile(r"^(?P<left>\S+)\s+(?P<right>\S+)$") RE_LEFTRIGHT = re.compile(r"^(?P<left>\S+)\s+(?P<right>\S+)$")
class Munin(): class Munin():
"""Munin host object with querying getter functions.""" """Munin host object with querying getter functions."""
def __init__(self, hostname="localhost", port=4949, args=None): def __init__(self, hostname="localhost", port=4949, args=None):
@ -22,6 +23,11 @@ class Munin():
self.port = port self.port = port
self.args = args self.args = args
def go(self):
    """Run the full query cycle for this host's Munin stats.

    Entry point used by the worker pool: opens the connection to the
    Munin node, then walks its plugins via process_host_stats().
    """
    self.connect()
    self.process_host_stats()
def connect(self): def connect(self):
"""Initial connection to Munin host.""" """Initial connection to Munin host."""
try: try:
@ -96,9 +102,11 @@ class Munin():
def process_host_stats(self): def process_host_stats(self):
"""Process Munin node data, potentially sending to Carbon.""" """Process Munin node data, potentially sending to Carbon."""
start_timestamp = time.time()
logging.info("Querying host %s", self.hostname)
plugins = self.list_plugins() plugins = self.list_plugins()
logging.debug("Plugin List: %s", plugins) logging.debug("Plugin List: %s", plugins)
timestamp = int(time.time()) epoch_timestamp = int(start_timestamp)
for current_plugin in plugins: for current_plugin in plugins:
logging.info("Fetching plugin: %s (Host: %s)", logging.info("Fetching plugin: %s (Host: %s)",
@ -110,13 +118,21 @@ class Munin():
plugin_data = self.fetch(current_plugin) plugin_data = self.fetch(current_plugin)
logging.debug("Plugin Data: %s", plugin_data) logging.debug("Plugin Data: %s", plugin_data)
if self.args.carbon: if self.args.carbon:
self.send_to_carbon(timestamp, current_plugin, plugin_config, plugin_data) self.send_to_carbon(epoch_timestamp,
current_plugin,
plugin_config,
plugin_data)
end_timestamp = time.time() - start_timestamp
self.close_connection()
logging.info("Finished querying host %s (Execution Time: %.2f sec).",
self.hostname, end_timestamp)
def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data): def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data):
"""Send plugin data to Carbon over Pickle format.""" """Send plugin data to Carbon over Pickle format."""
carbon_host, carbon_port = self.args.carbon.split(":") carbon_host, carbon_port = self.args.carbon.split(":")
data_list = [] data_list = []
logging.info("Creating metrics for plugin %s, timestamp: %d", logging.info("Creating metric for plugin %s, timestamp: %d",
plugin_name, timestamp) plugin_name, timestamp)
short_hostname = self.hostname.split(".")[0] short_hostname = self.hostname.split(".")[0]
for data_key in plugin_data: for data_key in plugin_data:
@ -126,15 +142,15 @@ class Munin():
logging.debug("Creating metric %s, value: %s", metric, value) logging.debug("Creating metric %s, value: %s", metric, value)
data_list.append((metric, (timestamp, value))) data_list.append((metric, (timestamp, value)))
payload = pickle.dumps(data_list)
header = struct.pack("!L", len(payload))
message = header + payload
if self.args.noop: if self.args.noop:
logging.info("NOOP: Not sending data to Carbon") logging.info("NOOP: Not sending data to Carbon")
return return
logging.info("Sending plugin %s data to Carbon for host %s.", logging.info("Sending plugin %s data to Carbon for host %s.",
plugin_name, self.hostname) plugin_name, self.hostname)
payload = pickle.dumps(data_list)
header = struct.pack("!L", len(payload))
message = header + payload
carbon_sock = socket.create_connection((carbon_host, carbon_port), 10) carbon_sock = socket.create_connection((carbon_host, carbon_port), 10)
carbon_sock.sendall(message) carbon_sock.sendall(message)
carbon_sock.close() carbon_sock.close()
@ -155,30 +171,34 @@ def parse_args():
default=2, default=2,
type=int, type=int,
help="Verbosity level. 1:ERROR, 2:INFO/Default, 3:DEBUG.") help="Verbosity level. 1:ERROR, 2:INFO/Default, 3:DEBUG.")
parser.add_argument("--poolsize", type=int, default=1,
help="Pool size of simultaneous connections for polling.")
args = parser.parse_args() args = parser.parse_args()
return args return args
def worker_bootstrap(host, args):
    """Pool worker entry point: query one Munin host.

    Builds a Munin object for *host* with the parsed CLI *args* and runs
    its full query cycle, returning whatever go() returns so the pool's
    completion callback can log it.
    """
    return Munin(hostname=host, args=args).go()
def worker_return(result):
    """Completion callback for pool.apply_async.

    Runs in the parent process; purely diagnostic — logs each finished
    worker's return value at DEBUG level.
    """
    logging.debug("Iteration Return Value: %s", result)
if __name__ == '__main__':
    args = parse_args()

    # Translate the --verbose flag into a logging level
    # (1: ERROR, 3: DEBUG, anything else: INFO).
    if args.verbose == 1:
        LOGGING_LEVEL = logging.ERROR
    elif args.verbose == 3:
        LOGGING_LEVEL = logging.DEBUG
    else:
        LOGGING_LEVEL = logging.INFO
    logging.basicConfig(format=LOGGING_FORMAT, level=LOGGING_LEVEL)

    # Fan the host list out across a process pool so several Munin
    # nodes can be queried concurrently (--poolsize sets parallelism).
    pool = multiprocessing.Pool(args.poolsize)
    for host in HOSTLIST:
        logging.debug("Adding host %s to the pool.", host)
        pool.apply_async(worker_bootstrap, args=(host, args),
                         callback=worker_return)
    pool.close()
    pool.join()