python scapy库可选择网卡抓包分析数据包
注: python scapy库提供了show_interface()函数来输出本机网卡信息,但仅仅是输出,网卡信息不能用,所以就需要对该函数做一些修改,具体就是在scapy安装目录下的interface.py文件中,搜索show()函数,其中有一个res[]变量,中括号里面是一个pr开头的什么单词没记下来,需要函数直接return这个变量,然后程序就能正常用了。 在pycharm中更容易,按住ctrl,点击show_interface()函数,然后再点击show_interface()函数中的show()函数,选择第一个show()函数,就直接跳转到目的函数了。 注:计算机需安装npcap供python调用。 具体实现效果是:每1秒存放数据包数量的字典会清零一次,然后把这一秒内的数据存到当前目录下的data.txt文件中,每个tcp udp arp icmp数据包的内容存放到当前目录下以协议名命名的txt文件中。 以下是完整代码,虽然长,但大部分都是if判断数据包分类。 from scapy.all import * import time import json global dict1 dict1={"TCP":0,"UDP":0,"ICMP":0,"ARP":0} def save(packet,name): dict1[name] = dict1.get(name, 0) + 1 trans=packet[f'{name}'].payload with open(f'./message/{name}.txt','a') as f: if name!="ARP": src=packet["IP"].src dst=packet["IP"].dst if name=="TCP": sport=packet[f'{name}'].sport dport=packet[f'{name}'].dport options=packet[f'{name}'].options flags=packet[f'{name}'].flags f.writelines(f"{src}/{sport}-->{dst}/{dport} {str(packet['TCP'].payload)} {str(packet['TCP'].load)}"+'\n') elif name=="UDP": sport = packet["UDP"].sport dport = packet["UDP"].dport f.writelines(f"{src}/{sport}-->{dst}/{dport} {str(packet['UDP'].payload)} {str(packet['UDP'].load)}" + '\n') elif name=="ICMP": type = packet["ICMP"].type code = packet["ICMP"].code f.writelines(f"{type}/{code} {src}-->{dst} {str(packet['ICMP'].load)}" + '\n') elif name=="ARP": op=packet["ARP"].op hwsrc = packet['ARP'].hwsrc psrc = packet['ARP'].psrc hwdst = packet['ARP'].hwdst pdst = packet['ARP'].pdst if str(op)=="1": f.writelines(f"request {hwsrc}/{psrc}-->{pdst}"+'\n') elif str(op)=="2": f.writelines(f"respond {hwsrc}/{psrc}-->{hwdst}/{pdst}"+'\n') print('\r',dict1,end="") def tcp(packet): save(packet, "TCP") def udp(packet): save(packet,"UDP") def icmp(packet): save(packet,"ICMP") def ip4(packet): content=packet['IP'].payload proto=packet["IP"].proto if proto==6: tcp(packet) elif proto==17: udp(packet) elif proto==1: icmp(packet) def arp(packet): save(packet,"ARP") # print('\r',dict1,end="") def pack_callback(packet): ltime=time.time() global ftime,dict1 if ltime-ftime>=1: ftime=time.time() with open("./data.txt",'a') as f: f.writelines(json.dumps(dict1)+'\n') del dict1 dict1={"TCP":0,"UDP":0,"ICMP":0,"ARP":0} try: proto=str(hex(packet["Ethernet"].type)) if proto=="0x800": ip4(packet) elif proto=='0x806': arp(packet) elif proto=="0x86dd": pass # print("IP6") elif proto=="0x8864": print("PPPoE") elif proto=="0x8100": print('802.1 q tag') elif proto=="0x8847": print('MPLS lable') else: pass except: pass # print("非以太网帧") # time.sleep(1) def sniff_1(netcart): with open("./data.txt", 'w') as f: f.writelines('') sniff(filter = "",prn=pack_callback, iface='{}'.format(netcart)) def netcart(inp): for i in list1: if inp in i: print(f"已选择{i[2]}") return i[2] #show_interfaces()函数被更改过 list1=sorted(show_interfaces(),key=lambda x:int(x[1])) for i in list1: if "None" in i: continue print(' '.join(i[1:])) print("输入pcap文件路径或输入序号选择网卡:",end="") while True: inp=input() global ftime ftime = time.time() if '.pcap' in inp : try: packets = rdpcap(f'{inp}') print('文件'+inp+'已选择') for i in packets: pack_callback(i) break except: print("文件未找到,请检查文件名以及文件路径是否有误,并重新输入;或者再次通过序号选择网卡:",end="") pass elif int(inp): sniff_1(netcart(inp)) break