# -*- coding: utf-8 -*-
'''
Saltgraph engine for catching returns of state runs, parsing them
and passing them to a flat database of the latest Salt resource runs.
'''

# Import python libs
from __future__ import absolute_import
import contextlib
import datetime
import json
import logging

# Import salt libs
import salt.utils.event

# Import third party libs
try:
    import psycopg2
    import psycopg2.extras
    HAS_POSTGRES = True
except ImportError:
    HAS_POSTGRES = False

__virtualname__ = 'saltgraph'
log = logging.getLogger(__name__)

# State functions whose return events are recorded by the engine.
_SUPPORTED_FUNCS = ('state.sls', 'state.apply', 'state.highstate')

# Text types on both Python 2 (str, unicode) and Python 3 (str, str).
_STRING_TYPES = (str, type(u''))

_LOWSTATE_UPSERT_SQL = '''INSERT INTO salt_resources
    (id, resource_id, host, service, module, fun, status)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (id) DO UPDATE
    SET resource_id = excluded.resource_id,
        host = excluded.host,
        service = excluded.service,
        module = excluded.module,
        fun = excluded.fun,
        alter_time = current_timestamp'''

_META_UPSERT_SQL = '''INSERT INTO salt_resources_meta
    (id, options)
    VALUES (%s, %s)
    ON CONFLICT (id) DO UPDATE
    SET options = excluded.options,
        alter_time = current_timestamp'''

_RESOURCE_UPDATE_SQL = '''UPDATE salt_resources SET
    (status, last_ret, alter_time) = (%s, %s, current_timestamp)
    WHERE id = %s'''


def __virtual__():
    '''
    Load the engine only when the psycopg2 driver could be imported.
    '''
    if not HAS_POSTGRES:
        return False, 'Could not import saltgraph engine. python-psycopg2 is not installed.'
    return __virtualname__


def _get_conn(options=None):
    '''
    Return a postgres connection.

    ``options`` may carry the keys ``host``, ``user``, ``passwd``, ``db``
    and ``port``; every missing key falls back to a local default.
    '''
    options = options or {}
    return psycopg2.connect(
        host=options.get('host', '127.0.0.1'),
        user=options.get('user', 'salt'),
        password=options.get('passwd', 'salt'),
        database=options.get('db', 'salt'),
        port=options.get('port', 5432))


def _close_conn(conn):
    '''
    Commit pending work and close the Postgres connection.

    The connection is closed even when the commit itself fails.
    '''
    try:
        conn.commit()
    finally:
        conn.close()


@contextlib.contextmanager
def _connection(options=None):
    '''
    Context manager yielding a Postgres connection.

    On normal exit the connection is committed and closed; if the body
    raises, the connection is closed without committing and the exception
    propagates.
    '''
    conn = _get_conn(options)
    try:
        yield conn
    except BaseException:
        conn.close()
        raise
    else:
        _close_conn(conn)


def _get_lowstate_data(options=None):
    '''
    Refresh the ``salt_resources`` table from the lowstate of all minions.

    Runs ``state.show_lowstate`` on ``*`` through ``saltutil.cmd`` and
    upserts one row per returned resource, keyed by ``<minion>|<__id__>``.
    Minions whose ``retcode`` is not ``0`` are skipped. When at least one
    minion answered, the ``lowstate_data`` row of ``salt_resources_meta``
    is upserted so that its ``alter_time`` records the refresh.
    '''
    try:
        # you can only do this on Salt Masters minion
        lowstate_req = __salt__['saltutil.cmd'](
            '*', 'state.show_lowstate',
            **{'timeout': 15, 'concurrent': True, 'queue': True})
    except Exception as exc:  # best effort: a failed query means no refresh
        log.warning('Saltgraph could not collect lowstate data: %s', exc)
        lowstate_req = {}

    if not lowstate_req:
        # Nothing to store, so do not open a connection at all.
        return

    with _connection(options) as conn:
        cur = conn.cursor()
        for minion, lowstate_ret in lowstate_req.items():
            if lowstate_ret.get('retcode') != 0:
                continue

            for resource in lowstate_ret.get('ret', []):
                resource_id = resource.get('__id__')
                if 'state' in resource:
                    module = resource.get('state')
                else:
                    module = resource.get('module')
                cur.execute(
                    _LOWSTATE_UPSERT_SQL, (
                        '%s|%s' % (minion, resource_id),
                        resource_id,
                        minion,
                        resource.get('__sls__'),
                        module,
                        resource.get('fun'),
                        'unknown'
                    )
                )
                # Commit row by row so that already stored resources
                # survive a failure on a later one.
                conn.commit()

        cur.execute(_META_UPSERT_SQL, ('lowstate_data', '{}'))


def _up_to_date(options=None):
    '''
    Return ``True`` when the stored lowstate data are less than a day old.

    The age is taken from ``alter_time`` of the ``lowstate_data`` row in
    ``salt_resources_meta``; a missing row counts as out of date.
    '''
    with _connection(options) as conn:
        cur = conn.cursor()
        cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s',
                    ('lowstate_data',))
        row = cur.fetchone()

    if not row:
        return False

    alter_time = row[0]
    offset = alter_time.utcoffset()
    # Compare naive UTC with naive UTC: a timezone-aware value has to be
    # shifted to UTC before its tzinfo is dropped.
    alter_time = alter_time.replace(tzinfo=None)
    if offset is not None:
        alter_time -= offset

    # if lowstate data are older than 1 day, refresh them
    age = datetime.datetime.utcnow() - alter_time
    return age < datetime.timedelta(days=1)


def _update_resources(event, options):
    '''
    Store the results of a state run in the ``salt_resources`` table.

    For every resource in ``event['return']`` whose ``<minion>|<__id__>``
    key already exists in the table, ``status`` is set to ``success``,
    ``failed`` or ``unknown`` (when the resource has no ``result``) and
    ``last_ret`` to the ``repr`` of the resource. Unknown resources are
    ignored, as are returns that are not a dict.
    '''
    returned = event.get('return', {})
    if not isinstance(returned, dict):
        # A state run that failed before execution returns a list of errors.
        log.debug('Saltgraph ignoring state return without resources: %r',
                  returned)
        return

    minion = event.get('id')
    with _connection(options) as conn:
        cur = conn.cursor()
        cur.execute('SELECT id FROM salt_resources')
        known_ids = set(res[0] for res in cur.fetchall())

        for resource in returned.values():
            if not isinstance(resource, dict):
                continue

            rid = '%s|%s' % (minion, resource.get('__id__'))
            if rid not in known_ids:
                continue

            result = resource.get('result')
            if result is None:
                status = 'unknown'
            else:
                status = 'success' if result else 'failed'

            cur.execute(_RESOURCE_UPDATE_SQL, (status, repr(resource), rid))
            conn.commit()


def _is_test_run(fun_args):
    '''
    Return ``True`` when the state function arguments request test mode,
    given either as a ``test=true`` string or as a keyword dict.
    '''
    for arg in fun_args:
        if isinstance(arg, _STRING_TYPES):
            if arg.lower() == 'test=true':
                return True
        elif isinstance(arg, dict):
            if str(arg.get('test', '')).lower() == 'true':
                return True
    return False


def _is_reclass_run(fun_args):
    '''
    Return ``True`` when any string argument starts with ``reclass``.
    '''
    return any(arg.startswith('reclass')
               for arg in fun_args
               if isinstance(arg, _STRING_TYPES))


def start(host='salt',
          user='salt',
          password='salt',
          database='salt',
          port=5432,
          **kwargs):
    '''
    Listen to events and parse Salt state returns.

    Events of ``state.sls``, ``state.apply`` and ``state.highstate`` that
    were not run in test mode are written to the database described by
    ``host``, ``user``, ``password``, ``database`` and ``port``. The
    lowstate data are refreshed first when the run targeted ``reclass*``
    or when the stored data are out of date. Extra keyword arguments are
    accepted and ignored.
    '''
    if __opts__['__role'] == 'master':
        event_bus = salt.utils.event.get_master_event(
            __opts__,
            __opts__['sock_dir'],
            listen=True)
    else:
        event_bus = salt.utils.event.get_event(
            'minion',
            transport=__opts__['transport'],
            opts=__opts__,
            sock_dir=__opts__['sock_dir'],
            listen=True)
    log.debug('Saltgraph engine started')

    options = {
        'host': host,
        'user': user,
        'passwd': password,
        'db': database,
        'port': port
    }

    while True:
        event = event_bus.get_event()
        if not event or event.get('fun', None) not in _SUPPORTED_FUNCS:
            continue

        fun_args = event.get('fun_args') or []
        if _is_test_run(fun_args):
            continue

        try:
            if _is_reclass_run(fun_args) or not _up_to_date(options):
                _get_lowstate_data(options)
            _update_resources(event, options)
        except Exception:
            # One bad event or a database hiccup must not stop the engine.
            log.exception('Saltgraph failed to process state return')