This guide will walk you through creating a Retrieval-Augmented Generation (RAG) application using Pulsejet for vector storage and retrieval, and Ollama for embeddings and text generation.

Prerequisites

Ensure you have the following Python packages installed:

  • Pulsejet
  • Ollama
  • NLTK

You also need to install Ollama on your computer and pull the ‘llama3.1’ and ‘nomic-embed-text’ models.
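
If you prefer to stay in Python, the Ollama client can pull the models for you. The snippet below is a minimal sketch using the ollama package’s pull function; it is equivalent to running ollama pull on the command line:

import ollama

# Pull the models used in this guide (equivalent to `ollama pull <model>` on the CLI)
ollama.pull('llama3.1')
ollama.pull('nomic-embed-text')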

You’ll also need some text files to index. If you’re interested in asking RAG questions about Art Deco buildings in the United States, you can use the Art Deco building files from the rag_art_deco GitHub repository.

Setting Up the RAG System

First, let’s import the necessary libraries and set up our Pulsejet client:

import os
import pulsejet_client as pj
import ollama
from typing import List, Dict
import nltk
from nltk.tokenize import sent_tokenize
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Download necessary NLTK data
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)  # required by sent_tokenize on newer NLTK releases

# Initialize Pulsejet client
client = pj.PulsejetClient(location="local")

# Define the embedding model and LLM
EMBEDDING_MODEL = 'nomic-embed-text'
LLM_MODEL = 'llama3.1'

# Create or get the collection
collection_name = "documents"

def get_embedding(text: str) -> List[float]:
    """Get embedding for a given text using Ollama."""
    return ollama.embeddings(model=EMBEDDING_MODEL, prompt=text)['embedding']

def get_vector_size() -> int:
    """Determine the vector size from the embedding model."""
    sample_embedding = get_embedding("Sample text")
    return len(sample_embedding)

# Define vector parameters dynamically
vector_size = get_vector_size()
vector_params = pj.VectorParams(size=vector_size, index_type=pj.IndexType.HNSW)

def ensure_collection_exists():
    """Ensure the collection exists, creating it if necessary."""
    try:
        client.create_collection(collection_name, vector_params)
        logging.info(f"Created new collection: {collection_name}")
    except Exception as e:
        logging.info(f"Collection '{collection_name}' already exists or error occurred: {str(e)}")
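
Before moving on, it’s worth confirming that Ollama is reachable and the embedding model behaves as expected. A quick sanity check (the sample prompt is arbitrary):

# Quick end-to-end check of the embedding pipeline
sample = get_embedding("Art Deco architecture")
print(f"Embedding model: {EMBEDDING_MODEL}, dimensions: {len(sample)} (expected {vector_size})")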

Indexing Documents

Now, let’s create functions to chunk text and index documents:

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into chunks of approximately chunk_size characters."""
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        sentence_length = len(sentence)
        if current_length + sentence_length > chunk_size and current_chunk:
            # Join the accumulated sentences (avoid shadowing the chunk_text function name)
            chunks.append(' '.join(current_chunk))
            # Keep the last sentence so consecutive chunks overlap slightly
            current_chunk = current_chunk[-1:] if overlap > 0 else []
            current_length = len(current_chunk[0]) if current_chunk else 0

        current_chunk.append(sentence)
        current_length += sentence_length

    # Add the last chunk
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks

def index_documents(folder_path: str):
    """Index documents from a folder into Pulsejet."""
    ensure_collection_exists()
    total_chunks = 0
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(folder_path, filename)
            logging.info(f"Indexing file: {filename}")
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
                chunks = chunk_text(content)
                logging.info(f"  - Created {len(chunks)} chunks")
                for i, chunk in enumerate(chunks):
                    embedding = get_embedding(chunk)
                    if len(embedding) != vector_size:
                        logging.error(f"    - Error: Embedding dimension mismatch. Expected {vector_size}, got {len(embedding)}")
                        continue
                    meta = {
                        "filename": filename,
                        "chunk_id": str(i),
                        "content": chunk
                    }
                    try:
                        client.insert_single(collection_name, embedding, meta)
                        logging.info(f"    - Inserted chunk {i+1}/{len(chunks)}")
                    except Exception as e:
                        logging.error(f"    - Error inserting chunk {i+1}/{len(chunks)}: {str(e)}")
                total_chunks += len(chunks)
    logging.info(f"Indexing complete. Total chunks indexed: {total_chunks}")
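
Before indexing an entire folder, you can sanity-check the chunker on a small synthetic sample to see how chunk_size and overlap behave (the sample text and sizes below are arbitrary):

# Quick sanity check of chunk_text on a synthetic sample
sample_text = "This is a sample sentence about Art Deco architecture. " * 40
sample_chunks = chunk_text(sample_text, chunk_size=500, overlap=100)
print(f"Produced {len(sample_chunks)} chunks")
for i, c in enumerate(sample_chunks):
    print(f"  chunk {i}: {len(c)} characters")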

Searching and Generating Answers

Now, let’s create functions to search for similar documents and generate answers:

def search_similar_docs(query: str, limit: int = 3) -> List[Dict]:
    """Search for similar documents based on the query."""
    query_embedding = get_embedding(query)
    logging.info(f"Query embedding size: {len(query_embedding)}")
    logging.info(f"Query embedding (first 5 elements): {query_embedding[:5]}")
    if len(query_embedding) != vector_size:
        logging.error(f"Query embedding dimension mismatch. Expected {vector_size}, got {len(query_embedding)}")
        return []
    try:
        results = client.search_single(collection_name, query_embedding, limit=limit, filter=None)
        logging.info(f"Search results: {results}")

        if not results.status.element:
            logging.info("No results found in the search.")
            return []

        return [{'filename': result.meta.get('filename', ''),
                 'chunk_id': result.meta.get('chunk_id', ''),
                 'content': result.meta.get('content', '')}
                for result in results.status.element]
    except Exception as e:
        logging.error(f"Error during search: {str(e)}")
        return []

def generate_answer(query: str, context: str) -> str:
    """Generate an answer using Ollama."""
    prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
    response = ollama.generate(model=LLM_MODEL, prompt=prompt)
    return response['response']

def rag_query(query: str) -> str:
    """Perform RAG query."""
    similar_docs = search_similar_docs(query)
    if not similar_docs:
        return "No similar documents found. The index might be empty or there was an error during the search."

    logging.info("\nSimilar documents found:")
    for i, doc in enumerate(similar_docs, 1):
        logging.info(f"\nDocument {i}:")
        logging.info(f"  Filename: {doc['filename']}")
        logging.info(f"  Chunk ID: {doc['chunk_id']}")
        logging.info(f"  Content snippet: {doc['content'][:200]}...")

    context = "\n".join([doc['content'] for doc in similar_docs])
    answer = generate_answer(query, context)

    if "unfortunately" in answer.lower() or "no information" in answer.lower():
        logging.info("No specific information found in the context. Generating a general answer.")
        answer += "\n\nHowever, based on general knowledge: " + generate_answer(query, "")

    return answer
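
It can also help to exercise retrieval on its own, without generation, to see which chunks come back for a given query (the query below is just an example):

# Inspect retrieval in isolation before generating answers
docs = search_similar_docs("Chrysler Building", limit=3)
for doc in docs:
    print(doc['filename'], doc['chunk_id'])
    print(doc['content'][:150], "...")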

Running the RAG Application
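
The main block below relies on a check_index_status() helper that reports whether the collection already contains data. Pulsejet may offer a dedicated count or list call for this; as a minimal sketch that sticks to the search_single API already used above, you can probe the index with a sample embedding:

def check_index_status() -> bool:
    """Return True if the collection appears to contain indexed documents."""
    ensure_collection_exists()
    try:
        probe_embedding = get_embedding("Sample text")
        results = client.search_single(collection_name, probe_embedding, limit=3, filter=None)
        elements = results.status.element
        if not elements:
            logging.info("Index status: empty.")
            return False
        logging.info("Index status: contains data. Sample documents:")
        for result in elements:
            logging.info(f"  - {result.meta.get('filename', '')} (chunk {result.meta.get('chunk_id', '')})")
        return True
    except Exception as e:
        logging.error(f"Error checking index status: {str(e)}")
        return False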

Finally, let’s put it all together:

if __name__ == "__main__":
    try:
        # Check if the index is empty
        if not check_index_status():
            logging.info("Index is empty. Indexing documents...")
            index_documents("files/")

        # Check index status again and print sample documents
        check_index_status()

        # Perform a RAG query
        query = "Tell me about Art Deco buildings in New York City"
        logging.info(f"\nQuestion: {query}")
        answer = rag_query(query)
        logging.info(f"\nAnswer: {answer}")
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
    finally:
        try:
            client.close()
        except Exception as e:
            logging.error(f"Error closing client: {str(e)}")

This RAG application demonstrates the key operations using Pulsejet and Ollama:

  1. Document Indexing with Pulsejet:

    • We use client.insert_single(collection_name, embedding, meta) to insert each document chunk’s embedding and metadata into Pulsejet.
    • The insert_single method efficiently stores the vector (embedding) along with its associated metadata.
  2. Vector Search with Pulsejet:

    • We use client.search_single(collection_name, query_embedding, limit=limit, filter=None) to find similar documents.
    • This method performs a fast similarity search in the vector space, returning the most relevant documents.
  3. Embedding Generation with Ollama:

    • We use ollama.embeddings(model=EMBEDDING_MODEL, prompt=text) to generate embeddings for both documents and queries.
    • The vector size is determined automatically based on the embedding model output.
  4. Text Generation with Ollama:

    • We use ollama.generate(model=LLM_MODEL, prompt=prompt) to generate answers based on the retrieved context.
    • This uses the Llama 3.1 model to produce natural-language answers grounded in the retrieved context.
  5. Collection Management:

    • The ensure_collection_exists() function checks if the collection already exists before attempting to create it, avoiding unnecessary operations.

By using Pulsejet for vector operations and Ollama for embeddings and text generation, we create a powerful and efficient RAG system. Pulsejet handles the storage and retrieval of vector data, while Ollama provides the necessary language understanding and generation capabilities.

Remember to replace "files/" with the path to your document folder. If you want to use the Art Deco building files, download them from the rag_art_deco GitHub repository and place them in your files/ folder.