diff --git a/knowledgehub/agents/rewoo/agent.py b/knowledgehub/agents/rewoo/agent.py
index bccf827..3831c29 100644
--- a/knowledgehub/agents/rewoo/agent.py
+++ b/knowledgehub/agents/rewoo/agent.py
@@ -6,8 +6,8 @@ from typing import Any
 
 from theflow import Param
 
 from kotaemon.base.schema import Document
+from kotaemon.indices.qa import CitationPipeline
 from kotaemon.llms import LLM, ChatLLM, PromptTemplate
-from kotaemon.pipelines.citation import CitationPipeline
 
 from ..base import AgentType, BaseAgent, BaseLLM, BaseTool
 from ..output.base import BaseScratchPad
diff --git a/knowledgehub/indices/__init__.py b/knowledgehub/indices/__init__.py
index e69de29..64da5f3 100644
--- a/knowledgehub/indices/__init__.py
+++ b/knowledgehub/indices/__init__.py
@@ -0,0 +1,3 @@
+from .vectorindex import VectorIndexing, VectorRetrieval
+
+__all__ = ["VectorIndexing", "VectorRetrieval"]
diff --git a/knowledgehub/indices/base.py b/knowledgehub/indices/base.py
index 02cfbc7..6fee77a 100644
--- a/knowledgehub/indices/base.py
+++ b/knowledgehub/indices/base.py
@@ -5,7 +5,7 @@ from typing import Any, Type
 
 from llama_index.node_parser.interface import NodeParser
 
-from ..base import BaseComponent, Document
+from kotaemon.base import BaseComponent, Document, RetrievedDocument
 
 
 class DocTransformer(BaseComponent):
@@ -26,7 +26,7 @@ class DocTransformer(BaseComponent):
         ...
 
 
-class LlamaIndexMixin:
+class LlamaIndexDocTransformerMixin:
     """Allow automatically wrapping a Llama-index component into kotaemon component
 
     Example:
@@ -70,3 +70,23 @@ class LlamaIndexMixin:
         """
        docs = self._obj(documents, **kwargs)  # type: ignore
        return [Document.from_dict(doc.to_dict()) for doc in docs]
+
+
+class BaseIndexing(BaseComponent):
+    """Define the base interface for indexing pipeline"""
+
+    def to_retrieval_pipeline(self, **kwargs):
+        """Convert the indexing pipeline to a retrieval pipeline"""
+        raise NotImplementedError
+
+    def to_qa_pipeline(self, **kwargs):
+        """Convert the indexing pipeline to a QA pipeline"""
+        raise NotImplementedError
+
+
+class BaseRetrieval(BaseComponent):
+    """Define the base interface for retrieval pipeline"""
+
+    @abstractmethod
+    def run(self, *args, **kwargs) -> list[RetrievedDocument]:
+        ...
diff --git a/knowledgehub/indices/extractors/doc_parsers.py b/knowledgehub/indices/extractors/doc_parsers.py
index 867f0ac..ed12fdd 100644
--- a/knowledgehub/indices/extractors/doc_parsers.py
+++ b/knowledgehub/indices/extractors/doc_parsers.py
@@ -1,18 +1,18 @@
-from ..base import DocTransformer, LlamaIndexMixin
+from ..base import DocTransformer, LlamaIndexDocTransformerMixin
 
 
 class BaseDocParser(DocTransformer):
     ...
 
 
-class TitleExtractor(LlamaIndexMixin, BaseDocParser):
+class TitleExtractor(LlamaIndexDocTransformerMixin, BaseDocParser):
     def _get_li_class(self):
         from llama_index.extractors import TitleExtractor
 
         return TitleExtractor
 
 
-class SummaryExtractor(LlamaIndexMixin, BaseDocParser):
+class SummaryExtractor(LlamaIndexDocTransformerMixin, BaseDocParser):
     def _get_li_class(self):
         from llama_index.extractors import SummaryExtractor
diff --git a/knowledgehub/indices/ingests/__init__.py b/knowledgehub/indices/ingests/__init__.py
new file mode 100644
index 0000000..064f206
--- /dev/null
+++ b/knowledgehub/indices/ingests/__init__.py
@@ -0,0 +1,3 @@
+from .files import DocumentIngestor
+
+__all__ = ["DocumentIngestor"]
diff --git a/knowledgehub/indices/ingests/files.py b/knowledgehub/indices/ingests/files.py
new file mode 100644
index 0000000..83ea8b9
--- /dev/null
+++ b/knowledgehub/indices/ingests/files.py
@@ -0,0 +1,75 @@
+from pathlib import Path
+
+from llama_index.readers.base import BaseReader
+from theflow import Param
+
+from kotaemon.base import BaseComponent, Document
+from kotaemon.indices.extractors import BaseDocParser
+from kotaemon.indices.splitters import BaseSplitter, TokenSplitter
+from kotaemon.loaders import (
+    AutoReader,
+    DirectoryReader,
+    MathpixPDFReader,
+    OCRReader,
+    PandasExcelReader,
+)
+
+
+class DocumentIngestor(BaseComponent):
+    """Ingest common office document types into Documents for indexing
+
+    Document types:
+        - pdf
+        - xlsx
+        - docx
+    """
+
+    pdf_mode: str = "normal"  # "normal", "mathpix", "ocr"
+    doc_parsers: list[BaseDocParser] = Param(default_callback=lambda _: [])
+    text_splitter: BaseSplitter = TokenSplitter.withx(
+        chunk_size=1024,
+        chunk_overlap=256,
+    )
+
+    def _get_reader(self, input_files: list[str | Path]):
+        """Get appropriate readers for the input files based on file extension"""
+        file_extractor: dict[str, AutoReader | BaseReader] = {
+            ".xlsx": PandasExcelReader(),
+        }
+
+        if self.pdf_mode == "normal":
+            file_extractor[".pdf"] = AutoReader("UnstructuredReader")
+        elif self.pdf_mode == "ocr":
+            file_extractor[".pdf"] = OCRReader()
+        else:
+            file_extractor[".pdf"] = MathpixPDFReader()
+
+        main_reader = DirectoryReader(
+            input_files=input_files,
+            file_extractor=file_extractor,
+        )
+
+        return main_reader
+
+    def run(self, file_paths: list[str | Path] | str | Path) -> list[Document]:
+        """Ingest the file paths into Documents
+
+        Args:
+            file_paths: list of file paths or a single file path
+
+        Returns:
+            list of parsed Documents
+        """
+        if not isinstance(file_paths, list):
+            file_paths = [file_paths]
+
+        documents = self._get_reader(input_files=file_paths)()
+        nodes = self.text_splitter(documents)
+        self.log_progress(".num_docs", num_docs=len(nodes))
+
+        # run the document parsers
+        if self.doc_parsers:
+            for parser in self.doc_parsers:
+                nodes = parser(nodes)
+
+        return nodes
diff --git a/knowledgehub/indices/qa/__init__.py b/knowledgehub/indices/qa/__init__.py
new file mode 100644
index 0000000..03a185f
--- /dev/null
+++ b/knowledgehub/indices/qa/__init__.py
@@ -0,0 +1,7 @@
+from .citation import CitationPipeline
+from .text_based import CitationQAPipeline
+
+__all__ = [
+    "CitationPipeline",
+    "CitationQAPipeline",
+]
diff --git a/knowledgehub/pipelines/citation.py b/knowledgehub/indices/qa/citation.py
similarity index 95%
rename from knowledgehub/pipelines/citation.py
rename to knowledgehub/indices/qa/citation.py
index 3f4f295..374a5f3 100644
--- a/knowledgehub/pipelines/citation.py
+++ b/knowledgehub/indices/qa/citation.py
@@ -1,14 +1,10 @@
-from typing import Iterator, List, Union
+from typing import Iterator, List
 
 from pydantic import BaseModel, Field
 
 from kotaemon.base import BaseComponent
 from kotaemon.base.schema import HumanMessage, SystemMessage
-
-from ..llms.chats.base import ChatLLM
-from ..llms.completions.base import LLM
-
-BaseLLM = Union[ChatLLM, LLM]
+from kotaemon.llms import BaseLLM
 
 
 class FactWithEvidence(BaseModel):
diff --git a/knowledgehub/indices/qa/text_based.py b/knowledgehub/indices/qa/text_based.py
new file mode 100644
index 0000000..9c46fc7
--- /dev/null
+++ b/knowledgehub/indices/qa/text_based.py
@@ -0,0 +1,62 @@
+import os
+
+from kotaemon.base import BaseComponent, Document, RetrievedDocument
+from kotaemon.llms import AzureChatOpenAI, BaseLLM, PromptTemplate
+
+from .citation import CitationPipeline
+
+
+class CitationQAPipeline(BaseComponent):
+    """Answer a question from a text corpus, with citation"""
+
+    qa_prompt_template: PromptTemplate = PromptTemplate(
+        'Answer the following question: "{question}". '
+        "The context is: \n{context}\nAnswer: "
+    )
+    llm: BaseLLM = AzureChatOpenAI.withx(
+        azure_endpoint="https://bleh-dummy.openai.azure.com/",
+        openai_api_key=os.environ.get("OPENAI_API_KEY", ""),
+        openai_api_version="2023-07-01-preview",
+        deployment_name="dummy-q2-16k",
+        temperature=0,
+        request_timeout=60,
+    )
+
+    def _format_doc_text(self, text: str) -> str:
+        """Format the text of each document"""
+        return text.replace("\n", " ")
+
+    def _format_retrieved_context(self, documents: list[RetrievedDocument]) -> str:
+        """Join the formatted texts of all documents"""
+        matched_texts: list[str] = [
+            self._format_doc_text(doc.text) for doc in documents
+        ]
+        return "\n\n".join(matched_texts)
+
+    def run(
+        self,
+        question: str,
+        documents: list[RetrievedDocument],
+        use_citation: bool = False,
+        **kwargs
+    ) -> Document:
+        # format the retrieved documents into the context
+        context = self._format_retrieved_context(documents)
+        self.log_progress(".context", context=context)
+
+        # generate the answer
+        prompt = self.qa_prompt_template.populate(
+            context=context,
+            question=question,
+        )
+        self.log_progress(".prompt", prompt=prompt)
+        answer_text = self.llm(prompt).text
+        if use_citation:
+            # run citation pipeline
+            citation_pipeline = CitationPipeline(llm=self.llm)
+            citation = citation_pipeline(context=context, question=question)
+        else:
+            citation = None
+
+        answer = Document(text=answer_text, metadata={"citation": citation})
+        return answer
diff --git a/knowledgehub/indices/splitters/__init__.py b/knowledgehub/indices/splitters/__init__.py
index 8c89324..ea5c928 100644
--- a/knowledgehub/indices/splitters/__init__.py
+++ b/knowledgehub/indices/splitters/__init__.py
@@ -1,4 +1,4 @@
-from ..base import DocTransformer, LlamaIndexMixin
+from ..base import DocTransformer, LlamaIndexDocTransformerMixin
 
 
 class BaseSplitter(DocTransformer):
@@ -7,14 +7,14 @@
     ...
 
 
-class TokenSplitter(LlamaIndexMixin, BaseSplitter):
+class TokenSplitter(LlamaIndexDocTransformerMixin, BaseSplitter):
     def _get_li_class(self):
         from llama_index.text_splitter import TokenTextSplitter
 
         return TokenTextSplitter
 
 
-class SentenceWindowSplitter(LlamaIndexMixin, BaseSplitter):
+class SentenceWindowSplitter(LlamaIndexDocTransformerMixin, BaseSplitter):
     def _get_li_class(self):
         from llama_index.node_parser import SentenceWindowNodeParser
diff --git a/knowledgehub/indices/vectorindex.py b/knowledgehub/indices/vectorindex.py
new file mode 100644
index 0000000..0f686d2
--- /dev/null
+++ b/knowledgehub/indices/vectorindex.py
@@ -0,0 +1,185 @@
+from __future__ import annotations
+
+import uuid
+from pathlib import Path
+from typing import Optional, Sequence, cast
+
+from kotaemon.base import BaseComponent, Document, RetrievedDocument
+from kotaemon.embeddings import BaseEmbeddings
+from kotaemon.storages import BaseDocumentStore, BaseVectorStore
+
+from .base import BaseIndexing, BaseRetrieval
+from .rankings import BaseReranking
+
+VECTOR_STORE_FNAME = "vectorstore"
+DOC_STORE_FNAME = "docstore"
+
+
+class VectorIndexing(BaseIndexing):
+    """Ingest the document, run it through the embedding model, and store the
+    embedding in a vector store.
+
+    This pipeline supports the following set of inputs:
+        - List of documents
+        - List of texts
+    """
+
+    vector_store: BaseVectorStore
+    doc_store: Optional[BaseDocumentStore] = None
+    embedding: BaseEmbeddings
+
+    def to_retrieval_pipeline(self, *args, **kwargs):
+        """Convert the indexing pipeline to a retrieval pipeline"""
+        return VectorRetrieval(
+            vector_store=self.vector_store,
+            doc_store=self.doc_store,
+            embedding=self.embedding,
+            **kwargs,
+        )
+
+    def to_qa_pipeline(self, *args, **kwargs):
+        from .qa import CitationQAPipeline
+
+        return TextVectorQA(
+            retrieving_pipeline=self.to_retrieval_pipeline(**kwargs),
+            qa_pipeline=CitationQAPipeline(**kwargs),
+        )
+
+    def run(self, text: str | list[str] | Document | list[Document]) -> None:
+        input_: list[Document] = []
+        if not isinstance(text, list):
+            text = [text]
+
+        for item in cast(list, text):
+            if isinstance(item, str):
+                input_.append(Document(text=item, id_=str(uuid.uuid4())))
+            elif isinstance(item, Document):
+                input_.append(item)
+            else:
+                raise ValueError(
+                    f"Invalid input type {type(item)}, should be str or Document"
+                )
+
+        embeddings = self.embedding(input_)
+        self.vector_store.add(
+            embeddings=embeddings,
+            ids=[t.id_ for t in input_],
+        )
+        if self.doc_store:
+            self.doc_store.add(input_)
+
+    def save(
+        self,
+        path: str | Path,
+        vectorstore_fname: str = VECTOR_STORE_FNAME,
+        docstore_fname: str = DOC_STORE_FNAME,
+    ):
+        """Save the whole state of the indexing pipeline's vector store and all
+        necessary information to disk
+
+        Args:
+            path (str): path to save the state
+        """
+        if isinstance(path, str):
+            path = Path(path)
+        self.vector_store.save(path / vectorstore_fname)
+        if self.doc_store:
+            self.doc_store.save(path / docstore_fname)
+
+    def load(
+        self,
+        path: str | Path,
+        vectorstore_fname: str = VECTOR_STORE_FNAME,
+        docstore_fname: str = DOC_STORE_FNAME,
+    ):
+        """Load all information from disk to an object"""
+        if isinstance(path, str):
+            path = Path(path)
+        self.vector_store.load(path / vectorstore_fname)
+        if self.doc_store:
+            self.doc_store.load(path / docstore_fname)
+
+
+class VectorRetrieval(BaseRetrieval):
+    """Retrieve a list of documents from a vector store"""
+
+    vector_store: BaseVectorStore
+    doc_store: Optional[BaseDocumentStore] = None
+    embedding: BaseEmbeddings
+    rerankers: Sequence[BaseReranking] = []
+    top_k: int = 1
+
+    def run(
+        self, text: str | Document, top_k: Optional[int] = None, **kwargs
+    ) -> list[RetrievedDocument]:
+        """Retrieve a list of documents from the vector store
+
+        Args:
+            text: the text to retrieve similar documents for
+            top_k: number of top similar documents to return
+
+        Returns:
+            list[RetrievedDocument]: list of retrieved documents
+        """
+        if top_k is None:
+            top_k = self.top_k
+
+        if self.doc_store is None:
+            raise ValueError(
+                "doc_store is not provided. Please provide a doc_store to "
+                "retrieve the documents"
+            )
+
+        emb: list[float] = self.embedding(text)[0].embedding
+        _, scores, ids = self.vector_store.query(embedding=emb, top_k=top_k)
+        docs = self.doc_store.get(ids)
+        result = [
+            RetrievedDocument(**doc.to_dict(), score=score)
+            for doc, score in zip(docs, scores)
+        ]
+        # use additional reranker to re-order the document list
+        if self.rerankers:
+            for reranker in self.rerankers:
+                result = reranker(documents=result, query=text)
+
+        return result
+
+    def save(
+        self,
+        path: str | Path,
+        vectorstore_fname: str = VECTOR_STORE_FNAME,
+        docstore_fname: str = DOC_STORE_FNAME,
+    ):
+        """Save the whole state of the retrieval pipeline's vector store and all
+        necessary information to disk
+
+        Args:
+            path (str): path to save the state
+        """
+        if isinstance(path, str):
+            path = Path(path)
+        self.vector_store.save(path / vectorstore_fname)
+        if self.doc_store:
+            self.doc_store.save(path / docstore_fname)
+
+    def load(
+        self,
+        path: str | Path,
+        vectorstore_fname: str = VECTOR_STORE_FNAME,
+        docstore_fname: str = DOC_STORE_FNAME,
+    ):
+        """Load all information from disk to an object"""
+        if isinstance(path, str):
+            path = Path(path)
+        self.vector_store.load(path / vectorstore_fname)
+        if self.doc_store:
+            self.doc_store.load(path / docstore_fname)
+
+
+class TextVectorQA(BaseComponent):
+    retrieving_pipeline: BaseRetrieval
+    qa_pipeline: BaseComponent
+
+    def run(self, question, **kwargs):
+        retrieved_documents = self.retrieving_pipeline(question, **kwargs)
+        return self.qa_pipeline(question, retrieved_documents, **kwargs)
diff --git a/knowledgehub/llms/__init__.py b/knowledgehub/llms/__init__.py
index 5bb7c4f..bdc61bc 100644
--- a/knowledgehub/llms/__init__.py
+++ b/knowledgehub/llms/__init__.py
@@ -1,3 +1,5 @@
+from typing import Union
+
 from kotaemon.base.schema import AIMessage, BaseMessage, HumanMessage, SystemMessage
 
 from .branching import GatedBranchingPipeline, SimpleBranchingPipeline
@@ -6,7 +8,11 @@ from .completions import LLM, AzureOpenAI, OpenAI
 from .linear import GatedLinearPipeline, SimpleLinearPipeline
 from .prompts import BasePromptComponent, PromptTemplate
 
+BaseLLM = Union[ChatLLM, LLM]
+
+
 __all__ = [
+    "BaseLLM",
     # chat-specific components
     "ChatLLM",
     "BaseMessage",
diff --git a/knowledgehub/pipelines/cot.py b/knowledgehub/llms/cot.py
similarity index 97%
rename from knowledgehub/pipelines/cot.py
rename to knowledgehub/llms/cot.py
index 2a39679..29256b3 100644
--- a/knowledgehub/pipelines/cot.py
+++ b/knowledgehub/llms/cot.py
@@ -4,8 +4,10 @@ from typing import Callable, List
 
 from theflow import Function, Node, Param
 
 from kotaemon.base import BaseComponent, Document
-from kotaemon.llms import LLM, BasePromptComponent
-from kotaemon.llms.chats.openai import AzureChatOpenAI
+
+from .chats import AzureChatOpenAI
+from .completions import LLM
+from .prompts import BasePromptComponent
 
 
 class Thought(BaseComponent):
@@ -146,7 +148,7 @@ class ManualSequentialChainOfThought(BaseComponent):
     thoughts: List[Thought] = Param(
         default_callback=lambda *_: [], help="List of Thought"
     )
-    llm: LLM = Param(help="The LLM model to use (base of kotaemon.llms.LLM)")
+    llm: LLM = Param(help="The LLM model to use (base of kotaemon.llms.BaseLLM)")
     terminate: Callable = Param(
         default=lambda _: False,
         help="Callback on terminate condition. Default to always return False",
diff --git a/knowledgehub/pipelines/__init__.py b/knowledgehub/pipelines/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/knowledgehub/pipelines/indexing.py b/knowledgehub/pipelines/indexing.py
deleted file mode 100644
index 5c13a27..0000000
--- a/knowledgehub/pipelines/indexing.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from __future__ import annotations
-
-import uuid
-from pathlib import Path
-from typing import Optional, cast
-
-from ..base import BaseComponent, Document
-from ..embeddings import BaseEmbeddings
-from ..storages import BaseDocumentStore, BaseVectorStore
-
-VECTOR_STORE_FNAME = "vectorstore"
-DOC_STORE_FNAME = "docstore"
-
-
-class IndexVectorStoreFromDocumentPipeline(BaseComponent):
-    """Ingest the document, run through the embedding, and store the embedding in a
-    vector store.
-
-    This pipeline supports the following set of inputs:
-        - List of documents
-        - List of texts
-    """
-
-    vector_store: BaseVectorStore
-    doc_store: Optional[BaseDocumentStore] = None
-    embedding: BaseEmbeddings
-    # TODO: refer to llama_index's storage as well
-
-    def run(self, text: str | list[str] | Document | list[Document]) -> None:
-        input_: list[Document] = []
-        if not isinstance(text, list):
-            text = [text]
-
-        for item in cast(list, text):
-            if isinstance(item, str):
-                input_.append(Document(text=item, id_=str(uuid.uuid4())))
-            elif isinstance(item, Document):
-                input_.append(item)
-            else:
-                raise ValueError(
-                    f"Invalid input type {type(item)}, should be str or Document"
-                )
-
-        embeddings = self.embedding(input_)
-        self.vector_store.add(
-            embeddings=embeddings,
-            ids=[t.id_ for t in input_],
-        )
-        if self.doc_store:
-            self.doc_store.add(input_)
-
-    def save(
-        self,
-        path: str | Path,
-        vectorstore_fname: str = VECTOR_STORE_FNAME,
-        docstore_fname: str = DOC_STORE_FNAME,
-    ):
-        """Save the whole state of the indexing pipeline vector store and all
-        necessary information to disk
-
-        Args:
-            path (str): path to save the state
-        """
-        if isinstance(path, str):
-            path = Path(path)
-        self.vector_store.save(path / vectorstore_fname)
-        if self.doc_store:
-            self.doc_store.save(path / docstore_fname)
-
-    def load(
-        self,
-        path: str | Path,
-        vectorstore_fname: str = VECTOR_STORE_FNAME,
-        docstore_fname: str = DOC_STORE_FNAME,
-    ):
-        """Load all information from disk to an object"""
-        if isinstance(path, str):
-            path = Path(path)
-        self.vector_store.load(path / vectorstore_fname)
-        if self.doc_store:
-            self.doc_store.load(path / docstore_fname)
diff --git a/knowledgehub/pipelines/ingest.py b/knowledgehub/pipelines/ingest.py
deleted file mode 100644
index a753cb6..0000000
--- a/knowledgehub/pipelines/ingest.py
+++ /dev/null
@@ -1,159 +0,0 @@
-from __future__ import annotations
-
-import os
-from pathlib import Path
-from typing import Optional, Sequence
-
-from llama_index.readers.base import BaseReader
-from theflow import Node
-from theflow.utils.modules import ObjectInitDeclaration as _
-
-from kotaemon.agents import BaseAgent
-from kotaemon.base import BaseComponent
-from kotaemon.embeddings import AzureOpenAIEmbeddings
-from kotaemon.indices.extractors import BaseDocParser
-from kotaemon.indices.rankings import BaseReranking
-from kotaemon.indices.splitters import TokenSplitter
-from kotaemon.loaders import (
-    AutoReader,
-    DirectoryReader,
-    MathpixPDFReader,
-    OCRReader,
-    PandasExcelReader,
-)
-from kotaemon.pipelines.indexing import IndexVectorStoreFromDocumentPipeline
-from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline
-from kotaemon.storages import (
-    BaseDocumentStore,
-    BaseVectorStore,
-    InMemoryDocumentStore,
-    InMemoryVectorStore,
-)
-
-from .qa import AgentQAPipeline, QuestionAnsweringPipeline
-from .utils import file_names_to_collection_name
-
-
-class ReaderIndexingPipeline(BaseComponent):
-    """
-    Indexing pipeline which takes input from list of files
-    and perform ingestion to vectorstore
-    """
-
-    # Expose variables for users to switch in prompt ui
-    storage_path: Path = Path("./storage")
-    reader_name: str = "normal"  # "normal", "mathpix" or "ocr"
-    chunk_size: int = 1024
-    chunk_overlap: int = 256
-    vector_store: BaseVectorStore = _(InMemoryVectorStore)
-    doc_store: BaseDocumentStore = _(InMemoryDocumentStore)
-    doc_parsers: list[BaseDocParser] = []
-
-    embedding: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings.withx(
-        model="text-embedding-ada-002",
-        deployment="dummy-q2-text-embedding",
-        azure_endpoint="https://bleh-dummy.openai.azure.com/",
-        openai_api_key=os.environ.get("OPENAI_API_KEY", ""),
-        chunk_size=16,
-    )
-
-    def get_reader(self, input_files: list[str | Path]):
-        # document parsers
-        file_extractor: dict[str, BaseReader | AutoReader] = {
-            ".xlsx": PandasExcelReader(),
-        }
-        if self.reader_name == "normal":
-            file_extractor[".pdf"] = AutoReader("UnstructuredReader")
-        elif self.reader_name == "ocr":
-            file_extractor[".pdf"] = OCRReader()
-        else:
-            file_extractor[".pdf"] = MathpixPDFReader()
-        main_reader = DirectoryReader(
-            input_files=input_files,
-            file_extractor=file_extractor,
-        )
-        return main_reader
-
-    @Node.auto(depends_on=["doc_store", "vector_store", "embedding"])
-    def indexing_vector_pipeline(self):
-        return IndexVectorStoreFromDocumentPipeline(
-            doc_store=self.doc_store,
-            vector_store=self.vector_store,
-            embedding=self.embedding,
-        )
-
-    @Node.auto(depends_on=["chunk_size", "chunk_overlap"])
-    def text_splitter(self) -> TokenSplitter:
-        return TokenSplitter(
-            chunk_size=self.chunk_size,
-            chunk_overlap=self.chunk_overlap,
-        )
-
-    def run(
-        self,
-        file_path_list: list[str | Path] | str | Path,
-        force_reindex: Optional[bool] = False,
-    ):
-        self.storage_path.mkdir(exist_ok=True)
-
-        if not isinstance(file_path_list, list):
-            file_path_list = [file_path_list]
-
-        self.file_name_list = [Path(path).stem for path in file_path_list]
-        collection_name = file_names_to_collection_name(self.file_name_list)
-
-        file_storage_path = self.storage_path / collection_name
-
-        # skip indexing if storage path exist
-        if force_reindex or not file_storage_path.exists():
-            file_storage_path.mkdir(exist_ok=True)
-            # reader call
-            documents = self.get_reader(input_files=file_path_list)()
-            nodes = self.text_splitter(documents)
-            self.log_progress(".num_docs", num_docs=len(nodes))
-
-            # document parsers call
-            if self.doc_parsers:
-                for parser in self.doc_parsers:
-                    nodes = parser(nodes)
-
-            self.indexing_vector_pipeline(nodes)
-            # persist right after indexing
-            self.indexing_vector_pipeline.save(file_storage_path)
-        else:
-            self.indexing_vector_pipeline.load(file_storage_path)
-
-    def to_retrieving_pipeline(self, top_k=3, rerankers: Sequence[BaseReranking] = []):
-        retrieving_pipeline = RetrieveDocumentFromVectorStorePipeline(
-            vector_store=self.vector_store,
-            doc_store=self.doc_store,
-            embedding=self.embedding,
-            top_k=top_k,
-            rerankers=rerankers,
-        )
-        return retrieving_pipeline
-
-    def to_qa_pipeline(self, llm: BaseComponent, **kwargs):
-        qa_pipeline = QuestionAnsweringPipeline(
-            storage_path=self.storage_path,
-            file_name_list=self.file_name_list,
-            vector_store=self.vector_store,
-            doc_store=self.doc_store,
-            embedding=self.embedding,
-            llm=llm,
-            **kwargs,
-        )
-        return qa_pipeline
-
-    def to_agent_pipeline(self, agent: BaseAgent, **kwargs):
-        agent_pipeline = AgentQAPipeline(
-            storage_path=self.storage_path,
-            file_name_list=self.file_name_list,
-            vector_store=self.vector_store,
-            doc_store=self.doc_store,
-            embedding=self.embedding,
-            agent=agent,
-            **kwargs,
-        )
-        agent_pipeline.add_search_tool()
-        return agent_pipeline
diff --git a/knowledgehub/pipelines/qa.py b/knowledgehub/pipelines/qa.py
deleted file mode 100644
index 8435eaa..0000000
--- a/knowledgehub/pipelines/qa.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import os
-from pathlib import Path
-from typing import List, Sequence
-
-from theflow import Node
-from theflow.utils.modules import ObjectInitDeclaration as _
-
-from kotaemon.agents import BaseAgent
-from kotaemon.agents.tools import ComponentTool
-from kotaemon.base import BaseComponent
-from kotaemon.base.schema import Document, RetrievedDocument
-from kotaemon.embeddings import AzureOpenAIEmbeddings
-from kotaemon.indices.rankings import BaseReranking
-from kotaemon.llms import PromptTemplate
-from kotaemon.llms.chats.openai import AzureChatOpenAI
-from kotaemon.pipelines.citation import CitationPipeline
-from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline
-from kotaemon.storages import (
-    BaseDocumentStore,
-    BaseVectorStore,
-    InMemoryDocumentStore,
-    InMemoryVectorStore,
-)
-
-from .utils import file_names_to_collection_name
-
-
-class QuestionAnsweringPipeline(BaseComponent):
-    """
-    Question Answering pipeline ultilizing a child Retrieving pipeline
-    """
-
-    storage_path: Path = Path("./storage")
-    retrieval_top_k: int = 3
-    file_name_list: List[str]
-    """List of filename, incombination with storage_path to
-    create persistent path of vectorstore"""
-    qa_prompt_template: PromptTemplate = PromptTemplate(
-        'Answer the following question: "{question}". '
-        "The context is: \n{context}\nAnswer: "
-    )
-
-    llm: AzureChatOpenAI = AzureChatOpenAI.withx(
-        azure_endpoint="https://bleh-dummy.openai.azure.com/",
-        openai_api_key=os.environ.get("OPENAI_API_KEY", ""),
-        openai_api_version="2023-07-01-preview",
-        deployment_name="dummy-q2-16k",
-        temperature=0,
-        request_timeout=60,
-    )
-
-    vector_store: BaseVectorStore = _(InMemoryVectorStore)
-    doc_store: BaseDocumentStore = _(InMemoryDocumentStore)
-    rerankers: Sequence[BaseReranking] = []
-
-    embedding: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings.withx(
-        model="text-embedding-ada-002",
-        deployment="dummy-q2-text-embedding",
-        azure_endpoint="https://bleh-dummy.openai.azure.com/",
-        openai_api_key=os.environ.get("OPENAI_API_KEY", ""),
-    )
-
-    @Node.auto(
-        depends_on=[
-            "vector_store",
-            "doc_store",
-            "embedding",
-            "file_name_list",
-            "retrieval_top_k",
-        ]
-    )
-    def retrieving_pipeline(self) -> RetrieveDocumentFromVectorStorePipeline:
-        retrieving_pipeline = RetrieveDocumentFromVectorStorePipeline(
-            vector_store=self.vector_store,
-            doc_store=self.doc_store,
-            embedding=self.embedding,
-            top_k=self.retrieval_top_k,
-            rerankers=self.rerankers,
-        )
-        # load persistent from selected path
-        collection_name = file_names_to_collection_name(self.file_name_list)
-        retrieving_pipeline.load(self.storage_path / collection_name)
-        return retrieving_pipeline
-
-    def _format_doc_text(self, text: str) -> str:
-        return text.replace("\n", " ")
-
-    def _format_retrieved_context(self, documents: List[RetrievedDocument]) -> str:
-        matched_texts: List[str] = [
-            self._format_doc_text(doc.text) for doc in documents
-        ]
-        return "\n\n".join(matched_texts)
-
-    def run(self, question: str, use_citation: bool = False) -> Document:
-        # retrieve relevant documents as context
-        documents = self.retrieving_pipeline(question, top_k=int(self.retrieval_top_k))
-        context = self._format_retrieved_context(documents)
-        self.log_progress(".context", context=context)
-
-        # generate the answer
-        prompt = self.qa_prompt_template.populate(
-            context=context,
-            question=question,
-        )
-        self.log_progress(".prompt", prompt=prompt)
-        answer_text = self.llm(prompt).text
-        if use_citation:
-            # run citation pipeline
-            citation_pipeline = CitationPipeline(llm=self.llm)
-            citation = citation_pipeline(context=context, question=question)
-        else:
-            citation = None
-
-        answer = Document(text=answer_text, metadata={"citation": citation})
-        return answer
-
-
-class AgentQAPipeline(QuestionAnsweringPipeline):
-    """
-    QA pipeline ultilizing a child Retrieving pipeline and a Agent pipeline
-    """
-
-    agent: BaseAgent
-
-    def add_search_tool(self):
-        search_tool = ComponentTool(
-            name="search_doc",
-            description=(
-                "A vector store that searches for similar and "
-                "related content "
-                f"in a document: {' '.join(self.file_name_list)}. "
-                "The result is a huge chunk of text related "
-                "to your search but can also "
-                "contain irrelevant info."
-            ),
-            postprocessor=self._format_retrieved_context,
-            component=self.retrieving_pipeline,
-        )
-        if search_tool not in self.agent.plugins:
-            self.agent.add_tools([search_tool])
-
-    def run(self, question: str, use_citation: bool = False) -> Document:
-        kwargs = {"use_citation": use_citation} if use_citation else {}
-        answer = self.agent(question, **kwargs)
-        return answer
diff --git a/knowledgehub/pipelines/retrieving.py b/knowledgehub/pipelines/retrieving.py
deleted file mode 100644
index 941ff01..0000000
--- a/knowledgehub/pipelines/retrieving.py
+++ /dev/null
@@ -1,87 +0,0 @@
-from __future__ import annotations
-
-from pathlib import Path
-from typing import Optional, Sequence
-
-from kotaemon.base import BaseComponent, Document, RetrievedDocument
-from kotaemon.embeddings import BaseEmbeddings
-from kotaemon.indices.rankings import BaseReranking
-from kotaemon.storages import BaseDocumentStore, BaseVectorStore
-
-VECTOR_STORE_FNAME = "vectorstore"
-DOC_STORE_FNAME = "docstore"
-
-
-class RetrieveDocumentFromVectorStorePipeline(BaseComponent):
-    """Retrieve list of documents from vector store"""
-
-    vector_store: BaseVectorStore
-    doc_store: BaseDocumentStore
-    embedding: BaseEmbeddings
-    rerankers: Sequence[BaseReranking] = []
-    top_k: int = 1
-    # TODO: refer to llama_index's storage as well
-
-    def run(
-        self, text: str | Document, top_k: Optional[int] = None
-    ) -> list[RetrievedDocument]:
-        """Retrieve a list of documents from vector store
-
-        Args:
-            text: the text to retrieve similar documents
-            top_k: number of top similar documents to return
-
-        Returns:
-            list[RetrievedDocument]: list of retrieved documents
-        """
-        if top_k is None:
-            top_k = self.top_k
-
-        if self.doc_store is None:
-            raise ValueError(
-                "doc_store is not provided. Please provide a doc_store to "
-                "retrieve the documents"
-            )
-
-        emb: list[float] = self.embedding(text)[0].embedding
-        _, scores, ids = self.vector_store.query(embedding=emb, top_k=top_k)
-        docs = self.doc_store.get(ids)
-        result = [
-            RetrievedDocument(**doc.to_dict(), score=score)
-            for doc, score in zip(docs, scores)
-        ]
-        # use additional reranker to re-order the document list
-        if self.rerankers:
-            for reranker in self.rerankers:
-                result = reranker(documents=result, query=text)
-
-        return result
-
-    def save(
-        self,
-        path: str | Path,
-        vectorstore_fname: str = VECTOR_STORE_FNAME,
-        docstore_fname: str = DOC_STORE_FNAME,
-    ):
-        """Save the whole state of the indexing pipeline vector store and all
-        necessary information to disk
-
-        Args:
-            path (str): path to save the state
-        """
-        if isinstance(path, str):
-            path = Path(path)
-        self.vector_store.save(path / vectorstore_fname)
-        self.doc_store.save(path / docstore_fname)
-
-    def load(
-        self,
-        path: str | Path,
-        vectorstore_fname: str = VECTOR_STORE_FNAME,
-        docstore_fname: str = DOC_STORE_FNAME,
-    ):
-        """Load all information from disk to an object"""
-        if isinstance(path, str):
-            path = Path(path)
-        self.vector_store.load(path / vectorstore_fname)
-        self.doc_store.load(path / docstore_fname)
diff --git a/knowledgehub/pipelines/utils.py b/knowledgehub/pipelines/utils.py
deleted file mode 100644
index a7554f4..0000000
--- a/knowledgehub/pipelines/utils.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import hashlib
-from typing import List
-
-
-def filename_to_hash(filename: str) -> str:
-    """
-    Convert filename to hash to be used as collection name for storage
-    """
-    result = hashlib.md5(filename.encode())
-    return result.hexdigest()
-
-
-def file_names_to_collection_name(file_name_list: List[str]) -> str:
-    """
-    Convert list of filenames to collection name
-    """
-    return filename_to_hash(" ".join(file_name_list))
diff --git a/tests/simple_pipeline.py b/tests/simple_pipeline.py
index 7a0ed20..1e79d0b 100644
--- a/tests/simple_pipeline.py
+++ b/tests/simple_pipeline.py
@@ -5,8 +5,8 @@ from theflow.utils.modules import ObjectInitDeclaration as _
 
 from kotaemon.base import BaseComponent
 from kotaemon.embeddings import AzureOpenAIEmbeddings
+from kotaemon.indices import VectorRetrieval
 from kotaemon.llms.completions.openai import AzureOpenAI
-from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline
 from kotaemon.storages import ChromaVectorStore
 
 
@@ -20,16 +20,14 @@ class Pipeline(BaseComponent):
         request_timeout=60,
     )
 
-    retrieving_pipeline: RetrieveDocumentFromVectorStorePipeline = (
-        RetrieveDocumentFromVectorStorePipeline.withx(
-            vector_store=_(ChromaVectorStore).withx(path=str(tempfile.mkdtemp())),
-            embedding=AzureOpenAIEmbeddings.withx(
-                model="text-embedding-ada-002",
-                deployment="embedding-deployment",
-                openai_api_base="https://test.openai.azure.com/",
-                openai_api_key="some-key",
-            ),
-        )
+    retrieving_pipeline: VectorRetrieval = VectorRetrieval.withx(
+        vector_store=_(ChromaVectorStore).withx(path=str(tempfile.mkdtemp())),
+        embedding=AzureOpenAIEmbeddings.withx(
+            model="text-embedding-ada-002",
+            deployment="embedding-deployment",
+            openai_api_base="https://test.openai.azure.com/",
+            openai_api_key="some-key",
+        ),
     )
 
     def run_raw(self, text: str) -> str:
diff --git a/tests/test_citation.py b/tests/test_citation.py
index 3fba999..69fc544 100644
--- a/tests/test_citation.py
+++ b/tests/test_citation.py
@@ -4,8 +4,8 @@ from unittest.mock import patch
 
 import pytest
 from openai.types.chat.chat_completion import ChatCompletion
 
+from kotaemon.indices.qa import CitationPipeline
 from kotaemon.llms.chats.openai import AzureChatOpenAI
-from kotaemon.pipelines.citation import CitationPipeline
 
 function_output = '{\n "question": "What is the provided _example_ benefits?",\n "answer": [\n {\n "fact": "特約死亡保険金: 被保険者がこの特約の保険期間中に死亡したときに支払います。",\n "substring_quote": ["特約死亡保険金"]\n },\n {\n "fact": "特約特定疾病保険金: 被保険者がこの特約の保険期間中に特定の疾病(悪性新生物(がん)、急性心筋梗塞または脳卒中)により所定の状態に該当したときに支払います。",\n "substring_quote": ["特約特定疾病保険金"]\n },\n {\n "fact": "特約障害保険金: 被保険者がこの特約の保険期間中に傷害もしくは疾病により所定の身体障害の状態に該当したとき、または不慮の事故により所定の身体障害の状態に該当したときに支払います。",\n "substring_quote": ["特約障害保険金"]\n },\n {\n "fact": "特約介護保険金: 被保険者がこの特約の保険期間中に傷害または疾病により所定の要介護状態に該当したときに支払います。",\n "substring_quote": ["特約介護保険金"]\n }\n ]\n}'
diff --git a/tests/test_cot.py b/tests/test_cot.py
index 304d481..0f4320f 100644
--- a/tests/test_cot.py
+++ b/tests/test_cot.py
@@ -2,8 +2,8 @@ from unittest.mock import patch
 
 from openai.types.chat.chat_completion import ChatCompletion
 
-from kotaemon.llms.chats.openai import AzureChatOpenAI
-from kotaemon.pipelines.cot import ManualSequentialChainOfThought, Thought
+from kotaemon.llms import AzureChatOpenAI
+from kotaemon.llms.cot import ManualSequentialChainOfThought, Thought
 
 _openai_chat_completion_response = [
     ChatCompletion.parse_obj(
diff --git a/tests/test_indexing_retrieval.py b/tests/test_indexing_retrieval.py
index 59ed090..367495f 100644
--- a/tests/test_indexing_retrieval.py
+++ b/tests/test_indexing_retrieval.py
@@ -7,8 +7,7 @@ from openai.resources.embeddings import Embeddings
 
 from kotaemon.base import Document
 from kotaemon.embeddings.openai import AzureOpenAIEmbeddings
-from kotaemon.pipelines.indexing import IndexVectorStoreFromDocumentPipeline
-from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline
+from kotaemon.indices import VectorIndexing, VectorRetrieval
 from kotaemon.storages import ChromaVectorStore, InMemoryDocumentStore
 
 with open(Path(__file__).parent / "resources" / "embedding_openai.json") as f:
@@ -30,9 +29,7 @@ def test_indexing(mock_openai_embedding, tmp_path):
         openai_api_key="some-key",
     )
 
-    pipeline = IndexVectorStoreFromDocumentPipeline(
-        vector_store=db, embedding=embedding, doc_store=doc_store
-    )
+    pipeline = VectorIndexing(vector_store=db, embedding=embedding, doc_store=doc_store)
     pipeline.doc_store = cast(InMemoryDocumentStore, pipeline.doc_store)
     pipeline.vector_store = cast(ChromaVectorStore, pipeline.vector_store)
     assert pipeline.vector_store._collection.count() == 0, "Expected empty collection"
@@ -52,10 +49,10 @@ def test_retrieving(mock_openai_embedding, tmp_path):
         openai_api_key="some-key",
     )
 
-    index_pipeline = IndexVectorStoreFromDocumentPipeline(
+    index_pipeline = VectorIndexing(
         vector_store=db, embedding=embedding, doc_store=doc_store
     )
-    retrieval_pipeline = RetrieveDocumentFromVectorStorePipeline(
+    retrieval_pipeline = VectorRetrieval(
         vector_store=db, doc_store=doc_store, embedding=embedding
     )
 
diff --git a/tests/test_qa.py b/tests/test_qa.py
deleted file mode 100644
index b6ff788..0000000
--- a/tests/test_qa.py
+++ /dev/null
@@ -1,73 +0,0 @@
-import json
-from pathlib import Path
-from unittest.mock import patch
-
-import pytest
-from openai.resources.embeddings import Embeddings
-from openai.types.chat.chat_completion import ChatCompletion
-
-from kotaemon.llms.chats.openai import AzureChatOpenAI
-from kotaemon.pipelines.ingest import ReaderIndexingPipeline
-
-with open(Path(__file__).parent / "resources" / "embedding_openai.json") as f:
-    openai_embedding = json.load(f)
-
-
-_openai_chat_completion_response = ChatCompletion.parse_obj(
-    {
-        "id": "chatcmpl-7qyuw6Q1CFCpcKsMdFkmUPUa7JP2x",
-        "object": "chat.completion",
-        "created": 1692338378,
-        "model": "gpt-35-turbo",
-        "system_fingerprint": None,
-        "choices": [
-            {
-                "index": 0,
-                "finish_reason": "stop",
-                "message": {
-                    "role": "assistant",
-                    "content": "Hello! How can I assist you today?",
-                    "function_call": None,
-                    "tool_calls": None,
-                },
-            }
-        ],
-        "usage": {"completion_tokens": 9, "prompt_tokens": 10, "total_tokens": 19},
-    }
-)
-
-
-@pytest.fixture(scope="function")
-def mock_openai_embedding(monkeypatch):
-    monkeypatch.setattr(Embeddings, "create", lambda *args, **kwargs: openai_embedding)
-
-
-@patch(
-    "openai.resources.chat.completions.Completions.create",
-    side_effect=lambda *args, **kwargs: _openai_chat_completion_response,
-)
-def test_ingest_pipeline(patch, mock_openai_embedding, tmp_path):
-    indexing_pipeline = ReaderIndexingPipeline(
-        storage_path=tmp_path,
-    )
-    indexing_pipeline.embedding.openai_api_key = "some-key"
-    input_file_path = Path(__file__).parent / "resources/dummy.pdf"
-
-    # call ingestion pipeline
-    indexing_pipeline(input_file_path, force_reindex=True)
-    retrieving_pipeline = indexing_pipeline.to_retrieving_pipeline()
-
-    results = retrieving_pipeline("This is a query")
-    assert len(results) == 1
-
-    # create llm
-    llm = AzureChatOpenAI(
-        openai_api_base="https://test.openai.azure.com/",
-        openai_api_key="some-key",
-        openai_api_version="2023-03-15-preview",
-        deployment_name="gpt35turbo",
-        temperature=0,
-    )
-    qa_pipeline = indexing_pipeline.to_qa_pipeline(llm=llm, openai_api_key="some-key")
-    response = qa_pipeline("Summarize this document.")
-    assert response
diff --git a/tests/test_tools.py b/tests/test_tools.py
index 0bb8b39..42a270d 100644
--- a/tests/test_tools.py
+++ b/tests/test_tools.py
@@ -7,8 +7,7 @@ from openai.resources.embeddings import Embeddings
 
 from kotaemon.agents.tools import ComponentTool, GoogleSearchTool, WikipediaTool
 from kotaemon.base import Document
 from kotaemon.embeddings.openai import AzureOpenAIEmbeddings
-from kotaemon.pipelines.indexing import IndexVectorStoreFromDocumentPipeline
-from kotaemon.pipelines.retrieving import RetrieveDocumentFromVectorStorePipeline
+from kotaemon.indices.vectorindex import VectorIndexing, VectorRetrieval
 from kotaemon.storages import ChromaVectorStore, InMemoryDocumentStore
 
 with open(Path(__file__).parent / "resources" / "embedding_openai.json") as f:
@@ -46,10 +45,10 @@ def test_pipeline_tool(mock_openai_embedding, tmp_path):
         openai_api_key="some-key",
    )
 
-    index_pipeline = IndexVectorStoreFromDocumentPipeline(
+    index_pipeline = VectorIndexing(
         vector_store=db, embedding=embedding, doc_store=doc_store
     )
-    retrieval_pipeline = RetrieveDocumentFromVectorStorePipeline(
+    retrieval_pipeline = VectorRetrieval(
         vector_store=db, doc_store=doc_store, embedding=embedding
     )
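
A minimal usage sketch of the refactored public API, for review only and not
part of the patch. It assumes the in-memory stores and the Azure OpenAI
wrappers that appear elsewhere in this diff; the file path, endpoint,
deployment, and key values are placeholders.

    from kotaemon.embeddings import AzureOpenAIEmbeddings
    from kotaemon.indices import VectorIndexing
    from kotaemon.indices.ingests import DocumentIngestor
    from kotaemon.storages import InMemoryDocumentStore, InMemoryVectorStore

    # parse and chunk a file into Document nodes ("normal", "mathpix" or "ocr")
    ingestor = DocumentIngestor(pdf_mode="normal")
    nodes = ingestor("path/to/some.pdf")  # placeholder path

    # embed the nodes and persist them in the vector store / doc store pair
    index = VectorIndexing(
        vector_store=InMemoryVectorStore(),
        doc_store=InMemoryDocumentStore(),
        embedding=AzureOpenAIEmbeddings(
            model="text-embedding-ada-002",
            deployment="embedding-deployment",  # placeholder
            azure_endpoint="https://test.openai.azure.com/",  # placeholder
            openai_api_key="some-key",  # placeholder
        ),
    )
    index(nodes)

    # derive a retrieval pipeline over the same stores, then a QA pipeline
    retrieval = index.to_retrieval_pipeline(top_k=3)
    docs = retrieval("an example query")

    qa = index.to_qa_pipeline()  # wraps CitationQAPipeline with its default LLM
    answer = qa("an example question")

The point of the refactor is visible here: indexing, retrieval, and QA now
share one set of stores, and the retrieval/QA pipelines are derived from the
indexing pipeline instead of being wired up by hand as in the deleted
ReaderIndexingPipeline.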