Elasticsearchを使ったRAGアプリケーションの作成

BLOG

1. 前書き

こんにちは。サイオステクノロジーの田川です。

前回は、Python と Streamlit を使って Elasticsearch へアクセスし、ハイブリッド検索(RRF)を行ってみました。

今回は、昨今、流行りの RAG アプリケーションを作成して、 質問した内容を Elasticsearch で検索し、 検索結果から LLM に回答を作ってもらうようにしてみます。検索する内容は前回までに登録した「桃太郎」を使い、LLM には今回は Cohere の Command R を使います。

(*脚注1)1

対象者

  • Elastic Cloud のアカウントを持っている人(トライアルライセンスを含む)
  • Elasticsearch の初心者~中級者

できるようになること

  • Elasticsearch, Python, Streamlit, Cohere を使って、簡単な RAG のアプリケーションを動かせるようになる。

前提条件

  • Elastic Cloud 8.15.0
  • Docker 27.2.1
  • Python 3.12
  • pip 24.3.1
  • elasticsearch (Python用のClient, version: 8.15.0)
  • streamlit 1.39.0
  • Cohere Command R
  • 桃太郎の内容をテキスト型、および、密ベクトル型でインデックス済であること。
  • 検索テンプレートを定義済であること。

(2024年12月2日時点の情報を元に記載しています。)

今回のブログでは、回答作成用の LLM として Cohere の Command R を使用するため、Cohere API の Key(Trial Key 可) が必要です。

(*脚注2)2

2. RAG とは ?

いろんなWebサイトで RAG の解説記事が掲載されているので、あらためて、ここに書くまでもないかと 思いますが、超簡単に書くと以下の図のような構造になっています。
rag simple figure
LLM が知らない情報を事前に検索し、それを LLM に与えることで、LLMが知らない情報(企業内の独自の情報など)に関連する質問に答えることができるようになります。

今回は、

  • ユーザーとの対話部分に Streamlit を
  • 検索部分に Elasticsearch を
  • 回答作成用の LLM に Cohere Command R を

それぞれ使います。 検索方法は、RRFによるハイブリッド検索です。

前回のRRFによるハイブリッド検索プログラムを元に、Cohere Command R に回答を依頼する処理を追加し、RAGのプログラムを作成します。

※あくまでもサンプルですので、エラーチェックや、サニタイジング処理などは、ほとんど実装していません。また、LLM との対話履歴のクリアも必要な場合がありますが、実装していません。

3. 準備

3.1. ファイル構成

今回作成する RAG アプリケーションのファイル構成です(あくまでも例です)。

ルートフォルダ/
  |- .env
  |- docker-compose.yml
  |- Dockerfile
  |- requirement.txt
  |- src/
       |- app_rag.py
       |- elastic/
       |        |- es_consts.py
       |        |- es_func.py
       |-llm/
           |- cohere_func.py
           |- llm_consts.py

前回のファイル構成と比較して、LLM関連のフォルダ、ファイルを追加しています。

なお、このブログ内では、Docker を利用していますが、Docker の利用は必須ではありません。

(Dockerの設定については、ここでは割愛します。 また、Streamlit を使って画面制御を行っていますが、詳細については割愛します。)

3.2. .env

cloud_id_encoded='........=='

read_api_key_encoded='.......=='

# cohere api key
llm_api_key='..........'

前回登録した cloud_id_encoded, read_api_key_encoded に加えて、Cohere API の trial key を設定します。

3.3. docker-compose.yml

services:
  sios_rag_sample_1:
    build:
      context: ./
      dockerfile: Dockerfile
    container_name: 'sios_rag_sample_1'
    volumes:
      - ./:/app
    ports:
      - 8501:8501

コンテナ名を sios_rag_sample_1 としています。

3.4. Dockerfile

FROM python:3.12

COPY ./requirements.txt ./

RUN pip install --upgrade pip
RUN pip install -r ./requirements.txt

WORKDIR /app
COPY ./ /app

ENV PYTHONUNBUFFERED 1

ENTRYPOINT ["/bin/sh", "-c", "while :; do sleep 30; done"]

前回と同じです。

3.5. requirements.txt

elasticsearch==8.15.0
python-dotenv==1.0.1
streamlit==1.39.0
langchain-cohere==0.3.1

※前回の requirements.txt に、langchain-cohere を追加します。

3.6. src/llm/llm_consts.py

# 問い合わせ先のモデル
LLM_MODEL_ID = 'command-r'

LLM 用の固定文字列を定義します。

今回は、Cohere の Command R を使用するので、それを表す’command-r’を定義します。

3.7. src/llm/cohere_func.py

from langchain_cohere import ChatCohere, CohereRagRetriever
from langchain_core.documents import Document


# Define the Cohere LLM
def create_llm(cohere_api_key, model_id):
  return ChatCohere(cohere_api_key=cohere_api_key, model=model_id)


# LLM に問い合わせる。
def request_to_llm(llm, question, search_results):
  # 検索結果を Cohere に渡すための詰め替え。
  documents = []
  for search_result in search_results:
    documents.append(Document(page_content=search_result))

  rag = CohereRagRetriever(llm=llm, connectors=[])
  response = rag.invoke(input=question, documents=documents)

  return response


# Cohere の場合、response の最後の要素の page_content に回答が格納されている。
def retrieve_answer(response):
  return response[-1].page_content

Cohere 用の関数を定義します。

  • 初期化
  • LLM への問い合わせ
  • レスポンスからの回答部分の抽出

を定義しています。

CohereRagRetriever を利用すると、

response = rag.invoke(input=question, documents=documents)

という1文で、与えられた情報から回答を求める、といった動作をできるようになります。非常に簡単です。 (多少の準備作業は必要ですが、それほど大変ではありません。)
一般的に「プロンプトエンジニアリング」と呼ばれるようなテクニックも特に必要ありません。

3.8. src/elastic/es_consts.py

# 検索対象のインデックス
SEARCH_INDEX = 'momotaro_v3'

# 検索テンプレートのId
SEARCH_TEMPLATE_ID = 'rrf_search_template'

 前回と同じです。

3.9. src/elastic/es_func.py

from elasticsearch import Elasticsearch


# Elasticsearch へアクセスするための client を生成する。
def create_es_client(cloud_id_encoded, api_key_encoded):
  es_client: Elasticsearch = None
  if cloud_id_encoded != '' and api_key_encoded != '':
    es_client = Elasticsearch(cloud_id=cloud_id_encoded, api_key=api_key_encoded)
  return es_client


# 検索テンプレートに埋め込むパラメタを生成する。
def create_search_params(query):
  search_params = {
      'query_string': query,
      'query_for_vector': query
  }

  return search_params


# 検索テンプレートを使って、検索を行う。
def es_search_template(es_client, search_index, search_template_id, search_params,
                       field_name='content', max_count=5):
  results = []
  search_results = es_client.search_template(index=search_index, id=search_template_id,
                                             params=search_params)

  for doc in search_results['hits']['hits'][:max_count]:
    # 配列になって返却されるので、0番目の要素を取り出す。
    results.append(doc['fields'][field_name][0])

  return results

Elasticsearch への問い合わせ処理を実装しています。前回と同じです。
検索テンプレートを使って RRF ハイブリッド検索を行っていますが、Python側の実装は、非常に簡単になります。
(検索ロジックそのものは、Elasticsearch に登録した検索テンプレート(rrf_search_template)内に実装します。)
検索テンプレート(rrf_search_template)は、ハイブリッド検索の回で登録しています。

3.10. src/app_rag.py

import os

import streamlit as st
from dotenv import load_dotenv
from elastic.es_consts import SEARCH_INDEX, SEARCH_TEMPLATE_ID
from elastic.es_func import create_es_client, create_search_params, es_search_template
from llm.cohere_func import create_llm, request_to_llm, retrieve_answer
from llm.llm_consts import LLM_MODEL_ID

# 参考
# https://github.com/streamlit/example-app-langchain-rag/blob/main/streamlit_app.py


# Elasticsearch用の client の初期化を行う。
def initialize_es():
  cloud_id_encoded: str = ''
  read_api_key_encoded: str = ''

  if load_dotenv(verbose=True):
    print('load_dotenv success')

  if 'cloud_id_encoded' in os.environ:
    cloud_id_encoded = os.environ['cloud_id_encoded']

    if 'read_api_key_encoded' in os.environ:
      read_api_key_encoded = os.environ['read_api_key_encoded']

  if read_api_key_encoded == '':
    print('please set cloud_id_encoded and read_api_key_encoded in .env')
    return ''
  else:
    es_client = create_es_client(cloud_id_encoded, read_api_key_encoded)

    # debug
    print(f'{es_client.info()=}')

    return es_client


# LLM の初期化を行う。
def initialize_llm():
  llm_api_key: str = ''

  if load_dotenv(verbose=True):
    print('load_dotenv success')

  if 'llm_api_key' in os.environ:
    llm_api_key = os.environ['llm_api_key']

  llm = create_llm(llm_api_key, LLM_MODEL_ID)

  return llm


# 質問が入力された後に呼ばれる検索処理。
# (Elasticsearch へ検索を依頼する)
def search(query):
  es_client = st.session_state['es_client']

  # 検索用パラメタを生成する。
  search_params = create_search_params(query)

  # 検索テンプレートを使って検索する。
  search_results = es_search_template(es_client, SEARCH_INDEX, SEARCH_TEMPLATE_ID, search_params)

  # debug print (検索結果)
  print('----- search_results -----')
  i: int = 0
  for result in search_results:
    print(f'{i}, {result}')
    i += 1

  return search_results


# 検索結果を情報源としてLLMへ問い合わせる。
def question_to_llm(query, search_results):
  # LLM に問い合わせる。
  llm = st.session_state['llm']
  response = request_to_llm(llm, query, search_results)

  # for debug (LLMの結果)
  print('----- llm response -----')
  print(response)

  answer = retrieve_answer(response)

  return answer


# 画面の表示
def show_ui(prompt_to_user='質問を入力してください。'):
  if 'messages' not in st.session_state.keys():
    st.session_state.messages = [{'role': 'assistant', 'content': prompt_to_user}]

  # Display chat messages
  for message in st.session_state.messages:
    with st.chat_message(message['role']):
      st.write(message['content'])

  # 1. ユーザーからの質問を受け付ける。
  if query := st.chat_input():
    with st.chat_message('user'):
      st.write(query)
    st.session_state.messages.append({'role': 'user', 'content': query})

    # Generate a new response if last message is not from assistant
    if st.session_state.messages[-1]['role'] != 'assistant':
      with st.chat_message('assistant'):
        with st.spinner('Thinking...'):
          # 2. Elasticsearch へ問い合わせ / 3.検索結果の受け取り
          search_results = search(query)

          # 4. LLM へ問い合わせ / 5. 回答結果の受け取り
          answer = question_to_llm(query, search_results)

          # 6. 回答結果の表示
          st.markdown(answer)
          message = {'role': 'assistant', 'content': answer}
          st.session_state.messages.append(message)


# ----- main -----
if 'es_client' not in st.session_state:
  es_client = initialize_es()
  st.session_state['es_client'] = es_client

if 'llm' not in st.session_state:
  llm = initialize_llm()
  st.session_state['llm'] = llm

st.set_page_config(page_title='桃太郎 RAG')
st.title('桃太郎 RAG')

show_ui()

RAG の本体部分の処理です。前述の概略図の「アプリケーション」の部分に該当します。

Streamlit 上で、質問の受付 ~ 回答の表示 までを行っています。

  1. 質問の受け付け
  2. Elasticsearch への検索依頼
  3. 検索結果の受け取り
  4. LLM への問い合わせ
  5. 回答結果の受け取り
  6. 回答結果の表示

4. Docker 上での Streamlit の実行

ビルド~アプリケーションの実行は、前回とほぼ同じです。

(コンテナ名 と 最後に動作させるファイル名 が異なっています。)

docker-compose.yml があるホスト上のディレクトリで、

docker compose build

を行います。

さらに

docker compose up -d

を行います。

これでコンテナが起動するはずなので、コンテナが起動したら、

docker exec -it sios_rag_sample_1 /bin/bash

とします。 (sios_rag_sample_1 は、コンテナ名)

bash を経由せずに、いきなり Streamlit を動かしてもいいのですが、 エラーが起きた場合にハンドリングしにくいので、 いったん、bash を経由して Streamlit を動かすようにしています。

bash の入力プロンプトから

streamlit run src/app_rag.py

を実行します。Streamlit の HTTP サーバーが開始されます。

Web ブラウザから http://localhost:8501 へアクセスします。

次のような画面が表示されます。
rag ui 1

5. RAGの実行

それでは、実際に質問してみましょう。

5.1. 質問1

「おじいさんが山へ行った理由は?」という質問を入力してみます。

rag ui 2

「しば」が「柴」に勝手に変換されていますが、これは、Cohere 側の問題(仕様)だと思われます(親切で変換してくれているのでしょう)。意味は合ってるので良しとしましょう。

5.2. その他の質問

他にもいくつか質問してみます。

  • おばあさんが川へ行った理由は?
  • 船のへさきに立って見張りをしたのは誰?
  • 桃太郎が鬼が島へ行くために乗った乗り物は?
  • 桃太郎が鬼が島へ行った目的は?

それぞれ、次のような回答が返ってきます。

  • おばあさんは川へ洗濯に行きました。
  • 船のへさきに立って見張りをしたのはきじです。
  • 桃太郎が鬼が島へ乗った乗り物は船です。
  • 桃太郎が鬼が島へ行った目的は鬼せいばつをする事です。

6. まとめ

今回のプログラムを使って、簡単な RAG アプリケーションを動かすことができました。

RAG の基本的な動作自体は、あまり難しくないことがわかってもらえるとうれしく思います。

ただ、本格的な RAG システムを稼働させようとすると、様々なチューニングやセキュリティ対策を行う必要が出てきます。

これらについては、今後、紹介していければ、と思います。

(*脚注3)3 (*脚注4)4

最後に:
これまで断片的に内容を記載していたため、ややわかりにくい点もあったかと思いますので、今後、インデックスの作成~RAGの動作 の内容を1つの PDF にまとめて掲載する予定です。


  1. ※桃太郎の話は、LLMが既に学習済の可能性があります。 LLMが学習済であれば、わざわざ RAG を作る必要はありませんが、 あくまでも RAG の演習用として、桃太郎を RAG の題材にしています。
    RAG でないと答えられないように手を加えるのであれば、わざと、
     桃太郎 -> 柿之助
     桃 -> 柿
     猿 -> ゴリラ
     犬 -> 猫
     きじ -> 鷹
     きびだんご -> おむすび
    などのように変換してからテキスト内容を登録する対策も考えられますが、今回はあくまでも RAG の演習用なので、そこまではやっていません。
    ↩︎
  2. Cohere API の Trial Key を発行するには、
    https://cohere.com にアクセスして、右上の TRY NOW から Cohere のアカウントを作成後、 API Keys の Trial Key を発行してください。
    発行した Trial key は、.env ファイルに記載します。

    ※今回、Command R を利用するのは、
    – 無料での試用が可能なこと
    – 日本語に対応していること
    – RAGとしての利用が簡易であること
    を踏まえてのものです。
    日本語での RAG の動作を理解するためのお試しとしては、Command R は使いやすいと思います。ただし、本番運用での Command R の利用を推奨しているわけではありません。
    また、Command R よりも Command R+ の方が優秀と思われますが、今回はあくまでも RAG の学習用なので、Command R を利用しています。
    ↩︎
  3. ※あくまでも RAG の基礎を理解するために構築したアプリケーションですので、答えられない質問もあります。
    業務用の RAG システムであれば、
    – 適切なチャンキングを行う。
    – 適切なユーザー辞書登録を行う。
    – 適切な同義語登録を行う。
    などのチューニングを行う必要があります。
    その他、
    – 質問の内容にあわせて、検索結果のリランクを行う。
    といった改善策もあります。
    ↩︎
  4. CohereRagRetrieverの場合、どの情報源を参照して回答を作成したのか?も返してくれます。
    「桃太郎の家来が桃太郎からもらったものは?」の例だと、
    question_to_llm 関数内の response で次のような内容が返却されます。

    ‘citations’: [ChatCitation(start=15, end=20, text=’きびだんご’, document_ids=[‘doc-0’, ‘doc-2’]),
    ChatCitation(start=24, end=26, text=’犬も’, document_ids=[‘doc-0’]),
    ChatCitation(start=26, end=29, text=’きじも’, document_ids=[‘doc-2’])],

    page_content=’桃太郎の 家来がもらったものは、きびだんごでした。犬もきじも、きびだんごを一つもらって桃太郎についていきました。’

    これは、
    回答の根拠になった情報が
    – きびだんご (doc-0, doc-2 より)
    – 犬も (doc-0 より)
    – きじも (doc-2 より)
    であり、最終的な回答が
    「桃太郎の 家来がもらったものは、きびだんごでした。犬もきじも、きびだんごを一つもらって桃太郎についていきました。」
    であることを示しています。
    doc-0, doc-2 は、Elasticsearch で RRF によるハイブリッド検索を行った際の 0番目(先頭)と2番目の検索結果を指します。(search関数内での search_results の 0番目 と 2番目)
    (なお、猿の情報は、登録した内容のチャンクサイズが適切でないために、検索で見つかっていません。)
    ↩︎