Temporal Blog 09月30日
Temporal工作流中的事件驱动流程
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

Temporal工作流允许通过信号和等待条件创建事件驱动的流程。工作流执行可以作为持久、可靠和可扩展的功能执行单元,通过SDK API等待各种可等待项,如握手、确认或信号。活动执行需要start_to_close_timeout参数来检测工作线程崩溃。通过设置信号处理程序和等待条件,可以在工作流中创建异步活动,实现活动间的依赖和事件触发。Temporal的回放功能使得工作流可以从任何客户端恢复,只需发送包含活动编号的信号。

📌 Temporal工作流通过信号和等待条件实现事件驱动流程,允许活动执行根据外部事件进行等待和触发。

🕒 活动执行需要start_to_close_timeout参数来检测工作线程崩溃,确保流程的可靠性和稳定性。

🔗 通过设置信号处理程序和等待条件,可以在工作流中创建异步活动,实现活动间的依赖和事件触发,形成灵活的事件驱动逻辑。

🔄 Temporal的回放功能使得工作流可以从任何客户端恢复,只需发送包含活动编号的信号,增强了流程的容错性和可恢复性。

🚀 事件驱动流程是Temporal工作流的核心特性,通过SDK API等待各种可等待项,如握手、确认或信号,构建复杂的业务逻辑。

In Community Threads, we dig into some of the most frequent and insightful questions that appear in our Temporal Community.

The question#

In today’s Thread, I will look at a question from a Temporal user who wants to know:

Is it possible to create event-based Workflows; and, if so, how do I make Activities within a Workflow wait for these events?

The context#

The full anatomy of a Workflow is out of the scope of this article, but as a durable, reliable, and scalable function execution, a Workflow Execution is the main unit of a Temporal application. Workflow Executions are instances of Workflow Definitions; think of a Workflow Definition as a series of expected steps or processes, and a Workflow Execution as a single “run” of that series of steps from start to finish.

Workflows use Signals to communicate between Workflow Execution instances, and Activities to communicate with the external environment.

The following diagrams shows the two ways Workflow Executions interact with the Temporal Platform: they can issue Commands, such as ScheduleActivityTask or StartTimer; and they can wait on Awaitables.

A Workflow Execution may only ever block progress on an Awaitable that is provided through a Temporal SDK API. Awaitables are provided when using APIs for the following:

    Awaiting: Progress can block using explicit "Await" APIs. Requesting cancellation of another Workflow Execution: Progress can block on confirmation that the other Workflow Execution is canceled. Sending a Signal: Progress can block on confirmation that the Signal sent. Spawning a Child Workflow Execution: Progress can block on confirmation that the Child Workflow Execution started, and on the result of the Child Workflow Execution. Spawning an Activity Execution: Progress can block on the result of the Activity Execution. Starting a Timer: Progress can block until the Timer fires.

In the context of this user’s question, each of their Activities require waiting on either a handshake, an acknowledgment, or some other type of Awaitable before proceeding to the next Activity. As shown above, this event-based flow is in the DNA of Temporal Workflows.

The Answer#

The question came from the Python SDK forum, so the example code for the response will most closely resemble a Python implementation.

The short answer is: yes! Because of the magic of Temporal and the Temporal-backed asyncio event loop in Python, you can combine variables and wait conditions to create async Activities within your Workflow Definitions. Executing Activities requires a start_to_close_timeout. The main purpose of this param is to detect when a Worker crashes after it has started executing an Activity Task.

You can then set up a signal handler within your Workflow, using set() for each event to send a signal to the Workflow to trigger the subsequent Activity.

Here is a sketch implementation:

@workflow.defnclass MyWorkflow:    def __init__(self):        self.stage = 0    @workflow.run    async def run(self) -> None:        workflow.logger.info("Executing activity 1")        await workflow.execute_activity(my_activity_1, start_to_close_timeout=timedelta(seconds=45))        workflow.logger.info("Finished activity 1")        await workflow.wait_condition(lambda: self.stage > 0)        workflow.logger.info("Executing activity 2")        await workflow.execute_activity(my_activity_2, start_to_close_timeout=timedelta(seconds=30))        workflow.logger.info("Finished activity 2")        await workflow.wait_condition(lambda: self.stage > 1)        workflow.logger.info("Executing activity 3")        await workflow.execute_activity(my_activity_3, start_to_close_timeout=timedelta(minutes=15))        workflow.logger.info("Finished activity 3")        await workflow.wait_condition(lambda: self.stage > 2)        workflow.logger.info("Executing activity 4")        await workflow.execute_activity(my_activity_4, start_to_close_timeout=timedelta(hours=2))        workflow.logger.info("Finished activity 4")    @workflow.signal    def advance_stage(self, stage: int) –> None:        self.stage = stage

This is a very normal Workflow and will work as expected even if it has to wait for weeks between events, the Worker crashes, etc. Thanks to Temporal’s replay functionality, to recover a Workflow you simply need to send a signal with the Activity number from a client anywhere, in any language as needed.

If you found this helpful, or if you have your own questions about Temporal that you’d like to see answered, you can join us in Slack to connect with other Temporal users and team members.

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Temporal 事件驱动 工作流 信号 可等待项
相关文章