Python を使った Elasticsearch へのアクセス

BLOG

1. 前書き

こんにちは。
サイオステクノロジーの田川です。

これまで Elastic Cloud 上の GUI から Elasticsearch での検索などを行ってきましたが、実際の開発・運用時にはプログラム言語を使って Elasticsearch へアクセスすることも珍しくありません。

Elasticsearch へ、様々なプログラム言語からアクセスすることが可能ですが、今回はその中でも比較的メジャーな Python を取り上げます。

今回は、Python を使って Elasticsearch へ検索を要求し、受け取った結果を画面に表示できるようにしてみたいと思います。

(画面の制御には、 streamlit を使います。)

対象者

  • Elastic Cloud のアカウントを持っている人(トライアルライセンスを含む)
  • Elasticsearch の初心者~中級者

できるようになること

  • Python を使って Elastic Cloud 上の Elasticsearch にアクセスし、検索テンプレートを使った検索をできるようになる。
  • Streamlit を使って 検索用クエリを受け取り、検索結果を画面に表示できるようになる。

前提条件

  • Elastic Cloud 8.15.0
  • Docker 27.2.1
  • Python 3.12
  • pip 24.3.1
  • elasticsearch (Python用のClient, version: 8.15.0)
  • streamlit 1.39.0
  • 桃太郎の内容をテキスト、および、密ベクトルでインデックス済であること。
  • 検索テンプレートを定義済であること。

(2024年11月18日時点の情報を元に記載しています。)

2. 準備

2.1. ファイル構成

次のような構成で動かしました(あくまでも例です)。

ルートフォルダ/
  |- .env
  |- docker-compose.yml
  |- Dockerfile
  |- requirement.txt
  |- src/
  |    |- app.py
  |    |- elastic/
                |- es_const.py
                |- es_func.py

なお、このブログ内では、Docker を利用していますが、Docker の利用は必須ではありません。

(Dockerの設定については、ここでは割愛します。
また、streamlit を使って画面制御を行っていますが、詳細については割愛します。)
(*脚注1)1

2.2. .env (Elasticsearch への接続情報) の内容

cloud_id_encoded=''

read_api_key_encoded=''

(後述の処理で取得した値をここに転記します。)

2.3. docker-compose.yml の記載例

services:
  sios_blog_sample_1:
    build:
      context: ./
      dockerfile: Dockerfile
    container_name: 'sios_blog_sample_1'
    volumes:
      - ./:/app
    ports:
      - 8501:8501

※8501は、streamlit を動かす際のディフォルトのポート番号です。

2.4. Dockerfileの記載例

FROM python:3.12

COPY ./requirements.txt ./

RUN pip install --upgrade pip
RUN pip install -r ./requirements.txt

WORKDIR /app
COPY ./ /app

ENV PYTHONUNBUFFERED 1

ENTRYPOINT ["/bin/sh", "-c", "while :; do sleep 30; done"]

2.5. requirements.txt の例

elasticsearch==8.15.0
python-dotenv==1.0.1
streamlit==1.39.0

2.6. src/app.py (アプリ本体) の内容

from elastic.es_func import create_es_client, create_search_params, es_search_template
from elastic.es_consts import SEARCH_INDEX, SEARCH_TEMPLATE_ID
import streamlit as st
from dotenv import load_dotenv
import os


# 初期化を行う。
def initialize():
  cloud_id_encoded: str = ''
  read_api_key_encoded: str = ''

  if load_dotenv(verbose=True):
    print('load_dotenv success')

  if 'cloud_id_encoded' in os.environ:
    cloud_id_encoded = os.environ['cloud_id_encoded']

    if 'read_api_key_encoded' in os.environ:
      read_api_key_encoded = os.environ['read_api_key_encoded']

  if read_api_key_encoded == '':
    print('please set cloud_id_encoded and read_api_key_encoded in .env')
    return ''
  else:
    es_client = create_es_client(cloud_id_encoded, read_api_key_encoded)

    # debug
    print(f'{es_client.info()=}')

    return es_client


# Elasticsearchで検索を行う。
def search(es_client, query):
  # 検索用パラメタを生成する。
  search_params = create_search_params(query)

  # 検索テンプレートを使って検索する。
  search_result = es_search_template(es_client, SEARCH_INDEX, SEARCH_TEMPLATE_ID, search_params)

  return search_result  


# 検索ボタンが押されたときに呼ばれる処理。
def search_onclick():
  es_client = st.session_state['es_client']
  query = st.session_state['query']

  # 検索を行う。
  search_results = search(es_client, query)

  # 結果を表示する。
  st.write(f'クエリ: {query}')
  st.write('検索結果:')
  for result in search_results:
    st.write(result)


# 画面の作成
def createScreen():
  st.text_input(label= 'クエリ', key= 'query')
  st.button(label= '検索', on_click= search_onclick)


# ----- main -----
if 'es_client' not in st.session_state:
  es_client = initialize()
  st.session_state['es_client'] = es_client

createScreen()

2.7. src/elastic/es_consts.py (定数定義部分) の内容

# 検索対象のインデックス
SEARCH_INDEX = 'momotaro_v3'

# 検索テンプレートのId
SEARCH_TEMPLATE_ID = 'rrf_search_template'

2.8. src/elastic/es_func.py (Elasticsearch関連の関数部分) の内容

from elasticsearch import Elasticsearch


# Elasticsearch へアクセスするための client を生成する。
def create_es_client(cloud_id_encoded, api_key_encoded):
  es_client: Elasticsearch = None
  if cloud_id_encoded != '' and api_key_encoded != '':
    es_client = Elasticsearch(cloud_id=cloud_id_encoded, api_key=api_key_encoded)
  return es_client


# 検索テンプレートに埋め込むパラメタを生成する。
def create_search_params(query):
  search_params = {
    'query_string': query,
    'query_for_vector': query
  }

  return search_params


# 検索テンプレートを使って、検索を行う。  
def es_search_template(es_client, search_index, search_template_id, search_params, field_name = 'content', max_count = 5):
  results = []
  search_results = es_client.search_template(index=search_index, id=search_template_id, params=search_params)

  for doc in search_results['hits']['hits'][:max_count]:
    # 配列になって返却されるので、0番目の要素を取り出す。
    results.append(doc['fields'][field_name][0])

  return results
  • create_es_client 関数

create_es_client 関数で、Elasticsearch へアクセスするための client を生成しています。
client を生成する方法はいくつかありますが、ここでは、cloud_id と api_key を渡す方法を採用しています。
詳細は、下記を参照してください。
https://elasticsearch-py.readthedocs.io/en/v8.15.1/quickstart.html#connecting
https://elasticsearch-py.readthedocs.io/en/v8.15.1/api/elasticsearch.html#elasticsearch.client.Elasticsearch

  • es_search_template 関数

es_search_template 関数から search_template API を使って Elasticsearch へ検索を行っていますが、
渡している情報は、

  • インデックス名
  • 検索テンプレートID
  • 検索パラメタ

のみです。検索クエリの詳細な内容は、Python側には、一切、書いていません。
(Elasticsearchの検索テンプレート内に記載しています)

search_tempate API の詳細は、下記を参照してください。

https://elasticsearch-py.readthedocs.io/en/v8.15.1/api/elasticsearch.html#elasticsearch.client.Elasticsearch.search_template


(*脚注2)2

3. 接続用情報の取得

3.1. Elasticsearch に接続するために必要な情報

Python から Elasticsearch に接続するために必要な情報は、下記の2つです。

  • Cloud ID
  • API Key

(*脚注3)3

3.2. Cloud ID の取得

Elastic Cloud にログインし、Home 画面を表示します。

elastic home

Home 画面の デプロイメント(ここでは “SIOS-BLOG-SAMPLE-1”)の Manage をクリックします。

cloud id

選択されたデプロイメントの画面が表示されますが、この右側に Cloud ID が表示されています。

これをコピーして、ソースの .env ファイルの cloud_id_encoded=’…’ に貼り付けてください。

.env

cloud_id_encoded='SIOS-BLOG-SAMPLE-1:.............='

3.3. API Key の取得

Python のプログラムから Elastic Cloud に接続する方法は、大きく分けて2つあります。

  • Username/Password を利用する方法
  • Access Key を利用する方法

ここでは、後者の Access Key を利用します。

Access Key を使うメリットとしては、

  • 権限の設定を行うことが可能。
  • 失効の操作が簡単。

などが挙げられます。

Elastic Cloud の Console から Access Key を作成します。

左側のリクエスト欄に下記を入力し、 ▷ をクリックします。

(今回は読み取り専用で十分なので、privileges: [“read”] としておきます。)

POST /_security/api_key
{
   "name": "momotaro_read_api_key",
   "role_descriptors": {
     "momotaro_api_key": {
       "cluster": ["all"],
       "indices": [
         {
           "names": ["momotaro*"],
           "privileges": ["read"]
         }
       ]
     }
   }
}

API Key の作成に成功すると、右側のレスポンス欄に、生成された API Key が表示されます。

{
  "id": "***********************",
  "name": "momotaro_read_api_key",
  "api_key": "***********************",
  "encoded": "***********************************************=="
}

ここで重要な値は、encoded に記載されている値です。

この値をコピーして、.env ファイルの read_api_key_encoded=’…’ に貼り付けます。

.env

read_api_key_encoded='.............=='

※注 API Key の api_key および encoded の値は、API Key を生成したときしか表示されません。

もし忘れてしまってわからなくなった場合は、再作成してください。

※API Key についての詳細は、下記を参照してください。

https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html

4. Docker 上での streamlit の実行

docker-compose.yml があるホスト上のディレクトリで、

docker compose build

を行います。

さらに

docker compose up -d

を行います。

これでコンテナが起動するはずなので、コンテナが起動したら、

docker exec -it sios_blog_sample_1 /bin/bash

とします。 (sios_blog_sample_1 は、コンテナ名)

bash を経由せずに、いきなり streamlit を動かしてもいいのですが、
エラーが起きた場合にハンドリングしにくいので、
いったん、bash を経由して streamlit を動かすようにしています。

接続先の bash の入力プロンプトから

streamlit run src/app.py

を実行します。

Streamlit が開始されるので、Web ブラウザから http://localhost:8501 へアクセスします。

次のような画面が表示されます。

streamlit-1

5. 検索の実行

それでは、検索してみましょう。

5.1. 検索1

クエリの入力欄に「おじいさんが山へ行ったのはなぜ」と入力し、[検索]ボタンを押します。

すると、次のような画面になります。

streamlit-2

検索結果の上位5件が表示されます。

5.2. 検索2

次に、クエリの入力欄に「桃太郎の家来になった動物」と入力し、[検索]ボタンを押します。

すると、次のような画面になります。

streamlit-3

6. まとめ

Python と streamlit を使って、Elasticsearch に検索を行い、結果を画面に表示することができました。

検索テンプレートを使って Python から検索を行うことで、役割の分担ができます。

Python 側の実装者は、Elasticsearch でのDSLの文法や、細かい検索の仕様について熟知する必要はありません。

逆に、Elasticsearch側で検索テンプレートを組み立てている担当者は、その検索テンプレートが
どのようなコードから実行されているか? など、細かい実装処理について熟知する必要はありません。

検索ロジックを変更したい場合も、Elasticsearch 側で検索テンプレートを変更し、
Python 側からは検索テンプレートのidとパラメタを渡すことで変更されたロジックで検索できるようになります。

次回は、生成AI と組み合わせた RAG を動かしてみたいと思います。


  1. あくまでもサンプルなので、エラー処理やサニタイジング処理は省略しています。
    ↩︎
  2. なお、このサンプルプログラムでは、データの投入は行っていません。前回までのブログでデータは投入済であることが前提です。
    Python から bulk API を呼び出してデータを投入することも可能です。
    詳細は、下記を参照してください。

    https://elasticsearch-py.readthedocs.io/en/v8.15.1/api/elasticsearch.html#elasticsearch.client.Elasticsearch.bulk


    その他の参考URL
    https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html
    https://elasticsearch-py.readthedocs.io/en/v8.15.1
    ↩︎
  3. Elastic Cloud 上で、(AWS/Azure/GCP用に) Private Link を設定した場合、Cloud IDでは接続できなくなります。
    その場合は、Cluster ID と ホスト名を指定して接続するようにしてください。
    (詳細は、公式ドキュメントを参照してください。)
    ↩︎