graph TB
%% Request path: the API Gateway routes to the User, Token, Channel and Log services.
GW[API Gateway] --> U[User Service]
GW --> T[Token Service]
GW --> C[Channel Service]
GW --> L[Log Service]
%% The Message Bus is linked (undirected) to User, Token and Channel; no link to Log is drawn.
MQ[(Message Bus)] --- U
MQ --- T
MQ --- C
%% Each service points at its own dedicated data store.
U --> UDB[(User DB)]
T --> TDB[(Token DB)]
C --> CDB[(Channel DB)]
L --> LStore[(Log Store)]
graph TB
%% Side-by-side comparison of a layered monolith and a microservice deployment.
%% Monolith: web layer -> business logic layer -> data access layer -> one database.
subgraph "单体架构"
A[Web层] --> B[业务逻辑层]
B --> C[数据访问层]
C --> D[(数据库)]
end
%% Microservices: an API gateway fronts the user, order, payment and notification services.
%% The first three each point at their own database; notification points at a message queue.
subgraph "微服务架构"
E[API网关] --> F[用户服务]
E --> G[订单服务]
E --> H[支付服务]
E --> I[通知服务]
F --> J[(用户数据库)]
G --> K[(订单数据库)]
H --> L[(支付数据库)]
I --> M[(消息队列)]
end
flowchart TD
%% Decision flow for choosing service boundaries when splitting a system.
%% Analysis steps: identify business domains -> analyse data dependencies -> assess team structure.
A["开始拆分分析"] --> B["识别业务领域"]
B --> C["分析数据依赖"]
C --> D["评估团队结构"]
%% Clear boundaries lead to a split by business domain; otherwise the data flow is analysed.
D --> E{"是否有明确边界?"}
E -->|是| F["按业务领域拆分"]
E -->|否| G["分析数据流"]
%% High data coupling keeps the monolith (or a coarse split); low coupling splits by data boundary.
G --> H{"数据耦合度高?"}
H -->|是| I["保持单体或粗粒度拆分"]
H -->|否| J["按数据边界拆分"]
F --> K["定义服务接口"]
J --> K
I --> L["重新评估拆分策略"]
K --> M["验证服务独立性"]
M --> N{"服务可独立部署?"}
N -->|是| O["拆分完成"]
N -->|否| P["调整服务边界"]
%% Two feedback loops: adjusted boundaries return to interface definition,
%% and a re-evaluated strategy returns to domain identification.
P --> K
L --> B
// 用户领域
package user
// User is the account record of the user domain.
//
// The struct tags define both the JSON field names and the GORM mapping: ID is
// tagged as the primary key, Username and Email each carry a unique index, and
// Profile is tagged as embedded.
//
// NOTE(review): the timestamp fields are named CreateAt/UpdateAt, not the
// CreatedAt/UpdatedAt names GORM tracks by convention, so they are presumably
// populated by the caller — confirm.
type User struct {
	ID       string    `json:"id" gorm:"primaryKey"`
	Username string    `json:"username" gorm:"uniqueIndex"`
	Email    string    `json:"email" gorm:"uniqueIndex"`
	Status   string    `json:"status"`
	Profile  Profile   `json:"profile" gorm:"embedded"`
	CreateAt time.Time `json:"create_at"`
	UpdateAt time.Time `json:"update_at"`
}
// Profile holds the descriptive, user-editable details of an account.
// All fields are plain strings serialized under snake_case JSON names.
type Profile struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	Avatar    string `json:"avatar"`
	Bio       string `json:"bio"`
}
// 订单领域
package order
// Order is the aggregate root of the order domain.
//
// UserID is indexed and refers to the owning user by ID only. Items is mapped
// through the OrderID foreign key on OrderItem.
//
// NOTE(review): CreateAt/UpdateAt do not match GORM's conventional
// CreatedAt/UpdatedAt names, so they are presumably set by the caller — confirm.
type Order struct {
	ID         string      `json:"id" gorm:"primaryKey"`
	UserID     string      `json:"user_id" gorm:"index"`
	Status     OrderStatus `json:"status"`
	TotalPrice float64     `json:"total_price"`
	Items      []OrderItem `json:"items" gorm:"foreignKey:OrderID"`
	CreateAt   time.Time   `json:"create_at"`
	UpdateAt   time.Time   `json:"update_at"`
}
// OrderItem is a single line of an order: a product reference, a quantity and
// a price. OrderID links the item back to its parent order.
type OrderItem struct {
	ID        string  `json:"id" gorm:"primaryKey"`
	OrderID   string  `json:"order_id"`
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}
// OrderStatus is the lifecycle state of an order, stored as its string value.
type OrderStatus string

// Known order states.
const (
	// OrderStatusPending marks an order in the "pending" state.
	OrderStatusPending OrderStatus = "pending"
	// OrderStatusPaid marks an order in the "paid" state.
	OrderStatusPaid OrderStatus = "paid"
	// OrderStatusShipped marks an order in the "shipped" state.
	OrderStatusShipped OrderStatus = "shipped"
	// OrderStatusDelivered marks an order in the "delivered" state.
	OrderStatusDelivered OrderStatus = "delivered"
	// OrderStatusCancelled marks an order in the "cancelled" state.
	OrderStatusCancelled OrderStatus = "cancelled"
)
// Repository is the generic data-access abstraction a service can depend on.
//
// Entities are passed as untyped values: GetByID and List fill the value the
// caller supplies rather than returning a new one.
type Repository interface {
	// Create stores entity.
	Create(ctx context.Context, entity interface{}) error
	// GetByID loads the record identified by id into entity.
	GetByID(ctx context.Context, id string, entity interface{}) error
	// Update writes the current state of entity back to storage.
	Update(ctx context.Context, entity interface{}) error
	// Delete removes the record identified by id.
	Delete(ctx context.Context, id string) error
	// List loads the records matching filter into entities.
	List(ctx context.Context, filter interface{}, entities interface{}) error
}
// UserRepository is the data-access layer of the user service, backed by a
// GORM database handle.
type UserRepository struct {
	db *gorm.DB
}

// NewUserRepository returns a UserRepository that issues its queries through db.
func NewUserRepository(db *gorm.DB) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	return repo
}
// Create inserts user through the repository's database handle, bound to ctx,
// and returns the error reported by the insert, if any.
func (r *UserRepository) Create(ctx context.Context, user *User) error {
	result := r.db.WithContext(ctx).Create(user)
	return result.Error
}
// GetByID loads the first user whose id column equals id.
// On any query error it returns a nil user together with that error.
func (r *UserRepository) GetByID(ctx context.Context, id string) (*User, error) {
	found := new(User)
	if err := r.db.WithContext(ctx).First(found, "id = ?", id).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// GetByUsername loads the first user whose username column equals username.
// On any query error it returns a nil user together with that error.
func (r *UserRepository) GetByUsername(ctx context.Context, username string) (*User, error) {
	found := new(User)
	if err := r.db.WithContext(ctx).First(found, "username = ?", username).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// OrderRepository is the data-access layer of the order service, backed by a
// GORM database handle.
type OrderRepository struct {
	db *gorm.DB
}

// NewOrderRepository returns an OrderRepository that issues its queries through db.
func NewOrderRepository(db *gorm.DB) *OrderRepository {
	repo := new(OrderRepository)
	repo.db = db
	return repo
}
// Create persists order and its line items inside a single transaction.
//
// The order row is inserted with the Items association omitted, and each item
// is then inserted explicitly with OrderID pointing at the new order. Without
// the Omit, GORM's Create would already save the non-empty Items association
// while inserting the order, and the explicit loop below would then try to
// insert every item a second time.
//
// The first error aborts the transaction callback and is returned to the caller.
func (r *OrderRepository) Create(ctx context.Context, order *Order) error {
	return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		if err := tx.Omit("Items").Create(order).Error; err != nil {
			return err
		}
		for i := range order.Items {
			// Index into the slice so the OrderID assignment is visible to the caller.
			item := &order.Items[i]
			item.OrderID = order.ID
			if err := tx.Create(item).Error; err != nil {
				return err
			}
		}
		return nil
	})
}
// GetByUserID returns the orders whose user_id column equals userID, with the
// Items association preloaded. The slice and the query error are returned
// together, exactly as the query left them.
func (r *OrderRepository) GetByUserID(ctx context.Context, userID string) ([]*Order, error) {
	var found []*Order
	query := r.db.WithContext(ctx).Preload("Items")
	result := query.Find(&found, "user_id = ?", userID)
	return found, result.Error
}
sequenceDiagram
%% Request lifecycle: the client calls the gateway, which validates and routes to a domain service.
participant Client
participant GW as API Gateway
participant Svc as Domain Service
participant MQ as Message Bus
Client->>GW: HTTP Request
GW->>Svc: Validate + Route
Svc-->>Client: Response
%% Event emission and consumption are drawn as dashed messages, separate from the request path.
Svc-->>MQ: Emit Event (async)
MQ-->>Svc: Retry/Consumer (out of band)
graph TB
%% Comparison of synchronous and asynchronous inter-service communication.
%% Synchronous: the client calls service A over HTTP/gRPC, A calls B, and responses flow back.
subgraph "同步通信"
A["客户端"] -->|"HTTP/gRPC请求"| B["服务A"]
B -->|"调用"| C["服务B"]
C -->|"响应"| B
B -->|"响应"| A
%% Listed traits: real-time response, strong consistency, service coupling.
D["特点:"]
D --> E["实时响应"]
D --> F["强一致性"]
D --> G["服务耦合"]
end
%% Asynchronous: service A publishes an event to a message queue; services B and C subscribe.
subgraph "异步通信"
H["服务A"] -->|"发布事件"| I["消息队列"]
I -->|"订阅"| J["服务B"]
I -->|"订阅"| K["服务C"]
%% Listed traits: decoupling, eventual consistency, high availability.
L["特点:"]
L --> M["解耦合"]
L --> N["最终一致性"]
L --> O["高可用性"]
end
graph TB
%% Configuration management: an admin UI feeds a config center backed by Consul/etcd storage.
subgraph "配置管理系统"
A["配置管理界面"] --> B["配置中心"]
B --> C["配置存储"]
C --> D[("Consul/etcd")]
%% The config center pushes configuration to services A, B and C, each with a local cache.
B --> E["配置推送"]
E --> F["服务A"]
E --> G["服务B"]
E --> H["服务C"]
F --> I["本地缓存"]
G --> J["本地缓存"]
H --> K["本地缓存"]
%% A change listener attached to the config center also notifies every service.
L["配置变更监听"]
B --> L
L --> F
L --> G
L --> H
end
%% Feature list: dynamic updates, versioning, environment isolation, access control.
subgraph "配置特性"
M["动态更新"]
N["版本管理"]
O["环境隔离"]
P["权限控制"]
end
// ConfigCenter is the abstraction over a remote key/value configuration store.
type ConfigCenter interface {
	// Get returns the value stored under key.
	Get(key string) (string, error)
	// Set stores value under key.
	Set(key, value string) error
	// Watch registers callback to be invoked when the value under key changes.
	Watch(key string, callback ConfigChangeCallback) error
	// GetAll returns the key/value pairs found under prefix.
	GetAll(prefix string) (map[string]string, error)
}

// ConfigChangeCallback receives a changed key together with its previous and
// new values.
type ConfigChangeCallback func(key, oldValue, newValue string)
// ConsulConfigCenter is a configuration center backed by the Consul KV store.
// watchers holds the callbacks registered per key and is guarded by mu.
type ConsulConfigCenter struct {
	client   *consulapi.Client
	watchers map[string][]ConfigChangeCallback
	mu       sync.RWMutex
}

// NewConsulConfigCenter builds a config center that talks to the Consul agent
// at address, starting from the library's default client configuration.
// It returns a wrapped error when the client cannot be created.
func NewConsulConfigCenter(address string) (*ConsulConfigCenter, error) {
	cfg := consulapi.DefaultConfig()
	cfg.Address = address

	client, err := consulapi.NewClient(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create consul client: %w", err)
	}

	center := &ConsulConfigCenter{client: client}
	center.watchers = make(map[string][]ConfigChangeCallback)
	return center, nil
}
// Get reads the value stored under key from the Consul KV store.
// It returns a wrapped error when the lookup fails and a "config key not
// found" error when Consul returns no pair for the key.
func (c *ConsulConfigCenter) Get(key string) (string, error) {
	pair, _, err := c.client.KV().Get(key, nil)
	switch {
	case err != nil:
		return "", fmt.Errorf("failed to get config: %w", err)
	case pair == nil:
		return "", fmt.Errorf("config key not found: %s", key)
	}
	return string(pair.Value), nil
}
// Set writes value under key in the Consul KV store and wraps any error
// returned by the write.
func (c *ConsulConfigCenter) Set(key, value string) error {
	entry := &consulapi.KVPair{Key: key, Value: []byte(value)}
	if _, err := c.client.KV().Put(entry, nil); err != nil {
		return fmt.Errorf("failed to set config: %w", err)
	}
	return nil
}
// Watch registers callback for changes to key.
//
// One background watchKey loop is started per key, on the first registration
// only. Each loop already fans a change out to every callback registered for
// its key, so starting another loop per registration would deliver each change
// once per loop. Callbacks added later are picked up by the existing loop and
// are notified from the next observed change onwards.
//
// The returned error is always nil; it exists to satisfy the ConfigCenter
// interface.
func (c *ConsulConfigCenter) Watch(key string, callback ConfigChangeCallback) error {
	c.mu.Lock()
	_, alreadyWatching := c.watchers[key]
	c.watchers[key] = append(c.watchers[key], callback)
	c.mu.Unlock()

	if !alreadyWatching {
		go c.watchKey(key)
	}
	return nil
}
// watchKey runs a Consul blocking-query loop for key and notifies every
// callback registered for it whenever the reported index advances.
//
// Each callback runs in its own goroutine and receives the value seen on the
// previous notification as oldValue (empty on the first one) and the current
// value as newValue; a missing key is reported as the empty string.
//
// The loop never returns: query errors are logged and retried after five
// seconds.
func (c *ConsulConfigCenter) watchKey(key string) {
	kv := c.client.KV()
	var (
		lastIndex uint64
		lastValue string
	)
	for {
		opts := &consulapi.QueryOptions{
			WaitIndex: lastIndex,
			WaitTime:  30 * time.Second,
		}
		pair, meta, err := kv.Get(key, opts)
		if err != nil {
			log.Printf("Failed to watch config key %s: %v", key, err)
			time.Sleep(5 * time.Second)
			continue
		}

		switch {
		case meta.LastIndex < lastIndex:
			// The index went backwards (e.g. the server state was restored).
			// Waiting on the stale, higher index would miss changes, so
			// start over from zero as Consul's blocking-query guidance advises.
			lastIndex = 0
			continue
		case meta.LastIndex == lastIndex:
			// The wait timed out without a change.
			continue
		}
		lastIndex = meta.LastIndex

		var newValue string
		if pair != nil {
			newValue = string(pair.Value)
		}
		oldValue := lastValue
		lastValue = newValue

		c.mu.RLock()
		callbacks := c.watchers[key]
		c.mu.RUnlock()

		for _, callback := range callbacks {
			go callback(key, oldValue, newValue)
		}
	}
}
// ConfigManager fronts a ConfigCenter with an in-memory cache of string
// values. The cache map is guarded by mu.
type ConfigManager struct {
	center ConfigCenter
	cache  map[string]string
	mu     sync.RWMutex
}

// NewConfigManager returns a manager that reads through to center and starts
// with an empty cache.
func NewConfigManager(center ConfigCenter) *ConfigManager {
	manager := &ConfigManager{center: center}
	manager.cache = make(map[string]string)
	return manager
}
// GetString returns the value for key, preferring the local cache.
//
// On a cache miss the value is fetched from the config center and cached. If
// the fetch fails, the failure is logged, nothing is cached, and defaultValue
// is returned.
func (m *ConfigManager) GetString(key string, defaultValue string) string {
	m.mu.RLock()
	cached, hit := m.cache[key]
	m.mu.RUnlock()
	if hit {
		return cached
	}

	fetched, err := m.center.Get(key)
	if err != nil {
		log.Printf("Failed to get config %s: %v", key, err)
		return defaultValue
	}

	m.mu.Lock()
	m.cache[key] = fetched
	m.mu.Unlock()
	return fetched
}
// GetInt returns the value for key parsed as a decimal integer.
// defaultValue is returned when the value is empty or unavailable, or when it
// does not parse; a parse failure is also logged.
func (m *ConfigManager) GetInt(key string, defaultValue int) int {
	raw := m.GetString(key, "")
	if raw == "" {
		return defaultValue
	}

	parsed, err := strconv.Atoi(raw)
	if err == nil {
		return parsed
	}
	log.Printf("Failed to parse config %s as int: %v", key, err)
	return defaultValue
}
// GetBool returns the value for key parsed with strconv.ParseBool.
// defaultValue is returned when the value is empty or unavailable, or when it
// does not parse; a parse failure is also logged.
func (m *ConfigManager) GetBool(key string, defaultValue bool) bool {
	raw := m.GetString(key, "")
	if raw == "" {
		return defaultValue
	}

	parsed, err := strconv.ParseBool(raw)
	if err == nil {
		return parsed
	}
	log.Printf("Failed to parse config %s as bool: %v", key, err)
	return defaultValue
}
// WatchConfig subscribes to changes of key in the config center.
//
// On every change the new value is written to the local cache first, then
// callback is invoked with it. The method has no error return, so a failed
// registration is logged instead of being dropped silently; in that case
// callback will never be invoked.
func (m *ConfigManager) WatchConfig(key string, callback func(string)) {
	err := m.center.Watch(key, func(k, oldValue, newValue string) {
		m.mu.Lock()
		m.cache[k] = newValue
		m.mu.Unlock()
		callback(newValue)
	})
	if err != nil {
		log.Printf("Failed to watch config %s: %v", key, err)
	}
}
sequenceDiagram
%% Lifecycle of a service instance against a registry: register, discover, health-check, deregister.
participant S1 as 服务实例
participant SR as 服务注册中心
participant C as 客户端
participant LB as 负载均衡器
%% Registration: the instance registers its IP, port and health check.
Note over S1,LB: 服务注册阶段
S1->>SR: 注册服务(IP, Port, 健康检查)
SR-->>S1: 注册成功
%% Discovery: the client queries the registry, asks the load balancer for a target, then calls it.
Note over S1,LB: 服务发现阶段
C->>SR: 查询服务列表
SR-->>C: 返回可用服务实例
C->>LB: 选择服务实例
LB-->>C: 返回目标实例
C->>S1: 发起请求
S1-->>C: 返回响应
%% Health checking: the registry probes the instance.
Note over S1,LB: 健康检查阶段
SR->>S1: 健康检查
S1-->>SR: 健康状态
%% Shutdown: the instance deregisters itself.
Note over S1,LB: 服务下线阶段
S1->>SR: 注销服务
SR-->>S1: 注销成功
// ServiceDiscovery is the abstraction over a service registry.
type ServiceDiscovery interface {
	// Register announces service to the registry.
	Register(service *ServiceInfo) error
	// Deregister removes the instance identified by serviceID.
	Deregister(serviceID string) error
	// Discover returns the instances registered under serviceName.
	Discover(serviceName string) ([]*ServiceInfo, error)
	// Watch registers callback to receive updated instance lists for serviceName.
	Watch(serviceName string, callback ServiceChangeCallback) error
}
// ServiceInfo describes one registered instance of a service: its identity,
// network location, tags, free-form metadata and health-check settings.
type ServiceInfo struct {
	ID       string            `json:"id"`
	Name     string            `json:"name"`
	Address  string            `json:"address"`
	Port     int               `json:"port"`
	Tags     []string          `json:"tags"`
	Metadata map[string]string `json:"metadata"`
	Health   HealthCheck       `json:"health"`
}

// HealthCheck configures an HTTP health probe: the URL to call, how often to
// call it and how long to wait for an answer.
type HealthCheck struct {
	HTTP     string        `json:"http"`
	Interval time.Duration `json:"interval"`
	Timeout  time.Duration `json:"timeout"`
}

// ServiceChangeCallback receives the current instance list of a watched
// service.
type ServiceChangeCallback func(services []*ServiceInfo)
// ConsulServiceDiscovery implements service discovery on top of Consul.
// watchers holds the callbacks registered per service name and is guarded by mu.
type ConsulServiceDiscovery struct {
	client   *consulapi.Client
	watchers map[string][]ServiceChangeCallback
	mu       sync.RWMutex
}

// NewConsulServiceDiscovery builds a discovery client that talks to the Consul
// agent at address, starting from the library's default client configuration.
// It returns a wrapped error when the client cannot be created.
func NewConsulServiceDiscovery(address string) (*ConsulServiceDiscovery, error) {
	cfg := consulapi.DefaultConfig()
	cfg.Address = address

	client, err := consulapi.NewClient(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create consul client: %w", err)
	}

	discovery := &ConsulServiceDiscovery{client: client}
	discovery.watchers = make(map[string][]ServiceChangeCallback)
	return discovery, nil
}
// Register registers service with the Consul agent.
//
// An HTTP health check is attached only when service.Health.HTTP is set; its
// interval and timeout are passed to Consul in their time.Duration string form.
func (d *ConsulServiceDiscovery) Register(service *ServiceInfo) error {
	reg := &consulapi.AgentServiceRegistration{
		ID:      service.ID,
		Name:    service.Name,
		Address: service.Address,
		Port:    service.Port,
		Tags:    service.Tags,
		Meta:    service.Metadata,
	}

	if probe := service.Health; probe.HTTP != "" {
		reg.Check = &consulapi.AgentServiceCheck{
			HTTP:     probe.HTTP,
			Interval: probe.Interval.String(),
			Timeout:  probe.Timeout.String(),
		}
	}

	return d.client.Agent().ServiceRegister(reg)
}
// Deregister removes the instance identified by serviceID from the Consul
// agent and returns the agent's error, if any.
func (d *ConsulServiceDiscovery) Deregister(serviceID string) error {
	return d.client.Agent().ServiceDeregister(serviceID)
}
// Discover queries Consul's health endpoint for serviceName, with no tag
// filter and the passing-only flag set, and converts each entry into a
// ServiceInfo. The Health field of the returned values is left at its zero
// value. A nil slice is returned when there are no entries.
func (d *ConsulServiceDiscovery) Discover(serviceName string) ([]*ServiceInfo, error) {
	entries, _, err := d.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to discover services: %w", err)
	}

	var instances []*ServiceInfo
	for i := range entries {
		svc := entries[i].Service
		instances = append(instances, &ServiceInfo{
			ID:       svc.ID,
			Name:     svc.Service,
			Address:  svc.Address,
			Port:     svc.Port,
			Tags:     svc.Tags,
			Metadata: svc.Meta,
		})
	}
	return instances, nil
}
// Watch registers callback for instance-list changes of serviceName.
//
// One background watchService loop is started per service name, on the first
// registration only. Each loop already fans an update out to every callback
// registered for its service, so starting another loop per registration would
// deliver each update once per loop. Callbacks added later are picked up by
// the existing loop and are notified from the next observed change onwards.
//
// The returned error is always nil; it exists to satisfy the ServiceDiscovery
// interface.
func (d *ConsulServiceDiscovery) Watch(serviceName string, callback ServiceChangeCallback) error {
	d.mu.Lock()
	_, alreadyWatching := d.watchers[serviceName]
	d.watchers[serviceName] = append(d.watchers[serviceName], callback)
	d.mu.Unlock()

	if !alreadyWatching {
		go d.watchService(serviceName)
	}
	return nil
}
// watchService runs a Consul blocking-query loop on the health endpoint for
// serviceName (passing-only flag set) and hands the converted instance list to
// every registered callback whenever the reported index advances.
//
// Each callback runs in its own goroutine. The loop never returns: query
// errors are logged and retried after five seconds.
func (d *ConsulServiceDiscovery) watchService(serviceName string) {
	health := d.client.Health()
	var lastIndex uint64
	for {
		opts := &consulapi.QueryOptions{
			WaitIndex: lastIndex,
			WaitTime:  30 * time.Second,
		}
		services, meta, err := health.Service(serviceName, "", true, opts)
		if err != nil {
			log.Printf("Failed to watch service %s: %v", serviceName, err)
			time.Sleep(5 * time.Second)
			continue
		}

		switch {
		case meta.LastIndex < lastIndex:
			// The index went backwards (e.g. the server state was restored).
			// Waiting on the stale, higher index would miss changes, so
			// start over from zero as Consul's blocking-query guidance advises.
			lastIndex = 0
			continue
		case meta.LastIndex == lastIndex:
			// The wait timed out without a change.
			continue
		}
		lastIndex = meta.LastIndex

		var serviceInfos []*ServiceInfo
		for _, service := range services {
			info := &ServiceInfo{
				ID:       service.Service.ID,
				Name:     service.Service.Service,
				Address:  service.Service.Address,
				Port:     service.Service.Port,
				Tags:     service.Service.Tags,
				Metadata: service.Service.Meta,
			}
			serviceInfos = append(serviceInfos, info)
		}

		d.mu.RLock()
		callbacks := d.watchers[serviceName]
		d.mu.RUnlock()

		for _, callback := range callbacks {
			go callback(serviceInfos)
		}
	}
}
// LoadBalancer chooses one instance out of a list of candidates.
type LoadBalancer interface {
	// Select returns the instance to use from services.
	Select(services []*ServiceInfo) *ServiceInfo
}
// RoundRobinLoadBalancer cycles through instances in order.
// counter is the number of selections made so far.
type RoundRobinLoadBalancer struct {
	counter uint64
}

// NewRoundRobinLoadBalancer returns a balancer whose counter starts at zero.
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer {
	return new(RoundRobinLoadBalancer)
}
// Select returns the next instance in rotation, or nil when services is empty.
//
// The counter is incremented atomically before it is used, so the first call
// on a fresh balancer picks index 1 (modulo the list length), not index 0.
func (lb *RoundRobinLoadBalancer) Select(services []*ServiceInfo) *ServiceInfo {
	count := uint64(len(services))
	if count == 0 {
		return nil
	}
	turn := atomic.AddUint64(&lb.counter, 1)
	return services[turn%count]
}
// ServiceClientManager hands out one ServiceClient per service name.
// The clients map is guarded by mu.
type ServiceClientManager struct {
	discovery    ServiceDiscovery
	loadBalancer LoadBalancer
	clients      map[string]*ServiceClient
	mu           sync.RWMutex
}

// ServiceClient calls a single named service. services is the current list of
// known instances and is guarded by mu.
type ServiceClient struct {
	serviceName string
	services    []*ServiceInfo
	httpClient  *HTTPClient
	mu          sync.RWMutex
}

// NewServiceClientManager returns a manager that uses discovery and
// loadBalancer and starts with no clients.
func NewServiceClientManager(discovery ServiceDiscovery, loadBalancer LoadBalancer) *ServiceClientManager {
	manager := &ServiceClientManager{
		discovery:    discovery,
		loadBalancer: loadBalancer,
	}
	manager.clients = make(map[string]*ServiceClient)
	return manager
}
// GetClient returns the shared client for serviceName, creating it on first use.
//
// Creation discovers the current instances, registers a watch that keeps the
// client's instance list up to date, and caches the client. Discovery runs
// without holding the manager lock. The cache is then re-checked under the
// write lock, so concurrent first calls for one service end up with a single
// client and a single watch registration; the loser's discovery result is
// discarded.
//
// An error is returned when discovery fails or when the watch cannot be
// registered. In the latter case nothing is cached, so a later call retries.
func (m *ServiceClientManager) GetClient(serviceName string) (*ServiceClient, error) {
	m.mu.RLock()
	cached, exists := m.clients[serviceName]
	m.mu.RUnlock()
	if exists {
		return cached, nil
	}

	services, err := m.discovery.Discover(serviceName)
	if err != nil {
		return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Another goroutine may have created the client while we were discovering.
	if existing, ok := m.clients[serviceName]; ok {
		return existing, nil
	}

	client := &ServiceClient{
		serviceName: serviceName,
		services:    services,
		httpClient:  NewHTTPClient("", 30*time.Second),
	}

	// Keep the instance list in sync with the registry.
	err = m.discovery.Watch(serviceName, func(updatedServices []*ServiceInfo) {
		client.mu.Lock()
		client.services = updatedServices
		client.mu.Unlock()
	})
	if err != nil {
		return nil, fmt.Errorf("failed to watch service %s: %w", serviceName, err)
	}

	m.clients[serviceName] = client
	return client, nil
}
// Call sends a request to one instance of the client's service.
//
// It snapshots the instance list, picks an instance via selectService, points
// the HTTP client at http://<address>:<port>, and dispatches on method: "GET"
// issues a Get on path, "POST" issues a Post with data. result is handed to
// the HTTP client as the destination for the response.
//
// Errors are returned when no instances are known, when selection yields nil,
// when method is anything other than "GET" or "POST", or when the HTTP call
// itself fails.
//
// NOTE(review): the base URL is written to the shared httpClient without
// holding a lock, so concurrent Calls on one client can overwrite each other's
// target — confirm whether HTTPClient is meant to be shared this way.
func (c *ServiceClient) Call(ctx context.Context, method, path string, data interface{}, result interface{}) error {
	c.mu.RLock()
	instances := c.services
	c.mu.RUnlock()

	if len(instances) == 0 {
		return fmt.Errorf("no available services for %s", c.serviceName)
	}

	target := c.selectService(instances)
	if target == nil {
		return fmt.Errorf("failed to select service instance")
	}

	c.httpClient.baseURL = fmt.Sprintf("http://%s:%d", target.Address, target.Port)

	if method == "GET" {
		return c.httpClient.Get(ctx, path, result)
	}
	if method == "POST" {
		return c.httpClient.Post(ctx, path, data, result)
	}
	return fmt.Errorf("unsupported HTTP method: %s", method)
}
// selectService picks the instance to call. It currently always returns the
// first instance, or nil for an empty list; a load balancer could be plugged
// in here.
func (c *ServiceClient) selectService(services []*ServiceInfo) *ServiceInfo {
	if len(services) > 0 {
		return services[0] // simplest possible strategy: take the first
	}
	return nil
}
sequenceDiagram
%% Order creation with an outbox: the event is saved in a local transaction
%% and the UI is answered before payment and inventory are processed.
participant UI
participant Order
participant Payment
participant Inventory
participant Outbox
UI->>Order: Create Order
Order->>Outbox: Save Event (local tx)
Order-->>UI: Accepted
%% Downstream steps are drawn as dashed messages: payment, stock reservation, then the result back to Order.
Outbox-->>Payment: Process Payment
Payment-->>Inventory: Reserve Stock
Inventory-->>Order: Confirm/Reject
flowchart TD
%% Three-step Saga: each step is followed by a success check; a failure starts compensation.
A["开始Saga事务"] --> B["执行步骤1"]
B --> C{"步骤1成功?"}
C -->|是| D["执行步骤2"]
C -->|否| E["补偿步骤1"]
D --> F{"步骤2成功?"}
F -->|是| G["执行步骤3"]
F -->|否| H["补偿步骤2"]
G --> I{"步骤3成功?"}
I -->|是| J["事务完成"]
I -->|否| K["补偿步骤3"]
%% Compensation runs in reverse order from the failed step down to step 1,
%% and every compensation chain ends in the failed state.
E --> L["事务失败"]
H --> M["补偿步骤1"]
K --> N["补偿步骤2"]
M --> L
N --> O["补偿步骤1"]
O --> L
%% Success is filled green, failure pink.
style J fill:#90EE90
style L fill:#FFB6C1
graph TD
%% Evolution path from a monolith to a full microservice architecture.
A["单体应用"] --> B["识别服务边界"]
B --> C["提取共享库"]
C --> D["数据库拆分"]
D --> E["API网关引入"]
E --> F["第一个微服务"]
F --> G["绞杀者模式"]
G --> H["逐步迁移"]
H --> I["完整微服务架构"]
%% The four phases: preparation, splitting, migration, optimisation.
subgraph "演进阶段"
J["阶段1: 准备阶段"]
K["阶段2: 拆分阶段"]
L["阶段3: 迁移阶段"]
M["阶段4: 优化阶段"]
end
%% Dotted links assign each step on the path to its phase.
A -.-> J
B -.-> J
C -.-> K
D -.-> K
E -.-> L
F -.-> L
G -.-> L
H -.-> M
I -.-> M