diff --git a/.dlc.json b/.dlc.json index bd24e4c0..2e3e67c6 100644 --- a/.dlc.json +++ b/.dlc.json @@ -20,6 +20,7 @@ "retryCount": 10, "fallbackRetryDelay": "1000s", "aliveStatusCodes": [ - 200 + 200, + 0 ] } diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e76d26f9..958f8eb1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -154,7 +154,7 @@ jobs: if: ${{ github.event_name == 'schedule' || contains(toJSON(github.event.commits.*.message), '[run-it]') }} timeout-minutes: 30 steps: - - name: Checkout Dolphinscheduler SDK Python + - name: Checkout Dolphinscheduler Python SDK uses: actions/checkout@v3 with: path: src diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..bd3be26a --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +include LICENSE +include NOTICE +include README.md +include CONTRIBUTING.md +include RELEASE.md +include UPDATING.md diff --git a/README.md b/README.md index 3bbcef80..45fd5e9c 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ under the License. 
[![PyPi Python Versions](https://img.shields.io/pypi/pyversions/apache-dolphinscheduler.svg?style=flat-square&logo=python)](https://pypi.org/project/apache-dolphinscheduler/) [![PyPi License](https://img.shields.io/:license-Apache%202-blue.svg?style=flat-square)](https://raw.githubusercontent.com/apache/dolphinscheduler-sdk-python/main/LICENSE) [![PyPi Status](https://img.shields.io/pypi/status/apache-dolphinscheduler.svg?style=flat-square)](https://pypi.org/project/apache-dolphinscheduler/) -[![Downloads](https://pepy.tech/badge/apache-dolphinscheduler/month)](https://pepy.tech/project/apache-dolphinscheduler) +[![Downloads](https://static.pepy.tech/badge/apache-dolphinscheduler/month)](https://pepy.tech/project/apache-dolphinscheduler) ![Coverage Status](https://img.shields.io/codecov/c/github/apache/dolphinscheduler-sdk-python/main.svg?style=flat-square) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg?style=flat-square)](https://github.com/psf/black) [![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat-square&labelColor=ef8336)](https://pycqa.github.io/isort) @@ -87,7 +87,7 @@ python ./tutorial.py ``` > NOTICE: Since Apache DolphinScheduler's tenant is requests while running command, you have to change -> tenant value in tutorial.py`. The default value is `tenant_exists`, change it to username exists your host. +> tenant value in file tutorial.py. The default value is `tenant_exists`, change it to username exists your host. After that, a new workflow will be created by PyDolphinScheduler, and you can see it in DolphinScheduler web UI's Project Management page. 
It will trigger the workflow automatically, so you can see the workflow running diff --git a/RELEASE.md b/RELEASE.md index 1c241ba5..a1695d51 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -24,13 +24,15 @@ but we also have a [PyPi](#release-to-pypi) repository for Python package distri ## Prepare -* Remove `sphinx-multiversion` dependency in `setup.py`, we still can not fix this issue - [Distribute tarball and wheel error with direct dependency](https://github.com/apache/dolphinscheduler/issues/12238) -* Change `version_ext` about the dolphinscheduler version current support, the syntax is respect [pep-044](https://peps.python.org/pep-0440/#version-specifiers) -* Run all test locally, `tox -e local-ci && tox -e local-integrate-test`, after you start dolphinscheduler to - pass `local-integrate-test` +* Change `version_ext` to the DolphinScheduler versions currently supported (the syntax follows [PEP 440](https://peps.python.org/pep-0440/#version-specifiers)), and update the supported versions in the `docs/source/index.rst` file. 
+* Cherry-pick some backward compatibility commit in [4.0.x](https://github.com/apache/dolphinscheduler-sdk-python/tree/4.0.x), the first + commit is 3a8a4e31 * Revert commit Apache dolphinscheduler not release yet, see [this link](https://github.com/apache/dolphinscheduler-sdk-python/pulls?q=is%3Apr+is%3Aclosed+label%3Adep-main-repo) add check whether apache/dolphinscheduler commit is released, for some commit not release should revert the change +* Run all test locally, `tox -e local-ci && tox -e local-integrate-test`, after you start dolphinscheduler to + pass `local-integrate-test` +* Remove `sphinx-multiversion` dependency in `setup.cfg`, we still can not fix this issue + [Distribute tarball and wheel error with direct dependency](https://github.com/apache/dolphinscheduler/issues/12238) ## Build and Sign Package @@ -82,13 +84,13 @@ cd release/dolphinscheduler && svn add python/"${VERSION}" && svn commit python Create a new vote in dev@dolphinscheduler.apache.org ```text -TITLE: [VOTE] Release Apache DolphinScheduler SDK Python +TITLE: [VOTE] Release Apache DolphinScheduler Python SDK BODY: Hello DolphinScheduler Community, -This is a call for the vote to release Apache DolphinScheduler SDK Python version +This is a call for the vote to release Apache DolphinScheduler Python SDK version Release notes: https://github.com/apache/dolphinscheduler-sdk-python/releases/tag/ @@ -146,7 +148,7 @@ Vote result should follow these: - **Send the closing vote mail to announce the result**. When count the binding and no binding votes, please list the names of voters. An example like this: ```text - [RESULT][VOTE] Release Apache SkyWalking Python version $VERSION + [RESULT][VOTE] Release Apache DolphinScheduler Python version $VERSION 72+ hours passed, we’ve got ($NUMBER) +1 bindings (and ... +1 non-bindings): @@ -177,21 +179,19 @@ Vote result should follow these: - Send ANNOUNCE email to `dev@dolphinscheduler.apache.org` and `announce@apache.org`. 
```text - Subject: [ANNOUNCE] Apache DolphinScheduler SDK Python $VERSION Released + Subject: [ANNOUNCE] Apache DolphinScheduler Python SDK $VERSION Released Content: Hi Community, - We are glad to announce the release of Apache DolphinScheduler SDK Python $VERSION. + We are glad to announce the release of Apache DolphinScheduler Python SDK $VERSION. - Apache DolphinScheduler SDK Python is an API for Apache DolphinScheduler which allow you definition your workflow by Python code, aka workflow-as-codes. + Apache DolphinScheduler Python SDK is an API for Apache DolphinScheduler which allows you to define your workflow by Python code, aka workflow-as-codes. DolphinScheduler is a distributed and easy-to-extend visual workflow scheduler system, dedicated to solving the complex task dependencies in data processing, making the scheduler system out of the box for data processing. - Download Links: https://dolphinscheduler.apache.org/#/en-us/download - Release Notes: https://github.com/apache/dolphinscheduler-sdk-python/releases/tag/$VERSION DolphinScheduler Resources: diff --git a/docs/source/concept.rst b/docs/source/concept.rst index 28a797a1..ab8df5eb 100644 --- a/docs/source/concept.rst +++ b/docs/source/concept.rst @@ -63,7 +63,7 @@ asterisks expression, and each of the meaning of position as below ┬ ┬ ┬ ┬ ┬ ┬ ┬ │ │ │ │ │ │ │ │ │ │ │ │ │ └─── year - │ │ │ │ │ └───── day of week (0 - 7) (0 to 6 are Sunday to Saturday, or use names; 7 is Sunday, the same as 0) + │ │ │ │ │ └───── day of week (1 - 7) (1 to 7 are Sunday to Saturday, or use names; 7 is for Sunday, or use `SUN`) │ │ │ │ └─────── month (1 - 12) │ │ │ └───────── day of month (1 - 31) │ │ └─────────── hour (0 - 23) @@ -123,6 +123,16 @@ Parameter ``execution type`` can be set in * Via environment variables, configurations file setting, for more detail about those way setting, you can see you can read :doc:`config` section. 
+Alert +~~~~~ + +Alert is the way to notify users when a workflow instance succeeds or fails. We can set alerts with the parameters +``warning_type`` and ``warning_group_id`` in the workflow definition. + +* ``warning_type`` represents the type of alert: when the workflow instance reaches one of those statuses, an alert is triggered. + The value of ``warning_type`` could be one of ``failure``, ``success``, ``all``, ``none``. +* ``warning_group_id`` represents the alert group; you can get the group id from the DolphinScheduler web UI. + Tasks ----- @@ -183,6 +193,41 @@ decide workflow of task. You could set `workflow` in both normal assign or in co With both `Workflow`_, `Tasks`_ and `Tasks Dependence`_, we could build a workflow with multiple tasks. +Task Group +~~~~~~~~~~ + +A task group can manage and control the maximum number of concurrently running tasks. This is particularly +useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL +(Extract, Transform, Load) job where data is extracted from a source database, it's crucial to control the +parallelism of extract tasks to prevent an excessive number of connections to the source database. This is +where a task group comes into play. There are two key parameters, ``task_group_id`` and ``task_group_priority`` +that determine the behavior of the task group. + +Task group can control the maximum number of tasks running at the same time. It is useful when you don't want +to run too many tasks of the same type at the same time. For example, when you extract data from a source database in an ETL +job, you want to control the parallelism of extract tasks to avoid too many connections to the source database. +Then a task group can help you. There are two major parameters, ``task_group_id`` and ``task_group_priority``, +to control the behavior of a task group. + +* ``task_group_id``: is an integer used to identify the task group. You can set a ``task_group_id`` to + restrict the parallelism of tasks. 
The ``task_group_id`` can be found in the DolphinScheduler web UI. The + default value is ``0``, which means there are no restrictions for this task group. +* ``task_group_priority``: is an integer used to define the priority of the task group. When different tasks + share the same ``task_group_id``, the task group's priority comes into play, controlling the order in which + they run. Higher values indicate higher priority. The default value is ``0``, which means there's no + specific priority for this task group, and tasks will run in the order they were created. + +Here's an example in Python: + +.. code-block:: python + + extract = Shell( + name="extract", + command="echo 'Some extract command here'", + task_group_id=1, + task_group_priority=123 + ) + Resource Files -------------- diff --git a/docs/source/conf.py b/docs/source/conf.py index e8349715..03f675d0 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -26,7 +26,6 @@ # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -import base64 import os import sys from pathlib import Path @@ -112,11 +111,6 @@ autosectionlabel_prefix_document = True -# extensions for sphinx_github_changelog, token from Jay Chung with None permission scope. have to encode it -# due to github will delete token string if it finds in any commit -token_encode = b"Z2hwXzlhczh1ZG1zYTcxbFpPODZZelQzRTVJZHpLYjNDRzBvZzNEUQ==" -sphinx_github_changelog_token = base64.b64decode(token_encode).decode() - # -- Options for HTML output ------------------------------------------------- # The theme to use for HTML and HTML Help pages. 
See the documentation for diff --git a/docs/source/index.rst b/docs/source/index.rst index 89611e3e..330b4e66 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -84,7 +84,9 @@ use PyDolphinScheduler above version 4.0.0. +-----------------------------+----------------------------+---------------------+ | above 3.1.0 and prior 3.1.2 | 3.1.0 | before we seperated | +-----------------------------+----------------------------+---------------------+ -| above 3.1.2 and prior 3.1.4 | 4.0.0 | | +| above 3.1.2 and prior 3.1.4 | >=4.0.0, <4.0.3 | | ++-----------------------------+----------------------------+---------------------+ +| 3.1.5 | 4.0.3 | | +-----------------------------+----------------------------+---------------------+ diff --git a/examples/yaml_define/MoreConfiguration.yaml b/examples/yaml_define/MoreConfiguration.yaml index 849ee4b8..0cda5ede 100644 --- a/examples/yaml_define/MoreConfiguration.yaml +++ b/examples/yaml_define/MoreConfiguration.yaml @@ -36,7 +36,6 @@ tasks: fail_retry_times: 30 fail_retry_interval: 5 timeout: 60 - is_cache: true input_params: value_VARCHAR: "abc" value_INTEGER: 123 diff --git a/setup.cfg b/setup.cfg index ad67231a..9ee93d7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -117,7 +117,6 @@ doc = # Unreleased package have a feature we want(use correct version package for API ref), so we install from # GitHub directly, see also: # https://github.com/Holzhaus/sphinx-multiversion/issues/42#issuecomment-1210539786 - sphinx-multiversion @ git+https://github.com/Holzhaus/sphinx-multiversion#egg=sphinx-multiversion sphinx-github-changelog dev = # build diff --git a/src/pydolphinscheduler/__init__.py b/src/pydolphinscheduler/__init__.py index cf0fe9fd..08dd60f7 100644 --- a/src/pydolphinscheduler/__init__.py +++ b/src/pydolphinscheduler/__init__.py @@ -17,4 +17,4 @@ """Init root of pydolphinscheduler.""" -__version__ = "4.1.0-dev" +__version__ = "4.0.4" diff --git a/src/pydolphinscheduler/configuration.py 
b/src/pydolphinscheduler/configuration.py index 9d12afe3..91ecba9f 100644 --- a/src/pydolphinscheduler/configuration.py +++ b/src/pydolphinscheduler/configuration.py @@ -184,7 +184,7 @@ def get_bool(val: Any) -> bool: "PYDS_USER_PASSWORD", configs.get("default.user.password") ) USER_EMAIL = os.environ.get("PYDS_USER_EMAIL", configs.get("default.user.email")) -USER_TENANT = os.environ.get("PYDS_USER_STATE", configs.get("default.user.tenant")) +USER_TENANT = os.environ.get("PYDS_USER_TENANT", configs.get("default.user.tenant")) USER_PHONE = str(os.environ.get("PYDS_USER_PHONE", configs.get("default.user.phone"))) USER_STATE = get_int( os.environ.get("PYDS_USER_STATE", configs.get("default.user.state")) diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py index b28d3b64..385e78f5 100644 --- a/src/pydolphinscheduler/constants.py +++ b/src/pydolphinscheduler/constants.py @@ -35,13 +35,6 @@ class TaskFlag(str): NO = "NO" -class IsCache(str): - """Constants for Cache.""" - - YES = "YES" - NO = "NO" - - class TaskTimeoutFlag(str): """Constants for task timeout flag.""" @@ -119,7 +112,7 @@ class Time(str): class ResourceKey(str): """Constants for key of resource.""" - NAME = "resourceName" + ID = "id" class Symbol(str): diff --git a/src/pydolphinscheduler/core/parameter.py b/src/pydolphinscheduler/core/parameter.py index 6bac3573..15ea723b 100644 --- a/src/pydolphinscheduler/core/parameter.py +++ b/src/pydolphinscheduler/core/parameter.py @@ -49,7 +49,7 @@ def _convert(self, value=None): def __eq__(self, data): return ( - type(self) == type(data) + type(self) is type(data) and self.data_type == data.data_type and self.value == data.value ) diff --git a/src/pydolphinscheduler/core/resource.py b/src/pydolphinscheduler/core/resource.py index 907114f4..b7717fce 100644 --- a/src/pydolphinscheduler/core/resource.py +++ b/src/pydolphinscheduler/core/resource.py @@ -55,9 +55,9 @@ def get_info_from_database(self): ) return 
gateway.query_resources_file_info(self.user_name, self.name) - def get_fullname_from_database(self): - """Get resource fullname from java gateway.""" - return self.get_info_from_database().getFullName() + def get_id_from_database(self): + """Get resource id from java gateway.""" + return self.get_info_from_database().getId() def create_or_update_resource(self): """Create or update resource via java gateway.""" @@ -65,8 +65,9 @@ def create_or_update_resource(self): raise PyDSParamException( "`user_name` and `content` are required when create or update resource from python gate." ) - return gateway.create_or_update_resource( + gateway.create_or_update_resource( self.user_name, self.name, + self.description, self.content, ) diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py index 3341040a..e8902c79 100644 --- a/src/pydolphinscheduler/core/task.py +++ b/src/pydolphinscheduler/core/task.py @@ -26,7 +26,6 @@ from pydolphinscheduler import configuration from pydolphinscheduler.constants import ( Delimiter, - IsCache, ResourceKey, Symbol, TaskFlag, @@ -92,6 +91,9 @@ class Task(Base): :param task_priority: default TaskPriority.MEDIUM :param worker_group: default configuration.WORKFLOW_WORKER_GROUP :param environment_name: default None + :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0. + :param task_group_priority: Priority for same task group to, the higher the value, the higher the + priority, default 0. 
:param delay_time: deault 0 :param fail_retry_times: default 0 :param fail_retry_interval: default 1 @@ -101,7 +103,6 @@ class Task(Base): :param wait_start_timeout: default None :param condition_result: default None, :param resource_plugin: default None - :param is_cache: default False :param input_params: default None, input parameters, {param_name: param_value} :param output_params: default None, input parameters, {param_name: param_value} """ @@ -120,10 +121,11 @@ class Task(Base): "delay_time", "fail_retry_times", "fail_retry_interval", + "task_group_id", + "task_group_priority", "timeout_flag", "timeout_notify_strategy", "timeout", - "is_cache", } # task default attribute will into `task_params` property @@ -153,6 +155,8 @@ def __init__( task_priority: Optional[str] = TaskPriority.MEDIUM, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, environment_name: Optional[str] = None, + task_group_id: Optional[int] = 0, + task_group_priority: Optional[int] = 0, delay_time: Optional[int] = 0, fail_retry_times: Optional[int] = 0, fail_retry_interval: Optional[int] = 1, @@ -164,7 +168,6 @@ def __init__( wait_start_timeout: Optional[Dict] = None, condition_result: Optional[Dict] = None, resource_plugin: Optional[ResourcePlugin] = None, - is_cache: Optional[bool] = False, input_params: Optional[Dict] = None, output_params: Optional[Dict] = None, *args, @@ -173,10 +176,11 @@ def __init__( super().__init__(name, description) self.task_type = task_type self.flag = flag - self._is_cache = is_cache self.task_priority = task_priority self.worker_group = worker_group self._environment_name = environment_name + self.task_group_id = task_group_id + self.task_group_priority = task_group_priority self.fail_retry_times = fail_retry_times self.fail_retry_interval = fail_retry_interval self.delay_time = delay_time @@ -251,26 +255,16 @@ def timeout_flag(self) -> str: """Whether the timeout attribute is being set or not.""" return TaskTimeoutFlag.ON if self._timeout else 
TaskTimeoutFlag.OFF - @property - def is_cache(self) -> str: - """Whether the cache is being set or not.""" - if isinstance(self._is_cache, bool): - return IsCache.YES if self._is_cache else IsCache.NO - else: - raise PyDSParamException("is_cache must be a bool") - @property def resource_list(self) -> List[Dict[str, Resource]]: """Get task define attribute `resource_list`.""" resources = set() for res in self._resource_list: - if type(res) == str: + if isinstance(res, str): resources.add( - Resource( - name=res, user_name=self.user_name - ).get_fullname_from_database() + Resource(name=res, user_name=self.user_name).get_id_from_database() ) - elif type(res) == dict and ResourceKey.NAME in res: + elif isinstance(res, dict) and res.get(ResourceKey.ID) is not None: warnings.warn( """`resource_list` should be defined using List[str] with resource paths, the use of ids to define resources will be remove in version 3.2.0. @@ -278,8 +272,8 @@ def resource_list(self) -> List[Dict[str, Resource]]: DeprecationWarning, stacklevel=2, ) - resources.add(res.get(ResourceKey.NAME)) - return [{ResourceKey.NAME: r} for r in resources] + resources.add(res.get(ResourceKey.ID)) + return [{ResourceKey.ID: r} for r in resources] @property def user_name(self) -> Optional[str]: diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py index 91243f66..aeb6d044 100644 --- a/src/pydolphinscheduler/java_gateway.py +++ b/src/pydolphinscheduler/java_gateway.py @@ -124,9 +124,13 @@ def get_resources_file_info(self, program_type: str, main_package: str): """Get resources file info through java gateway.""" return self.gateway.entry_point.getResourcesFileInfo(program_type, main_package) - def create_or_update_resource(self, user_name: str, name: str, content: str): + def create_or_update_resource( + self, user_name: str, name: str, description: str, content: str + ): """Create or update resource through java gateway.""" - return 
self.gateway.entry_point.createOrUpdateResource(user_name, name, content) + return self.gateway.entry_point.createOrUpdateResource( + user_name, name, description, content + ) def query_resources_file_info(self, user_name: str, name: str): """Get resources file info through java gateway.""" diff --git a/src/pydolphinscheduler/models/base.py b/src/pydolphinscheduler/models/base.py index 2647714a..007edecc 100644 --- a/src/pydolphinscheduler/models/base.py +++ b/src/pydolphinscheduler/models/base.py @@ -43,7 +43,7 @@ def __repr__(self) -> str: return f'<{type(self).__name__}: name="{self.name}">' def __eq__(self, other): - return type(self) == type(other) and all( + return type(self) is type(other) and all( getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR ) diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py index a320d437..343caa20 100644 --- a/src/pydolphinscheduler/tasks/sql.py +++ b/src/pydolphinscheduler/tasks/sql.py @@ -19,6 +19,7 @@ import logging import re +import warnings from typing import Dict, List, Optional, Sequence, Union from pydolphinscheduler.constants import TaskType @@ -58,6 +59,8 @@ class Sql(Task): detected according to sql statement using :func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you can also set it manually. by ``SqlType.SELECT`` for query statement or ``SqlType.NOT_SELECT`` for not query statement. + :param sql_delimiter: SQL delimiter to split one sql statement into multiple statements, ONLY support in + ``sql_type=SqlType.NOT_SELECT``, default is None. :param pre_statements: SQL statements to be executed before the main SQL statement. :param post_statements: SQL statements to be executed after the main SQL statement. :param display_rows: The number of record rows number to be displayed in the SQL task log, default is 10. 
@@ -66,6 +69,7 @@ class Sql(Task): _task_custom_attr = { "sql", "sql_type", + "segment_separator", "pre_statements", "post_statements", "display_rows", @@ -81,6 +85,7 @@ def __init__( sql: str, datasource_type: Optional[str] = None, sql_type: Optional[str] = None, + sql_delimiter: Optional[str] = None, pre_statements: Union[str, Sequence[str], None] = None, post_statements: Union[str, Sequence[str], None] = None, display_rows: Optional[int] = 10, @@ -90,6 +95,14 @@ def __init__( self._sql = sql super().__init__(name, TaskType.SQL, *args, **kwargs) self.param_sql_type = sql_type + if sql_type == SqlType.SELECT and sql_delimiter: + warnings.warn( + "Parameter `sql_delimiter` is only supported in `sql_type=SqlType.NO_SELECT`, but current " + "sql_type is `sql_type=SqlType.SELECT`, so `sql_delimiter` will be ignored.", + UserWarning, + stacklevel=2, + ) + self.segment_separator = sql_delimiter or "" self.datasource_name = datasource_name self.datasource_type = datasource_type self.pre_statements = self.get_stm_list(pre_statements) diff --git a/src/pydolphinscheduler/version_ext b/src/pydolphinscheduler/version_ext index 1983d1ab..33453bd3 100644 --- a/src/pydolphinscheduler/version_ext +++ b/src/pydolphinscheduler/version_ext @@ -1 +1 @@ -dolphinscheduler>=3.2.0 \ No newline at end of file +dolphinscheduler>=3.1.5, <3.2.0 \ No newline at end of file diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py index 90a14d07..f7d97df6 100644 --- a/tests/core/test_engine.py +++ b/tests/core/test_engine.py @@ -108,6 +108,8 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect): "version": 1, "description": None, "delayTime": 0, + "taskGroupId": 0, + "taskGroupPriority": 0, "taskType": "test-engine", "taskParams": { "mainClass": "org.apache.examples.mock.Mock", @@ -122,7 +124,6 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect): "waitStartTimeout": {}, }, "flag": "YES", - "isCache": "NO", "taskPriority": 
"MEDIUM", "workerGroup": "default", "environmentCode": None, diff --git a/tests/core/test_task.py b/tests/core/test_task.py index d34b6d2e..8c4b0b62 100644 --- a/tests/core/test_task.py +++ b/tests/core/test_task.py @@ -147,7 +147,7 @@ def test_task_timeout(value: timedelta, expect: Tuple[int, str]): }, { "localParams": ["foo", "bar"], - "resourceList": [{"resourceName": 1}], + "resourceList": [{"id": 1}], "dependence": {"foo", "bar"}, "waitStartTimeout": {"foo", "bar"}, "conditionResult": {"foo": ["bar"]}, @@ -156,7 +156,7 @@ def test_task_timeout(value: timedelta, expect: Tuple[int, str]): ], ) @patch( - "pydolphinscheduler.core.resource.Resource.get_fullname_from_database", + "pydolphinscheduler.core.resource.Resource.get_id_from_database", return_value=1, ) @patch( @@ -243,6 +243,8 @@ def test_task_get_define(): "version": version, "description": None, "delayTime": 0, + "taskGroupId": 0, + "taskGroupPriority": 0, "taskType": task_type, "taskParams": { "resourceList": [], @@ -252,7 +254,6 @@ def test_task_get_define(): "waitStartTimeout": {}, }, "flag": "YES", - "isCache": "NO", "taskPriority": "MEDIUM", "workerGroup": "default", "environmentCode": None, @@ -480,11 +481,11 @@ def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr): [ ( ["/dev/test.py"], - [{"resourceName": 1}], + [{"id": 1}], ), ( - ["/dev/test.py", {"resourceName": 2}], - [{"resourceName": 1}, {"resourceName": 2}], + ["/dev/test.py", {"id": 2}], + [{"id": 1}, {"id": 2}], ), ], ) @@ -493,7 +494,7 @@ def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr): return_value=(123, 1), ) @patch( - "pydolphinscheduler.core.resource.Resource.get_fullname_from_database", + "pydolphinscheduler.core.resource.Resource.get_id_from_database", return_value=1, ) @patch( diff --git a/tests/integration/test_resources.py b/tests/integration/test_resources.py index a8fd8f7d..33a18804 100644 --- a/tests/integration/test_resources.py +++ 
b/tests/integration/test_resources.py @@ -45,20 +45,15 @@ def tmp_user(): user.delete() -@pytest.mark.skip( - "activate it when dolphinscheduler default resource center is local file" -) def test_create_or_update(tmp_user): """Test create or update resource to java gateway.""" resource = Resource(name=name, content=content, user_name=UNIT_TEST_USER_NAME) - result = resource.create_or_update_resource() - assert result is not None and isinstance(result, JavaObject) - assert result.getAlias() == name + try: + resource.create_or_update_resource() + except Exception: + pytest.fail("Create or update resource failed.") -@pytest.mark.skip( - "activate it when dolphinscheduler default resource center is local file" -) def test_get_resource_info(tmp_user): """Test get resource info from java gateway.""" resource = Resource(name=name, user_name=UNIT_TEST_USER_NAME) diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py index f150820c..06f8d2ca 100644 --- a/tests/tasks/test_sql.py +++ b/tests/tasks/test_sql.py @@ -131,6 +131,7 @@ def test_get_sql_type( "type": "MYSQL", "datasource": 1, "sqlType": "0", + "segmentSeparator": "", "preStatements": [], "postStatements": [], "displayRows": 10, @@ -177,6 +178,7 @@ def test_sql_get_define(mock_datasource): "preStatements": [], "postStatements": [], "localParams": [], + "segmentSeparator": "", "resourceList": [], "dependence": {}, "conditionResult": {"successNode": [""], "failedNode": [""]},