Temporal Blog, September 30
How Temporal Implements Durable Execution

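This article explores how Temporal provides durable execution. Temporal's Client, Server, and Worker work together to ensure that functions execute reliably: even if a Worker process fails, the function's state can be recovered from its Event History. The article walks through an order-processing flow in detail, covering function calls, Activity execution, Signal delivery, and Event History recording, and shows how Temporal handles failures transparently while keeping the business logic intact.

Temporal implements durable execution with a three-part architecture of Client, Server, and Worker: the Server maintains the database and state, the Client controls the flow, and the Worker runs the code and polls the task queue.

In the order-processing flow, the Client starts the order function, the Server creates a Workflow Task and adds it to the queue, and the Worker runs the function. The Event History tracks the execution state, so execution can resume even if the Worker restarts.

Temporal's mechanisms include Activity calls and retries: when the chargeCustomer Activity fails because the payment service is unreachable, the Server automatically reschedules the Activity Task. The initial retry interval is 1 second, and developers don't need to configure anything.

Signals let external events update a Workflow's state in real time. For example, when a driver marks an order as delivered in the driver portal, a Signal notifies the Workflow of the state change, which triggers the subsequent condition checks and lets the flow continue.

The Event History is the core of durable execution. It records every important event and state change, so after a restart the Worker can fetch the History and recover the full execution context, keeping the business logic atomic and consistent.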

This is part 2 of the series, "Building Reliable Distributed Systems in Node.js." In part 1, we went over what durable execution is, its benefits, and what a durable function looks like. In this post, we'll look at how Temporal can provide durable execution.

A function that can't fail, can last forever, and doesn't need to store data in a database? Sounds like magic. There must be a gotcha—like only a small subset of the language can be used, or it only works on specialized hardware. But in fact, it's just JavaScript—you can use the whole language, and it runs on any server that can run Node.js.

So how does this all work? You can take a look at the How Temporal Works diagram, which explains the process with Go code. In this post, we'll go through the process with the TypeScript code from the previous post in the series.

Client ↔️ Server ↔️ Worker

To start out, a Temporal application has three parts: the Client, the Server, and the Worker.

The Client and Worker both connect to the Server, which has a database and maintains state. The Client says things like "start the order() durable function," "send it a delivered Signal," and "terminate the function." The Worker is a long-running Node.js process that has our code and polls the Server for tasks. Tasks look like "run the order() durable function" or "run the normal sendPushNotification() function." After the Worker runs the code, it reports the result back to the Server.
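To make the division of labor concrete, here is a toy, in-memory model of the three parts. It is a sketch, not the Temporal SDK: `ToyServer`, `ToyClient`, `ToyWorker`, and the task shape are hypothetical, and real Workers long-poll the Server over the network instead of calling it in-process. What it does show is that only the Server holds state, while the Client just sends commands and the Worker pulls tasks, runs our code, and reports results.

```typescript
// Toy in-memory model of the Client ↔ Server ↔ Worker split.
// Not the Temporal SDK: every name here is hypothetical.

type ToyTask = { id: number; fn: string; args: number[] }
type ToyResult = { taskId: number; value: string }

class ToyServer {
  // The Server is the only stateful part: it owns the task queue and the results.
  private queue: ToyTask[] = []
  private nextId = 1
  readonly results: ToyResult[] = []

  enqueue(fn: string, args: number[]): number {
    const id = this.nextId++
    this.queue.push({ id, fn, args })
    return id
  }

  // Workers call this to take the next task, if there is one.
  poll(): ToyTask | undefined {
    return this.queue.shift()
  }

  report(result: ToyResult): void {
    this.results.push(result)
  }
}

class ToyClient {
  private server: ToyServer

  constructor(server: ToyServer) {
    this.server = server
  }

  // "Start the order() function": the Client only sends a command.
  start(fn: string, args: number[]): number {
    return this.server.enqueue(fn, args)
  }
}

class ToyWorker {
  private server: ToyServer
  // Only the Worker has our code, keyed by function name.
  private code: Record<string, (...args: number[]) => string>

  constructor(server: ToyServer, code: Record<string, (...args: number[]) => string>) {
    this.server = server
    this.code = code
  }

  // One polling round: take a task, run the matching function, report the result.
  pollOnce(): boolean {
    const task = this.server.poll()
    if (!task) return false
    const value = this.code[task.fn](...task.args)
    this.server.report({ taskId: task.id, value })
    return true
  }
}

// Usage: the Client starts order(3), and the Worker picks it up and reports back.
const server = new ToyServer()
const client = new ToyClient(server)
const worker = new ToyWorker(server, {
  order: (productId) => `ordered product ${productId}`,
})

const taskId = client.start('order', [3])
while (worker.pollOnce()) {
  // keep polling until the queue is empty
}
console.log(server.results) // [ { taskId: 1, value: 'ordered product 3' } ]
```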

In our delivery app, we create the Client in temporal-client.ts and use it in our Next.js serverless functions:

We create the Worker in apps/worker/src/worker.ts:

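import { NativeConnection, Worker } from '@temporalio/worker'
import * as activities from 'activities'
import { taskQueue } from 'common'
import { namespace, getConnectionOptions } from 'common/lib/temporal-connection'

async function run() {
  // Connect to the Temporal Server using the app's shared connection options
  const connection = await NativeConnection.connect(getConnectionOptions())

  // Register our Workflow code (loaded from workflowsPath) and our Activity
  // functions, and poll the given namespace and task queue for work
  const worker = await Worker.create({
    workflowsPath: require.resolve('../../../packages/workflows/'),
    activities,
    connection,
    namespace,
    taskQueue,
  })

  // Keep polling until the Worker is shut down
  await worker.run()
}

run().catch((err) => {
  console.error(err)
  process.exit(1)
})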

We pass the Worker our code:

And set up Render to automatically build and deploy on pushes to main:

Running each part

In production, our web apps and their serverless functions are deployed to Vercel, our long-running Worker process is deployed to Render, and they both talk to a Server instance hosted by Temporal Cloud. The Server is an open source cluster of services that works with a database (SQL or Cassandra) and Elasticsearch. You can also host all of it yourself, or you can save a lot of time and get peace of mind with higher reliability and scale by paying the experts to host it 😄.

In development, we can run all three parts locally. First, we install the Temporal CLI, which has a development version of the Server:

    Homebrew: brew install temporal
    cURL: curl -sSf https://temporal.download/cli.sh | sh
    Manual: Download and extract the latest release, and then add it to your PATH.

We start the Server with:

temporal server start-dev

In another terminal, with Node v16 or higher, run:

npx @temporalio/create@latest ./temporal-delivery --sample food-delivery
cd ./temporal-delivery
npm run dev

The dev script runs the two Next.js web apps and the Worker. The menu is running at localhost:3000, where we can click "Order", and the driver portal is at localhost:3001, where we can mark the item we ordered as picked up and delivered. Once we've done that, we can see the "Delivered" status of the order in both of the web apps. We can also see the status of the corresponding Workflow Execution in the Server's Web UI on localhost:

[Figure: Workflow list]

We can see its status is Completed, and when we click on it, we see the Event History, the list of events that took place during the Workflow Execution:

[Figure: Event History]

The first event is always WorkflowExecutionStarted, which contains the type of Workflow being started—in this case, an order Workflow. We'll look more at events in the next section.

In the Queries tab, we can select the getStatus Query, which (assuming our Worker is still running) will send a Query to the order function, which responds that the order was delivered:

[Figure: getStatus Query result]

Sequence of events

Now let's look at what happened behind the scenes during our order.

Start order

When we clicked the "Order" button, the API handler used a Temporal Client to send a start command to the Server:

The Server saves the WorkflowExecutionStarted event to the Event History and returns. It also creates a WorkflowTaskScheduled event, which results in a "Workflow Task" (an instruction to run the order() function) being added to the task queue.

The Worker receives the task, the Server adds the WorkflowTaskStarted event, and the Worker performs the task, in this case calling order(3). The order function runs until it hits this line:

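import { proxyActivities } from '@temporalio/workflow'
import type * as activities from 'activities'

const { chargeCustomer, refundOrder, sendPushNotification } = proxyActivities<typeof activities>({ ... })

export async function order(productId: number): Promise<void> {
  ...
  await chargeCustomer(product)
  ...
}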

Call Activity

The Server adds the WorkflowTaskCompleted event (the Workflow Execution didn't complete, just the initial task of "run the order() function"). It then adds an ActivityTaskScheduled event and puts an Activity Task on the queue with the Activity type and arguments:

[Figure: ActivityTaskScheduled event]

In development, we're only running a single Worker process, so it's getting all the tasks, but in production, we'll have enough Workers to handle our load, and any of them can pick up the Activity Task—not just the one that ran the order function.
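A small model of why this works: Workers don't own particular Workflows, they all take tasks from the same queue. In this sketch `SharedQueue` and `makeWorker` are hypothetical helpers, and the Workers take strict turns so the output is deterministic; real Workers poll concurrently, and the Server decides which poll gets which task.

```typescript
// Toy model: several Workers polling one shared task queue (not the Temporal SDK).
type QueuedTask = { kind: 'workflow' | 'activity'; name: string }

class SharedQueue {
  private tasks: QueuedTask[] = []

  add(task: QueuedTask): void {
    this.tasks.push(task)
  }

  take(): QueuedTask | undefined {
    return this.tasks.shift()
  }
}

// Which Worker ended up running each task, keyed by task name.
const handledBy: Record<string, string> = {}

function makeWorker(workerName: string, queue: SharedQueue) {
  return {
    pollOnce(): void {
      const task = queue.take()
      if (task) handledBy[task.name] = workerName
    },
  }
}

const sharedQueue = new SharedQueue()
const workerA = makeWorker('worker-a', sharedQueue)
const workerB = makeWorker('worker-b', sharedQueue)

// Worker A runs the first Workflow Task of order()...
sharedQueue.add({ kind: 'workflow', name: 'order' })
workerA.pollOnce()

// ...order() reaches chargeCustomer(), so an Activity Task is queued,
// and Worker B happens to poll first, so it runs the Activity.
sharedQueue.add({ kind: 'activity', name: 'chargeCustomer' })
workerB.pollOnce()

console.log(handledBy) // { order: 'worker-a', chargeCustomer: 'worker-b' }
```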

The Worker follows the Activity Task instructions, running the chargeCustomer() function:

packages/activities/index.ts

Which calls paymentService.charge():

If paymentService.charge() fails (for example, because the payment service can't be reached), the Activity fails and the Server schedules a retry. The default initial interval is 1 second, so in 1 second, the Activity Task will be added back to the queue.

[Figure: Activity completed]

Once an attempt succeeds, the Worker reports the completion (and the return value, but in this case there is none) back to the Server.
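The 1-second figure is the first step of an exponential backoff. Assuming Temporal's default Activity retry policy (initial interval 1 second, backoff coefficient 2, maximum interval 100 times the initial interval, and no limit on attempts; worth confirming in the docs for your SDK version), the wait before each retry can be computed as below. `RetryPolicy` and `retryDelayMs` are illustrative names, not SDK exports; in the SDK, these values are tuned through the `retry` option passed to `proxyActivities`.

```typescript
// Exponential backoff under an assumed default retry policy.
// RetryPolicy and retryDelayMs are illustrative, not part of the Temporal SDK.
interface RetryPolicy {
  initialIntervalMs: number
  backoffCoefficient: number
  maximumIntervalMs: number
}

const assumedDefaultPolicy: RetryPolicy = {
  initialIntervalMs: 1000, // 1 second
  backoffCoefficient: 2,
  maximumIntervalMs: 100 * 1000, // 100 × the initial interval
}

// Wait before retry number `retry` (1 = the first retry after the first failure).
function retryDelayMs(retry: number, policy: RetryPolicy = assumedDefaultPolicy): number {
  const uncapped = policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry - 1)
  return Math.min(uncapped, policy.maximumIntervalMs)
}

// Delays before retries 1 through 9: they double each time until they hit the cap.
const delays = [1, 2, 3, 4, 5, 6, 7, 8, 9].map((n) => retryDelayMs(n))
console.log(delays)
```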

The Server adds the ActivityTaskStarted and ActivityTaskCompleted events. Now that the Activity is completed, the order() function can continue executing, so the Server adds another WorkflowTaskScheduled event. It also adds a corresponding Workflow Task to the queue, which the Worker picks up (at which point the Server adds another WorkflowTaskStarted event).
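Numbering the events in the order this walkthrough adds them makes the next section's reference to "event 7" easy to check. The list is written by hand from the steps above, with an ActivityTaskScheduled event recording the Activity call; it is not output from a real Server. Retried attempts don't get events of their own here, matching the text, where ActivityTaskStarted and ActivityTaskCompleted are added once.

```typescript
// The order Workflow's Event History up to the second Workflow Task,
// written out by hand from the walkthrough (not fetched from a Server).
const eventTypes: string[] = [
  'WorkflowExecutionStarted', // the Client started order(3)
  'WorkflowTaskScheduled', // "run the order() function" was queued
  'WorkflowTaskStarted', // a Worker picked that task up
  'WorkflowTaskCompleted', // order() paused at await chargeCustomer(product)
  'ActivityTaskScheduled', // the chargeCustomer Activity Task was queued
  'ActivityTaskStarted', // added when the Activity finished...
  'ActivityTaskCompleted', // ...together with its completion
  'WorkflowTaskScheduled', // order() can continue
  'WorkflowTaskStarted', // a Worker picked up the second Workflow Task
]

// Event IDs are 1-based positions in the History; `occurrence` picks the nth match.
function eventId(type: string, occurrence = 1): number {
  let seen = 0
  for (let i = 0; i < eventTypes.length; i++) {
    if (eventTypes[i] === type) {
      seen++
      if (seen === occurrence) return i + 1
    }
  }
  return -1 // not found
}

console.log(eventId('ActivityTaskCompleted')) // 7
console.log(eventId('WorkflowTaskScheduled', 2)) // 8
```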

Second Workflow Task

If the Worker still has the execution context of the order function, it can just resolve the chargeCustomer(product) Promise, and the function will continue executing. If the Worker doesn't have the execution context (because it was evicted from cache in order to make room for another Workflow), it fetches the Event History from the Server and runs the order() function again from the beginning.
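The replay rule can be sketched with a toy engine under simplifying assumptions: the History is reduced to a list of completed Activity results in call order, the Workflow is a plain synchronous function, and it calls Activities through a hypothetical `ctx.activity()` helper. When a result is already in the History, `ctx.activity()` returns it without running anything; when it isn't, the sketch stops the function by throwing a `Suspend` marker and returns a "schedule this Activity" command. The real SDK keeps the async function paused at its `await` instead of throwing, so read this as an illustration of the idea rather than of the SDK's internals.

```typescript
// Toy replay engine (not the Temporal SDK). Results recorded in the History are
// handed straight back on replay, so completed Activities never run twice.

type ToyCommand = { type: 'ScheduleActivity'; name: string }

class Suspend {} // thrown to stop the Workflow at its first unresolved Activity

interface Ctx {
  activity(name: string): string
}

// completedResults[i] is the recorded result of the i-th Activity call.
function runWorkflowTask(
  workflow: (ctx: Ctx) => string,
  completedResults: string[],
): { output?: string; command?: ToyCommand } {
  let call = 0
  const pending: { command?: ToyCommand } = {}
  const ctx: Ctx = {
    activity(name: string): string {
      // Already in the History: resolve immediately with the recorded result.
      if (call < completedResults.length) return completedResults[call++]
      // Not in the History yet: ask the Server to schedule it, then pause.
      pending.command = { type: 'ScheduleActivity', name }
      throw new Suspend()
    },
  }
  try {
    return { output: workflow(ctx) }
  } catch (err) {
    if (err instanceof Suspend) return { command: pending.command }
    throw err
  }
}

// A cut-down order Workflow: charge the customer, then notify them.
const orderWorkflow = (ctx: Ctx): string => {
  const receipt = ctx.activity('chargeCustomer')
  ctx.activity('sendPushNotification')
  return `done (${receipt})`
}

// First Workflow Task: the History is empty, so chargeCustomer gets scheduled.
const first = runWorkflowTask(orderWorkflow, [])
// A Worker without a cached context replays: chargeCustomer resolves from the
// History, and the function moves on to schedule the next Activity.
const second = runWorkflowTask(orderWorkflow, ['receipt-1'])
// With both results recorded, the whole function runs to completion.
const third = runWorkflowTask(orderWorkflow, ['receipt-1', 'sent'])

console.log(first.command?.name) // chargeCustomer
console.log(second.command?.name) // sendPushNotification
console.log(third.output) // done (receipt-1)
```

Because the function re-runs from the top on every replay, this only works if the Workflow code makes the same calls in the same order each time, which is why side effects such as charging a card live in Activities.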

This time, when the function hits the await chargeCustomer(product) line, the Worker knows from event 7, ActivityTaskCompleted, that chargeCustomer has already been run, so instead of sending a "Call Activity" command to the Server, it immediately resolves the Promise. The function continues running until the next await:

packages/workflows/order.ts

const notPickedUpInTime = !(await condition(() => state === 'Picked up', '1 min'))

The condition() call resolves once state is Picked up or 1 minute has passed, returning false in the timeout case. When it's called, the Worker tells the Server to set a timer:

The Server adds the WorkflowTaskCompleted and TimerStarted events to the Event History, and sets a timer in the database. When the timer goes off, a TimerFired event will be added along with a WorkflowTaskScheduled and Workflow Task on the queue telling the Worker the 1 minute is up, at which point the Worker will know to resolve the condition() Promise.
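The key detail is that the timer lives in the Server's database, not in a Worker's memory, so it still fires even if every Worker restarts before the minute is up. A toy model with a fake clock shows the bookkeeping; `TimerServer` and its methods are hypothetical, and a plain array stands in for the database.

```typescript
// Toy model of a durable timer kept by the Server (not Temporal's implementation).
// Time is a plain number of milliseconds so the example is deterministic.

type ToyEvent = { type: string }
type StoredTimer = { workflowId: string; firesAt: number; fired: boolean }

class TimerServer {
  readonly events: Record<string, ToyEvent[]> = {}
  readonly workflowTaskQueue: string[] = [] // IDs of Workflows with a task to run
  private timers: StoredTimer[] = [] // stands in for timer rows in the database

  startTimer(workflowId: string, now: number, durationMs: number): void {
    this.append(workflowId, 'TimerStarted')
    this.timers.push({ workflowId, firesAt: now + durationMs, fired: false })
  }

  // Called as the Server's clock advances; no Worker needs to be running.
  tick(now: number): void {
    this.timers.forEach((timer) => {
      if (!timer.fired && now >= timer.firesAt) {
        timer.fired = true
        this.append(timer.workflowId, 'TimerFired')
        this.append(timer.workflowId, 'WorkflowTaskScheduled')
        this.workflowTaskQueue.push(timer.workflowId)
      }
    })
  }

  private append(workflowId: string, type: string): void {
    const list = this.events[workflowId] || []
    list.push({ type })
    this.events[workflowId] = list
  }
}

const timerServer = new TimerServer()
timerServer.startTimer('order-3', 0, 60 * 1000) // like condition(..., '1 min')
timerServer.tick(30 * 1000) // half a minute: nothing happens yet
timerServer.tick(60 * 1000) // the minute is up

console.log(timerServer.events['order-3'].map((e) => e.type))
// [ 'TimerStarted', 'TimerFired', 'WorkflowTaskScheduled' ]
```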

Send Signal

But in our case, that didn't happen. Instead, before a minute was up, we clicked the "Pick up" button in the driver portal, which sent a pickedUp Signal to the Workflow:

The Server added the Signal to the Event History, along with a WorkflowTaskScheduled event. Then a Workflow Task was added to the queue with the Signal info, which was picked up by the Worker:

The Worker then runs the pickedUp Signal handler:

setHandler(pickedUpSignal, () => {
  // Only a paid order can move to Picked up
  if (state === 'Paid') {
    state = 'Picked up'
  }
})

packages/workflows/order.ts

The handler changes the state to Picked up, and after Signal handlers have been called, the Worker runs all the condition() functions. Now () => state === 'Picked up' will return true, so the Worker will resolve the condition() Promise and continue executing the order() function.
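The ordering described here, Signal handlers first and then every pending condition() predicate, can be sketched as a toy activation loop. `ToyWorkflowRuntime` and its methods are hypothetical names; the real SDK does this inside its Workflow runtime and also handles timeouts, which this sketch leaves out.

```typescript
// Toy model of applying a Workflow Task that carries Signals (not the Temporal SDK):
// run the Signal handlers first, then re-check every pending condition.

type PendingCondition = { predicate: () => boolean; resolve: () => void }

class ToyWorkflowRuntime {
  private signalHandlers: Record<string, () => void> = {}
  private conditions: PendingCondition[] = []

  setSignalHandler(signalName: string, handler: () => void): void {
    this.signalHandlers[signalName] = handler
  }

  // Stand-in for `await condition(predicate)`: `resolve` runs once the predicate holds.
  condition(predicate: () => boolean, resolve: () => void): void {
    this.conditions.push({ predicate, resolve })
  }

  // Apply one Workflow Task containing zero or more Signals.
  activate(signals: string[]): void {
    // 1. Signal handlers update the Workflow's state.
    signals.forEach((signalName) => {
      const handler = this.signalHandlers[signalName]
      if (handler) handler()
    })
    // 2. Every pending condition is re-evaluated against the new state.
    const stillWaiting: PendingCondition[] = []
    this.conditions.forEach((c) => {
      if (c.predicate()) c.resolve()
      else stillWaiting.push(c)
    })
    this.conditions = stillWaiting
  }
}

// Usage with a pickedUp handler like the one in order.ts (state kept in a plain object).
const wf: { state: string; pickedUp: boolean } = { state: 'Paid', pickedUp: false }
const runtime = new ToyWorkflowRuntime()

runtime.setSignalHandler('pickedUp', () => {
  if (wf.state === 'Paid') wf.state = 'Picked up'
})
runtime.condition(
  () => wf.state === 'Picked up',
  () => {
    wf.pickedUp = true
  },
)

runtime.activate([]) // no Signal yet: the condition keeps waiting
runtime.activate(['pickedUp']) // handler runs, then the condition is satisfied
console.log(wf) // { state: 'Picked up', pickedUp: true }
```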

Event History

All together, the part of the Event History we covered was:

The Event History is the core of what enables durable execution: a log of everything important that the Workflow does and everything that gets sent to it. It allows us to press Ctrl-C to stop our Worker process, start it again, open up the UI, select our completed order Workflow, go to the Queries tab, and send the getStatus Query. The Server puts the Query on a queue for the Worker to pick up. The restarted Worker won't have the Workflow in cache, so it fetches the Event History from the Server and calls the order() function, immediately resolving each async call with its original result from the History. Then it calls the getStatus handler function:

setHandler(getStatusQuery, () => {
  return { state, deliveredAt, productId }
})

packages/workflows/order.ts

Since the whole function has been re-run, with every async call resolved from the History, the variables (state, deliveredAt, and productId) have their final values from when the function was originally executed. They're returned to the Server, which returns them to the Client, which returns them to the UI to display on our screen:

[Figure: getStatus Query result]
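Putting the pieces together, this toy shows why a freshly restarted Worker can still answer getStatus: it rebuilds the Workflow's variables by re-running the Workflow code against the recorded History, then calls the Query handler. Everything in it is a simplified stand-in: the History is reduced to recorded Activity results and Signal names, the Workflow is a synchronous function that waits for Signals through a hypothetical `ctx.waitForSignal()` helper, and `replayAndQuery` is an illustrative name, not an SDK function.

```typescript
// Toy model of answering a Query after a Worker restart (not the Temporal SDK):
// replay the Workflow against its History, then call the Query handler.

type Recorded =
  | { kind: 'activityCompleted'; result: string }
  | { kind: 'signaled'; signal: string }

interface ReplayCtx {
  activity(name: string): string
  waitForSignal(signal: string): void
  onQuery(handler: () => unknown): void
}

class EndOfHistory {} // thrown when the code has caught up with everything recorded

function replayAndQuery(workflow: (ctx: ReplayCtx) => void, recorded: Recorded[]): unknown {
  let next = 0
  const handlers: { query?: () => unknown } = {}

  const ctx: ReplayCtx = {
    activity(name: string): string {
      if (next >= recorded.length) throw new EndOfHistory()
      const event = recorded[next++]
      if (event.kind !== 'activityCompleted') throw new Error(`History mismatch at ${name}`)
      return event.result // resolved from History; the Activity is not re-run
    },
    waitForSignal(signal: string): void {
      if (next >= recorded.length) throw new EndOfHistory()
      const event = recorded[next++]
      if (event.kind !== 'signaled' || event.signal !== signal) {
        throw new Error(`History mismatch at ${signal}`)
      }
    },
    onQuery(handler: () => unknown): void {
      handlers.query = handler
    },
  }

  try {
    workflow(ctx)
  } catch (err) {
    // A Workflow that is still running simply stops where the History ends.
    if (!(err instanceof EndOfHistory)) throw err
  }
  const query = handlers.query
  if (!query) throw new Error('no Query handler registered')
  return query()
}

// A cut-down order Workflow with a getStatus-style Query handler.
function toyOrder(ctx: ReplayCtx): void {
  let state = 'Charging'
  ctx.onQuery(() => ({ state }))
  ctx.activity('chargeCustomer')
  state = 'Paid'
  ctx.waitForSignal('pickedUp')
  state = 'Picked up'
  ctx.waitForSignal('delivered')
  state = 'Delivered'
}

const recorded: Recorded[] = [
  { kind: 'activityCompleted', result: 'charged' },
  { kind: 'signaled', signal: 'pickedUp' },
  { kind: 'signaled', signal: 'delivered' },
]

console.log(replayAndQuery(toyOrder, recorded)) // { state: 'Delivered' }
console.log(replayAndQuery(toyOrder, recorded.slice(0, 2))) // { state: 'Picked up' }
```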

Summary

We examined how durable code works under the hood—how we can:

    Write functions that can't fail to complete executing (since when the Worker process dies, the next Worker to pick up the task will get the function's Event History and use the events to re-run the code until it's in the same state).
    Retry the functions that might have transient failures (if the chargeCustomer Activity can't reach the payment service, the Server automatically schedules another Activity Task).

The best part is that these failures are transparent to the application developer—we just write our Workflow and Activity code, and Temporal handles reliably executing it. 💃

In the next post, you’ll learn more things you can do with durable functions. To get notified when it comes out, you can follow us on Twitter or LinkedIn. Also, check out our new Temporal 101 course!

🖖 till next time! —Loren

Thanks to Brian Hogan, Roey Berman, Patrick Rachford, and Dail Magee Jr for reading drafts of this post.
