本文主要是介绍【使用python实现多目标批量ping】附案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
以下为使用 Python 实现批量 ping 的多种方法及代码示例:
方法一:
import subprocessfilepath = 'E:\\Python\\tools\\AutoMatic\\hosts.txt'
with open(filepath, 'r') as f:hosts = f.readlines()for host in hosts:result = subprocess.check_output(('ping', '-n', '1', host.strip()))print(result)
运行效果如下:
运行结果
将回显中的关键信息提取出来并以更易读的格式输出:
import subprocessfilepath = 'E:\\Python\\tools\\AutoMatic\\hosts.txt'
with open(filepath, 'r') as f:hosts = f.readlines()for host in hosts:result = subprocess.check_output(('ping', '-n', '1', host.strip()))result_str = result.decode('gbk')lines = result_str.split('\r\n')for line in lines:if '来自' in line:parts = line.split('来自 ')ip = parts[1].split(' 的回复')[0]if '时间<' in line:time_part = line.split('时间<')[1].split('ms')[0]print(f"主机:{host.strip()}, IP:{ip}, 响应时间:{time_part}ms")else:print(f"主机:{host.strip()}, IP:{ip}, 响应时间未知")
方法二:
import asyncio
import datetime
import ipaddress
import re
import socket
from prettytable import PrettyTableclass MultiICMP:def __init__(self, hosts, pool_size=150, timeout=1, SType='ICMP', EMode=1, isEcho=True):self.hosts = hostsself.timeout = timeoutself.pool_size = pool_sizeself.SType = STypeself.EMode = EModeself.isEcho = isEchoself.result = ()async def ICMP(self, ip):result = await asyncio.create_subprocess_shell("ping -n 2 -w 35 {ip}".format(ip=ip), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)stdout, stderr = await result.communicate()date = stdout.decode('gbk')if self.SType == 'ICMP':res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN'}elif self.SType == 'ARP':self.EMode = 2res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN', 'MAC': ''}if "正在 Ping" in date:date2 = date.splitlines()r = re.compile(r'(?:Ping).*具有').search(date2(1)).group()if '(' in r:r = r.replace('Ping ', '').split('(')res('IP') = r(1).split(')')(0)else:res('IP') = res('Destination')if 'TTL' in date:if 'TTL' in date2(2):res('TTL') = date2(2).split('TTL=')(1)else:res('TTL') = date2(3).split('TTL=')(1)r = date2(8).split(',')res('MIN') = r(0).split('=')(1)res('MAX') = r(1).split('=')(1)res('AVERAGE') = r(2).split('=')(1)res('STATE') = 'UP'self.result.append(res)else:if self.EMode > 1:self.result.append(res)elif self.EMode == 3:self.result.append(res)def addMac(self):import subprocessresult = subprocess.Popen("arp -a", shell=True, stdout=subprocess.PIPE).stdout.read().decode('gbk')k = result.splitlines()arptable = {}for f in k:if '态' in f:date2 = f.split()arptable(date2(0)) = date2(1)for i in self.result:if i('IP') in arptable:i('MAC') = arptable(i('IP'))
方法三:
import ping3def ping_ip(ip):result = ping3.ping(ip)if result is not None:print(f"{ip} is reachable.")else:print(f"{ip} is unreachable.")def batch_ping(ips):for ip in ips:ping_ip(ip)if __name__ == "__main__":ips = ("192.168.0.1", "192.168.0.2", "192.168.0.3")batch_ping(ips)
方法四:
import subprocessdef ping_host(host, is_windows):"""发送一个ping请求到指定的主机,并返回ping的结果。"""# 根据操作系统类型选择ping命令和参数if is_windows:params = ('ping', '-n', '1', host)else:params = ('ping', '-c', '1', host)# 发送ping请求并获取输出result = subprocess.run(params, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)output = result.stdout + result.stderr# 分析ping输出if is_windows:# 对于Windows系统,检查是否有"请求超时"这样的输出return "请求超时" not in outputelse:# 对于UNIX-like系统,检查是否有"64 bytes from"或"0% packet loss"这样的输出return ("64 bytes from" in output) or ("0% packet loss" in output) or (not output)# 显示操作系统选择菜单
print("请选择您的操作系统:")
print("1. Windows")
print("2. Linux/macOS")
# 获取用户输入
choice = input("请输入对应的序号:")
# 根据用户输入确定操作系统类型
is_windows = choice == "1"
# 读取iplist.txt文件中的IP地址,"iplist.txt"换成自己的ip字典文件的文件名
with open('iplist.txt', 'r') as file:ips = file.read().splitlines()
# 对每个IP地址执行ping操作,并输出结果
for ip in ips:if ping_host(ip, is_windows):print(f"ping {ip} 是通的")else:print(f"ping {ip} 不通")
这些方法可以根据不同的需求和场景选择使用。
Python 批量 ping 方法一的代码解析
在第一种方法中,通常会使用一些内置模块和特定的函数来实现批量 ping 操作。例如,可能会用到 subprocess
模块来执行系统的 ping
命令,并通过读取命令的输出结果来判断目标主机的连通性。代码可能会先定义一个读取网段或 IP 列表的函数,然后通过循环逐个对这些网段或 IP 进行 ping
操作。在处理 ping
结果时,会根据返回的状态码或特定的输出字符串来判断是否连通。比如,如果返回的代码为 0 ,可能表示连通,否则表示不通。同时,还会考虑对结果的存储和处理,以便后续的分析和统计。
为了更好地理解,我们来看一个简单的示例代码:
import subprocessdef check_alive(ip):result = subprocess.call('ping -w 1000 -n 1 %s' %ip,stdout=subprocess.PIPE,shell=True)if result == 0:h = subprocess.getoutput('ping ' ip)returnnum = h.split('平均 = ')(1)info = ('\\033(32m%s\\033(0m 能ping通,延迟平均值为:%s' %(ip,returnnum))print('\\033(32m%s\\033(0m 能ping通,延迟平均值为:%s' %(ip,returnnum))else:with open('notong.txt','a') as f:f.write(ip)info = ('\\033(31m%s\\033(0m ping 不通!' % ip)print('\\033(31m%s\\033(0m ping 不通!' % ip)if __name__ == '__main__':print("开始批量ping所有IP!")with open('ip.txt', 'r') as f: # ip.txt为本地文件记录所有需要检测连通性的ipfor i in f:
这段代码首先定义了一个 check_alive
函数来执行 ping
操作并处理结果,然后在主程序中读取一个包含 IP 地址的文件进行批量 ping
。
Python 批量 ping 方法二的代码详解
第二种方法可能会采用不同的模块和策略。比如,使用 ping3
模块来发送 ping
请求。在代码中,会先安装所需的模块,然后定义相关的函数来执行具体的 ping
操作。可能会通过设置一些参数,如超时时间、发送次数等,来控制 ping
的行为。在处理结果时,根据模块返回的值来判断主机的连通性,并进行相应的输出或记录。
以下是一个可能的示例:
import ping3def ping(host):response_time = ping3.ping(host)if response_time is not None:print(f"Ping {host} 成功,响应时间为 {response_time} 毫秒")else:print(f"Ping {host} 失败")ping("example.com")
这个示例简单地展示了如何使用 ping3
模块对单个主机进行 ping
测试。
Python 批量 ping 方法三的实现细节
在第三种方法中,可能会利用线程池或进程池来提高批量 ping
的效率。通过将 ping
任务分配到多个线程或进程中并行执行,可以大大减少执行时间。代码中会涉及到线程或进程的创建、任务的分配和结果的收集。
比如,可能会像下面这样:
import time
import threading
import subprocess
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETEDdef get_ip_list(net_segment, ip_num):# 创建一个队列IP_QUEUE = Queue()ip_list = ()list_segment = net_segment.split('.')ip_index = 1# 将需要 ping 的 ip 加入队列for i in range(1, 254):list_segment(-1) = str(ip_index + i)addr = ('.').join(list_segment)IP_QUEUE.put(addr)# 定义一个执行 ping 的函数def ping_ip(ip):res = subprocess.call('ping -n 2 -w 5 %s' % ip, stdout=subprocess.PIPE)# linux 系统将 '-n' 替换成 '-c'# 打印运行结果print(ip, True if res == 0 else False)if res!= 0:if lock.acquire():if len(ip_list) < ip_num:ip_list.append(ip)lock.release()# 创建一个最大任务为 100 的线程池pool = ThreadPoolExecutor(max_workers=100)lock =
这种方式通过合理利用线程资源,能够同时对多个 IP 进行 ping
操作。
Python 批量 ping 方法四的代码解读
第四种方法可能会结合一些更高级的技术或模块,以实现更复杂或更优化的批量 ping
功能。可能会涉及到网络协议的底层处理,或者对 ping
结果进行更细致的分析和统计。
例如:
import asyncio
import datetime
import ipaddress
import re
import socket
from prettytable import PrettyTableclass MultiICMP:def __init__(self, hosts, pool_size=150, timeout=1, SType='ICMP', EMode=1, isEcho=True):self.hosts = hostsself.timeout = timeoutself.pool_size = pool_sizeself.SType = STypeself.EMode = EModeself.isEcho = isEchoself.result = ()async def ICMP(self, ip):result = await asyncio.create_subprocess_shell("ping -n 2 -w 35 {ip}".format(ip=ip), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)stdout, stderr = await result.communicate()date = stdout.decode('gbk')if self.SType == 'ICMP':res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN'}elif self.SType == 'ARP':self.EMode = 2res = {'Destination': ip, 'IP': "", 'TTL': '', 'MIN': '', 'MAX': '', 'AVERAGE': '', 'STATE': 'DOWN','MAC': ""}
这段代码使用了异步编程的方式来执行 ping
操作,提高了程序的并发性能。
总之,使用 Python 实现批量 ping
有多种方法,每种方法都有其特点和适用场景。您可以根据具体的需求选择合适的方法来实现高效、准确的批量 ping
功能。
这篇关于【使用python实现多目标批量ping】附案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!