开发Python环境 Python 3.9.14
,具体支持到什么版本未测试!
我运行在 PVE 的 CT 里面,CentOS 9
家里的交换机,不晓得什么问题,有的时候就是丢包,正当来测试吧,又好了。不确定什么时候又有问题了。
还有一些个主机时不时的失联,所以就写了一个小工具,不间断的 ping,然后记录日志文件查看 ping 不通的时间,丢包率等等。
进入 ping.py
目录,运行
python3 ping.py
2024-05-10 10:00:00,000 - INFO - 日志文件名: /root/ping/network_monitor_2024-05-10_10-00-00.log
2024-05-10 10:00:00,000 - INFO - 程序开始运行时间: 2024-05-10 10:00:00
2024-05-10 10:00:05,000 - WARNING - 主机 10.0.0.1 不可达 - 开始时间: 2024-05-10 10:00:05
2024-05-10 10:00:10,000 - WARNING - 主机 10.0.0.1 已恢复 - 不可达持续时间: 5.00 秒
2024-05-10 10:01:00,000 - INFO - 程序结束运行时间: 2024-05-10 10:01:00
2024-05-10 10:01:00,000 - INFO - 程序总共运行时间: 60.00 秒
2024-05-10 10:01:00,000 - INFO - 总共 ping 次数: 12
2024-05-10 10:01:00,000 - INFO - 总共丢包次数: 1
2024-05-10 10:01:00,000 - INFO - 总丢包率: 8.33%
2024-05-10 10:01:00,000 - INFO - 平均响应时间: 0.15 毫秒
2024-05-10 10:01:00,000 - WARNING - 程序异常退出: 收到 SIGINT 信号 (Ctrl+C)
完整 Python 代码:
import subprocess
import time
import re
import logging
import atexit
import signal
import sys
import os
print("V2RaySSR 综合网原创")
print("访问 https://www.v2rayssr.com 获取更多资讯和教程")
# 定义主机名和日志文件路径为变量
HOST = "10.0.0.1"
LOG_DIR = "/root/ping"
# 获取程序开始时间并生成日志文件名
start_time = time.time()
start_time_str = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(start_time))
log_filename = os.path.join(LOG_DIR, f'network_monitor_{start_time_str}.log')
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# 设置日志文件
logging.basicConfig(filename=log_filename, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# 测试日志记录功能
try:
logging.info(f"日志文件名: {log_filename}")
except Exception as e:
print(f"无法写入日志文件: {e}")
sys.exit(1)
# 初始化统计信息
total_pings = 0
total_lost_pings = 0
total_response_time = 0.0
log_exit_time_called = False # 标志位,确保 log_exit_time 只被调用一次
def ping(host):
"""
Ping 指定的主机并返回 ping 命令的输出。
"""
process = subprocess.Popen(['ping', '-c', '1', host], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode == 0:
return stdout.decode('utf-8')
else:
return None
def parse_ping_output(output):
"""
解析 ping 命令的输出,提取丢包率和响应时间。
"""
loss_match = re.search(r'(\d+)% packet loss', output)
if loss_match:
packet_loss = int(loss_match.group(1))
else:
packet_loss = 100
time_match = re.search(r'time=(\d+\.\d+) ms', output)
if time_match:
ping_time = float(time_match.group(1))
else:
ping_time = None
return packet_loss, ping_time
def monitor_network(host, interval=5):
"""
通过定期 ping 指定的主机来持续监控网络。
"""
global total_pings, total_lost_pings, total_response_time
last_failure_time = None # 记录上一次失败的时间
failure_duration = 0 # 记录持续失败的时间
while True:
output = ping(host)
total_pings += 1
if output:
packet_loss, ping_time = parse_ping_output(output)
if packet_loss == 100:
total_lost_pings += 1
if last_failure_time is None:
last_failure_time = time.time()
logging.warning(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
print(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
failure_duration = time.time() - last_failure_time
else:
total_response_time += ping_time
if last_failure_time is not None:
logging.warning(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
print(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
last_failure_time = None
failure_duration = 0
else:
total_lost_pings += 1
if last_failure_time is None:
last_failure_time = time.time()
logging.error(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
print(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
failure_duration = time.time() - last_failure_time
time.sleep(interval)
def log_exit_time(exit_reason=None):
"""
记录程序退出的时间和统计信息。
"""
global log_exit_time_called
if log_exit_time_called:
return
log_exit_time_called = True
exit_time = time.time()
exit_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(exit_time))
total_run_time = exit_time - start_time
total_lost_rate = (total_lost_pings / total_pings) * 100 if total_pings > 0 else 0
avg_response_time = (total_response_time / (total_pings - total_lost_pings)) if (total_pings - total_lost_pings) > 0 else 0
logging.info(f"程序结束运行时间: {exit_time_str}")
logging.info(f"程序总共运行时间: {total_run_time:.2f} 秒")
logging.info(f"总共 ping 次数: {total_pings}")
logging.info(f"总共丢包次数: {total_lost_pings}")
logging.info(f"总丢包率: {total_lost_rate:.2f}%")
logging.info(f"平均响应时间: {avg_response_time:.2f} 毫秒")
if exit_reason:
logging.warning(f"程序异常退出: {exit_reason}")
print(f"程序结束运行时间: {exit_time_str}")
print(f"程序总共运行时间: {total_run_time:.2f} 秒")
print(f"总共 ping 次数: {total_pings}")
print(f"总共丢包次数: {total_lost_pings}")
print(f"总丢包率: {total_lost_rate:.2f}%")
print(f"平均响应时间: {avg_response_time:.2f} 毫秒")
# 确保所有日志记录被刷新到磁盘
logging.shutdown()
# 注册程序正常退出时的回调函数
atexit.register(log_exit_time)
# 捕捉终止信号(例如 Ctrl+C 和系统重启)
def signal_handler(sig, frame):
exit_reasons = {
signal.SIGINT: "收到 SIGINT 信号 (Ctrl+C)",
signal.SIGTERM: "收到 SIGTERM 信号 (系统关机或重启)",
signal.SIGHUP: "收到 SIGHUP 信号 (SSH 会话关闭)"
}
reason = exit_reasons.get(sig, "收到未知信号")
log_exit_time(reason)
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 捕捉 SIGHUP 信号(SSH 关闭引起的挂起信号)
signal.signal(signal.SIGHUP, signal_handler)
if __name__ == "__main__":
try:
monitor_network(HOST)
except Exception as e:
logging.error(f"程序异常退出: {e}")
log_exit_time(f"程序异常退出: {e}")
raise