Merge branch 'zdenekpizl-zpi-multithread'

This commit is contained in:
Jeffrey Forman 2014-05-21 20:57:39 -04:00
commit 888b5334bc
2 changed files with 284 additions and 59 deletions

View File

@ -12,13 +12,58 @@ Collect Munin statistics, and convert their naming structure to submit to Graphi
Clone the Git repository from [https://github.com/jforman/munin-graphite](https://github.com/jforman/munin-graphite)
## Example
## Program invocation
As of 20130209, the poller is best configured via a Cronjob that runs once a minute.
Optimal way is to run m2g-poller using **nohup** and define all hosts within an configuration file:
./m2g-poller.py --muninhost localhost --carbon carbonhost:2004
Example:
nohup ./m2g-poller.py --config /etc/munin-graphite/hosts.cfg &
Metrics paths are created using the hostname and various plugin data. The processes count plugin for Munin would produce metrics and values like the following:
## Configuration file looks like:
```
[node-001]
carbon=carbon01.company.com:2004
host=node-001.infra.company.com
interval=60
prefix=munin
[node-002]
carbon=carbon99.company.com:2004
host=node-002.infra.company.com
interval=120
prefix=munin
```
Runing with config above will start two threads, each thread for one node's section and these threads will in defined
intervals fetch data from munin-node on targets.
If you do not specify all parameters they will be set to default values as described in --help option.
You can run m2g-poller.py from commandline with parameters directly to override some or all of supported options,
in that case all specified option will be use instead of built-in defaults for nodes in configuration file.
Example:
./m2g-poller.py --host node-003.company.com --displayname node-003 --carbon carbon01.infra.company.com:2004 --interval 90 --prefix someprefix
## Signal handling
* you can send SIGTERM to m2g-poller.py program. This would terminate program's run after all threads will finish its
current cycle.
* pressing CTRL+C when running from command line will terminate the program as well.
* you can send SIGHUP to m2g-poller.py program. This would signalize the program it should reload list of plugins of all
nodes from target list.
## System log
Program logs information into syslog, using prefix MUNIN-GRAPHITE and identification of originating thread.
Example:
MUNIN-GRAPHITE: INFO Thread node-009.company.com: Finished querying host node-009.company.com (Execution Time: 5.12 sec).
## Metrics
Metrics paths are created using the hostname and various plugin data. The processes count plugin for Munin would produce
metrics and values like the following:
servers.localhost.processes.processes.uninterruptible, value: 0
servers.localhost.processes.processes.processes, value: 224
@ -31,6 +76,3 @@ Metrics paths are created using the hostname and various plugin data. The proces
These paths of data are then sent to Carbon via the pickle port.
## TODO
Logic will eventually be added to allow the m2g-poller to be run in the background with a list of hosts to query and not require a cronjob.

View File

@ -2,24 +2,29 @@
"""Gather Munin statistics and deliver to Carbon for Graphite display."""
import argparse
import ConfigParser
import logging
import logging.handlers
import pickle
import re
import socket
import struct
import sys
import time
import signal
import threading
LOGGING_FORMAT = "%(asctime)s:%(levelname)s:%(message)s"
RE_LEFTRIGHT = re.compile(r"^(?P<left>\S+)\s+(?P<right>\S+)$")
RE_MUNIN_NODE_NAME = re.compile(r"^# munin node at\s+(?P<nodename>\S+)$")
## TODO: Catch keyboard interrupt properly and die when requested
threads = []
shutdown = False
class Munin():
"""Munin host object with querying getter functions."""
def __init__(self, hostname, port=4949, args=None):
def __init__(self, hostname, thread, port=4949, args=None):
self.hostname = None
self.remotenode = None
@ -27,6 +32,9 @@ class Munin():
self._conn = None
self._carbon_sock = None
self.hello_string = None
self.reload_plugins = True
self.plugins = {}
self.plugins_config = {}
if ':' in hostname:
self.hostname, self.remotenode = hostname.split(":", 1)
@ -36,19 +44,20 @@ class Munin():
self.args = args
if self.args.displayname:
self.displayname = self.args.displayname
self.displayname = self.args.displayname.split(".")[0]
else:
self.displayname = self.hostname.split(".")[0]
self.thread = thread
def go(self):
"""Bootstrap method to start processing hosts's Munin stats."""
global shutdown
self.connect()
self.update_hostname()
processing_time = self.process_host_stats()
interval = self.args.interval
interval = int(self.args.interval)
while True and interval != 0:
while True and interval != 0 and not shutdown:
sleep_time = max(interval - processing_time, 0)
time.sleep(sleep_time)
self.connect()
@ -62,8 +71,8 @@ class Munin():
node_name = RE_MUNIN_NODE_NAME.search(self.hello_string).group(1)
self.displayname = node_name.split(".")[0]
except AttributeError:
logging.info("Unable to obtain munin node name from: %s",
self.hello_string)
logger.info("Thread %s: Unable to obtain munin node name from: %s",
self.thread.name, self.hello_string)
return
def connect(self):
@ -71,16 +80,16 @@ class Munin():
try:
self._sock = socket.create_connection((self.hostname, self.port), 10)
except socket.error:
logging.exception("Unable to connect to Munin host %s, port: %s",
self.hostname, self.port)
logger.exception("Thread %s: Unable to connect to Munin host %s, port: %s",
self.thread.name, self.hostname, self.port)
sys.exit(1)
try:
self._conn = self._sock.makefile()
self.hello_string = self._readline()
except socket.error:
logging.exception("Unable to communicate to Munin host %s, port: %s",
self.hostname, self.port)
logger.exception("Thread %s: Unable to communicate to Munin host %s, port: %s",
self.thread.name, self.hostname, self.port)
if self.args.carbon:
self.connect_carbon()
@ -90,8 +99,8 @@ class Munin():
try:
self._carbon_sock = socket.create_connection((carbon_host, carbon_port), 10)
except socket.error:
logging.exception("Unable to connect to Carbon on host %s, port: %s",
carbon_host, carbon_port)
logger.exception("Thread %s: Unable to connect to Carbon on host %s, port: %s",
self.thread.name, carbon_host, carbon_port)
sys.exit(1)
def close_connection(self):
@ -111,7 +120,7 @@ class Munin():
"""Iterator over Munin output."""
while True:
current_line = self._readline()
logging.debug("Iterating over line: %s", current_line)
logger.debug("Thread %s: Iterating over line: %s", self.thread.name, current_line)
if not current_line:
break
if current_line.startswith("#"):
@ -138,8 +147,8 @@ class Munin():
key_name = multigraph_prefix + full_key_name.split(".")[0]
response[multigraph][key_name] = key_value
except (KeyError, AttributeError):
logging.info("plugin %s returned invalid data [%s] for host"
" %s\n", plugin, current_line, self.hostname)
logger.info("Thread %s: Plugin %s returned invalid data [%s] for host"
" %s\n", self.thread.name, plugin, current_line, self.hostname)
return response
@ -149,12 +158,25 @@ class Munin():
self._readline() # ignore response
if self.remotenode:
logger.info("Thread %s: Asking for plugin list for remote node %s", self.thread.name, self.remotenode)
self._sock.sendall("list %s\n" % self.remotenode)
else:
logger.info("Thread %s: Asking for plugin list for local node %s", self.thread.name, self.hostname)
self._sock.sendall("list\n")
plugin_list = self._readline().split(" ")
return plugin_list
if self.args.filter:
try:
filteredlist = [plugin for plugin in plugin_list if re.search(self.args.filter, plugin, re.IGNORECASE)]
plugin_list = filteredlist
except re.error:
logger.info("Thread %s: Filter regexp for plugin list is not valid: %s" % self.args.filter)
# if there is no filter or we have got an re.error, simply return full list
result_list = []
for plugin in plugin_list:
if len(plugin.strip()) > 0:
result_list.append(plugin)
return result_list
def get_config(self, plugin):
"""Get config values for Munin plugin."""
@ -189,35 +211,43 @@ class Munin():
def process_host_stats(self):
"""Process Munin node data, potentially sending to Carbon."""
start_timestamp = time.time()
logging.info("Querying host %s", self.hostname)
plugins = self.list_plugins()
logging.debug("Plugin List: %s", plugins)
logger.info("Thread %s: Querying host %s", self.thread.name, self.hostname)
# to be more efficient, load list of plugins just in case we do not have any
if self.reload_plugins:
self.plugins_config = {}
self.plugins = self.list_plugins()
self.reload_plugins = False
logger.debug("Thread %s: Plugin List: %s", self.thread.name, self.plugins)
epoch_timestamp = int(start_timestamp)
for current_plugin in plugins:
logging.info("Fetching plugin: %s (Host: %s)",
current_plugin, self.hostname)
for current_plugin in self.plugins:
logger.info("Thread %s: Fetching plugin: %s (Host: %s)",
self.thread.name, current_plugin, self.hostname)
plugin_config = self.get_config(current_plugin)
logging.debug("Plugin Config: %s", plugin_config)
# after (re)load of list of plugins we have to load their configurations too
try:
self.plugins_config[current_plugin]
except KeyError:
self.plugins_config[current_plugin] = self.get_config(current_plugin)
logger.debug("Thread %s: Plugin Config: %s", self.thread.name, self.plugins_config[current_plugin])
plugin_data = self.fetch(current_plugin)
logging.debug("Plugin Data: %s", plugin_data)
logger.debug("Thread %s: Plugin Data: %s", self.thread.name, plugin_data)
if self.args.carbon:
for multigraph in plugin_config:
for multigraph in self.plugins_config[current_plugin]:
try:
self.send_to_carbon(epoch_timestamp,
current_plugin,
plugin_config[multigraph],
self.plugins_config[current_plugin][multigraph],
plugin_data[multigraph])
except KeyError:
logging.info("plugin returns invalid data:\n plugin_config: %r host %s.",
plugin_config, self.hostname)
logger.info("Thread %s: Plugin returns invalid data:\n plugin_config: %r host %s.",
self.thread.name, self.plugins_config[current_plugin], self.hostname)
end_timestamp = time.time() - start_timestamp
self.close_connection()
self.close_carbon_connection()
logging.info("Finished querying host %s (Execution Time: %.2f sec).",
self.hostname, end_timestamp)
logger.info("Thread %s: Finished querying host %s (Execution Time: %.2f sec).",
self.thread.name, self.hostname, end_timestamp)
return end_timestamp
def send_to_carbon(self, timestamp, plugin_name, plugin_config, plugin_data):
@ -232,7 +262,7 @@ class Munin():
hostname = self.remotenode
data_list = []
logging.info("Creating metric for plugin %s, timestamp: %d",
logger.info("Creating metric for plugin %s, timestamp: %d",
plugin_name, timestamp)
for data_key in plugin_data:
@ -240,34 +270,75 @@ class Munin():
plugin_category = plugin_config["graph_category"]
metric = "%s%s.%s.%s.%s" % (prefix, self.displayname, plugin_category, plugin_name, data_key)
value = plugin_data[data_key]
logging.debug("Creating metric %s, value: %s", metric, value)
logger.debug("Creating metric %s, value: %s", metric, value)
data_list.append((metric, (timestamp, value)))
except KeyError:
logging.info("plugin returns invalid data:\n plugin_config: %r host %s.", plugin_config, self.hostname)
logger.info("plugin returns invalid data:\n plugin_config: %r host %s.", plugin_config, self.hostname)
if self.args.noop:
logging.info("NOOP: Not sending data to Carbon")
logger.info("NOOP: Not sending data to Carbon")
return
logging.info("Sending plugin %s data to Carbon for host %s.",
logger.info("Sending plugin %s data to Carbon for host %s.",
plugin_name, hostname)
payload = pickle.dumps(data_list)
header = struct.pack("!L", len(payload))
message = header + payload
try:
self._carbon_sock.sendall(message)
logging.info("Finished sending plugin %s data to Carbon for host %s.",
logger.info("Finished sending plugin %s data to Carbon for host %s.",
plugin_name, self.hostname)
except socket.error:
logging.exception("Unable to send data to Carbon")
logger.exception("Unable to send data to Carbon")
###
# Custom Threading class, one thread for each host in configuration
###
class MuninThread(threading.Thread):
def __init__(self, params, cmdlineargs):
threading.Thread.__init__(self)
self.name = params['host']
self.shutdown = False
# construct new namespace to pass it to the new Munin class instance
# for better manipulation, just prepare writable dcfg "link" to new namespace
cfg = argparse.Namespace()
dcfg = vars(cfg)
#construct final arguments Namespace
for v in vars(cmdlineargs):
try:
dcfg[v] = params[v]
except KeyError:
dcfg[v] = getattr(cmdlineargs, v, None)
self.munin = Munin(hostname=self.name, args=cfg, thread=self)
def run(self):
logger.info("Starting thread for %s." % self.name)
self.munin.go()
logger.info("Finishing thread for %s." % self.name)
def dostop(self):
global shutdown
logger.info("Thread %s: Got signal to stop." % self.name)
shutdown = True
def reload(self):
self.munin.reload_plugins = True
logger.info("Thread %s: Got signal to reload." % self.name)
###
# bellow are common function
###
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Send Munin statistics to Graphite.")
parser.add_argument("--carbon",
parser.add_argument("--config", "-c",
action="store",
help="Carbon host and Pickle port (ex: localhost:2004).")
default=False,
help="Configuration file with list of hosts and their plugins to fetch.")
parser.add_argument("--host",
action="store",
default="localhost",
@ -277,6 +348,13 @@ def parse_args():
default=False,
help="If defined, use this as the name to store metrics in Graphite instead of the Munin"
" hostname.")
parser.add_argument("--carbon",
action="store",
help="Carbon host and Pickle port (ex: localhost:2004).")
parser.add_argument("--filter",
action="store",
default='.*',
help="Regular expression for selecting only defined subset of received plugins.")
parser.add_argument("--interval",
type=int,
default=60,
@ -293,6 +371,9 @@ def parse_args():
action="store",
default="servers",
help="Prefix used on graphite target's name. Default: %(default)s")
parser.add_argument("--logtosyslog",
action="store_true",
help="Log to syslog. No output on the command line.")
parser.add_argument("--verbose", "-v",
choices=[1, 2, 3],
default=2,
@ -303,7 +384,71 @@ def parse_args():
return args
###
# stop all threads and exit
###
def handler_term(signum=signal.SIGTERM, frame=None):
global threads
for t in threads:
t.dostop()
###
# set all threads to reload information about all munin-node's plugins
###
def handler_hup(signum, frame=None):
global threads
for t in threads:
t.reload()
def read_configuration(configfile):
"""
Returns False if configuration file is not readable, list of dictionaries otherwise
Configuration options follow parameters described as command line options. All parameters are optional except host,
displayname parameter is built from section name, so it is always presented too.
Non-existent options are superseded by defaults
Example:
[servername]
host=fqdn[:remotenode]
port=4949
carbon=carbonhostfqdn:port
interval=60
prefix=prefix for Graphite's target
noprefix=True|False
filter=^cpu.*
@param configfile: full filepath to configuration file
@rtype : object
"""
cf = ConfigParser.ConfigParser()
hostscfg = []
try:
cf.read(configfile)
for section in cf.sections():
di = {}
for ki, vi in cf.items(section):
# construct dictionary item
di[ki] = vi
if "host" in di.keys():
di["displayname"] = section
hostscfg.append(di)
except ConfigParser.Error as e:
logger.critical("Failed to parse configuration or command line options. Exception was %s. Giving up." % e)
return hostscfg
def main():
global threads
global logger
args = parse_args()
if args.verbose == 1:
logging_level = logging.ERROR
@ -312,9 +457,47 @@ def main():
else:
logging_level = logging.INFO
logging.basicConfig(format=LOGGING_FORMAT, level=logging_level)
munin = Munin(hostname=args.host, args=args)
munin.go()
#logging.basicConfig(format=LOGGING_FORMAT, level=logging_level)
logger = logging.getLogger()
logger.setLevel(logging_level)
syslog = logging.handlers.SysLogHandler(address='/dev/log')
stdout = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('MUNIN-GRAPHITE: %(levelname)s %(message)s')
syslog.setFormatter(formatter)
if args.logtosyslog:
logger.addHandler(syslog)
else:
logger.addHandler(stdout)
# block for setting handling of signals
signal.signal(signal.SIGHUP, handler_hup)
signal.signal(signal.SIGTERM, handler_term)
signal.signal(signal.SIGINT, handler_term)
hosts = list()
if args.config:
hosts = read_configuration(args.config)
if not hosts:
# no file configuration, trying to use commandline arguments only and construct one-item dictionary
hosts.append({'host': args.host})
# we have got some items in hosts's list
for host in hosts:
logging.info("Going to thread with config %s" % host)
threads.append(MuninThread(host, args))
for t in threads:
t.start()
while True:
try:
if not any([t.isAlive() for t in threads]):
logging.info("All threads finished, exiting.")
break
else:
time.sleep(1)
except KeyboardInterrupt:
handler_term()
if __name__ == '__main__':
main()