From 88949b8e994719902915dd4126f2701b7fc627c8 Mon Sep 17 00:00:00 2001
From: "hugo.varenne" <hugo.varenne@edu.hefr.ch>
Date: Sat, 28 Sep 2024 11:06:17 +0200
Subject: [PATCH] feat: Add Google part

---
 .gitignore                                |   0
 Part1 - Google/create-GCP-and-put-docs.py |  39 +++++
 Part1 - Google/create-vector-db.py        | 161 ++++++++++++++++++++
 Part1 - Google/delete-s3.py               |  27 ++++
 Part1 - Google/requirements.txt           |   7 +
 Part1 - Google/vectorise-store.py         | 170 ++++++++++++++++++++
 6 files changed, 404 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Part1 - Google/create-GCP-and-put-docs.py
 create mode 100644 Part1 - Google/create-vector-db.py
 create mode 100644 Part1 - Google/delete-s3.py
 create mode 100644 Part1 - Google/requirements.txt
 create mode 100644 Part1 - Google/vectorise-store.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/Part1 - Google/create-GCP-and-put-docs.py b/Part1 - Google/create-GCP-and-put-docs.py
new file mode 100644
index 0000000..467e8d7
--- /dev/null
+++ b/Part1 - Google/create-GCP-and-put-docs.py
@@ -0,0 +1,39 @@
+# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
+
+import os
+import argparse
+
+from google.cloud import storage
+
+
+def create_bucket(gcp_client, bucket_name):
+    """Creates a new Cloud Storage bucket."""
+    bucket = gcp_client.create_bucket(bucket_name)
+    return bucket
+
+
+# Upload the PDF files from a local directory to the Cloud Storage bucket
+def write_files(gcp_client, directory, bucket_name):
+    bucket = gcp_client.bucket(bucket_name)
+    for filename in os.listdir(directory):
+        if filename.endswith(".pdf"):  # Only upload PDF files
+            file_path = os.path.join(directory, filename)
+            print(f"Uploading {filename} to bucket {bucket_name}...")
+            blob = bucket.blob(filename)
+            blob.upload_from_filename(file_path)
+            print(f"{filename} uploaded successfully.")
+
+
+def main(bucket_name, local_dir):
+    gcp_client = storage.Client()
+    create_bucket(gcp_client, bucket_name)
+    write_files(gcp_client, local_dir, bucket_name)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Upload PDF files to a GCP Cloud Storage bucket")
+    parser.add_argument("--bucket_name", help="The name of the bucket to which the files will be uploaded")
+    parser.add_argument("--local_path", help="The local folder containing the PDF files")
+    args = parser.parse_args()
+    main(args.bucket_name, args.local_path)
+
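+# Example invocation (usage sketch; assumes Application Default Credentials are configured,
+# e.g. via `gcloud auth application-default login`, and that the chosen bucket name is
+# globally unique):
+#
+#   python create-GCP-and-put-docs.py --bucket_name <unique-bucket-name> --local_path ./pdfs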
diff --git a/Part1 - Google/create-vector-db.py b/Part1 - Google/create-vector-db.py
new file mode 100644
index 0000000..91a2c8b
--- /dev/null
+++ b/Part1 - Google/create-vector-db.py
@@ -0,0 +1,161 @@
+# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
+## Source: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-sdk.html
+
+import boto3
+import botocore
+import time
+import argparse
+
+client = boto3.client('opensearchserverless')
+
+
+def createEncryptionPolicy(client, policy_name, collection_name):
+    """Creates an encryption policy for the specified collection."""
+    try:
+        response = client.create_security_policy(
+            description=f'Encryption policy for {collection_name}',
+            name=policy_name,
+            policy=f"""
+                {{
+                    \"Rules\": [
+                        {{
+                            \"ResourceType\": \"collection\",
+                            \"Resource\": [
+                                \"collection/{collection_name}\"
+                            ]
+                        }}
+                    ],
+                    \"AWSOwnedKey\": true
+                }}
+                """,
+            type='encryption'
+        )
+        print('\nEncryption policy created:')
+        print(response)
+    except botocore.exceptions.ClientError as error:
+        if error.response['Error']['Code'] == 'ConflictException':
+            print('[ConflictException] The policy name or rules conflict with an existing policy.')
+        else:
+            raise error
+
+
+def createNetworkPolicy(client, policy_name, collection_name):
+    """Creates a network policy for the specified collection."""
+    try:
+        response = client.create_security_policy(
+            description=f'Network policy for {collection_name}',
+            name=policy_name,
+            policy=f"""
+                [{{
+                    \"Description\": \"Public access for {collection_name}\",
+                    \"Rules\": [
+                        {{
+                            \"ResourceType\": \"dashboard\",
+                            \"Resource\": [\"collection/{collection_name}\"]
+                        }},
+                        {{
+                            \"ResourceType\": \"collection\",
+                            \"Resource\": [\"collection/{collection_name}\"]
+                        }}
+                    ],
+                    \"AllowFromPublic\": true
+                }}]
+                """,
+            type='network'
+        )
+        print('\nNetwork policy created:')
+        print(response)
+    except botocore.exceptions.ClientError as error:
+        if error.response['Error']['Code'] == 'ConflictException':
+            print('[ConflictException] A network policy with this name already exists.')
+        else:
+            raise error
+
+
+def createAccessPolicy(client, policy_name, collection_name, IAM_USER):
+    """Creates a data access policy for the specified collection."""
+    try:
+        policy_content = f"""
+        [
+            {{
+                "Rules": [
+                    {{
+                        "Resource": ["collection/{collection_name}"],
+                        "Permission": [
+                            "aoss:CreateCollectionItems",
+                            "aoss:DeleteCollectionItems",
+                            "aoss:UpdateCollectionItems",
+                            "aoss:DescribeCollectionItems"
+                        ],
+                        "ResourceType": "collection"
+                    }},
+                    {{
+                        "Resource": ["index/{collection_name}/*"],
+                        "Permission": [
+                            "aoss:CreateIndex",
+                            "aoss:DeleteIndex",
+                            "aoss:UpdateIndex",
+                            "aoss:DescribeIndex",
+                            "aoss:ReadDocument",
+                            "aoss:WriteDocument"
+                        ],
+                        "ResourceType": "index"
+                    }}
+                ],
+                "Principal": ["arn:aws:iam::352909266144:user/{IAM_USER}"]
+            }}
+        ]
+        """
+        response = client.create_access_policy(
+            description=f'Data access policy for {collection_name}',
+            name=policy_name,
+            policy=policy_content,
+            type='data'
+        )
+        print('\nAccess policy created:')
+        print(response)
+    except botocore.exceptions.ClientError as error:
+        if error.response['Error']['Code'] == 'ConflictException':
+            print('[ConflictException] An access policy with this name already exists.')
+        else:
+            raise error
+
+
+def waitForCollectionCreation(client, collection_name):
+    """Waits for the collection to become active and returns its endpoint host."""
+    response = client.batch_get_collection(names=[collection_name])
+    # Poll until the collection leaves the CREATING state
+    while (response['collectionDetails'][0]['status']) == 'CREATING':
+        print('Creating collection...')
+        time.sleep(30)
+        response = client.batch_get_collection(names=[collection_name])
+    print('\nCollection successfully created:')
+    print(response["collectionDetails"])
+    # Extract the collection endpoint from the response
+    host = response['collectionDetails'][0]['collectionEndpoint']
+    final_host = host.replace("https://", "")
+    return final_host
+
+
+def main(collection_name, IAM_USER):
+    encryption_policy_name = f'{collection_name}-encryption-policy'
+    network_policy_name = f'{collection_name}-network-policy'
+    access_policy_name = f'{collection_name}-access-policy'
+    createEncryptionPolicy(client, encryption_policy_name, collection_name)
+    createNetworkPolicy(client, network_policy_name, collection_name)
+    createAccessPolicy(client, access_policy_name, collection_name, IAM_USER)
+    collection = client.create_collection(name=collection_name, type='VECTORSEARCH')
+    ENDPOINT = waitForCollectionCreation(client, collection_name)
+
+    print("Collection created successfully:", collection)
+    print("Collection ENDPOINT:", ENDPOINT)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Create an OpenSearch Serverless collection")
+    parser.add_argument("--collection_name", help="The name of the collection")
+    parser.add_argument("--iam_user", help="The IAM user that will be granted data access")
+    args = parser.parse_args()
+    main(args.collection_name, args.iam_user)
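+
+# Example invocation (usage sketch; assumes AWS credentials with OpenSearch Serverless
+# permissions are configured for boto3, e.g. via `aws configure`, in the AWS account
+# hard-coded in the access policy Principal ARN above):
+#
+#   python create-vector-db.py --collection_name <collection-name> --iam_user <your-iam-user>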
diff --git a/Part1 - Google/delete-s3.py b/Part1 - Google/delete-s3.py
new file mode 100644
index 0000000..efe9c49
--- /dev/null
+++ b/Part1 - Google/delete-s3.py
@@ -0,0 +1,27 @@
+# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
+
+import boto3
+
+BUCKET_NAME = ''
+
+S3_CLIENT = boto3.client('s3')
+S3_RESOURCE = boto3.resource('s3')
+
+# Delete the bucket
+
+# First, delete all objects in the bucket
+bucket = S3_RESOURCE.Bucket(BUCKET_NAME)
+
+print("Deleting all objects in bucket\n")
+bucket.objects.all().delete()
+
+print("Deleting bucket")
+# Bucket deletion
+response = S3_CLIENT.delete_bucket(Bucket=BUCKET_NAME)
+
+print(response)
diff --git a/Part1 - Google/requirements.txt b/Part1 - Google/requirements.txt
new file mode 100644
index 0000000..02624df
--- /dev/null
+++ b/Part1 - Google/requirements.txt
@@ -0,0 +1,7 @@
+langchain-community
+pypdf
+opensearch-py
+boto3
+google-cloud
+google-cloud-storage
+google
\ No newline at end of file
diff --git a/Part1 - Google/vectorise-store.py b/Part1 - Google/vectorise-store.py
new file mode 100644
index 0000000..3e5eda4
--- /dev/null
+++ b/Part1 - Google/vectorise-store.py
@@ -0,0 +1,170 @@
+# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
+
+import boto3
+import os
+import argparse
+from langchain_community.document_loaders import PyPDFDirectoryLoader
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+from langchain_community.embeddings import BedrockEmbeddings
+from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
+from langchain_community.vectorstores import OpenSearchVectorSearch
+
+## S3 client
+s3_client = boto3.client('s3')
+
+## Bedrock client
+bedrock_client = boto3.client(service_name="bedrock-runtime")
+
+## Configuration for AWS authentication and the OpenSearch client
+credentials = boto3.Session().get_credentials()
+awsauth = AWSV4SignerAuth(credentials, 'us-east-1', 'aoss')
+
+
+## Create the k-NN index in OpenSearch
+def create_index(client, index_name):
+    indexBody = {
+        "settings": {
+            "index.knn": True
+        },
+        "mappings": {
+            "properties": {
+                "vector_field": {
+                    "type": "knn_vector",
+                    "dimension": 1536,
+                    "method": {
+                        "engine": "faiss",
+                        "name": "hnsw"
+                    }
+                }
+            }
+        }
+    }
+
+    try:
+        create_response = client.indices.create(index_name, body=indexBody)
+        print('\nCreating index:')
+        print(create_response)
+    except Exception as e:
+        print(e)
+        print("(Index likely already exists)")
+
+
+## Download the PDF documents from S3 to a local directory
+def download_documents(bucket_name, local_dir):
+    os.makedirs(local_dir, exist_ok=True)
+    response = s3_client.list_objects_v2(Bucket=bucket_name)
+    for item in response['Contents']:
+        key = item['Key']
+        if key.endswith('.pdf'):
+            local_filename = os.path.join(local_dir, key)
+            s3_client.download_file(Bucket=bucket_name, Key=key, Filename=local_filename)
+
+
+## Split pages/text into chunks
+def split_text(docs, chunk_size, chunk_overlap):
+    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+    chunks = text_splitter.split_documents(docs)
+    return chunks
+
+
+## Generate embeddings with Amazon Titan via Bedrock
+def generate_embeddings(bedrock_client, chunks):
+    embeddings_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
+    chunks_list = [chunk.page_content for chunk in chunks]
+    embeddings = embeddings_model.embed_documents(chunks_list)
+    return embeddings
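+
+# Note: amazon.titan-embed-text-v1 returns 1536-dimensional vectors, which is why the
+# knn_vector mapping in create_index uses "dimension": 1536; if a different embedding
+# model is used, that dimension must be updated to match.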
+
+# Store generated embeddings into an OpenSearch index
+def store_embeddings(embeddings, texts, meta_data, host, awsauth, index_name):
+    docsearch = OpenSearchVectorSearch.from_embeddings(
+        embeddings,
+        texts,
+        meta_data,
+        opensearch_url=f'https://{host}:443',
+        http_auth=awsauth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection,
+        index_name=index_name,
+        bulk_size=1000
+    )
+    return docsearch
+
+
+# Alternative helper that both generates and stores the embeddings in one call
+def generate_store_embeddings(bedrock_client, chunks, awsauth, index_name, host):
+    embeddings_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
+    docsearch = OpenSearchVectorSearch.from_documents(
+        chunks,
+        embeddings_model,
+        opensearch_url=f'https://{host}:443',
+        http_auth=awsauth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection,
+        index_name=index_name,
+        bulk_size=1000
+    )
+    return docsearch
+
+
+## main
+def main(bucket_name, endpoint, index_name, local_path):
+    ## OpenSearch client
+    OpenSearch_client = OpenSearch(
+        hosts=[{'host': endpoint, 'port': 443}],
+        http_auth=awsauth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection,
+    )
+
+    download_documents(bucket_name, local_path)
+    loader = PyPDFDirectoryLoader(local_path)
+    docs = loader.load()
+    print('Start chunking')
+    chunks = split_text(docs, 1000, 100)
+    print(chunks[1])
+    create_index(OpenSearch_client, index_name)
+    print('Start vectorising')
+    embeddings = generate_embeddings(bedrock_client, chunks)
+    print(embeddings[1])
+    texts = [chunk.page_content for chunk in chunks]
+    # Prepare metadata for each chunk
+    meta_data = [{'source': chunk.metadata['source'], 'page': chunk.metadata['page'] + 1} for chunk in chunks]
+    print('Start storing')
+    store_embeddings(embeddings, texts, meta_data, endpoint, awsauth, index_name)
+    print('End storing')
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Process PDF documents and store their embeddings.")
+    parser.add_argument("--bucket_name", help="The S3 bucket name where documents are stored")
+    parser.add_argument("--endpoint", help="The OpenSearch Serverless collection endpoint")
+    parser.add_argument("--index_name", help="The name of the OpenSearch index")
+    parser.add_argument("--local_path", help="Local folder where the PDFs are downloaded")
+    args = parser.parse_args()
+    main(args.bucket_name, args.endpoint, args.index_name, args.local_path)
--
GitLab