520 lines
19 KiB
Python
520 lines
19 KiB
Python
#!/usr/bin/env python
|
|
"""Gather Munin statistics and deliver to Carbon for Graphite display."""
|
|
|
|
import argparse
|
|
import ConfigParser
|
|
import logging
|
|
import logging.handlers
|
|
import pickle
|
|
import re
|
|
import socket
|
|
import struct
|
|
import sys
|
|
import time
|
|
import signal
|
|
import threading
|
|
|
|
RE_LEFTRIGHT = re.compile(r"^(?P<left>\S+)\s+(?P<right>\S+)$")
|
|
RE_MUNIN_NODE_NAME = re.compile(r"^# munin node at\s+(?P<nodename>\S+)$")
|
|
|
|
threads = []
|
|
shutdown = False
|
|
|
|
|
|
class Munin():
|
|
"""Munin host object with querying getter functions."""
|
|
|
|
def __init__(self, hostname, thread, port=4949, args=None):
|
|
|
|
self.hostname = None
|
|
self.remotenode = None
|
|
self._sock = None
|
|
self._conn = None
|
|
self._carbon_sock = None
|
|
self.hello_string = None
|
|
self.reload_plugins = True
|
|
self.plugins = {}
|
|
self.plugins_config = {}
|
|
|
|
if ':' in hostname:
|
|
self.hostname, self.remotenode = hostname.split(":", 1)
|
|
else:
|
|
self.hostname = hostname
|
|
self.port = port
|
|
self.args = args
|
|
|
|
if self.args.displayname:
|
|
self.displayname = self.args.displayname.split(".")[0]
|
|
else:
|
|
self.displayname = self.hostname.split(".")[0]
|
|
self.thread = thread
|
|
|
|
def go(self):
|
|
"""Bootstrap method to start processing hosts's Munin stats."""
|
|
global shutdown
|
|
self.connect()
|
|
self.update_hostname()
|
|
processing_time = self.process_host_stats()
|
|
interval = int(self.args.interval)
|
|
|
|
while True and interval != 0 and not shutdown:
|
|
sleep_time = max(interval - processing_time, 0)
|
|
time.sleep(sleep_time)
|
|
self.connect()
|
|
processing_time = self.process_host_stats()
|
|
|
|
def update_hostname(self):
|
|
"""Updating hostname from connection hello string."""
|
|
if self.args.displayname:
|
|
return
|
|
try:
|
|
node_name = RE_MUNIN_NODE_NAME.search(self.hello_string).group(1)
|
|
self.displayname = node_name.split(".")[0]
|
|
except AttributeError:
|
|
logger.info("Thread %s: Unable to obtain munin node name from: %s",
|
|
self.thread.name, self.hello_string)
|
|
return
|
|
|
|
def connect(self):
|
|
"""Initial connection to Munin host."""
|
|
try:
|
|
self._sock = socket.create_connection((self.hostname, self.port), 10)
|
|
self._sock.settimeout(50)
|
|
except socket.error:
|
|
logger.info("Thread %s: Unable to connect to Munin host %s, port: %s",
|
|
self.thread.name, self.hostname, self.port)
|
|
return False
|
|
logger.info("Thread %s: connected to munin", self.thread.name)
|
|
|
|
try:
|
|
self._conn = self._sock.makefile()
|
|
self.hello_string = self._readline()
|
|
except socket.error:
|
|
logger.info("Thread %s: Unable to communicate to Munin host %s, port: %s",
|
|
self.thread.name, self.hostname, self.port)
|
|
self.close_connection()
|
|
return False
|
|
|
|
if self.args.carbon:
|
|
self.connect_carbon()
|
|
|
|
return True
|
|
|
|
def connect_carbon(self):
|
|
carbon_host, carbon_port = self.args.carbon.split(":")
|
|
try:
|
|
self._carbon_sock = socket.create_connection((carbon_host, carbon_port), 10)
|
|
except:
|
|
logger.exception("Thread %s: Unable to connect to Carbon on host %s, port: %s",
|
|
self.thread.name, carbon_host, carbon_port)
|
|
sys.exit(1)
|
|
|
|
def close_connection(self):
|
|
"""Close connection to Munin host."""
|
|
self._sock.close()
|
|
|
|
def close_carbon_connection(self):
|
|
"""Close connection to Carbon host."""
|
|
if self._carbon_sock:
|
|
self._carbon_sock.close()
|
|
|
|
def _readline(self):
|
|
"""Read one line from Munin output, stripping leading/trailing chars."""
|
|
return self._conn.readline().strip()
|
|
|
|
def _iterline(self):
|
|
"""Iterator over Munin output."""
|
|
while True:
|
|
current_line = self._readline()
|
|
logger.debug("Thread %s: Iterating over line: %s", self.thread.name, current_line)
|
|
if not current_line:
|
|
break
|
|
if current_line.startswith("#"):
|
|
continue
|
|
if current_line.rstrip().endswith("value"):
|
|
continue
|
|
if current_line == ".":
|
|
break
|
|
yield current_line
|
|
|
|
def fetch(self, plugin):
|
|
"""Fetch plugin's data fields from Munin."""
|
|
self._sock.sendall("fetch %s\n" % plugin)
|
|
response = {None: {}}
|
|
multigraph = None
|
|
multigraph_prefix = ""
|
|
for current_line in self._iterline():
|
|
if current_line.startswith("multigraph "):
|
|
multigraph = current_line[11:]
|
|
multigraph_prefix = multigraph.rstrip(".") + "."
|
|
response[multigraph] = {}
|
|
continue
|
|
# Some munin plugins have more than one space between key and value.
|
|
try:
|
|
full_key_name, key_value = RE_LEFTRIGHT.search(current_line).group(1, 2)
|
|
key_name = multigraph_prefix + full_key_name.split(".")[0]
|
|
response[multigraph][key_name] = key_value
|
|
except (KeyError, AttributeError):
|
|
logger.info("Thread %s: Plugin %s returned invalid data [%s] for host"
|
|
" %s\n", self.thread.name, plugin, current_line, self.hostname)
|
|
|
|
return response
|
|
|
|
def list_plugins(self):
|
|
"""Return a list of Munin plugins configured on a node. """
|
|
self._sock.sendall("cap multigraph\n")
|
|
self._readline() # ignore response
|
|
|
|
if self.remotenode:
|
|
logger.info("Thread %s: Asking for plugin list for remote node %s", self.thread.name, self.remotenode)
|
|
self._sock.sendall("list %s\n" % self.remotenode)
|
|
else:
|
|
logger.info("Thread %s: Asking for plugin list for local node %s", self.thread.name, self.hostname)
|
|
self._sock.sendall("list\n")
|
|
|
|
plugin_list = self._readline().split(" ")
|
|
if self.args.filter:
|
|
try:
|
|
filteredlist = [plugin for plugin in plugin_list if re.search(self.args.filter, plugin, re.IGNORECASE)]
|
|
plugin_list = filteredlist
|
|
except re.error:
|
|
logger.info("Thread %s: Filter regexp for plugin list is not valid: %s" % self.args.filter)
|
|
# if there is no filter or we have got an re.error, simply return full list
|
|
result_list = []
|
|
for plugin in plugin_list:
|
|
if len(plugin.strip()) > 0:
|
|
result_list.append(plugin)
|
|
return result_list
|
|
|
|
def get_config(self, plugin):
|
|
"""Get config values for Munin plugin."""
|
|
self._sock.sendall("config %s\n" % plugin)
|
|
response = {None: {}}
|
|
multigraph = None
|
|
|
|
for current_line in self._iterline():
|
|
if current_line.startswith("multigraph "):
|
|
multigraph = current_line[11:]
|
|
response[multigraph] = {}
|
|
continue
|
|
|
|
try:
|
|
key_name, key_value = current_line.split(" ", 1)
|
|
except ValueError:
|
|
# ignore broken plugins that don't return a value at all
|
|
continue
|
|
|
|
if "." in key_name:
|
|
# Some keys have periods in them.
|
|
# If so, make their own nested dictionary.
|
|
key_root, key_leaf = key_name.split(".", 1)
|
|
if key_root not in response:
|
|
response[multigraph][key_root] = {}
|
|
response[multigraph][key_root][key_leaf] = key_value
|
|
else:
|
|
response[multigraph][key_name] = key_value
|
|
|
|
return response
|
|
|
|
def process_host_stats(self):
|
|
"""Process Munin node data, potentially sending to Carbon."""
|
|
start_timestamp = time.time()
|
|
logger.info("Thread %s: Querying host %s", self.thread.name, self.hostname)
|
|
# to be more efficient, load list of plugins just in case we do not have any
|
|
if self.reload_plugins:
|
|
self.plugins_config = {}
|
|
self.plugins = self.list_plugins()
|
|
self.reload_plugins = False
|
|
logger.debug("Thread %s: Plugin List: %s", self.thread.name, self.plugins)
|
|
epoch_timestamp = int(start_timestamp)
|
|
|
|
for current_plugin in self.plugins:
|
|
logger.info("Thread %s: Fetching plugin: %s (Host: %s)",
|
|
self.thread.name, current_plugin, self.hostname)
|
|
|
|
# after (re)load of list of plugins we have to load their configurations too
|
|
try:
|
|
self.plugins_config[current_plugin]
|
|
except KeyError:
|
|
try:
|
|
self.plugins_config[current_plugin] = self.get_config(current_plugin)
|
|
logger.debug("Thread %s: Plugin Config: %s", self.thread.name, self.plugins_config[current_plugin])
|
|
except:
|
|
return time.time()
|
|
|
|
try:
|
|
plugin_data = self.fetch(current_plugin)
|
|
except:
|
|
logger.info("Thread %s: lost connection in process_host_stats() - sleeping 5 seconds", self.thread.name)
|
|
time.sleep(5)
|
|
return time.time()
|
|
logger.debug("Thread %s: Plugin Data: %s", self.thread.name, plugin_data)
|
|
if self.args.carbon:
|
|
for multigraph in self.plugins_config[current_plugin]:
|
|
try:
|
|
self.send_to_carbon(epoch_timestamp,
|
|
current_plugin,
|
|
self.plugins_config[current_plugin][multigraph],
|
|
plugin_data[multigraph])
|
|
except KeyError:
|
|
logger.info("Thread %s: Plugin returns invalid data:\n plugin_config: %r host %s.",
|
|
self.thread.name, self.plugins_config[current_plugin], self.hostname)
|
|
end_timestamp = time.time() - start_timestamp
|
|
self.close_connection()
|
|
self.close_carbon_connection()
|
|
logger.info("Thread %s: Finished querying host %s (Execution Time: %.2f sec).",
|
|
self.thread.name, self.hostname, end_timestamp)
|
|
return end_timestamp
|
|
|
|
def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data):
|
|
"""Send plugin data to Carbon over Pickle format."""
|
|
if self.args.noprefix:
|
|
prefix = ''
|
|
else:
|
|
prefix = "%s." % self.args.prefix
|
|
|
|
hostname = self.hostname
|
|
if self.remotenode:
|
|
hostname = self.remotenode
|
|
|
|
data_list = []
|
|
logger.info("Creating metric for plugin %s, timestamp: %d",
|
|
plugin_name, timestamp)
|
|
|
|
for data_key in plugin_data:
|
|
try:
|
|
plugin_category = plugin_config["graph_category"]
|
|
metric = "%s%s.%s.%s.%s" % (prefix, self.displayname, plugin_category, plugin_name, data_key)
|
|
value = plugin_data[data_key]
|
|
logger.debug("Creating metric %s, value: %s", metric, value)
|
|
data_list.append((metric, (timestamp, value)))
|
|
except KeyError:
|
|
logger.info("plugin returns invalid data:\n plugin_config: %r host %s.", plugin_config, self.hostname)
|
|
|
|
if self.args.noop:
|
|
logger.info("NOOP: Not sending data to Carbon")
|
|
return
|
|
|
|
logger.info("Sending plugin %s data to Carbon for host %s.",
|
|
plugin_name, hostname)
|
|
payload = pickle.dumps(data_list)
|
|
header = struct.pack("!L", len(payload))
|
|
message = header + payload
|
|
try:
|
|
self._carbon_sock.sendall(message)
|
|
logger.info("Finished sending plugin %s data to Carbon for host %s.",
|
|
plugin_name, self.hostname)
|
|
except socket.error:
|
|
logger.exception("Unable to send data to Carbon")
|
|
|
|
|
|
###
|
|
# Custom Threading class, one thread for each host in configuration
|
|
###
|
|
class MuninThread(threading.Thread):
|
|
def __init__(self, params, cmdlineargs):
|
|
threading.Thread.__init__(self)
|
|
self.name = params['host']
|
|
self.shutdown = False
|
|
# construct new namespace to pass it to the new Munin class instance
|
|
# for better manipulation, just prepare writable dcfg "link" to new namespace
|
|
cfg = argparse.Namespace()
|
|
dcfg = vars(cfg)
|
|
|
|
#construct final arguments Namespace
|
|
for v in vars(cmdlineargs):
|
|
try:
|
|
dcfg[v] = params[v]
|
|
except KeyError:
|
|
dcfg[v] = getattr(cmdlineargs, v, None)
|
|
|
|
self.munin = Munin(hostname=self.name, args=cfg, thread=self)
|
|
|
|
def run(self):
|
|
logger.info("Starting thread for %s." % self.name)
|
|
self.munin.go()
|
|
logger.info("Finishing thread for %s." % self.name)
|
|
|
|
def dostop(self):
|
|
global shutdown
|
|
logger.info("Thread %s: Got signal to stop." % self.name)
|
|
shutdown = True
|
|
|
|
def reload(self):
|
|
self.munin.reload_plugins = True
|
|
logger.info("Thread %s: Got signal to reload." % self.name)
|
|
|
|
|
|
###
|
|
# bellow are common function
|
|
###
|
|
def parse_args():
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(description="Send Munin statistics to Graphite.")
|
|
parser.add_argument("--config", "-c",
|
|
action="store",
|
|
default=False,
|
|
help="Configuration file with list of hosts and their plugins to fetch.")
|
|
parser.add_argument("--host",
|
|
action="store",
|
|
default="127.0.0.1",
|
|
help="Munin host to query for stats. You can specify indirect node after ':', "
|
|
"i.e. --host localhost:remotenode. Default: %(default)s")
|
|
parser.add_argument("--displayname",
|
|
default=False,
|
|
help="If defined, use this as the name to store metrics in Graphite instead of the Munin"
|
|
" hostname.")
|
|
parser.add_argument("--carbon",
|
|
action="store",
|
|
help="Carbon host and Pickle port (ex: localhost:2004).")
|
|
parser.add_argument("--filter",
|
|
action="store",
|
|
default='.*',
|
|
help="Regular expression for selecting only defined subset of received plugins.")
|
|
parser.add_argument("--interval",
|
|
type=int,
|
|
default=60,
|
|
help="Interval (seconds) between polling Munin host for statistics. If set to 0, exit after "
|
|
"polling once. Default: %(default)s")
|
|
parser.add_argument("--noop",
|
|
action="store_true",
|
|
help="Don't actually send Munin data to Carbon. Default: %(default)s")
|
|
parser.add_argument("--noprefix",
|
|
action="store_true",
|
|
default=False,
|
|
help="Do not use a prefix on graphite target's name. Default: %(default)s")
|
|
parser.add_argument("--prefix",
|
|
action="store",
|
|
default="servers",
|
|
help="Prefix used on graphite target's name. Default: %(default)s")
|
|
parser.add_argument("--logtosyslog",
|
|
action="store_true",
|
|
help="Log to syslog. No output on the command line.")
|
|
parser.add_argument("--verbose", "-v",
|
|
choices=[1, 2, 3],
|
|
default=2,
|
|
type=int,
|
|
help="Verbosity level. 1:ERROR, 2:INFO, 3:DEBUG. Default: %(default)d")
|
|
|
|
args = parser.parse_args()
|
|
return args
|
|
|
|
|
|
###
|
|
# stop all threads and exit
|
|
###
|
|
def handler_term(signum=signal.SIGTERM, frame=None):
|
|
global threads
|
|
|
|
for t in threads:
|
|
t.dostop()
|
|
|
|
|
|
###
|
|
# set all threads to reload information about all munin-node's plugins
|
|
###
|
|
def handler_hup(signum, frame=None):
|
|
global threads
|
|
|
|
for t in threads:
|
|
t.reload()
|
|
|
|
|
|
def read_configuration(configfile):
|
|
"""
|
|
Returns False if configuration file is not readable, list of dictionaries otherwise
|
|
|
|
Configuration options follow parameters described as command line options. All parameters are optional except host,
|
|
displayname parameter is built from section name, so it is always presented too.
|
|
|
|
Non-existent options are superseded by defaults
|
|
|
|
Example:
|
|
[servername]
|
|
host=fqdn[:remotenode]
|
|
port=4949
|
|
carbon=carbonhostfqdn:port
|
|
interval=60
|
|
prefix=prefix for Graphite's target
|
|
noprefix=True|False
|
|
filter=^cpu.*
|
|
|
|
@param configfile: full filepath to configuration file
|
|
@rtype : object
|
|
"""
|
|
|
|
cf = ConfigParser.ConfigParser()
|
|
hostscfg = []
|
|
try:
|
|
cf.read(configfile)
|
|
for section in cf.sections():
|
|
di = {}
|
|
for ki, vi in cf.items(section):
|
|
# construct dictionary item
|
|
di[ki] = vi
|
|
if "host" in di.keys():
|
|
di["displayname"] = section
|
|
hostscfg.append(di)
|
|
except ConfigParser.Error as e:
|
|
logger.critical("Failed to parse configuration or command line options. Exception was %s. Giving up." % e)
|
|
|
|
return hostscfg
|
|
|
|
|
|
def main():
|
|
global threads
|
|
global logger
|
|
|
|
args = parse_args()
|
|
if args.verbose == 1:
|
|
logging_level = logging.ERROR
|
|
elif args.verbose == 3:
|
|
logging_level = logging.DEBUG
|
|
else:
|
|
logging_level = logging.INFO
|
|
|
|
#logging.basicConfig(format=LOGGING_FORMAT, level=logging_level)
|
|
logger = logging.getLogger()
|
|
logger.setLevel(logging_level)
|
|
syslog = logging.handlers.SysLogHandler(address='/dev/log')
|
|
stdout = logging.StreamHandler(stream=sys.stdout)
|
|
formatter = logging.Formatter('MUNIN-GRAPHITE: %(levelname)s %(message)s')
|
|
syslog.setFormatter(formatter)
|
|
if args.logtosyslog:
|
|
logger.addHandler(syslog)
|
|
else:
|
|
logger.addHandler(stdout)
|
|
|
|
# block for setting handling of signals
|
|
signal.signal(signal.SIGHUP, handler_hup)
|
|
signal.signal(signal.SIGTERM, handler_term)
|
|
signal.signal(signal.SIGINT, handler_term)
|
|
|
|
hosts = list()
|
|
if args.config:
|
|
hosts = read_configuration(args.config)
|
|
if not hosts:
|
|
# no file configuration, trying to use commandline arguments only and construct one-item dictionary
|
|
hosts.append({'host': args.host})
|
|
# we have got some items in hosts's list
|
|
for host in hosts:
|
|
logging.info("Going to thread with config %s" % host)
|
|
threads.append(MuninThread(host, args))
|
|
|
|
for t in threads:
|
|
t.start()
|
|
|
|
while True:
|
|
try:
|
|
if not any([t.isAlive() for t in threads]):
|
|
logging.info("All threads finished, exiting.")
|
|
break
|
|
else:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
handler_term()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|