(AI)一次从日志文件中提取SQL语句的尝试……

背景

在现代软件开发中,日志文件扮演着至关重要的角色,它们记录了软件运行时的详细事件,帮助开发者理解程序的行为,以及在出现问题时进行调试。本文档旨在介绍一个特定的场景——分析和提取多线程日志文件中的SQL语句及其执行细节,并将过程中遇到的问题和相应的解决策略一一呈现。

初步需求

我们的任务是编写一个Python程序,该程序能够从日志文件中提取出所有完整的SQL语句、它们的执行时间以及执行结果的数量。用户可以选择提取日志文件中的最后N条记录,或者提取所有记录。输出结果可以是控制台打印或保存到文本文件中。

发现问题

  1. 多线程日志的处理:由于程序是多线程运行的,单纯按顺序读取日志条目并不能准确关联SQL的PreparingParametersTotal/Updates信息。
  2. 参数的动态替换:日志中的SQL语句使用?作为参数占位符,需要根据日志中的参数信息动态替换。
  3. 数据类型处理:对于字符串类型的参数,替换时需要添加单引号。
  4. 性能优化:考虑到日志文件可能非常大,需要有效管理内存使用,并提供实时进度反馈。

逐步优化

步骤一:基础功能实现

首先,我们创建了一个简单的程序框架,使用正则表达式匹配日志文件中的SQL语句、参数列表和执行结果。这个版本的程序能够处理最基本的需求,但很快我们就遇到了上述问题。

步骤二:处理多线程日志

为了准确地关联每个SQL语句的相关信息,我们开始根据线程ID(grpc-default-executor-x)来追踪和匹配每一条日志记录。这一改进使得即使在多线程环境下,每个SQL语句的PreparingParameters和结果也能被正确地关联起来。

步骤三:动态参数替换

我们增加了参数处理逻辑,根据参数类型动态替换SQL语句中的?占位符。对于字符串类型的参数,我们确保在替换时添加了单引号。

步骤四:优化数据类型处理

进一步分析日志文件后,我们发现在处理空参数和非字符串类型参数时存在缺陷。因此,我们优化了参数解析函数,以更鲁棒地处理不同类型的参数和空参数情况。

步骤五:性能优化和进度反馈

为了提高程序处理大型日志文件的效率,我们采用了逐行读取文件的方式,并使用tqdm库引入了进度条,为用户提供实时的进度反馈。

结果

通过一系列的问题发现和逐步优化,最终我们得到了一个功能完善、性能优异的程序。它不仅能够准确地从多线程日志文件中提取SQL语句及其执行细节,还支持灵活的输出选项和实时进度反馈。这个过程展示了在面对复杂问题时,通过逐步迭代和优化,我们能够有效地提高程序的质量和用户体验。

代码展示、讲解


接下来,我们将逐块分析和解释刚才优化过程中涉及的代码,以便更好地理解每一部分的功能和作用。

1. 导入必要的库

import re
from datetime import datetime
import pandas as pd
from tqdm import tqdm
  • re: 正则表达式库,用于匹配日志中的特定模式。
  • datetime: 日期时间库,用于处理和转换日期时间格式。
  • pandas: 数据处理库,用于最终的数据整理和输出到Excel文件。
  • tqdm: 进度条库,用于在处理大型文件时提供实时进度反馈。

2. 参数解析函数

def parse_parameters(param_str):
    processed_params = []
    if param_str.strip():  # 检查参数字符串是否为空或仅包含空白字符
        raw_params = param_str.split(", ")
        for param in raw_params:
            parts = param.rsplit("(", 1)
            if len(parts) == 2:
                value, param_type = parts
                if "String" in param_type:
                    processed_params.append(f"'{value}'")  # 为字符串类型参数添加单引号
                else:
                    processed_params.append(value)
            else:
                processed_params.append(parts[0])
    return processed_params

该函数负责解析Parameters:后的参数字符串。对于字符串类型的参数,函数会添加单引号。这是为了在后续替换SQL语句中的?时,能正确处理字符串类型的参数。

3. 主函数

def extract_sql_to_file_jupyter():
    # 用户输入
    file_path = input("Enter the log file path: ")
    number_of_entries_input = input("Enter the number of entries to extract (leave blank or enter a negative number for all): ")
    output_format = input("Choose the output format (xlsx/txt): ").lower()

这部分代码负责接收用户输入,包括日志文件的路径、要提取的日志条目数量,以及输出格式(Excel或文本)。

4. 初始化和正则表达式匹配

    # 定义正则表达式
    datetime_pattern = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})")
    sql_stmt_pattern = re.compile(...)
    params_pattern = re.compile(...)
    result_pattern = re.compile(...)
    
    executor_data = {}
    data = []

在这部分代码中,我们定义了几个重要的正则表达式用于匹配日志中的日期时间、SQL语句、参数列表和执行结果。executor_data字典用于追踪每个线程的SQL语句和参数,而data列表用于存储处理结果。

5. 逐行处理日志文件

    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    for line in tqdm(lines, desc="Processing log"):
        datetime_match = datetime_pattern.search(line)
        if datetime_match:
            ...

使用with open读取日志文件,并通过for循环结合tqdm逐行处理,同时搜索匹配定义好的正则表达式模式。每找到一个匹配,就根据匹配类型(SQL语句、参数、结果)进行相应的处理。

6. 数据输出

    if output_format == 'xlsx':
        ...
    else:
        ...

根据用户选择的输出格式,最终将提取的SQL语句及其执行细节保存到Excel或文本文件中。对于Excel输出,使用了pandas库的DataFrameto_excel方法;对于文本输出,简单地将数据写入到文本文件中。

完整代码

import re
from datetime import datetime
import pandas as pd
from tqdm import tqdm

def parse_parameters(param_str):
    processed_params = []
    if param_str.strip():  # 检查参数字符串是否为空或仅包含空白字符
        raw_params = param_str.split(", ")
        for param in raw_params:
            parts = param.rsplit("(", 1)
            if len(parts) == 2:
                value, param_type = parts
                if "String" in param_type:
                    processed_params.append(f"'{value}'")
                else:
                    processed_params.append(value)
            else:
                processed_params.append(parts[0])
    return processed_params

def extract_sql_to_file_jupyter():
    file_path = input("Enter the log file path: ")
    number_of_entries_input = input("Enter the number of entries to extract (leave blank or enter a negative number for all): ")
    output_format = input("Choose the output format (xlsx/txt): ").lower()

    if not output_format:
        output_format = 'txt'

    try:
        number_of_entries = int(number_of_entries_input)
        if number_of_entries < 0:
            number_of_entries = None
    except ValueError:
        number_of_entries = None

    sql_stmt_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ ==>  Preparing:\s*((?:SELECT|INSERT INTO|UPDATE|DELETE).+)", re.I)
    params_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ ==> Parameters: (.*)", re.I)
    result_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ <==\s+(Total|Updates):\s+(\d+)", re.I)
    
    executor_data = {}
    data = []

    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    for line in tqdm(lines, desc="Processing log"):
        datetime_match = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})", line)
        if datetime_match:
            line_datetime = datetime.strptime(datetime_match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            sql_match = sql_stmt_pattern.search(line)
            params_match = params_pattern.search(line)
            result_match = result_pattern.search(line)

            if sql_match:
                datetime_str, thread_id, sql = sql_match.groups()
                executor_data[thread_id] = {'sql': sql, 'params': [], 'start_datetime': line_datetime}

            elif params_match:
                _, thread_id, params_str = params_match.groups()
                if thread_id in executor_data:
                    executor_data[thread_id]['params'] = parse_parameters(params_str)

            elif result_match:
                _, thread_id, _, result = result_match.groups()
                if thread_id in executor_data:
                    start_datetime = executor_data[thread_id]['start_datetime']
                    duration = int((line_datetime - start_datetime).total_seconds() * 1000)  # Calculate duration in milliseconds
                    thread_info = executor_data.pop(thread_id, None)
                    if thread_info:
                        sql_filled = thread_info['sql']
                        for param in thread_info['params']:
                            sql_filled = sql_filled.replace('?', param, 1)
                        data.append([thread_info['start_datetime'].strftime("%Y-%m-%d %H:%M:%S,%f"), result, f"{duration}ms", sql_filled])
    
    if number_of_entries is not None:
        data = data[-number_of_entries:]

    if output_format == 'xlsx':
        df = pd.DataFrame(data, columns=['Time', 'Result', 'Duration', 'SQL Statement'])
        output_file_path = "extracted_sql_statements.xlsx"
        df.to_excel(output_file_path, index=False, engine='openpyxl')
        print(f"All SQL statements have been written to {output_file_path}")
    else:
        output_file_path = "extracted_sql_statements.txt"
        with open(output_file_path, "w", encoding="utf-8") as f:
            for row in data:
                f.write(' | '.join(row) + '\n')
        print(f"All SQL statements have been written to {output_file_path}")

# 调用函数
extract_sql_to_file_jupyter()

评论

  1. 博主
    Windows Edge 122.0.0.0
    8 月前
    2024-3-21 14:20:22

  2. Windows Firefox 108.0
    6 月前
    2024-5-24 9:15:42

    这篇文章写得深入浅出,让我这个小白也看懂了!

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇