第11章:日志系统与监控告警
11.0 错误处理与监控
恢复策略:
gin.CustomRecovery捕获 panic,输出结构化 JSON,记录栈信息;业务错误统一封装响应体。指标与探活: 暴露
/debug/pprof于开发环境;健康检查、请求耗时、错误计数纳入监控面板。日志基线: 结构化日志、级别与轮转配置;关键链路使用
request_id贯穿。
项目片段:
server.Use(gin.CustomRecovery(func(c *gin.Context, err any){
common.SysLog(fmt.Sprintf("panic: %v", err))
c.JSON(http.StatusInternalServerError, gin.H{"error": gin.H{"message": "internal error"}})
}))与其他章节的关系:
第5章错误响应规范;第13章部署监控实践;第17章性能优化;第18章安全审计与敏感字段脱敏。
概念要点:
可观测性(Observability):以日志、指标、追踪三件套为核心,支撑定位与预防问题。
结构化日志:以键值对形式输出,便于检索、聚合与分析。
相关性标识:
request_id/trace_id/span_id贯穿全链路,实现日志与追踪互查。日志级别:DEBUG/INFO/WARN/ERROR/FATAL,按场景控制粒度与采样率。
SLI/SLO:以可量化指标(可用性/延迟/错误率)与目标驱动告警和优化。
11.1 日志系统设计
graph TB
subgraph App[Application]
A1[Handlers]
A2[Middleware]
end
subgraph Obs[Observability]
L[Logs]
M[Metrics]
T[Traces]
end
A1 --> L
A2 --> L
A1 --> M
A2 --> M
A1 --> T
A2 --> T
L --> LS[(Log Store)]
M --> TS[(Time Series DB)]
T --> TP[(Tracing Backend)]
TS --> Graf[Dashboard]
LS --> Graf
TP --> Graf图1:可观测性总览(应用→日志/指标/追踪→存储/看板)
graph LR
Logs[Logs] ---|correlate via| Traces[Traces]
Metrics[Metrics] ---|labels/context| Logs
Metrics ---|spans/ids| Traces图2:日志、指标、追踪之间的关联关系
11.1.1 日志系统概述
在企业级应用中,完善的日志系统是系统运维、问题排查和性能分析的重要基础。New API项目采用结构化日志设计,支持多级别日志记录、日志轮转、远程日志收集等功能。
日志系统架构
graph TB
A[应用程序] --> B[日志收集器]
B --> C[日志格式化]
C --> D[日志输出]
D --> E[本地文件]
D --> F[远程日志服务]
D --> G[控制台输出]
H[日志轮转] --> E
I[日志压缩] --> E
J[日志清理] --> E
F --> K[ELK Stack]
F --> L[Prometheus]
F --> M[Grafana]图3:日志系统架构与输出通道
11.1.2 日志模型设计
// 日志模型
type Log struct {
ID int `json:"id" gorm:"primaryKey"`
UserID int `json:"user_id" gorm:"index"`
Type int `json:"type" gorm:"index"`
Content string `json:"content"`
Username string `json:"username" gorm:"index:idx_username_type,priority:1"`
TokenName string `json:"token_name"`
ModelName string `json:"model_name" gorm:"index"`
Quota int `json:"quota"`
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
RequestId string `json:"request_id" gorm:"index"`
ChannelId *int `json:"channel_id" gorm:"index"`
CreatedAt time.Time `json:"created_at" gorm:"index"`
// 扩展字段
IPAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
Duration int64 `json:"duration"` // 请求耗时(毫秒)
StatusCode int `json:"status_code"`
ErrorMsg string `json:"error_msg"`
// 关联字段
User *User `json:"user,omitempty" gorm:"foreignKey:UserID"`
Channel *Channel `json:"channel,omitempty" gorm:"foreignKey:ChannelId"`
}
// 日志类型常量
const (
LogTypeUnknown = iota
LogTypeTopup // 充值
LogTypeConsume // 消费
LogTypeManage // 管理
LogTypeSystem // 系统
LogTypeAuth // 认证
LogTypeAPI // API调用
LogTypeError // 错误
LogTypeWarning // 警告
LogTypeInfo // 信息
LogTypeDebug // 调试
)
// 日志级别
type LogLevel int
const (
LogLevelDebug LogLevel = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelFatal
)
// 日志级别字符串映射
var LogLevelNames = map[LogLevel]string{
LogLevelDebug: "DEBUG",
LogLevelInfo: "INFO",
LogLevelWarn: "WARN",
LogLevelError: "ERROR",
LogLevelFatal: "FATAL",
}
// 结构化日志条目
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level LogLevel `json:"level"`
Message string `json:"message"`
Fields map[string]interface{} `json:"fields,omitempty"`
Caller string `json:"caller,omitempty"`
RequestID string `json:"request_id,omitempty"`
UserID int `json:"user_id,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
}11.1.3 日志记录器实现
// 日志记录器接口
type Logger interface {
Debug(msg string, fields ...Field)
Info(msg string, fields ...Field)
Warn(msg string, fields ...Field)
Error(msg string, fields ...Field)
Fatal(msg string, fields ...Field)
With(fields ...Field) Logger
WithContext(ctx context.Context) Logger
}
// 字段类型
type Field struct {
Key string
Value interface{}
}
// 字段构造函数
func String(key, value string) Field {
return Field{Key: key, Value: value}
}
func Int(key string, value int) Field {
return Field{Key: key, Value: value}
}
func Int64(key string, value int64) Field {
return Field{Key: key, Value: value}
}
func Float64(key string, value float64) Field {
return Field{Key: key, Value: value}
}
func Bool(key string, value bool) Field {
return Field{Key: key, Value: value}
}
func Error(err error) Field {
return Field{Key: "error", Value: err.Error()}
}
func Duration(key string, value time.Duration) Field {
return Field{Key: key, Value: value.String()}
}
// 默认日志记录器实现
type DefaultLogger struct {
level LogLevel
outputs []LogOutput
fields map[string]interface{}
mutex sync.RWMutex
requestID string
userID int
enableCaller bool
}
// 日志输出接口
type LogOutput interface {
Write(entry *LogEntry) error
Close() error
}
// 创建新的日志记录器
func NewLogger(level LogLevel, outputs ...LogOutput) *DefaultLogger {
return &DefaultLogger{
level: level,
outputs: outputs,
fields: make(map[string]interface{}),
enableCaller: true,
}
}
// 设置日志级别
func (l *DefaultLogger) SetLevel(level LogLevel) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.level = level
}
// 添加输出
func (l *DefaultLogger) AddOutput(output LogOutput) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.outputs = append(l.outputs, output)
}
// 实现Logger接口
func (l *DefaultLogger) Debug(msg string, fields ...Field) {
l.log(LogLevelDebug, msg, fields...)
}
func (l *DefaultLogger) Info(msg string, fields ...Field) {
l.log(LogLevelInfo, msg, fields...)
}
func (l *DefaultLogger) Warn(msg string, fields ...Field) {
l.log(LogLevelWarn, msg, fields...)
}
func (l *DefaultLogger) Error(msg string, fields ...Field) {
l.log(LogLevelError, msg, fields...)
}
func (l *DefaultLogger) Fatal(msg string, fields ...Field) {
l.log(LogLevelFatal, msg, fields...)
os.Exit(1)
}
// 添加字段
func (l *DefaultLogger) With(fields ...Field) Logger {
newLogger := &DefaultLogger{
level: l.level,
outputs: l.outputs,
fields: make(map[string]interface{}),
requestID: l.requestID,
userID: l.userID,
enableCaller: l.enableCaller,
}
// 复制现有字段
for k, v := range l.fields {
newLogger.fields[k] = v
}
// 添加新字段
for _, field := range fields {
newLogger.fields[field.Key] = field.Value
}
return newLogger
}
// 从上下文添加字段
func (l *DefaultLogger) WithContext(ctx context.Context) Logger {
newLogger := l.With()
// 从上下文中提取请求ID
if requestID := ctx.Value("request_id"); requestID != nil {
if rid, ok := requestID.(string); ok {
newLogger.(*DefaultLogger).requestID = rid
}
}
// 从上下文中提取用户ID
if userID := ctx.Value("user_id"); userID != nil {
if uid, ok := userID.(int); ok {
newLogger.(*DefaultLogger).userID = uid
}
}
// 从上下文中提取追踪ID
if traceID := ctx.Value("trace_id"); traceID != nil {
if tid, ok := traceID.(string); ok {
newLogger = newLogger.With(String("trace_id", tid))
}
}
return newLogger
}
// 核心日志记录方法
func (l *DefaultLogger) log(level LogLevel, msg string, fields ...Field) {
l.mutex.RLock()
defer l.mutex.RUnlock()
// 检查日志级别
if level < l.level {
return
}
// 创建日志条目
entry := &LogEntry{
Timestamp: time.Now(),
Level: level,
Message: msg,
Fields: make(map[string]interface{}),
RequestID: l.requestID,
UserID: l.userID,
}
// 添加现有字段
for k, v := range l.fields {
entry.Fields[k] = v
}
// 添加新字段
for _, field := range fields {
entry.Fields[field.Key] = field.Value
}
// 添加调用者信息
if l.enableCaller {
if caller := getCaller(3); caller != "" {
entry.Caller = caller
}
}
// 输出到所有输出器
for _, output := range l.outputs {
if err := output.Write(entry); err != nil {
fmt.Fprintf(os.Stderr, "日志输出错误: %v\n", err)
}
}
}
// 获取调用者信息
func getCaller(skip int) string {
_, file, line, ok := runtime.Caller(skip)
if !ok {
return ""
}
// 只保留文件名
if idx := strings.LastIndex(file, "/"); idx >= 0 {
file = file[idx+1:]
}
return fmt.Sprintf("%s:%d", file, line)
}11.2 日志输出器实现
11.2.1 控制台输出器
// 控制台输出器
type ConsoleOutput struct {
writer io.Writer
formatter LogFormatter
colorize bool
}
// 日志格式化器接口
type LogFormatter interface {
Format(entry *LogEntry) ([]byte, error)
}
// 创建控制台输出器
func NewConsoleOutput(colorize bool) *ConsoleOutput {
return &ConsoleOutput{
writer: os.Stdout,
formatter: NewTextFormatter(colorize),
colorize: colorize,
}
}
// 实现LogOutput接口
func (co *ConsoleOutput) Write(entry *LogEntry) error {
data, err := co.formatter.Format(entry)
if err != nil {
return err
}
_, err = co.writer.Write(data)
return err
}
func (co *ConsoleOutput) Close() error {
return nil
}
// 文本格式化器
type TextFormatter struct {
colorize bool
colors map[LogLevel]string
}
// 颜色常量
const (
ColorReset = "\033[0m"
ColorRed = "\033[31m"
ColorYellow = "\033[33m"
ColorBlue = "\033[34m"
ColorGray = "\033[37m"
ColorWhite = "\033[97m"
)
// 创建文本格式化器
func NewTextFormatter(colorize bool) *TextFormatter {
return &TextFormatter{
colorize: colorize,
colors: map[LogLevel]string{
LogLevelDebug: ColorGray,
LogLevelInfo: ColorBlue,
LogLevelWarn: ColorYellow,
LogLevelError: ColorRed,
LogLevelFatal: ColorRed,
},
}
}
// 格式化日志条目
func (tf *TextFormatter) Format(entry *LogEntry) ([]byte, error) {
var buf bytes.Buffer
// 时间戳
timestamp := entry.Timestamp.Format("2006-01-02 15:04:05.000")
// 日志级别
levelName := LogLevelNames[entry.Level]
if tf.colorize {
color := tf.colors[entry.Level]
levelName = color + levelName + ColorReset
}
// 基本信息
fmt.Fprintf(&buf, "[%s] %s %s", timestamp, levelName, entry.Message)
// 添加字段
if len(entry.Fields) > 0 {
buf.WriteString(" |")
for k, v := range entry.Fields {
fmt.Fprintf(&buf, " %s=%v", k, v)
}
}
// 添加请求ID
if entry.RequestID != "" {
fmt.Fprintf(&buf, " | request_id=%s", entry.RequestID)
}
// 添加用户ID
if entry.UserID > 0 {
fmt.Fprintf(&buf, " | user_id=%d", entry.UserID)
}
// 添加调用者信息
if entry.Caller != "" {
fmt.Fprintf(&buf, " | caller=%s", entry.Caller)
}
buf.WriteString("\n")
return buf.Bytes(), nil
}11.2.2 文件输出器
// 文件输出器
type FileOutput struct {
filename string
file *os.File
formatter LogFormatter
rotator *LogRotator
mutex sync.Mutex
}
// 日志轮转器
type LogRotator struct {
maxSize int64 // 最大文件大小(字节)
maxAge time.Duration // 最大保存时间
maxBackups int // 最大备份文件数
compress bool // 是否压缩
currentSize int64
mutex sync.Mutex
}
// 创建文件输出器
func NewFileOutput(filename string, rotator *LogRotator) (*FileOutput, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
// 获取文件大小
stat, err := file.Stat()
if err != nil {
file.Close()
return nil, err
}
if rotator != nil {
rotator.currentSize = stat.Size()
}
return &FileOutput{
filename: filename,
file: file,
formatter: NewJSONFormatter(),
rotator: rotator,
}, nil
}
// 实现LogOutput接口
func (fo *FileOutput) Write(entry *LogEntry) error {
fo.mutex.Lock()
defer fo.mutex.Unlock()
data, err := fo.formatter.Format(entry)
if err != nil {
return err
}
// 检查是否需要轮转
if fo.rotator != nil {
if err := fo.checkRotation(int64(len(data))); err != nil {
return err
}
}
n, err := fo.file.Write(data)
if err != nil {
return err
}
if fo.rotator != nil {
fo.rotator.currentSize += int64(n)
}
return fo.file.Sync()
}
func (fo *FileOutput) Close() error {
fo.mutex.Lock()
defer fo.mutex.Unlock()
if fo.file != nil {
return fo.file.Close()
}
return nil
}
// 检查日志轮转
func (fo *FileOutput) checkRotation(dataSize int64) error {
if fo.rotator == nil {
return nil
}
// 检查文件大小
if fo.rotator.currentSize+dataSize > fo.rotator.maxSize {
return fo.rotate()
}
return nil
}
// 执行日志轮转
func (fo *FileOutput) rotate() error {
fo.rotator.mutex.Lock()
defer fo.rotator.mutex.Unlock()
// 关闭当前文件
if err := fo.file.Close(); err != nil {
return err
}
// 生成备份文件名
timestamp := time.Now().Format("20060102-150405")
backupName := fmt.Sprintf("%s.%s", fo.filename, timestamp)
// 重命名当前文件
if err := os.Rename(fo.filename, backupName); err != nil {
return err
}
// 压缩备份文件
if fo.rotator.compress {
go fo.compressFile(backupName)
}
// 创建新文件
file, err := os.OpenFile(fo.filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
fo.file = file
fo.rotator.currentSize = 0
// 清理旧文件
go fo.cleanupOldFiles()
return nil
}
// 压缩文件
func (fo *FileOutput) compressFile(filename string) {
gzFilename := filename + ".gz"
input, err := os.Open(filename)
if err != nil {
return
}
defer input.Close()
output, err := os.Create(gzFilename)
if err != nil {
return
}
defer output.Close()
gzWriter := gzip.NewWriter(output)
defer gzWriter.Close()
if _, err := io.Copy(gzWriter, input); err != nil {
os.Remove(gzFilename)
return
}
// 删除原文件
os.Remove(filename)
}
// 清理旧文件
func (fo *FileOutput) cleanupOldFiles() {
if fo.rotator.maxBackups <= 0 && fo.rotator.maxAge <= 0 {
return
}
dir := filepath.Dir(fo.filename)
baseName := filepath.Base(fo.filename)
files, err := filepath.Glob(filepath.Join(dir, baseName+".*"))
if err != nil {
return
}
var backupFiles []os.FileInfo
for _, file := range files {
if info, err := os.Stat(file); err == nil {
backupFiles = append(backupFiles, info)
}
}
// 按修改时间排序
sort.Slice(backupFiles, func(i, j int) bool {
return backupFiles[i].ModTime().After(backupFiles[j].ModTime())
})
now := time.Now()
for i, info := range backupFiles {
shouldDelete := false
// 检查数量限制
if fo.rotator.maxBackups > 0 && i >= fo.rotator.maxBackups {
shouldDelete = true
}
// 检查时间限制
if fo.rotator.maxAge > 0 && now.Sub(info.ModTime()) > fo.rotator.maxAge {
shouldDelete = true
}
if shouldDelete {
os.Remove(filepath.Join(dir, info.Name()))
}
}
}11.2.3 JSON格式化器
// JSON格式化器
type JSONFormatter struct {
prettyPrint bool
}
// 创建JSON格式化器
func NewJSONFormatter() *JSONFormatter {
return &JSONFormatter{
prettyPrint: false,
}
}
// 格式化日志条目
func (jf *JSONFormatter) Format(entry *LogEntry) ([]byte, error) {
// 创建日志数据
logData := map[string]interface{}{
"timestamp": entry.Timestamp.Format(time.RFC3339Nano),
"level": LogLevelNames[entry.Level],
"message": entry.Message,
}
// 添加字段
if len(entry.Fields) > 0 {
for k, v := range entry.Fields {
logData[k] = v
}
}
// 添加可选字段
if entry.RequestID != "" {
logData["request_id"] = entry.RequestID
}
if entry.UserID > 0 {
logData["user_id"] = entry.UserID
}
if entry.Caller != "" {
logData["caller"] = entry.Caller
}
if entry.TraceID != "" {
logData["trace_id"] = entry.TraceID
}
if entry.SpanID != "" {
logData["span_id"] = entry.SpanID
}
// 序列化为JSON
var data []byte
var err error
if jf.prettyPrint {
data, err = json.MarshalIndent(logData, "", " ")
} else {
data, err = json.Marshal(logData)
}
if err != nil {
return nil, err
}
// 添加换行符
data = append(data, '\n')
return data, nil
}11.3 业务日志记录
11.3.1 API调用日志
// API调用日志记录中间件
func LoggingMiddleware() gin.HandlerFunc {
return gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
// 记录API调用日志
logger := GetLogger().WithContext(param.Keys["context"].(context.Context))
fields := []Field{
String("method", param.Method),
String("path", param.Path),
String("ip", param.ClientIP),
String("user_agent", param.Request.UserAgent()),
Int("status_code", param.StatusCode),
Duration("latency", param.Latency),
Int64("body_size", int64(param.BodySize)),
}
// 添加用户信息
if userID, exists := param.Keys["user_id"]; exists {
fields = append(fields, Int("user_id", userID.(int)))
}
// 添加错误信息
if param.ErrorMessage != "" {
fields = append(fields, String("error", param.ErrorMessage))
}
// 根据状态码选择日志级别
if param.StatusCode >= 500 {
logger.Error("API请求处理失败", fields...)
} else if param.StatusCode >= 400 {
logger.Warn("API请求错误", fields...)
} else {
logger.Info("API请求成功", fields...)
}
return ""
})
}
// 记录业务操作日志
func RecordLog(userID int, logType int, content string, fields ...Field) {
// 记录到数据库
log := &Log{
UserID: userID,
Type: logType,
Content: content,
CreatedAt: time.Now(),
}
// 从字段中提取额外信息
for _, field := range fields {
switch field.Key {
case "token_name":
if tokenName, ok := field.Value.(string); ok {
log.TokenName = tokenName
}
case "model_name":
if modelName, ok := field.Value.(string); ok {
log.ModelName = modelName
}
case "quota":
if quota, ok := field.Value.(int); ok {
log.Quota = quota
}
case "prompt_tokens":
if tokens, ok := field.Value.(int); ok {
log.PromptTokens = tokens
}
case "completion_tokens":
if tokens, ok := field.Value.(int); ok {
log.CompletionTokens = tokens
}
case "request_id":
if requestID, ok := field.Value.(string); ok {
log.RequestId = requestID
}
case "channel_id":
if channelID, ok := field.Value.(int); ok {
log.ChannelId = &channelID
}
case "ip_address":
if ip, ok := field.Value.(string); ok {
log.IPAddress = ip
}
case "user_agent":
if ua, ok := field.Value.(string); ok {
log.UserAgent = ua
}
case "duration":
if duration, ok := field.Value.(int64); ok {
log.Duration = duration
}
case "status_code":
if code, ok := field.Value.(int); ok {
log.StatusCode = code
}
case "error_msg":
if errMsg, ok := field.Value.(string); ok {
log.ErrorMsg = errMsg
}
}
}
// 异步保存到数据库
go func() {
if err := common.DB.Create(log).Error; err != nil {
GetLogger().Error("保存业务日志失败", Error(err))
}
}()
// 记录到结构化日志
logger := GetLogger().With(fields...)
switch logType {
case LogTypeError:
logger.Error(content)
case LogTypeWarning:
logger.Warn(content)
case LogTypeSystem, LogTypeManage:
logger.Info(content)
default:
logger.Debug(content)
}
}
// 记录API调用日志
func RecordAPILog(c *gin.Context, userID int, modelName string, quota int, promptTokens, completionTokens int, channelID *int, duration time.Duration, err error) {
fields := []Field{
String("model_name", modelName),
Int("quota", quota),
Int("prompt_tokens", promptTokens),
Int("completion_tokens", completionTokens),
String("ip_address", c.ClientIP()),
String("user_agent", c.Request.UserAgent()),
Int64("duration", duration.Milliseconds()),
Int("status_code", c.Writer.Status()),
}
if requestID := c.GetString("request_id"); requestID != "" {
fields = append(fields, String("request_id", requestID))
}
if channelID != nil {
fields = append(fields, Int("channel_id", *channelID))
}
var content string
var logType int
if err != nil {
content = fmt.Sprintf("API调用失败: %v", err)
logType = LogTypeError
fields = append(fields, String("error_msg", err.Error()))
} else {
content = "API调用成功"
logType = LogTypeAPI
}
RecordLog(userID, logType, content, fields...)
}11.3.2 用户操作日志
// 用户登录日志
func RecordLoginLog(userID int, username, ip, userAgent string, success bool, reason string) {
fields := []Field{
String("username", username),
String("ip_address", ip),
String("user_agent", userAgent),
Bool("success", success),
}
var content string
var logType int
if success {
content = "用户登录成功"
logType = LogTypeAuth
} else {
content = fmt.Sprintf("用户登录失败: %s", reason)
logType = LogTypeError
fields = append(fields, String("reason", reason))
}
RecordLog(userID, logType, content, fields...)
}
// 用户注册日志
func RecordRegisterLog(username, email, ip, userAgent string, success bool, reason string) {
fields := []Field{
String("username", username),
String("email", email),
String("ip_address", ip),
String("user_agent", userAgent),
Bool("success", success),
}
var content string
var logType int
if success {
content = "用户注册成功"
logType = LogTypeAuth
} else {
content = fmt.Sprintf("用户注册失败: %s", reason)
logType = LogTypeError
fields = append(fields, String("reason", reason))
}
RecordLog(0, logType, content, fields...)
}
// 充值日志
func RecordTopupLog(userID int, amount int, method, tradeNo string) {
fields := []Field{
Int("amount", amount),
String("method", method),
String("trade_no", tradeNo),
}
content := fmt.Sprintf("用户充值: %d 额度", amount)
RecordLog(userID, LogTypeTopup, content, fields...)
}
// 消费日志
func RecordConsumeLog(userID int, quota int, modelName, tokenName string, channelID *int) {
fields := []Field{
Int("quota", quota),
String("model_name", modelName),
String("token_name", tokenName),
}
if channelID != nil {
fields = append(fields, Int("channel_id", *channelID))
}
content := fmt.Sprintf("消费额度: %d", quota)
RecordLog(userID, LogTypeConsume, content, fields...)
}
// 管理操作日志
func RecordManageLog(userID int, action, target string, details map[string]interface{}) {
fields := []Field{
String("action", action),
String("target", target),
}
for k, v := range details {
fields = append(fields, Field{Key: k, Value: v})
}
content := fmt.Sprintf("管理操作: %s %s", action, target)
RecordLog(userID, LogTypeManage, content, fields...)
}11.4 监控告警系统
sequenceDiagram
participant App as Application
participant Mon as Monitor
participant Al as AlertManager
participant On as On-call
App->>Mon: Emit metrics/errors
Mon-->>Al: Trigger alert (rule)
Al-->>On: Page via Email/IM
On-->>Al: Ack / Silence
On-->>App: Mitigate / Rollback图4:监控告警的触发与响应时序
监控告警系统是保障系统稳定运行的重要组成部分,通过实时收集系统指标、分析异常情况并及时通知相关人员,实现问题的快速发现和处理。
监控告警系统概述
监控告警系统主要包括以下几个核心组件:
指标收集器(Metrics Collector):负责收集各类系统和业务指标
告警规则引擎(Alert Rule Engine):根据预定义规则评估指标状态
告警管理器(Alert Manager):处理告警事件的生命周期管理
通知系统(Notification System):将告警信息发送给相关人员
flowchart LR
SRC[来源: Metrics/Logs/Traces] --> RULE[告警规则]
RULE --> DEDUP[去重/抑制]
DEDUP --> ROUTE[路由]
ROUTE --> PAGE[通知(Email/IM/PagerDuty)]
ROUTE --> TICKET[工单/事件平台]
PAGE --> ACK[值班响应]
ACK --> ACT[处置: 降级/回滚/扩容]
ACT --> LEARN[复盘与规则优化]图5:告警从触发到处置的处理链路
graph TB
subgraph "监控数据源"
A1[应用指标]
A2[系统指标]
A3[业务指标]
A4[日志数据]
end
subgraph "指标处理"
B1[数据收集]
B2[数据聚合]
B3[数据存储]
end
subgraph "告警处理"
C1[规则评估]
C2[告警触发]
C3[告警去重]
C4[告警路由]
end
subgraph "通知渠道"
D1[邮件通知]
D2[短信通知]
D3[即时消息]
D4[工单系统]
end
A1 --> B1
A2 --> B1
A3 --> B1
A4 --> B1
B1 --> B2
B2 --> B3
B3 --> C1
C1 --> C2
C2 --> C3
C3 --> C4
C4 --> D1
C4 --> D2
C4 --> D3
C4 --> D4图6:监控告警系统架构图
核心概念解析
SLI/SLO(服务级别指标/目标):
SLI(Service Level Indicator):衡量服务质量的具体指标,如可用性、延迟、错误率等
SLO(Service Level Objective):基于SLI设定的服务质量目标,如99.9%可用性、p99延迟<500ms
SLA(Service Level Agreement):与用户约定的服务质量协议
告警策略:
Burn Rate:误差预算消耗速率,用于触发多窗口告警,有效减少告警噪音
抑制(Silence):在维护期间临时抑制特定告警,避免无效通知
路由(Routing):按服务、团队、严重级别将告警分派到不同通知通道
去重(Deduplication):合并相同或相似的告警,避免重复通知
指标类型:
Counter(计数器):只增不减的累积指标,如请求总数、错误总数
Gauge(仪表盘):可增可减的瞬时指标,如CPU使用率、内存使用量
Histogram(直方图):统计数据分布的指标,如请求延迟分布
Summary(摘要):类似直方图,但在客户端计算分位数
11.4.1 监控指标定义
// 监控指标类型
type MetricType int
const (
MetricTypeCounter MetricType = iota // 计数器
MetricTypeGauge // 仪表盘
MetricTypeHistogram // 直方图
MetricTypeSummary // 摘要
)
// 监控指标
type Metric struct {
Name string `json:"name"`
Type MetricType `json:"type"`
Value float64 `json:"value"`
Labels map[string]string `json:"labels"`
Timestamp time.Time `json:"timestamp"`
Description string `json:"description"`
}
// 监控收集器
type MetricsCollector struct {
metrics map[string]*Metric
mutex sync.RWMutex
// Prometheus集成
registry *prometheus.Registry
// 内置指标
requestTotal *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
activeUsers prometheus.Gauge
systemLoad prometheus.Gauge
memoryUsage prometheus.Gauge
diskUsage prometheus.Gauge
}
// 创建监控收集器
// 初始化Prometheus指标收集器,创建常用的系统和业务指标
func NewMetricsCollector() *MetricsCollector {
// 创建独立的Prometheus注册表,避免与全局注册表冲突
registry := prometheus.NewRegistry()
// 创建API请求计数器(Counter类型)
// 用于统计不同方法、端点和状态码的请求总数
requestTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "api_requests_total",
Help: "Total number of API requests",
},
[]string{"method", "endpoint", "status"}, // 标签维度
)
// 创建API请求耗时直方图(Histogram类型)
// 用于统计请求延迟分布,支持分位数计算
requestDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "api_request_duration_seconds",
Help: "API request duration in seconds",
Buckets: prometheus.DefBuckets, // 使用默认的延迟桶
},
[]string{"method", "endpoint"},
)
// 创建活跃用户数仪表盘(Gauge类型)
// 用于实时显示当前活跃用户数量
activeUsers := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_users_total",
Help: "Number of active users",
},
)
// 创建系统负载仪表盘
// 监控系统的平均负载情况
systemLoad := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "system_load_average",
Help: "System load average",
},
)
// 创建内存使用量仪表盘
// 监控系统内存使用情况(字节)
memoryUsage := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "memory_usage_bytes",
Help: "Memory usage in bytes",
},
)
// 创建磁盘使用量仪表盘
// 监控磁盘空间使用情况(字节)
diskUsage := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "disk_usage_bytes",
Help: "Disk usage in bytes",
},
)
// 将所有指标注册到注册表中
// MustRegister会在注册失败时panic,确保指标正确初始化
registry.MustRegister(requestTotal)
registry.MustRegister(requestDuration)
registry.MustRegister(activeUsers)
registry.MustRegister(systemLoad)
registry.MustRegister(memoryUsage)
registry.MustRegister(diskUsage)
return &MetricsCollector{
metrics: make(map[string]*Metric),
registry: registry,
requestTotal: requestTotal,
requestDuration: requestDuration,
activeUsers: activeUsers,
systemLoad: systemLoad,
memoryUsage: memoryUsage,
diskUsage: diskUsage,
}
}
// 记录API请求指标
func (mc *MetricsCollector) RecordAPIRequest(method, endpoint, status string, duration time.Duration) {
mc.requestTotal.WithLabelValues(method, endpoint, status).Inc()
mc.requestDuration.WithLabelValues(method, endpoint).Observe(duration.Seconds())
}
// 更新活跃用户数
func (mc *MetricsCollector) UpdateActiveUsers(count float64) {
mc.activeUsers.Set(count)
}
// 更新系统负载
func (mc *MetricsCollector) UpdateSystemLoad(load float64) {
mc.systemLoad.Set(load)
}
// 更新内存使用量
func (mc *MetricsCollector) UpdateMemoryUsage(bytes float64) {
mc.memoryUsage.Set(bytes)
}
// 更新磁盘使用量
func (mc *MetricsCollector) UpdateDiskUsage(bytes float64) {
mc.diskUsage.Set(bytes)
}
// 获取Prometheus处理器
func (mc *MetricsCollector) GetPrometheusHandler() http.Handler {
return promhttp.HandlerFor(mc.registry, promhttp.HandlerOpts{})
}
// 自定义指标记录
func (mc *MetricsCollector) RecordMetric(name string, metricType MetricType, value float64, labels map[string]string) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
metric := &Metric{
Name: name,
Type: metricType,
Value: value,
Labels: labels,
Timestamp: time.Now(),
}
mc.metrics[name] = metric
}
// 获取指标
func (mc *MetricsCollector) GetMetric(name string) *Metric {
mc.mutex.RLock()
defer mc.mutex.RUnlock()
return mc.metrics[name]
}
// 获取所有指标
func (mc *MetricsCollector) GetAllMetrics() map[string]*Metric {
mc.mutex.RLock()
defer mc.mutex.RUnlock()
result := make(map[string]*Metric)
for k, v := range mc.metrics {
result[k] = v
}
return result
}11.4.2 系统监控
系统监控是监控告警系统的重要组成部分,负责实时收集服务器的各项性能指标,包括CPU使用率、内存使用情况、磁盘空间、网络流量等。通过持续监控这些关键指标,可以及时发现系统性能瓶颈和潜在问题。
系统监控指标分类
graph TB
subgraph "系统资源监控"
A1[CPU使用率]
A2[内存使用率]
A3[磁盘使用率]
A4[网络流量]
A5[文件描述符]
end
subgraph "应用性能监控"
B1[响应时间]
B2[吞吐量]
B3[错误率]
B4[并发连接数]
B5[队列长度]
end
subgraph "业务指标监控"
C1[活跃用户数]
C2[API调用量]
C3[数据库连接数]
C4[缓存命中率]
C5[任务处理量]
end
A1 --> D[告警评估]
A2 --> D
A3 --> D
B1 --> D
B2 --> D
B3 --> D
C1 --> D
C2 --> D
C3 --> D
D --> E[告警触发]
E --> F[通知发送]图7:系统监控指标分类与告警流程
监控数据收集流程
sequenceDiagram
participant SM as SystemMonitor
participant OS as OperatingSystem
participant DB as Database
participant MC as MetricsCollector
participant AM as AlertManager
loop 每30秒
SM->>OS: 获取CPU使用率
OS-->>SM: 返回CPU数据
SM->>OS: 获取内存信息
OS-->>SM: 返回内存数据
SM->>OS: 获取磁盘信息
OS-->>SM: 返回磁盘数据
SM->>DB: 查询活跃用户数
DB-->>SM: 返回用户统计
SM->>MC: 记录所有指标
MC->>AM: 触发指标评估
alt 指标异常
AM->>AM: 生成告警事件
end
end图8:系统监控数据收集时序图
// 系统监控器
type SystemMonitor struct {
collector *MetricsCollector
logger Logger
// 配置
interval time.Duration
// 控制
stopChan chan struct{}
running bool
mutex sync.RWMutex
}
// 创建系统监控器
func NewSystemMonitor(collector *MetricsCollector, logger Logger) *SystemMonitor {
return &SystemMonitor{
collector: collector,
logger: logger,
interval: 30 * time.Second,
stopChan: make(chan struct{}),
}
}
// 启动监控
func (sm *SystemMonitor) Start() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
if sm.running {
return
}
sm.running = true
go sm.monitorLoop()
sm.logger.Info("系统监控已启动")
}
// 停止监控
func (sm *SystemMonitor) Stop() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
if !sm.running {
return
}
sm.running = false
close(sm.stopChan)
sm.logger.Info("系统监控已停止")
}
// 监控循环
func (sm *SystemMonitor) monitorLoop() {
ticker := time.NewTicker(sm.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sm.collectSystemMetrics()
case <-sm.stopChan:
return
}
}
}
// 收集系统指标
func (sm *SystemMonitor) collectSystemMetrics() {
// 收集CPU使用率
if cpuPercent, err := sm.getCPUUsage(); err == nil {
sm.collector.RecordMetric("cpu_usage_percent", MetricTypeGauge, cpuPercent, nil)
sm.logger.Debug("CPU使用率", Float64("percent", cpuPercent))
} else {
sm.logger.Error("获取CPU使用率失败", Error(err))
}
// 收集内存使用情况
if memInfo, err := sm.getMemoryInfo(); err == nil {
sm.collector.UpdateMemoryUsage(float64(memInfo.Used))
sm.collector.RecordMetric("memory_usage_percent", MetricTypeGauge, memInfo.UsedPercent, nil)
sm.logger.Debug("内存使用情况",
Int64("used", int64(memInfo.Used)),
Int64("total", int64(memInfo.Total)),
Float64("percent", memInfo.UsedPercent))
} else {
sm.logger.Error("获取内存信息失败", Error(err))
}
// 收集磁盘使用情况
if diskInfo, err := sm.getDiskInfo(); err == nil {
sm.collector.UpdateDiskUsage(float64(diskInfo.Used))
sm.collector.RecordMetric("disk_usage_percent", MetricTypeGauge, diskInfo.UsedPercent, nil)
sm.logger.Debug("磁盘使用情况",
Int64("used", int64(diskInfo.Used)),
Int64("total", int64(diskInfo.Total)),
Float64("percent", diskInfo.UsedPercent))
} else {
sm.logger.Error("获取磁盘信息失败", Error(err))
}
// 收集系统负载
if loadAvg, err := sm.getLoadAverage(); err == nil {
sm.collector.UpdateSystemLoad(loadAvg)
sm.logger.Debug("系统负载", Float64("load_avg", loadAvg))
} else {
sm.logger.Error("获取系统负载失败", Error(err))
}
// 收集活跃用户数
if activeUsers, err := sm.getActiveUsers(); err == nil {
sm.collector.UpdateActiveUsers(float64(activeUsers))
sm.logger.Debug("活跃用户数", Int("count", activeUsers))
} else {
sm.logger.Error("获取活跃用户数失败", Error(err))
}
}
// 获取CPU使用率
func (sm *SystemMonitor) getCPUUsage() (float64, error) {
// 这里使用第三方库如 github.com/shirou/gopsutil
// 为了简化,这里返回模拟数据
return rand.Float64() * 100, nil
}
// 内存信息结构
type MemoryInfo struct {
Total uint64
Used uint64
Available uint64
UsedPercent float64
}
// 获取内存信息
func (sm *SystemMonitor) getMemoryInfo() (*MemoryInfo, error) {
// 模拟内存信息
total := uint64(8 * 1024 * 1024 * 1024) // 8GB
used := uint64(rand.Float64() * float64(total))
available := total - used
usedPercent := float64(used) / float64(total) * 100
return &MemoryInfo{
Total: total,
Used: used,
Available: available,
UsedPercent: usedPercent,
}, nil
}
// 磁盘信息结构
type DiskInfo struct {
Total uint64
Used uint64
Available uint64
UsedPercent float64
}
// 获取磁盘信息
func (sm *SystemMonitor) getDiskInfo() (*DiskInfo, error) {
// 模拟磁盘信息
total := uint64(100 * 1024 * 1024 * 1024) // 100GB
used := uint64(rand.Float64() * float64(total))
available := total - used
usedPercent := float64(used) / float64(total) * 100
return &DiskInfo{
Total: total,
Used: used,
Available: available,
UsedPercent: usedPercent,
}, nil
}
// 获取系统负载
func (sm *SystemMonitor) getLoadAverage() (float64, error) {
// 模拟系统负载
return rand.Float64() * 4, nil
}
// 获取活跃用户数
func (sm *SystemMonitor) getActiveUsers() (int, error) {
var count int64
// 查询最近5分钟内有活动的用户数
err := common.DB.Model(&Log{}).
Where("created_at > ?", time.Now().Add(-5*time.Minute)).
Distinct("user_id").
Count(&count).Error
return int(count), err
}11.4.3 告警系统
// 告警规则
type AlertRule struct {
ID int `json:"id" gorm:"primaryKey"`
Name string `json:"name" gorm:"uniqueIndex"`
Description string `json:"description"`
MetricName string `json:"metric_name" gorm:"index"`
Operator string `json:"operator"` // >, <, >=, <=, ==, !=
Threshold float64 `json:"threshold"`
Duration int `json:"duration"` // 持续时间(秒)
Severity string `json:"severity"` // critical, warning, info
Enabled bool `json:"enabled" gorm:"default:true"`
// 通知配置
NotifyChannels []string `json:"notify_channels" gorm:"type:json"`
// 时间配置
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// 告警事件
type AlertEvent struct {
ID int `json:"id" gorm:"primaryKey"`
RuleID int `json:"rule_id" gorm:"index"`
RuleName string `json:"rule_name"`
MetricName string `json:"metric_name"`
MetricValue float64 `json:"metric_value"`
Threshold float64 `json:"threshold"`
Severity string `json:"severity"`
Status string `json:"status"` // firing, resolved
Message string `json:"message"`
// 时间信息
StartTime time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
// 关联
Rule *AlertRule `json:"rule,omitempty" gorm:"foreignKey:RuleID"`
}
// 告警管理器
type AlertManager struct {
rules map[int]*AlertRule
events map[int]*AlertEvent
collector *MetricsCollector
notifier *AlertNotifier
logger Logger
// 状态跟踪
ruleStates map[int]*RuleState
// 控制
stopChan chan struct{}
running bool
mutex sync.RWMutex
// 配置
checkInterval time.Duration
}
// 规则状态
type RuleState struct {
LastCheck time.Time
LastValue float64
AlertStart *time.Time
EventID *int
}
// 创建告警管理器
func NewAlertManager(collector *MetricsCollector, notifier *AlertNotifier, logger Logger) *AlertManager {
return &AlertManager{
rules: make(map[int]*AlertRule),
events: make(map[int]*AlertEvent),
collector: collector,
notifier: notifier,
logger: logger,
ruleStates: make(map[int]*RuleState),
stopChan: make(chan struct{}),
checkInterval: 30 * time.Second,
}
}
// 加载告警规则
func (am *AlertManager) LoadRules() error {
var rules []AlertRule
if err := common.DB.Where("enabled = ?", true).Find(&rules).Error; err != nil {
return err
}
am.mutex.Lock()
defer am.mutex.Unlock()
for _, rule := range rules {
am.rules[rule.ID] = &rule
if _, exists := am.ruleStates[rule.ID]; !exists {
am.ruleStates[rule.ID] = &RuleState{
LastCheck: time.Now(),
}
}
}
am.logger.Info("告警规则加载完成", Int("count", len(rules)))
return nil
}
// 启动告警检查
func (am *AlertManager) Start() {
am.mutex.Lock()
defer am.mutex.Unlock()
if am.running {
return
}
am.running = true
go am.checkLoop()
am.logger.Info("告警管理器已启动")
}
// 停止告警检查
func (am *AlertManager) Stop() {
am.mutex.Lock()
defer am.mutex.Unlock()
if !am.running {
return
}
am.running = false
close(am.stopChan)
am.logger.Info("告警管理器已停止")
}
// 检查循环
func (am *AlertManager) checkLoop() {
ticker := time.NewTicker(am.checkInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
am.checkRules()
case <-am.stopChan:
return
}
}
}
// 检查所有规则
func (am *AlertManager) checkRules() {
am.mutex.RLock()
rules := make(map[int]*AlertRule)
for k, v := range am.rules {
rules[k] = v
}
am.mutex.RUnlock()
for ruleID, rule := range rules {
am.checkRule(ruleID, rule)
}
}
// 检查单个规则
func (am *AlertManager) checkRule(ruleID int, rule *AlertRule) {
// 获取指标值
metric := am.collector.GetMetric(rule.MetricName)
if metric == nil {
am.logger.Warn("指标不存在", String("metric", rule.MetricName))
return
}
// 获取规则状态
am.mutex.Lock()
state, exists := am.ruleStates[ruleID]
if !exists {
state = &RuleState{LastCheck: time.Now()}
am.ruleStates[ruleID] = state
}
am.mutex.Unlock()
// 更新状态
state.LastCheck = time.Now()
state.LastValue = metric.Value
// 检查阈值
triggered := am.evaluateCondition(metric.Value, rule.Operator, rule.Threshold)
if triggered {
// 触发告警
if state.AlertStart == nil {
// 开始新的告警
now := time.Now()
state.AlertStart = &now
} else {
// 检查持续时间
duration := time.Since(*state.AlertStart)
if duration >= time.Duration(rule.Duration)*time.Second {
// 满足持续时间条件,触发告警
am.fireAlert(ruleID, rule, metric.Value, state)
}
}
} else {
// 未触发告警
if state.AlertStart != nil {
// 解决告警
am.resolveAlert(ruleID, rule, state)
state.AlertStart = nil
}
}
}
// 评估条件
func (am *AlertManager) evaluateCondition(value float64, operator string, threshold float64) bool {
switch operator {
case ">":
return value > threshold
case "<":
return value < threshold
case ">=":
return value >= threshold
case "<=":
return value <= threshold
case "==":
return value == threshold
case "!=":
return value != threshold
default:
return false
}
}
// 触发告警
func (am *AlertManager) fireAlert(ruleID int, rule *AlertRule, value float64, state *RuleState) {
// 检查是否已经有活跃的告警事件
if state.EventID != nil {
return
}
// 创建告警事件
event := &AlertEvent{
RuleID: ruleID,
RuleName: rule.Name,
MetricName: rule.MetricName,
MetricValue: value,
Threshold: rule.Threshold,
Severity: rule.Severity,
Status: "firing",
Message: fmt.Sprintf("指标 %s 当前值 %.2f %s 阈值 %.2f", rule.MetricName, value, rule.Operator, rule.Threshold),
StartTime: time.Now(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// 保存到数据库
if err := common.DB.Create(event).Error; err != nil {
am.logger.Error("保存告警事件失败", Error(err))
return
}
// 更新状态
state.EventID = &event.ID
// 缓存事件
am.mutex.Lock()
am.events[event.ID] = event
am.mutex.Unlock()
// 发送通知
am.notifier.SendAlert(event, rule.NotifyChannels)
am.logger.Warn("告警触发",
String("rule", rule.Name),
String("metric", rule.MetricName),
Float64("value", value),
Float64("threshold", rule.Threshold))
}
// 解决告警
func (am *AlertManager) resolveAlert(ruleID int, rule *AlertRule, state *RuleState) {
if state.EventID == nil {
return
}
// 更新告警事件
now := time.Now()
err := common.DB.Model(&AlertEvent{}).
Where("id = ?", *state.EventID).
Updates(map[string]interface{}{
"status": "resolved",
"end_time": &now,
"updated_at": now,
}).Error
if err != nil {
am.logger.Error("更新告警事件失败", Error(err))
return
}
// 获取事件详情
am.mutex.RLock()
event, exists := am.events[*state.EventID]
am.mutex.RUnlock()
if exists {
event.Status = "resolved"
event.EndTime = &now
event.UpdatedAt = now
// 发送解决通知
am.notifier.SendResolved(event, rule.NotifyChannels)
}
// 清理状态
state.EventID = nil
am.logger.Info("告警解决",
String("rule", rule.Name),
String("metric", rule.MetricName))
}
// 告警通知器
type AlertNotifier struct {
channels map[string]NotifyChannel
logger Logger
}
// 通知渠道接口
type NotifyChannel interface {
SendAlert(event *AlertEvent) error
SendResolved(event *AlertEvent) error
}
// 创建告警通知器
func NewAlertNotifier(logger Logger) *AlertNotifier {
return &AlertNotifier{
channels: make(map[string]NotifyChannel),
logger: logger,
}
}
// 注册通知渠道
func (an *AlertNotifier) RegisterChannel(name string, channel NotifyChannel) {
an.channels[name] = channel
}
// 发送告警通知
func (an *AlertNotifier) SendAlert(event *AlertEvent, channelNames []string) {
for _, name := range channelNames {
if channel, exists := an.channels[name]; exists {
go func(ch NotifyChannel, evt *AlertEvent) {
if err := ch.SendAlert(evt); err != nil {
an.logger.Error("发送告警通知失败",
String("channel", name),
Error(err))
}
}(channel, event)
}
}
}
// 发送解决通知
func (an *AlertNotifier) SendResolved(event *AlertEvent, channelNames []string) {
for _, name := range channelNames {
if channel, exists := an.channels[name]; exists {
go func(ch NotifyChannel, evt *AlertEvent) {
if err := ch.SendResolved(evt); err != nil {
an.logger.Error("发送解决通知失败",
String("channel", name),
Error(err))
}
}(channel, event)
}
}
}
// 邮件通知渠道
type EmailNotifier struct {
smtpHost string
smtpPort int
username string
password string
fromAddress string
toAddresses []string
}
// 创建邮件通知器
func NewEmailNotifier(smtpHost string, smtpPort int, username, password, fromAddress string, toAddresses []string) *EmailNotifier {
return &EmailNotifier{
smtpHost: smtpHost,
smtpPort: smtpPort,
username: username,
password: password,
fromAddress: fromAddress,
toAddresses: toAddresses,
}
}
// 发送告警邮件
func (en *EmailNotifier) SendAlert(event *AlertEvent) error {
subject := fmt.Sprintf("[%s] 告警: %s", strings.ToUpper(event.Severity), event.RuleName)
body := fmt.Sprintf(`
告警详情:
规则名称: %s
指标名称: %s
当前值: %.2f
阈值: %.2f
严重程度: %s
开始时间: %s
消息: %s
`,
event.RuleName,
event.MetricName,
event.MetricValue,
event.Threshold,
event.Severity,
event.StartTime.Format("2006-01-02 15:04:05"),
event.Message)
return en.sendEmail(subject, body)
}
// 发送解决邮件
func (en *EmailNotifier) SendResolved(event *AlertEvent) error {
subject := fmt.Sprintf("[RESOLVED] 告警解决: %s", event.RuleName)
duration := ""
if event.EndTime != nil {
duration = event.EndTime.Sub(event.StartTime).String()
}
body := fmt.Sprintf(`
告警已解决:
规则名称: %s
指标名称: %s
严重程度: %s
开始时间: %s
结束时间: %s
持续时间: %s
`,
event.RuleName,
event.MetricName,
event.Severity,
event.StartTime.Format("2006-01-02 15:04:05"),
event.EndTime.Format("2006-01-02 15:04:05"),
duration)
return en.sendEmail(subject, body)
}
// 发送邮件
func (en *EmailNotifier) sendEmail(subject, body string) error {
// 这里实现SMTP邮件发送逻辑
// 为了简化,这里只是模拟
fmt.Printf("发送邮件: %s\n%s\n", subject, body)
return nil
}
// Webhook通知渠道
type WebhookNotifier struct {
url string
timeout time.Duration
client *http.Client
}
// 创建Webhook通知器
func NewWebhookNotifier(url string, timeout time.Duration) *WebhookNotifier {
return &WebhookNotifier{
url: url,
timeout: timeout,
client: &http.Client{
Timeout: timeout,
},
}
}
// 发送告警Webhook
func (wn *WebhookNotifier) SendAlert(event *AlertEvent) error {
payload := map[string]interface{}{
"type": "alert",
"rule_name": event.RuleName,
"metric_name": event.MetricName,
"metric_value": event.MetricValue,
"threshold": event.Threshold,
"severity": event.Severity,
"status": event.Status,
"message": event.Message,
"start_time": event.StartTime,
}
return wn.sendWebhook(payload)
}
// 发送解决Webhook
func (wn *WebhookNotifier) SendResolved(event *AlertEvent) error {
payload := map[string]interface{}{
"type": "resolved",
"rule_name": event.RuleName,
"metric_name": event.MetricName,
"severity": event.Severity,
"status": event.Status,
"start_time": event.StartTime,
"end_time": event.EndTime,
}
return wn.sendWebhook(payload)
}
// 发送Webhook请求
func (wn *WebhookNotifier) sendWebhook(payload interface{}) error {
data, err := json.Marshal(payload)
if err != nil {
return err
}
req, err := http.NewRequest("POST", wn.url, bytes.NewBuffer(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := wn.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("webhook请求失败: %d", resp.StatusCode)
}
return nil
}11.5 日志分析与查询
日志分析与查询是日志系统的核心功能,通过对海量日志数据的高效检索和统计分析,为业务监控、故障排查和数据洞察提供支撑。本节将介绍如何设计和实现一个功能完整的日志查询系统。
核心组件
查询引擎:支持多维度条件查询和全文检索
统计分析:提供时间序列统计和聚合分析
数据导出:支持查询结果的批量导出
性能优化:通过索引和缓存提升查询效率
flowchart TD
L[应用日志] --> SHP[清洗/标准化]
SHP --> ENR[富化(请求ID/用户/渠道)]
ENR --> IDX[索引/存储]
IDX --> SRCH[查询/KQL/Lucene]
SRCH --> AGG[聚合/仪表]
AGG --> INS[洞察/报表]图9:日志采集到分析的流水线
sequenceDiagram
participant C as 客户端
participant API as 查询API
participant DB as 数据库
participant Cache as 缓存
C->>API: 提交查询请求
API->>API: 参数验证与默认值设置
API->>Cache: 检查缓存
alt 缓存命中
Cache-->>API: 返回缓存结果
else 缓存未命中
API->>DB: 构建SQL查询
DB-->>API: 返回查询结果
API->>Cache: 更新缓存
end
API->>API: 结果分页处理
API-->>C: 返回查询响应图10:日志查询处理时序图
核心概念解析
Schema-on-write vs. Schema-on-read
Schema-on-write:数据写入时定义结构,查询性能高但灵活性差
Schema-on-read:查询时解析结构,灵活性高但查询成本大
字段索引策略
对常用查询字段(user_id、type、created_at)建立索引
复合索引优化多条件查询性能
全文索引支持内容关键词搜索
查询优化技术
分页查询:避免大结果集内存溢出
预加载关联:减少N+1查询问题
结果缓存:热点查询结果缓存提升响应速度
11.5.1 日志查询API
// 日志查询请求
type LogQueryRequest struct {
UserID *int `json:"user_id,omitempty"`
Type *int `json:"type,omitempty"`
Username string `json:"username,omitempty"`
ModelName string `json:"model_name,omitempty"`
TokenName string `json:"token_name,omitempty"`
ChannelID *int `json:"channel_id,omitempty"`
StartTime time.Time `json:"start_time,omitempty"`
EndTime time.Time `json:"end_time,omitempty"`
Keyword string `json:"keyword,omitempty"`
Page int `json:"page" binding:"min=1"`
PageSize int `json:"page_size" binding:"min=1,max=100"`
OrderBy string `json:"order_by"`
Order string `json:"order"`
}
// 日志查询响应
type LogQueryResponse struct {
Logs []Log `json:"logs"`
Total int64 `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
TotalPages int `json:"total_pages"`
}
// 日志统计请求
type LogStatsRequest struct {
UserID *int `json:"user_id,omitempty"`
Type *int `json:"type,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
GroupBy string `json:"group_by"` // hour, day, week, month
}
// 日志统计响应
type LogStatsResponse struct {
Stats []LogStatItem `json:"stats"`
Total LogSummary `json:"total"`
}
// 日志统计项
type LogStatItem struct {
Time time.Time `json:"time"`
Count int64 `json:"count"`
TotalQuota int64 `json:"total_quota"`
TotalTokens int64 `json:"total_tokens"`
UniqueUsers int64 `json:"unique_users"`
}
// 日志摘要
type LogSummary struct {
TotalCount int64 `json:"total_count"`
TotalQuota int64 `json:"total_quota"`
TotalTokens int64 `json:"total_tokens"`
UniqueUsers int64 `json:"unique_users"`
AvgQuota float64 `json:"avg_quota"`
AvgTokens float64 `json:"avg_tokens"`
}
// QueryLogs 查询日志接口
// 功能:提供多维度条件查询和分页功能,支持用户、类型、时间范围等条件过滤
// 特点:
// 1. 灵活的查询条件组合
// 2. 分页查询避免大结果集
// 3. 预加载关联数据减少查询次数
// 4. 统一的错误处理和响应格式
func QueryLogs(c *gin.Context) {
var req LogQueryRequest
// 步骤1:解析和验证请求参数
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 步骤2:设置查询默认值,确保参数合理性
if req.Page == 0 {
req.Page = 1 // 默认第一页
}
if req.PageSize == 0 {
req.PageSize = 20 // 默认每页20条
}
if req.OrderBy == "" {
req.OrderBy = "created_at" // 默认按创建时间排序
}
if req.Order == "" {
req.Order = "desc" // 默认降序,最新的在前
}
// 步骤3:构建基础查询对象
query := common.DB.Model(&Log{})
// 步骤4:根据请求参数动态添加查询条件
// 用户ID精确匹配
if req.UserID != nil {
query = query.Where("user_id = ?", *req.UserID)
}
// 日志类型精确匹配
if req.Type != nil {
query = query.Where("type = ?", *req.Type)
}
// 用户名模糊匹配
if req.Username != "" {
query = query.Where("username LIKE ?", "%"+req.Username+"%")
}
// 模型名称精确匹配
if req.ModelName != "" {
query = query.Where("model_name = ?", req.ModelName)
}
// 令牌名称模糊匹配
if req.TokenName != "" {
query = query.Where("token_name LIKE ?", "%"+req.TokenName+"%")
}
// 渠道ID精确匹配
if req.ChannelID != nil {
query = query.Where("channel_id = ?", *req.ChannelID)
}
// 时间范围过滤:开始时间
if !req.StartTime.IsZero() {
query = query.Where("created_at >= ?", req.StartTime)
}
// 时间范围过滤:结束时间
if !req.EndTime.IsZero() {
query = query.Where("created_at <= ?", req.EndTime)
}
// 内容关键词模糊匹配
if req.Keyword != "" {
query = query.Where("content LIKE ?", "%"+req.Keyword+"%")
}
// 获取总数
var total int64
if err := query.Count(&total).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询失败"})
return
}
// 分页查询
var logs []Log
offset := (req.Page - 1) * req.PageSize
orderClause := fmt.Sprintf("%s %s", req.OrderBy, req.Order)
err := query.Preload("User").Preload("Channel").
Order(orderClause).
Limit(req.PageSize).
Offset(offset).
Find(&logs).Error
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询失败"})
return
}
// 计算总页数
totalPages := int(math.Ceil(float64(total) / float64(req.PageSize)))
response := LogQueryResponse{
Logs: logs,
Total: total,
Page: req.Page,
PageSize: req.PageSize,
TotalPages: totalPages,
}
c.JSON(http.StatusOK, response)
}
// 日志统计
func GetLogStats(c *gin.Context) {
var req LogStatsRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 设置默认值
if req.GroupBy == "" {
req.GroupBy = "day"
}
// 构建查询
query := common.DB.Model(&Log{})
// 添加条件
if req.UserID != nil {
query = query.Where("user_id = ?", *req.UserID)
}
if req.Type != nil {
query = query.Where("type = ?", *req.Type)
}
query = query.Where("created_at >= ? AND created_at <= ?", req.StartTime, req.EndTime)
// 获取总体统计
var summary LogSummary
err := query.Select(
"COUNT(*) as total_count",
"COALESCE(SUM(quota), 0) as total_quota",
"COALESCE(SUM(prompt_tokens + completion_tokens), 0) as total_tokens",
"COUNT(DISTINCT user_id) as unique_users",
"COALESCE(AVG(quota), 0) as avg_quota",
"COALESCE(AVG(prompt_tokens + completion_tokens), 0) as avg_tokens",
).Scan(&summary).Error
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "统计查询失败"})
return
}
// 获取分组统计
var timeFormat string
var timeGroup string
switch req.GroupBy {
case "hour":
timeFormat = "%Y-%m-%d %H:00:00"
timeGroup = "DATE_FORMAT(created_at, '%Y-%m-%d %H:00:00')"
case "day":
timeFormat = "%Y-%m-%d"
timeGroup = "DATE(created_at)"
case "week":
timeFormat = "%Y-%u"
timeGroup = "YEARWEEK(created_at)"
case "month":
timeFormat = "%Y-%m"
timeGroup = "DATE_FORMAT(created_at, '%Y-%m')"
default:
timeFormat = "%Y-%m-%d"
timeGroup = "DATE(created_at)"
}
var stats []LogStatItem
err = query.Select(
fmt.Sprintf("DATE_FORMAT(created_at, '%s') as time", timeFormat),
"COUNT(*) as count",
"COALESCE(SUM(quota), 0) as total_quota",
"COALESCE(SUM(prompt_tokens + completion_tokens), 0) as total_tokens",
"COUNT(DISTINCT user_id) as unique_users",
).Group(timeGroup).Order("time").Scan(&stats).Error
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "分组统计查询失败"})
return
}
response := LogStatsResponse{
Stats: stats,
Total: summary,
}
c.JSON(http.StatusOK, response)
}
// 导出日志
func ExportLogs(c *gin.Context) {
var req LogQueryRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 构建查询(不分页)
query := common.DB.Model(&Log{})
// 添加条件(复用查询逻辑)
if req.UserID != nil {
query = query.Where("user_id = ?", *req.UserID)
}
if req.Type != nil {
query = query.Where("type = ?", *req.Type)
}
if req.Username != "" {
query = query.Where("username LIKE ?", "%"+req.Username+"%")
}
if req.ModelName != "" {
query = query.Where("model_name = ?", req.ModelName)
}
if req.TokenName != "" {
query = query.Where("token_name LIKE ?", "%"+req.TokenName+"%")
}
if req.ChannelID != nil {
query = query.Where("channel_id = ?", *req.ChannelID)
}
if !req.StartTime.IsZero() {
query = query.Where("created_at >= ?", req.StartTime)
}
if !req.EndTime.IsZero() {
query = query.Where("created_at <= ?", req.EndTime)
}
if req.Keyword != "" {
query = query.Where("content LIKE ?", "%"+req.Keyword+"%")
}
// 查询数据
var logs []Log
err := query.Preload("User").Preload("Channel").
Order("created_at desc").
Limit(10000). // 限制导出数量
Find(&logs).Error
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询失败"})
return
}
// 生成CSV
var buf bytes.Buffer
writer := csv.NewWriter(&buf)
// 写入表头
headers := []string{
"ID", "用户ID", "用户名", "类型", "内容", "令牌名称", "模型名称",
"配额", "提示令牌", "完成令牌", "请求ID", "渠道ID", "IP地址", "用户代理",
"耗时(ms)", "状态码", "错误信息", "创建时间",
}
writer.Write(headers)
// 写入数据
for _, log := range logs {
record := []string{
fmt.Sprintf("%d", log.ID),
fmt.Sprintf("%d", log.UserID),
log.Username,
getLogTypeName(log.Type),
log.Content,
log.TokenName,
log.ModelName,
fmt.Sprintf("%d", log.Quota),
fmt.Sprintf("%d", log.PromptTokens),
fmt.Sprintf("%d", log.CompletionTokens),
log.RequestId,
func() string {
if log.ChannelId != nil {
return fmt.Sprintf("%d", *log.ChannelId)
}
return ""
}(),
log.IPAddress,
log.UserAgent,
fmt.Sprintf("%d", log.Duration),
fmt.Sprintf("%d", log.StatusCode),
log.ErrorMsg,
log.CreatedAt.Format("2006-01-02 15:04:05"),
}
writer.Write(record)
}
writer.Flush()
// 设置响应头
filename := fmt.Sprintf("logs_%s.csv", time.Now().Format("20060102_150405"))
c.Header("Content-Type", "text/csv")
c.Header("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
c.Data(http.StatusOK, "text/csv", buf.Bytes())
}
// 获取日志类型名称
func getLogTypeName(logType int) string {
switch logType {
case LogTypeTopup:
return "充值"
case LogTypeConsume:
return "消费"
case LogTypeManage:
return "管理"
case LogTypeSystem:
return "系统"
case LogTypeAuth:
return "认证"
case LogTypeAPI:
return "API调用"
case LogTypeError:
return "错误"
case LogTypeWarning:
return "警告"
case LogTypeInfo:
return "信息"
case LogTypeDebug:
return "调试"
default:
return "未知"
}
}11.6 性能优化与最佳实践
在企业级应用中,日志系统和监控系统的性能直接影响应用的整体表现。本节将介绍如何通过异步处理、缓存优化、资源管理等技术手段,构建高性能的日志监控系统。
核心优化策略
异步处理:避免同步I/O阻塞主业务流程
批量操作:减少系统调用次数,提升吞吐量
缓存机制:热点数据缓存,减少重复计算
资源管理:合理配置缓冲区和连接池
数据压缩:减少存储空间和网络传输开销
flowchart LR
RAW[原始日志] --> FILT[采样/过滤]
FILT --> REDACT[脱敏/打码]
REDACT --> ASYNC[异步落盘]
ASYNC --> ROT[切割/压缩/轮转]
ROT --> SHIP[转运到集中存储]图11:高性能日志落地与传输链路
flowchart TD
APP[应用程序] --> BUF[缓冲区]
BUF --> BATCH[批量处理器]
BATCH --> COMP[压缩器]
COMP --> DISK[磁盘存储]
BUF --> CACHE[内存缓存]
CACHE --> QUERY[查询接口]
DISK --> ROTATE[日志轮转]
ROTATE --> ARCHIVE[归档存储]图12:日志系统性能优化架构
核心概念解析
异步日志处理
主线程将日志写入缓冲区后立即返回
后台线程负责批量处理和持久化
通过缓冲区大小和刷新间隔平衡性能和数据安全
日志采样策略
高频DEBUG/INFO日志按比例采样
ERROR/FATAL级别日志全量保留
动态调整采样率应对流量突增
数据脱敏技术
敏感字段自动识别和掩码处理
支持正则表达式和字段名匹配
保留数据结构的同时隐藏敏感信息
性能监控指标
日志写入延迟和吞吐量
缓冲区使用率和溢出次数
磁盘I/O和网络传输性能
11.6.1 日志性能优化
// AsyncLogWriter 异步日志写入器
// 功能:通过缓冲区和后台goroutine实现异步日志写入,避免阻塞主业务流程
// 特点:
// 1. 非阻塞写入:主线程写入缓冲区后立即返回
// 2. 批量处理:后台线程批量处理日志条目,提升I/O效率
// 3. 定时刷新:定期刷新缓冲区,确保日志及时落盘
// 4. 优雅关闭:关闭时处理完所有缓冲区中的日志
type AsyncLogWriter struct {
output LogOutput // 实际的日志输出器
buffer chan *LogEntry // 日志条目缓冲区
batchSize int // 批量处理大小
flushTime time.Duration // 定时刷新间隔
stopChan chan struct{} // 停止信号通道
wg sync.WaitGroup // 等待组,确保优雅关闭
}
// NewAsyncLogWriter 创建异步日志写入器
// 参数:
// output: 底层日志输出器
// bufferSize: 缓冲区大小,影响内存使用和写入性能
// batchSize: 批量处理大小,影响I/O效率
// flushTime: 刷新间隔,影响日志实时性
func NewAsyncLogWriter(output LogOutput, bufferSize, batchSize int, flushTime time.Duration) *AsyncLogWriter {
writer := &AsyncLogWriter{
output: output,
buffer: make(chan *LogEntry, bufferSize), // 创建带缓冲的通道
batchSize: batchSize,
flushTime: flushTime,
stopChan: make(chan struct{}),
}
// 启动后台写入goroutine
writer.wg.Add(1)
go writer.writeLoop()
return writer
}
// 写入日志条目
func (alw *AsyncLogWriter) Write(entry *LogEntry) error {
select {
case alw.buffer <- entry:
return nil
default:
// 缓冲区满,直接写入
return alw.output.Write(entry)
}
}
// 关闭写入器
func (alw *AsyncLogWriter) Close() error {
close(alw.stopChan)
alw.wg.Wait()
return alw.output.Close()
}
// 写入循环
func (alw *AsyncLogWriter) writeLoop() {
defer alw.wg.Done()
batch := make([]*LogEntry, 0, alw.batchSize)
ticker := time.NewTicker(alw.flushTime)
defer ticker.Stop()
for {
select {
case entry := <-alw.buffer:
batch = append(batch, entry)
if len(batch) >= alw.batchSize {
alw.flushBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
alw.flushBatch(batch)
batch = batch[:0]
}
case <-alw.stopChan:
// 处理剩余的日志
for len(alw.buffer) > 0 {
batch = append(batch, <-alw.buffer)
}
if len(batch) > 0 {
alw.flushBatch(batch)
}
return
}
}
}
// 批量刷新
func (alw *AsyncLogWriter) flushBatch(batch []*LogEntry) {
for _, entry := range batch {
if err := alw.output.Write(entry); err != nil {
// 错误处理
fmt.Fprintf(os.Stderr, "异步日志写入错误: %v\n", err)
}
}
}
// 日志缓存
type LogCache struct {
cache map[string]*LogEntry
mutex sync.RWMutex
maxSize int
ttl time.Duration
// 清理
stopChan chan struct{}
wg sync.WaitGroup
}
// 创建日志缓存
func NewLogCache(maxSize int, ttl time.Duration) *LogCache {
cache := &LogCache{
cache: make(map[string]*LogEntry),
maxSize: maxSize,
ttl: ttl,
stopChan: make(chan struct{}),
}
cache.wg.Add(1)
go cache.cleanupLoop()
return cache
}
// 获取日志条目
func (lc *LogCache) Get(key string) *LogEntry {
lc.mutex.RLock()
defer lc.mutex.RUnlock()
return lc.cache[key]
}
// 设置日志条目
func (lc *LogCache) Set(key string, entry *LogEntry) {
lc.mutex.Lock()
defer lc.mutex.Unlock()
// 检查缓存大小
if len(lc.cache) >= lc.maxSize {
// 删除最旧的条目
var oldestKey string
var oldestTime time.Time
for k, v := range lc.cache {
if oldestKey == "" || v.Timestamp.Before(oldestTime) {
oldestKey = k
oldestTime = v.Timestamp
}
}
delete(lc.cache, oldestKey)
}
lc.cache[key] = entry
}
// 删除日志条目
func (lc *LogCache) Delete(key string) {
lc.mutex.Lock()
defer lc.mutex.Unlock()
delete(lc.cache, key)
}
// 清理循环
func (lc *LogCache) cleanupLoop() {
defer lc.wg.Done()
ticker := time.NewTicker(lc.ttl / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lc.cleanup()
case <-lc.stopChan:
return
}
}
}
// 清理过期条目
func (lc *LogCache) cleanup() {
lc.mutex.Lock()
defer lc.mutex.Unlock()
now := time.Now()
for key, entry := range lc.cache {
if now.Sub(entry.Timestamp) > lc.ttl {
delete(lc.cache, key)
}
}
}
// 关闭缓存
func (lc *LogCache) Close() {
close(lc.stopChan)
lc.wg.Wait()
}11.6.2 监控最佳实践
// 监控配置
type MonitoringConfig struct {
// 日志配置
LogLevel LogLevel `json:"log_level"`
LogOutput []string `json:"log_output"`
LogRotation bool `json:"log_rotation"`
LogMaxSize int64 `json:"log_max_size"`
LogMaxAge time.Duration `json:"log_max_age"`
LogMaxBackups int `json:"log_max_backups"`
LogCompress bool `json:"log_compress"`
// 指标配置
MetricsEnabled bool `json:"metrics_enabled"`
MetricsInterval time.Duration `json:"metrics_interval"`
MetricsEndpoint string `json:"metrics_endpoint"`
// 告警配置
AlertEnabled bool `json:"alert_enabled"`
AlertInterval time.Duration `json:"alert_interval"`
AlertChannels []string `json:"alert_channels"`
// 性能配置
AsyncLogging bool `json:"async_logging"`
BufferSize int `json:"buffer_size"`
BatchSize int `json:"batch_size"`
FlushInterval time.Duration `json:"flush_interval"`
}
// 默认配置
func DefaultMonitoringConfig() *MonitoringConfig {
return &MonitoringConfig{
LogLevel: LogLevelInfo,
LogOutput: []string{"console", "file"},
LogRotation: true,
LogMaxSize: 100 * 1024 * 1024, // 100MB
LogMaxAge: 7 * 24 * time.Hour, // 7天
LogMaxBackups: 10,
LogCompress: true,
MetricsEnabled: true,
MetricsInterval: 30 * time.Second,
MetricsEndpoint: "/metrics",
AlertEnabled: true,
AlertInterval: 30 * time.Second,
AlertChannels: []string{"email", "webhook"},
AsyncLogging: true,
BufferSize: 1000,
BatchSize: 100,
FlushInterval: 5 * time.Second,
}
}
// 监控系统
type MonitoringSystem struct {
config *MonitoringConfig
logger Logger
metricsCollector *MetricsCollector
alertManager *AlertManager
systemMonitor *SystemMonitor
// 状态
running bool
mutex sync.RWMutex
}
// 创建监控系统
func NewMonitoringSystem(config *MonitoringConfig) (*MonitoringSystem, error) {
// 创建日志记录器
var outputs []LogOutput
for _, outputType := range config.LogOutput {
switch outputType {
case "console":
outputs = append(outputs, NewConsoleOutput(true))
case "file":
rotator := &LogRotator{
maxSize: config.LogMaxSize,
maxAge: config.LogMaxAge,
maxBackups: config.LogMaxBackups,
compress: config.LogCompress,
}
fileOutput, err := NewFileOutput("logs/app.log", rotator)
if err != nil {
return nil, err
}
if config.AsyncLogging {
asyncOutput := NewAsyncLogWriter(
fileOutput,
config.BufferSize,
config.BatchSize,
config.FlushInterval,
)
outputs = append(outputs, asyncOutput)
} else {
outputs = append(outputs, fileOutput)
}
}
}
logger := NewLogger(config.LogLevel, outputs...)
// 创建指标收集器
metricsCollector := NewMetricsCollector()
// 创建告警通知器
alertNotifier := NewAlertNotifier(logger)
// 注册通知渠道
for _, channel := range config.AlertChannels {
switch channel {
case "email":
emailNotifier := NewEmailNotifier(
"smtp.example.com", 587,
"[email protected]", "password",
"[email protected]",
[]string{"[email protected]"},
)
alertNotifier.RegisterChannel("email", emailNotifier)
case "webhook":
webhookNotifier := NewWebhookNotifier(
"http://localhost:8080/webhook/alert",
10*time.Second,
)
alertNotifier.RegisterChannel("webhook", webhookNotifier)
}
}
// 创建告警管理器
alertManager := NewAlertManager(metricsCollector, alertNotifier, logger)
// 创建系统监控器
systemMonitor := NewSystemMonitor(metricsCollector, logger)
return &MonitoringSystem{
config: config,
logger: logger,
metricsCollector: metricsCollector,
alertManager: alertManager,
systemMonitor: systemMonitor,
}, nil
}
// 启动监控系统
func (ms *MonitoringSystem) Start() error {
ms.mutex.Lock()
defer ms.mutex.Unlock()
if ms.running {
return nil
}
// 启动系统监控
if ms.config.MetricsEnabled {
ms.systemMonitor.Start()
}
// 加载并启动告警管理
if ms.config.AlertEnabled {
if err := ms.alertManager.LoadRules(); err != nil {
return err
}
ms.alertManager.Start()
}
ms.running = true
ms.logger.Info("监控系统已启动")
return nil
}
// 停止监控系统
func (ms *MonitoringSystem) Stop() {
ms.mutex.Lock()
defer ms.mutex.Unlock()
if !ms.running {
return
}
// 停止各个组件
ms.systemMonitor.Stop()
ms.alertManager.Stop()
ms.running = false
ms.logger.Info("监控系统已停止")
}
// 获取日志记录器
func (ms *MonitoringSystem) GetLogger() Logger {
return ms.logger
}
// 获取指标收集器
func (ms *MonitoringSystem) GetMetricsCollector() *MetricsCollector {
return ms.metricsCollector
}
// 获取Prometheus处理器
func (ms *MonitoringSystem) GetPrometheusHandler() http.Handler {
return ms.metricsCollector.GetPrometheusHandler()
}11.7 本章小结
本章详细介绍了New API项目中日志系统与监控告警的设计与实现:
11.7.1 主要内容
日志系统设计
结构化日志模型设计
多级别日志记录
灵活的日志输出器架构
日志轮转和压缩机制
日志输出实现
控制台输出器
文件输出器
JSON格式化器
异步日志写入
业务日志记录
API调用日志
用户操作日志
系统事件日志
错误和异常日志
监控告警系统
指标收集和存储
告警规则管理
多渠道通知机制
系统性能监控
日志分析查询
灵活的查询API
统计分析功能
数据导出功能
可视化支持
性能优化
异步日志处理
批量写入优化
缓存机制
资源管理
11.7.2 技术特点
高性能: 异步处理、批量操作、缓存优化
高可用: 故障转移、健康检查、自动恢复
可扩展: 插件化架构、多输出支持、灵活配置
易维护: 结构化设计、清晰接口、完善文档
11.7.3 最佳实践
合理设置日志级别,避免过度记录
使用结构化日志,便于查询和分析
实施日志轮转,控制磁盘使用
建立完善的告警机制,及时发现问题
定期分析日志数据,优化系统性能
11.8 练习题
11.8.1 基础练习
日志记录器实现
实现一个支持多输出的日志记录器
添加日志级别过滤功能
实现调用者信息记录
文件轮转实现
实现基于大小的日志轮转
添加基于时间的轮转策略
实现日志压缩功能
告警规则配置
设计告警规则的配置格式
实现规则的动态加载
添加规则验证功能
11.8.2 进阶练习
分布式日志收集
设计分布式日志收集架构
实现日志聚合功能
添加日志去重机制
实时监控面板
实现实时指标展示
添加告警状态显示
设计交互式查询界面
智能告警
实现基于机器学习的异常检测
添加告警收敛机制
设计自适应阈值调整
11.8.3 项目练习
完整监控系统
构建端到端的监控解决方案
集成Prometheus和Grafana
实现告警管理平台
日志分析平台
开发日志搜索和分析工具
实现日志可视化功能
添加报表生成功能
11.9 扩展阅读
11.9.1 相关技术
ELK Stack: Elasticsearch、Logstash、Kibana日志处理栈 - 官方网站
Prometheus: 开源监控和告警工具 - 官方文档
Grafana: 数据可视化和监控面板 - 官方网站
Jaeger: 分布式追踪系统 - 官方文档
OpenTelemetry: 可观测性框架 - 官方网站
Fluentd: 统一日志收集层 - 官方网站
Vector: 高性能可观测性数据管道 - 官方网站
Loki: Grafana日志聚合系统 - 官方文档
11.9.2 推荐资源
11.9.3 开源项目
logrus: 结构化日志库
zap: 高性能日志库
zerolog: 零分配JSON日志库
lumberjack: 日志轮转库
alertmanager: Prometheus告警管理器
fluentd: 统一日志收集器
filebeat: Elastic日志收集器
promtail: Loki日志代理
node_exporter: 系统指标收集器
blackbox_exporter: 网络探测工具
cadvisor: 容器监控工具
jaeger: 分布式追踪平台
zipkin: 分布式追踪系统
otel-collector: OpenTelemetry收集器
最后更新于
这有帮助吗?
