1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
import json
import re
from collections import Counter
from datetime import datetime
from typing import Dict, List, Any, Optional

import dashscope
class ITOperationsAssistant:
    """Assistant that triages IT incidents with a DashScope chat model.

    The model (``qwen-plus``) produces the incident analysis and the written
    report. A small built-in knowledge base of common problems and their
    remediation steps is used to append extra suggestions to the analysis.
    """

    def __init__(self, api_key: str):
        """Register the API key and build the knowledge base.

        Args:
            api_key: DashScope API key. It is assigned to the module-level
                ``dashscope.api_key`` attribute.
        """
        dashscope.api_key = api_key
        # Knowledge base: category -> problem name -> remediation steps.
        self.knowledge_base = {
            "服务器": {
                "高CPU使用率": [
                    "检查占用CPU最高的进程",
                    "分析是否有异常进程或死循环",
                    "考虑增加服务器资源或优化代码",
                    "检查是否有病毒或恶意软件"
                ],
                "内存不足": [
                    "清理不必要的进程",
                    "检查内存泄漏",
                    "增加交换空间",
                    "升级物理内存"
                ],
                "磁盘空间不足": [
                    "清理临时文件和日志",
                    "删除不必要的文件",
                    "扩展磁盘容量",
                    "设置自动清理策略"
                ]
            },
            "网络": {
                "连接超时": [
                    "检查网络连接状态",
                    "验证防火墙设置",
                    "检查DNS解析",
                    "测试网络延迟和丢包率"
                ],
                "带宽不足": [
                    "分析网络流量模式",
                    "优化数据传输",
                    "升级网络带宽",
                    "实施流量控制"
                ]
            },
            "数据库": {
                "连接池耗尽": [
                    "检查数据库连接配置",
                    "优化SQL查询性能",
                    "增加连接池大小",
                    "检查是否有长时间未释放的连接"
                ],
                "查询性能慢": [
                    "分析慢查询日志",
                    "检查索引使用情况",
                    "优化SQL语句",
                    "考虑数据库分区或分库"
                ]
            }
        }

    @staticmethod
    def _chat(messages: List[Dict[str, str]], temperature: float) -> Any:
        """Send ``messages`` to the ``qwen-plus`` model and return the raw response."""
        return dashscope.Generation.call(
            model='qwen-plus',
            messages=messages,
            temperature=temperature,
            # Request the chat-style payload so the reply text is found at
            # ``output.choices[0].message.content``.
            result_format='message'
        )

    @staticmethod
    def _response_text(response: Any) -> str:
        """Return the assistant reply text of a ``result_format='message'`` response."""
        return response.output.choices[0].message.content

    @staticmethod
    def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
        """Return the first JSON object embedded in ``text``, or ``None``.

        Every ``{`` is tried in turn as the start of an object, so prose,
        code fences or stray braces around the JSON do not break parsing.
        """
        decoder = json.JSONDecoder()
        start = text.find('{')
        while start != -1:
            try:
                parsed, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                # Not a valid object at this brace; try the next one.
                start = text.find('{', start + 1)
            else:
                return parsed
        return None

    @staticmethod
    def _build_report_prompt(incident_data: Dict[str, Any], analysis: Dict[str, Any]) -> str:
        """Render the report-generation prompt for an incident and its analysis."""
        # default=str is deliberate: the JSON is only embedded in prompt text,
        # so values such as datetime objects are rendered rather than rejected.
        incident_json = json.dumps(incident_data, ensure_ascii=False, indent=2, default=str)
        analysis_json = json.dumps(analysis, ensure_ascii=False, indent=2, default=str)
        return f"""
基于以下事件信息和分析结果,生成一份专业的IT事件处理报告:
事件信息:
{incident_json}
分析结果:
{analysis_json}
请生成包含以下部分的详细报告:
1. 事件概述
2. 影响评估
3. 根本原因分析
4. 处理过程
5. 解决方案
6. 后续行动计划
7. 经验教训
报告应该专业、简洁、易于理解。
"""

    def analyze_incident(self, incident_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze an IT incident with the model.

        Args:
            incident_data: Incident fields. The keys read are ``incident_id``,
                ``timestamp``, ``system``, ``description``, ``error_log``,
                ``impact``, ``user_report`` and ``metrics``; missing keys are
                rendered as ``N/A`` in the prompt.

        Returns:
            On success ``{"success": True, "analysis": <dict>, "raw_response":
            <str>}``, where ``analysis`` is the JSON object parsed from the
            reply, possibly extended with ``knowledge_base_suggestions``.
            On failure ``{"success": False, "error": <str>}``, plus
            ``raw_response`` when the reply contained no parsable JSON object.
            Exceptions are reported through ``error`` rather than raised.
        """
        system_prompt = """你是一个资深的IT运维专家,擅长快速诊断和解决各种技术问题。
请分析IT事件,提供以下信息:
1. 事件级别(低/中/高/紧急)
2. 可能的根本原因
3. 紧急处理步骤
4. 详细解决方案
5. 预防措施
6. 需要的资源和时间估计
输出JSON格式:
{
"severity": "事件级别",
"category": "问题分类",
"root_causes": ["可能原因1", "可能原因2"],
"immediate_actions": ["紧急处理1", "紧急处理2"],
"detailed_solution": ["详细步骤1", "详细步骤2"],
"prevention": ["预防措施1", "预防措施2"],
"resources_needed": ["所需资源1", "所需资源2"],
"estimated_time": "预计解决时间",
"escalation": "是否需要升级"
}"""
        user_content = f"""
事件信息:
- 事件ID: {incident_data.get('incident_id', 'N/A')}
- 发生时间: {incident_data.get('timestamp', 'N/A')}
- 系统: {incident_data.get('system', 'N/A')}
- 错误描述: {incident_data.get('description', 'N/A')}
- 错误日志: {incident_data.get('error_log', 'N/A')}
- 影响范围: {incident_data.get('impact', 'N/A')}
- 用户报告: {incident_data.get('user_report', 'N/A')}
- 系统指标: {incident_data.get('metrics', 'N/A')}
请进行全面的事件分析。
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content}
        ]
        try:
            response = self._chat(messages, temperature=0.3)
            if response.status_code != 200:
                return {
                    "success": False,
                    "error": f"API调用失败: {response.code}"
                }
            result_text = self._response_text(response)
            # Parse the JSON object out of the reply.
            analysis_result = self._extract_json_object(result_text)
            if analysis_result is None:
                return {
                    "success": False,
                    "error": "无法解析JSON结果",
                    "raw_response": result_text
                }
            # Append suggestions from the knowledge base.
            self._enhance_with_knowledge_base(analysis_result, incident_data)
            return {
                "success": True,
                "analysis": analysis_result,
                "raw_response": result_text
            }
        except Exception as e:
            # Boundary handler: callers get an error payload, never an exception.
            return {
                "success": False,
                "error": f"分析异常: {str(e)}"
            }

    def _enhance_with_knowledge_base(self, analysis: Dict[str, Any], incident_data: Dict[str, Any]) -> None:
        """Append matching knowledge-base steps to ``analysis`` in place.

        A problem matches when its lower-cased name, or any
        whitespace-separated word of that name, occurs in the lower-cased
        incident description. Names without whitespace therefore match only
        as a whole substring. Matches are appended to
        ``analysis['knowledge_base_suggestions']`` as
        ``"<category>-<problem>: <step>"``; the key is only created when
        something matches.
        """
        # The description may be absent or explicitly None; treat both as empty.
        description = str(incident_data.get('description') or '').lower()
        for category, problems in self.knowledge_base.items():
            for problem, solutions in problems.items():
                problem_key = problem.lower()
                matched = problem_key in description or any(
                    keyword in description for keyword in problem_key.split()
                )
                if matched:
                    analysis.setdefault('knowledge_base_suggestions', []).extend(
                        f"{category}-{problem}: {solution}" for solution in solutions
                    )

    def generate_incident_report(self, incident_data: Dict[str, Any], analysis: Dict[str, Any]) -> str:
        """Ask the model to write an incident report.

        Args:
            incident_data: Incident fields, embedded in the prompt as JSON.
            analysis: Analysis result, embedded in the prompt as JSON.

        Returns:
            The report text, or a string starting with ``报告生成失败`` (API
            error) or ``报告生成异常`` (exception) when generation fails.
        """
        try:
            # Built inside the try so serialization problems are reported
            # through the return value like every other failure.
            report_prompt = self._build_report_prompt(incident_data, analysis)
            messages = [
                {"role": "user", "content": report_prompt}
            ]
            response = self._chat(messages, temperature=0.5)
            if response.status_code == 200:
                return self._response_text(response)
            return f"报告生成失败: {response.code}"
        except Exception as e:
            return f"报告生成异常: {str(e)}"

    def suggest_monitoring_improvements(self, incident_history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Suggest monitoring improvements from past incidents.

        Args:
            incident_history: Past incidents; only the ``category`` key is
                read (``未分类`` when absent).

        Returns:
            A dict with four lists: ``frequent_issues`` (up to three most
            frequent categories, ties in first-seen order),
            ``monitoring_gaps`` (one entry per known category present in the
            history), ``automation_opportunities`` (a fixed list) and
            ``preventive_measures`` (always empty here).
        """
        # Count incidents per category.
        categories = Counter(
            incident.get('category', '未分类') for incident in incident_history
        )
        suggestions = {
            "frequent_issues": [],
            "monitoring_gaps": [],
            "automation_opportunities": [],
            "preventive_measures": []
        }
        # Most frequent categories.
        for category, count in categories.most_common(3):
            suggestions["frequent_issues"].append({
                "category": category,
                "frequency": count,
                "recommendation": f"加强{category}相关的监控和预警"
            })
        # Monitoring blind spots, one per known category seen in the history.
        gap_rules = (
            ("服务器", "增加服务器性能监控告警阈值"),
            ("网络", "部署网络质量实时监控"),
            ("数据库", "配置数据库性能监控仪表板"),
        )
        suggestions["monitoring_gaps"].extend(
            advice for category, advice in gap_rules if category in categories
        )
        # Automation opportunities.
        suggestions["automation_opportunities"].extend([
            "自动重启服务脚本",
            "日志清理自动化",
            "健康检查自动化",
            "告警自动分级和分发"
        ])
        return suggestions
# Usage example
def demo_it_operations():
    """Print a canned walkthrough of the assistant's output.

    No API call is made: two sample incidents are printed, each followed by
    the same hard-coded mock analysis, and then a hard-coded set of
    monitoring-improvement suggestions. Returns ``None``.
    """
    # Sample incident data.
    sample_incidents = [
        {
            "incident_id": "INC20241024001",
            "timestamp": "2024-10-24 14:30:00",
            "system": "Web服务器",
            "description": "服务器响应缓慢,CPU使用率持续90%以上",
            "error_log": "ERROR: High CPU usage detected, multiple processes consuming resources",
            "impact": "影响200+用户,响应时间增加5倍",
            "user_report": "网页加载非常慢,有时无法访问",
            "metrics": "CPU: 95%, Memory: 78%, Disk I/O: High"
        },
        {
            "incident_id": "INC20241024002",
            "timestamp": "2024-10-24 16:45:00",
            "system": "数据库服务器",
            "description": "数据库连接池耗尽,应用无法连接",
            "error_log": "FATAL: too many connections for role",
            "impact": "所有业务功能停止,影响全部用户",
            "user_report": "系统完全无法使用,登录失败",
            "metrics": "DB Connections: 500/500, Query Time: >30s"
        }
    ]
    # Mock analysis result, shown for every sample incident.
    mock_analysis = {
        "severity": "高",
        "category": "性能问题",
        "root_causes": [
            "CPU密集型进程占用过多资源",
            "可能存在死循环或资源泄漏",
            "服务器配置不足以应对当前负载"
        ],
        "immediate_actions": [
            "识别并终止异常进程",
            "临时增加服务器资源",
            "启用负载均衡分散压力"
        ],
        "detailed_solution": [
            "使用top/htop命令分析CPU占用情况",
            "检查应用日志寻找异常模式",
            "优化高耗能的代码或查询",
            "考虑服务器扩容或架构调整"
        ],
        "prevention": [
            "设置CPU使用率监控告警",
            "定期进行性能测试",
            "实施代码性能审查",
            "建立容量规划流程"
        ],
        "resources_needed": [
            "运维工程师",
            "开发团队支持",
            "可能需要硬件升级"
        ],
        "estimated_time": "2-4小时",
        "escalation": "如4小时内无法解决则升级至高级工程师"
    }
    print("=== IT运维智能助手演示 ===")
    for number, incident in enumerate(sample_incidents, start=1):
        print(f"\n事件 {number}: {incident['incident_id']}")
        print(f"系统: {incident['system']}")
        print(f"问题: {incident['description']}")
        print(f"影响: {incident['impact']}")
        print("\n分析结果:")
        print(f"严重级别: {mock_analysis['severity']}")
        print(f"问题分类: {mock_analysis['category']}")
        print(f"预计解决时间: {mock_analysis['estimated_time']}")
        print("\n紧急处理步骤:")
        for action in mock_analysis['immediate_actions']:
            print(f" • {action}")
        print("\n详细解决方案:")
        for solution in mock_analysis['detailed_solution']:
            print(f" • {solution}")
        print("-" * 50)
    # Monitoring improvement suggestions (hard-coded for the demo).
    print("\n=== 监控改进建议 ===")
    monitoring_suggestions = {
        "frequent_issues": [
            {"category": "性能问题", "frequency": 8, "recommendation": "加强性能相关的监控和预警"},
            {"category": "连接问题", "frequency": 5, "recommendation": "加强连接相关的监控和预警"}
        ],
        "monitoring_gaps": [
            "增加服务器性能监控告警阈值",
            "配置数据库性能监控仪表板"
        ],
        "automation_opportunities": [
            "自动重启服务脚本",
            "日志清理自动化",
            "健康检查自动化"
        ]
    }
    print("高频问题:")
    for issue in monitoring_suggestions["frequent_issues"]:
        print(f" • {issue['category']} (发生{issue['frequency']}次): {issue['recommendation']}")
    print("\n监控盲点:")
    for gap in monitoring_suggestions["monitoring_gaps"]:
        print(f" • {gap}")
    print("\n自动化机会:")
    for opportunity in monitoring_suggestions["automation_opportunities"]:
        print(f" • {opportunity}")
# Run the demo only when the file is executed as a script, so importing the
# module does not print the walkthrough.
if __name__ == "__main__":
    demo_it_operations()