At 2 a.m. last Wednesday I was jolted awake by a shrill alert text: a core service in production had gone down. Bleary-eyed, I opened a terminal to check the logs and track down the problem, only to find that the log files had long since been rotated and compressed into dozens of archives. Somewhere in the loop of unpacking, grepping, and unpacking again, it hit me: how much easier this would be if these logs went straight into a database!

In this article, I'll show you how to store logs directly in a database with Python and make your ops life ten times easier.
Before we start, we need a few basic components:

- The `logging` module from the standard library
- `pymysql` or `mysql-connector-python` (for MySQL)
- `psycopg2` (for PostgreSQL)
- `SQLAlchemy` (if you prefer the ORM approach)

Taking MySQL as the example, install the required driver first:

```bash
pip install pymysql
```
Let's start with the most direct approach: a custom logging handler that writes each record straight into MySQL.
```python
import logging
import pymysql
from datetime import datetime
from logging import Handler


class MySQLHandler(Handler):
    """Logging handler that inserts each record into a MySQL table."""

    def __init__(self, host, user, password, db, table):
        super().__init__()
        self.host = host
        self.user = user
        self.password = password
        self.db = db
        self.table = table
        self.connection = None
        self.cursor = None
        self.connect()

    def connect(self):
        try:
            self.connection = pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.db,
                charset='utf8mb4'
            )
            self.cursor = self.connection.cursor()
        except Exception as e:
            print(f"Database connection failed: {e}")

    def emit(self, record):
        # Reconnect if the connection was never established or has been lost
        if not self.connection or not self.connection.open:
            self.connect()
        try:
            log_time = datetime.fromtimestamp(record.created)
            sql = f"""
                INSERT INTO {self.table}
                (log_time, level, message, module, func_name, line_no)
                VALUES (%s, %s, %s, %s, %s, %s)
            """
            self.cursor.execute(sql, (
                log_time,
                record.levelname,
                record.getMessage(),
                record.module,
                record.funcName,
                record.lineno
            ))
            self.connection.commit()
        except Exception as e:
            print(f"Failed to write log record: {e}")
            if self.connection:
                self.connection.rollback()


def create_log_table(host, user, password, db, table):
    """Create the log table if it does not exist yet."""
    conn = pymysql.connect(host=host, user=user, password=password, database=db)
    with conn.cursor() as cursor:
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id INT AUTO_INCREMENT PRIMARY KEY,
                log_time DATETIME NOT NULL,
                level VARCHAR(10) NOT NULL,
                message TEXT,
                module VARCHAR(50),
                func_name VARCHAR(50),
                line_no INT,
                INDEX idx_log_time (log_time),
                INDEX idx_level (level)
            )
        """)
    conn.commit()
    conn.close()


if __name__ == "__main__":
    # Create the table first
    create_log_table(
        host="localhost",
        user="your_username",
        password="your_password",
        db="log_db",
        table="app_logs"
    )

    # Configure the logger
    logger = logging.getLogger("db_logger")
    logger.setLevel(logging.INFO)

    # Attach our MySQL handler
    mysql_handler = MySQLHandler(
        host="localhost",
        user="your_username",
        password="your_password",
        db="log_db",
        table="app_logs"
    )
    mysql_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(mysql_handler)

    # Test logging
    logger.info("This is a test record that will be stored in the database")
    logger.error("Simulating an error log", extra={"some_context": "extra information"})
```
If you prefer an ORM, or need to support several different databases, SQLAlchemy is a good fit.
```python
import logging
from datetime import datetime
from logging import Handler

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class LogEntry(Base):
    __tablename__ = 'app_logs'

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, default=datetime.now, index=True)
    level = Column(String(10), index=True)
    message = Column(Text)
    module = Column(String(50))
    func_name = Column(String(50))
    line_no = Column(Integer)

    def __repr__(self):
        return f"<LogEntry {self.timestamp} {self.level} {self.message[:20]}>"


class SQLAlchemyHandler(Handler):
    """Logging handler that persists records through a SQLAlchemy session."""

    def __init__(self, db_url):
        super().__init__()
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)   # create the table if needed
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def emit(self, record):
        try:
            log_entry = LogEntry(
                level=record.levelname,
                message=self.format(record),
                module=record.module,
                func_name=record.funcName,
                line_no=record.lineno
            )
            self.session.add(log_entry)
            self.session.commit()
        except Exception as e:
            print(f"Failed to record log entry: {e}")
            self.session.rollback()
        finally:
            # Return the connection to the pool; the session can be reused on the next emit
            self.session.close()


if __name__ == "__main__":
    logger = logging.getLogger("alchemy_logger")
    logger.setLevel(logging.DEBUG)

    # The handler works with any database SQLAlchemy supports:
    #   MySQL:      mysql+pymysql://user:pass@host/dbname
    #   PostgreSQL: postgresql+psycopg2://user:pass@host/dbname
    #   SQLite:     sqlite:///log.db
    db_handler = SQLAlchemyHandler("mysql+pymysql://your_username:your_password@localhost/log_db")
    db_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(db_handler)

    # Test logging
    logger.debug("This is a DEBUG-level record")
    logger.info("User login successful", extra={"user_id": 123})
```
Writing to the database synchronously is convenient, but under high concurrency it can become a bottleneck, since every log call blocks on a network round trip and a commit. The handler below moves the writes onto a background thread and batches them:
```python
import queue
import threading
from datetime import datetime
from logging import Handler

import pymysql


class AsyncDBHandler(Handler):
    """Queues log records and writes them to MySQL in batches from a worker thread."""

    def __init__(self, host, user, password, db, table, batch_size=10):
        super().__init__()
        self.host = host
        self.user = user
        self.password = password
        self.db = db
        self.table = table
        self.batch_size = batch_size
        self.log_queue = queue.Queue()
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _connect(self):
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.db
        )

    def _worker(self):
        batch = []
        conn = self._connect()
        running = True
        while running:
            flush = False
            try:
                # Pull the next record off the queue
                record = self.log_queue.get(timeout=5)
                if record is None:                 # shutdown signal
                    running = False
                    flush = True
                else:
                    batch.append(record)
                    if len(batch) >= self.batch_size:
                        flush = True
            except queue.Empty:
                flush = True                       # periodic flush even for small batches

            if flush and batch:
                try:
                    sql = f"""
                        INSERT INTO {self.table}
                        (log_time, level, message, module, func_name, line_no)
                        VALUES (%s, %s, %s, %s, %s, %s)
                    """
                    with conn.cursor() as cursor:
                        cursor.executemany(sql, batch)
                    conn.commit()
                    batch = []
                except Exception as e:
                    print(f"Batch log insert failed: {e}")
                    # Simple reconnection logic
                    try:
                        conn.rollback()
                        conn.ping(reconnect=True)
                    except Exception:
                        conn = self._connect()
        conn.close()

    def emit(self, record):
        try:
            entry = (
                datetime.fromtimestamp(record.created),
                record.levelname,
                record.getMessage(),
                record.module,
                record.funcName,
                record.lineno
            )
            self.log_queue.put(entry)
        except Exception as e:
            print(f"Could not queue log record: {e}")

    def close(self):
        self.log_queue.put(None)   # send the shutdown signal
        self.worker_thread.join()
        super().close()
```
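For completeness, here is a minimal usage sketch for the asynchronous handler above. The connection parameters and the `app_logs` table simply mirror the earlier examples and are assumptions, not requirements; the important detail is calling `close()` before the process exits so that any records still sitting in the queue get flushed.

```python
import logging

logger = logging.getLogger("async_db_logger")
logger.setLevel(logging.INFO)

async_handler = AsyncDBHandler(
    host="localhost",
    user="your_username",
    password="your_password",
    db="log_db",
    table="app_logs",
    batch_size=50          # tune to your write volume
)
logger.addHandler(async_handler)

for i in range(200):
    logger.info("Processed request %s", i)

# Flush any queued records before the program exits
async_handler.close()
```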
Once the logs are in the database, complex queries become easy:
```python
import pymysql


def query_logs(level=None, start_time=None, end_time=None, keyword=None):
    """Filter logs by level, time range, and message keyword."""
    conn = pymysql.connect(host="localhost", user="your_username",
                           password="your_password", database="log_db")
    query = "SELECT * FROM app_logs WHERE 1=1"
    params = []

    if level:
        query += " AND level = %s"
        params.append(level)
    if start_time:
        query += " AND log_time >= %s"
        params.append(start_time)
    if end_time:
        query += " AND log_time <= %s"
        params.append(end_time)
    if keyword:
        query += " AND message LIKE %s"
        params.append(f"%{keyword}%")

    query += " ORDER BY log_time DESC LIMIT 100"

    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(query, params)
        results = cursor.fetchall()
    conn.close()
    return results


# Example queries
# error_logs = query_logs(level="ERROR", start_time="2025-08-01")
# login_logs = query_logs(keyword="login")
```
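As a sketch of the kind of aggregation that is painful with flat log files but trivial in SQL, the helper below counts errors per module over the last 24 hours. It assumes the same `app_logs` table and placeholder connection settings used throughout this article.

```python
import pymysql


def error_counts_last_24h():
    """Count ERROR records per module over the last 24 hours."""
    conn = pymysql.connect(host="localhost", user="your_username",
                           password="your_password", database="log_db")
    sql = """
        SELECT module, COUNT(*) AS error_count
        FROM app_logs
        WHERE level = 'ERROR'
          AND log_time >= NOW() - INTERVAL 24 HOUR
        GROUP BY module
        ORDER BY error_count DESC
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
    conn.close()
    return rows


# Example: print a quick error summary
# for row in error_counts_last_24h():
#     print(row["module"], row["error_count"])
```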
Logs are the barometer of a system's health. Storing them in a database not only makes querying and analysis easier, it also lays the groundwork for log monitoring and alerting later on. The next time an alert goes off in the middle of the night, you can locate the problem quickly and gracefully from the database instead of fishing for a needle in a pile of log files.
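As a starting point for that kind of alerting, a sketch like the following could run from cron or any scheduler: it counts recent ERROR rows and prints a warning when a threshold is exceeded. The threshold, interval, and notification channel are all placeholders to adapt to your environment.

```python
import pymysql


def check_error_rate(threshold=10, minutes=5):
    """Warn if more than `threshold` ERROR logs arrived in the last `minutes` minutes."""
    conn = pymysql.connect(host="localhost", user="your_username",
                           password="your_password", database="log_db")
    with conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT COUNT(*) FROM app_logs
            WHERE level = 'ERROR'
              AND log_time >= NOW() - INTERVAL %s MINUTE
            """,
            (minutes,)
        )
        (error_count,) = cursor.fetchone()
    conn.close()

    if error_count > threshold:
        # Replace with your real notification channel (email, SMS, webhook, ...)
        print(f"ALERT: {error_count} errors in the last {minutes} minutes")
    return error_count


# check_error_rate(threshold=10, minutes=5)
```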
Go give it a try, and take your log management to the next level!