В этой статье мы рассмотрим прикладной пример использования zabbix_utils для решения одной из не очень тривиальных задач – получения списка получателей алертов для триггеров определённого узла сети. Вы узнаете, как легко автоматизировать процесс сбора этой информации, а также увидите примеры реального кода, которые можно будет адаптировать под ваши нужды.

Библиотека zabbix_utils за последний год стала одним из самых популярных инструментов для работы с Zabbix API. Это удобный инструмент, который значительно упрощает процесс взаимодействия с сервером, прокси и агентом Zabbix, особенно для тех, кто автоматизирует задачи мониторинга и управления объектами мониторинга.

Благодаря простоте использования и обширной функциональности, zabbix_utils нашла своих поклонников среди системных администраторов, инженеров мониторинга и DevOps. Согласно данным с PyPI, с момента релиза библиотека уже была скачана более 140000 раз, что только подтверждает её востребованность в сообществе.

Описание задачи

В системе мониторинга Zabbix администраторам часто требуется проверять, кто из пользователей получает алерты по определённым триггерам. Это может быть полезно для аудита, настройки новых уведомлений или просто для быстрой диагностики проблем. Задача становится особенно актуальной, когда на одном узле сети настроено множество триггеров, и ручная проверка получателей для каждого триггера через интерфейс Zabbix становится очень трудоёмкой. 

В таких случаях целесообразно использовать пользовательское решение на основе Zabbix API. С помощью API можно получить доступ ко всем нужным данным напрямую, чтобы затем с помощью дополнительной логики определить конечных получателей. Библиотека zabbix_utils делает работу с Zabbix API более удобной и позволяет автоматизировать этот процесс.

В данном проекте мы используем zabbix_utils для написания скрипта, который собирает список получателей алертов для триггеров выбранного узла сети. Это позволит вам получать необходимую информацию гораздо быстрее и с минимальными усилиями.

Установка и настройка окружения

Для начала работы с zabbix_utils необходимо установить библиотеку и настроить подключение к Zabbix API. Библиотека поддерживает несколько способов установки, описанных в официальном README, что делает её удобной для использования в разных окружениях.

1. Установка через pip

Самый простой и распространённый способ установки — с помощью пакетного менеджера pip. Для этого выполните команду:

~$ pip install zabbix_utils

Для установки всех необходимых зависимостей для асинхронной работы, можно воспользоваться командой:

~$ pip install zabbix_utils[async]

Этот способ подходит для большинства пользователей, поскольку pip автоматически устанавливает все необходимые зависимости.

2. Установка из репозитория Zabbix

С момента написания прошлых статей мы добавили еще один способ установки — из репозитория Zabbix. Прежде всего, вам необходимо добавить сам репозиторий в вашу систему, если он не установлен. Официальные пакеты Zabbix для Red Hat Enterprise Linux, а также для Debian их производных доступны на сайте Zabbix.

Red Hat Enterprise Linux и производные

Установка библиотеки zabbix_utils из репозитория Zabbix:

~# dnf install python3-zabbix-utils

Debian / Ubuntu and derivatives

Команда установки библиотеки из репозитория Zabbix:

~# apt install python3-zabbix-utils

3. Установка из исходного кода

Если вам требуется использовать последнюю версию библиотеки, которая ещё не опубликована в PyPI, или если вы хотите внести изменения в код, можно установить библиотеку напрямую из GitHub:

1. Клонируйте репозиторий с GitHub:

~$ git clone https://github.com/zabbix/python-zabbix-utils

2. Перейдите в папку с проектом:

~$ cd python-zabbix-utils/

3. Установите библиотеку, выполнив команду:

~$ python3 setup.py install

Этот метод удобен для разработчиков, которые хотят кастомизировать функционал библиотеки или тестировать её последние изменения.

4. Тестирование подключения к Zabbix API

После установки zabbix_utils будет не лишним проверить подключение к вашему серверу Zabbix через API. Для этого используйте стандартные параметры URL, логина и пароля вашего Zabbix сервера.

Пример кода для проверки подключения:

from zabbix_utils import ZabbixAPI ZABBIX_AUTH = {     "url": "адрес_zabbix_сервера",     "user": "имя_пользователя",     "password": "пароль" } api = ZabbixAPI(**ZABBIX_AUTH) hosts = api.host.get(     output=['hostid', 'name'] ) print(hosts) api.logout()

Этот код создаёт объект ZabbixAPI, который используется для дальнейшей работы с API Zabbix, упрощая выполнение запросов и обработку данных. Если код выводит список узлов сети, значит, настройка завершена успешно, и вы готовы к дальнейшей работе с Zabbix API.

Основные шаги решения задачи

Теперь, когда окружение настроено, давайте рассмотрим основные шаги по решению задачи — получению списка получателей алертов для триггеров, привязанных к конкретному узлу сети в Zabbix. 

В zabbix_utils встроена поддержка асинхронного взаимодействия с API через класс AsyncZabbixAPI. Это позволяет отправлять несколько запросов одновременно и обрабатывать их результаты по мере готовности, что значительно снижает задержки при множественных обращениях к API. Поэтому в этом проекте используем класс AsyncZabbixAPI и асинхронный подход.

Ниже будут приведены основные шаги для решения поставленной задачи и примеры кода для каждого шага. Имейте ввиду, что код в этом проекте носит ознакомительный характер, может быть не оптимальным или иметь ошибки. Используйте его как пример для своего проекта, но не как полноценный инструмент.

Шаг 1. Получение ID узла сети

Первый шаг — это идентификация узла сети (host), для которого мы будем получать информацию о триггерах и алертах. Для этого нужно найти ID узла, используя его имя (host / name). Zabbix API предоставляет метод для получения этой информации, а с помощью zabbix_utils этот процесс значительно упрощён.

Пример получения ID узла по его имени:

host = api.host.get(     output=["hostid"],     filter={"name": "имя_узла_сети"} )

Этот метод возвращает уникальный идентификатор узла, который можно использовать далее. Но для целей нашего тестового проекта мы будем использовать идентификатор узла заданный вручную.

Шаг 2. Получение триггеров узла

Имея ID узла сети, следующим шагом выполняем получение всех триггеров, связанных с этим узлом. Триггеры содержат в себе условия, по которым срабатывают алерты. Нам нужно собрать информацию о всех триггерах, чтобы затем использовать ее для выбора подходящих по всем условиям действий.

Пример получения триггеров узла:

triggers = api.trigger.get(     hostids=[hostid],     selectTags="extend",     selectHosts=["hostid"],     selectHostGroups=["groupid"],     selectDiscoveryRule=["templateid"],     output="extend", )

Этот запрос возвращает полную информацию о триггерах для узла. Мы получаем не только сами триггеры, но и их теги, связанные узлы и группы, а также информацию о правилах обнаружения. Вся эта информация будет необходима для проверки условий в действиях (actions).

Шаг 3. Инициализация метаданных триггеров

На этом этапе создаются объекты для каждого триггера, чтобы хранить их метаданные. Для этого используется класс Trigger. Он включает информацию о триггере, таких как его имя, ID, связанные хост-группы, узлы и теги, а также шаблоны и операции.

Вот код, который описывает класс Trigger:

class Trigger:     def __init__(self, trigger):         self.name = trigger["description"]         self.triggerid = trigger["triggerid"]         self.hostgroups = [g["groupid"] for g in trigger["hostgroups"]]         self.hosts = [h["hostid"] for h in trigger["hosts"]]         self.tags = {t["tag"]: t["value"] for t in trigger["tags"]}         self.tmpl_triggerid = self.triggerid         self.lld_rule = trigger["discoveryRule"] or {}         if trigger["templateid"] != "0":             self.tmpl_triggerid = trigger["templateid"]         self.templates = []         self.messages = []         self._conditions = {             "0": self.hostgroups,             "1": self.hosts,             "2": [self.triggerid],             "3": trigger["event_name"] or trigger["description"],             "4": trigger["priority"],             "13": self.templates,             "25": self.tags.keys(),             "26": self.tags,         }     def eval_condition(self, operator, value, trigger_data):         # equals or does not equal         if operator in ["0", "1"]:             equals = operator == "0"             if isinstance(value, dict) and isinstance(                 trigger_data, dict):                 if value["tag"] in trigger_data:                     if value["value"] == trigger_data[                         value["tag"]]:                         return equals             elif value in trigger_data and isinstance(                 trigger_data, list):                 return equals             elif value == trigger_data:                 return equals             return not equals         # contains or does not contain         if operator in ["2", "3"]:             contains = operator == "2"             if isinstance(value, dict) and isinstance(                 trigger_data, dict):                 if value["tag"] in trigger_data:                     if value["value"] in trigger_data[                         value["tag"]]:                         return contains             elif value in trigger_data:                 return contains             return not contains           # is greater/less than or equals         if operator in ["5", "6"]:             greater = operator != "5"             try:                 if int(value) < int(trigger_data):                     return not greater                 if int(value) == int(trigger_data):                     return True                 if int(value) > int(trigger_data):                     return greater             except:                 raise ValueError(                     "Values must be numbers to compare them"                 )       def select_templates(self, templates):         for template in templates:             if self.tmpl_triggerid in [                 t["triggerid"] for t in template["triggers"]]:                 self.templates.append(template["templateid"])             if self.lld_rule.get("templateid") in [                 d["itemid"] for d in template["discoveries"]             ]:                 self.templates.append(template["templateid"])     def select_actions(self, actions):         selected_actions = []         for action in actions:             conditions = []             if "filter" in action:                 conditions = action["filter"]["conditions"]                 eval_formula = action["filter"]["eval_formula"]             # Add actions without conditions directly             if not conditions:                 selected_actions.append(action)                 continue             condition_check = {}             for condition in conditions:                 if (                     condition["conditiontype"] != "6"                     and condition["conditiontype"] != "16"                 ):                     if (                         condition["conditiontype"] == "26"                         and isinstance(condition["value"], str)                     ):                         condition["value"] = {                             "tag": condition["value2"],                             "value": condition["value"],                         }                     if condition["conditiontype"] in self._conditions:                         condition_check[                             condition["formulaid"]                         ] = self.eval_condition(                             condition["operator"],                             condition["value"],                             self._conditions[                                 condition["conditiontype"]                             ],                         )                 else:                     condition_check[                         condition["formulaid"]                     ] = True             for formulaid, bool_result in condition_check.items():                 eval_formula = eval_formula.replace(                     formulaid, str(bool_result))
            # Evaluate the final condition formula             if eval(eval_formula):                 selected_actions.append(action)         return selected_actions       def select_operations(self, actions, mediatypes):         messages_metadata = []         for action in self.select_actions(actions):             messages_metadata += self.check_operations(                 "operations", action, mediatypes             )             messages_metadata += self.check_operations(                 "update_operations", action, mediatypes             )             messages_metadata += self.check_operations(                 "recovery_operations", action, mediatypes             )         return messages_metadata
    def check_operations(self, optype, action, mediatypes):         messages_metadata = []         optype_mapping = {             "operations": "0",  # Problem event             "recovery_operations": "1",  # Recovery event             "update_operations": "2",  # Update event         }         operations = copy.deepcopy(action[optype])         # Processing "notify all involved" scenarios         for idx, _ in enumerate(operations):             if operations[idx]["operationtype"] not in ["11", "12"]:                 continue             # Copy operation as a template for reuse             op_template = copy.deepcopy(operations[idx])             del operations[idx]             # Checking for message sending operations             for key in [                 k for k in ["operations", "update_operations"] if k != optype             ]:                 if not action[key]:                     continue                 # Checking for message sending type operations                 for op in [                     o for o in action[key] if o["operationtype"] == "0"                 ]:                     # Copy template for the current operation                     operation = copy.deepcopy(op_template)                     operation.update(                         {                             "operationtype": "0",                             "opmessage_usr": op["opmessage_usr"],                             "opmessage_grp": op["opmessage_grp"],                         }                     )                     operation["opmessage"]["mediatypeid"] = op[                         "opmessage"                     ]["mediatypeid"]                     operations.append(operation)         for operation in operations:             if operation["operationtype"] != "0":                 continue             # Processing "all mediatypes" scenario             if operation["opmessage"]["mediatypeid"] == "0":                 for mediatype in mediatypes:                     operation["opmessage"]["mediatypeid"] = mediatype[                         "mediatypeid"                     ]                     messages_metadata.append(                         self.create_messages(                             optype_mapping[optype], action, operation, [                                 mediatype                             ]                         )                     )             else:                 messages_metadata.append(                     self.create_messages(                         optype_mapping[optype],                         action,                         operation,                         mediatypes                     )                 )         return messages_metadata       def create_messages(self, optype, action, operation, mediatypes):         message = Message(optype, action, operation)         message.select_mediatypes(mediatypes)         self.messages.append(message)         return message

Код создания объектов класса Trigger, для каждого из полученных триггеров:

for trigger in triggers:     triggers_metadata[trigger["triggerid"]] = Trigger(trigger)

Этот цикл перебирает все триггеры и сохраняет их в словарь triggers_metadata, где ключом является triggerid, а значением — объект триггера.

Шаг 4. Получение информации о шаблонах

Следующий этап — получение данных о шаблонах, связанных со всеми триггерами:

templates = api.template.get(     triggerids=list(set([t.tmpl_triggerid for t in triggers_metadata.values()])),     selectTriggers=["triggerid"],     selectDiscoveries=["itemid"],     output=["templateid"], )

Этот запрос возвращает информацию о всех шаблонах, которые связанны к триггерами проверяемого узла сети. Выполнение одного запроса для всех триггеров является более оптимальным решением, нежели выполнение отдельных запросов для каждого триггера. Эта информация потребуется для проверки условия “Шаблон” в действиях (actions).

Шаг 5. Получение действий и медиа-типов

Затем получаем список действий (actions) и медиа-типов, настроенных в системе:

actions = api.action.get(     selectFilter="extend",     selectOperations="extend",     selectRecoveryOperations="extend",     selectUpdateOperations="extend",     filter={"eventsource": 0, "status": 0},     output=["actionid", "esc_period", "eval_formula", "name"], )
mediatypes = api.mediatype.get(     selectUsers="extend",     selectActions="extend",     selectMessageTemplates="extend",     filter={"status": 0},     output=["mediatypeid", "name"], )

Здесь мы получаем действия, которые определяют, как и кому отправляются алерты, а также медиа-типы, через которые пользователи могут получать уведомления (например, Email или SMS).

Шаг 6. Сопоставление триггеров с шаблонами и действиями

На этом этапе каждый триггер ассоциируется с соответствующими шаблонами и действиями:

for trigger in triggers_metadata.values():     trigger.select_templates(templates)     messages += trigger.select_operations(actions, mediatypes)

В этом цикле для каждого триггера обновляется информация о его шаблонах и настроенных действиях для отправки уведомлений. Список соответствующих действий определяется путем проверки заданных в них условий с накопленными данными по каждому триггеру.

Для каждой операции соответствующего триггеру действия создается объект класса Message:

class Message:     def __init__(self, optype, action, operation):         self.optype = optype         self.mediatypename = ""         self.actionid = action["actionid"]         self.actionname = action["name"]         self.operationid = operation["operationid"]         self.mediatypeid = operation["opmessage"]["mediatypeid"]         self.subject = operation["opmessage"]["subject"]         self.message = operation["opmessage"]["message"]         self.default_msg = operation["opmessage"]["default_msg"]         self.users = [u["userid"] for u in operation["opmessage_usr"]]         self.groups = [g["usrgrpid"] for g in operation["opmessage_grp"]]         self.recipients = []         # Escalation period set to action's period if not specified         self.esc_period = operation.get("esc_period", "0")         if self.esc_period == "0":             self.esc_period = action["esc_period"]         # Use action's escalation period if unset         self.esc_step_from = self.multiply_time(             self.esc_period, int(operation.get("esc_step_from", "1")) - 1         )         if operation.get("esc_step_to", "0") != "0":             self.repeat_count = str(                 int(operation["esc_step_to"]) - int(operation["esc_step_from"]) + 1             )         # If not a problem event, set repeat count to 1         elif self.optype != "0":             self.repeat_count = "1"         # Infinite repeat count if esc_step_to is 0         else:             self.repeat_count = “&infin;”       def multiply_time(self, time_str, multiplier):         # Multiply numbers within the time string         result = re.sub(             r"(\d+)",             lambda m: str(int(m.group(1)) * multiplier),             time_str         )         if result[0] == "0":             return "0"         return result       def select_mediatypes(self, mediatypes):         for mediatype in mediatypes:             if mediatype["mediatypeid"] == self.mediatypeid:                 self.mediatypename = mediatype["name"]                 # Select message templates related to operation type                 msg_template = [                     m                     for m in mediatype["message_templates"]                     if (                         m["recovery"] == self.optype                         and m["eventsource"] == "0"                     )                 ]                 # Use default message if applicable                 if msg_template and self.default_msg == "1":                     self.subject = msg_template[0]["subject"]                     self.message = msg_template[0]["message"]       def select_recipients(self, user_groups, recipients):         for groupid in self.groups:             if groupid in user_groups:                 self.users += user_groups[groupid]         for userid in self.users:             if userid in recipients:                 recipient = copy.deepcopy(recipients[userid])                 if self.mediatypeid in recipient.sendto:                     recipient.mediatype = True                 self.recipients.append(recipient)

Каждый такой объект представляет собой отдельное сообщение отправляемое пользователям (получателям) и будет содержать всю информацию о сообщении — его тему, текст и получателей, а также параметры эскалации.

Шаг 7. Сбор идентификаторов пользователей и групп

После сопоставления триггеров с действиями начинается процесс сбора уникальных идентификаторов пользователей и групп:

userids = set() groupids = set() for message in messages:     userids.update(message.users)     groupids.update(message.groups)

Этот фрагмент кода собирает ID всех пользователей и групп, которые участвуют в операциях для каждого триггера. Это необходимо чтобы выполнить лишь один запрос к API Zabbix для всех задействованных пользователей и их групп, а не выполнять отдельные запросы для каждого триггера.

Шаг 8. Получение информации о пользователях и их группах

Следующий шаг — получение детализированной информации о пользователях и группах пользователей:

usergroups = {     group["usrgrpid"]: group     for group in api.usergroup.get(         selectUsers=["userid"],         selectHostGroupRights="extend",         output=["usrgrpid", "role"],     ) }   users = {     user["userid"]: user     for user in api.user.get(         selectUsrgrps=["usrgrpid"],         selectMedias=["mediatypeid", "active", "sendto"],         selectRole=["roleid", "type"],         filter={"status": 0},         output=["userid", "username", "name", "surname"],     ) }

Здесь мы собираем данные о пользователях, включая их роль в системе и медиа, через которые они получают уведомления. А также данные о группах пользователей, включая права доступа к хост-группам и список пользователей в каждой группе. Вся эта информация будет необходима для проверки доступов к узлу, с триггерами которого мы работаем.

Шаг 9. Сопоставление пользователей и групп с триггерами

После получения информации о пользователях начинается процесс сопоставления пользователей и групп с их соответствующими правами на получение уведомлений. Также здесь мы связываем пользователей с группами, обновляя информацию о правах и группах для каждого пользователя.

for userid in userids:     if userid in users:         user = users[userid]         recipients[userid] = Recipient(user)         for group in user["usrgrps"]:             if group["usrgrpid"] in usergroups:                 recipients[userid].permissions.update([                     h["id"]                     for h in usergroups[group["usrgrpid"]]["hostgroup_rights"]                     if int(h["permission"]) > 1                 ])   for groupid in groupids:     if groupid in usergroups:         group = usergroups[groupid]         user_groups[group["usrgrpid"]] = []         for user in group["users"]:             user_groups[group["usrgrpid"]].append(user["userid"])             if user["userid"] in recipients:                 recipients[user["userid"]].groups.update(group["usrgrpid"])             elif user["userid"] in users:                 recipients[user["userid"]] = Recipient(users[user["userid"]])             recipients[user["userid"]].permissions.update([                 h["id"]                 for h in group["hostgroup_rights"]                 if int(h["permission"]) > 1             ])

Этот фрагмент кода связывает каждого пользователя с его группами и наоборот, создавая полноценный список пользователей с их правами доступа к узлу, а значит на получение уведомлений по событиям этого узла.

Для каждого получателя создается объект класса Recipient, содержащий данные о получателе, такие как адрес для получения уведомления, права доступа к узлам сети, настроенные медиа-типы и др.

Вот код класса Recipient:

class Recipient:     def __init__(self, user):         self.userid = user["userid"]         self.username = user["username"]         self.fullname = "{name} {surname}".format(**user).strip()         self.type = user["role"]["type"]         self.groups = set([g["usrgrpid"] for g in user["usrgrps"]])         self.has_right = False         self.permissions = set()         self.sendto = {             m["mediatypeid"]: m["sendto"] for m in user["medias"] if m["active"] == "0"         }         # Check if the user is a super admin (type 3)         if self.type == "3":             self.has_right = True

Шаг 10. Сопоставление сообщений с получателями.

Наконец, сопоставляются получатели с конкретными сообщениями из шага 6:

for message in messages:     message.select_recipients(user_groups, recipients)

Этот шаг завершает основной процесс — каждому сообщению назначаются соответствующие получатели.

Шаг 11. Проверка прав доступа получателей и вывод результата.

Перед непосредственным выводом результата со списком получателей мы можем произвести проверку прав получателей сообщений и отфильтровать только тех, у кого есть соответствующие права на получение уведомлений по событиям, связанным с триггером, или у кого указаны и активны все настроенные медиа-типы. После этих действий можно выводить информацию любым удобным для Вас способом — будь то экспорт в файл, либо вывод на экран:

for trigger in triggers_metadata.values():     for message in trigger.messages:         for recipient in message.recipients:             recipient.show = True             if not recipient.has_right:                 recipient.has_right = (len([gid                     for gid in trigger.hostgroups                     if gid in recipient.permissions                 ]) > 0)             if not recipient.has_right and not show_unavail:                 recipient.show = False

Пример реализации

Все примеры и фрагменты кода, описанные выше, были собраны вместе для создания решения, демонстрирующего работу алгоритма получения получателей оповещений для триггеров выбранного узла сети. Чтобы сделать процесс более удобным и наглядным, мы реализовали этот алгоритм в виде небольшого веб-интерфейса.

Этот интерфейс позволяет пользователям ввести ID узла сети, после чего система обработает данные и выдаст список получателей оповещений, связанных с триггерами на данном узле. Веб-интерфейс использует асинхронные запросы к Zabbix API и библиотеку zabbix_utils для обеспечения быстрой обработки данных и удобства работы с большим количеством триггеров и пользователей.

Таким образом, читатели могут не только ознакомиться с теорией и примером кода, но и попробовать данное решение в действии.

Обратите внимание, что код в этом проекте предназначен для демонстрационных целей, может быть неоптимальным или содержать ошибки. Используйте его как пример для вашего проекта, но не как полноценный инструмент.

Полный исходный код и инструкции по установке веб-интерфейса можно найти на GitHub.

Заключение

zabbix_utils для решения задачи получения получателей алертов для триггеров выбранного узла сети с использованием Zabbix API. Мы детально описали ключевые шаги, начиная от настройки окружения и инициализации метаданных триггеров, до работы с получателями уведомлений и оптимизации производительности с помощью асинхронных запросов.

Использование zabbix_utils позволило упростить и ускорить взаимодействие с Zabbix API, дополнив возможности веб-интерфейса и повысив эффективность работы с большими объемами данных. Благодаря поддержке асинхронности и выборочным запросам, можно значительно снизить нагрузку на сервер и повысить производительность системы при работе с Zabbix, что особенно важно в крупных инфраструктурах.

Мы надеемся, что данный пример поможет вам в реализации собственных решений на основе Zabbix API и zabbix_utils, а также продемонстрирует возможности оптимизации взаимодействия с системой мониторинга.

Подписаться
Уведомить о
0 Comments
Старые
Новые Популярные
Межтекстовые Отзывы
Посмотреть все комментарии
0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x