Skip to content

Commit

Permalink
Partial task support docs (#1008)
Browse files Browse the repository at this point in the history
partial task support

Signed-off-by: Samhita Alla <[email protected]>
  • Loading branch information
samhita-alla authored Jun 29, 2023
1 parent 7f73fef commit c8864e3
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 182 deletions.
20 changes: 0 additions & 20 deletions cookbook/core/control_flow/dynamics.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,3 @@ def wf(s1: str, s2: str) -> int:
# ^^^^^^^^^^^^^^^^^^^^^^^^
#
# Dynamic tasks have overhead for large fan-out tasks because they store metadata for the entire workflow. In contrast, map tasks are efficient for these large fan-out tasks because they don’t store that metadata, so their overhead is much less apparent.
#

# %%
# .. panels::
# :header: text-center
# :column: col-lg-12 p-2
#
# .. link-button:: https://blog.flyte.org/dynamic-workflows-in-flyte
# :type: url
# :text: Blog Post
# :classes: btn-block stretched-link
# ^^^^^^^^^^^^
# An article on how to use Dynamic Workflows in Flyte.
#
# .. toctree::
# :maxdepth: -1
# :caption: Contents
# :hidden:
#
# Blog Post <https://blog.flyte.org/dynamic-workflows-in-flyte>
125 changes: 53 additions & 72 deletions cookbook/core/control_flow/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
.. tags:: Intermediate
A map task lets you run a pod task or a regular task over a list of inputs within a single workflow node.
This means you can run thousands of instances of the task without creating a node for every instance, providing valuable performance gains!
A map task allows you to execute a pod task or a regular task on a series of inputs within a single workflow node.
This enables you to execute numerous instances of the task without having to create a node for each instance, resulting in significant performance improvements.
Some use cases of map tasks include:
Map tasks find application in various scenarios, including:
* Several inputs must run through the same code logic
* Multiple data batches need to be processed in parallel
* Hyperparameter optimization
- When multiple inputs require running through the same code logic.
- Processing multiple data batches concurrently.
- Conducting hyperparameter optimization.
Let's look at an example now!
Now, let's delve into an example!
"""

# %%
# First, import the libraries.
# First, import the necessary libraries.
from typing import List

from flytekit import Resources, map_task, task, workflow


# %%
# Next, define a task to use in the map task.
# Define a task to be used in the map task.
#
# .. note::
# A map task can only accept one input and produce one output.
Expand All @@ -44,11 +44,12 @@ def coalesce(b: List[str]) -> str:


# %%
# We send ``a_mappable_task`` to be repeated across a collection of inputs to the :py:func:`~flytekit:flytekit.map_task` function.
# In the example, ``a`` of type ``List[int]`` is the input.
# The task ``a_mappable_task`` is run for each element in the list.
# To repeat the execution of the ``a_mappable_task`` across a collection of inputs, use the :py:func:`~flytekit:flytekit.map_task` function from flytekit.
# In this example, the input ``a`` is of type ``List[int]``.
# The ``a_mappable_task`` is executed for each element in the list.
#
# ``with_overrides`` is useful for setting resources for an individual map task.
# You can utilize the ``with_overrides`` function to set resources specifically for individual map tasks.
# This allows you to customize resource allocations such as memory usage.
@workflow
def my_map_workflow(a: List[int]) -> str:
mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
Expand All @@ -61,11 +62,12 @@ def my_map_workflow(a: List[int]) -> str:


# %%
# Lastly, we can run the workflow locally!
# Finally, you can run the workflow locally.
if __name__ == "__main__":
    # Run the map-task workflow locally on a small sample list and print
    # whatever the workflow returns.
    result = my_map_workflow(
        a=[1, 2, 3, 4, 5],
    )
    print(f"{result}")


# %%
# When defining a map task, avoid calling other tasks in it. Flyte
# can't accurately register tasks that call other tasks. While Flyte
Expand All @@ -79,6 +81,7 @@ def my_map_workflow(a: List[int]) -> str:
def upperhalf(a: int) -> int:
    """Return half of ``a`` (floored) plus one.

    Floor division is used so the result is an ``int``, as the return
    annotation declares; true division (``/``) always produces a ``float``.

    :param a: the integer to halve.
    :return: ``a // 2 + 1``.
    """
    return a // 2 + 1


@task
def suboptimal_mappable_task(a: int) -> str:
inc = upperhalf(a=a)
Expand All @@ -87,82 +90,60 @@ def suboptimal_mappable_task(a: int) -> str:


# %%
#
# By default, the map task uses the K8s Array plugin. Map tasks can
# also run on alternate execution backends, such as
# `AWS Batch <https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html#deployment-plugin-setup-aws-array>`__,
# a provisioned service that can scale to great sizes.
# By default, the map task utilizes the Kubernetes Array plugin for execution.
# However, map tasks can also be run on alternate execution backends.
# For example, you can configure the map task to run on `AWS Batch <https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html#deployment-plugin-setup-aws-array>`__, a provisioned service that offers scalability for handling large-scale tasks.


# %%
# Map a Task with Multiple Inputs
# Map a task with multiple inputs
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
# You might need to map a task with multiple inputs.
#
# For example, we have a task that takes 3 inputs.
# For instance, consider a task that requires three inputs.
@task
def multi_input_task(quantity: int, price: float, shipping: float) -> float:
    """Return the product of ``quantity``, ``price`` and ``shipping``."""
    # Keep the original left-to-right evaluation order: (quantity * price) * shipping.
    quantity_times_price = quantity * price
    return quantity_times_price * shipping


# %%
# But we only want to map this task with the ``quantity`` input
# while the other inputs stay the same. Since a map task accepts only
# one input, we can do this by creating a new task that prepares the
# map task's inputs.
#
# We start by bundling the inputs into a dataclass decorated with
# ``dataclass_json``. We also define a helper task that prepares the map
# task's inputs.
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class MapInput:
    """Bundle of the three values consumed by one map-task invocation."""

    # Field order matters: instances are built positionally as
    # ``MapInput(quantity, price, shipping)``.
    quantity: float
    price: float
    shipping: float
# In some cases, you may want to map this task with only the ``quantity`` input, while keeping the other inputs unchanged.
# Since a map task accepts only one input, you can achieve this by partially binding values to the map task.
# This can be done using the :py:func:`functools.partial` function.
import functools


@workflow
def multiple_workflow(
    list_q: List[int] = [1, 2, 3, 4, 5], p: float = 6.0, s: float = 7.0
) -> List[float]:
    """Map ``multi_input_task`` over ``list_q`` with price and shipping held fixed.

    ``p`` and ``s`` are bound to the ``price`` and ``shipping`` parameters with
    ``functools.partial``; only ``quantity`` is supplied by the map task.
    """
    fixed_price_and_shipping = functools.partial(
        multi_input_task, price=p, shipping=s
    )
    mapped = map_task(fixed_price_and_shipping)
    return mapped(quantity=list_q)

@task
def prepare_map_inputs(list_q: List[float], p: float, s: float) -> List[MapInput]:
    """Build one ``MapInput`` per entry of ``list_q``, each carrying the same ``p`` and ``s``."""
    return [
        MapInput(quantity=each_quantity, price=p, shipping=s)
        for each_quantity in list_q
    ]

# %%
# Then we refactor ``multi_input_task``. Instead of 3 inputs, ``mappable_task``
# has a single input.
# Another possibility is to bind the outputs of a task to partials.
@task
def mappable_task(input: MapInput) -> float:
    """Multiply the ``quantity``, ``price`` and ``shipping`` fields of one ``MapInput``."""
    # Same evaluation order as a single chained product: (quantity * price) * shipping.
    quantity_times_price = input.quantity * input.price
    return quantity_times_price * input.shipping
def get_price() -> float:
    """Return the fixed price ``7.0``."""
    fixed_price = 7.0
    return fixed_price


# %%
# Our workflow prepares a new list of inputs for the map task.
@workflow
def multiple_workflow(list_q: List[float], p: float, s: float) -> List[float]:
    """Prepare a list of map inputs from the arguments and map ``mappable_task`` over it."""
    map_inputs = prepare_map_inputs(list_q=list_q, p=p, s=s)
    run_over_inputs = map_task(mappable_task)
    return run_over_inputs(input=map_inputs)
def multiple_workflow_with_task_output(
    list_q: List[int] = [1, 2, 3, 4, 5], s: float = 6.0
) -> List[float]:
    """Map ``multi_input_task`` over ``list_q`` using a price produced by ``get_price``.

    The output of ``get_price()`` and the ``s`` argument are bound to ``price``
    and ``shipping`` with ``functools.partial``; ``quantity`` comes from ``list_q``.
    """
    price_from_task = get_price()
    bound_task = functools.partial(
        multi_input_task, price=price_from_task, shipping=s
    )
    mapped = map_task(bound_task)
    return mapped(quantity=list_q)


# %%
# We can run our multi-input map task locally.
if __name__ == "__main__":
    # Run the multi-input map workflow locally: five quantities sharing one
    # price and one shipping value.
    result = multiple_workflow(
        list_q=[1.0, 2.0, 3.0, 4.0, 5.0],
        p=6.0,
        s=7.0,
    )
    print(f"{result}")
# You can also provide multiple lists as input to a ``map_task``.
@workflow
def multiple_workflow_with_lists(
    list_q: List[int] = [1, 2, 3, 4, 5],
    list_p: List[float] = [6.0, 9.0, 8.7, 6.5, 1.2],
    s: float = 6.0,
) -> List[float]:
    """Map ``multi_input_task`` over two lists at once.

    Only ``shipping`` is bound with ``functools.partial``; ``list_q`` and
    ``list_p`` are both handed to the map task as ``quantity`` and ``price``.
    """
    shipping_bound_task = functools.partial(multi_input_task, shipping=s)
    mapped = map_task(shipping_bound_task)
    return mapped(quantity=list_q, price=list_p)

# %%
# .. panels::
# :header: text-center
# :column: col-lg-12 p-2
#
# .. link-button:: https://blog.flyte.org/map-tasks-in-flyte
# :type: url
# :text: Blog Post
# :classes: btn-block stretched-link
# ^^^^^^^^^^^^
# An article on how to use Map Tasks in Flyte.
#
# .. toctree::
# :maxdepth: -1
# :caption: Contents
# :hidden:
#
# Blog Post <https://blog.flyte.org/map-tasks-in-flyte>
# .. note::
# Note that you cannot bind a list as a fixed input of a partial task (that is, pass a list as a keyword argument to ``functools.partial``).
Loading

0 comments on commit c8864e3

Please sign in to comment.