main.py
· 2.8 KiB · Python
Originalformat
import os
import re
import csv
from collections import defaultdict
def analyze_logs(log_directory, target_path):
    """Count requests per client IP across every file under a directory.

    Each line is expected to be whitespace-separated, with a numeric
    timestamp in column 1, the client IP in column 2, the domain in
    column 3 and the request path in column 4. Any further columns are
    ignored, and lines that do not fit this shape are skipped.

    Files that cannot be opened or read are reported on stdout and skipped;
    the remaining files are still processed.

    :param log_directory: Directory that is walked recursively for log files.
    :param target_path: Request path whose hits are counted separately
        (compared by exact string equality).
    :return: ``defaultdict`` mapping IP -> ``{'total': int, 'target': int}``.
    """
    ip_stats = defaultdict(lambda: {'total': 0, 'target': 0})

    # The trailing columns are optional so that a line ending right after
    # the request path (exactly four columns) is still counted.
    log_pattern = re.compile(
        r'^\d+\s+'      # timestamp (ignored)
        r'(\S+)\s+'     # client IP (column 2)
        r'\S+\s+'       # domain (ignored)
        r'(\S+)'        # request path (column 4)
        r'(?:\s.*)?$'   # remaining columns, if any (ignored)
    )

    for root, _dirs, files in os.walk(log_directory):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            try:
                # errors='replace' keeps a stray non-UTF-8 byte from aborting
                # the file halfway and leaving it only partially counted.
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    for line in f:
                        match = log_pattern.match(line.strip())
                        if match is None:
                            continue
                        ip, path = match.groups()
                        entry = ip_stats[ip]
                        entry['total'] += 1
                        if path == target_path:
                            entry['target'] += 1
            except OSError as e:
                # Best effort: an unreadable file must not stop the whole run.
                print(f"处理文件 {file_path} 时出错: {e}")

    return ip_stats
def save_results(ip_stats, output_file):
    """Write the per-IP statistics to a CSV file, busiest IPs first.

    The file gets a header row followed by one row per IP holding the IP,
    its total request count, its target-path request count and the
    target-path share formatted as a percentage with two decimals.

    Rows are ordered by total request count (descending). IPs with the same
    total are ordered by IP string so the report is reproducible regardless
    of the order in which the entries were inserted.

    :param ip_stats: Mapping of IP -> ``{'total': int, 'target': int}``.
    :param output_file: Path of the CSV file to create or overwrite.
    """
    ranked = sorted(
        ip_stats.items(),
        key=lambda item: (-item[1]['total'], item[0]),
    )

    # newline='' lets the csv module control line endings itself.
    with open(output_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["IP地址", "总请求数", "特定路径请求数", "特定路径占比"])

        for ip, counts in ranked:
            total = counts['total']
            target = counts['target']
            # Guard so an entry with a zero total cannot divide by zero.
            ratio = (target / total) * 100 if total > 0 else 0
            writer.writerow([ip, total, target, f"{ratio:.2f}%"])
if __name__ == "__main__":
    # Configuration
    log_dir = './log'  # directory containing the log files
    target_path = '/apple-touch-icon.png'  # request path counted separately
    output_file = 'ip_request_stats.csv'  # destination of the CSV report

    # os.walk() silently yields nothing for a missing directory, which would
    # otherwise produce an empty report that looks like a successful run.
    if not os.path.isdir(log_dir):
        raise SystemExit(f"日志目录不存在: {log_dir}")

    print("开始分析日志文件...")
    stats = analyze_logs(log_dir, target_path)

    print("保存统计结果...")
    save_results(stats, output_file)

    print(f"分析完成!结果已保存到 {output_file}")
1 | import os |
2 | import re |
3 | import csv |
4 | from collections import defaultdict |
5 | |
def analyze_logs(log_directory, target_path):
    """Count requests per client IP across every file under a directory.

    Each line is expected to be whitespace-separated, with a numeric
    timestamp in column 1, the client IP in column 2, the domain in
    column 3 and the request path in column 4. Any further columns are
    ignored, and lines that do not fit this shape are skipped.

    Files that cannot be opened or read are reported on stdout and skipped;
    the remaining files are still processed.

    :param log_directory: Directory that is walked recursively for log files.
    :param target_path: Request path whose hits are counted separately
        (compared by exact string equality).
    :return: ``defaultdict`` mapping IP -> ``{'total': int, 'target': int}``.
    """
    ip_stats = defaultdict(lambda: {'total': 0, 'target': 0})

    # The trailing columns are optional so that a line ending right after
    # the request path (exactly four columns) is still counted.
    log_pattern = re.compile(
        r'^\d+\s+'      # timestamp (ignored)
        r'(\S+)\s+'     # client IP (column 2)
        r'\S+\s+'       # domain (ignored)
        r'(\S+)'        # request path (column 4)
        r'(?:\s.*)?$'   # remaining columns, if any (ignored)
    )

    for root, _dirs, files in os.walk(log_directory):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            try:
                # errors='replace' keeps a stray non-UTF-8 byte from aborting
                # the file halfway and leaving it only partially counted.
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    for line in f:
                        match = log_pattern.match(line.strip())
                        if match is None:
                            continue
                        ip, path = match.groups()
                        entry = ip_stats[ip]
                        entry['total'] += 1
                        if path == target_path:
                            entry['target'] += 1
            except OSError as e:
                # Best effort: an unreadable file must not stop the whole run.
                print(f"处理文件 {file_path} 时出错: {e}")

    return ip_stats
45 | |
def save_results(ip_stats, output_file):
    """Write the per-IP statistics to a CSV file, busiest IPs first.

    The file gets a header row followed by one row per IP holding the IP,
    its total request count, its target-path request count and the
    target-path share formatted as a percentage with two decimals.

    Rows are ordered by total request count (descending). IPs with the same
    total are ordered by IP string so the report is reproducible regardless
    of the order in which the entries were inserted.

    :param ip_stats: Mapping of IP -> ``{'total': int, 'target': int}``.
    :param output_file: Path of the CSV file to create or overwrite.
    """
    ranked = sorted(
        ip_stats.items(),
        key=lambda item: (-item[1]['total'], item[0]),
    )

    # newline='' lets the csv module control line endings itself.
    with open(output_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["IP地址", "总请求数", "特定路径请求数", "特定路径占比"])

        for ip, counts in ranked:
            total = counts['total']
            target = counts['target']
            # Guard so an entry with a zero total cannot divide by zero.
            ratio = (target / total) * 100 if total > 0 else 0
            writer.writerow([ip, total, target, f"{ratio:.2f}%"])
67 | |
if __name__ == "__main__":
    # Configuration
    log_dir = './log'  # directory containing the log files
    target_path = '/apple-touch-icon.png'  # request path counted separately
    output_file = 'ip_request_stats.csv'  # destination of the CSV report

    # os.walk() silently yields nothing for a missing directory, which would
    # otherwise produce an empty report that looks like a successful run.
    if not os.path.isdir(log_dir):
        raise SystemExit(f"日志目录不存在: {log_dir}")

    print("开始分析日志文件...")
    stats = analyze_logs(log_dir, target_path)

    print("保存统计结果...")
    save_results(stats, output_file)

    print(f"分析完成!结果已保存到 {output_file}")