当前位置:首页 > 问答 > 正文

日志管理|数据库存储 Python实现日志写入数据库的示例,Demo,日志数据存储到数据库demo

Python实现日志写入数据库的完整指南

当日志遇上数据库:一个开发者的日常痛点

上周三凌晨2点,我被刺耳的报警短信惊醒——生产环境某个核心服务挂了。我睡眼惺忪地打开终端,准备查看日志定位问题,却发现日志文件早已被滚动压缩成了几十个归档包。在解压、grep、再解压的循环中,我意识到:如果这些日志能直接存到数据库里该多好!

今天,我就来分享如何用Python将日志直接存储到数据库,让你的运维生活轻松十倍。

基础准备:选择你的武器库

在开始前,我们需要准备几个基础组件:

日志管理|数据库存储 Python实现日志写入数据库的示例,Demo,日志数据存储到数据库demo

  1. Python日志模块 - 内置的logging模块
  2. 数据库驱动 - 根据数据库类型选择:
    • MySQL: pymysql 或 mysql-connector-python
    • PostgreSQL: psycopg2
    • SQLite: 内置支持
  3. 可选工具 - SQLAlchemy(如果你喜欢ORM方式)
# 以MySQL为例,先安装必要包
pip install pymysql

直接写入数据库(简单粗暴版)

让我们先从最直接的方式开始——创建一个自定义的日志处理器,直接将日志写入MySQL。

import logging
import pymysql
from logging import Handler
class MySQLHandler(Handler):
    """Logging handler that inserts every record as one row into a MySQL table.

    The target table must provide the columns ``log_time``, ``level``,
    ``message``, ``module``, ``func_name`` and ``line_no``.

    Args:
        host: MySQL server host.
        user: Login user name.
        password: Login password.
        db: Database (schema) name.
        table: Name of the table that receives the rows. It is interpolated
            into the SQL text, so it must come from trusted configuration,
            never from user input.
    """

    # Plain "YYYY-MM-DD HH:MM:SS" layout. The default ``asctime`` layout
    # appends ",mmm" milliseconds, which a DATETIME column may reject.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # logging.Handler has no formatTime(); that method lives on Formatter,
    # so a shared Formatter instance is used to render the timestamp.
    _time_formatter = logging.Formatter()

    def __init__(self, host, user, password, db, table):
        super().__init__()
        self.host = host
        self.user = user
        self.password = password
        self.db = db
        self.table = table
        self.connection = None
        self.cursor = None
        self.connect()

    def connect(self):
        """Open the connection and cursor.

        A failure is reported on stdout and leaves both ``connection`` and
        ``cursor`` set to ``None`` so that ``emit`` can detect it.
        """
        try:
            self.connection = pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.db,
                charset='utf8mb4'
            )
            self.cursor = self.connection.cursor()
        except Exception as e:
            print(f"数据库连接失败: {e}")
            self.connection = None
            self.cursor = None

    def emit(self, record):
        """Insert ``record`` into the table; never raises into the caller."""
        if not self.connection or not self.connection.open:
            self.connect()
        if self.connection is None or self.cursor is None:
            # Reconnect failed: drop the record instead of crashing the
            # application code that issued the log call.
            print("日志写入失败: 数据库连接不可用")
            return
        try:
            log_time = self._time_formatter.formatTime(record, self._TIME_FORMAT)
            sql = f"""
            INSERT INTO {self.table}
            (log_time, level, message, module, func_name, line_no)
            VALUES (%s, %s, %s, %s, %s, %s)
            """
            self.cursor.execute(sql, (
                log_time,
                record.levelname,
                record.getMessage(),
                record.module,
                record.funcName,
                record.lineno
            ))
            self.connection.commit()
        except Exception as e:
            print(f"日志写入失败: {e}")
            try:
                self.connection.rollback()
            except Exception:
                # The connection itself is unusable; forget it so the next
                # emit() goes through connect() again.
                self.connection = None
                self.cursor = None

    def close(self):
        """Close the database connection, then run the base-class cleanup."""
        try:
            if self.connection is not None and self.connection.open:
                self.connection.close()
        except Exception as e:
            print(f"数据库连接关闭失败: {e}")
        finally:
            self.connection = None
            self.cursor = None
            super().close()
# Create the log table (no-op when it already exists)
def create_log_table(host, user, password, db, table):
    """Create the log table used by the handlers if it does not exist yet.

    Args:
        host: MySQL server host.
        user: Login user name.
        password: Login password.
        db: Database (schema) name.
        table: Table name. It is interpolated into the DDL text, so it must
            come from trusted configuration, never from user input.

    Raises:
        Whatever ``pymysql`` raises on connection or DDL failure; the
        connection is closed in either case.
    """
    conn = pymysql.connect(
        host=host, user=user, password=password, database=db,
        charset='utf8mb4'  # same charset the log handler connects with
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id INT AUTO_INCREMENT PRIMARY KEY,
                log_time DATETIME NOT NULL,
                level VARCHAR(10) NOT NULL,
                message TEXT,
                module VARCHAR(50),
                func_name VARCHAR(50),
                line_no INT,
                INDEX idx_log_time (log_time),
                INDEX idx_level (level)
            ) DEFAULT CHARSET=utf8mb4
            """)
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
# Usage example
if __name__ == "__main__":
    # One settings mapping feeds both the table creation and the handler.
    db_settings = {
        "host": "localhost",
        "user": "your_username",
        "password": "your_password",
        "db": "log_db",
        "table": "app_logs",
    }

    # Make sure the table exists before the first record is written.
    create_log_table(**db_settings)

    # Build the MySQL handler and give it a formatter.
    mysql_handler = MySQLHandler(**db_settings)
    log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    mysql_handler.setFormatter(log_format)

    # Wire the handler into a dedicated logger.
    logger = logging.getLogger("db_logger")
    logger.setLevel(logging.INFO)
    logger.addHandler(mysql_handler)

    # Emit two sample records.
    logger.info("这是一条测试日志,将被存入数据库")
    logger.error("模拟一个错误日志", extra={"some_context": "额外信息"})

使用SQLAlchemy(优雅ORM版)

如果你更喜欢ORM方式,或者需要支持多种数据库,SQLAlchemy是个不错的选择。

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import logging
from logging import Handler
from datetime import datetime
Base = declarative_base()
class LogEntry(Base):
    """ORM model for one log row in the ``app_logs`` table."""

    # NOTE(review): the raw-SQL examples in this file use a ``log_time``
    # column for the same table name, while this model names the column
    # ``timestamp`` - confirm which schema is intended before pointing both
    # at one table.
    __tablename__ = 'app_logs'
    id = Column(Integer, primary_key=True)
    # Falls back to the insert time when no value is supplied.
    timestamp = Column(DateTime, default=datetime.now, index=True)
    level = Column(String(10), index=True)
    message = Column(Text)
    module = Column(String(50))
    func_name = Column(String(50))
    line_no = Column(Integer)

    def __repr__(self):
        # ``message`` is nullable, so guard against None before slicing.
        preview = (self.message or "")[:20]
        return f"<LogEntry {self.timestamp} {self.level} {preview}>"
class SQLAlchemyHandler(Handler):
    """Logging handler that stores records as ``LogEntry`` rows via SQLAlchemy.

    Args:
        db_url: SQLAlchemy database URL. The tables declared on ``Base`` are
            created on construction if they are missing.
    """

    def __init__(self, db_url):
        super().__init__()
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def emit(self, record):
        """Persist ``record``; failures are printed and never raised."""
        try:
            log_entry = LogEntry(
                # Store when the event happened, not when the row happens
                # to be inserted.
                timestamp=datetime.fromtimestamp(record.created),
                level=record.levelname,
                message=self.format(record),
                module=record.module,
                func_name=record.funcName,
                line_no=record.lineno
            )
            self.session.add(log_entry)
            self.session.commit()
        except Exception as e:
            print(f"日志记录失败: {e}")
            try:
                self.session.rollback()
            except Exception as rollback_error:
                # A broken connection must not escape into the code that
                # issued the log call.
                print(f"日志记录失败: {rollback_error}")
        finally:
            # Hand the connection back after every record; the session
            # object itself is used again by the next emit().
            self.session.close()

    def close(self):
        """Release the session and the engine's connections, then run base cleanup."""
        try:
            self.session.close()
            self.engine.dispose()
        finally:
            super().close()
# Usage example
if __name__ == "__main__":
    # The handler accepts any SQLAlchemy URL, for instance:
    #   MySQL:      mysql+pymysql://user:pass@host/dbname
    #   PostgreSQL: postgresql+psycopg2://user:pass@host/dbname
    #   SQLite:     sqlite:///log.db
    db_url = "mysql+pymysql://your_username:your_password@localhost/log_db"
    db_handler = SQLAlchemyHandler(db_url)
    log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    db_handler.setFormatter(log_format)

    # Attach the handler to a logger that lets DEBUG records through.
    logger = logging.getLogger("alchemy_logger")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(db_handler)

    # Emit two sample records.
    logger.debug("这是一条DEBUG级别日志")
    logger.info("用户登录成功", extra={"user_id": 123})

高级技巧:处理性能与可靠性问题

直接写入数据库虽然方便,但在高并发场景下可能会遇到性能问题。以下是几个优化建议:

  1. 批量写入:改为积累一定数量的日志后批量提交
  2. 异步写入:使用队列或后台线程处理日志写入
  3. 故障转移:当数据库不可用时,临时写入本地文件
import queue
import threading
class AsyncDBHandler(Handler):
    """Logging handler that queues records and writes them to MySQL in batches.

    ``emit`` only puts a row tuple on an in-memory queue. A daemon worker
    thread drains the queue and inserts the rows with ``executemany`` once
    ``batch_size`` rows are pending, once the queue has been idle for
    ``flush_interval`` seconds, or when the handler is closed.

    Args:
        host: MySQL server host.
        user: Login user name.
        password: Login password.
        db: Database (schema) name.
        table: Target table name. It is interpolated into the SQL text, so
            it must come from trusted configuration, never from user input.
        batch_size: Number of pending rows that triggers a write.
        flush_interval: Seconds of queue inactivity after which pending
            rows are written even if the batch is not full.
    """

    # Plain "YYYY-MM-DD HH:MM:SS" layout for the log_time column.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # logging.Handler has no formatTime(); that method lives on Formatter.
    _time_formatter = logging.Formatter()

    def __init__(self, host, user, password, db, table, batch_size=10,
                 flush_interval=5):
        super().__init__()
        self.host = host
        self.user = user
        self.password = password
        self.db = db
        self.table = table
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.log_queue = queue.Queue()
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _connect(self):
        """Open a new database connection for the worker thread."""
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.db,
            charset='utf8mb4'
        )

    def _write_batch(self, conn, batch):
        """Insert all rows of ``batch`` and commit them as one transaction."""
        sql = f"""
        INSERT INTO {self.table}
        (log_time, level, message, module, func_name, line_no)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
        with conn.cursor() as cursor:
            cursor.executemany(sql, batch)
        conn.commit()

    def _recover(self, conn):
        """Return a usable connection after a failed write, or None.

        Returning None makes the worker open a fresh connection on its
        next write attempt.
        """
        if conn is None:
            return None
        try:
            conn.rollback()
        except Exception:
            # Best effort only: a dead connection cannot roll back.
            pass
        try:
            conn.ping(reconnect=True)
            return conn
        except Exception:
            try:
                conn.close()
            except Exception:
                pass
            return None

    def _worker(self):
        """Drain the queue until the ``None`` stop signal arrives."""
        batch = []
        conn = None  # opened lazily so a database outage cannot kill the thread
        stopping = False
        while not stopping:
            try:
                record = self.log_queue.get(timeout=self.flush_interval)
            except queue.Empty:
                # Idle period: write whatever is pending. This also retries
                # a batch whose previous write failed.
                flush_now = bool(batch)
            else:
                if record is None:  # stop signal sent by close()
                    stopping = True
                    flush_now = bool(batch)
                else:
                    batch.append(record)
                    flush_now = len(batch) >= self.batch_size
            if not flush_now:
                continue
            try:
                if conn is None:
                    conn = self._connect()
                self._write_batch(conn, batch)
                batch = []
            except Exception as e:
                # While running, the batch is kept and retried on the next
                # trigger. During shutdown the loop ends after this single
                # attempt so that close() cannot block forever.
                print(f"日志批量写入失败: {e}")
                conn = self._recover(conn)
        if conn is not None:
            conn.close()

    def emit(self, record):
        """Convert ``record`` into a row tuple and enqueue it."""
        try:
            log_entry = (
                self._time_formatter.formatTime(record, self._TIME_FORMAT),
                record.levelname,
                record.getMessage(),
                record.module,
                record.funcName,
                record.lineno
            )
            self.log_queue.put(log_entry)
        except Exception as e:
            print(f"无法加入日志队列: {e}")

    def close(self):
        """Signal the worker to stop, wait for the final write, then clean up."""
        self.log_queue.put(None)  # stop signal
        self.worker_thread.join()
        super().close()

日志查询:从数据库获取价值

日志存入数据库后,你可以轻松执行复杂查询:

日志管理|数据库存储 Python实现日志写入数据库的示例,Demo,日志数据存储到数据库demo

def _build_log_query(level=None, start_time=None, end_time=None, keyword=None,
                     limit=100):
    """Return the parameterized SELECT text and its parameter list."""
    conditions = []
    params = []
    if level:
        conditions.append("level = %s")
        params.append(level)
    if start_time:
        conditions.append("log_time >= %s")
        params.append(start_time)
    if end_time:
        conditions.append("log_time <= %s")
        params.append(end_time)
    if keyword:
        # The keyword travels as a bound parameter; only the surrounding
        # wildcards are added here.
        conditions.append("message LIKE %s")
        params.append(f"%{keyword}%")
    query = "SELECT * FROM app_logs WHERE 1=1"
    for condition in conditions:
        query += " AND " + condition
    query += " ORDER BY log_time DESC LIMIT %s"
    params.append(int(limit))
    return query, params


def query_logs(level=None, start_time=None, end_time=None, keyword=None,
               limit=100):
    """Fetch the newest log rows that match the given filters.

    Args:
        level: Exact level name to match (e.g. ``"ERROR"``); falsy = any.
        start_time: Inclusive lower bound for ``log_time``; falsy = none.
        end_time: Inclusive upper bound for ``log_time``; falsy = none.
        keyword: Substring that ``message`` must contain; falsy = any.
        limit: Maximum number of rows returned (default 100).

    Returns:
        The rows fetched through a ``DictCursor``, newest first.
    """
    query, params = _build_log_query(level, start_time, end_time, keyword, limit)
    conn = pymysql.connect(host="localhost", user="your_username",
                           password="your_password", database="log_db",
                           charset='utf8mb4')
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query, params)
            results = cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return results
# 示例查询
# error_logs = query_logs(level="ERROR", start_time="2025-08-01")
# login_logs = query_logs(keyword="用户登录")

选择适合你的方案

  1. 简单项目:直接使用第一个基础方案
  2. 中型项目:考虑SQLAlchemy版本,便于维护和扩展
  3. 高性能需求:使用异步批量写入方案

日志是系统健康的晴雨表,把它们存到数据库不仅方便查询分析,还能为后续的日志监控、报警系统打下基础。下次再遇到凌晨报警,你就能优雅地从数据库快速定位问题,而不是在成堆的日志文件里大海捞针了!

现在就去试试吧,让你的日志管理更上一层楼!

发表评论