この記事では 「ゼロから動かせる構成」 を目標に、設定ファイルとWorkflow YAMLを全文掲載しています。
なぜ作ったか
毎日 Elastic の Announcements ページを開いて、英語を読んで、Slack にまとめて投稿する――これを手作業でやっていたのをやめました。
協力会社や社内メンバーへの共有は「読む → 整理する → 伝える」の3ステップが毎回発生し、どうしても属人化しやすくなります。しかも英語のままでは、社内共有のハードルがどうしても上がります。
・未処理記事だけを抽出
・AIで日本語要約 / カテゴリ分類 / 対応項目生成
・保存→Slack通知
Elastic Workflows とは
Elastic Workflows は、Elastic 上のデータや外部サービスをつなぎ、手作業で繰り返している処理を自動化するための仕組みです。
たとえば、定期的にデータを検索する、条件に応じて分岐する、AI で要約する、結果を保存する、Slack に通知する、といった処理を 1 本の YAML で組み立てられます。
Workflows の基本要素は、次の 4 つです。
- Triggers: いつ Workflow を動かすか
- Steps: Workflow が何をするか
- Connectors: 外部サービスとどう接続するか
- Data flow: 前の Step の出力を次の Step にどう渡すか

このエディタ画面が開きます。YAMLを直接貼り付けて
「Save」するだけで Workflow が登録できます。
上部の「Enabled」トグルで有効/無効を切り替えられます。
画面下部に「19 warnings」のような警告が表示されることがありますが、
黄色の warning は動作に影響しません。
赤いエラーが出ていなければ、そのまま保存して問題ありません。
この記事で学べること
Elastic Workflows の基本パターンをそのまま学べる実例になっています。
Trigger → Search → Foreach → If → AI → Update → Notify
この流れを一通り含んでいるので、RSSに限らず他の自動化にも応用できます。
なぜこの構成にしたのか
最初は Workflows だけで RSS 取得から通知まで全部やる構成も考えました。ただ、「安定して取り込める raw 層」を先に作る方が実用的と判断しました。
責務を分けると、こうなります。
- Logstash:RSS を安定して取得して raw 保存する
- Workflows:取得済みデータを判定・要約・保存・通知する
この分割のよいところは、収集の問題 と AI 要約や通知の問題 を切り分けやすいことです。まず「重要なお知らせを取り逃さず貯める」ことができれば、要約の質・カテゴリ設計・通知形式は後から改善できます。
前提条件
- Mac(Homebrew でインストールした Logstash)
- Elastic Cloud または Elastic Serverless の保存先
- Elastic Workflows を有効化済みの Kibana 環境
- Slack connector を作成済みの環境
- AI connector を利用できる環境
画面が見えないときは、まず Kibana の Advanced Settings で workflows:ui:enabled を確認してください。
Part 1. Logstash で raw インデックスを作る
1-1. RSS を収集する rss.confを作成する
まず、Logstash のパイプライン設定を作成します。
今回使った rss.conf の役割は次の通りです。
- Elastic 公式 Announcements RSS を読む
- 3時間ごとにポーリングする
- HTML タグを除去する
- 重要そうなタイトルだけ残す
-
linkをキーにして重複を抑える -
elastic-announcements-rawに保存する
input {
rss {
url => "https://discuss.elastic.co/c/announcements/5.rss"
interval => 10800
}
}
filter {
mutate {
add_field => {
"source_type" => "elastic_rss"
}
}
mutate {
gsub => [
"message", "<[^>]*>", ""
]
}
if [title] !~ /(Security Update|ESA-|CVE|What'?s new in Elastic|Elastic Stack|release|announcement|Announcing)/ {
drop { }
}
}
output {
elasticsearch {
hosts => ["https://YOUR-ENDPOINT"]
api_key => "YOUR_API_KEY" #Elastic Cloud URL
index => "elastic-announcements-raw" #APIキー
document_id => "%{link}" # ← 記事URLをIDにして重複を防ぐ
manage_template => false
}
stdout { codec => rubydebug }
}ポイント解説
interval => 10800:10,800 秒、つまり 3 時間ごとの取得 です。定期収集にしておくと、人が手動チェックしなくて済みます。
gsubによる HTML 除去:RSS の本文は HTML を含むため、そのまま AI に渡すとノイズになります。ここで最低限の整形をしておくと、後段の AI 要約が安定しやすくなります。
document_id => "%{link}":同じ記事を定期取得しても、記事 URL を ID として使うことで重複を抑えられます。今回の Workflow では、この ID をそのまま processed 側の重複判定にも使っています。
manage_template => false:Logstash のデフォルトテンプレート自動適用でハマるケースを避けるために、今回は明示的に無効化しています。今回のような小さな raw 保存用パイプラインでは、この方がシンプルでした。
1-2. pipelines.ymlに登録する
Homebrew の Logstash サービスは pipelines.yml で指定されたパイプラインを読みます。rss.conf を作っただけでは動きません。
- pipeline.id: main
path.config: "/opt/homebrew/etc/logstash/conf.d/rss.conf"確認コマンド
cat /opt/homebrew/etc/logstash/pipelines.ymlここはかなりハマりやすいポイントです。「設定ファイルはあるのに実行されていない」 ときは、まず pipelines.yml の参照先を確認した方が早いです。
1-3. Logstash を Homebrew サービスとして動かす
今回の構成では、Logstash を手動起動ではなく Homebrew サービスとして動かしています。
これにより、毎回ターミナルで起動し直さなくて済みます。
# 起動 / 再起動
brew services restart logstash
# 状態確認
brew services list
# 動作ログ確認
tail -n 50 /opt/homebrew/var/log/logstash.logこの状態が作れていれば、放置してもデータ収集が継続する ので、raw 層としてかなり扱いやすくなります。
1-4. Logstash の動作ログを確認する
stdout { codec => rubydebug } を入れているため、Logstash は処理したイベントをログにも出します。
これで、RSS を読めているか、どんなフィールドができているかを確認できます。
tail -n 50 /opt/homebrew/var/log/logstash.logログを見ると、title、published、link、source_type、message などが出力されます。これは RSS 取得とフィールド整形が動いている証拠です。
1-5. raw インデックスに保存されたことを確認する
ログが出ているだけでは不十分です。実際に Elasticsearch に保存されているかも確認します。
// 件数確認
GET elastic-announcements-raw/_count
// 最新1件確認
GET elastic-announcements-raw/_search
{
"size": 1,
"sort": [{ "published": { "order": "desc" } }]
}返ってきたドキュメントに title / message / link / published / source_type / event.original があればOKです。
ここまでできれば、Workflow 側は raw インデックスからタイトルと本文を読める状態になっています。
Part 2. Workflow で日本語化・分類・保存・Slack 通知する
ここからが、ビジネス価値を作る部分です。
raw データは英語のままで保存されていますが、実際にやりたいのは 「関係者が読みやすく、行動しやすい形にすること」 です。
2-1. Trigger の種類を先に整理する
Elastic Workflows には、主に次の Trigger があります。
| Trigger | 用途 |
| manual: | 手動実行(検証に便利) |
| scheduled: | 定期実行(今回のメイン) |
| alert: | アラート起点の実行 |
今回の記事で使うのは scheduled trigger です。毎日 9:00 JST に動かし、その日の業務開始時に新着アナウンスを確認しやすくする構成にしています。
検証中は manual に切り替えてすぐ実行、確認が取れたら scheduled に戻す流れが安全です。
triggers:
- type: manual2-2. Workflows のテンプレート記法を先に押さえる
この記事では、次のテンプレート記法を使います。
| 記法 | 意味 |
| {{…}} | 文字列として値を埋め込む |
| ${{…}} | 配列・オブジェクト・真偽値の型を保ったまま渡す |
| steps.<name>.output | 前のStepの出力を参照する |
| foreach.item | foreachで今処理中の1件を参照する |
2-3. Workflow 全体 YAML
ここから、今回の完成版 Workflow を載せます。
使う前に変更が必要な箇所:
- AI connector の設定(Default AI Connector または
connectorId) dtstart(自分のタイムゾーン・実行時刻)connector-id(自分の Slack コネクタ名)
# ═══════════════════════════════════════════════════════════════
# METADATA - Identifies and describes the workflow
# この Workflow は Elastic 公式アナウンスを処理し、
# 日本語要約・分類・保存・Slack 通知まで自動化します
# ═══════════════════════════════════════════════════════════════
name: elastic_announcements_ai_production_V5_slack
description: Process only new Elastic announcements, analyze once with AI, generate actions, store, and notify to Slack
enabled: true
# ═══════════════════════════════════════════════════════════════
# TRIGGER - Starts the workflow on a fixed schedule
# 毎日 9:00 JST に Workflow を起動します
# 朝の業務開始時に新着アナウンスを確認しやすくする設定です
# ═══════════════════════════════════════════════════════════════
triggers:
- type: scheduled
with:
rrule:
freq: DAILY
interval: 1
tzid: Asia/Tokyo
dtstart: 2026-03-16T09:00:00+09:00
byhour: [9]
byminute: [0]
# ═══════════════════════════════════════════════════════════════
# STEPS - Main workflow logic
# 検索 → 繰り返し処理 → 未処理判定 → AI要約 → 保存 → 通知
# という流れで処理します
# ═══════════════════════════════════════════════════════════════
steps:
# ────────────────────────────────────────────────────────────
# STEP 1 - Get raw announcements from Elasticsearch
# raw インデックスから過去24時間分の記事を取得します
# Output は hits.hits 配列で、次の foreach で1件ずつ処理します
# ────────────────────────────────────────────────────────────
- name: get_raw
type: elasticsearch.search
with:
index: elastic-announcements-raw
size: 10
sort: "published"
query:
range:
published:
gte: "now-24h"
lte: "now"
on-failure:
retry:
max-attempts: 2
delay: "5s"
# --------------------------------------------------------
# STEP 2.1 - Check whether this article was already processed
# processed インデックスに同じ ID の記事があるか確認します
# raw と processed で同じ ID を使い、重複処理を防ぎます
# --------------------------------------------------------
- name: process_items
type: foreach
foreach: "{{ steps.get_raw.output.hits.hits }}"
steps:
- name: lookup_processed
type: elasticsearch.search
with:
index: elastic-announcements
size: 1
query:
ids:
values:
- "{{ foreach.item._id }}"
on-failure:
retry:
max-attempts: 2
delay: "3s"
# --------------------------------------------------------
# STEP 2.2 - Continue only if the article is new
# lookup_processed の結果が 0 件なら未処理とみなし、
# AI 要約や保存処理へ進みます
# --------------------------------------------------------
- name: if_new_item
type: if
condition: "${{ steps.lookup_processed.output.hits.total.value == 0 }}"
steps:
# ----------------------------------------------------
# STEP 2.2.1 - Generate Japanese summary and category
# 記事タイトルと本文を使って、日本語要約とカテゴリ分類を行います
# summary_ja と category を schema で固定し、
# 後続ステップで扱いやすい構造化 output を返します
# ----------------------------------------------------
- name: analyze_article
type: ai.prompt
with:
prompt: |
あなたはElastic公式アナウンスの整理担当です。
以下の記事を分析し、2つの項目を出力してください。
1. 日本語サマリー(2〜3段落)
- 元の内容に忠実に
- 余計な推測はしない
- タイトルは含めない
- 元記事にない内容は追加しない
2. category(1つ選択)
- security_update
- release
- product_update
- maintenance
- other
判定ルール:
- CVE / 脆弱性 / ESA / セキュリティ修正 → security_update
- バージョン公開 / 一般的なリリース案内 → release
- 機能追加 / 製品更新 / 改善案内 → product_update
- 運用 / 停止 / 保守作業 → maintenance
- どれにも明確に当てはまらない → other
タイトル:
{{ foreach.item._source.title }}
本文:
{{ foreach.item._source.message }}
schema:
type: object
properties:
summary_ja:
type: string
category:
type: string
enum:
- security_update
- release
- product_update
- maintenance
- other
required:
- summary_ja
- category
temperature: 0.2
on-failure:
retry:
max-attempts: 2
delay: "8s"
# ----------------------------------------------------
# STEP 2.2.2 - Generate short action items
# 記事の内容をもとに、最初に確認すべき対応事項だけを
# 短く箇条書きで生成します
# 要約だけでなく、次の行動につながる情報を作るステップです
# ----------------------------------------------------
- name: generate_actions
type: ai.prompt
with:
prompt: |
次のElastic公式アナウンス記事をもとに、ユーザーが最初に確認すべき対応事項だけを書いてください。
出力ルール:
- 1〜3項目
- 各項目は箇条書きで短く書く
- 元記事にない内容は追加しない
- 冗長な説明は書かない
- 見出しやタグは書かない
例:
- 影響を受けるバージョンを確認する
- 必要ならアップデートを検討する
タイトル:
{{ foreach.item._source.title }}
本文:
{{ foreach.item._source.message }}
on-failure:
retry:
max-attempts: 2
delay: "8s"
# ----------------------------------------------------
# STEP 2.2.3 - Save processed result to Elasticsearch
# AI が作成した要約・カテゴリ・対応項目を
# processed インデックスに保存します
# doc_as_upsert: true により、同じ ID があれば更新、
# なければ新規作成になります
# ----------------------------------------------------
- name: upsert_processed
type: elasticsearch.update
with:
index: elastic-announcements
id: "{{ foreach.item._id }}"
doc:
source_link: "{{ foreach.item._source.link }}"
title: "{{ foreach.item._source.title }}"
published: "{{ foreach.item._source.published }}"
summary_ja: "{{ steps.analyze_article.output.content.summary_ja }}"
actions_ja: "{{ steps.generate_actions.output.content }}"
category: "{{ steps.analyze_article.output.content.category }}"
processed_at: "{{ execution.startedAt }}"
doc_as_upsert: true
on-failure:
retry:
max-attempts: 2
delay: "5s"
# ----------------------------------------------------
# STEP 2.2.4 - Send notification to Slack
# 整理した記事情報を Slack に通知します
# 保存後に関係者へ共有するための最終ステップです
# continue: true により、Slack 送信だけ失敗しても
# 保存処理全体は止めません
# ----------------------------------------------------
- name: send_slack
type: slack
connector-id: e3****01-****-****-****-a****c****ae
with:
message: |
🚨 *【お知らせ】{{ foreach.item._source.title }}*
*Published:* {{ foreach.item._source.published }}
*Category:* {{ steps.analyze_article.output.content.category }}
*Link:* {{ foreach.item._source.link }}
*Summary*
{{ steps.analyze_article.output.content.summary_ja }}
*Actions*
{{ steps.generate_actions.output.content }}
*※この通知はAIにより自動生成されています。必ず原文をご確認ください。*
on-failure:
retry:
max-attempts: 1
delay: "10s"
continue: true注意)上の YAML をそのまま使う場合は、少なくとも次の3つを自分の環境に合わせて変更してください。
- dtstart
- connector-id
- AI connector の設定(Default AI Connector または
connectorId)
2-4. この Workflow の見どころ
get_raw: まず raw から必要な分だけ取る
ここでの実務上の意図は、毎回全件を再処理しないこと です。
AI を使う構成では、処理件数がそのままコストと実行時間に効いてきます。最初に対象期間を絞っておくのは、かなり重要です。
foreach: 検索結果を 1 件ずつ処理できる形にする
この Step の役割は、検索結果の配列を 1 件ずつ処理できる形に変えること です。
この構造にすることで、記事ごとに AI 要約・保存・Slack 通知を個別に行えます。Workflow のデータの流れは、ここから見やすくなります。
lookup_processed+ if_new_item: 重複処理を防ぐ中心
condition: "${{ steps.lookup_processed.output.hits.total.value == 0 }}"ここが重複処理を防ぐ中心です。
raw で使っている _id は link ベースなので、processed 側でも同じ ID を使えば、「すでに処理済みか」 をシンプルに判定できます。
この設計のよいところは、Logstash 側の重複抑制と Workflow 側の未処理判定がつながっていることです。別々のキーを使うより、ずっと分かりやすくなります。
analyze_article: 要約と分類を 1 回の AI 呼び出しでまとめる
この Step では、要約とカテゴリ分類を 1 回の AI 呼び出しで同時にやっています。
これは、コストと一貫性の両面でよい設計です。特に `schema` を付けて `summary_ja` と `category` を固定している点が大事です。AI の自由回答にせず、後続 Step で扱いやすい形にしています。
generate_actions: 読者の次の行動につながる情報を作る
ここでは要約とは別に、「読者が最初に何を確認すべきか」だけを抜き出しています。
要約だけだと、「結局どう動けばいいのか」が分かりにくいことがあります。一方で、対応項目を 1〜3 個に制限しているので、Slack 上でも読みやすくなります。
この Step を入れることで、単なる翻訳ではなく、実務で動きやすい情報 に変わります。
upsert_processed: 人が再利用しやすい形で保存する
この Step で、英語 raw とは別に、日本語要約済みの processed データを保存します。
doc_as_upsert: true を使っているので、同じ ID があれば更新、なければ作成です。これにより、再実行しても扱いやすい保存先になります。
send_slack: 最後に人へ届ける
Workflow は「処理して終わり」ではなく、必要な相手に届けるところまで自動化して初めて価値が出ます。
Slack 送信が失敗しても、その前の upsert_processed(保存)はすでに完了しています。continue: true により、通知失敗で保存結果が失われることを防ぎます。
このworkflowで保存先を 2 層に分けています。
-
elastic-announcements-raw: 原文に近い保存先 -
elastic-announcements: 日本語要約・カテゴリ付きの加工済み保存先
この二層構造にしておくと、あとで要約ルールやカテゴリ設計を変えたくなったときも、raw から再処理できます。ここは再利用性の面でかなり大事です。

タイトル・公開日・カテゴリ・リンクに加え、
AI が生成した日本語サマリーと具体的な対応事項(Actions)が
1通にまとまって届いています。
英語の原文を読まなくても「何が起きたか」「何をすべきか」が
そのまま伝わる形になっているのが確認できます。
2-5. Workflow 実行画面では何を見るか
Workflow 実行後は、Executions 画面で次を確認できます。
- どの Step が成功したか
- どこで失敗したか
- 各 Step にどんな Input / Output が入ったか
- 各 Step の実行時間はどのくらいか
特に今回のような構成では、次を順に見ると原因を切り分けやすいです。
get_rawで記事が取れているかlookup_processedが想定通り 0 件または 1 件を返しているかanalyze_articleのoutput.contentがschema通りかupsert_processedがupdate成功しているかsend_slackが正常に送信できているか

「Executions」タブを開くと、この画面で実行結果を確認できます。
中央のツリーで各Stepの成否と実行時間が一覧でき、
Stepをクリックすると右ペインにInput/Outputの詳細が表示されます。
画像では analyze_article が 6s、upsert_processed が 669ms など、
各Stepの処理時間も確認できます。
2-6. ハマりやすかったポイント
1. Workflows が見えない
まず疑うべきは、workflows:ui:enabled です。
また、権限不足でも表示されないことがあります。画面が見えないときは、最初にこの 2 点を確認した方が早いです。
2. {{ }}と ${{ }}の使い分け
- 文字列埋め込みなら
{{ }} - 型を保ちたいなら
${{ }}
if の条件や配列・オブジェクトには ${{ }} が必要です。ここを間違えると条件分岐が常に false になります。
3. Slack connector の違い
Slack connector には、Incoming Webhook 型と Web API 型があります。
Webhook 型はシンプルですが、メッセージ表現に制約があります。チャネル選択や表現の柔軟性を重視するなら、Web API 型の方が扱いやすい場面があります。
4. テスト時は manual trigger が便利
毎回 scheduled trigger を待つより、検証中は manual trigger にしてその場で実行した方が早いです。
また、Workflow エディタでは Workflow 全体だけでなく、個別 Step の実行もできる ので、AI prompt や検索 Step を切り分けて試すのにも向いています。
5. connector-idの出し方
コネクタIDの特定がこのステップで最も難航した点でした。最終的に、Macで作業している場合、connector-idの入力欄でCMD + iを同時に押すことで、オートコンプリート機能によりIDが自動的に補完されることが判明しました。

まとめ
今回の構成で自動化できたこと:
- 毎日の手動チェック → ゼロ
- 英語読み込み → 日本語要約に置き換え
- 「で、何すればいい?」への回答 → 対応項目を自動生成
- 社内共有 → Slack に自動配信
Elastic Workflows をこれから触る人にとって、この構成は入門題材としてかなりちょうどよいと思います。Trigger → Search → Foreach → If → AI → Update → Notify という基本パターンが全部入っており、保存や通知までつながっているからです。
今後の発展方向
ai.promptをai.agentに置き換えて、より高度な判断をさせる- 通知先を Slack だけでなくメールやチケットシステムに広げる
- RSS 以外のデータソース(GitHub Releases、Elastic Blog など)に展開する
- セキュリティアラートや運用アラートの triage に応用する

