// Saga transaction management.
//
// SagaTransaction is the state of one saga run: the ordered steps plus the
// bookkeeping that the executor updates and hands to SagaStorage.
type SagaTransaction struct {
// ID identifies the saga; StartSaga fills it from generateSagaID.
ID string `json:"id"`
// Steps are executed in slice order by the executor.
Steps []*SagaStep `json:"steps"`
// Status is the saga-level state (see the SagaStatus constants).
Status SagaStatus `json:"status"`
// CurrentStep is the index into Steps of the step being processed.
CurrentStep int `json:"current_step"`
// Context is initialized to an empty map by StartSaga; the visible code
// does not otherwise read or write it.
Context map[string]interface{} `json:"context"`
// CreateAt is the creation time set by StartSaga.
CreateAt time.Time `json:"create_at"`
// UpdateAt is refreshed by the executor when the saga-level status changes.
UpdateAt time.Time `json:"update_at"`
}
// SagaStep is a single unit of work in a saga, paired with the action that
// undoes it.
type SagaStep struct {
// Name labels the step; it is used in log messages.
Name string `json:"name"`
// Action performs the step. It is tagged json:"-" and therefore is not
// part of the JSON encoding of the step.
Action SagaAction `json:"-"`
// Compensation undoes a completed step; a nil value means the step is
// skipped during compensation. Also excluded from JSON.
Compensation SagaAction `json:"-"`
// Status is the step-level state (see the SagaStepStatus constants).
Status SagaStepStatus `json:"status"`
// Data is passed as the data argument to both Action and Compensation.
Data map[string]interface{} `json:"data"`
// Error holds the text of the error returned by Action when the step fails.
Error string `json:"error,omitempty"`
}
// SagaAction is the function signature shared by a step's forward Action and
// its Compensation. The executor calls it with its context and the step's
// Data map; a non-nil error marks the call as failed.
type SagaAction func(ctx context.Context, data map[string]interface{}) error
// SagaStatus is the lifecycle state of a whole SagaTransaction, stored as a
// string.
type SagaStatus string
// SagaStepStatus is the lifecycle state of a single SagaStep, stored as a
// string.
type SagaStepStatus string
// Saga-level states.
const (
// SagaStatusPending is the initial state assigned by StartSaga.
SagaStatusPending SagaStatus = "pending"
// SagaStatusRunning is set by Execute before the first step runs.
SagaStatusRunning SagaStatus = "running"
// SagaStatusCompleted is set by Execute after every step succeeded.
SagaStatusCompleted SagaStatus = "completed"
// SagaStatusFailed is set by Execute when a step's Action returns an error.
SagaStatusFailed SagaStatus = "failed"
// SagaStatusCompensating is set by compensate when it starts undoing steps.
SagaStatusCompensating SagaStatus = "compensating"
// SagaStatusCompensated is set by compensate at the end of the compensation pass.
SagaStatusCompensated SagaStatus = "compensated"
)
// Step-level states.
const (
// SagaStepStatusPending is not assigned anywhere in the visible code;
// presumably it is the intended initial state of a step — TODO confirm.
SagaStepStatusPending SagaStepStatus = "pending"
// SagaStepStatusRunning is set by Execute just before the step's Action is called.
SagaStepStatusRunning SagaStepStatus = "running"
// SagaStepStatusCompleted is set by Execute after the Action returned nil.
SagaStepStatusCompleted SagaStepStatus = "completed"
// SagaStepStatusFailed is set by Execute after the Action returned an error.
SagaStepStatusFailed SagaStepStatus = "failed"
// SagaStepStatusCompensated is set by compensate after the step's
// Compensation returned nil.
SagaStepStatusCompensated SagaStepStatus = "compensated"
)
// Saga manager.
//
// SagaManager is the entry point for starting sagas: it persists a new saga
// through storage and hands it to executor to run.
type SagaManager struct {
// storage receives the initial Save of every saga started by StartSaga.
storage SagaStorage
// executor runs the saga's steps; StartSaga invokes it in a goroutine.
executor SagaExecutor
}
// SagaStorage is the persistence dependency used by SagaManager and
// DefaultSagaExecutor. The concrete semantics of each method are defined by
// the implementation, which is not visible here.
type SagaStorage interface {
// Save is called once by StartSaga with the newly created saga.
Save(ctx context.Context, saga *SagaTransaction) error
// Load takes a saga ID and returns a saga; it is not called in the visible code.
Load(ctx context.Context, sagaID string) (*SagaTransaction, error)
// Update is called by the executor after each state change of the saga.
Update(ctx context.Context, saga *SagaTransaction) error
}
// SagaExecutor runs a saga. DefaultSagaExecutor has a method with this
// signature.
type SagaExecutor interface {
// Execute runs the given saga; StartSaga logs any error it returns.
Execute(ctx context.Context, saga *SagaTransaction) error
}
// NewSagaManager builds a SagaManager wired to the given storage and
// executor. Neither argument is validated; both are stored as passed.
func NewSagaManager(storage SagaStorage, executor SagaExecutor) *SagaManager {
	manager := new(SagaManager)
	manager.storage = storage
	manager.executor = executor
	return manager
}
// StartSaga creates a saga from steps, persists it, and starts executing it
// in a background goroutine.
//
// Every step must be non-nil and have a non-nil Action; otherwise an error is
// returned and nothing is persisted or executed. An empty steps slice is
// accepted.
//
// ctx is used only for the initial Save. Execution runs on
// context.Background(), so it is not cancelled when the caller's ctx ends.
//
// The returned saga is the same object that is handed to the executor, which
// runs concurrently; DefaultSagaExecutor mutates it while running. Callers
// should treat the ID as stable and avoid reading the mutable fields of the
// returned value without their own synchronization.
//
// Returns the saga on success, or an error if validation or the initial Save
// fails. Execution errors are only logged.
func (m *SagaManager) StartSaga(ctx context.Context, steps []*SagaStep) (*SagaTransaction, error) {
	// A nil step or a step without an Action can never be run. Reject it here
	// so the caller gets an error, instead of the problem surfacing only
	// inside the detached goroutine, where a panic would crash the process.
	for i, step := range steps {
		if step == nil {
			return nil, fmt.Errorf("invalid saga step %d: step is nil", i)
		}
		if step.Action == nil {
			return nil, fmt.Errorf("invalid saga step %d (%s): action is nil", i, step.Name)
		}
	}

	// Use one timestamp so CreateAt and UpdateAt are equal on a new saga.
	now := time.Now()
	saga := &SagaTransaction{
		ID:          generateSagaID(),
		Steps:       steps,
		Status:      SagaStatusPending,
		CurrentStep: 0,
		Context:     make(map[string]interface{}),
		CreateAt:    now,
		UpdateAt:    now,
	}

	// Persist the saga before running it.
	if err := m.storage.Save(ctx, saga); err != nil {
		return nil, fmt.Errorf("failed to save saga: %w", err)
	}

	// Execute asynchronously, detached from the caller's context.
	go func() {
		if err := m.executor.Execute(context.Background(), saga); err != nil {
			log.Printf("Saga execution failed: %v", err)
		}
	}()

	return saga, nil
}
// Saga executor implementation.
//
// DefaultSagaExecutor runs a saga's steps sequentially and records every
// state change through storage.
type DefaultSagaExecutor struct {
// storage is where saga state changes are written via Update.
storage SagaStorage
}
// NewDefaultSagaExecutor returns a DefaultSagaExecutor that writes saga state
// to the given storage. The argument is stored as passed, without validation.
func NewDefaultSagaExecutor(storage SagaStorage) *DefaultSagaExecutor {
	executor := DefaultSagaExecutor{storage: storage}
	return &executor
}
// Execute runs the saga's steps in order and persists each state change.
//
// The saga is first marked running; if that initial Update fails, Execute
// returns without running any step. For each step it records the step as
// running, calls its Action with the step's Data, and records it as
// completed. Failures of these intermediate Update calls are logged and do
// not stop execution.
//
// If a step is nil, has no Action, or its Action returns an error, the step
// and saga are marked failed, the earlier steps are compensated, and Execute
// returns an error that wraps the step's error (with the compensation error
// appended when compensation also fails). When all steps succeed, the saga is
// marked completed and the result of the final Update is returned.
func (e *DefaultSagaExecutor) Execute(ctx context.Context, saga *SagaTransaction) error {
	if saga == nil {
		return fmt.Errorf("failed to execute saga: saga is nil")
	}

	saga.Status = SagaStatusRunning
	saga.UpdateAt = time.Now()
	if err := e.storage.Update(ctx, saga); err != nil {
		return fmt.Errorf("failed to update saga status: %w", err)
	}

	// Run every step in order.
	for i, step := range saga.Steps {
		saga.CurrentStep = i

		// A malformed step is treated like a failed step rather than being
		// allowed to panic on a nil dereference.
		var stepErr error
		switch {
		case step == nil:
			stepErr = fmt.Errorf("step is nil")
		case step.Action == nil:
			stepErr = fmt.Errorf("step has no action")
		default:
			step.Status = SagaStepStatusRunning
			saga.UpdateAt = time.Now()
			if err := e.storage.Update(ctx, saga); err != nil {
				log.Printf("Failed to update saga: %v", err)
			}
			// Run the step.
			stepErr = step.Action(ctx, step.Data)
		}

		if stepErr != nil {
			name := fmt.Sprintf("#%d", i)
			if step != nil {
				step.Status = SagaStepStatusFailed
				step.Error = stepErr.Error()
				name = step.Name
			}
			saga.Status = SagaStatusFailed
			saga.UpdateAt = time.Now()
			if err := e.storage.Update(ctx, saga); err != nil {
				log.Printf("Failed to update saga: %v", err)
			}

			// Undo the steps that already completed. The step's own error
			// must still reach the caller even when compensation succeeds,
			// otherwise a failed saga would be reported as a success.
			failure := fmt.Errorf("saga %s: step %q failed: %w", saga.ID, name, stepErr)
			if compErr := e.compensate(ctx, saga, i); compErr != nil {
				return fmt.Errorf("%w (compensation also failed: %v)", failure, compErr)
			}
			return failure
		}

		step.Status = SagaStepStatusCompleted
		saga.UpdateAt = time.Now()
		if err := e.storage.Update(ctx, saga); err != nil {
			log.Printf("Failed to update saga: %v", err)
		}
	}

	// All steps finished successfully.
	saga.Status = SagaStatusCompleted
	saga.UpdateAt = time.Now()
	return e.storage.Update(ctx, saga)
}
// compensate undoes the steps that completed before the step at index
// failedStep, walking backwards from failedStep-1 to 0.
//
// Only steps whose Status is completed and whose Compensation is non-nil are
// invoked; each is called with the step's Data. A step whose Compensation
// returns nil is marked compensated. A step whose Compensation fails is
// logged, keeps its completed status, gets the failure text in its Error
// field, and the pass continues with the remaining steps.
//
// The saga ends as compensated only when no compensation failed; otherwise it
// is marked failed so the partially undone state is not reported as a clean
// rollback. The final state is persisted with Update.
//
// Returns an error wrapping the first compensation failure if any occurred;
// otherwise the result of the final Update.
func (e *DefaultSagaExecutor) compensate(ctx context.Context, saga *SagaTransaction, failedStep int) error {
	saga.Status = SagaStatusCompensating
	saga.UpdateAt = time.Now()
	if err := e.storage.Update(ctx, saga); err != nil {
		log.Printf("Failed to update saga: %v", err)
	}

	// Guard against an index past the end of Steps so the loop below cannot
	// read out of range.
	if failedStep > len(saga.Steps) {
		failedStep = len(saga.Steps)
	}

	var firstErr error
	failures := 0

	// Compensate backwards, starting with the step just before the failed one.
	for i := failedStep - 1; i >= 0; i-- {
		step := saga.Steps[i]
		if step == nil || step.Status != SagaStepStatusCompleted {
			continue
		}
		if step.Compensation == nil {
			continue
		}
		if err := step.Compensation(ctx, step.Data); err != nil {
			// Keep going so the remaining steps still get a chance to be
			// undone, but remember that the rollback is incomplete.
			log.Printf("Compensation failed for step %s: %v", step.Name, err)
			step.Error = fmt.Sprintf("compensation failed: %v", err)
			failures++
			if firstErr == nil {
				firstErr = fmt.Errorf("step %s: %w", step.Name, err)
			}
			continue
		}
		step.Status = SagaStepStatusCompensated
	}

	if failures > 0 {
		saga.Status = SagaStatusFailed
	} else {
		saga.Status = SagaStatusCompensated
	}
	saga.UpdateAt = time.Now()
	persistErr := e.storage.Update(ctx, saga)

	if firstErr != nil {
		if persistErr != nil {
			log.Printf("Failed to update saga: %v", persistErr)
		}
		return fmt.Errorf("saga %s: %d compensation(s) failed, first: %w", saga.ID, failures, firstErr)
	}
	return persistErr
}
// Order-processing saga example.
//
// CreateOrderSaga builds the three steps of an order saga, in execution
// order: reserve_inventory, process_payment, create_order. Each step has a
// Compensation that reverses it (release inventory, refund, cancel order).
//
// The actions close over userID, productID, quantity and price; the same
// values are also copied into each step's Data map. price is used as the
// payment amount, the order's TotalPrice and the item Price.
// NOTE(review): whether price is a unit price or an order total cannot be
// determined from this code — confirm against the callers.
//
// Every action returns an error, rather than dereferencing a nil client,
// when the corresponding service client cannot be obtained.
func CreateOrderSaga(userID, productID string, quantity int, price float64) []*SagaStep {
	return []*SagaStep{
		{
			Name: "reserve_inventory",
			Action: func(ctx context.Context, data map[string]interface{}) error {
				// Reserve stock.
				inventoryClient, err := requireInventoryClient()
				if err != nil {
					return err
				}
				return inventoryClient.ReserveInventory(ctx, productID, quantity)
			},
			Compensation: func(ctx context.Context, data map[string]interface{}) error {
				// Release the reserved stock.
				inventoryClient, err := requireInventoryClient()
				if err != nil {
					return err
				}
				return inventoryClient.ReleaseInventory(ctx, productID, quantity)
			},
			Data: map[string]interface{}{
				"product_id": productID,
				"quantity":   quantity,
			},
		},
		{
			Name: "process_payment",
			Action: func(ctx context.Context, data map[string]interface{}) error {
				// Charge the user.
				paymentClient, err := requirePaymentClient()
				if err != nil {
					return err
				}
				return paymentClient.ProcessPayment(ctx, userID, price)
			},
			Compensation: func(ctx context.Context, data map[string]interface{}) error {
				// Refund the charge.
				paymentClient, err := requirePaymentClient()
				if err != nil {
					return err
				}
				return paymentClient.RefundPayment(ctx, userID, price)
			},
			Data: map[string]interface{}{
				"user_id": userID,
				"amount":  price,
			},
		},
		{
			Name: "create_order",
			Action: func(ctx context.Context, data map[string]interface{}) error {
				// Create the order.
				orderClient, err := requireOrderClient()
				if err != nil {
					return err
				}
				order := &Order{
					UserID:     userID,
					TotalPrice: price,
					Items: []OrderItem{
						{
							ProductID: productID,
							Quantity:  quantity,
							Price:     price,
						},
					},
				}
				return orderClient.CreateOrder(ctx, order)
			},
			Compensation: func(ctx context.Context, data map[string]interface{}) error {
				// Cancel the order.
				orderClient, err := requireOrderClient()
				if err != nil {
					return err
				}
				// NOTE(review): nothing in the visible code stores "order_id"
				// in this step's Data, so unless it is populated elsewhere
				// this branch reports an error instead of cancelling.
				orderID, ok := data["order_id"].(string)
				if !ok {
					return fmt.Errorf("cancel order: step data has no string \"order_id\"")
				}
				return orderClient.CancelOrder(ctx, orderID)
			},
			Data: map[string]interface{}{
				"user_id":    userID,
				"product_id": productID,
				"quantity":   quantity,
				"price":      price,
			},
		},
	}
}

// requireInventoryClient returns the inventory client, or an error when the
// lookup fails or yields a nil client.
func requireInventoryClient() (*InventoryServiceClient, error) {
	client, err := getInventoryClient()
	if err != nil {
		return nil, fmt.Errorf("get inventory client: %w", err)
	}
	if client == nil {
		return nil, fmt.Errorf("inventory client is unavailable")
	}
	return client, nil
}

// requirePaymentClient returns the payment client, or an error when the
// lookup fails or yields a nil client.
func requirePaymentClient() (*PaymentServiceClient, error) {
	client, err := getPaymentClient()
	if err != nil {
		return nil, fmt.Errorf("get payment client: %w", err)
	}
	if client == nil {
		return nil, fmt.Errorf("payment client is unavailable")
	}
	return client, nil
}

// requireOrderClient returns the order client, or an error when the lookup
// fails or yields a nil client.
func requireOrderClient() (*OrderServiceClient, error) {
	client, err := getOrderClient()
	if err != nil {
		return nil, fmt.Errorf("get order client: %w", err)
	}
	if client == nil {
		return nil, fmt.Errorf("order client is unavailable")
	}
	return client, nil
}
// Helper functions.

// generateSagaID returns an identifier of the form "saga_<unix seconds>_<8
// random characters>", using the current time and generateRandomString.
func generateSagaID() string {
	seconds := time.Now().Unix()
	suffix := generateRandomString(8)
	return fmt.Sprintf("saga_%d_%s", seconds, suffix)
}
// generateRandomString returns a string of length characters drawn uniformly
// from the ASCII letters and digits, using rand.Intn for each character.
//
// A length of zero or less yields the empty string instead of panicking in
// make.
// NOTE(review): rand here is presumably math/rand, which is not suitable for
// secrets or tokens — confirm against the file's imports.
func generateRandomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	if length <= 0 {
		return ""
	}
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}
// getInventoryClient is meant to return the inventory service client.
// As written it is a placeholder: it always returns a nil client together
// with a nil error.
func getInventoryClient() (*InventoryServiceClient, error) {
	var client *InventoryServiceClient
	return client, nil
}
// getPaymentClient is meant to return the payment service client.
// As written it is a placeholder: it always returns a nil client together
// with a nil error.
func getPaymentClient() (*PaymentServiceClient, error) {
	var client *PaymentServiceClient
	return client, nil
}
// getOrderClient is meant to return the order service client.
// As written it is a placeholder: it always returns a nil client together
// with a nil error.
func getOrderClient() (*OrderServiceClient, error) {
	var client *OrderServiceClient
	return client, nil
}