Настоящий LLD описывает низкоуровневую реализацию системы интеллектуальной обработки и маршрутизации событий мониторинга Zabbix с использованием локальной LLM Qwen3.5:4B.
Документ предназначен для перехода от HLD/C4-архитектуры к реализации на Python и содержит:
LLD подготовлен на основании:
| ID | Решение | Обоснование |
|---|---|---|
| LLD-ADR-001 | Использовать отдельные сервисы alert-receiver, alert-processor-ingest, alert-processor-worker, audit-api, redis |
Соответствует разделению быстрого приема и длительной обработки |
| LLD-ADR-002 | Использовать Redis Streams для основной очереди событий | Нужны consumer group, pending messages, ack/retry и возможность deadletter |
| LLD-ADR-003 | Использовать Redis как кратковременное состояние, audit-хранилище MVP и outbox уведомлений | Соответствует HLD, упрощает MVP и Docker-развертывание |
| LLD-ADR-004 | Включить Redis AOF persistence | Иначе Redis как in-memory-хранилище не обеспечивает требование «принятое событие не должно теряться» при перезапуске контейнера |
| LLD-ADR-005 | Для High и Disaster применять детерминированную доставку, LLM использовать только как вспомогательный источник текста/диагностики |
LLM не должна отменять или подавлять критичные события |
| LLD-ADR-006 | Для Average сделать режим конфигурируемым: mandatory или intelligent |
Это прямо требуется ТЗ |
| LLD-ADR-007 | Для LLM-вызовов использовать строгий JSON-контракт ответа и дополнительную валидацию Pydantic | Нужна предсказуемость triage/remediation/correlation |
| LLD-ADR-008 | Для remediation применять post-filter guardrails по запрещенным командам и опасным действиям | Нельзя генерировать инструкции, меняющие состояние продуктивной среды |
| LLD-ADR-009 | Audit log писать поэтапно при каждом значимом действии обработки | Необходимо восстановление полного жизненного цикла события |
| LLD-ADR-010 | Доставку уведомлений реализовать через outbox/retry-механику в Redis | Сбой Matrix/SMTP не должен приводить к потере факта необходимости доставки |
Входит в реализацию:
Не входит в реализацию:
alert-receiverНазначение: внешний входной шлюз для webhook из Zabbix.
Основные функции:
POST /api/v1/webhook/zabbix;correlation_id;fingerprint;alert-processor-ingest;202 Accepted после подтверждения постановки в очередь.Не выполняет:
alert-processor-ingestНазначение: внутренняя точка постановки событий в очередь.
Основные функции:
alert-receiver;event_id + event_phase;stream:alerts:incoming;received_by_ingest и queued;202 Accepted приемному сервису.alert-processor-workerНазначение: основной контур обработки событий.
Основные функции:
audit-apiНазначение: отдельный API-контур для просмотра обработки событий и состояния worker.
Основные функции:
correlation_id;event_id;redisНазначение:
Обязательная настройка:
appendonly yes
appendfsync everysec
save 60 1000
Для MVP этого достаточно. Для промышленного варианта audit log желательно вынести в PostgreSQL или ClickHouse, но в рамках текущего ТЗ и HLD Redis допустим как единая память краткоживущего состояния.
| Задача | Библиотека |
|---|---|
| HTTP API | FastAPI |
| ASGI server | Uvicorn |
| Валидация моделей | Pydantic v2 |
| HTTP-клиенты | httpx |
| Redis async client | redis-py asyncio |
| YAML-правила | PyYAML или ruamel.yaml |
| Retry policies | tenacity |
| Логи | structlog + standard logging |
| Тесты | pytest, pytest-asyncio, httpx AsyncClient |
| Типизация | mypy, pyright опционально |
| Линтинг/форматирование | ruff |
| Конфигурация | pydantic-settings |
zabbix-llm-alerts/
├── docker-compose.yml
├── Dockerfile
├── pyproject.toml
├── README.md
├── .env.example
├── config/
│ ├── app.yaml
│ ├── routing.yaml
│ ├── suppress.yaml
│ ├── flap.yaml
│ ├── correlation.yaml
│ └── remediation_guardrails.yaml
├── src/
│ ├── common/
│ │ ├── __init__.py
│ │ ├── audit.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── errors.py
│ │ ├── logging.py
│ │ ├── models.py
│ │ ├── redis.py
│ │ ├── security.py
│ │ └── time.py
│ ├── alert_receiver/
│ │ ├── __init__.py
│ │ ├── api.py
│ │ ├── normalizer.py
│ │ └── forwarder.py
│ ├── alert_ingest/
│ │ ├── __init__.py
│ │ ├── api.py
│ │ └── queue_repository.py
│ ├── alert_worker/
│ │ ├── __init__.py
│ │ ├── runtime.py
│ │ ├── processor.py
│ │ ├── state_repository.py
│ │ ├── policy.py
│ │ ├── zabbix_enricher.py
│ │ ├── llm_client.py
│ │ ├── triage.py
│ │ ├── remediation.py
│ │ ├── guardrails.py
│ │ ├── correlation.py
│ │ ├── notification_dispatcher.py
│ │ └── notifiers/
│ │ ├── __init__.py
│ │ ├── matrix.py
│ │ └── email.py
│ └── audit_api/
│ ├── __init__.py
│ └── api.py
└── tests/
├── test_receiver_validation.py
├── test_ingest_queue.py
├── test_worker_policy.py
├── test_suppress_flap_recovery.py
├── test_correlation.py
├── test_guardrails.py
├── test_audit_api.py
└── fixtures/
├── zabbix_problem_high.json
├── zabbix_problem_warning.json
└── zabbix_recovery.json
class Severity(str, Enum):
NOT_CLASSIFIED = "Not classified"
INFORMATION = "Information"
WARNING = "Warning"
AVERAGE = "Average"
HIGH = "High"
DISASTER = "Disaster"
Внутренний numeric rank:
| Severity | Rank |
|---|---|
| Not classified | 0 |
| Information | 1 |
| Warning | 2 |
| Average | 3 |
| High | 4 |
| Disaster | 5 |
class EventPhase(str, Enum):
PROBLEM = "problem"
RECOVERY = "recovery"
UPDATE = "update"
UNKNOWN = "unknown"
Для Zabbix обычно используется логика:
event_value = 1 или status = PROBLEM → problem;event_value = 0 или status = OK/RESOLVED → recovery.class NormalizedAlertEvent(BaseModel):
schema_version: Literal["1.0"] = "1.0"
source: Literal["zabbix"] = "zabbix"
correlation_id: UUID
event_id: str
event_phase: EventPhase
event_clock: datetime
received_at: datetime
severity: Severity
severity_rank: int
host_id: str | None = None
host_name: str
trigger_id: str | None = None
trigger_name: str
trigger_description: str | None = None
item_id: str | None = None
item_name: str | None = None
operational_data: str | None = None
tags: dict[str, str] = Field(default_factory=dict)
zabbix_event_url: AnyUrl | None = None
zabbix_trigger_url: AnyUrl | None = None
zabbix_graph_url: AnyUrl | None = None
fingerprint: str
dedup_key: str
raw_payload: dict[str, Any]
fingerprint используется для suppress, flap, recovery и correlation. Он должен быть стабильным для повторяющихся событий одного типа.
Рекомендуемый алгоритм:
fingerprint_source = lower(
host_id or host_name
+ "|"
+ trigger_id or trigger_name
+ "|"
+ tags.get("service", "")
+ "|"
+ tags.get("component", "")
)
fingerprint = sha256(fingerprint_source).hexdigest()
event_id не включается в fingerprint, потому что он уникален для конкретного события и не подходит для поиска повторов.
dedup_key = f"{event_id}:{event_phase}"
Для recovery это важно: problem и recovery одного Zabbix event могут иметь один event_id, но это разные фазы жизненного цикла.
POST /api/v1/webhook/zabbixНазначение: принять webhook от Zabbix.
Headers:
Authorization: Bearer <ZABBIX_WEBHOOK_TOKEN>
Content-Type: application/json
Также допускается режим совместимости:
X-Webhook-Token: <ZABBIX_WEBHOOK_TOKEN>
Request body: JSON payload от Zabbix.
Минимальные обязательные поля после нормализации:
| Поле | Обязательно | Описание |
|---|---|---|
event_id |
Да | ID события Zabbix |
event_phase |
Да | problem/recovery/update |
severity |
Да | Критичность |
host_name |
Да | Имя хоста |
trigger_name |
Да | Название trigger/problem |
event_clock |
Да | Время события |
raw_payload |
Да | Исходный payload |
Response 202 Accepted:
{
"status": "accepted",
"correlation_id": "b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67",
"event_id": "123456",
"queued": true
}
Response 401 Unauthorized:
{
"detail": "Invalid webhook token"
}
Response 422 Unprocessable Entity:
{
"detail": "Payload validation failed",
"errors": [
{
"field": "event_id",
"message": "field required"
}
]
}
Response 503 Service Unavailable:
{
"detail": "Ingest service unavailable"
}
Важно: если alert-processor-ingest недоступен и событие не поставлено в очередь, alert-receiver не должен возвращать 202.
GET /health/live
GET /health/ready
live проверяет только работоспособность процесса.
ready проверяет доступность alert-processor-ingest.
POST /api/v1/eventsНазначение: принять нормализованное событие и поставить в Redis Stream.
Headers:
Authorization: Bearer <INTERNAL_API_TOKEN>
Content-Type: application/json
Request body: NormalizedAlertEvent.
Response 202 Accepted:
{
"status": "queued",
"correlation_id": "b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67",
"event_id": "123456",
"stream": "stream:alerts:incoming",
"stream_id": "1710000000000-0"
}
Response для дубликата:
{
"status": "duplicate",
"correlation_id": "b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67",
"event_id": "123456",
"queued": false
}
Дубликат фиксируется в audit log, но повторно в основную очередь не ставится.
GET /health/live
GET /health/ready
ready проверяет Redis PING и возможность записи тестового volatile-ключа.
GET /api/v1/audit/eventsНазначение: получить последние обработанные события.
Query params:
| Параметр | По умолчанию | Описание |
|---|---|---|
limit |
50 | Количество событий |
offset |
0 | Смещение |
severity |
null | Фильтр по критичности |
status |
null | Фильтр по итоговому статусу |
Response:
{
"items": [
{
"correlation_id": "b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67",
"event_id": "123456",
"host_name": "db-01",
"trigger_name": "PostgreSQL is down",
"severity": "High",
"event_phase": "problem",
"final_decision": "notify",
"channels": ["matrix", "email"],
"created_at": "2026-05-04T10:00:00+03:00",
"updated_at": "2026-05-04T10:00:03+03:00"
}
],
"limit": 50,
"offset": 0
}
GET /api/v1/audit/events/{correlation_id}Назначение: получить полную карточку обработки события.
Response:
{
"correlation_id": "b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67",
"event_id": "123456",
"event": {},
"state": {},
"policy_decision": {},
"enrichment": {},
"triage": {},
"correlation": {},
"remediation": {},
"notification": {},
"timeline": []
}
GET /api/v1/audit/by-event/{event_id}Назначение: найти карточку обработки по Zabbix event_id.
Если найдено несколько фаз, возвращаются все связанные correlation_id.
GET /api/v1/workersНазначение: получить состояние worker.
Response:
{
"workers": [
{
"worker_id": "worker-01",
"status": "alive",
"last_heartbeat": "2026-05-04T10:01:00+03:00",
"current_event": null,
"processed_total": 125,
"failed_total": 2
}
]
}
| Назначение | Ключ |
|---|---|
| Основная очередь событий | stream:alerts:incoming |
| Deadletter событий | stream:alerts:deadletter |
| Audit timeline, опционально общий поток | stream:audit:events |
| Notification outbox | stream:notifications:outbox |
| Notification deadletter | stream:notifications:deadletter |
Consumer group основной очереди:
group: alert-workers
consumer: <worker_id>
Создание группы при старте:
XGROUP CREATE stream:alerts:incoming alert-workers $ MKSTREAM
| Назначение | Ключ | Тип |
|---|---|---|
| Карточка события | audit:event:<correlation_id> |
JSON string или HASH |
| Timeline события | audit:timeline:<correlation_id> |
Redis Stream |
| Индекс event_id → correlation_id | audit:index:event_id:<event_id> |
SET |
| Последние события | audit:index:latest |
ZSET |
| Индекс по severity | audit:index:severity:<severity> |
ZSET |
| Индекс по final decision | audit:index:decision:<decision> |
ZSET |
TTL:
audit:event:<correlation_id> AUDIT_RETENTION_DAYS
audit:timeline:<correlation_id> AUDIT_RETENTION_DAYS
audit:index:* AUDIT_RETENTION_DAYS или периодическая очистка
| Назначение | Ключ | Тип | TTL |
|---|---|---|---|
| Dedup принятого события | dedup:accepted:<event_id>:<phase> |
STRING | DEDUP_TTL_SEC |
| Активный problem по fingerprint | alert:active:<fingerprint> |
STRING | STATE_RETENTION_SEC |
| Последнее уведомление | alert:last_notify:<fingerprint> |
STRING/HASH | SUPPRESS_WINDOW_SEC |
| История фаз события | alert:history:<fingerprint> |
ZSET | STATE_RETENTION_SEC |
| Окно flap | alert:flap:<fingerprint> |
ZSET | FLAP_WINDOW_SEC |
| Окно correlation | alert:correlation:<scope> |
ZSET | CORRELATION_WINDOW_SEC |
| Triage cache | triage:cache:<triage_hash> |
JSON string | TRIAGE_CACHE_TTL_SEC |
| Worker heartbeat | worker:heartbeat:<worker_id> |
HASH | WORKER_HEARTBEAT_TTL_SEC |
| Attempts stream message | queue:attempts:<stream_id> |
INTEGER | QUEUE_ATTEMPT_TTL_SEC |
| Назначение | Ключ | Тип |
|---|---|---|
| Delivery job | delivery:job:<delivery_id> |
JSON string/HASH |
| Retry schedule | delivery:retry:schedule |
ZSET |
| Delivery attempts | delivery:attempts:<delivery_id> |
INTEGER |
delivery_id формируется как:
sha256(correlation_id + channel + message_kind)
Это позволяет сделать отправку идемпотентной.
Псевдокод:
async def worker_loop() -> None:
await ensure_consumer_group()
while True:
await publish_heartbeat()
await reclaim_stale_pending_messages()
await process_due_delivery_retries()
messages = await queue.read_group(
stream="stream:alerts:incoming",
group="alert-workers",
consumer=worker_id,
count=settings.queue.batch_size,
block_ms=settings.queue.block_ms,
)
for stream_id, payload in messages:
try:
await process_one(stream_id, payload)
await queue.ack(stream_id)
except RetryableProcessingError as exc:
await schedule_event_retry(stream_id, payload, exc)
except PermanentProcessingError as exc:
await move_to_deadletter(stream_id, payload, exc)
except Exception as exc:
await schedule_event_retry(stream_id, payload, exc)
При retryable error:
queue:attempts:<stream_id>.QUEUE_MAX_ATTEMPTS:
retry_scheduled;stream:alerts:incoming с тем же correlation_id, attempt=N;XACK.stream:alerts:deadletter;moved_to_deadletter;XACK.Если worker умер после чтения сообщения, но до XACK, сообщение остается в pending list consumer group.
При старте и периодически worker выполняет:
XAUTOCLAIM stream:alerts:incoming alert-workers <worker_id> <min-idle-ms> 0-0 COUNT 10
min-idle-ms = QUEUE_VISIBILITY_TIMEOUT_SEC * 1000.
async def process_event(event: NormalizedAlertEvent) -> ProcessingResult:
audit.processing_started(event)
state_snapshot = await state.load_state(event)
dedup_result = await state.check_processing_dedup(event)
if dedup_result.is_duplicate_retry_completed:
audit.duplicate_skipped(event)
return ProcessingResult(final_decision="duplicate_skipped")
recovery_result = await recovery_detector.detect(event, state_snapshot)
flap_result = await flap_detector.detect(event, state_snapshot)
suppress_result = await suppress_engine.evaluate(event, state_snapshot)
policy_decision = await policy_engine.decide(
event=event,
recovery=recovery_result,
flap=flap_result,
suppress=suppress_result,
)
enrichment = await zabbix_enricher.enrich_if_required(event, policy_decision)
correlation = await correlation_engine.correlate(event, enrichment)
if correlation.requires_llm_fallback:
correlation = await correlation_fallback.correlate(event, enrichment, correlation)
triage = None
if policy_decision.requires_llm_triage:
triage = await triage_service.triage(event, enrichment, correlation)
remediation = await remediation_service.build(event, enrichment, correlation)
remediation = guardrails.filter(remediation)
final_decision = decision_builder.build(
event=event,
policy=policy_decision,
triage=triage,
suppress=suppress_result,
flap=flap_result,
recovery=recovery_result,
correlation=correlation,
remediation=remediation,
)
delivery_result = await notification_dispatcher.dispatch(final_decision)
await state.update_after_processing(event, final_decision, delivery_result)
audit.completed(event, final_decision, delivery_result)
return ProcessingResult(...)
| Severity | Базовое поведение |
|---|---|
| Disaster | Обязательное уведомление, LLM не может подавить |
| High | Обязательное уведомление, LLM не может подавить |
| Average | Зависит от AVERAGE_POLICY |
| Warning | LLM triage + deterministic rules |
| Information | LLM triage + deterministic rules |
| Not classified | LLM triage + deterministic rules |
AVERAGE_POLICYAVERAGE_POLICY=mandatory
или
AVERAGE_POLICY=intelligent
mandatory:
Average всегда доставляется;intelligent:
Average проходит suppress/triage/correlation аналогично Warning;class PolicyDecision(BaseModel):
track: Literal["mandatory", "intelligent", "recovery", "suppress_candidate"]
must_notify: bool
may_suppress: bool
requires_enrichment: bool
requires_llm_triage: bool
requires_remediation: bool
allowed_channels: list[Literal["matrix", "email"]]
reason: str
Для High и Disaster:
must_notify = True
may_suppress = False
requires_llm_triage = False
requires_remediation = True
Допускается маркировка как duplicate/repeat/flap, но это не отменяет доставку первого критичного события. Повторные критичные события можно группировать в текст уведомления, но не полностью скрывать, если политика эксплуатации явно этого не разрешила.
Suppress нужен для снижения шума по повторяющимся низкоприоритетным событиям.
Suppress запрещен для:
High;Disaster;Average, если AVERAGE_POLICY=mandatory.config/suppress.yamlsuppress:
enabled: true
default_window_sec: 1800
max_repeats_before_notify: 1
apply_to_severities:
- Not classified
- Information
- Warning
rules:
- name: repeated_low_disk_warning
match:
trigger_name_regex: "(?i).*disk.*space.*"
severity: Warning
window_sec: 3600
max_repeats_before_notify: 1
action: suppress_repeat
- name: interface_flap_noise
match:
trigger_name_regex: "(?i).*interface.*(down|up).*"
severity: Warning
window_sec: 900
max_repeats_before_notify: 2
action: suppress_repeat
alert:last_notify:<fingerprint>.suppress_candidate;suppressed.alert:last_notify:<fingerprint>.Flap — частое переключение состояния problem/recovery в коротком окне.
config/flap.yamlflap:
enabled: true
window_sec: 900
min_transitions: 4
apply_to_severities:
- Not classified
- Information
- Warning
- Average
notify_on_flap_detected: true
suppress_individual_flap_events: true
ZADD alert:flap:<fingerprint> <timestamp> "<timestamp>|<phase>|<correlation_id>|<event_id>"
ZREMRANGEBYSCORE alert:flap:<fingerprint> -inf <now-window>
Добавить текущую фазу в alert:flap:<fingerprint>.
Удалить записи старше FLAP_WINDOW_SEC.
Получить последние фазы.
Посчитать количество переходов problem -> recovery и recovery -> problem.
Если переходов >= min_transitions:
установить flap_detected=True;
записать audit;
для низких severity можно отправить одно агрегированное уведомление;
отдельные повторные события внутри flap-окна можно подавлять.
Recovery нужен для фиксации восстановления ранее открытого события.
SET alert:active:<fingerprint> <correlation_id> EX <STATE_RETENTION_SEC>
Для problem:
alert:active:<fingerprint>.Для recovery:
найти alert:active:<fingerprint>;
если найдено — связать recovery с исходным problem;
удалить или пометить active как recovered;
записать audit recovery_detected.
Рекомендуемое поведение:
| Исходное событие | Recovery уведомлять? |
|---|---|
| Disaster | Да |
| High | Да |
| Average mandatory | Да |
| Average intelligent | Конфигурируемо |
| Warning и ниже | Да, если исходный problem был доставлен; иначе только audit |
Correlation Engine определяет роль события:
root — вероятная первопричина;
child — следствие другого события;
standalone — самостоятельное событие.
| Источник | Приоритет |
|---|---|
| Deterministic YAML rules | 1 |
| Topology/service tags | 2 |
| LLM fallback | 3 |
| No correlation | 4 |
config/correlation.yamlcorrelation:
enabled: true
window_sec: 600
deterministic_confidence_threshold: 0.75
llm_fallback_enabled: true
llm_fallback_min_confidence: 0.7
event_types:
- name: network_unavailable
match:
trigger_name_regex: "(?i).*(unavailable|icmp ping|host is down).*"
tags:
component: network
role_hint: root
- name: service_down
match:
trigger_name_regex: "(?i).*(service.*down|process.*not running).*"
role_hint: child
- name: database_unavailable
match:
trigger_name_regex: "(?i).*(postgres|mysql|database).*(down|unavailable).*"
role_hint: root
dependencies:
- parent_type: network_unavailable
child_type: service_down
same_host: false
same_service_tag: true
confidence: 0.85
- parent_type: database_unavailable
child_type: application_error
same_service_tag: true
confidence: 0.8
class CorrelationResult(BaseModel):
role: Literal["root", "child", "standalone", "unknown"]
source: Literal["deterministic", "llm", "none"]
confidence: float
parent_correlation_id: UUID | None = None
parent_event_id: str | None = None
correlation_type: str | None = None
reason: str
Классифицировать текущее событие по event_types.
Сохранить событие в correlation window:
ZADD alert:correlation:<scope> <timestamp> <correlation_id>
Получить события из окна CORRELATION_WINDOW_SEC.
Для каждого кандидата проверить dependency rules.
Если найден parent с confidence >= threshold:
вернуть role=child;
указать parent;
записать audit.
Если событие само выглядит как parent/root:
role=root.Если уверенности нет:
standalone или вызвать LLM fallback.LLM fallback может только предложить связь и роль события. Для High/Disaster результат LLM correlation не может:
отменить уведомление;
suppress событие;
понизить критичность;
изменить факт доставки.
Рекомендуется использовать HTTP API внешнего LLM-сервиса.
Поддерживаемые варианты:
Ollama-compatible API;
OpenAI-compatible local gateway;
собственный FastAPI-wrapper над моделью.
Базовые параметры:
llm:
provider: openai_compatible
base_url: "http://llm:11434/v1"
model: "qwen3.5:4b"
timeout_sec: 30
temperature: 0.1
max_tokens: 1200
retries: 1
circuit_breaker:
enabled: true
failure_threshold: 5
reset_timeout_sec: 120
Всегда передавать только нужный контекст, без секретов.
Всегда требовать JSON-only output.
Всегда валидировать ответ Pydantic-моделью.
При невалидном JSON выполнить одну повторную попытку с repair prompt.
При повторном сбое использовать безопасную деградацию.
Все prompts и ответы фиксировать в audit в redacted-виде.
class LlmTriageResult(BaseModel):
verdict: Literal["notify", "suppress_noise", "ignore_low_value"]
confidence: float = Field(ge=0, le=1)
reason: str
operator_summary: str
cacheable: bool = True
cache_ttl_sec: int | None = None
System prompt:
Ты локальный помощник дежурного инженера мониторинга.
Твоя задача — оценить низкоприоритетное событие Zabbix.
Ты не принимаешь решений по критичным событиям High/Disaster.
Верни только JSON по заданной схеме.
Не предлагай действий, изменяющих состояние систем.
User prompt:
Оцени событие Zabbix для triage.
Контекст события:
<event_json>
Обогащение:
<enrichment_json>
Корреляция:
<correlation_json>
Вердикты:
- notify: уведомить оператора
- suppress_noise: подавить как шум/повтор
- ignore_low_value: оставить без уведомления как несущественное или недостаточно информативное
Верни JSON:
{
"verdict": "notify|suppress_noise|ignore_low_value",
"confidence": 0.0,
"reason": "...",
"operator_summary": "...",
"cacheable": true,
"cache_ttl_sec": 600
}
| Ошибка LLM | Поведение |
|---|---|
| Timeout | Для Warning/Average intelligent — notify, если нет deterministic suppress |
| HTTP 5xx | Notify или audit-only по конфигурации, но не терять событие |
| Invalid JSON | Одна repair-попытка, затем fallback |
| Low confidence | Notify |
| Circuit breaker open | Не вызывать LLM, применять deterministic fallback |
Для Average intelligent безопаснее использовать fallback notify, а не suppress.
class DiagnosticStep(BaseModel):
title: str
description: str
command: str | None = None
expected_result: str | None = None
safety_class: Literal["read_only"]
class LlmRemediationResult(BaseModel):
summary: str
possible_causes: list[str]
diagnostic_steps: list[DiagnosticStep]
escalation_hint: str | None = None
limitations: str | None = None
Разрешены только read-only действия:
просмотр статуса;
чтение логов;
просмотр метрик;
просмотр процессов;
просмотр сокетов;
просмотр файловой системы;
диагностические запросы без изменения состояния.
Запрещены:
systemctl restart|stop|start|reload;
service restart|stop|start|reload;
docker restart|stop|rm;
kubectl delete|apply|rollout restart|scale;
rm, mv, chmod, chown;
iptables, nft, firewall-cmd modifying actions;
SQL UPDATE, DELETE, INSERT, DROP, ALTER, TRUNCATE;
любые команды автоисправления.
config/remediation_guardrails.yaml:
guardrails:
deny_command_regex:
- "(?i)\\brm\\b"
- "(?i)\\bsystemctl\\s+(restart|stop|start|reload)\\b"
- "(?i)\\bservice\\s+\\S+\\s+(restart|stop|start|reload)\\b"
- "(?i)\\bdocker\\s+(restart|stop|rm|compose\\s+down)\\b"
- "(?i)\\bkubectl\\s+(delete|apply|scale|rollout\\s+restart)\\b"
- "(?i)\\b(update|delete|insert|drop|alter|truncate)\\b.*;"
- "(?i)\\biptables\\b.*\\s(-A|-D|-I|-F)\\b"
- "(?i)\\bnft\\b.*\\b(add|delete|flush)\\b"
allow_command_regex:
- "(?i)^systemctl\\s+status\\s+"
- "(?i)^journalctl\\s+"
- "(?i)^tail\\s+"
- "(?i)^grep\\s+"
- "(?i)^ss\\s+"
- "(?i)^ps\\s+"
- "(?i)^df\\s+"
- "(?i)^free\\s+"
- "(?i)^top\\b"
- "(?i)^docker\\s+(ps|logs|inspect)\\b"
- "(?i)^curl\\s+(-I|--head)"
Если remediation содержит опасные команды:
команда удаляется;
в audit пишется remediation_guardrail_removed_step;
оператору показывается безопасный текст без опасного действия.
Zabbix Enricher получает дополнительный контекст:
trigger description;
host inventory/tags;
item metadata;
последние значения метрик;
ссылки на event/trigger/graph;
related events, если доступно.
zabbix:
url: "https://zabbix.example.local"
api_url: "https://zabbix.example.local/api_jsonrpc.php"
timeout_sec: 10
verify_tls: true
enrich:
trigger: true
host: true
item: true
recent_events: true
graph_url: true
Секреты:
ZABBIX_API_TOKEN=...
| Ошибка | Поведение |
|---|---|
| Timeout | Записать audit, продолжить без enrichment |
| 401/403 | Записать audit security error, продолжить без enrichment, поднять health warning |
| 5xx | Retry 1-2 раза, затем продолжить без enrichment |
| Нет item/trigger | Продолжить с частичным контекстом |
Сбой enrichment не должен приводить к потере события.
class FinalDecision(BaseModel):
correlation_id: UUID
event_id: str
final_action: Literal["notify", "suppress", "audit_only", "duplicate_skipped"]
reason: str
channels: list[Literal["matrix", "email"]]
message: NotificationMessage | None
retry_policy: RetryPolicy
config/routing.yamlrouting:
default_channels:
- matrix
severity_routes:
Disaster:
channels: [matrix, email]
mention: true
High:
channels: [matrix, email]
mention: true
Average:
channels: [matrix]
Warning:
channels: [matrix]
Information:
channels: []
Not classified:
channels: []
recovery:
notify_if_original_was_notified: true
channels: [matrix]
matrix:
room_by_severity:
Disaster: "!critical:example.local"
High: "!critical:example.local"
Average: "!alerts:example.local"
Warning: "!alerts:example.local"
email:
recipients_by_severity:
Disaster: ["noc@example.local"]
High: ["noc@example.local"]
Структура уведомления:
[High] db-01 — PostgreSQL is down
Статус: PROBLEM
Время Zabbix: 2026-05-04 10:00:00
Event ID: 123456
Correlation ID: b451e9b4-c9a8-4cc2-9b2a-8ef1a43d1f67
Краткое описание:
...
Контекст Zabbix:
- host: db-01
- trigger: PostgreSQL is down
- tags: service=database, component=postgresql
- event URL: ...
Корреляция:
role=root, source=deterministic, confidence=0.85
Рекомендации по диагностике:
1. Проверить статус сервиса: systemctl status postgresql
2. Посмотреть последние логи: journalctl -u postgresql --since "30 minutes ago"
3. Проверить порт: ss -lntp | grep 5432
Audit: https://audit.local/api/v1/audit/events/<correlation_id>
Для временной ошибки Matrix/SMTP:
Создать delivery:job:<delivery_id>.
Добавить в delivery:retry:schedule со score retry_at.
Worker периодически выбирает due jobs:
ZRANGEBYSCORE delivery:retry:schedule -inf <now>
Если успешно — удалить job и записать audit.
Если попытки исчерпаны — перенести в stream:notifications:deadletter.
Параметры:
notifications:
retry:
max_attempts: 5
backoff_sec: [30, 120, 300, 900, 1800]
Минимальный набор этапов:
| Stage | Где фиксируется |
|---|---|
received_by_receiver |
alert-receiver |
validation_failed |
alert-receiver |
normalized |
alert-receiver |
forwarded_to_ingest |
alert-receiver |
received_by_ingest |
ingest |
duplicate_detected |
ingest/worker |
queued |
ingest |
processing_started |
worker |
state_loaded |
worker |
recovery_detected |
worker |
flap_detected |
worker |
suppress_evaluated |
worker |
policy_decided |
worker |
enrichment_started |
worker |
enrichment_completed |
worker |
enrichment_failed |
worker |
correlation_completed |
worker |
llm_triage_started |
worker |
llm_triage_completed |
worker |
llm_triage_failed |
worker |
remediation_completed |
worker |
guardrails_applied |
worker |
notification_selected |
worker |
notification_sent |
worker |
notification_retry_scheduled |
worker |
notification_failed |
worker |
processing_completed |
worker |
processing_failed |
worker |
moved_to_deadletter |
worker |
class AuditRecord(BaseModel):
correlation_id: UUID
event_id: str | None
stage: str
status: Literal["ok", "warning", "error"]
timestamp: datetime
component: str
message: str
data: dict[str, Any] = Field(default_factory=dict)
Перед записью audit необходимо маскировать:
Authorization;
X-Webhook-Token;
ZABBIX_API_TOKEN;
Matrix access token;
SMTP password;
любые поля password, secret, token, apikey, api_key.
Пример:
{
"token": "***redacted***"
}
.env.exampleAPP_ENV=dev
LOG_LEVEL=INFO
TZ=Europe/Moscow
REDIS_URL=redis://redis:6379/0
RECEIVER_HOST=0.0.0.0
RECEIVER_PORT=8080
INGEST_URL=http://alert-processor-ingest:8081
INGEST_HOST=0.0.0.0
INGEST_PORT=8081
AUDIT_API_HOST=0.0.0.0
AUDIT_API_PORT=8082
ZABBIX_WEBHOOK_TOKEN=change-me
INTERNAL_API_TOKEN=change-me-internal
AUDIT_API_TOKEN=change-me-audit
ZABBIX_URL=https://zabbix.example.local
ZABBIX_API_URL=https://zabbix.example.local/api_jsonrpc.php
ZABBIX_API_TOKEN=change-me
ZABBIX_VERIFY_TLS=true
LLM_BASE_URL=http://llm:11434/v1
LLM_MODEL=qwen3.5:4b
LLM_TIMEOUT_SEC=30
LLM_TEMPERATURE=0.1
AVERAGE_POLICY=mandatory
DEDUP_TTL_SEC=86400
STATE_RETENTION_SEC=604800
AUDIT_RETENTION_DAYS=14
TRIAGE_CACHE_TTL_SEC=3600
QUEUE_MAX_ATTEMPTS=5
QUEUE_BATCH_SIZE=10
QUEUE_BLOCK_MS=5000
QUEUE_VISIBILITY_TIMEOUT_SEC=120
MATRIX_ENABLED=true
MATRIX_HOMESERVER_URL=https://matrix.example.local
MATRIX_ACCESS_TOKEN=change-me
MATRIX_DEFAULT_ROOM_ID=!alerts:example.local
SMTP_ENABLED=true
SMTP_HOST=mail.example.local
SMTP_PORT=587
SMTP_USERNAME=alerts@example.local
SMTP_PASSWORD=change-me
SMTP_FROM=alerts@example.local
SMTP_USE_TLS=true
config/app.yamlprocessing:
average_policy: "${AVERAGE_POLICY}"
low_severity_triage:
enabled: true
min_confidence_notify_fallback: 0.65
critical:
mandatory_notify_severities:
- High
- Disaster
queue:
stream: stream:alerts:incoming
deadletter_stream: stream:alerts:deadletter
consumer_group: alert-workers
batch_size: 10
block_ms: 5000
max_attempts: 5
visibility_timeout_sec: 120
state:
dedup_ttl_sec: 86400
retention_sec: 604800
triage_cache_ttl_sec: 3600
worker:
heartbeat_interval_sec: 10
heartbeat_ttl_sec: 30
security:
redact_keys:
- authorization
- token
- password
- secret
- api_key
- apikey
services:
redis:
image: redis:7.4-alpine
container_name: zbxai-redis
command:
- redis-server
- --appendonly
- "yes"
- --appendfsync
- everysec
volumes:
- redis-data:/data
networks:
- zbxai-internal
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 5
alert-receiver:
build: .
container_name: zbxai-alert-receiver
command: uvicorn src.alert_receiver.api:app --host 0.0.0.0 --port 8080
env_file: .env
depends_on:
redis:
condition: service_healthy
alert-processor-ingest:
condition: service_started
ports:
- "8080:8080"
networks:
- zbxai-public
- zbxai-internal
alert-processor-ingest:
build: .
container_name: zbxai-alert-ingest
command: uvicorn src.alert_ingest.api:app --host 0.0.0.0 --port 8081
env_file: .env
depends_on:
redis:
condition: service_healthy
networks:
- zbxai-internal
alert-processor-worker:
build: .
container_name: zbxai-alert-worker
command: python -m src.alert_worker.runtime
env_file: .env
depends_on:
redis:
condition: service_healthy
networks:
- zbxai-internal
audit-api:
build: .
container_name: zbxai-audit-api
command: uvicorn src.audit_api.api:app --host 0.0.0.0 --port 8082
env_file: .env
depends_on:
redis:
condition: service_healthy
ports:
- "8082:8082"
networks:
- zbxai-public
- zbxai-internal
volumes:
redis-data:
networks:
zbxai-public:
zbxai-internal:
internal: true
Примечание: если Zabbix находится вне Docker host, наружу публикуется только alert-receiver. ingest, worker и Redis остаются во внутренней сети.
| Сервис | Endpoint | Проверки |
|---|---|---|
| alert-receiver | /health/live |
Процесс жив |
| alert-receiver | /health/ready |
Доступен ingest |
| alert-processor-ingest | /health/live |
Процесс жив |
| alert-processor-ingest | /health/ready |
Redis доступен |
| audit-api | /health/live |
Процесс жив |
| audit-api | /health/ready |
Redis доступен |
| worker | heartbeat в Redis | Worker жив и обновляет статус |
Все сервисы пишут JSON logs:
{
"timestamp": "2026-05-04T10:00:00+03:00",
"level": "INFO",
"component": "alert-worker",
"correlation_id": "...",
"event_id": "123456",
"stage": "policy_decided",
"message": "Critical event requires mandatory notification"
}
На первом этапе достаточно health и JSON logs. Для следующей итерации рекомендуется добавить /metrics Prometheus:
zbxai_events_received_total;
zbxai_events_queued_total;
zbxai_events_processed_total;
zbxai_events_failed_total;
zbxai_llm_requests_total;
zbxai_llm_failures_total;
zbxai_notifications_sent_total;
zbxai_notifications_failed_total;
zbxai_queue_pending_total.
alert-receiver доступен Zabbix по HTTP/HTTPS.
Webhook обязательно защищается bearer-токеном.
Желательно публиковать сервис за reverse proxy с TLS.
Желательно ограничить source IP до адресов Zabbix/proxy.
alert-receiver обращается к alert-processor-ingest только по внутренней Docker-сети.
Внутренний API защищен INTERNAL_API_TOKEN.
Redis не публикуется наружу.
В LLM не передаются секреты.
Raw payload перед передачей в LLM проходит redaction.
LLM не используется как источник истины для критичных решений.
LLM output всегда валидируется.
Audit API защищается отдельным токеном.
Audit хранит полезную техническую информацию, но секреты маскируются.
Для публичной ссылки из уведомления лучше использовать reverse proxy с авторизацией или не включать внешнюю ссылку до появления защищенного доступа.
| ID | Сценарий | Ожидаемый результат |
|---|---|---|
| AT-001 | Валидный webhook от Zabbix | 202 Accepted, событие в stream:alerts:incoming, audit queued |
| AT-002 | Неверный токен webhook | 401, событие не попало в очередь |
| AT-003 | Некорректный payload | 422, событие не попало в очередь |
| AT-004 | Ingest недоступен | Receiver возвращает 503, ложный 202 не возвращается |
| AT-005 | High event при недоступной LLM | Событие доставлено, audit содержит LLM failure/degraded |
| AT-006 | Disaster event | Обязательная доставка в Matrix и email |
| AT-007 | Average при AVERAGE_POLICY=mandatory |
Доставка без LLM triage |
| AT-008 | Average при AVERAGE_POLICY=intelligent |
Применяется LLM triage, результат фиксируется в audit |
| AT-009 | Warning event | Проходит triage, verdict фиксируется в audit |
| AT-010 | Повтор Warning в suppress window | Повтор подавляется, audit содержит suppress decision |
| AT-011 | Частые problem/recovery | Обнаруживается flap |
| AT-012 | Recovery после problem | Recovery связывается с исходным problem |
| AT-013 | Детерминированная корреляция | Child/root определяются по YAML-правилам |
| AT-014 | Нет deterministic correlation | Вызывается LLM fallback при включенной настройке |
| AT-015 | Matrix timeout | Delivery job попадает в retry schedule |
| AT-016 | SMTP permanent failure | После попыток delivery попадает в notification deadletter |
| AT-017 | Worker падает до XACK | Сообщение остается pending и переобрабатывается через XAUTOCLAIM |
| AT-018 | Remediation содержит опасную команду | Команда удаляется guardrails, audit фиксирует удаление |
| AT-019 | Audit by event_id | Возвращается карточка события |
| AT-020 | Перенос на другой Docker host | После изменения .env сервис запускается без правки кода |
NormalizedAlertEvent.alert-receiver.alert-processor-ingest..env.example и инструкции.| Риск | Комментарий | Мера |
|---|---|---|
| Redis без persistence потеряет события при перезапуске | Требование запрещает потерю принятых событий | Включить AOF и volume |
| LLM вернет невалидный JSON | Типовой риск локальных моделей | JSON schema, repair retry, fallback |
| LLM предложит опасную команду | Недопустимо по ТЗ | Guardrails deny/allow filters |
| Matrix token истечет | Уже известная сложность Matrix/MAS/OIDC | Для MVP использовать long-lived token или отдельного service user; ротацию вынести отдельным этапом |
| Zabbix API недоступен | Enrichment не должен ломать обработку | Partial enrichment + audit warning |
| Слишком много событий | Redis Stream растет | Retention policy, мониторинг pending, в будущем отдельный broker/storage |
| Audit в Redis ограничен TTL | Подходит для кратковременного audit MVP | Для долгосрочного аудита добавить PostgreSQL/ClickHouse |