不间断PING检测,检查局域网、交换机丢包率等

发布时间: 2024-11-15

环境介绍

开发Python环境 Python 3.9.14 ,具体支持到什么版本未测试!

我运行在 PVE 的 CT 里面,CentOS 9

功能

家里的交换机,不晓得什么问题,有的时候就是丢包,正当来测试吧,又好了。不确定什么时候又有问题了。

还有一些个主机时不时的失联,所以就写了一个小工具,不间断的 ping,然后记录日志文件查看 ping 不通的时间,丢包率等等。

使用代码

进入 ping.py 目录,运行

python3 ping.py

日志输出

2024-05-10 10:00:00,000 - INFO - 日志文件名: /root/ping/network_monitor_2024-05-10_10-00-00.log
2024-05-10 10:00:00,000 - INFO - 程序开始运行时间: 2024-05-10 10:00:00
2024-05-10 10:00:05,000 - WARNING - 主机 10.0.0.1 不可达 - 开始时间: 2024-05-10 10:00:05
2024-05-10 10:00:10,000 - WARNING - 主机 10.0.0.1 已恢复 - 不可达持续时间: 5.00 秒
2024-05-10 10:01:00,000 - INFO - 程序结束运行时间: 2024-05-10 10:01:00
2024-05-10 10:01:00,000 - INFO - 程序总共运行时间: 60.00 秒
2024-05-10 10:01:00,000 - INFO - 总共 ping 次数: 12
2024-05-10 10:01:00,000 - INFO - 总共丢包次数: 1
2024-05-10 10:01:00,000 - INFO - 总丢包率: 8.33%
2024-05-10 10:01:00,000 - INFO - 平均响应时间: 0.15 毫秒
2024-05-10 10:01:00,000 - WARNING - 程序异常退出: 收到 SIGINT 信号 (Ctrl+C)

完整 Python 代码:

import subprocess
import time
import re
import logging
import atexit
import signal
import sys
import os
 
print("V2RaySSR 综合网原创")
print("访问 https://www.v2rayssr.com 获取更多资讯和教程")
 
# 定义主机名和日志文件路径为变量
HOST = "10.0.0.1"
LOG_DIR = "/root/ping"
 
# 获取程序开始时间并生成日志文件名
start_time = time.time()
start_time_str = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(start_time))
log_filename = os.path.join(LOG_DIR, f'network_monitor_{start_time_str}.log')
 
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)
 
# 设置日志文件
logging.basicConfig(filename=log_filename, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
 
# 测试日志记录功能
try:
    logging.info(f"日志文件名: {log_filename}")
except Exception as e:
    print(f"无法写入日志文件: {e}")
    sys.exit(1)
 
# 初始化统计信息
total_pings = 0
total_lost_pings = 0
total_response_time = 0.0
log_exit_time_called = False  # 标志位,确保 log_exit_time 只被调用一次
 
def ping(host):
    """
    Ping 指定的主机并返回 ping 命令的输出。
    """
    process = subprocess.Popen(['ping', '-c', '1', host], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    
    if process.returncode == 0:
        return stdout.decode('utf-8')
    else:
        return None
 
def parse_ping_output(output):
    """
    解析 ping 命令的输出,提取丢包率和响应时间。
    """
    loss_match = re.search(r'(\d+)% packet loss', output)
    if loss_match:
        packet_loss = int(loss_match.group(1))
    else:
        packet_loss = 100
    
    time_match = re.search(r'time=(\d+\.\d+) ms', output)
    if time_match:
        ping_time = float(time_match.group(1))
    else:
        ping_time = None
    
    return packet_loss, ping_time
 
def monitor_network(host, interval=5):
    """
    通过定期 ping 指定的主机来持续监控网络。
    """
    global total_pings, total_lost_pings, total_response_time
    last_failure_time = None  # 记录上一次失败的时间
    failure_duration = 0      # 记录持续失败的时间
 
    while True:
        output = ping(host)
        total_pings += 1
        
        if output:
            packet_loss, ping_time = parse_ping_output(output)
            if packet_loss == 100:
                total_lost_pings += 1
                if last_failure_time is None:
                    last_failure_time = time.time()
                    logging.warning(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
                    print(f"主机 {host} 不可达 - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
                failure_duration = time.time() - last_failure_time
            else:
                total_response_time += ping_time
                if last_failure_time is not None:
                    logging.warning(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
                    print(f"主机 {host} 已恢复 - 不可达持续时间: {failure_duration:.2f} 秒")
                    last_failure_time = None
                    failure_duration = 0
        else:
            total_lost_pings += 1
            if last_failure_time is None:
                last_failure_time = time.time()
                logging.error(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
                print(f"无法 ping 通主机 {host} - 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_failure_time))}")
            failure_duration = time.time() - last_failure_time
        
        time.sleep(interval)
 
def log_exit_time(exit_reason=None):
    """
    记录程序退出的时间和统计信息。
    """
    global log_exit_time_called
    if log_exit_time_called:
        return
    log_exit_time_called = True
 
    exit_time = time.time()
    exit_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(exit_time))
    total_run_time = exit_time - start_time
    total_lost_rate = (total_lost_pings / total_pings) * 100 if total_pings > 0 else 0
    avg_response_time = (total_response_time / (total_pings - total_lost_pings)) if (total_pings - total_lost_pings) > 0 else 0
 
    logging.info(f"程序结束运行时间: {exit_time_str}")
    logging.info(f"程序总共运行时间: {total_run_time:.2f} 秒")
    logging.info(f"总共 ping 次数: {total_pings}")
    logging.info(f"总共丢包次数: {total_lost_pings}")
    logging.info(f"总丢包率: {total_lost_rate:.2f}%")
    logging.info(f"平均响应时间: {avg_response_time:.2f} 毫秒")
 
    if exit_reason:
        logging.warning(f"程序异常退出: {exit_reason}")
 
    print(f"程序结束运行时间: {exit_time_str}")
    print(f"程序总共运行时间: {total_run_time:.2f} 秒")
    print(f"总共 ping 次数: {total_pings}")
    print(f"总共丢包次数: {total_lost_pings}")
    print(f"总丢包率: {total_lost_rate:.2f}%")
    print(f"平均响应时间: {avg_response_time:.2f} 毫秒")
 
    # 确保所有日志记录被刷新到磁盘
    logging.shutdown()
 
# 注册程序正常退出时的回调函数
atexit.register(log_exit_time)
 
# 捕捉终止信号(例如 Ctrl+C 和系统重启)
def signal_handler(sig, frame):
    exit_reasons = {
        signal.SIGINT: "收到 SIGINT 信号 (Ctrl+C)",
        signal.SIGTERM: "收到 SIGTERM 信号 (系统关机或重启)",
        signal.SIGHUP: "收到 SIGHUP 信号 (SSH 会话关闭)"
    }
    reason = exit_reasons.get(sig, "收到未知信号")
    log_exit_time(reason)
    sys.exit(0)
 
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
 
# 捕捉 SIGHUP 信号(SSH 关闭引起的挂起信号)
signal.signal(signal.SIGHUP, signal_handler)
 
if __name__ == "__main__":
    try:
        monitor_network(HOST)
    except Exception as e:
        logging.error(f"程序异常退出: {e}")
        log_exit_time(f"程序异常退出: {e}")
        raise

请在下方留下您的评论.加入TG吹水群