Bulk API (Pythonからのドキュメントの一括登録)

BLOG

1. 前書き

前回に引き続き、ホワイトペーパー「Elasticsearchを使った簡易RAGアプリケーションの作成」に記載した技術的要素を紹介いたします。

今回は、Bulk API を使って Python からドキュメントを一括登録する方法について紹介します。

なお、このブログ内に記載しているソースコードおよび Elasticsearch 用のリクエストは、github のリポジトリでも公開しています。(*脚注11)

1.1. 対象者

  • Elastic Cloud のアカウントを持っている人(トライアルライセンスを含む)
  • Elasticsearch の初心者~中級者

1.2. できるようになること

  • Python から Elasticsearch endpoint を利用して接続できるようになる。
  • Python から Elasticsearch の Bulk API を呼び出し、ドキュメントを一括登録できるようになる。

1.3. 前提条件

  • Elastic Cloud (version: 8.17.3)
  • Python 3.13
  • Elastic Cloud 上で、日本語検索をするための準備が完了していること。

※日本語検索するための準備(形態素解析の準備、ベクトル生成の準備)については、ベクトル検索の準備 を参照してください。

(2025年03月07日時点の情報を元に記載しています。)

2. Bulk API とは?

Bulk API は、Elasticsearch へドキュメントを登録する方法の一つです。

Elasticsearch では、データを登録する方法がいくつかあります。

  • ドキュメントを1つずつ、POST で登録する。
  • Bulk API を使って複数ドキュメントをまとめて登録する。

など。

今回は、後者の Bulk API を使って複数ドキュメントをまとめて登録する方法について紹介します。

Bulk API は、Elastic Cloud の Console から実行することも可能です。

Elasticsearch へのドキュメントの登録 の回では、Elastic Cloud の Console から Bulk API のリクエストを発行する方法について紹介しました。

今回は、Python から呼び出す方法について紹介します。

3. 同義語セットの作成

ドキュメントの登録先となるインデックスを作成する前に、同義語セット(の枠)を作成しておきます。

Synonym API (*脚注22) を使って、synonyms set を登録します。

下記のリクエストを Elastic Cloud の Console から発行します。

(Elastic Cloud の Console の表示方法は、Elasticsearch へのインデックスの作成 の Dev Tools に関する説明を参照してください。)

PUT _synonyms/kakinosuke_synonyms_set
{
  "synonyms_set": []
}

(今回は実際の同義語は登録しません。後日登録する予定です。)

4. 書き込み先のインデックスの作成

今回は、書き込み対象のインデックス名を kakinosuke_202503 とします。

(「桃太郎」を改変した「柿之助」のお話を登録します。)

ベクトル検索の準備 の回を参考にして、書き込み先となるインデックスを作成します。

下記のリクエストを Elastic Cloud の Console から実行します。

PUT /kakinosuke_202503
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "refresh_interval": "3600s"
    },
    "analysis": {
      "char_filter": {
        "ja_normalizer": {
          "type": "icu_normalizer",
          "name": "nfkc_cf",
          "mode": "compose"
        }
      },
      "tokenizer": {
        "ja_kuromoji_tokenizer": {
          "mode": "search",
          "type": "kuromoji_tokenizer",
          "discard_compound_token": true,
          "user_dictionary_rules": [
          ]
        }
      },
      "filter": {
        "ja_search_synonym": {
          "type": "synonym_graph",
          "synonyms_set": "kakinosuke_synonyms_set",
          "updateable": true
        }
      },
      "analyzer": {
        "ja_kuromoji_index_analyzer": {
          "type": "custom",
          "char_filter": [
            "ja_normalizer",
            "kuromoji_iteration_mark"
          ],
          "tokenizer": "ja_kuromoji_tokenizer",
          "filter": [
            "kuromoji_baseform",
            "kuromoji_part_of_speech",
            "cjk_width",
            "ja_stop",
            "kuromoji_number",
            "kuromoji_stemmer"
          ]
        },
        "ja_kuromoji_search_analyzer": {
          "type": "custom",
          "char_filter": [
            "ja_normalizer",
            "kuromoji_iteration_mark"
          ],
          "tokenizer": "ja_kuromoji_tokenizer",
          "filter": [
            "kuromoji_baseform",
            "kuromoji_part_of_speech",
            "cjk_width",
            "ja_stop",
            "kuromoji_number",
            "kuromoji_stemmer",
            "ja_search_synonym"
          ]
        }
      }
    }
  }
}

リフレッシュは初回しか行わないので、”refresh_interval” : “3600s” としておきます。

“-1″ (リフレッシュしない)としてもいいのですが、リフレッシュをし忘れた場合の保険のために、”3600s” としています。

ホワイトペーパー内では “柿之助” という単語をユーザー辞書登録していましたが、まずは登録しないでおきます。(後日、登録します。)

検索用の同義語に関しては、先ほど作成した synonyms set を参照しておきます。(この時点では同義語の中身は空ですが、こちらも後日、登録します。)

5. 書き込み先のインデックスのフィールドの作成

続けて、フィールド作成用のリクエストも実行します。

PUT /kakinosuke_202503/_mappings
{
  "dynamic": false,
  "_source": {
    "excludes": [
      "text_embedding.*"
    ]
  },
  "properties": {
    "chunk_no": {
      "type": "integer"
    },
    "content": {
      "type": "text",
      "analyzer": "ja_kuromoji_index_analyzer",
      "search_analyzer": "ja_kuromoji_search_analyzer"
    },
    "text_embedding": {
      "properties": {
        "model_id": {
          "type": "keyword",
          "ignore_above": 256
        },
        "predicted_value": {
          "type": "dense_vector",
          "dims": 384
        }
      }
    }
  }
}

text_embedding.* には密ベクトルの値が格納されますが、密ベクトル値は検索さえできればよいので、_source の対象外としています。

6. データ取り込み用のパイプラインの作成

ベクトル検索の準備 を参考に、パイプラインを作成します。

(すでに作成済の場合は、この処理は飛ばしてください。)

PUT /_ingest/pipeline/japanese-text-embeddings
{
  "description" : "Text embedding pipeline",
  "processors" : [
    {
      "inference": {
        "model_id": ".multilingual-e5-small_linux-x86_64",
        "target_field": "text_embedding",
        "field_map": {
          "content": "text_field"
        }
      }
    }
  ]
}

Bulk API 実行時にこのパイプラインを通すことで、

 登録する文字列 → ベクトル生成

の処理が自動で行われるようになります(ベクトル化の処理の実装は必要ありません)。

7. エイリアスの作成

登録先のインデックス名の実体は、kakinosuke_202503 ですが、Python からは kakinosuke という名前でアクセスできるよう、エイリアスを作成しておきます。

すでに同名のエイリアス「kakinosuke」が存在する場合は削除するなり、今回作成するエイリアスを他の名前にするなどしてください。

同名のエイリアス「kakinosuke」が存在しない場合

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "kakinosuke_202503",
        "alias": "kakinosuke"
      }
    }
  ]
}

同名のエイリアス「kakinosuke」が既に存在する場合

POST _aliases
{
  "actions": [
    {
      "remove": {
        "index": "変更前のインデックス名",
        "alias": "kakinosuke"
      }
    },
    {
      "add": {
        "index": "kakinosuke_202503",
        "alias": "kakinosuke"
      }
    }
  ]
}

※既に kakinosuke エイリアスが既に存在しているかどうかは、下記で調べることが可能です。

GET _alias/kakinosuke

8. Elasticsearch endpoint

8.1. Elasticsearch への接続方法

Python から Bulk API を呼び出す前に、Python から Elastic Cloud へ接続する必要があります。

Python から Elastic Cloud へ接続する方法は、大きく分けて2種類あります。

  • Elasticsearch endpoint を使って接続する方法
  • Cloud ID を使って接続する方法

以前、Python を使った Elasticsearch へのアクセス では、Cloud ID を使って接続する方法について説明しましたが、Elasticsearch v8.17.0 では、Elasticsearch endpoint を利用する方法が推奨されています。

取得した Elasticsearch endpoint を使って、Elasticsearch のクライアントを生成します。

8.2. Elasticsearch endpoint の取得方法

8.2.1. Elastic Cloud へログイン

Elastic Cloud のログイン画面からユーザー名、パスワードを入力してデプロイメント一覧画面へ進みます。

8.2.2. Deployment 一覧画面

接続したい Deployment 名をクリックします。

クリックすると、Home 画面へ進みます。

8.2.3. Home 画面

Home 画面内の Elasticsearch をクリックします。

8.2.4. Elasticsearch 画面

Elasticsearch 画面内の中央上部に Elasticsearch endpoint の URL が表示されています。

コピーアイコンを押すと、クリップボードにコピーされます。

取得した Elasticsearch endpoint の URL を、後述の .env ファイルに転記します。

9. Access Key の作成

Python から Elastic Cloud にアクセスするには、Elasticsearch endpoint の他に、Access Key も必要です。

以前 Python を使った Elasticsearch へのアクセス では読み取り用の Access Key を作成しましたが、今回は Bulk API でデータを書き込む必要があるので、書き込み用の Access Key が必要となります。

書き込み用の Access Key を作成するには、下記のリクエストを Elastic Cloud の Console から実行します。

書き込み用 Access Key の作成リクエスト

POST /_security/api_key
{
   "name": "kakinosuke_write_api_key",
   "role_descriptors": {
     "kakinosuke_write_role": {
       "cluster": ["all"],
       "indices": [
         {
           "names": ["kakinosuke*"],
           "privileges": ["all"]
         }
       ]
     }
   }
}

このリクエストを実行すると、次のようなレスポンスが返ってきます。

{
  "id": "......................",
  "name": "kakinosuke_write_api_key",
  "api_key": "......................",
  "encoded": "***********************************************=="
}

ここで必要なのは encoded の値です。この値を後述の .env ファイルに転記します。

10. ソースの説明

これらを踏まえて、Bulk API を使ってドキュメントをインデックスへ登録します。

ソースの全体を見たい方は、下記の github リポジトリを参照してください。

blogs/2025-03-bulk at main · sios-elastic-tech/blogs
A repository for blog sources. Contribute to sios-elastic-tech/blogs development by creating an account on GitHub.

各ソースの抜粋を以下に記載します。

10.1. .env

elasticsearch_endpoint=''
write_api_key_encoded=''

接続に必要な endpoint と api_key を記載します。

  • elasticsearch_endpoint は、[8.2.4. Elasticsearch 画面] で取得した URL。
  • write_api_key_encoded は、[9. Access Key の作成] で取得した encoded の値。

10.2. docker-compose.yml

services:
  bulk_sample_202503:
    build:
      context: ./
      dockerfile: Dockerfile
    container_name: 'bulk_sample_202503'
    volumes:
      - ./:/app

Docker 用の compose ファイルです。

10.3. Dockerfile

FROM python:3.13

...以下略

いわゆる Dockerfile です。

10.4. requirements.txt

elasticsearch==8.17.1
python-dotenv==1.0.1

今回のプログラムで必要となるライブラリです。

10.5. data/kakinosuke.txt_chunked.txt

むかし、むかし、あるところに、おじいさんとおばあさんがありました。まいにち、おじいさんは山へしば刈りに、おばあさんは川へ洗濯に行きました。ある日、おばあさんが、川のそばで、せっせと洗濯をし
...
ゴリラはキャッ、キャッと笑いながら、白い歯をむき出しました。鷹はケン、ケンと鳴きながら、くるくると宙返りをしました。空は青々と晴れ上がって、お庭には桜の花が咲き乱れていました。

「桃太郎」を改変した「柿之助」のお話です。

ファイルの内容をすべて知りたい場合は、github 内のリポジトリを参照してください。

事前に LangChain を使ってチャンキング処理を行っています。

(チャンキング処理の詳細はここでは割愛します。詳しく知りたい方は、ホワイトペーパーをご参照ください。)

10.6. src/bulk_from_txt.py

"""
...
Usage: python bulk_from_txt.py chunked_textfilepath
...
"""
...
def bulk_from_file(es_client: Elasticsearch, chunked_textfilepath: str, index_name: str = SEARCH_INDEX, refresh: bool = False):
...
if __name__ == "__main__":
    ...
bulk_from_file(es_client=es_client, chunked_textfilepath=chunked_textfilepath, index_name=SEARCH_INDEX, refresh=True)

Bulk API を呼び出す本体です。

ファイルから1行ずつ読み取って、Bulk API にドキュメントを1つ渡し、最後に refresh を呼び出しています。

ソース全体は、github 内のソースを参照してください。

10.7. src/common/setup_logger.py

...
def setup_logger(logger_name: str, log_level: str = DEFAULT_LOG_LEVEL) -> logging.Logger:
...

ロガー用の共通関数です。

10.8. src/elastic/es_consts.py

# 検索対象のインデックス(のエイリアス)
SEARCH_INDEX = 'kakinosuke'
# ドキュメント登録時のパイプライン名
INGEST_PIPELINE = 'japanese-text-embeddings'

Elasticsearch関連の定数を定義しています。

10.9. src/elastic/es_func.py

def create_es_client(elasticsearch_endpoint: str, api_key_encoded: str) -> Elasticsearch:
...
def streaming_bulk_wrapper(es_client: Elasticsearch, actions: Iterable) -> int:
...
def refresh_index(es_client: Elasticsearch, index_name: str = SEARCH_INDEX):
...

Elasticsearch 関連の関数を集めたファイルです。

create_es_client()関数から Python 用の Elasticsearch のクライアントを生成しています。(*脚注33)

streaming_bulk_wrapper()関数から Python 用の Bulk API を呼び出しています。(*脚注44)

最後に refresh() を呼ぶことで、ドキュメント登録を確定させています。

11. 実行

Docker 上のコンテナにデプロイします。

(ここでは Docker を利用していますが、Docker は必須ではありません。)

11.1. ビルド

docker compose build

docker-compose.yml ファイルがあるディレクトリで、上記を実行します。

11.2. コンテナの起動

docker compose up -d

エラーがなければ、コンテナ(コンテナ名 = “bulk_sample_202503”)が起動します。

11.3. コンテナ内で bash を実行

docker exec -it bulk_sample_202503 /bin/bash

“bulk_sample_202503” はコンテナ名です。

直接、src/bulk_from_txt.py を実行してもよいのですが、エラーが起きた場合などのハンドリングが小難しくなってしまうので、いったん、bash を経由してから実行します。

11.4. Python スクリプトの実行

python src/bulk_from_txt.py data/kakinosuke.txt_chunked.txt

コンテナ内部で実行中の bash から、上記のコマンドを実行します。

(data/kakinosuke.txt_chunked.txt が登録したいデータを1行ずつ格納しているテキストファイルの相対パスです。)

テキストファイルから1行ずつ文字列を読み取り、インデックスへ登録します。

正常に終了したら、ロガーを通じて標準出力に次のようなログが出力されます。

[07:42:59] [INFO] main:88 bulk_from_file success_count=38

12. 確認

Elastic Cloud にログインし、Console から下記のリクエストを実行してみます。

GET /kakinosuke/_search
{
  "query": {
    "match_all": {}
  }
}

レスポンス

{
...
  "hits": {
    "total": {
      "value": 38,
      "relation": "eq"
    },
...
}

のように 38 個のドキュメントが見つかります。

13. まとめ

Bulk API を利用することにより、Python からドキュメントをインデックスに一括登録できることを紹介しました。

次回も、ホワイトペーパー内に記載した技術的要素を取り上げて、紹介したいと思います。


  1. (*脚注1)
    このブログ内に記載しているソースコード、および Elasticsearch 用のリクエストは、下記にて公開しています。
    https://github.com/sios-elastic-tech/blogs/blob/main/2025-03-bulk
    ↩︎
  2. (*脚注2)
    Synonyms API:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/put-synonyms-set.html
    Elasticsearch 8.10.0 より使用できるようになった、同義語関連の API です。
    ↩︎
  3. (*脚注3)
    Elasticsearch のクライアント生成 API:
    https://elasticsearch-py.readthedocs.io/en/v8.17.1/api/elasticsearch.html#elasticsearch.client.Elasticsearch
    ↩︎
  4. (*脚注4)
    Python から呼び出す Elasticsearch の bulk API:
    https://elasticsearch-py.readthedocs.io/en/v8.17.1/helpers.html#elasticsearch.helpers.streaming_bulk
    今回は、helpers.streaming_bulk()を利用します。
    ↩︎