总结

我们将使用 gRPC 实现一个可靠的 Saga 模式,以管理微服务之间的分布式事务。我们将介绍基础知识,教你如何设置,并提供一些实用的代码示例。到最后,你将像专业指挥家一样协调微服务的分布式事务。

Saga 模式简介

在深入细节之前,让我们快速回顾一下 Saga 模式的基本概念:

  • Saga 是一系列本地事务的序列
  • 每个事务更新单个服务中的数据
  • 如果某个步骤失败,将执行补偿事务以撤销之前的更改

可以把它想象成分布式系统的高级撤销按钮。现在,让我们看看如何使用 gRPC 实现这一点。

为什么选择 gRPC 实现 Saga?

你可能会想,“为什么是 gRPC?我不能用 REST 吗?”当然可以,但 gRPC 带来了一些显著的优势:

  • 高效的二进制序列化(Protocol Buffers)
  • 强类型支持
  • 双向流
  • 内置的身份验证、负载均衡等支持

而且,它速度极快。谁不喜欢速度呢?

准备工作

让我们从在 Protocol Buffers 中定义服务开始。我们将创建一个简单的 OrderSaga 服务:

syntax = "proto3";

package ordersaga;

service OrderSaga {
  rpc StartSaga(SagaRequest) returns (SagaResponse) {}
  rpc CompensateSaga(CompensationRequest) returns (CompensationResponse) {}
}

message SagaRequest {
  string order_id = 1;
  double amount = 2;
}

message SagaResponse {
  bool success = 1;
  string message = 2;
}

message CompensationRequest {
  string order_id = 1;
}

message CompensationResponse {
  bool success = 1;
  string message = 2;
}

这为我们的基本服务设置了两个 RPC:一个用于启动 saga,另一个用于在出现问题时进行补偿。

实现 Saga 协调器

现在,让我们创建一个 Saga 协调器来协调我们的分布式事务。我们将使用 Go 语言进行示例,但你可以选择自己喜欢的语言。

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "path/to/your/proto"
)

type server struct {
    pb.UnimplementedOrderSagaServer
}

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    // 在这里实现 saga 逻辑
    log.Printf("Starting saga for order: %s", req.OrderId)

    // 调用其他微服务执行分布式事务
    if err := createOrder(req.OrderId); err != nil {
        return &pb.SagaResponse{Success: false, Message: "Failed to create order"}, nil
    }

    if err := processPayment(req.OrderId, req.Amount); err != nil {
        // 补偿订单创建
        cancelOrder(req.OrderId)
        return &pb.SagaResponse{Success: false, Message: "Failed to process payment"}, nil
    }

    if err := updateInventory(req.OrderId); err != nil {
        // 补偿订单创建和支付
        cancelOrder(req.OrderId)
        refundPayment(req.OrderId, req.Amount)
        return &pb.SagaResponse{Success: false, Message: "Failed to update inventory"}, nil
    }

    return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
}

func (s *server) CompensateSaga(ctx context.Context, req *pb.CompensationRequest) (*pb.CompensationResponse, error) {
    // 在这里实现补偿逻辑
    log.Printf("Compensating saga for order: %s", req.OrderId)

    // 为每个步骤调用补偿方法
    cancelOrder(req.OrderId)
    refundPayment(req.OrderId, 0) // 你可能需要在某处存储金额
    restoreInventory(req.OrderId)

    return &pb.CompensationResponse{Success: true, Message: "Compensation completed"}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterOrderSagaServer(s, &server{})
    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

// 实现这些函数以与其他微服务交互
func createOrder(orderId string) error { /* ... */ }
func processPayment(orderId string, amount float64) error { /* ... */ }
func updateInventory(orderId string) error { /* ... */ }
func cancelOrder(orderId string) error { /* ... */ }
func refundPayment(orderId string, amount float64) error { /* ... */ }
func restoreInventory(orderId string) error { /* ... */ }

这个实现展示了我们 Saga 协调器的基本结构。它处理分布式事务的主要逻辑,并在任何步骤失败时提供补偿机制。

处理失败和重试

在分布式系统中,失败不仅可能发生,而且是不可避免的。让我们为我们的 Saga 实现增加一些弹性:

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    maxRetries := 3
    var err error

    for i := 0; i < maxRetries; i++ {
        err = s.executeSaga(ctx, req)
        if err == nil {
            return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
        }
        log.Printf("Attempt %d failed: %v. Retrying...", i+1, err)
    }

    // 如果我们已经用尽所有重试,进行补偿并返回错误
    s.CompensateSaga(ctx, &pb.CompensationRequest{OrderId: req.OrderId})
    return &pb.SagaResponse{Success: false, Message: "Saga failed after multiple retries"}, err
}

func (s *server) executeSaga(ctx context.Context, req *pb.SagaRequest) error {
    // 在这里实现实际的 saga 逻辑
    // ...
}

这个重试机制为我们的 Saga 提供了几次成功的机会,然后才放弃并启动补偿。

监控和日志记录

在处理分布式事务时,可见性是关键。让我们为我们的 Saga 协调器添加一些日志记录和指标:

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    sagaSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
        Name: "saga_success_total",
        Help: "The total number of successful sagas",
    })
    sagaFailureCounter = promauto.NewCounter(prometheus.CounterOpts{
        Name: "saga_failure_total",
        Help: "The total number of failed sagas",
    })
)

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    log.Printf("Starting saga for order: %s", req.OrderId)
    defer func(start time.Time) {
        log.Printf("Saga for order %s completed in %v", req.OrderId, time.Since(start))
    }(time.Now())

    // ... (saga 逻辑)

    if err != nil {
        sagaFailureCounter.Inc()
        log.Printf("Saga failed for order %s: %v", req.OrderId, err)
        return &pb.SagaResponse{Success: false, Message: "Saga failed"}, err
    }

    sagaSuccessCounter.Inc()
    return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
}

这些指标可以轻松集成到像 Prometheus 这样的监控系统中,为你提供 Saga 性能的实时洞察。

测试你的 Saga

测试分布式事务可能很棘手,但至关重要。以下是如何测试你的 Saga 协调器的简单示例:

func TestStartSaga(t *testing.T) {
    // 设置一个模拟服务器
    s := &server{}

    // 创建一个测试请求
    req := &pb.SagaRequest{
        OrderId: "test-order-123",
        Amount:  100.50,
    }

    // 调用 StartSaga 方法
    resp, err := s.StartSaga(context.Background(), req)

    // 断言结果
    if err != nil {
        t.Errorf("StartSaga returned an error: %v", err)
    }
    if !resp.Success {
        t.Errorf("StartSaga failed: %s", resp.Message)
    }
}

记得也要测试失败场景和补偿逻辑!

总结

到这里,我们已经使用 gRPC 实现了一个可靠的 Saga 模式来管理分布式事务。让我们回顾一下我们学到的内容:

  • Saga 模式帮助管理微服务之间的分布式事务
  • gRPC 提供了一种高效、强类型的方式来实现 Saga
  • 适当的错误处理和重试对于弹性至关重要
  • 监控和日志记录为你的分布式事务提供了可见性
  • 测试虽然具有挑战性,但对于可靠的 Saga 至关重要

记住,分布式事务是复杂的。这种实现是一个起点,你可能需要根据具体的用例进行调整。但有了这些知识,你已经在驯服分布式事务怪兽的路上了。

思考题

在你离开之前,这里有一些问题供你思考:

  • 你将如何处理可能超过 gRPC 超时限制的长时间运行的 Saga?
  • 你可以采用哪些策略使你的 Saga 协调器本身具有容错能力?
  • 你如何将这种 Saga 模式与现有的事件驱动架构集成?

祝编码愉快,愿你的事务始终保持一致!