背景
在现代软件开发中,日志文件扮演着至关重要的角色,它们记录了软件运行时的详细事件,帮助开发者理解程序的行为,以及在出现问题时进行调试。本文档旨在介绍一个特定的场景——分析和提取多线程日志文件中的SQL语句及其执行细节,并将过程中遇到的问题和相应的解决策略一一呈现。
初步需求
我们的任务是编写一个Python程序,该程序能够从日志文件中提取出所有完整的SQL语句、它们的执行时间以及执行结果的数量。用户可以选择提取日志文件中的最后N条记录,或者提取所有记录。输出结果可以是控制台打印或保存到文本文件中。
发现问题
- 多线程日志的处理:由于程序是多线程运行的,单纯按顺序读取日志条目并不能准确关联SQL的
Preparing
、Parameters
和Total/Updates
信息。 - 参数的动态替换:日志中的SQL语句使用
?
作为参数占位符,需要根据日志中的参数信息动态替换。 - 数据类型处理:对于字符串类型的参数,替换时需要添加单引号。
- 性能优化:考虑到日志文件可能非常大,需要有效管理内存使用,并提供实时进度反馈。
逐步优化
步骤一:基础功能实现
首先,我们创建了一个简单的程序框架,使用正则表达式匹配日志文件中的SQL语句、参数列表和执行结果。这个版本的程序能够处理最基本的需求,但很快我们就遇到了上述问题。
步骤二:处理多线程日志
为了准确地关联每个SQL语句的相关信息,我们开始根据线程ID(grpc-default-executor-x
)来追踪和匹配每一条日志记录。这一改进使得即使在多线程环境下,每个SQL语句的Preparing
、Parameters
和结果也能被正确地关联起来。
步骤三:动态参数替换
我们增加了参数处理逻辑,根据参数类型动态替换SQL语句中的?
占位符。对于字符串类型的参数,我们确保在替换时添加了单引号。
步骤四:优化数据类型处理
进一步分析日志文件后,我们发现在处理空参数和非字符串类型参数时存在缺陷。因此,我们优化了参数解析函数,以更鲁棒地处理不同类型的参数和空参数情况。
步骤五:性能优化和进度反馈
为了提高程序处理大型日志文件的效率,我们采用了逐行读取文件的方式,并使用tqdm
库引入了进度条,为用户提供实时的进度反馈。
结果
通过一系列的问题发现和逐步优化,最终我们得到了一个功能完善、性能优异的程序。它不仅能够准确地从多线程日志文件中提取SQL语句及其执行细节,还支持灵活的输出选项和实时进度反馈。这个过程展示了在面对复杂问题时,通过逐步迭代和优化,我们能够有效地提高程序的质量和用户体验。
代码展示、讲解
接下来,我们将逐块分析和解释刚才优化过程中涉及的代码,以便更好地理解每一部分的功能和作用。
1. 导入必要的库
import re
from datetime import datetime
import pandas as pd
from tqdm import tqdm
re
: 正则表达式库,用于匹配日志中的特定模式。datetime
: 日期时间库,用于处理和转换日期时间格式。pandas
: 数据处理库,用于最终的数据整理和输出到Excel文件。tqdm
: 进度条库,用于在处理大型文件时提供实时进度反馈。
2. 参数解析函数
def parse_parameters(param_str):
processed_params = []
if param_str.strip(): # 检查参数字符串是否为空或仅包含空白字符
raw_params = param_str.split(", ")
for param in raw_params:
parts = param.rsplit("(", 1)
if len(parts) == 2:
value, param_type = parts
if "String" in param_type:
processed_params.append(f"'{value}'") # 为字符串类型参数添加单引号
else:
processed_params.append(value)
else:
processed_params.append(parts[0])
return processed_params
该函数负责解析Parameters:
后的参数字符串。对于字符串类型的参数,函数会添加单引号。这是为了在后续替换SQL语句中的?
时,能正确处理字符串类型的参数。
3. 主函数
def extract_sql_to_file_jupyter():
# 用户输入
file_path = input("Enter the log file path: ")
number_of_entries_input = input("Enter the number of entries to extract (leave blank or enter a negative number for all): ")
output_format = input("Choose the output format (xlsx/txt): ").lower()
这部分代码负责接收用户输入,包括日志文件的路径、要提取的日志条目数量,以及输出格式(Excel或文本)。
4. 初始化和正则表达式匹配
# 定义正则表达式
datetime_pattern = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})")
sql_stmt_pattern = re.compile(...)
params_pattern = re.compile(...)
result_pattern = re.compile(...)
executor_data = {}
data = []
在这部分代码中,我们定义了几个重要的正则表达式用于匹配日志中的日期时间、SQL语句、参数列表和执行结果。executor_data
字典用于追踪每个线程的SQL语句和参数,而data
列表用于存储处理结果。
5. 逐行处理日志文件
with open(file_path, "r", encoding="utf-8") as file:
lines = file.readlines()
for line in tqdm(lines, desc="Processing log"):
datetime_match = datetime_pattern.search(line)
if datetime_match:
...
使用with open
读取日志文件,并通过for
循环结合tqdm
逐行处理,同时搜索匹配定义好的正则表达式模式。每找到一个匹配,就根据匹配类型(SQL语句、参数、结果)进行相应的处理。
6. 数据输出
if output_format == 'xlsx':
...
else:
...
根据用户选择的输出格式,最终将提取的SQL语句及其执行细节保存到Excel或文本文件中。对于Excel输出,使用了pandas
库的DataFrame
和to_excel
方法;对于文本输出,简单地将数据写入到文本文件中。
完整代码
import re
from datetime import datetime
import pandas as pd
from tqdm import tqdm
def parse_parameters(param_str):
processed_params = []
if param_str.strip(): # 检查参数字符串是否为空或仅包含空白字符
raw_params = param_str.split(", ")
for param in raw_params:
parts = param.rsplit("(", 1)
if len(parts) == 2:
value, param_type = parts
if "String" in param_type:
processed_params.append(f"'{value}'")
else:
processed_params.append(value)
else:
processed_params.append(parts[0])
return processed_params
def extract_sql_to_file_jupyter():
file_path = input("Enter the log file path: ")
number_of_entries_input = input("Enter the number of entries to extract (leave blank or enter a negative number for all): ")
output_format = input("Choose the output format (xlsx/txt): ").lower()
if not output_format:
output_format = 'txt'
try:
number_of_entries = int(number_of_entries_input)
if number_of_entries < 0:
number_of_entries = None
except ValueError:
number_of_entries = None
sql_stmt_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ ==> Preparing:\s*((?:SELECT|INSERT INTO|UPDATE|DELETE).+)", re.I)
params_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ ==> Parameters: (.*)", re.I)
result_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).+ (grpc-default-executor-\d+) .+ <==\s+(Total|Updates):\s+(\d+)", re.I)
executor_data = {}
data = []
with open(file_path, "r", encoding="utf-8") as file:
lines = file.readlines()
for line in tqdm(lines, desc="Processing log"):
datetime_match = re.search(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})", line)
if datetime_match:
line_datetime = datetime.strptime(datetime_match.group(1), "%Y-%m-%d %H:%M:%S,%f")
sql_match = sql_stmt_pattern.search(line)
params_match = params_pattern.search(line)
result_match = result_pattern.search(line)
if sql_match:
datetime_str, thread_id, sql = sql_match.groups()
executor_data[thread_id] = {'sql': sql, 'params': [], 'start_datetime': line_datetime}
elif params_match:
_, thread_id, params_str = params_match.groups()
if thread_id in executor_data:
executor_data[thread_id]['params'] = parse_parameters(params_str)
elif result_match:
_, thread_id, _, result = result_match.groups()
if thread_id in executor_data:
start_datetime = executor_data[thread_id]['start_datetime']
duration = int((line_datetime - start_datetime).total_seconds() * 1000) # Calculate duration in milliseconds
thread_info = executor_data.pop(thread_id, None)
if thread_info:
sql_filled = thread_info['sql']
for param in thread_info['params']:
sql_filled = sql_filled.replace('?', param, 1)
data.append([thread_info['start_datetime'].strftime("%Y-%m-%d %H:%M:%S,%f"), result, f"{duration}ms", sql_filled])
if number_of_entries is not None:
data = data[-number_of_entries:]
if output_format == 'xlsx':
df = pd.DataFrame(data, columns=['Time', 'Result', 'Duration', 'SQL Statement'])
output_file_path = "extracted_sql_statements.xlsx"
df.to_excel(output_file_path, index=False, engine='openpyxl')
print(f"All SQL statements have been written to {output_file_path}")
else:
output_file_path = "extracted_sql_statements.txt"
with open(output_file_path, "w", encoding="utf-8") as f:
for row in data:
f.write(' | '.join(row) + '\n')
print(f"All SQL statements have been written to {output_file_path}")
# 调用函数
extract_sql_to_file_jupyter()
这篇文章写得深入浅出,让我这个小白也看懂了!