[ENH] V1 -> V2 Migration - Flows (module)#1609
[ENH] V1 -> V2 Migration - Flows (module)#1609Omswastik-11 wants to merge 142 commits intoopenml:mainfrom
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1609 +/- ##
==========================================
+ Coverage 53.09% 54.04% +0.94%
==========================================
Files 37 65 +28
Lines 4362 5120 +758
==========================================
+ Hits 2316 2767 +451
- Misses 2046 2353 +307 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Signed-off-by: Omswastik-11 <omswastikpanda11@gmail.com>
614411f to
36184e5
Compare
for more information, see https://pre-commit.ci
|
Hi @geetu040 !! Can you review the pre-commit failure It was due to merge conflicts more specifically for tasks . Should I change it on my branch ? |
can you try again, sync your branch with mine? It should be fixed now. |
|
I think that due to conflicts it did not synced properly . I have to revert it manually |
geetu040
left a comment
There was a problem hiding this comment.
Nice overall, few changes needed. I'll look at the tests later when the implementation is final.
Previously, multiple tests were publishing the same task concurrently, which increased the likelihood of race conditions and flaky failures. This update replaces real HTTP post calls with mocks, making the tests deterministic and isolated from the server.
There was a problem hiding this comment.
Pull request overview
This PR advances the V1 → V2 migration work for the flows module by introducing a new internal API backend layer (HTTP client + resource APIs + optional fallback), switching flow operations to use that backend, and adding tests for the new API abstractions.
Changes:
- Added a new
_apisubsystem (config/backend builder, HTTP client + cache, versioned resource APIs, fallback proxy). - Migrated flow operations (
get,exists,list,delete, andpublishpath) to the new backend. - Updated/added tests to patch lower-level HTTP calls and validate API version/fallback contracts.
Reviewed changes
Copilot reviewed 41 out of 42 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_flows/test_flow_functions.py | Updates flow-delete tests to mock Session.request (new HTTP client path). |
| tests/test_flows/test_flow.py | Switches raw flow XML retrieval and updates publish-error mocking strategy. |
| tests/test_datasets/test_dataset_functions.py | Syncs new backend config after direct legacy-config mutations in tests. |
| tests/test_api/test_versions.py | Adds contract tests for supported/unsupported operations across API versions and fallback. |
| tests/test_api/test_http.py | Adds unit/integration tests for new HTTPClient + disk cache behavior. |
| tests/test_api/test_flow.py | Adds V1/V2 flow API tests and combined-output comparisons. |
| tests/conftest.py | Ensures legacy config changes propagate to new backend via _sync_api_config(). |
| openml/testing.py | Introduces TestAPIBase with preconfigured V1/V2 HTTPClients and resource factory. |
| openml/flows/functions.py | Routes flow functions through openml._backend.flow resource API methods. |
| openml/flows/flow.py | Routes publish/tag/untag through backend flow resource API. |
| openml/exceptions.py | Adds OpenMLNotSupportedError for version/resource unsupported operations. |
| openml/enums.py | Adds enums for APIVersion, ResourceType, and RetryPolicy. |
| openml/config.py | Adds _sync_api_config() and calls it from config mutation entry points. |
| openml/_api/setup/config.py | Adds new structured config dataclasses for API versions and connection settings. |
| openml/_api/setup/builder.py | Adds backend builder to wire HTTP/minio clients + resource API instances and fallback. |
| openml/_api/setup/backend.py | Adds singleton APIBackend with get/set config helpers and resource accessors. |
| openml/_api/setup/_utils.py | Adds cache-dir resolution helper for new config. |
| openml/_api/setup/_instance.py | Instantiates a module-level _backend singleton. |
| openml/_api/setup/init.py | Exposes backend/config builder API and _backend. |
| openml/_api/resources/dataset.py | Adds dataset resource API stubs for V1/V2. |
| openml/_api/resources/task.py | Adds task resource API stubs for V1/V2. |
| openml/_api/resources/flow.py | Implements Flow V1/V2 resource logic (XML vs JSON conversion). |
| openml/_api/resources/run.py | Adds run resource API stubs for V1/V2. |
| openml/_api/resources/setup.py | Adds setup resource API stubs for V1/V2. |
| openml/_api/resources/study.py | Adds study resource API stubs for V1/V2. |
| openml/_api/resources/evaluation.py | Adds evaluation resource API stubs for V1/V2. |
| openml/_api/resources/evaluation_measure.py | Adds evaluation-measure resource API stubs for V1/V2. |
| openml/_api/resources/estimation_procedure.py | Adds estimation-procedure resource API stubs for V1/V2. |
| openml/_api/resources/_registry.py | Adds API registry mapping (version × resource → implementation). |
| openml/_api/resources/init.py | Re-exports resource APIs/registry types. |
| openml/_api/resources/base/base.py | Adds ResourceAPI base and standardized “not supported” error helper. |
| openml/_api/resources/base/resources.py | Adds abstract resource interfaces (FlowAPI includes get/list/exists). |
| openml/_api/resources/base/versions.py | Adds common V1 (XML) publish/delete/tag/untag implementations and V2 unsupported stubs. |
| openml/_api/resources/base/fallback.py | Adds FallbackProxy to transparently retry operations on fallback API versions. |
| openml/_api/resources/base/init.py | Re-exports base resource API interfaces and helpers. |
| openml/_api/clients/http.py | Adds HTTPClient with retries, error parsing, and disk caching (HTTPCache). |
| openml/_api/clients/minio.py | Adds lightweight MinIOClient configuration wrapper. |
| openml/_api/clients/utils.py | Adds retry-delay helpers (human_delay, robot_delay). |
| openml/_api/clients/init.py | Re-exports HTTP/cache/minio clients. |
| openml/_api/init.py | Re-exports _api public surface (clients/resources/setup + _backend). |
| openml/init.py | Exposes _backend at package root for internal/new API usage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @pytest.mark.uses_test_server() | ||
| class TestFlowAPIBase(TestAPIBase): | ||
| resource: FlowV1API | FlowV2API | ||
|
|
||
| def _assert_flow_shape(self, flow: OpenMLFlow) -> None: | ||
| self.assertIsInstance(flow, OpenMLFlow) | ||
| self.assertEqual(flow.flow_id, 1) | ||
| self.assertIsInstance(flow.name, str) | ||
| self.assertGreater(len(flow.name), 0) | ||
|
|
||
| def _get(self) -> OpenMLFlow: | ||
| flow = self.resource.get(flow_id=1) | ||
| self._assert_flow_shape(flow) |
There was a problem hiding this comment.
PR description says tests were added “so that we can test without local V2 server”, but these tests exercise FlowV2API.get/exists against the default V2 endpoint (http://localhost:8002/) without mocking. If the intention is offline testing, patch requests.Session.request (as done in other new API tests) or mark/skip these tests when the V2 server isn’t available.
| 9, | ||
| ]: | ||
| flow_xml = _perform_api_call("flow/%d" % flow_id, request_method="get") | ||
| flow_xml = openml.config.get_backend().http_client.get(f"flow/{flow_id}").text |
There was a problem hiding this comment.
openml.config.get_backend() does not exist (no such function in openml/config.py), so this test will error at runtime. Consider using the public API (openml._backend / openml._api_calls._perform_api_call) or whichever backend accessor is intended, but it needs to be a real callable in the codebase.
| response = requests.Response() | ||
| response.status_code = 200 | ||
| response._content = ( | ||
| "<oml:upload_flow>\n" " <oml:id>1</oml:id>\n" "</oml:upload_flow>" | ||
| ) | ||
| flow_exists_mock.return_value = False | ||
| ).encode() |
There was a problem hiding this comment.
The mocked upload response XML uses the oml: prefix without declaring the namespace (xmlns:oml=...), which makes the XML invalid and will cause xmltodict.parse(...) (via ResourceV1API.publish) to raise an “unbound prefix” parse error. Use a valid upload payload (as in tests/test_api/test_versions.py) or reuse create_request_response to build a proper response.
| mock_request.return_value = response | ||
| flow_exists_mock.return_value = None # Flow doesn't exist yet, so try to publish | ||
| get_flow_mock.return_value = flow |
There was a problem hiding this comment.
flow_exists_mock.return_value is set to None, but flow_exists is documented/typed to return int | bool and the production code treats non-existence as False. Returning None can mask type/contract issues and makes the test less representative; set it to False for the “doesn’t exist” case.
|
|
||
| import xmltodict | ||
|
|
||
| import openml |
There was a problem hiding this comment.
Importing the top-level openml package from within openml.flows.flow increases the risk of circular-import issues (the package openml/__init__.py imports openml.flows, which imports this module). Since the new code only needs the backend, consider importing _backend (or APIBackend) directly from openml._api lazily inside methods, rather than import openml at module import time.
| __all__ = [ | ||
| "API_REGISTRY", | ||
| "APIBackend", | ||
| "APIBackendBuilder", | ||
| "APIConfig", | ||
| "Config", | ||
| "ConnectionConfig", | ||
| "DatasetAPI", | ||
| "DatasetV1API", | ||
| "DatasetV2API", | ||
| "EstimationProcedureAPI", | ||
| "EstimationProcedureV1API", | ||
| "EstimationProcedureV2API", | ||
| "EvaluationAPI", | ||
| "EvaluationMeasureAPI", | ||
| "EvaluationMeasureV1API", | ||
| "EvaluationMeasureV2API", | ||
| "EvaluationV1API", | ||
| "EvaluationV2API", | ||
| "FallbackProxy", | ||
| "FallbackProxy", | ||
| "FlowAPI", | ||
| "FlowV1API", | ||
| "FlowV2API", | ||
| "HTTPCache", | ||
| "HTTPClient", | ||
| "MinIOClient", | ||
| "ResourceAPI", | ||
| "ResourceAPI", | ||
| "ResourceV1API", | ||
| "ResourceV2API", | ||
| "RunAPI", | ||
| "RunV1API", | ||
| "RunV2API", | ||
| "SetupAPI", | ||
| "SetupV1API", | ||
| "SetupV2API", | ||
| "StudyAPI", | ||
| "StudyV1API", | ||
| "StudyV2API", | ||
| "TaskAPI", | ||
| "TaskV1API", | ||
| "TaskV2API", | ||
| "_backend", | ||
| ] |
There was a problem hiding this comment.
__all__ contains duplicate entries (e.g., FallbackProxy and ResourceAPI are listed twice). This is harmless at runtime but makes exports harder to reason about; please deduplicate the list.
| __all__ = [ | |
| "API_REGISTRY", | |
| "APIBackend", | |
| "APIBackendBuilder", | |
| "APIConfig", | |
| "Config", | |
| "ConnectionConfig", | |
| "DatasetAPI", | |
| "DatasetV1API", | |
| "DatasetV2API", | |
| "EstimationProcedureAPI", | |
| "EstimationProcedureV1API", | |
| "EstimationProcedureV2API", | |
| "EvaluationAPI", | |
| "EvaluationMeasureAPI", | |
| "EvaluationMeasureV1API", | |
| "EvaluationMeasureV2API", | |
| "EvaluationV1API", | |
| "EvaluationV2API", | |
| "FallbackProxy", | |
| "FallbackProxy", | |
| "FlowAPI", | |
| "FlowV1API", | |
| "FlowV2API", | |
| "HTTPCache", | |
| "HTTPClient", | |
| "MinIOClient", | |
| "ResourceAPI", | |
| "ResourceAPI", | |
| "ResourceV1API", | |
| "ResourceV2API", | |
| "RunAPI", | |
| "RunV1API", | |
| "RunV2API", | |
| "SetupAPI", | |
| "SetupV1API", | |
| "SetupV2API", | |
| "StudyAPI", | |
| "StudyV1API", | |
| "StudyV2API", | |
| "TaskAPI", | |
| "TaskV1API", | |
| "TaskV2API", | |
| "_backend", | |
| ] | |
| __all__ = [ | |
| "API_REGISTRY", | |
| "APIBackend", | |
| "APIBackendBuilder", | |
| "APIConfig", | |
| "Config", | |
| "ConnectionConfig", | |
| "DatasetAPI", | |
| "DatasetV1API", | |
| "DatasetV2API", | |
| "EstimationProcedureAPI", | |
| "EstimationProcedureV1API", | |
| "EstimationProcedureV2API", | |
| "EvaluationAPI", | |
| "EvaluationMeasureAPI", | |
| "EvaluationMeasureV1API", | |
| "EvaluationMeasureV2API", | |
| "EvaluationV1API", | |
| "EvaluationV2API", | |
| "FallbackProxy", | |
| "FlowAPI", | |
| "FlowV1API", | |
| "FlowV2API", | |
| "HTTPCache", | |
| "HTTPClient", | |
| "MinIOClient", | |
| "ResourceAPI", | |
| "ResourceV1API", | |
| "ResourceV2API", | |
| "RunAPI", | |
| "RunV1API", | |
| "RunV2API", | |
| "SetupAPI", | |
| "SetupV1API", | |
| "SetupV2API", | |
| "StudyAPI", | |
| "StudyV1API", | |
| "StudyV2API", | |
| "TaskAPI", | |
| "TaskV1API", | |
| "TaskV2API", | |
| "_backend", | |
| ] |
| def _robot_delay(self, n: int) -> float: | ||
| """ | ||
| Compute delay for automated retry policy. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| n : int | ||
| Current retry attempt number (1-based). | ||
|
|
||
| Returns | ||
| ------- | ||
| float | ||
| Number of seconds to wait before the next retry. | ||
|
|
||
| Notes | ||
| ----- | ||
| Uses a sigmoid-based growth curve with Gaussian noise to gradually | ||
| increase waiting time. | ||
| """ | ||
| wait = (1 / (1 + math.exp(-(n * 0.5 - 4)))) * 60 | ||
| variation = random.gauss(0, wait / 10) | ||
| return max(1.0, wait + variation) | ||
|
|
||
| def _human_delay(self, n: int) -> float: | ||
| """ | ||
| Compute delay for human-like retry policy. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| n : int | ||
| Current retry attempt number (1-based). | ||
|
|
||
| Returns | ||
| ------- | ||
| float | ||
| Number of seconds to wait before the next retry. | ||
| """ | ||
| return max(1.0, n) | ||
|
|
There was a problem hiding this comment.
HTTPClient.__init__ selects self.retry_func from human_delay / robot_delay, but the class also defines _robot_delay and _human_delay methods that are never used. Keeping two implementations is confusing and risks divergence; either remove the unused methods or switch to using them consistently.
| def _robot_delay(self, n: int) -> float: | |
| """ | |
| Compute delay for automated retry policy. | |
| Parameters | |
| ---------- | |
| n : int | |
| Current retry attempt number (1-based). | |
| Returns | |
| ------- | |
| float | |
| Number of seconds to wait before the next retry. | |
| Notes | |
| ----- | |
| Uses a sigmoid-based growth curve with Gaussian noise to gradually | |
| increase waiting time. | |
| """ | |
| wait = (1 / (1 + math.exp(-(n * 0.5 - 4)))) * 60 | |
| variation = random.gauss(0, wait / 10) | |
| return max(1.0, wait + variation) | |
| def _human_delay(self, n: int) -> float: | |
| """ | |
| Compute delay for human-like retry policy. | |
| Parameters | |
| ---------- | |
| n : int | |
| Current retry attempt number (1-based). | |
| Returns | |
| ------- | |
| float | |
| Number of seconds to wait before the next retry. | |
| """ | |
| return max(1.0, n) |
| class HTTPCache: | ||
| """ | ||
| Filesystem-based cache for HTTP responses. | ||
|
|
||
| This class stores HTTP responses on disk using a structured directory layout | ||
| derived from the request URL and parameters. Each cached response consists of | ||
| three files: metadata (``meta.json``), headers (``headers.json``), and the raw | ||
| body (``body.bin``). Entries are considered valid until their time-to-live | ||
| (TTL) expires. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| path : pathlib.Path | ||
| Base directory where cache entries are stored. | ||
|
|
||
| Notes | ||
| ----- | ||
| The cache key is derived from the URL (domain and path components) and query | ||
| parameters, excluding the ``api_key`` parameter. | ||
| """ | ||
|
|
||
| def __init__(self, *, path: Path) -> None: | ||
| self.path = path | ||
|
|
||
| def get_key(self, url: str, params: dict[str, Any]) -> str: | ||
| """ | ||
| Generate a filesystem-safe cache key for a request. | ||
|
|
||
| The key is constructed from the reversed domain components, URL path | ||
| segments, and URL-encoded query parameters (excluding ``api_key``). | ||
|
|
||
| Parameters | ||
| ---------- | ||
| url : str | ||
| The full request URL. | ||
| params : dict of str to Any | ||
| Query parameters associated with the request. | ||
|
|
||
| Returns | ||
| ------- | ||
| str | ||
| A relative path string representing the cache key. | ||
| """ | ||
| parsed_url = urlparse(url) | ||
| netloc = parsed_url.netloc.replace(":", "_") | ||
| netloc_parts = netloc.split(".")[::-1] | ||
| path_parts = parsed_url.path.strip("/").split("/") | ||
|
|
||
| filtered_params = {k: v for k, v in params.items() if k != "api_key"} | ||
| params_part = [urlencode(filtered_params)] if filtered_params else [] | ||
|
|
||
| return str(Path(*netloc_parts, *path_parts, *params_part)) | ||
|
|
||
| def _key_to_path(self, key: str) -> Path: | ||
| """ | ||
| Convert a cache key into an absolute filesystem path. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| key : str | ||
| Cache key as returned by :meth:`get_key`. | ||
|
|
||
| Returns | ||
| ------- | ||
| pathlib.Path | ||
| Absolute path corresponding to the cache entry. | ||
| """ | ||
| return self.path.joinpath(key) | ||
|
|
||
| def load(self, key: str) -> Response: | ||
| """ | ||
| Load a cached HTTP response from disk. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| key : str | ||
| Cache key identifying the stored response. | ||
|
|
||
| Returns | ||
| ------- | ||
| requests.Response | ||
| Reconstructed response object with status code, headers, body, and metadata. | ||
|
|
||
| Raises | ||
| ------ | ||
| FileNotFoundError | ||
| If the cache entry or required files are missing. | ||
| TimeoutError | ||
| If the cached entry has expired based on the configured TTL. | ||
| ValueError | ||
| If required metadata is missing or malformed. | ||
| """ |
There was a problem hiding this comment.
The HTTPCache docstring and load() docstring describe TTL/expiry behavior and a TimeoutError, but load() never checks age/TTL and will never raise TimeoutError. Either implement expiry (and raise TimeoutError accordingly) or update the documentation and the HTTPClient.request() cache-miss handling to match the actual behavior.
Fixes #1601
added a
Createmethod inFlowAPIfor publishing flow but not refactored with oldpublish. (Needs discussion on this)Added tests using
fake_methodsso that we can test without localV2server . I have tested theFlowsV2methods (getandexists) anddeleteandlistwere not implemented inV2server so I skipped them .