Temporal Blog, September 30, 19:15
Building a Customer Loyalty Program with Temporal: The Actor Model in Practice

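This article takes an in-depth look at how to use Temporal Workflows to build a customer loyalty program that emulates the Actor Model. It reviews the core elements of the Actor Model (message passing, creating new Actors, maintaining state). It then uses a concrete customer loyalty scenario to show how a Temporal Workflow can implement each of these Actor characteristics. Walking through code examples in Go, it demonstrates three techniques: handling messages with Signals, managing long-lived Workflows with Continue-As-New, and creating new Workflow instances by calling the Client API from an Activity (Signal-with-Start). Together, these produce a highly distributed, scalable customer loyalty system with durable state.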

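🎭 **Core elements of the Actor Model and their Temporal implementation**: The article first reviews the three elements of the Actor Model: sending and receiving messages, creating new Actors, and maintaining state. It then points out how Temporal covers each one. Workflows send and receive messages through Signals, create Actors by starting new Workflow instances, and maintain state themselves. This makes Temporal a general-purpose solution rather than one specific to the Actor Model.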

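📈 **Workflow design for the customer loyalty program**: Using a customer loyalty program as an example, the article explains in detail how to map logic such as accumulating points, unlocking rewards, and canceling accounts onto a Temporal Workflow. It focuses on using Signal Channels (such as 'addPoints' and 'cancelAccount') to receive external events. The Workflow's own logic then updates customer state (such as LoyaltyPoints and AccountActive), which keeps that state durable and reliable.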

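🔄 **Managing long lifetimes and recovering state**: To deal with a growing Workflow Event History, the article introduces Temporal's 'Continue-As-New' mechanism. The Workflow checks its Event History length against a threshold. Once the threshold is reached, it carries its current state into a newly started run. This effectively prevents history bloat in long-running Workflows while keeping the service running without interruption.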

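🌟 **Cross-Workflow interaction and Actor creation**: The article highlights the Actor Model's ability to create new Actors and shows how to implement it in Temporal. An Activity can call `Client.SignalWithStartWorkflow` so that one Workflow sends a Signal to another Workflow and starts it if needed. This is essential when customers can gift status or points to others, which means creating new customer accounts. The article also explains how to handle errors such as `WorkflowExecutionAlreadyStarted` so that Actor creation stays robust.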

Actors and Workflows Part 2: Building a Customer Loyalty Program

A few weeks ago, I wrote about the core things needed to turn a Temporal Workflow into an "Actor." If you haven't read that post yet, I encourage you to skim it before reading this one. Here, I dive into a concrete example: a Workflow representing a customer's loyalty status.

If you want to skip the prose and just jump right into the code, you can find it all in this GitHub repository, with implementations in Go, Java, and Python.

Actor Model Refresher

As formally defined, Actors must be able to do three things:

    - Send and receive messages
    - Create new Actors
    - Maintain state

Exact implementation details vary depending on what framework, library, or tools you're using, but the biggest challenge is having some kind of software artifact running somewhere that can handle these things.

That's where most Actor frameworks come in to help: providing both the programming model and the runtime environment for being able to build an Actor-based application in a highly distributed, concurrent, and scalable way.

Temporal differs here in that it’s general-purpose, rather than specific to one model or system design pattern. With Workflows, you define a function that Temporal will ensure runs to completion (or reliably runs forever, if the function doesn’t return).

I recognize that statement is both rather bold and also so generic as to be hard to disprove. So, let's look at a concrete example.

Loyal Customers

Many consumer businesses have some kind of loyalty program. Buy 10 items, get the 11th free! Fly 10,000 miles, get free access to the airport lounge! Earn one million points over the lifetime of your account, earn a gold star!

At the highest level, the application's logic isn't complex: Each customer has an integer counter that's incremented after the customer does certain things (e.g., buy something, or take a trip). When that counter crosses different thresholds, new rewards are unlocked. And, although we may not like it, customers can always close their accounts.
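To make "crossing thresholds" concrete, here is a minimal plain-Go sketch that looks up the unlocked tier from a points value. The tier names and thresholds are hypothetical; they come from neither the article nor its repository. No Temporal code is involved yet.

```go
package main

import "sort"

// StatusLevel is a hypothetical reward tier used only for this sketch.
type StatusLevel struct {
	Name          string
	MinimumPoints int
}

// levels must stay sorted by MinimumPoints, ascending. The values are
// illustrative assumptions, not the article's real tiers.
var levels = []StatusLevel{
	{Name: "Member", MinimumPoints: 0},
	{Name: "Silver", MinimumPoints: 500},
	{Name: "Gold", MinimumPoints: 1_000},
	{Name: "Platinum", MinimumPoints: 1_000_000},
}

// levelFor returns the highest level whose threshold the points value meets.
func levelFor(points int) StatusLevel {
	// sort.Search returns the index of the first level whose threshold is
	// above points, so the level just before it is the highest one unlocked.
	i := sort.Search(len(levels), func(i int) bool {
		return levels[i].MinimumPoints > points
	})
	if i == 0 {
		return levels[0] // below every threshold (e.g., negative points)
	}
	return levels[i-1]
}
```

Keeping only the counter in state and deriving the tier on demand means the two can never disagree.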

When we create the diagram for the app, it might look like this:

In terms of the Actor Model, two of the three requirements are on display:

    - Send and receive messages: A customer can send either an "earn points" message or a "try to use reward" message.
    - Create new Actors: ??? (This is the Actor requirement not apparent in this application, but we'll see later how it can be incorporated.)
    - Maintain state: A customer loyalty account needs to maintain the points counter and which rewards are unlocked (or be able to look up this information based on the points value).

Requirement #2, the ability to create other Actors, isn't immediately obvious here, but it isn't far out of reach. For this example application, we could decide that one of the rewards for earning enough points is the ability to gift status to someone else. That would invite them to the program (i.e., create their account) if they aren't already a member.

If our goal is to create a demo application for the Actor Model (as it is in this post), then there's actually one other thing missing: the ability for a customer (or rather, their loyalty account) to send messages. For that, we could also declare that customers with enough points can gift points or status levels (i.e., which rewards are unlocked) to their guests. Then they can send messages, too!

Reworking the previous diagram to be more befitting of a full "Actor," we'd get the following:

And, as for the exact implementation details, read on!

Loyal (Temporal!) Customers

Imagine being able to write the customer loyalty program above in just a function or two. Conceptually, that's not hard. In pseudocode, that might look like the following:

    INVITE_REWARD_MINIMUM_POINTS = 1000

    function CustomerLoyaltyAccount:
        account_canceled = false
        points = 0

        while !account_canceled:
            message = receive_message()
            switch message.type:
                case 'cancel':
                    account_canceled = true
                case 'add_points':
                    fallthrough
                case 'gift_points':
                    points += message.value
                case 'invite_guest':
                    if points >= INVITE_REWARD_MINIMUM_POINTS:
                        spawn(new CustomerLoyaltyAccount())

But there are a few crucial details that are, well, rather undefined in this pseudo-function. Specifically:

    - What's receive_message() doing? How is it receiving messages?
    - Similarly, what's spawn(new CustomerLoyaltyAccount()) doing?
    - And most importantly, where is this function running? What happens if that runtime crashes or the function otherwise stops running?

Each of these maps to core Temporal features that we can implement in an example Workflow:

    - Data can be sent to Workflows via Signals.
    - Workflows can create new Workflow instances.
    - As long as there are Workers running somewhere that can communicate with the Temporal Server, if the Worker running the function dies, the function will continue running on another Worker (you know, kind of Temporal's main benefit).
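For contrast, here is the pseudocode above translated into plain, in-memory Go. A buffered channel stands in for receive_message(), and starting a goroutine stands in for spawn(). This is a minimal sketch with hypothetical Message and Account types, not Temporal code. It answers the first two questions but not the third: all state lives in process memory, so none of it survives a crash.

```go
package main

// inviteRewardMinimumPoints mirrors INVITE_REWARD_MINIMUM_POINTS in the pseudocode.
const inviteRewardMinimumPoints = 1000

// Message is a hypothetical message type for this in-memory sketch.
type Message struct {
	Type  string // "cancel", "add_points", "gift_points", or "invite_guest"
	Value int
}

// Account is an in-memory "actor": a mailbox plus state that only its own
// goroutine touches while it runs.
type Account struct {
	mailbox chan Message
	done    chan struct{} // closed when the message loop exits
	points  int
	guests  []*Account
}

// spawn creates a new account and starts its message loop in a goroutine.
func spawn() *Account {
	a := &Account{mailbox: make(chan Message, 16), done: make(chan struct{})}
	go a.run()
	return a
}

// Send delivers a message to the account's mailbox.
func (a *Account) Send(m Message) { a.mailbox <- m }

// Stop sends a cancel message, waits for the loop to exit, and returns the
// final state, which is safe to read once done has been closed.
func (a *Account) Stop() (points int, guests []*Account) {
	a.Send(Message{Type: "cancel"})
	<-a.done
	return a.points, a.guests
}

func (a *Account) run() {
	defer close(a.done)
	for {
		msg := <-a.mailbox // receive_message()
		switch msg.Type {
		case "cancel":
			return // account_canceled = true
		case "add_points", "gift_points": // one case for both, instead of fallthrough
			a.points += msg.Value
		case "invite_guest":
			if a.points >= inviteRewardMinimumPoints {
				a.guests = append(a.guests, spawn())
			}
		}
	}
}
```

Because a single channel is FIFO, messages are handled in the order they were sent. A process crash, however, loses every account at once.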

Customers Go Loyal

Let's build this up in Go. If you are more comfortable with other languages, I've also written the same Workflow in Python and Java. While the languages are different, most of the same concepts and patterns should carry over.

(For brevity in the body of this blog post, I'll omit error handling in most cases, but include it where it's non-trivial and relevant.)

First, we write the skeleton of a Workflow and an Activity. For some of the milestones in a customer's lifecycle, it'd be nice to send them some kind of notification. In a real application, you'd call out to SendGrid, Mailchimp, Constant Contact, or some other email provider, but for simplicity's sake, I'm just logging the details. This initial Workflow does just that: if it's a new customer, it sends a welcome email; otherwise, it moves on.

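    import (
        "context"
        "fmt"
        "time"

        "go.temporal.io/sdk/activity"
        "go.temporal.io/sdk/client"
        "go.temporal.io/sdk/temporal"
        "go.temporal.io/sdk/workflow"
    )

    func CustomerLoyaltyWorkflow(ctx workflow.Context, customer CustomerInfo, newCustomer bool) error {
        logger := workflow.GetLogger(ctx)
        logger.Info("Loyalty workflow started.", "CustomerInfo", customer)

        // Activities need a timeout to run at all. Capping retries keeps a broken
        // email provider from blocking the Workflow forever. (The timeout value
        // is an illustrative assumption.)
        ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
            StartToCloseTimeout: 10 * time.Second,
            RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 10},
        })

        var activities Activities
        if newCustomer {
            logger.Info("New customer workflow; sending welcome email.")
            err := workflow.ExecuteActivity(ctx, activities.SendEmail,
                fmt.Sprintf("Welcome, %v, to our loyalty program!", customer.Name)).
                Get(ctx, nil)
            if err != nil {
                logger.Error("Error running SendEmail activity for welcome email.", "Error", err)
            }
        } else {
            logger.Info("Skipping welcome email for non-new customer.")
        }

        // ... [to be added later] ... //

        return nil
    }

    type Activities struct {
        Client client.Client
    }

    func (*Activities) SendEmail(ctx context.Context, body string) error {
        logger := activity.GetLogger(ctx)
        logger.Info("Sending email.", "Contents", body)
        return nil
    }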

Next up, we need to be able to handle messages. This is the primary thing the Workflow (i.e., customer loyalty Actor) does: sit around waiting for new messages to come in.

The following code replaces the // ... [to be added later] ... // line from the previous snippet:

    selector := workflow.NewSelector(ctx)

    // Signal handler for adding points
    selector.AddReceive(workflow.GetSignalChannel(ctx, "addPoints"),
        func(c workflow.ReceiveChannel, _ bool) {
            signalAddPoints(ctx, c, &customer)
        })

    // Signal handler for canceling account
    selector.AddReceive(workflow.GetSignalChannel(ctx, "cancelAccount"),
        func(c workflow.ReceiveChannel, _ bool) {
            signalCancelAccount(ctx, c, &customer)
        })

    // ... [register other Signal handlers here] ... //

    logger.Info("Waiting for new messages")
    for customer.AccountActive {
        selector.Select(ctx)
    }

The Signal handler function for adding points does very little: it adds the given points to the customer's state and then emails the customer the new total.

As you might imagine, the cancel account handler is very similar, setting the customer.AccountActive flag used above to false and then notifying the customer.

    func signalAddPoints(ctx workflow.Context, c workflow.ReceiveChannel, customer *CustomerInfo) {
        logger := workflow.GetLogger(ctx)
        var activities Activities

        var pointsToAdd int
        c.Receive(ctx, &pointsToAdd)
        logger.Info("Adding points to customer account.", "PointsAdded", pointsToAdd)
        customer.LoyaltyPoints += pointsToAdd

        err := workflow.ExecuteActivity(ctx, activities.SendEmail,
            fmt.Sprintf("You've earned more points! You now have %v.", customer.LoyaltyPoints)).
            Get(ctx, nil)
        if err != nil {
            logger.Error("Error running SendEmail activity for added points.", "Error", err)
        }

        // ... [insert logic for unlocking status levels or rewards] ... //
    }

All combined, the code so far does three things:

    - First, it registers the signalAddPoints and signalCancelAccount functions as the handlers for the "addPoints" and "cancelAccount" Signals, respectively.
    - Then, it blocks forward progress on the Workflow, via selector.Select(ctx), until a registered Signal comes in. Unless that Signal is "cancelAccount," the Workflow keeps looping on this select.
    - Finally, I've chosen for this application not to fail the Workflow when an email fails to send. This keeps the Workflow representing the customer's loyalty account active and running even when an external system fails.
      For this to work, you'll want to set an appropriate retry policy so that the Workflow doesn't block indefinitely on email failures (by default, Temporal retries a failed Activity without limit). For example, you could set MaximumAttempts to a reasonably low number like 10.
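To make the MaximumAttempts bound concrete, here is a plain-Go model of what it limits. It assumes a hypothetical sendEmail function that can keep failing. This is not Temporal's retry implementation: a real RetryPolicy also waits between attempts with exponential backoff, which this sketch leaves out.

```go
package main

import "errors"

// errEmailDown is a hypothetical error from an unavailable email provider.
var errEmailDown = errors.New("email provider unavailable")

// callWithMaxAttempts calls fn until it succeeds or maxAttempts calls have
// been made. It returns the number of attempts and the last error.
// Backoff between attempts is intentionally omitted.
func callWithMaxAttempts(maxAttempts int, fn func() error) (attempts int, err error) {
	for attempts < maxAttempts {
		attempts++
		if err = fn(); err == nil {
			return attempts, nil
		}
	}
	return attempts, err
}

// handleAddPoints mirrors the choice described above: the points are always
// added, and an email failure is only reported back (to be logged), never
// allowed to stop the handler.
func handleAddPoints(points *int, add int, sendEmail func() error) (emailErr error) {
	*points += add
	_, emailErr = callWithMaxAttempts(10, sendEmail)
	return emailErr
}
```

With a cap of 10, a permanently failing provider costs at most 10 attempts. After that, control returns to the handler, and the account keeps running.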

Already this gives us most of the application. Thanks to Temporal, we have a function that runs perpetually and can receive two different kinds of messages. Both modify the Workflow's state, and one of them also finishes the Workflow.

What remains are a couple more Temporal-specific considerations.

Long-Lived Customers

In my last post, I spilled many words on the topic of "Continue-As-New." If you didn't (or don't want to!) read those words, the gist is this: at some point, a Workflow's history may grow unwieldy, and Continue-As-New resets it.

For this customer loyalty example Workflow, the number of events in the Event History matters far more than their total size. The addPoints Signal takes only a single integer argument and the cancelAccount Signal takes none, so their combined contribution to the size of the history is minimal.

A Signal with only a single integer parameter will, by itself, contribute one Event and about 500 bytes to the History, even with very large values. And so, how many of these Signals would be required to hit either the size or length limits?

By default, Temporal limits an Event History to 51,200 events and 50 MB. If nothing else happened but addPoints Signals, it would take 51,200 of them to reach the length limit, but 50 * 1024 * 1024 / 500, or 104,857.6, to reach the size limit. Many of these Signals also run the SendEmail Activity, and each Activity contributes a handful of (small) events to the history. As a result, this Workflow will hit the History length limit well before the size limit.
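The arithmetic is easy to check in a few lines of Go. The history limits are Temporal's defaults as quoted above, and the 500-byte figure is the article's estimate. The extraEvents parameter is a hypothetical knob for the events each Signal's SendEmail Activity adds, since the exact count isn't pinned down here.

```go
package main

// Limits and sizes from the discussion above. The history limits are
// Temporal's defaults and may be configured differently per deployment.
const (
	historyLengthLimit = 51_200           // events
	historySizeLimit   = 50 * 1024 * 1024 // bytes (50 MiB)
	bytesPerSignal     = 500              // approximate size of one addPoints Signal event
)

// signalsToLengthLimit returns how many addPoints Signals fit under the
// length limit when each Signal also causes extraEvents more events.
func signalsToLengthLimit(extraEvents int) int {
	return historyLengthLimit / (1 + extraEvents)
}

// signalsToSizeLimit returns how many Signal events fit under the size
// limit, ignoring everything else in the history.
func signalsToSizeLimit() float64 {
	return float64(historySizeLimit) / bytesPerSignal
}
```

Even with no extra events per Signal, the length limit is reached at about half the Signal count the size limit would allow. Any Activity events only widen that gap.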

So, let's add a check for that into our Workflow loop:

    const eventsThreshold = 10_000

    // ... snip ...

    info := workflow.GetInfo(ctx)
    logger.Info("Waiting for new messages")
    for customer.AccountActive && info.GetCurrentHistoryLength() < eventsThreshold {
        selector.Select(ctx)
    }

Finally, trigger Continue-As-New as needed, draining any pending signals:

    if customer.AccountActive {
        logger.Info("Account still active, but hit continue-as-new threshold.")
        // Drain signals before continuing-as-new
        for selector.HasPending() {
            selector.Select(ctx)
        }
        return workflow.NewContinueAsNewError(ctx, CustomerLoyaltyWorkflow, customer, false)
    }

My previous post on this topic explained in more detail why it's necessary to drain Signals before continuing-as-new. To briefly recap, Continue-As-New finishes the current Workflow run and starts a new run regardless of any pending Signals. If we don't drain (and handle!) Signals before calling workflow.NewContinueAsNewError (or workflow.continue_as_new in Python, or Workflow.continueAsNew in Java), those pending Signals will be lost forever.
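The sketch below models one run under these rules in plain Go. A Go select over two channels stands in for the Selector, and a counter stands in for the Event History length. CustomerState and runOnce are hypothetical names. The returned state is what would be passed to the next run, like the customer argument to NewContinueAsNewError. Unlike Temporal's deterministic Selector, Go's select picks randomly among ready channels, so this model's ordering is not deterministic when both channels have pending messages.

```go
package main

// CustomerState is a hypothetical, trimmed-down stand-in for CustomerInfo.
type CustomerState struct {
	LoyaltyPoints int
	AccountActive bool
}

// runOnce models a single Workflow run in which each received Signal adds
// one event to a simulated history. It returns the state to carry into the
// next run and whether a new run is needed (the Continue-As-New case).
func runOnce(state CustomerState, addPoints <-chan int, cancel <-chan struct{}, eventsThreshold int) (CustomerState, bool) {
	history := 0
	for state.AccountActive && history < eventsThreshold {
		select { // blocks like selector.Select(ctx)
		case pts := <-addPoints:
			state.LoyaltyPoints += pts
		case <-cancel:
			state.AccountActive = false
		}
		history++
	}
	if !state.AccountActive {
		return state, false // account closed: the Workflow simply completes
	}
	// Drain with non-blocking receives, like `for selector.HasPending()`.
	// In Temporal, skipping this step would lose Signals already delivered
	// to the current run.
	for {
		select {
		case pts := <-addPoints:
			state.LoyaltyPoints += pts
		case <-cancel:
			state.AccountActive = false
		default:
			return state, state.AccountActive
		}
	}
}
```

With a threshold of 3 and five buffered addPoints Signals of 10 points each, the run stops looping after three Signals and drains the remaining two. It then hands 50 points to the next run.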

The last major thing this Workflow needs to make it a true, stage-worthy Actor is the ability to create others.

Spawning New Customers

While Temporal supports Parent/Child relationships between Workflows, this customer loyalty application only needs one Workflow to be able to send a message to another when gifting status or points.

Temporal's Client provides an API called Signal-with-Start that sends a Signal to a Workflow and starts that Workflow if it isn't already running, all in one call. Since this is only available in the Client, not from within a Workflow, we'll need to do this in an Activity.
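To pin down the behavior we're relying on, here is an in-memory Go model of Signal-with-Start under a reject-duplicate ID reuse policy, as this section describes it. A running Workflow only gets the Signal. An unknown ID is started and then signaled. A closed ID is rejected. The registry type, its methods, and errAlreadyStarted are hypothetical. This is not the Temporal Server's implementation, and it ignores the retention-period caveat below.

```go
package main

import "errors"

// errAlreadyStarted stands in for serviceerror.WorkflowExecutionAlreadyStarted.
var errAlreadyStarted = errors.New("workflow execution already started")

// runState is the hypothetical lifecycle state of a Workflow ID.
type runState int

const (
	stateRunning runState = iota
	stateClosed
)

// registry is a hypothetical in-memory table of Workflow IDs, their state,
// and the Signal payloads each Workflow has received.
type registry struct {
	states  map[string]runState
	signals map[string][]int
}

func newRegistry() *registry {
	return &registry{states: map[string]runState{}, signals: map[string][]int{}}
}

// signalWithStart models Signal-with-Start under a reject-duplicate ID reuse
// policy and reports whether it started a new Workflow.
func (r *registry) signalWithStart(id string, signalArg int) (started bool, err error) {
	state, known := r.states[id]
	if known && state == stateClosed {
		// Closed ID: the policy forbids a new run, so nothing is delivered.
		return false, errAlreadyStarted
	}
	if !known {
		// Unknown ID: start the Workflow before delivering the Signal.
		r.states[id] = stateRunning
		started = true
	}
	// Running (or just started): deliver the Signal.
	r.signals[id] = append(r.signals[id], signalArg)
	return started, nil
}

// markClosed records that a Workflow has finished (e.g., its account was canceled).
func (r *registry) markClosed(id string) { r.states[id] = stateClosed }
```

The key property is that "already started" only surfaces for IDs whose Workflow has closed. That is why the real code can treat it as "the guest already canceled."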

First, I'm setting the ID Reuse Policy to REJECT. This is in some ways a "business logic" kind of decision, where I'm declaring that once a customer's account is closed, it can't be re-invited. (Note that after a namespace's retention period has passed, IDs from closed Workflows can be reused regardless of this policy, and so in a real-life production version of this app, you'd want to have this check an external source for customer account statuses.)

    func (a *Activities) StartGuestWorkflow(ctx context.Context, guest CustomerInfo) (GuestInviteResult, error) {
        // ...
        workflowOptions := client.StartWorkflowOptions{
            TaskQueue:             TaskQueue,
            WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
        }

Then, we can call Client.SignalWithStartWorkflow:

        logger.Info("Starting and signaling guest workflow.", "GuestID", guest.CustomerID)
        _, err := a.Client.SignalWithStartWorkflow(ctx, CustomerWorkflowID(guest.CustomerID),
            SignalEnsureMinimumStatus, guest.StatusLevel.Ordinal,
            workflowOptions, CustomerLoyaltyWorkflow, guest, true)

Note the use of the Client from the Activities receiver struct! This relies on how Activities work in the Go SDK. If we set this Client field when we instantiate and register the Activities in the Worker, the same connection is available inside every Activity method. This way, we don't have to worry about re-creating the Client.

I'm also ignoring the client.WorkflowRun handle returned by SignalWithStartWorkflow, following the Go convention of assigning it to _. Because this "guest" Workflow is expected to run indefinitely, blocking on its result would keep the Activity, and therefore the original Workflow, from doing anything else. That handle is only useful for waiting for the Workflow to finish or for getting its IDs. We already know the Workflow ID from the CustomerWorkflowID(guest.CustomerID) call, so we can safely ignore it.

But it's still necessary to handle the error. With the ID Reuse Policy set to REJECT, retrying after the error returned for an already-closed Workflow will get us nowhere, so we should instead send some useful information back to the Workflow:

        target := &serviceerror.WorkflowExecutionAlreadyStarted{}
        if errors.As(err, &target) {
            return GuestAlreadyCanceled, nil
        } else if err != nil {
            return -1, err
        }
        return GuestInvited, nil
    }

    // ... [Defined at top] ...
    type GuestInviteResult int

    const (
        GuestInvited GuestInviteResult = iota
        GuestAlreadyCanceled
    )

Back in the Workflow, after running this Activity, I can check its result and notify the customer as appropriate. As before, I'm allowing the Workflow to continue if sending the email fails. But if the SignalWithStartWorkflow call failed for any reason other than the guest's account already being closed, I want to make some noise and fail the Workflow, since something unusual is likely happening.

    var inviteResult GuestInviteResult
    err := workflow.ExecuteActivity(ctx, activities.StartGuestWorkflow, guest).
        Get(ctx, &inviteResult)
    if err != nil {
        return fmt.Errorf("could not signal-with-start guest/child workflow for guest ID '%v': %w",
            guest.CustomerID, err)
    }

    var emailToSend string
    if inviteResult == GuestAlreadyCanceled {
        emailToSend = "Your guest has canceled!"
    } else {
        emailToSend = "Your guest has been invited!"
    }

    // As before, a failed email is logged rather than failing the Workflow.
    err = workflow.ExecuteActivity(ctx, activities.SendEmail, emailToSend).Get(ctx, nil)
    if err != nil {
        logger.Error("Error running SendEmail activity for guest invite.", "Error", err)
    }

This snippet would end up in a Signal handler for something like an "invite guest" Signal. As discussed at the top of this post, the handler would also check whether the current customer is allowed to take this action at all.

Summing it all up

There are a few other things to explore in this app, like catching a cancellation request or looking through the tests, but this post has gotten long enough as it is. 🙂

Hopefully this post serves as a nice "close-to-real-world" example of how to use Temporal to build something that looks like an "Actor": a really, really long-running Workflow that can send and receive messages and maintain state without a database.

For more information related to this post and about Temporal, check out the following links:

And the best way to learn Temporal is with our free courses.
