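RAG stands for Retrieval-Augmented Generation: an external knowledge base (documents and the like) is searched, and an LLM generates an answer grounded in the retrieved documents.
This article walks through building an OpenSearch collection, building an Amazon Bedrock knowledge base, and ingesting documents into OpenSearch. We build a data pipeline that loads documents into the knowledge base (S3 as the source, OpenSearch as the vector store) so that they can be searched when a question comes in.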
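To make the retrieve-then-generate flow concrete, here is a minimal sketch in plain Python. Everything in it is an illustrative assumption: the word-overlap scorer stands in for vector search, `DOCS` is made-up sample data, and `build_prompt` only approximates how retrieved passages are placed in front of a question. None of these names come from the Bedrock API.

```python
import re

# Toy document store (hypothetical sample data).
DOCS = [
    "Amazon Bedrock offers managed access to foundation models.",
    "OpenSearch Serverless can store vectors for similarity search.",
    "S3 buckets hold the raw source documents.",
]


def tokenize(text):
    """Lowercase the text and return its set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def retrieve(query, docs, top_k=1):
    """Rank docs by the number of words shared with the query (a stand-in for vector search)."""
    q = tokenize(query)
    ranked = sorted(docs, key=lambda d: len(q & tokenize(d)), reverse=True)
    return ranked[:top_k]


def build_prompt(query, contexts):
    """Put the retrieved passages in front of the question, as a RAG prompt would."""
    context_block = "\n".join(f"- {c}" for c in contexts)
    return f"Answer using only this context:\n{context_block}\n\nQuestion: {query}"


hits = retrieve("Where are vectors stored?", DOCS)
prompt = build_prompt("Where are vectors stored?", hits)
print(prompt)
```

In the real pipeline, the retrieval step is handled by the vector store and the generation step by the LLM; the sketch only shows how the two pieces fit together.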
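Build steps
- Create a Bedrock execution role with policies that allow it to read the data in S3 and write to OpenSearch
- Create the OpenSearch collection
- Download the documents
- Create the Bedrock knowledge base
- Create a data source in the knowledge base that connects to S3
- Start an ingestion job through the KB API to read the data from S3
- Split the loaded data into chunks, embed each chunk with the Amazon Titan Embeddings model, and store the result in OpenSearch.
Preparation
- Create an S3 bucket.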
import warnings
warnings.filterwarnings('ignore')
import json
import os
import boto3
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss
import random
from retrying import retry
suffix = random.randrange(200, 900)
sts_client = boto3.client('sts')
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
service = 'aoss'
s3_client = boto3.client('s3')
account_id = sts_client.get_caller_identity()["Account"]
# s3_suffix = f"{region_name}-{account_id}"
s3_suffix = "hayashi"
bucket_name = f'bedrock-kb-{s3_suffix}' # replace it with your bucket name.
pp = pprint.PrettyPrinter(indent=2)
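# Create S3 bucket for knowledge base data source.
# Outside us-east-1, create_bucket requires an explicit LocationConstraint.
if region_name == 'us-east-1':
    s3bucket = s3_client.create_bucket(Bucket=bucket_name)
else:
    s3bucket = s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region_name}
    )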
- Create the OpenSearch collection and the policies attached to it
import boto3
import time
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']
# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(vector_store_name=vector_store_name,
                       aoss_client=aoss_client,
                       bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)
collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH')
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)
# wait for collection creation
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status
while (response['collectionDetails'][0]['status']) == 'CREATING':
    print('Creating collection...')
    time.sleep(30)
    response = aoss_client.batch_get_collection(names=[vector_store_name])
print('\nCollection successfully created:')
print(response["collectionDetails"])
# create oss policy and attach it to Bedrock execution role
create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                bedrock_kb_execution_role=bedrock_kb_execution_role)
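The status loop above waits for as long as the collection reports `CREATING`. A reusable variant with a timeout might look like the sketch below. The helper name `wait_for_status`, its parameters, and the fake status values are assumptions made for illustration; they are not part of boto3.

```python
import time


def wait_for_status(fetch_status, pending=("CREATING",), interval=30.0, timeout=900.0, sleep=time.sleep):
    """Call fetch_status() until it returns a value outside `pending`, or raise on timeout."""
    waited = 0.0
    status = fetch_status()
    while status in pending:
        if waited >= timeout:
            raise TimeoutError(f"still {status} after {waited:.0f}s")
        sleep(interval)
        waited += interval
        status = fetch_status()
    return status


# Usage with a fake status source, so the sketch runs without AWS access.
fake_statuses = iter(["CREATING", "CREATING", "ACTIVE"])
final_status = wait_for_status(lambda: next(fake_statuses), interval=0.0, sleep=lambda s: None)
print(final_status)
```

With the real client, `fetch_status` could be a lambda that returns `aoss_client.batch_get_collection(names=[vector_store_name])['collectionDetails'][0]['status']`, the same expression the loop above checks.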
- Create the vector index
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
credentials = boto3.Session().get_credentials()
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)
index_name = f"bedrock-sample-index-{suffix}"
body_json = {
   "settings": {
      "index.knn": "true",
       "number_of_shards": 1,
       "knn.algo_param.ef_search": 512,
       "number_of_replicas": 0,
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1536,
             "method": {
                 "name": "hnsw",
                 "engine": "nmslib",
                 "space_type": "cosinesimil",
                 "parameters": {
                     "ef_construction": 512,
                     "m": 16
                 },
             },
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"
         }
      }
   }
}
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced
time.sleep(60)
# Create index
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
print('\nCreating index:')
print(response)
time.sleep(60) # index creation can take up to a minute
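The index mapping sets `space_type` to `cosinesimil`, so vectors are compared by cosine similarity, and `hnsw` is an approximate nearest-neighbor method. As a rough illustration of what is being approximated, here is a brute-force version in plain Python. The function names and the 3-dimensional toy vectors are assumptions for this sketch; it does not reproduce how OpenSearch computes or scales its scores.

```python
import math


def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def nearest(query, vectors, k=1):
    """Brute-force k-nearest search; HNSW approximates this without scanning everything."""
    ranked = sorted(vectors.items(), key=lambda kv: cosine_similarity(query, kv[1]), reverse=True)
    return [name for name, _ in ranked[:k]]


# Hypothetical 3-dimensional vectors; the index above uses 1536 dimensions.
toy_vectors = {"a": [1.0, 0.0, 0.0], "b": [0.0, 1.0, 0.0], "c": [0.9, 0.1, 0.0]}
print(nearest([1.0, 0.0, 0.0], toy_vectors, k=2))
```

The `dimension` value in the mapping has to match the length of the vectors the embedding model produces, which is why it is fixed when the index is created.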
- Download the data and upload it to S3
from urllib.request import urlretrieve
urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]
filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]
data_root = "./data/"
os.makedirs(data_root, exist_ok=True)  # urlretrieve fails if the directory is missing
for idx, url in enumerate(urls):
    file_path = data_root + filenames[idx]
    urlretrieve(url, file_path)
# Upload data to s3
s3_client = boto3.client("s3")
def uploadDirectory(path, bucket_name):
    for root, dirs, files in os.walk(path):
        for file in files:
            s3_client.upload_file(os.path.join(root, file), bucket_name, file)
uploadDirectory(data_root, bucket_name)
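`uploadDirectory` uses the bare file name as the S3 key, so two files with the same name in different subdirectories would overwrite each other. If that matters for your data, one option is to derive the key from the relative path. The helper `plan_uploads` below is a hypothetical sketch that only builds the list of (local path, key) pairs and does not call S3.

```python
import os
import tempfile


def plan_uploads(path):
    """Return (local_path, s3_key) pairs, using the path relative to `path` as the key."""
    plan = []
    for root, _dirs, files in os.walk(path):
        for file in sorted(files):
            local_path = os.path.join(root, file)
            # S3 keys use forward slashes regardless of the local OS separator.
            key = os.path.relpath(local_path, path).replace(os.sep, "/")
            plan.append((local_path, key))
    return sorted(plan, key=lambda pair: pair[1])


# Demonstrate with a throwaway directory containing two files with the same name.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "2021"))
    os.makedirs(os.path.join(tmp, "2022"))
    for year in ("2021", "2022"):
        with open(os.path.join(tmp, year, "letter.pdf"), "w") as f:
            f.write("placeholder")
    keys = [key for _local, key in plan_uploads(tmp)]
print(keys)
```

Each pair could then be passed to `s3_client.upload_file(local_path, bucket_name, key)`. For the flat `./data/` directory used here, the result is the same as the original function.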
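Create the knowledge base
The steps are as follows.
- Initialize the OpenSearch Serverless configuration: the index name, vector field, text field, and metadata field.
- Initialize the chunking strategy; the knowledge base splits documents into chunks of the size specified in chunkingStrategyConfiguration.
- Initialize the S3 configuration used to create the data source object.
- Initialize the ARN of the Titan embeddings model, which is used to create an embedding for each text chunk.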
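To illustrate what fixed-size chunking with overlap means, here is a small sketch. It treats whitespace-separated words as tokens, which is an assumption: the tokenizer and boundary handling that Bedrock actually uses are not described in this article. The function name `chunk_fixed_size` is also made up for the example; its two parameters mirror `maxTokens` and `overlapPercentage` from the configuration.

```python
def chunk_fixed_size(tokens, max_tokens=512, overlap_percentage=20):
    """Split a token list into windows of max_tokens that overlap by the given percentage."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the last window already reached the end
    return chunks


# Whitespace-separated words stand in for model tokens here (an assumption).
words = [f"w{i}" for i in range(25)]
chunks = chunk_fixed_size(words, max_tokens=10, overlap_percentage=20)
print([(c[0], c[-1]) for c in chunks])
```

The overlap means the tail of one chunk is repeated at the head of the next, so a sentence that straddles a boundary still appears whole in at least one chunk.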
Source code
# Provide the configurations as input to the create_knowledge_base method, which will create the Knowledge base.
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes.
}
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"
name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Amazon shareholder letter knowledge base."
roleArn = bedrock_kb_execution_role_arn
# Create a KnowledgeBase
from retrying import retry
@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    raise  # kb is undefined at this point, so stop instead of continuing
pp.pprint(kb)
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])
Create the data source
Next, create a data source and associate it with the knowledge base created above.
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])
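Ingest the documents from the data source
Once the knowledge base and the data source have been created, you can run an ingestion job. During the job, the knowledge base fetches the documents in the data source. As preprocessing, it extracts text from them and splits it into chunks based on the specified chunk size. Each chunk is then embedded and stored in the vector database.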
# Start an ingestion job
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])
job = start_job_response["ingestionJob"]
pp.pprint(job)
# Poll the job until it finishes; sleep between calls to avoid a busy loop,
# and stop on FAILED so the loop cannot run forever.
while job['status'] not in ('COMPLETE', 'FAILED'):
    time.sleep(40)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]
pp.pprint(job)
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)
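Try out the knowledge base
The RetrieveAndGenerate API converts the query into an embedding and searches the knowledge base. The search results are then added to the model prompt as context, and the model returns an answer to the question.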
# try out KB using RetrieveAndGenerate API
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)
model_id = "anthropic.claude-instant-v1" # try with both claude instant as well as claude-v2. for claude v2 - "anthropic.claude-v2"
model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id}'
time.sleep(5)
query = "What is Amazon doing in the field of generative AI?"
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        'text': query
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': kb_id,
            'modelArn': model_arn
        }
    },
)
generated_text = response['output']['text']
pp.pprint(generated_text)
## print out the source attribution/citations from the original documents to see if the response generated belongs to the context.
citations = response["citations"]
contexts = []
for citation in citations:
    retrievedReferences = citation["retrievedReferences"]
    for reference in retrievedReferences:
        contexts.append(reference["content"]["text"])
pp.pprint(contexts)
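When checking whether an answer is grounded, it can help to see which source file each cited passage came from. The sketch below groups passages by S3 URI. It assumes each retrieved reference carries a `location` entry shaped like the one in the Retrieve API output later in this article; the helper name `contexts_by_source` and the sample response are hand-written for illustration and are not real API output.

```python
from collections import defaultdict


def contexts_by_source(response):
    """Group cited passages by the S3 URI they came from."""
    grouped = defaultdict(list)
    for citation in response.get("citations", []):
        for reference in citation.get("retrievedReferences", []):
            # Fall back to "unknown" if the location is missing (assumed shape).
            uri = reference.get("location", {}).get("s3Location", {}).get("uri", "unknown")
            grouped[uri].append(reference["content"]["text"])
    return dict(grouped)


# Hand-written stand-in for a retrieve_and_generate response (not real output).
sample_response = {
    "citations": [
        {"retrievedReferences": [
            {"content": {"text": "passage one"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/a.pdf"}}},
            {"content": {"text": "passage two"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/b.pdf"}}},
        ]},
        {"retrievedReferences": [
            {"content": {"text": "passage three"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/a.pdf"}}},
        ]},
    ]
}
grouped = contexts_by_source(sample_response)
print(grouped)
```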
Retrieve API
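The Retrieve API converts the query into an embedding, searches the knowledge base, and returns the relevant results, which gives you finer control when building custom workflows on top of semantic search results. Its output includes the retrieved text chunks, the type and URI of the source data location, and a relevance score for each result.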
# retrieve api for fetching only the relevant context.
relevant_documents = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id,
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # will fetch top 3 documents which matches closely with the query.
        }
    }
)
pp.pprint(relevant_documents["retrievalResults"])
The output looks like the following.
[ { 'content': { 'text': 'This shift was driven by several factors, including '
                         'access to higher volumes of compute capacity at '
                         'lower prices than was ever available. Amazon has '
                         'been using machine learning extensively for 25 '
                         'years, employing it in everything from personalized '
                         'ecommerce recommendations, to fulfillment center '
                         'pick paths, to drones for Prime Air, to Alexa, to '
                         'the many machine learning services AWS offers (where '
                         'AWS has the broadest machine learning functionality '
                         'and customer base of any cloud provider). More '
                         'recently, a newer form of machine learning, called '
                         'Generative AI, has burst onto the scene and promises '
                         'to significantly accelerate machine learning '
                         'adoption. Generative AI is based on very Large '
                         'Language Models (trained on up to hundreds of '
                         'billions of parameters, and growing), across '
                         'expansive datasets, and has radically general and '
                         'broad recall and learning capabilities. We have been '
                         'working on our own LLMs for a while now, believe it '
                         'will transform and improve virtually every customer '
                         'experience, and will continue to invest '
                         'substantially in these models across all of our '
                         'consumer, seller, brand, and creator experiences. '
                         'Additionally, as we’ve done for years in AWS, we’re '
                         'democratizing this technology so companies of all '
                         'sizes can leverage Generative AI. AWS is offering '
                         'the most price-performant machine learning chips in '
                         'Trainium and Inferentia so small and large companies '
                         'can afford to train and run their LLMs in '
                         'production. We enable companies to choose from '
                         'various LLMs and build applications with all of the '
                         'AWS security, privacy and other features that '
                         'customers are accustomed to using. And, we’re '
                         'delivering applications like AWS’s CodeWhisperer, '
                         'which revolutionizes        developer productivity '
                         'by generating code suggestions in real time. I could '
                         'write an entire letter on LLMs and Generative AI as '
                         'I think they will be that transformative, but I’ll '
                         'leave that for a future letter. Let’s just say that '
                         'LLMs and Generative AI are going to be a big deal '
                         'for customers, our shareholders, and Amazon.   So, '
                         'in closing, I’m optimistic that we’ll emerge from '
                         'this challenging macroeconomic time in a stronger '
                         'position than when we entered it. There are several '
                         'reasons for it and I’ve mentioned many of them '
                         'above. But, there are two relatively simple '
                         'statistics that underline our immense future '
                         'opportunity. While we have a consumer business '
                         'that’s $434B in 2022, the vast majority of total '
                         'market segment share in global retail still resides '
                         'in physical stores (roughly 80%).'},
    'location': { 's3Location': { 'uri': 's3://bedrock-kb-hayashi/AMZN-2022-Shareholder-Letter.pdf'},
                  'type': 'S3'},
    'score': 0.7635814},
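As one example of a custom workflow built on this output, the sketch below keeps only results above a score threshold and within a character budget before they are handed to a model. The helper name `select_contexts`, the threshold of 0.5, and the sample results are all assumptions for illustration; only the `content`, `location`, and `score` keys come from the output shown above.

```python
def select_contexts(retrieval_results, min_score=0.5, max_chars=2000):
    """Keep results at or above min_score, best first, within a character budget."""
    kept, used = [], 0
    ranked = sorted(retrieval_results, key=lambda r: r["score"], reverse=True)
    for result in ranked:
        text = result["content"]["text"]
        if result["score"] < min_score or used + len(text) > max_chars:
            continue  # skip weak matches and anything that would exceed the budget
        kept.append({
            "text": text,
            "uri": result["location"]["s3Location"]["uri"],
            "score": result["score"],
        })
        used += len(text)
    return kept


# Hand-written sample in the same shape as relevant_documents["retrievalResults"].
sample_results = [
    {"content": {"text": "Generative AI passage."},
     "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/a.pdf"}},
     "score": 0.76},
    {"content": {"text": "Unrelated passage."},
     "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/b.pdf"}},
     "score": 0.41},
    {"content": {"text": "LLM chips passage."},
     "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/a.pdf"}},
     "score": 0.62},
]
selected = select_contexts(sample_results, min_score=0.5)
print([c["score"] for c in selected])
```

A suitable threshold depends on the embedding model and the data, so the value here is only a placeholder to tune against your own queries.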
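Summary
This time I tried out RAG with Bedrock. There is a lot of jargon and there are parts I do not fully understand yet, so I want to keep deepening my understanding of RAG.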