Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ This allows:
* the reviewer to be very confident that the feature does what it is supposed to do before merging it into the code base.
* the contributors to be sure that they don't break already-merged features when refactoring or modifying the code base.

<a name="-enable-logging"></a>
## Enable logging
If you need to monitor and debug your code, you can enable docarray logging:
```python
import logging
logging.getLogger('docarray').setLevel(logging.DEBUG)
```

<a name="-compiling-protobuf"></a>
## Compiling protobuf

Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,38 @@ match = store.find(
)
```

## Enable logging
You can see more detailed logs by setting the level of the `docarray` logger to `INFO` or `DEBUG`.

Example:
```python
from pydantic import Field
from docarray import BaseDocument
from docarray.doc_index.backends.hnswlib_doc_index import HnswDocumentIndex
from docarray.typing import NdArray
# import logging and set the level to DEBUG
import logging
logging.getLogger('docarray').setLevel(logging.DEBUG)


# define a simple document and create a document index
class SimpleDoc(BaseDocument):
vector: NdArray = Field(dim=10)

doc_store = HnswDocumentIndex[SimpleDoc](work_dir='temp_path/')
```

```console
INFO - docarray - DB config created
INFO - docarray - Runtime config created
DEBUG - docarray - Working directory set to temp_path/
WARNING - docarray - No index was created for `id` as it does not have a config
INFO - docarray - Created a new index for column `vector`
DEBUG - docarray - DB path set to temp_path/docs_sqlite.db
INFO - docarray - Connection to DB has been established
INFO - docarray - HnswDocumentIndex[SimpleDoc] has been initialized
```

## Install the alpha

To try out the alpha, you can install it via git:
Expand Down
8 changes: 8 additions & 0 deletions docarray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,13 @@

from docarray.array import DocumentArray, DocumentArrayStacked
from docarray.base_document.document import BaseDocument
import logging

__all__ = ['BaseDocument', 'DocumentArray', 'DocumentArrayStacked']

# Package-wide logger. Users raise its verbosity with
# `logging.getLogger('docarray').setLevel(...)`.
logger = logging.getLogger('docarray')

# Records are rendered as "<LEVEL> - <logger name> - <message>" and written by
# a stream handler attached directly to the package logger.
formatter = logging.Formatter(fmt="%(levelname)s - %(name)s - %(message)s")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
26 changes: 23 additions & 3 deletions docarray/doc_index/abstract_doc_index.py
Comment thread
jupyterjazz marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from docarray.typing import AnyTensor
from docarray.utils.find import FindResult, _FindResult
from docarray.utils.misc import torch_imported
import logging

if TYPE_CHECKING:
from pydantic.fields import ModelField
Expand Down Expand Up @@ -87,10 +88,13 @@ def __init__(self, db_config=None, **kwargs):
'A DocumentIndex must be typed with a Document type.'
'To do so, use the syntax: DocumentIndex[DocumentType]'
)
self._logger = logging.getLogger('docarray')
Comment thread
samsja marked this conversation as resolved.
self._db_config = db_config or self.DBConfig(**kwargs)
if not isinstance(self._db_config, self.DBConfig):
raise ValueError(f'db_config must be of type {self.DBConfig}')
self._logger.info('DB config created')
Comment thread
samsja marked this conversation as resolved.
self._runtime_config = self.RuntimeConfig()
self._logger.info('Runtime config created')
self._column_infos: Dict[str, _ColumnInfo] = self._create_column_infos(
self._schema
)
Expand Down Expand Up @@ -346,6 +350,7 @@ def __delitem__(self, key: Union[str, Sequence[str]]):

:param key: id or ids to delete from the Document index
"""
self._logger.info(f'Deleting documents with id(s) {key} from the index')
if isinstance(key, str):
key = [key]
self._del_items(key)
Expand All @@ -371,10 +376,15 @@ def configure(self, runtime_config=None, **kwargs):
def index(self, docs: Union[BaseDocument, Sequence[BaseDocument]], **kwargs):
    """index Documents into the index.

    :param docs: Documents to index. Either a single Document or a sequence of
        Documents. Passing a sequence that is not a DocumentArray logs a
        warning, since compatibility with the schema of the Index then needs
        to be checked for every Document individually.
    :param kwargs: additional keyword arguments, forwarded unchanged to
        `self._index`.
    """
    if not isinstance(docs, (BaseDocument, DocumentArray)):
        self._logger.warning(
            'Passing a sequence of Documents that is not a DocumentArray comes at '
            'a performance penalty, since compatibility with the schema of Index '
            'needs to be checked for every Document individually.'
        )
    # A single Document is an accepted input (see the isinstance check above)
    # but is not guaranteed to support `len()`; count it explicitly so that a
    # debug log statement can never abort indexing with a TypeError.
    n_docs = 1 if isinstance(docs, BaseDocument) else len(docs)
    self._logger.debug(f'Indexing {n_docs} documents')
    docs_validated = self._validate_docs(docs)
    data_by_columns = self._get_col_value_dict(docs_validated)
    self._index(data_by_columns, **kwargs)
Expand All @@ -397,6 +407,7 @@ def find(
:param limit: maximum number of documents to return
:return: a named tuple containing `documents` and `scores`
"""
self._logger.debug(f'Executing `find` for search field {search_field}')
if isinstance(query, BaseDocument):
query_vec = self._get_values_by_column([query], search_field)[0]
else:
Expand Down Expand Up @@ -430,6 +441,7 @@ def find_batched(
:param limit: maximum number of documents to return per query
:return: a named tuple containing `documents` and `scores`
"""
self._logger.debug(f'Executing `find_batched` for search field {search_field}')
if isinstance(queries, Sequence):
query_vec_list = self._get_values_by_column(queries, search_field)
query_vec_np = np.stack(
Expand Down Expand Up @@ -459,6 +471,7 @@ def filter(
:param limit: maximum number of documents to return
:return: a DocumentArray containing the documents that match the filter query
"""
self._logger.debug(f'Executing `filter` for the query {filter_query}')
docs = self._filter(filter_query, limit=limit, **kwargs)

if isinstance(docs, List):
Expand All @@ -478,6 +491,9 @@ def filter_batched(
:param limit: maximum number of documents to return
:return: a DocumentArray containing the documents that match the filter query
"""
self._logger.debug(
f'Executing `filter_batched` for the queries {filter_queries}'
)
da_list = self._filter_batched(filter_queries, limit=limit, **kwargs)

if len(da_list) > 0 and isinstance(da_list[0], List):
Expand All @@ -499,6 +515,7 @@ def text_search(
:param limit: maximum number of documents to return
:return: a named tuple containing `documents` and `scores`
"""
self._logger.debug(f'Executing `text_search` for search field {search_field}')
if isinstance(query, BaseDocument):
query_text = self._get_values_by_column([query], search_field)[0]
else:
Expand Down Expand Up @@ -526,6 +543,9 @@ def text_search_batched(
:param limit: maximum number of documents to return
:return: a named tuple containing `documents` and `scores`
"""
self._logger.debug(
f'Executing `text_search_batched` for search field {search_field}'
)
if isinstance(queries[0], BaseDocument):
query_docs: Sequence[BaseDocument] = cast(Sequence[BaseDocument], queries)
query_texts: Sequence[str] = self._get_values_by_column(
Expand Down
17 changes: 14 additions & 3 deletions docarray/doc_index/backends/hnswlib_doc_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, db_config=None, **kwargs):
super().__init__(db_config=db_config, **kwargs)
self._db_config = cast(HnswDocumentIndex.DBConfig, self._db_config)
self._work_dir = self._db_config.work_dir
self._logger.debug(f'Working directory set to {self._work_dir}')
load_existing = os.path.exists(self._work_dir) and os.listdir(self._work_dir)
Path(self._work_dir).mkdir(parents=True, exist_ok=True)

Expand All @@ -83,18 +84,26 @@ def __init__(self, db_config=None, **kwargs):
self._hnsw_indices = {}
for col_name, col in self._column_infos.items():
if not col.config:
continue # do not create column index if no config is given
self._logger.warning(
f'No index was created for `{col_name}` as it does not have a config'
)
continue
if load_existing:
self._hnsw_indices[col_name] = self._load_index(col_name, col)
self._logger.info(f'Loading an existing index for column `{col_name}`')
else:
self._hnsw_indices[col_name] = self._create_index(col)
self._logger.info(f'Created a new index for column `{col_name}`')

# SQLite setup
self._sqlite_db_path = os.path.join(self._work_dir, 'docs_sqlite.db')
self._logger.debug(f'DB path set to {self._sqlite_db_path}')
self._sqlite_conn = sqlite3.connect(self._sqlite_db_path)
self._logger.info('Connection to DB has been established')
self._sqlite_cursor = self._sqlite_conn.cursor()
self._create_docs_table()
self._sqlite_conn.commit()
self._logger.info(f'{self.__class__.__name__} has been initialized')

###############################################
# Inner classes for query builder and configs #
Expand Down Expand Up @@ -161,10 +170,11 @@ def index(self, docs: Union[BaseDocument, Sequence[BaseDocument]], **kwargs):
"""index a document into the store"""
if kwargs:
raise ValueError(f'{list(kwargs.keys())} are not valid keyword arguments')

self._logger.debug(f'Indexing {len(docs)} documents')
docs_validated = self._validate_docs(docs)
data_by_columns = self._get_col_value_dict(docs_validated)
hashed_ids = tuple(self._to_hashed_id(doc.id) for doc in docs_validated)

# indexing into HNSWLib and SQLite sequentially
# could be improved by processing in parallel
for col_name, index in self._hnsw_indices.items():
Expand Down Expand Up @@ -196,13 +206,15 @@ def execute_query(self, query: List[Tuple[str, Dict]], *args, **kwargs) -> Any:
elif op == 'filter':
filter_conditions.append(op_kwargs['filter_query'])

self._logger.debug(f'Executing query {query}')
docs_filtered = ann_docs
for cond in filter_conditions:
da_cls = DocumentArray.__class_getitem__(
cast(Type[BaseDocument], self._schema)
)
docs_filtered = da_cls(filter_docs(docs_filtered, cond))

self._logger.debug(f'{len(docs_filtered)} results found')
docs_and_scores = zip(
docs_filtered, (doc_to_score[doc.id] for doc in docs_filtered)
)
Expand Down Expand Up @@ -240,7 +252,6 @@ def _filter(
filter_query: Any,
limit: int,
) -> DocumentArray:

raise NotImplementedError(
f'{type(self)} does not support filter-only queries.'
f' To perform post-filtering on a query, use'
Expand Down
8 changes: 8 additions & 0 deletions tests/doc_index/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pytest
import logging


@pytest.fixture(autouse=True)
def set_logger_level():
    """Run each test with the `docarray` logger set to INFO.

    The logger is process-wide, so the level it had before the test is
    restored afterwards; otherwise the change would leak into tests that run
    later and do not use this fixture.
    """
    logger = logging.getLogger('docarray')
    previous_level = logger.level
    logger.setLevel(logging.INFO)
    yield
    # Teardown: put back whatever level was configured before the test.
    logger.setLevel(previous_level)