English

お問い合わせ

BLOG

Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その10 - プリプロッセシングアイテム

こんにちは、MIRACLE ZBXサポートを担当している花島タケシです。 3.4.xにおいて、監視後の処理(というより、名称からして"何らか"の前の処理(Preprocessing)) が機能拡張されています。 そこで、今回はプリプロッセシングアイテムについての解説を行います。

プリプロセッシングアイテムと新規プロセスの導入、ヒストリキャッシュへの処理の変更

今回はプリプロッセシングアイテムについての解説を行います。

プリプロッセシングアイテム

旧来は取得したデータに対して、差分や差分/時間を求めてそれをヒストリ情報とするといった処理を行うことができました。

3.4.xにおいて、監視後の処理(というより、名称からして"何らか"の前の処理(Preprocessing)が機能拡張されています。
そこで、今回はプリプロッセシングアイテムについての解説を行います。

WebフロントエンドでのPreprocessingの翻訳は、"保存前処理"とされています。

アイテムの作成

3.4.xのアイテム作成画面が下図のように改変されました。

アイテム1(zbx-tl-022用)

アイテム2(zbx-tl-022用)

このアイテムタイプを選択すると下図のような画面に遷移します。

アイテム3(zbx-tl-022用)

旧来のバージョンにはなかった「マスターアイテム」という項目が出てきました。

プリプロセッシングとマスターアイテムの関係

XMLやJSONデータ(それに限りませんが)には、複数の項目を含むことができます。

ある監視対象が然るべき手続きを行った後に複数の項目を含んだデータを返す場合、プリプロセッシングによりそこから切り出した"1つの"監視結果を取得することができます。

同様に、その同じデータを用いる別のアイテムが設定されている場合、そのアイテムも監視を行うには監視対象と通信を行わないといけないのでしょうか?

この場合、明らかに無駄なことが行われますし、同じ時刻からの監視データを取得することはできません。
監視対象からデータを取得することは1回にし、個々の監視アイテムに対して監視結果を切り出せば良いわけです。

3.4.xにおいて、「依存アイテム」を導入することにより、このことが可能となりました。つまり、「マスターアイテム」で元となるデータを取得し、依存アイテムで切り出し処理を行うわけです。

プリプロセッサーマネージャーとプリプロセッサーワーカーの導入

上述の処理を行うために、プロプロセッサーマネージャーとプリプロセッサーワーカー(以下マネージャーとワーカー)というプロセスが導入されました。
これらは、3.4.xで導入されたUNIXソケット通信を用いています。

マネージャーは1つしか起動しません。ワーカーは複数起動することができます。

zabbix_server.confでの、StartPreprocessorsパラメータで変更可能です。

Zabbixサーバーが起動すると、マネージャーといくつかのワーカーがfork()されて起動します。
ワーカーは起動直後にマネージャーへメッセージを送信し、それを受信したマネージャーは登録を行い、その存在を認知します。

この辺りはAlerterの複数化で説明をしたので割愛します。

Pollerプロセスからマネージャーへの処理依頼

監視で取得したデータを加工するためには、DBへ書き出す前に行わなくてはなりません。

これを可能とするプロセスは、キャッシュへ格納する前のPoller(やTrapper等)プロセスか、DBへ書き出す前のDB Syncerプロセスとなります。
本機能は主にキャッシュへ格納する前のPollerプロセスから処理を分岐することとなります。

Pollerプロセスが、get_values()にて監視データを取得した後に、zbx_preprocess_item_value()をコールします。
この関数では取得したデータを含む関連情報をパッケージ化し送信バッファへ付与します。
その後、zbx_preprocessor_flush()をコールして送信バッファの内容をプリプロセッサマネージャーへを送信します。

以前のバージョンでは、Pollerプロセスがdc_add_history()をコールし、監視データをヒストリキャッシュに格納していましたが、3.4.xからはこれがなくなり、データフローが変更されています。

src/zabbix_server/preprocessor/preprocessing.c :

686 void zbx_preprocessor_flush(void)
687 {
688 if (0 < cached_message.size)
689 {
690 preprocessor_send(ZBX_IPC_PREPROCESSOR_REQUEST, cached_message.data, cached_message.size, NULL);
691
692 zbx_ipc_message_clean(&cached_message);
693 zbx_ipc_message_init(&cached_message);
694 cached_values = 0;
695 }
696 }

zbx_preprocessor_flush()は大した処理をしていません。
ZBX_IPC_PREPROCESSOR_REQUESTタグを付与して、マネージャーへ送信しているだけです。

なお、コールしているpreprocessor_send()の最後の引数をNULLにしているため、送りっぱなしで返信を期待していない処理となります。

マネージャーからワーカーへのジョブの割り振り

プリプロッセッサーマネージャーは、メインループで他のプロセスからのメッセージを待ち、1) 受取後に暇なワーカーへ割り振り、2)ワーカーから受け取った結果の保存を行うプロセスです。

src/zabbix_server/preprocessor/preproc_manager.c

998 ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
999 {
...
1042 for (;;)
1043 {
...
1074 if (NULL != message)
1075 {
1076 switch (message->code)
1077 {
1078 case ZBX_IPC_PREPROCESSOR_WORKER:
1079 preprocessor_register_worker(&manager, client, message);
1080 break;
1081
1082 case ZBX_IPC_PREPROCESSOR_REQUEST:
1083 preprocessor_add_request(&manager, message);
1084 break;
1085
1086 case ZBX_IPC_PREPROCESSOR_RESULT:
1087 preprocessor_add_result(&manager, client, message);
1088 break;
1089
1090 case ZBX_IPC_PREPROCESSOR_QUEUE:
1091 zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
1092 sizeof(zbx_uint64_t));
1093 break;
1094 }
1095
1096 zbx_ipc_message_free(message);
1097 }

上記ソースコードのように、4つのタグ別に処理を行います。

ZBX_IPC_PREPROCESSOR_WORKERは、ワーカープロセスが起動直後にマネージャーへ登録してもらうために送信してくるタグです。
ZBX_IPC_PREPROCESSOR_QUEUEは、内部監視アイテムに対応するタグです。

Pollerプロセスからは、ZBX_IPC_PREPROCESSOR_REQUESTタグが付与されたメッセージを受け取るので、preprocessor_add_request()を見ていきます。

697 static void preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
698 {
...
705 preprocessor_sync_configuration(manager);
706
707 while (offset < message->size)
708 {
709 offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
710 preprocessor_enqueue(manager, &value, NULL);
711 }
712
713 preprocessor_assign_tasks(manager);
714 preprocessing_flush_queue(manager);
...
717 }

preprocessor_add_request()では、最初にpreprocessor_sync_configuration()をコールして、設定の更新を行います。

その後、ll.707-711のwhile()ブロックにて、受け取ったメッセージのアンパックを行い、キューへの挿入を行います。処理対象がなくなるまでキューへの挿入を繰り返します。

l.713のpreprocessor_assign_tasks()で、ワーカープロセスへZBX_IPC_PREPROCESSOR_REQUESTタグを付与したメッセージ送信をし、処理の依頼を行います。
ここでは処理可能な(フリー状態な)ワーカー全てに依頼を行います。

最後に、l.714のpreprocessing_flush_queue()にて、既に処理が終了しているタスクをキューから削除します。

依存アイテムのキューへの追加

上記までの説明だと、依存アイテムに対する処理がないように見えますが、問題ありません。

l.710のpreprocessor_enqueue()から、preprocessor_enqueue_dependent()がコールされます。

656 static void preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
657 zbx_preproc_item_value_t *source_value, zbx_list_item_t *master)
658 {
...
666 if (NULL != source_value->result && ISSET_VALUE(source_value->result))
667 {
668 item_local.itemid = source_value->itemid;
669 if (NULL != (item = zbx_hashset_search(&manager->item_config, &item_local)) &&
670 0 != item->dep_itemids_num)
671 {
672 for (i = item->dep_itemids_num - 1; i >= 0; i--)
673 {
674 preprocessor_copy_value(&value, source_value);
675 value.itemid = item->dep_itemids[i];
676 preprocessor_enqueue(manager, &value, master);
677 }
678
679 preprocessor_assign_tasks(manager);
680 preprocessing_flush_queue(manager);
681 }
682 }
...
685 }

preprocessor_enqueue_dependent()は、preprocessor_add_request()と似た作りとなっています。

ll.669-670にて、元のアイテムの情報を取得し、依存されているアイテムがあるかの判定をしています。
その後、ll.672-677のfor()ブロックにて、(結果的に再帰的になる)preprocessor_enqueu()をコールして、(監視を行った)元のアイテムに依存しているアイテム全てに関する処理をキューに入れています。

ワーカーでの処理と返信

ワーカーは、マネージャーから処理依頼を、worker_preprocess_value()にて処理します。
ソースコードの解説を行うほどではなく、処理を行った後にZBX_IPC_PRERPOCESSOR_RESULTタグを付与してマネージャーへ返信するぐらいです。

なお、この処理は単にfor()ループで設定されている順にフィルター処理を行っているだけですので、「保存前処理」に設定する順序は重要となります。(Webフロントエンドでは上下を入れ替えられるようになっています。)

ワーカーからの返信受け取りとヒストリキャッシュへの保存

マネージャーは、ZBX_IPC_PREPROCESSOR_RESULTタグが付与されたメッセージを受け取ると、preprocessor_add_result()にてメッセージに対する処理を行います。

834 static void preprocessor_add_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
835 zbx_ipc_message_t *message)
836 {
...
847 worker = preprocessor_get_worker_by_client(manager, client);
848 request = (zbx_preprocessing_request_t *)worker->queue_item->data;
849
850 zbx_preprocessor_unpack_result(&value, &history_value, &error, message->data);
851
852 if (NULL != history_value)
853 {
854 history_value->itemid = request->value.itemid;
855 history_value->value_type = request->value_type;
856
857 if (NULL != (cached_value = zbx_hashset_search(&manager->history_cache, history_value)))
858 {
859 if (0 < zbx_timespec_compare(&history_value->timestamp, &cached_value->timestamp))
860 {
861 /* history_value can only be numeric so it can be copied without extra memory */
862 /* allocation */
863 cached_value->timestamp = history_value->timestamp;
864 cached_value->value = history_value->value;
865 }
866 }
867 else
868 zbx_hashset_insert(&manager->history_cache, history_value, sizeof(zbx_item_history_value_t));
869 }
870
871 request->state = REQUEST_STATE_DONE;
...
884 if (FAIL != preprocessor_set_variant_result(request, &value, error))
885 preprocessor_enqueue_dependent(manager, &request->value, worker->queue_item);
...
893 preprocessor_assign_tasks(manager);
894 preprocessing_flush_queue(manager);
895
896 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
897 }

ll.847-871の前半部分では、受信したメッセージから処理を行ったワーカープロセスを特定し、さらに対象となったアイテム情報を取得します。そこからヒストリキャッシュへ挿入するデータを生成した後にキャッシュへ挿入します。

l.884にて要求されたタイプの値へ変換し、l.885にてさらに依存しているアイテムをキューに挿入します。

最後に少なくとも一つのワーカーが空いたはずなので、再度ジョブを割り振ります。

なお、参考に記したページにはキャッシュからDBへ書き出すときに特別な処理をするように見えますが、特に重要なことはありません。

改修された関係図を起こしました。

関係図(zbx-tl-022用)

懸念点としては、ヒストリキャッシュへの書き込みを、マネージャープロセスだけが受け持つことになったことです。

高負荷テストを行ってはいないのですが、これがボトルネックになる可能性はあります。

ただし、従来でもキャッシュへの書き込みにはロックが必要であったため杞憂であるかもしれませんが。

参考

https://www.zabbix.com/documentation/3.4/manual/appendix/items/preprocessing

関連記事

Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その1
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その2
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その3
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その4
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その5
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その6
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その7
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その8
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その9
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その10
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その11
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その12
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その13
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その14
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その15
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その16
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その17
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その18
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その19
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その20
Zabbix 3.2以降の新機能解説(Zabbix 4.0を見据えて) その21

注意事項

  • 本ドキュメントの内容は、予告なしに変更される場合があります。
  • 本ドキュメントは、限られた評価環境における検証結果をもとに作成しており、全ての環境での動作を保証するものではありません。
  • 本ドキュメントの内容に基づき、導入、設定、運用を行なったことにより損害が生じた場合でも、当社はその損害についての責任を負いません。あくまでお客さまのご判断にてご使用ください。
サイバートラストのテレワークソリューション
採用情報ページ リニューアル
組込み Linux にプラスして 長期間の製品ライフサイクルをサポート EM+PLS