Temporal Blog 09月30日
Temporal vs 传统任务队列
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

Temporal 是一种比传统 Node.js 任务队列(如 Bull、Bee Queue、Agenda)更强大的工作流管理系统。它通过 Workflows 和 Activities 框架提供更优的类型安全、持久化、重试机制和可视化 UI。Temporal 支持复杂的多步骤任务编排,例如发送带延迟和条件逻辑的欢迎邮件系列,而传统队列通常需要多个独立任务。其重试策略和信号机制使任务处理更可靠,适合需要高可用性和可观测性的生产环境。

Temporal 通过 Workflows 和 Activities 架构替代传统任务队列,提供更强大的任务编排能力,可以在单个流程中处理复杂的多步骤任务,而传统队列通常需要多个独立任务。

Temporal 支持 TypeScript 强类型检查,确保任务参数与处理函数预期一致,而传统队列如 Bull 缺乏此功能,可能导致运行时错误。

Temporal 内置重试机制和指数退避策略,可自动重试失败任务,传统队列如 Bull 默认不支持重试,需要额外实现。

Temporal 支持持久化延迟操作(sleep),可以安全地在 Workflows 中安排未来执行的任务,传统队列通常需要外部调度器辅助。

Temporal 提供 UI 可视化界面,可实时监控 Workflows 和 Activities 的执行状态,传统队列的监控功能相对基础。

There are numerous task queues for Node.js. Bull is the most popular, but Bee Queue, Agenda, and Kue are also common.

All these task queue libraries solve a similar problem. They let you put a task onto a queue and define a worker that consumes tasks from the queue. While Temporal is much more than a task queue, Temporal does solve the same problems as these task queue libraries. Plus, Temporal's feature set provides several advantages over conventional task queues.

Setting Up a Task Queue With Temporal#

A common use case for a task queue is sending a welcome email when a new user registers. Sending email is a good candidate for a task queue because sending an email often requires a slow API call and is more of a "side effect" of registering the user. The following is a basic example of defining an email task queue with Bull. If you're interested in a more complete tutorial, here's a tutorial about building an email queue with Bull in TypeScript.

email-queue.js

const Queue = require('bull');const axios = require('axios');const emailQueue = new Queue('email');const mailgunAPI = 'https://api.mailgun.net/v3';emailQueue.process(async function sendEmail(job) {  const { to, from, subject, html } = job.data;  await axios({    url: `${mailgunAPI}/${domain}/messages`,    method: 'post',    params: { to, from, subject, html },    auth: {      username: 'api',      password: apiKey    }  })});

create-test-task.js

const Queue = require('bull');const emailQueue = new Queue('email');emailQueue.add({  to: 'john.smith@gmail.com',  from: 'test@temporal.io',  subject: 'Welcome to Temporal!',  html: 'This is a test email'});

With Temporal, the setup is similar. A task or job corresponds to a Workflow. But, first, you need to define an Activity to send the email as follows.

src/activities.ts

import axios from 'axios';import { SendEmailParams } from './interfaces';interface MailgunSettings {  apiKey: string;  domain: string;}const mailgunAPI = 'https://api.mailgun.net/v3';export const createActivities = ({ apiKey, domain }: MailgunSettings) => ({  async sendEmail(params: SendEmailParams): Promise<void> {    const { to, from, subject, html } = params;    await axios({      url: `${mailgunAPI}/${domain}/messages`,      method: 'post',      params: { to, from, subject, html },      auth: {        username: 'api',        password: apiKey,      },    });  },});

Next, you need to define a Workflow that calls this activity.

src/workflows.ts

import { proxyActivities } from '@temporalio/workflow';import type { createActivities } from './activities';import { SendEmailParams } from './interfaces';const { sendEmail } = proxyActivities<ReturnType<typeof createActivities>>({  startToCloseTimeout: '5m',});export async function sendEmailWorkflow(params: SendEmailParams): Promise<void> {  await sendEmail(params);}

Finally, you need to create a Worker to process tasks as follows.

src/worker.ts

import { Worker } from '@temporalio/worker';import dotenv from 'dotenv';import { createActivities } from './activities';dotenv.config();const {  MAILGUN_API: apiKey,  MAILGUN_DOMAIN: domain} = process.env;async function run(): Promise<void> {  if (apiKey === undefined) throw new Error('No API key');  if (domain === undefined) throw new Error('No domain');  const activities = createActivities({    apiKey,    domain,  });  const worker = await Worker.create({    taskQueue: 'email',    activities,    workflowsPath: require.resolve('./workflows'),  });  await worker.run();}run().catch((err) => {  console.error(err);  process.exit(1);});

Finally, to start a Workflow, you need to create a Client and start() the Workflow as follows.

src/client.ts

import { Client } from '@temporalio/client';import { sendEmailWorkflow } from './workflows';import { nanoid } from 'nanoid';const client = new Client();const handle = await client.workflow.start(sendEmailWorkflow, {  args: [{    to: 'john.smith@gmail.com',    from: 'test@temporal.io',    subject: 'Welcome to Temporal!',    html: 'This is a test email'  }],  taskQueue: 'email',  workflowId: 'workflow-' + nanoid(),});

The Temporal code is slight more verbose because you need a separate Activity function. However, Temporal has several features that make it an excellent option for the task queue use case.

Additional Features: UI, Type Checking, Retries, Delays#

Most Node.js task queues have a corresponding UI for monitoring and managing tasks. For example, Taskforce for Bull, or Agendash for Agenda. The Temporal UI lets you manage and monitor Workflows in a similar way.

The Temporal UI for an execution of sendEmailWorkflow() looks like the following. It displays when the sendEmailWorkflow() started and ended, when the sendEmail() activity started and ended, and the data sendEmail() returned.

Temporal has stronger built-in type safety than conventional task queues. For example, Bull doesn't provide a way for TypeScript to tie the email queue to the arguments that the sendEmail() handler expects. You can call emailQueue.add(args) with an arbitrarily typed argument. On the other hand, TypeScript type checks args in the following code to make sure it lines up with the parameters sendEmailWorkflow() expects.

const handle = await client.workflow.start(sendEmailWorkflow, {  args: [{    to: 'john.smith@gmail.com',    from: 'test@temporal.io',    subject: 'Welcome to Temporal!',    html: 'This is a test email'  }], // TypeScript can type check `args`  taskQueue: 'email',  workflowId: 'workflow-' + nanoid(),});

Suppose the Mailgun API has a temporary issue. Ideally, your task queue would retry sending the email automatically a fixed number of times before marking the task as failed. Some task queues support retrying failed jobs: Bull does not, but Bee Queue does.

Temporal supports retrying activities with exponential backoff via retry policies. For example, the following code shows how you might configure Temporal to retry the sendEmail() activity at most 2 times.

src/workflows.ts

import { proxyActivities } from '@temporalio/workflow';import type { createActivities } from './activities';import { SendEmailParams } from './interfaces';interface SendEmailParams {  to: string;  from: string;  subject: string;  html: string;}const { sendEmail } = proxyActivities<ReturnType<typeof createActivities>>({  retry: {    maximumAttempts: 2 // Retry `sendEmail()` twice before failing  },  startToCloseTimeout: '5m',});export async function sendEmailWorkflow(params: SendEmailParams): Promise<void> {  await sendEmail(params);}

Suppose you want to delay sending your email for 24 hours, because you want to remind the user of something. Many task queues also support delaying processing a task. For example, Bull lets you specify a delay in milliseconds when adding a task to a queue.

Temporal Workflows are durable, which means you can safely put a sleep() in your Workflow to add a delay.

src/workflows.ts

import { proxyActivities, sleep } from '@temporalio/workflow';import type { createActivities } from './activities';import { SendEmailParams } from './interfaces';const { sendEmail } = proxyActivities<ReturnType<typeof createActivities>>({  startToCloseTimeout: '5m',});export async function sendEmailWorkflow(params: SendEmailParams): Promise<void> {  const delay = params.delay;  if (delay !== undefined) {    await sleep(delay);  }  await sendEmail(params);}

The ability to use sleep() in your Workflows without sacrificing durability means you can write multiple "tasks" in one Workflow. If you want to send another email after a certain period of time, you can add it to your single Workflow rather than scheduling multiple tasks in a traditional task queue. Let's take a look at how you can use Workflows to represent a more sophisticated series of tasks.

Multi-Part Tasks#

Many modern apps don't just send one welcome email. They often have a whole series of emails over several days that help new customers get acquainted with the app. With a traditional task queue, you would need a separate task for every email in the series for every customer. Add in conditional logic, like sending a different email if a customer has no interactions with the app, and implementing a multi-part email series can be an extremely complex endeavor.

With Temporal, you can send multiple emails from one Workflow. In the following code, the customerWelcomeSeriesWorkflow() sends multiple emails separated by 24 hours.

src/workflows.ts

import { proxyActivities, sleep } from '@temporalio/workflow';import type { createActivities } from './activities';interface CustomerWelcomeSeriesParams {  to: string;  from: string;}const { sendEmail } = proxyActivities<ReturnType<typeof createActivities>>({  retry: {    maximumAttempts: 2  },  startToCloseTimeout: '5m',});export async function customerWelcomeSeriesWorkflow(params: CustomerWelcomeSeriesParams): Promise<void> {  const { from, to } = params;  await sendEmail({    to,    from,    subject: 'Welcome to Temporal!',    html: 'This is a test email'  });  await sleep('1 day'); // Wait for 24 hours  await sendEmail({    to,    from,    subject: 'Your First Day Using Temporal',    html: 'This is a test email'  });  await sleep('1 day'); // Wait for 24 hours  await sendEmail({    to,    from,    subject: 'Advanced Temporal Workflows',    html: 'This is a test email'  });}

You can also conditionally schedule emails based on user actions using Signals. For example, your app can send a Signal to customerWelcomeSeriesWorkflow() when a user logs in, so customerWelcomeSeriesWorkflow() can send a different email if a new customer hasn't logged in for 24 hours.

src/workflows.ts

import { defineSignal, proxyActivities, sleep } from '@temporalio/workflow';import type { createActivities } from './activities';export const loggedInSignal = defineSignal('loggedIn');interface CustomerWelcomeSeriesParams {  to: string;  from: string;}const { sendEmail } = proxyActivities<ReturnType<typeof createActivities>>({  retry: {    maximumAttempts: 2  },  startToCloseTimeout: '5m',});export async function customerWelcomeSeriesWorkflow(params: CustomerWelcomeSeriesParams): Promise<void> {  const { from, to } = params;  // Assume app sends a signal to this Workflow when a customer logs in  let hasLoggedIn = false;  setHandler(loggedInSignal, () => {    hasLoggedIn = true;  });  await sendEmail({    to,    from,    subject: 'Welcome to Temporal!',    html: 'This is a test email'  });  await sleep('1 days');  // Send a different email if user hasn't logged in during first 24 hours  if (!hasLoggedIn) {    await sendEmail({      to,      from,      subject: 'Looks Like You Haven\'t Tried Temporal Yet...',      html: 'This is a test email'    });  } else {    await sendEmail({      to,      from,      subject: 'Your First Day Using Temporal',      html: 'This is a test email'    });  }}

While Temporal Workflows can execute just one task, they can also execute a whole series of tasks. With a conventional task queue, you would have to schedule a separate task for each email in customerWelcomeSeriesWorkflow(). Temporal Workflows make writing multi-part tasks much easier, because you can write the task orchestration logic in a single JavaScript function, rather than spread out across multiple task handlers.

Moving On#

Temporal is not a task queue, but you can use Temporal Workflows to solve the same problem that task queues solve: sending tasks to workers using a queue in a durable and observable way. Temporal provides a lot of the same features as task queues, including retries, delays, and a sleek UI. But Temporal Workflows also help you write more complex multi-step tasks in a single JavaScript function, including logic that would require multiple tasks in a traditional task queue. So next time you're looking for a Node.js task queue, try Temporal instead!

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Temporal Node.js 任务队列 工作流 TypeScript 重试机制 持久化
相关文章