Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/source/tasks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ In this section
sql
python
http
sub_workflow

switch
condition
Expand All @@ -40,8 +41,6 @@ In this section
kubernetes

datax
sub_workflow

sagemaker
mlflow
openmldb
Expand Down
38 changes: 38 additions & 0 deletions docs/source/tasks/sub_workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,44 @@
Sub Workflow
============

The sub workflow task triggers a run of an existing workflow. Make sure that workflow already exists in the
current project when you create the sub workflow task.

Example
-------

Here is a simple example of how to use the sub workflow task. Before we create a sub workflow task, we must
make sure the workflow it refers to already exists in the current project. So the first thing we do is create
the workflow that will be used by the sub workflow task.

.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
:start-after: [start sub_workflow_declare]
:end-before: [end sub_workflow_declare]

The workflow named ``sub_workflow_downstream`` is created once we call the ``submit`` method.

Then we create the main workflow, whose sub workflow task refers to the workflow we created before.

.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
:start-after: [start sub_workflow_task_declare]
:end-before: [end sub_workflow_task_declare]

Finally, we can submit or run the workflow that contains the sub workflow task with the ``submit`` or ``run``
method. You can also use a workflow that already exists in the current project instead of creating a new one.

.. note::

    We only run the workflow that contains the sub workflow task; the sub workflow task itself triggers the
    run of the sub workflow.


.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sub_workflow_example.py
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]

Dive Into
---------

.. automodule:: pydolphinscheduler.tasks.sub_workflow


Expand Down
4 changes: 2 additions & 2 deletions src/pydolphinscheduler/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from py4j.protocol import Py4JJavaError

from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import gateway

Expand All @@ -34,7 +34,7 @@ class ProgramType(str):
PYTHON = "PYTHON"


class Engine(Task):
class Engine(BatchTask):
"""Task engine object, declare behavior for engine task to dolphinscheduler.

This is the parent class of spark, flink and mr tasks,
Expand Down
178 changes: 178 additions & 0 deletions src/pydolphinscheduler/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,181 @@ def add_out(

"""
self._output_params[name] = value


class BatchTask(Task):
    """Task whose ``task_execute_type`` is ``"BATCH"``, used as parent class for batch task types.

    Every constructor argument is forwarded unchanged to :class:`Task`; after the parent constructor
    returns, ``task_execute_type`` is set to ``"BATCH"``.

    :param name: The name of the task. Node names within the same workflow must be unique.
    :param task_type: Task type identifier, passed through to :class:`Task`.
    :param description: default None
    :param flag: default TaskFlag.YES,
    :param task_priority: default TaskPriority.MEDIUM
    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
    :param environment_name: default None
    :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0.
    :param task_group_priority: Priority for same task group to, the higher the value, the higher the
        priority, default 0.
    :param delay_time: default 0
    :param fail_retry_times: default 0
    :param fail_retry_interval: default 1
    :param timeout_notify_strategy: default, None
    :param timeout: Timeout attribute for task, in minutes. Task is consider as timed out task when the
        running time of a task exceeds than this value. when data type is :class:`datetime.timedelta` will
        be converted to int(in minutes). default ``None``
    :param workflow: default None, passed through to :class:`Task` (presumably the workflow this task is
        attached to -- confirm against :class:`Task`).
    :param resource_list: default None
    :param dependence: default None, passed through to :class:`Task`.
    :param wait_start_timeout: default None
    :param condition_result: default None,
    :param resource_plugin: default None
    :param is_cache: default False
    :param input_params: default None, input parameters, {param_name: param_value}
    :param output_params: default None, output parameters, {param_name: param_value}
    """

    # Parent attribute set extended with ``task_execute_type``.
    # NOTE(review): presumably the names exported into the task definition -- confirm against ``Task``.
    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: str | None = None,
        flag: str | None = TaskFlag.YES,
        task_priority: str | None = TaskPriority.MEDIUM,
        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: str | None = None,
        task_group_id: int | None = 0,
        task_group_priority: int | None = 0,
        delay_time: int | None = 0,
        fail_retry_times: int | None = 0,
        fail_retry_interval: int | None = 1,
        timeout_notify_strategy: str | None = None,
        timeout: timedelta | int | None = None,
        workflow: Workflow | None = None,
        resource_list: list | None = None,
        dependence: dict | None = None,
        wait_start_timeout: dict | None = None,
        condition_result: dict | None = None,
        resource_plugin: ResourcePlugin | None = None,
        is_cache: bool | None = False,
        input_params: dict | None = None,
        output_params: dict | None = None,
        *args,
        **kwargs,
    ):
        # Arguments are passed positionally, so this order has to stay in sync with the parameter
        # order of ``Task.__init__``.
        super().__init__(
            name,
            task_type,
            description,
            flag,
            task_priority,
            worker_group,
            environment_name,
            task_group_id,
            task_group_priority,
            delay_time,
            fail_retry_times,
            fail_retry_interval,
            timeout_notify_strategy,
            timeout,
            workflow,
            resource_list,
            dependence,
            wait_start_timeout,
            condition_result,
            resource_plugin,
            is_cache,
            input_params,
            output_params,
            *args,
            **kwargs,
        )
        # Mark this task as a batch task; the attribute name is listed in ``_DEFINE_ATTR`` above.
        self.task_execute_type = "BATCH"


class StreamTask(Task):
    """Task whose ``task_execute_type`` is ``"STREAM"``, used as parent class for stream task types.

    Every constructor argument is forwarded unchanged to :class:`Task`; after the parent constructor
    returns, ``task_execute_type`` is set to ``"STREAM"``.

    :param name: The name of the task. Node names within the same workflow must be unique.
    :param task_type: Task type identifier, passed through to :class:`Task`.
    :param description: default None
    :param flag: default TaskFlag.YES,
    :param task_priority: default TaskPriority.MEDIUM
    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
    :param environment_name: default None
    :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0.
    :param task_group_priority: Priority for same task group to, the higher the value, the higher the
        priority, default 0.
    :param delay_time: default 0
    :param fail_retry_times: default 0
    :param fail_retry_interval: default 1
    :param timeout_notify_strategy: default, None
    :param timeout: Timeout attribute for task, in minutes. Task is consider as timed out task when the
        running time of a task exceeds than this value. when data type is :class:`datetime.timedelta` will
        be converted to int(in minutes). default ``None``
    :param workflow: default None, passed through to :class:`Task` (presumably the workflow this task is
        attached to -- confirm against :class:`Task`).
    :param resource_list: default None
    :param dependence: default None, passed through to :class:`Task`.
    :param wait_start_timeout: default None
    :param condition_result: default None,
    :param resource_plugin: default None
    :param is_cache: default False
    :param input_params: default None, input parameters, {param_name: param_value}
    :param output_params: default None, output parameters, {param_name: param_value}
    """

    # Parent attribute set extended with ``task_execute_type``.
    # NOTE(review): presumably the names exported into the task definition -- confirm against ``Task``.
    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: str | None = None,
        flag: str | None = TaskFlag.YES,
        task_priority: str | None = TaskPriority.MEDIUM,
        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: str | None = None,
        task_group_id: int | None = 0,
        task_group_priority: int | None = 0,
        delay_time: int | None = 0,
        fail_retry_times: int | None = 0,
        fail_retry_interval: int | None = 1,
        timeout_notify_strategy: str | None = None,
        timeout: timedelta | int | None = None,
        workflow: Workflow | None = None,
        resource_list: list | None = None,
        dependence: dict | None = None,
        wait_start_timeout: dict | None = None,
        condition_result: dict | None = None,
        resource_plugin: ResourcePlugin | None = None,
        is_cache: bool | None = False,
        input_params: dict | None = None,
        output_params: dict | None = None,
        *args,
        **kwargs,
    ):
        # Arguments are passed positionally, so this order has to stay in sync with the parameter
        # order of ``Task.__init__``.
        super().__init__(
            name,
            task_type,
            description,
            flag,
            task_priority,
            worker_group,
            environment_name,
            task_group_id,
            task_group_priority,
            delay_time,
            fail_retry_times,
            fail_retry_interval,
            timeout_notify_strategy,
            timeout,
            workflow,
            resource_list,
            dependence,
            wait_start_timeout,
            condition_result,
            resource_plugin,
            is_cache,
            input_params,
            output_params,
            *args,
            **kwargs,
        )
        # Mark this task as a stream task; the attribute name is listed in ``_DEFINE_ATTR`` above.
        self.task_execute_type = "STREAM"
58 changes: 58 additions & 0 deletions src/pydolphinscheduler/examples/task_sub_workflow_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""An example workflow for the sub workflow task."""

# [start tutorial]
# [start package_import]
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the SubWorkflow and Shell task objects because we create both kinds of task later
from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
from pydolphinscheduler.tasks.shell import Shell


# [start workflow_declare]
# [start sub_workflow_declare]
with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    # Two workflows are open at once, so every task names its owner explicitly via ``workflow=``.
    # This shell task is the only task of the workflow that is later used as the sub workflow.
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    # Submit it first: the SubWorkflow task below refers to this workflow by its name.
    wf_downstream.submit()
    # [end sub_workflow_declare]

    # A plain shell task in the main workflow, chained in front of the sub workflow task below.
    sub_workflow_pre = Shell(
        name="pre-task",
        command="echo 'prefix task for sub workflow'",
        workflow=wf_upstream,
    )
    # [start sub_workflow_task_declare]
    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,
        workflow=wf_upstream,
    )
    # [end sub_workflow_task_declare]
    # NOTE(review): ``>>`` presumably makes ``sw_task`` run after ``sub_workflow_pre`` -- confirm in ``Task``.
    sub_workflow_pre >> sw_task
    # Make sure the workflow named ``wf_downstream.name`` exists before the workflow containing the
    # sub workflow task is submitted or run.
    wf_upstream.run()
# [end workflow_declare]
4 changes: 2 additions & 2 deletions src/pydolphinscheduler/tasks/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from __future__ import annotations

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import BatchTask, Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.models.base import Base

Expand Down Expand Up @@ -154,7 +154,7 @@ def __init__(self, *args):
super().__init__(*args)


class Condition(Task):
class Condition(BatchTask):
"""Task condition object, declare behavior for condition task to dolphinscheduler."""

def __init__(
Expand Down
6 changes: 3 additions & 3 deletions src/pydolphinscheduler/tasks/datax.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.mixin import WorkerResourceMixin
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.models.datasource import Datasource


class CustomDataX(WorkerResourceMixin, Task):
class CustomDataX(WorkerResourceMixin, BatchTask):
"""Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.

You provider json template for DataX, it can synchronize data according to the template you provided.
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
self.add_attr(**kwargs)


class DataX(WorkerResourceMixin, Task):
class DataX(WorkerResourceMixin, BatchTask):
"""Task DataX object, declare behavior for DataX task to dolphinscheduler.

It should run database datax job in multiply sql link engine, such as:
Expand Down
4 changes: 2 additions & 2 deletions src/pydolphinscheduler/tasks/dependent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import warnings

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models.base import Base
Expand Down Expand Up @@ -272,7 +272,7 @@ def __init__(self, *args):
super().__init__(*args)


class Dependent(Task):
class Dependent(BatchTask):
"""Task dependent object, declare behavior for dependent task to dolphinscheduler."""

def __init__(self, name: str, dependence: DependentOperator, *args, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions src/pydolphinscheduler/tasks/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from copy import deepcopy

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import BatchTask


class DvcTaskType(str):
Expand All @@ -32,7 +32,7 @@ class DvcTaskType(str):
UPLOAD = "Upload"


class BaseDVC(Task):
class BaseDVC(BatchTask):
"""Base class for dvc task."""

dvc_task_type = None
Expand Down
Loading