diff --git a/README.md b/README.md index 787fdb9..50a769f 100644 --- a/README.md +++ b/README.md @@ -12,13 +12,58 @@ Collect Munin statistics, and convert their naming structure to submit to Graphi Clone the Git repository from [https://github.com/jforman/munin-graphite](https://github.com/jforman/munin-graphite) -## Example +## Program invocation -As of 20130209, the poller is best configured via a Cronjob that runs once a minute. +Optimal way is to run m2g-poller using **nohup** and define all hosts within an configuration file: - ./m2g-poller.py --muninhost localhost --carbon carbonhost:2004 - -Metrics paths are created using the hostname and various plugin data. The processes count plugin for Munin would produce metrics and values like the following: +Example: +nohup ./m2g-poller.py --config /etc/munin-graphite/hosts.cfg & + + +## Configuration file looks like: +``` +[node-001] +carbon=carbon01.company.com:2004 +host=node-001.infra.company.com +interval=60 +prefix=munin + +[node-002] +carbon=carbon99.company.com:2004 +host=node-002.infra.company.com +interval=120 +prefix=munin +``` +Runing with config above will start two threads, each thread for one node's section and these threads will in defined +intervals fetch data from munin-node on targets. + +If you do not specify all parameters they will be set to default values as described in --help option. + +You can run m2g-poller.py from commandline with parameters directly to override some or all of supported options, +in that case all specified option will be use instead of built-in defaults for nodes in configuration file. + +Example: +./m2g-poller.py --host node-003.company.com --displayname node-003 --carbon carbon01.infra.company.com:2004 --interval 90 --prefix someprefix + +## Signal handling + +* you can send SIGTERM to m2g-poller.py program. This would terminate program's run after all threads will finish its + current cycle. +* pressing CTRL+C when running from command line will terminate the program as well. +* you can send SIGHUP to m2g-poller.py program. This would signalize the program it should reload list of plugins of all + nodes from target list. + +## System log + +Program logs information into syslog, using prefix MUNIN-GRAPHITE and identification of originating thread. + +Example: +MUNIN-GRAPHITE: INFO Thread node-009.company.com: Finished querying host node-009.company.com (Execution Time: 5.12 sec). + +## Metrics + +Metrics paths are created using the hostname and various plugin data. The processes count plugin for Munin would produce +metrics and values like the following: servers.localhost.processes.processes.uninterruptible, value: 0 servers.localhost.processes.processes.processes, value: 224 @@ -31,6 +76,3 @@ Metrics paths are created using the hostname and various plugin data. The proces These paths of data are then sent to Carbon via the pickle port. -## TODO - -Logic will eventually be added to allow the m2g-poller to be run in the background with a list of hosts to query and not require a cronjob. diff --git a/m2g-poller.py b/m2g-poller.py index 0051e5f..773058b 100755 --- a/m2g-poller.py +++ b/m2g-poller.py @@ -2,24 +2,29 @@ """Gather Munin statistics and deliver to Carbon for Graphite display.""" import argparse +import ConfigParser import logging +import logging.handlers import pickle import re import socket import struct import sys import time +import signal +import threading -LOGGING_FORMAT = "%(asctime)s:%(levelname)s:%(message)s" RE_LEFTRIGHT = re.compile(r"^(?P\S+)\s+(?P\S+)$") RE_MUNIN_NODE_NAME = re.compile(r"^# munin node at\s+(?P\S+)$") -## TODO: Catch keyboard interrupt properly and die when requested +threads = [] +shutdown = False class Munin(): """Munin host object with querying getter functions.""" - def __init__(self, hostname, port=4949, args=None): + + def __init__(self, hostname, thread, port=4949, args=None): self.hostname = None self.remotenode = None @@ -27,28 +32,32 @@ class Munin(): self._conn = None self._carbon_sock = None self.hello_string = None + self.reload_plugins = True + self.plugins = {} + self.plugins_config = {} if ':' in hostname: - self.hostname, self.remotenode = hostname.split(":",1) + self.hostname, self.remotenode = hostname.split(":", 1) else: self.hostname = hostname self.port = port self.args = args if self.args.displayname: - self.displayname = self.args.displayname + self.displayname = self.args.displayname.split(".")[0] else: self.displayname = self.hostname.split(".")[0] - + self.thread = thread def go(self): """Bootstrap method to start processing hosts's Munin stats.""" + global shutdown self.connect() self.update_hostname() processing_time = self.process_host_stats() - interval = self.args.interval + interval = int(self.args.interval) - while True and interval != 0: + while True and interval != 0 and not shutdown: sleep_time = max(interval - processing_time, 0) time.sleep(sleep_time) self.connect() @@ -62,8 +71,8 @@ class Munin(): node_name = RE_MUNIN_NODE_NAME.search(self.hello_string).group(1) self.displayname = node_name.split(".")[0] except AttributeError: - logging.info("Unable to obtain munin node name from: %s", - self.hello_string) + logger.info("Thread %s: Unable to obtain munin node name from: %s", + self.thread.name, self.hello_string) return def connect(self): @@ -71,16 +80,16 @@ class Munin(): try: self._sock = socket.create_connection((self.hostname, self.port), 10) except socket.error: - logging.exception("Unable to connect to Munin host %s, port: %s", - self.hostname, self.port) + logger.exception("Thread %s: Unable to connect to Munin host %s, port: %s", + self.thread.name, self.hostname, self.port) sys.exit(1) try: self._conn = self._sock.makefile() self.hello_string = self._readline() except socket.error: - logging.exception("Unable to communicate to Munin host %s, port: %s", - self.hostname, self.port) + logger.exception("Thread %s: Unable to communicate to Munin host %s, port: %s", + self.thread.name, self.hostname, self.port) if self.args.carbon: self.connect_carbon() @@ -90,8 +99,8 @@ class Munin(): try: self._carbon_sock = socket.create_connection((carbon_host, carbon_port), 10) except socket.error: - logging.exception("Unable to connect to Carbon on host %s, port: %s", - carbon_host, carbon_port) + logger.exception("Thread %s: Unable to connect to Carbon on host %s, port: %s", + self.thread.name, carbon_host, carbon_port) sys.exit(1) def close_connection(self): @@ -111,7 +120,7 @@ class Munin(): """Iterator over Munin output.""" while True: current_line = self._readline() - logging.debug("Iterating over line: %s", current_line) + logger.debug("Thread %s: Iterating over line: %s", self.thread.name, current_line) if not current_line: break if current_line.startswith("#"): @@ -132,14 +141,14 @@ class Munin(): multigraph_prefix = multigraph.rstrip(".") + "." response[multigraph] = {} continue - # Some munin plugins have more than one space between key and value. + # Some munin plugins have more than one space between key and value. try: full_key_name, key_value = RE_LEFTRIGHT.search(current_line).group(1, 2) key_name = multigraph_prefix + full_key_name.split(".")[0] response[multigraph][key_name] = key_value except (KeyError, AttributeError): - logging.info("plugin %s returned invalid data [%s] for host" - " %s\n", plugin, current_line, self.hostname) + logger.info("Thread %s: Plugin %s returned invalid data [%s] for host" + " %s\n", self.thread.name, plugin, current_line, self.hostname) return response @@ -149,12 +158,25 @@ class Munin(): self._readline() # ignore response if self.remotenode: + logger.info("Thread %s: Asking for plugin list for remote node %s", self.thread.name, self.remotenode) self._sock.sendall("list %s\n" % self.remotenode) else: + logger.info("Thread %s: Asking for plugin list for local node %s", self.thread.name, self.hostname) self._sock.sendall("list\n") plugin_list = self._readline().split(" ") - return plugin_list + if self.args.filter: + try: + filteredlist = [plugin for plugin in plugin_list if re.search(self.args.filter, plugin, re.IGNORECASE)] + plugin_list = filteredlist + except re.error: + logger.info("Thread %s: Filter regexp for plugin list is not valid: %s" % self.args.filter) + # if there is no filter or we have got an re.error, simply return full list + result_list = [] + for plugin in plugin_list: + if len(plugin.strip()) > 0: + result_list.append(plugin) + return result_list def get_config(self, plugin): """Get config values for Munin plugin.""" @@ -189,35 +211,43 @@ class Munin(): def process_host_stats(self): """Process Munin node data, potentially sending to Carbon.""" start_timestamp = time.time() - logging.info("Querying host %s", self.hostname) - plugins = self.list_plugins() - logging.debug("Plugin List: %s", plugins) + logger.info("Thread %s: Querying host %s", self.thread.name, self.hostname) + # to be more efficient, load list of plugins just in case we do not have any + if self.reload_plugins: + self.plugins_config = {} + self.plugins = self.list_plugins() + self.reload_plugins = False + logger.debug("Thread %s: Plugin List: %s", self.thread.name, self.plugins) epoch_timestamp = int(start_timestamp) - for current_plugin in plugins: - logging.info("Fetching plugin: %s (Host: %s)", - current_plugin, self.hostname) + for current_plugin in self.plugins: + logger.info("Thread %s: Fetching plugin: %s (Host: %s)", + self.thread.name, current_plugin, self.hostname) - plugin_config = self.get_config(current_plugin) - logging.debug("Plugin Config: %s", plugin_config) + # after (re)load of list of plugins we have to load their configurations too + try: + self.plugins_config[current_plugin] + except KeyError: + self.plugins_config[current_plugin] = self.get_config(current_plugin) + logger.debug("Thread %s: Plugin Config: %s", self.thread.name, self.plugins_config[current_plugin]) plugin_data = self.fetch(current_plugin) - logging.debug("Plugin Data: %s", plugin_data) + logger.debug("Thread %s: Plugin Data: %s", self.thread.name, plugin_data) if self.args.carbon: - for multigraph in plugin_config: + for multigraph in self.plugins_config[current_plugin]: try: self.send_to_carbon(epoch_timestamp, current_plugin, - plugin_config[multigraph], + self.plugins_config[current_plugin][multigraph], plugin_data[multigraph]) except KeyError: - logging.info("plugin returns invalid data:\n plugin_config: %r host %s.", - plugin_config, self.hostname) + logger.info("Thread %s: Plugin returns invalid data:\n plugin_config: %r host %s.", + self.thread.name, self.plugins_config[current_plugin], self.hostname) end_timestamp = time.time() - start_timestamp self.close_connection() self.close_carbon_connection() - logging.info("Finished querying host %s (Execution Time: %.2f sec).", - self.hostname, end_timestamp) + logger.info("Thread %s: Finished querying host %s (Execution Time: %.2f sec).", + self.thread.name, self.hostname, end_timestamp) return end_timestamp def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data): @@ -232,42 +262,83 @@ class Munin(): hostname = self.remotenode data_list = [] - logging.info("Creating metric for plugin %s, timestamp: %d", - plugin_name, timestamp) + logger.info("Creating metric for plugin %s, timestamp: %d", + plugin_name, timestamp) for data_key in plugin_data: try: plugin_category = plugin_config["graph_category"] metric = "%s%s.%s.%s.%s" % (prefix, self.displayname, plugin_category, plugin_name, data_key) value = plugin_data[data_key] - logging.debug("Creating metric %s, value: %s", metric, value) + logger.debug("Creating metric %s, value: %s", metric, value) data_list.append((metric, (timestamp, value))) except KeyError: - logging.info("plugin returns invalid data:\n plugin_config: %r host %s.", plugin_config, self.hostname) + logger.info("plugin returns invalid data:\n plugin_config: %r host %s.", plugin_config, self.hostname) if self.args.noop: - logging.info("NOOP: Not sending data to Carbon") + logger.info("NOOP: Not sending data to Carbon") return - logging.info("Sending plugin %s data to Carbon for host %s.", - plugin_name, hostname) + logger.info("Sending plugin %s data to Carbon for host %s.", + plugin_name, hostname) payload = pickle.dumps(data_list) header = struct.pack("!L", len(payload)) message = header + payload try: self._carbon_sock.sendall(message) - logging.info("Finished sending plugin %s data to Carbon for host %s.", - plugin_name, self.hostname) + logger.info("Finished sending plugin %s data to Carbon for host %s.", + plugin_name, self.hostname) except socket.error: - logging.exception("Unable to send data to Carbon") + logger.exception("Unable to send data to Carbon") +### +# Custom Threading class, one thread for each host in configuration +### +class MuninThread(threading.Thread): + def __init__(self, params, cmdlineargs): + threading.Thread.__init__(self) + self.name = params['host'] + self.shutdown = False + # construct new namespace to pass it to the new Munin class instance + # for better manipulation, just prepare writable dcfg "link" to new namespace + cfg = argparse.Namespace() + dcfg = vars(cfg) + + #construct final arguments Namespace + for v in vars(cmdlineargs): + try: + dcfg[v] = params[v] + except KeyError: + dcfg[v] = getattr(cmdlineargs, v, None) + + self.munin = Munin(hostname=self.name, args=cfg, thread=self) + + def run(self): + logger.info("Starting thread for %s." % self.name) + self.munin.go() + logger.info("Finishing thread for %s." % self.name) + + def dostop(self): + global shutdown + logger.info("Thread %s: Got signal to stop." % self.name) + shutdown = True + + def reload(self): + self.munin.reload_plugins = True + logger.info("Thread %s: Got signal to reload." % self.name) + + +### +# bellow are common function +### def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="Send Munin statistics to Graphite.") - parser.add_argument("--carbon", + parser.add_argument("--config", "-c", action="store", - help="Carbon host and Pickle port (ex: localhost:2004).") + default=False, + help="Configuration file with list of hosts and their plugins to fetch.") parser.add_argument("--host", action="store", default="localhost", @@ -277,6 +348,13 @@ def parse_args(): default=False, help="If defined, use this as the name to store metrics in Graphite instead of the Munin" " hostname.") + parser.add_argument("--carbon", + action="store", + help="Carbon host and Pickle port (ex: localhost:2004).") + parser.add_argument("--filter", + action="store", + default='.*', + help="Regular expression for selecting only defined subset of received plugins.") parser.add_argument("--interval", type=int, default=60, @@ -303,7 +381,71 @@ def parse_args(): return args +### +# stop all threads and exit +### +def handler_term(signum=signal.SIGTERM, frame=None): + global threads + + for t in threads: + t.dostop() + + +### +# set all threads to reload information about all munin-node's plugins +### +def handler_hup(signum, frame=None): + global threads + + for t in threads: + t.reload() + + +def read_configuration(configfile): + """ + Returns False if configuration file is not readable, list of dictionaries otherwise + + Configuration options follow parameters described as command line options. All parameters are optional except host, + displayname parameter is built from section name, so it is always presented too. + + Non-existent options are superseded by defaults + + Example: + [servername] + host=fqdn[:remotenode] + port=4949 + carbon=carbonhostfqdn:port + interval=60 + prefix=prefix for Graphite's target + noprefix=True|False + filter=^cpu.* + + @param configfile: full filepath to configuration file + @rtype : object + """ + + cf = ConfigParser.ConfigParser() + hostscfg = [] + try: + cf.read(configfile) + for section in cf.sections(): + di = {} + for ki, vi in cf.items(section): + # construct dictionary item + di[ki] = vi + if "host" in di.keys(): + di["displayname"] = section + hostscfg.append(di) + except ConfigParser.Error as e: + logger.critical("Failed to parse configuration or command line options. Exception was %s. Giving up." % e) + + return hostscfg + + def main(): + global threads + global logger + args = parse_args() if args.verbose == 1: logging_level = logging.ERROR @@ -312,9 +454,43 @@ def main(): else: logging_level = logging.INFO - logging.basicConfig(format=LOGGING_FORMAT, level=logging_level) - munin = Munin(hostname=args.host, args=args) - munin.go() + #logging.basicConfig(format=LOGGING_FORMAT, level=logging_level) + logger = logging.getLogger() + logger.setLevel(logging_level) + syslog = logging.handlers.SysLogHandler(address='/dev/log') + formatter = logging.Formatter('MUNIN-GRAPHITE: %(levelname)s %(message)s') + syslog.setFormatter(formatter) + logger.addHandler(syslog) + + # block for setting handling of signals + signal.signal(signal.SIGHUP, handler_hup) + signal.signal(signal.SIGTERM, handler_term) + signal.signal(signal.SIGINT, handler_term) + + hosts = list() + if args.config: + hosts = read_configuration(args.config) + if not hosts: + # no file configuration, trying to use commandline arguments only and construct one-item dictionary + hosts.append({'host': args.host}) + # we have got some items in hosts's list + for host in hosts: + logging.info("Going to thread with config %s" % host) + threads.append(MuninThread(host, args)) + + for t in threads: + t.start() + + while True: + try: + if not any([t.isAlive() for t in threads]): + logging.info("All threads finished, exiting.") + break + else: + time.sleep(1) + except KeyboardInterrupt: + handler_term() + if __name__ == '__main__': main()