From 75f98f5fd15bd1d96830715a73d31765b69d1eb3 Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Fri, 15 Sep 2023 18:24:05 +0200 Subject: [PATCH 1/2] feat: remove JAC Signed-off-by: Joan Fontanals Martinez --- .github/workflows/ci.yml | 52 --- docarray/array/doc_list/pushpull.py | 28 +- docarray/store/__init__.py | 6 +- docarray/store/abstract_doc_store.py | 10 +- docarray/store/file.py | 13 +- docarray/store/jac.py | 370 ------------------ docarray/store/s3.py | 13 +- docarray/utils/_internal/misc.py | 1 - .../user_guide/storing/doc_store/store_jac.md | 58 --- docs/user_guide/storing/first_step.md | 5 +- tests/integrations/store/test_jac.py | 245 ------------ tests/integrations/store/test_s3.py | 2 +- 12 files changed, 13 insertions(+), 790 deletions(-) delete mode 100644 docarray/store/jac.py delete mode 100644 docs/user_guide/storing/doc_store/store_jac.md delete mode 100644 tests/integrations/store/test_jac.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ed23060455..6233b57af5a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -138,58 +138,6 @@ jobs: flags: ${{ steps.test.outputs.codecov_flag }} fail_ci_if_error: false - - - docarray-test-jac: - needs: [import-test] - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: [3.8] - pydantic-version: ["pydantic-v2", "pydantic-v1"] - steps: - - uses: actions/checkout@v2.5.0 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - name: Prepare environment - run: | - python -m pip install --upgrade pip - python -m pip install poetry - poetry install --all-extras - ./scripts/install_pydantic_v2.sh ${{ matrix.pydantic-version }} - poetry run pip install elasticsearch==8.6.2 - poetry run pip uninstall -y torch - poetry run pip install torch - sudo apt-get update - sudo apt-get install --no-install-recommends ffmpeg - - - name: Test - id: test - run: | - poetry run pytest -m "not (tensorflow or benchmark or index or jax)" --cov=docarray --cov-report=xml tests/integrations/store/test_jac.py - echo "flag it as docarray for codeoverage" - echo "codecov_flag=docarray" >> $GITHUB_OUTPUT - timeout-minutes: 30 - env: - JINA_AUTH_TOKEN: "${{ secrets.JINA_AUTH_TOKEN }}" - - name: Check codecov file - id: check_files - uses: andstor/file-existence-action@v1 - with: - files: "coverage.xml" - - name: Upload coverage from test to Codecov - uses: codecov/codecov-action@v3.1.1 - if: steps.check_files.outputs.files_exists == 'true' && ${{ matrix.python-version }} == '3.8' - with: - file: coverage.xml - name: benchmark-test-codecov - flags: ${{ steps.test.outputs.codecov_flag }} - fail_ci_if_error: false - - docarray-test-proto3: needs: [import-test] runs-on: ubuntu-latest diff --git a/docarray/array/doc_list/pushpull.py b/docarray/array/doc_list/pushpull.py index 2bfe6764061..5784610633b 100644 --- a/docarray/array/doc_list/pushpull.py +++ b/docarray/array/doc_list/pushpull.py @@ -5,7 +5,6 @@ Dict, Iterable, Iterator, - Optional, Tuple, Type, TypeVar, @@ -15,7 +14,7 @@ from typing_extensions import Literal from typing_inspect import get_args -PUSH_PULL_PROTOCOL = Literal['jac', 's3', 'file'] +PUSH_PULL_PROTOCOL = Literal['s3', 'file'] SUPPORTED_PUSH_PULL_PROTOCOLS = get_args(PUSH_PULL_PROTOCOL) if TYPE_CHECKING: # pragma: no cover @@ -55,18 +54,13 @@ def get_pushpull_backend( """ Get the backend for the given protocol. - :param protocol: the protocol to use, e.g. 'jac', 'file', 's3' + :param protocol: the protocol to use, e.g. 'file', 's3' :return: the backend class """ if protocol in cls.__backends__: return cls.__backends__[protocol] - if protocol == 'jac': - from docarray.store.jac import JACDocStore - - cls.__backends__[protocol] = JACDocStore - logging.debug('Loaded Jina AI Cloud backend') - elif protocol == 'file': + if protocol == 'file': from docarray.store.file import FileDocStore cls.__backends__[protocol] = FileDocStore @@ -84,22 +78,18 @@ def get_pushpull_backend( def push( self, url: str, - public: bool = True, show_progress: bool = False, - branding: Optional[Dict] = None, + **kwargs, ) -> Dict: """Push this `DocList` object to the specified url. :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name`` - :param public: Only used by ``jac`` protocol. If true, anyone can pull a `DocList` if they know its name. - Setting this to false will restrict access to only the creator. :param show_progress: If true, a progress bar will be displayed. - :param branding: Only used by ``jac`` protocol. A dictionary of branding information to be sent to Jina AI Cloud. {"icon": "emoji", "background": "#fff"} """ logging.info(f'Pushing {len(self)} docs to {url}') protocol, name = self.__class__.resolve_url(url) return self.__class__.get_pushpull_backend(protocol).push( - self, name, public, show_progress, branding # type: ignore + self, name, show_progress # type: ignore ) @classmethod @@ -107,23 +97,17 @@ def push_stream( cls: Type[SelfPushPullMixin], docs: Iterator['BaseDoc'], url: str, - public: bool = True, show_progress: bool = False, - branding: Optional[Dict] = None, ) -> Dict: """Push a stream of documents to the specified url. :param docs: a stream of documents :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name`` - :param public: Only used by ``jac`` protocol. If true, anyone can pull a `DocList` if they know its name. :param show_progress: If true, a progress bar will be displayed. - :param branding: Only used by ``jac`` protocol. A dictionary of branding information to be sent to Jina AI Cloud. {"icon": "emoji", "background": "#fff"} """ logging.info(f'Pushing stream to {url}') protocol, name = cls.resolve_url(url) - return cls.get_pushpull_backend(protocol).push_stream( - docs, name, public, show_progress, branding - ) + return cls.get_pushpull_backend(protocol).push_stream(docs, name, show_progress) @classmethod def pull( diff --git a/docarray/store/__init__.py b/docarray/store/__init__.py index 9547db27c3e..42e7025ce85 100644 --- a/docarray/store/__init__.py +++ b/docarray/store/__init__.py @@ -8,7 +8,6 @@ ) if TYPE_CHECKING: - from docarray.store.jac import JACDocStore # noqa: F401 from docarray.store.s3 import S3DocStore # noqa: F401 __all__ = ['FileDocStore'] @@ -16,10 +15,7 @@ def __getattr__(name: str): lib: types.ModuleType - if name == 'JACDocStore': - import_library('hubble', raise_error=True) - import docarray.store.jac as lib - elif name == 'S3DocStore': + if name == 'S3DocStore': import_library('smart_open', raise_error=True) import_library('botocore', raise_error=True) import_library('boto3', raise_error=True) diff --git a/docarray/store/abstract_doc_store.py b/docarray/store/abstract_doc_store.py index df7788f584a..76610aa2ce4 100644 --- a/docarray/store/abstract_doc_store.py +++ b/docarray/store/abstract_doc_store.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict, Iterator, List, Optional, Type +from typing import Dict, Iterator, List, Type from typing_extensions import TYPE_CHECKING @@ -35,17 +35,13 @@ def delete(name: str, missing_ok: bool) -> bool: def push( docs: 'DocList', name: str, - public: bool, show_progress: bool, - branding: Optional[Dict], ) -> Dict: """Push this DocList to the specified name. :param docs: The DocList to push :param name: The name to push to - :param public: Whether the DocList should be publicly accessible :param show_progress: If true, a progress bar will be displayed. - :param branding: Branding information to be stored with the DocList """ ... @@ -54,17 +50,13 @@ def push( def push_stream( docs: Iterator['BaseDoc'], url: str, - public: bool = True, show_progress: bool = False, - branding: Optional[Dict] = None, ) -> Dict: """Push a stream of documents to the specified name. :param docs: a stream of documents :param url: The name to push to - :param public: Whether the DocList should be publicly accessible :param show_progress: If true, a progress bar will be displayed. - :param branding: Branding information to be stored with the DocList """ ... diff --git a/docarray/store/file.py b/docarray/store/file.py index 6c46c3ab615..91d4e3737a6 100644 --- a/docarray/store/file.py +++ b/docarray/store/file.py @@ -98,40 +98,29 @@ def push( cls: Type[SelfFileDocStore], docs: 'DocList', name: str, - public: bool, show_progress: bool, - branding: Optional[Dict], ) -> Dict: """Push this [`DocList`][docarray.DocList] object to the specified file path. :param docs: The `DocList` to push. :param name: The file path to push to. - :param public: Not used by the ``file`` protocol. :param show_progress: If true, a progress bar will be displayed. - :param branding: Not used by the ``file`` protocol. """ - return cls.push_stream(iter(docs), name, public, show_progress, branding) + return cls.push_stream(iter(docs), name, show_progress) @classmethod def push_stream( cls: Type[SelfFileDocStore], docs: Iterator['BaseDoc'], name: str, - public: bool = True, show_progress: bool = False, - branding: Optional[Dict] = None, ) -> Dict: """Push a stream of documents to the specified file path. :param docs: a stream of documents :param name: The file path to push to. - :param public: Not used by the ``file`` protocol. :param show_progress: If true, a progress bar will be displayed. - :param branding: Not used by the ``file`` protocol. """ - if branding is not None: - logging.warning('branding is not supported for "file" protocol') - source = _to_binary_stream( docs, protocol='protobuf', compress='gzip', show_progress=show_progress ) diff --git a/docarray/store/jac.py b/docarray/store/jac.py deleted file mode 100644 index 9fea6614c6d..00000000000 --- a/docarray/store/jac.py +++ /dev/null @@ -1,370 +0,0 @@ -import json -import logging -import os -from pathlib import Path -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterator, - List, - Optional, - Type, - TypeVar, - Union, -) - -from docarray.store.abstract_doc_store import AbstractDocStore -from docarray.store.helpers import ( - _BufferedCachingRequestReader, - get_version_info, - raise_req_error, -) -from docarray.utils._internal.cache import _get_cache_path -from docarray.utils._internal.misc import import_library - -if TYPE_CHECKING: # pragma: no cover - import io - - from docarray import BaseDoc, DocList - -if TYPE_CHECKING: - import hubble - from hubble import Client as HubbleClient - from hubble.client.endpoints import EndpointsV2 -else: - hubble = import_library('hubble', raise_error=True) - HubbleClient = hubble.Client - EndpointsV2 = hubble.client.endpoints.EndpointsV2 - - -def _get_length_from_summary(summary: List[Dict]) -> Optional[int]: - """Get the length from summary.""" - for item in summary: - if 'Length' == item['name']: - return item['value'] - raise ValueError('Length not found in summary') - - -def _get_raw_summary(self: 'DocList') -> List[Dict[str, Any]]: - items: List[Dict[str, Any]] = [ - dict( - name='Type', - value=self.__class__.__name__, - description='The type of the DocList', - ), - dict( - name='Length', - value=len(self), - description='The length of the DocList', - ), - dict( - name='Homogenous Documents', - value=True, - description='Whether all documents are of the same structure, attributes', - ), - dict( - name='Fields', - value=tuple(self[0].__class__._docarray_fields().keys()), - description='The fields of the Document', - ), - dict( - name='Multimodal dataclass', - value=True, - description='Whether all documents are multimodal', - ), - ] - - return items - - -SelfJACDocStore = TypeVar('SelfJACDocStore', bound='JACDocStore') - - -class JACDocStore(AbstractDocStore): - """Class to push and pull [`DocList`][docarray.DocList] to and from Jina AI Cloud.""" - - @staticmethod - @hubble.login_required - def list(namespace: str = '', show_table: bool = False) -> List[str]: - """List all available arrays in the cloud. - - :param namespace: Not supported for Jina AI Cloud. - :param show_table: if true, show the table of the arrays. - :returns: List of available DocList's names. - """ - if len(namespace) > 0: - logging.warning('Namespace is not supported for Jina AI Cloud.') - from rich import print - - result = [] - from rich import box - from rich.table import Table - - resp = HubbleClient(jsonify=True).list_artifacts( - filter={'type': 'documentArray'}, - sort={'createdAt': 1}, - pageSize=10000, - ) - - table = Table( - title=f'You have {resp["meta"]["total"]} DocList on the cloud', - box=box.SIMPLE, - highlight=True, - ) - table.add_column('Name') - table.add_column('Length') - table.add_column('Access') - table.add_column('Created at', justify='center') - table.add_column('Updated at', justify='center') - - for docs in resp['data']: - result.append(docs['name']) - - table.add_row( - docs['name'], - str(_get_length_from_summary(docs['metaData'].get('summary', []))), - docs['visibility'], - docs['createdAt'], - docs['updatedAt'], - ) - - if show_table: - print(table) - return result - - @staticmethod - @hubble.login_required - def delete(name: str, missing_ok: bool = True) -> bool: - """ - Delete a [`DocList`][docarray.DocList] from the cloud. - :param name: the name of the DocList to delete. - :param missing_ok: if true, do not raise an error if the DocList does not exist. - :return: True if the DocList was deleted, False if it did not exist. - """ - try: - HubbleClient(jsonify=True).delete_artifact(name=name) - except hubble.excepts.RequestedEntityNotFoundError: - if missing_ok: - return False - else: - raise - return True - - @staticmethod - @hubble.login_required - def push( - docs: 'DocList', - name: str, - public: bool = True, - show_progress: bool = False, - branding: Optional[Dict] = None, - ) -> Dict: - """Push this [`DocList`][docarray.DocList] object to Jina AI Cloud - - !!! note - - Push with the same ``name`` will override the existing content. - - Kinda like a public clipboard where everyone can override anyone's content. - So to make your content survive longer, you may want to use longer & more complicated name. - - The lifetime of the content is not promised atm, could be a day, could be a week. Do not use it for - persistence. Only use this full temporary transmission/storage/clipboard. - - :param docs: The `DocList` to push. - :param name: A name that can later be used to retrieve this `DocList`. - :param public: By default, anyone can pull a `DocList` if they know its name. - Setting this to false will restrict access to only the creator. - :param show_progress: If true, a progress bar will be displayed. - :param branding: A dictionary of branding information to be sent to Jina Cloud. e.g. {"icon": "emoji", "background": "#fff"} - """ - import requests - import urllib3 - - delimiter = os.urandom(32) - - data, ctype = urllib3.filepost.encode_multipart_formdata( - { - 'file': ( - 'DocumentArray', - delimiter, - ), - 'name': name, - 'type': 'documentArray', - 'public': public, - 'metaData': json.dumps( - { - 'summary': _get_raw_summary(docs), - 'branding': branding, - 'version': get_version_info(), - }, - sort_keys=True, - ), - } - ) - - headers = { - 'Content-Type': ctype, - } - - auth_token = hubble.get_token() - if auth_token: - headers['Authorization'] = f'token {auth_token}' - - _head, _tail = data.split(delimiter) - - def gen(): - yield _head - binary_stream = docs._to_binary_stream( - protocol='protobuf', compress='gzip', show_progress=show_progress - ) - while True: - try: - yield next(binary_stream) - except StopIteration: - break - yield _tail - - response = requests.post( - HubbleClient()._base_url + EndpointsV2.upload_artifact, - data=gen(), - headers=headers, - ) - - if response.ok: - return response.json()['data'] - else: - if response.status_code >= 400 and 'readableMessage' in response.json(): - response.reason = response.json()['readableMessage'] - raise_req_error(response) - - @classmethod - @hubble.login_required - def push_stream( - cls: Type[SelfJACDocStore], - docs: Iterator['BaseDoc'], - name: str, - public: bool = True, - show_progress: bool = False, - branding: Optional[Dict] = None, - ) -> Dict: - """Push a stream of documents to Jina AI Cloud - - !!! note - - Push with the same ``name`` will override the existing content. - - Kinda like a public clipboard where everyone can override anyone's content. - So to make your content survive longer, you may want to use longer & more complicated name. - - The lifetime of the content is not promised atm, could be a day, could be a week. Do not use it for - persistence. Only use this full temporary transmission/storage/clipboard. - - :param docs: a stream of documents - :param name: A name that can later be used to retrieve this `DocList`. - :param public: By default, anyone can pull a `DocList` if they know its name. - Setting this to false will restrict access to only the creator. - :param show_progress: If true, a progress bar will be displayed. - :param branding: A dictionary of branding information to be sent to Jina Cloud. e.g. {"icon": "emoji", "background": "#fff"} - """ - from docarray import DocList - - # This is a temporary solution to push a stream of documents - # The memory footprint is not ideal - # But it must be done this way for now because Hubble expects to know the length of the DocList - # before it starts receiving the documents - first_doc = next(docs) - _docs = DocList[first_doc.__class__]([first_doc]) # type: ignore - for doc in docs: - _docs.append(doc) - return cls.push(_docs, name, public, show_progress, branding) - - @staticmethod - @hubble.login_required - def pull( - cls: Type['DocList'], - name: str, - show_progress: bool = False, - local_cache: bool = True, - ) -> 'DocList': - """Pull a [`DocList`][docarray.DocList] from Jina AI Cloud to local. - - :param name: the upload name set during `.push` - :param show_progress: if true, display a progress bar. - :param local_cache: store the downloaded DocList to local folder - :return: a [`DocList`][docarray.DocList] object - """ - from docarray import DocList - - return DocList[cls.doc_type]( # type: ignore - JACDocStore.pull_stream(cls, name, show_progress, local_cache) - ) - - @staticmethod - @hubble.login_required - def pull_stream( - cls: Type['DocList'], - name: str, - show_progress: bool = False, - local_cache: bool = False, - ) -> Iterator['BaseDoc']: - """Pull a [`DocList`][docarray.DocList] from Jina AI Cloud to local. - - :param name: the upload name set during `.push` - :param show_progress: if true, display a progress bar. - :param local_cache: store the downloaded DocList to local folder - :return: An iterator of Documents - """ - import requests - - headers = {} - - auth_token = hubble.get_token() - - if auth_token: - headers['Authorization'] = f'token {auth_token}' - - url = HubbleClient()._base_url + EndpointsV2.download_artifact + f'?name={name}' - response = requests.get(url, headers=headers) - - if response.ok: - url = response.json()['data']['download'] - else: - response.raise_for_status() - - with requests.get( - url, - stream=True, - ) as r: - from contextlib import nullcontext - - r.raise_for_status() - save_name = name.replace('/', '_') - - tmp_cache_file = Path(f'/tmp/{save_name}.docs') - _source: Union[ - _BufferedCachingRequestReader, io.BufferedReader - ] = _BufferedCachingRequestReader(r, tmp_cache_file) - - cache_file = _get_cache_path() / f'{save_name}.docs' - if local_cache and cache_file.exists(): - _cache_len = cache_file.stat().st_size - if _cache_len == int(r.headers['Content-length']): - if show_progress: - print(f'Loading from local cache {cache_file}') - _source = open(cache_file, 'rb') - r.close() - - docs = cls._load_binary_stream( - nullcontext(_source), # type: ignore - protocol='protobuf', - compress='gzip', - show_progress=show_progress, - ) - try: - while True: - yield next(docs) - except StopIteration: - pass - - if local_cache: - if isinstance(_source, _BufferedCachingRequestReader): - Path(_get_cache_path()).mkdir(parents=True, exist_ok=True) - tmp_cache_file.rename(cache_file) - else: - _source.close() diff --git a/docarray/store/s3.py b/docarray/store/s3.py index 2ebb864fc8d..5b2e4ae6f4b 100644 --- a/docarray/store/s3.py +++ b/docarray/store/s3.py @@ -121,39 +121,28 @@ def push( cls: Type[SelfS3DocStore], docs: 'DocList', name: str, - public: bool = False, show_progress: bool = False, - branding: Optional[Dict] = None, ) -> Dict: """Push this [`DocList`][docarray.DocList] object to the specified bucket and key. :param docs: The `DocList` to push. :param name: The bucket and key to push to. e.g. my_bucket/my_key - :param public: Not used by the ``s3`` protocol. :param show_progress: If true, a progress bar will be displayed. - :param branding: Not used by the ``s3`` protocol. """ - return cls.push_stream(iter(docs), name, public, show_progress, branding) + return cls.push_stream(iter(docs), name, show_progress) @staticmethod def push_stream( docs: Iterator['BaseDoc'], name: str, - public: bool = True, show_progress: bool = False, - branding: Optional[Dict] = None, ) -> Dict: """Push a stream of documents to the specified bucket and key. :param docs: a stream of documents :param name: The bucket and key to push to. e.g. my_bucket/my_key - :param public: Not used by the ``s3`` protocol. :param show_progress: If true, a progress bar will be displayed. - :param branding: Not used by the ``s3`` protocol. """ - if branding is not None: - logging.warning("Branding is not supported for S3 push") - bucket, name = name.split('/', 1) binary_stream = _to_binary_stream( docs, protocol='pickle', compress=None, show_progress=show_progress diff --git a/docarray/utils/_internal/misc.py b/docarray/utils/_internal/misc.py index c753ce303ea..5665f922fe0 100644 --- a/docarray/utils/_internal/misc.py +++ b/docarray/utils/_internal/misc.py @@ -45,7 +45,6 @@ 'fastapi': '"docarray[web]"', 'torch': '"docarray[torch]"', 'tensorflow': 'protobuf==3.19.0 tensorflow', - 'hubble': '"docarray[jac]"', 'smart_open': '"docarray[aws]"', 'boto3': '"docarray[aws]"', 'botocore': '"docarray[aws]"', diff --git a/docs/user_guide/storing/doc_store/store_jac.md b/docs/user_guide/storing/doc_store/store_jac.md deleted file mode 100644 index 202d89ba4b2..00000000000 --- a/docs/user_guide/storing/doc_store/store_jac.md +++ /dev/null @@ -1,58 +0,0 @@ -# Store on Jina AI Cloud - -When you want to use your [`DocList`][docarray.DocList] in another place, you can use: -- the [`.push()`][docarray.array.doc_list.pushpull.PushPullMixin.push] method to push the `DocList` to Jina AI Cloud . -- the [`.pull()`][docarray.array.doc_list.pushpull.PushPullMixin.pull] function to pull its content back. - -!!! note - To store documents on Jina AI Cloud, you need to install the extra dependency with the following line: - - ```cmd - pip install "docarray[jac]" - ``` - -## Push and pull - -To use the store [`DocList`][docarray.DocList] on Jina AI Cloud, you need to pass a Jina AI Cloud path to the function starting with `'jac://'`. - -Before getting started, create an account at [Jina AI Cloud](http://cloud.jina.ai/) and a [Personal Access Token (PAT)](https://cloud.jina.ai/settings/tokens). - -```python -from docarray import BaseDoc, DocList -import os - - -class SimpleDoc(BaseDoc): - text: str - - -os.environ['JINA_AUTH_TOKEN'] = 'YOUR_PAT' -DL_NAME = 'simple-dl' -dl = DocList[SimpleDoc]([SimpleDoc(text=f'doc {i}') for i in range(8)]) - -# push to Jina AI Cloud -dl.push(f'jac://{DL_NAME}') - -# pull from Jina AI Cloud -dl_pull = DocList[SimpleDoc].pull(f'jac://{DL_NAME}') -``` - -## Push and pull with streaming - -When you have a large amount of documents to push and pull, you can use the streaming function. -[`.push_stream()`][docarray.array.doc_list.pushpull.PushPullMixin.push_stream] and -[`.pull_stream()`][docarray.array.doc_list.pushpull.PushPullMixin.pull_stream] stream the -[`DocList`][docarray.DocList] to save memory usage. -You can set multiple `DocList` to pull from the same source as well. -The usage is the same as streaming with local files. -Please refer to [push and pull with streaming with local files](store_file.md#push-and-pull-with-streaming). - -## Delete - -To delete the store, you need to use the static method [`.delete()`][docarray.store.jac.JACDocStore.delete] of [`JACDocStore`][docarray.store.jac.JACDocStore] class: - -```python -from docarray.store import JACDocStore - -JACDocStore.delete(f'jac://{DL_NAME}') -``` diff --git a/docs/user_guide/storing/first_step.md b/docs/user_guide/storing/first_step.md index 836f12646d1..9cd9b7e3e4d 100644 --- a/docs/user_guide/storing/first_step.md +++ b/docs/user_guide/storing/first_step.md @@ -14,13 +14,12 @@ DocArray offers two ways of storing your data, each of which have their own docu [`.push()`][docarray.array.doc_list.pushpull.PushPullMixin.push] and [`.pull()`][docarray.array.doc_list.pushpull.PushPullMixin.pull] methods. Under the hood, [DocStore][docarray.store.abstract_doc_store.AbstractDocStore] is used to persist a `DocList`. -You can either store your documents on-disk or upload them to [AWS S3](https://aws.amazon.com/s3/), -[minio](https://min.io) or [Jina AI Cloud](https://cloud.jina.ai/user/storage). +You can either store your documents on-disk or upload them to [AWS S3](https://aws.amazon.com/s3/) or +[minio](https://min.io). This section covers the following three topics: - [Storing](doc_store/store_file.md) [`BaseDoc`][docarray.base_doc.doc.BaseDoc], [`DocList`][docarray.array.doc_list.doc_list.DocList] and [`DocVec`][docarray.array.doc_vec.doc_vec.DocVec] on-disk - - [Storing on Jina AI Cloud](doc_store/store_jac.md) - [Storing on S3](doc_store/store_s3.md) ## Document Index diff --git a/tests/integrations/store/test_jac.py b/tests/integrations/store/test_jac.py deleted file mode 100644 index 228ee6d29bc..00000000000 --- a/tests/integrations/store/test_jac.py +++ /dev/null @@ -1,245 +0,0 @@ -import multiprocessing as mp -import uuid - -import hubble -import pytest - -from docarray import DocList -from docarray.documents import TextDoc -from docarray.store import JACDocStore -from docarray.utils._internal.pydantic import is_pydantic_v2 -from tests.integrations.store import gen_text_docs, get_test_da, profile_memory - -DA_LEN: int = 2**10 -TOLERANCE_RATIO = 0.5 # Percentage of difference allowed in stream vs non-stream test -RANDOM: str = uuid.uuid4().hex[:8] - - -@pytest.fixture(scope='session', autouse=True) -def testing_namespace_cleanup(): - da_names = list( - filter( - lambda x: x.startswith('test'), - JACDocStore.list('jac://', show_table=False), - ) - ) - for da_name in da_names: - JACDocStore.delete(f'jac://{da_name}') - yield - da_names = list( - filter( - lambda x: x.startswith(f'test{RANDOM}'), - JACDocStore.list('jac://', show_table=False), - ) - ) - for da_name in da_names: - JACDocStore.delete(f'{da_name}') - - -@pytest.mark.slow -@pytest.mark.internet -def test_pushpull_correct(capsys): - DA_NAME: str = f'test{RANDOM}-pushpull-correct' - da1 = get_test_da(DA_LEN) - - # Verbose - da1.push(f'jac://{DA_NAME}', show_progress=True) - da2 = DocList[TextDoc].pull(f'jac://{DA_NAME}', show_progress=True) - assert len(da1) == len(da2) - assert all(d1.id == d2.id for d1, d2 in zip(da1, da2)) - assert all(d1.text == d2.text for d1, d2 in zip(da1, da2)) - - captured = capsys.readouterr() - assert len(captured.out) > 0 - assert len(captured.err) == 0 - - # Quiet - da2.push(f'jac://{DA_NAME}') - da1 = DocList[TextDoc].pull(f'jac://{DA_NAME}') - assert len(da1) == len(da2) - assert all(d1.id == d2.id for d1, d2 in zip(da1, da2)) - assert all(d1.text == d2.text for d1, d2 in zip(da1, da2)) - - captured = capsys.readouterr() - assert ( - len(captured.out) == 0 - ), 'No output should be printed when show_progress=False' - assert len(captured.err) == 0, 'No error should be printed when show_progress=False' - - -@pytest.mark.slow -@pytest.mark.internet -def test_pushpull_stream_correct(capsys): - DA_NAME_1: str = f'test{RANDOM}-pushpull-stream-correct-da1' - DA_NAME_2: str = f'test{RANDOM}-pushpull-stream-correct-da2' - - da1 = get_test_da(DA_LEN) - - # Verbosity and correctness - DocList[TextDoc].push_stream(iter(da1), f'jac://{DA_NAME_1}', show_progress=True) - doc_stream2 = DocList[TextDoc].pull_stream(f'jac://{DA_NAME_1}', show_progress=True) - - assert all(d1.id == d2.id for d1, d2 in zip(da1, doc_stream2)) - with pytest.raises(StopIteration): - next(doc_stream2) - - captured = capsys.readouterr() - assert len(captured.out) > 0 - assert len(captured.err) == 0 - - # Quiet and chained - doc_stream = DocList[TextDoc].pull_stream(f'jac://{DA_NAME_1}', show_progress=False) - DocList[TextDoc].push_stream(doc_stream, f'jac://{DA_NAME_2}', show_progress=False) - - captured = capsys.readouterr() - assert ( - len(captured.out) == 0 - ), 'No output should be printed when show_progress=False' - assert len(captured.err) == 0, 'No error should be printed when show_progress=False' - - -# for some reason this test is failing with pydantic v2 -@pytest.mark.skipif(is_pydantic_v2, reason="Not working with pydantic v2 for now") -@pytest.mark.slow -@pytest.mark.internet -def test_pull_stream_vs_pull_full(): - import docarray.store.helpers - - docarray.store.helpers.CACHING_REQUEST_READER_CHUNK_SIZE = 2**10 - DA_NAME_SHORT: str = f'test{RANDOM}-pull-stream-vs-pull-full-short' - DA_NAME_LONG: str = f'test{RANDOM}-pull-stream-vs-pull-full-long' - - DocList[TextDoc].push_stream( - gen_text_docs(DA_LEN * 1), - f'jac://{DA_NAME_SHORT}', - show_progress=False, - ) - DocList[TextDoc].push_stream( - gen_text_docs(DA_LEN * 4), - f'jac://{DA_NAME_LONG}', - show_progress=False, - ) - - @profile_memory - def get_total_stream(url: str): - return sum( - len(d.text) for d in DocList[TextDoc].pull_stream(url, show_progress=False) - ) - - @profile_memory - def get_total_full(url: str): - return sum(len(d.text) for d in DocList[TextDoc].pull(url, show_progress=False)) - - # A warmup is needed to get accurate memory usage comparison - _ = get_total_stream(f'jac://{DA_NAME_SHORT}') - short_total_stream, (_, short_stream_peak) = get_total_stream( - f'jac://{DA_NAME_SHORT}' - ) - long_total_stream, (_, long_stream_peak) = get_total_stream(f'jac://{DA_NAME_LONG}') - - _ = get_total_full(f'jac://{DA_NAME_SHORT}') - short_total_full, (_, short_full_peak) = get_total_full(f'jac://{DA_NAME_SHORT}') - long_total_full, (_, long_full_peak) = get_total_full(f'jac://{DA_NAME_LONG}') - - assert ( - short_total_stream == short_total_full - ), 'Streamed and non-streamed pull should have similar statistics' - assert ( - long_total_stream == long_total_full - ), 'Streamed and non-streamed pull should have similar statistics' - - assert ( - abs(long_stream_peak - short_stream_peak) / short_stream_peak < TOLERANCE_RATIO - ), 'Streamed memory usage should not be dependent on the size of the data' - assert ( - abs(long_full_peak - short_full_peak) / short_full_peak > TOLERANCE_RATIO - ), 'Full pull memory usage should be dependent on the size of the data' - - -@pytest.mark.slow -@pytest.mark.internet -def test_list_and_delete(): - DA_NAME_0 = f'test{RANDOM}-list-and-delete-da0' - DA_NAME_1 = f'test{RANDOM}-list-and-delete-da1' - - da_names = list( - filter( - lambda x: x.startswith(f'test{RANDOM}-list-and-delete'), - JACDocStore.list(show_table=False), - ) - ) - assert len(da_names) == 0 - - DocList[TextDoc].push( - get_test_da(DA_LEN), f'jac://{DA_NAME_0}', show_progress=False - ) - da_names = list( - filter( - lambda x: x.startswith(f'test{RANDOM}-list-and-delete'), - JACDocStore.list(show_table=False), - ) - ) - assert set(da_names) == {DA_NAME_0} - DocList[TextDoc].push( - get_test_da(DA_LEN), f'jac://{DA_NAME_1}', show_progress=False - ) - da_names = list( - filter( - lambda x: x.startswith(f'test{RANDOM}-list-and-delete'), - JACDocStore.list(show_table=False), - ) - ) - assert set(da_names) == {DA_NAME_0, DA_NAME_1} - - assert JACDocStore.delete( - f'{DA_NAME_0}' - ), 'Deleting an existing DA should return True' - da_names = list( - filter( - lambda x: x.startswith(f'test{RANDOM}-list-and-delete'), - JACDocStore.list(show_table=False), - ) - ) - assert set(da_names) == {DA_NAME_1} - - with pytest.raises( - hubble.excepts.RequestedEntityNotFoundError - ): # Deleting a non-existent DA without safety should raise an error - JACDocStore.delete(f'{DA_NAME_0}', missing_ok=False) - - assert not JACDocStore.delete( - f'{DA_NAME_0}', missing_ok=True - ), 'Deleting a non-existent DA should return False' - - -@pytest.mark.slow -@pytest.mark.internet -def test_concurrent_push_pull(): - # Push to DA that is being pulled should not mess up the pull - DA_NAME_0 = f'test{RANDOM}-concurrent-push-pull-da0' - - DocList[TextDoc].push_stream( - gen_text_docs(DA_LEN), - f'jac://{DA_NAME_0}', - show_progress=False, - ) - - global _task - - def _task(choice: str): - if choice == 'push': - DocList[TextDoc].push_stream( - gen_text_docs(DA_LEN), - f'jac://{DA_NAME_0}', - show_progress=False, - ) - elif choice == 'pull': - pull_len = sum( - 1 for _ in DocList[TextDoc].pull_stream(f'jac://{DA_NAME_0}') - ) - assert pull_len == DA_LEN - else: - raise ValueError(f'Unknown choice {choice}') - - with mp.get_context('fork').Pool(3) as p: - p.map(_task, ['pull', 'push', 'pull']) diff --git a/tests/integrations/store/test_s3.py b/tests/integrations/store/test_s3.py index 37acf787c8a..eb36ddd907a 100644 --- a/tests/integrations/store/test_s3.py +++ b/tests/integrations/store/test_s3.py @@ -16,7 +16,7 @@ BUCKET: str = 'da-pushpull' RANDOM: str = uuid.uuid4().hex[:8] -pytestmark = [pytest.mark.jac] +pytestmark = [pytest.mark.s3] @pytest.fixture(scope="session") From 99f423414958d0981b8fb4db261d432806bbc41a Mon Sep 17 00:00:00 2001 From: Joan Fontanals Martinez Date: Fri, 15 Sep 2023 18:56:10 +0200 Subject: [PATCH 2/2] fix: fix ruff Signed-off-by: Joan Fontanals Martinez --- docarray/store/file.py | 2 +- docs/API_reference/doc_store/jac_doc_store.md | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) delete mode 100644 docs/API_reference/doc_store/jac_doc_store.md diff --git a/docarray/store/file.py b/docarray/store/file.py index 91d4e3737a6..9b37c15dfc8 100644 --- a/docarray/store/file.py +++ b/docarray/store/file.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Dict, Iterator, List, Optional, Type, TypeVar +from typing import Dict, Iterator, List, Type, TypeVar from typing_extensions import TYPE_CHECKING diff --git a/docs/API_reference/doc_store/jac_doc_store.md b/docs/API_reference/doc_store/jac_doc_store.md deleted file mode 100644 index 1d4c0a28303..00000000000 --- a/docs/API_reference/doc_store/jac_doc_store.md +++ /dev/null @@ -1,3 +0,0 @@ -# JACDocStore - -::: docarray.store.jac.JACDocStore