kotaemon/knowledgehub/pipelines/retrieving.py
2023-11-14 11:51:10 +07:00

77 lines
2.4 KiB
Python

from __future__ import annotations
from pathlib import Path
from theflow import Node, Param
from ..base import BaseComponent
from ..base.schema import Document, RetrievedDocument
from ..docstores import BaseDocumentStore
from ..embeddings import BaseEmbeddings
from ..vectorstores import BaseVectorStore
VECTOR_STORE_FNAME = "vectorstore"
DOC_STORE_FNAME = "docstore"
class RetrieveDocumentFromVectorStorePipeline(BaseComponent):
"""Retrieve list of documents from vector store"""
vector_store: Param[BaseVectorStore] = Param()
doc_store: Param[BaseDocumentStore] = Param()
embedding: Node[BaseEmbeddings] = Node()
# TODO: refer to llama_index's storage as well
def run(self, text: str | Document, top_k: int = 1) -> list[RetrievedDocument]:
"""Retrieve a list of documents from vector store
Args:
text: the text to retrieve similar documents
Returns:
list[RetrievedDocument]: list of retrieved documents
"""
if self.doc_store is None:
raise ValueError(
"doc_store is not provided. Please provide a doc_store to "
"retrieve the documents"
)
emb: list[float] = self.embedding(text)[0]
_, scores, ids = self.vector_store.query(embedding=emb, top_k=top_k)
docs = self.doc_store.get(ids)
result = [
RetrievedDocument(**doc.to_dict(), score=score)
for doc, score in zip(docs, scores)
]
return result
def save(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Save the whole state of the indexing pipeline vector store and all
necessary information to disk
Args:
path (str): path to save the state
"""
if isinstance(path, str):
path = Path(path)
self.vector_store.save(path / vectorstore_fname)
self.doc_store.save(path / docstore_fname)
def load(
self,
path: str | Path,
vectorstore_fname: str = VECTOR_STORE_FNAME,
docstore_fname: str = DOC_STORE_FNAME,
):
"""Load all information from disk to an object"""
if isinstance(path, str):
path = Path(path)
self.vector_store.load(path / vectorstore_fname)
self.doc_store.load(path / docstore_fname)