脚本只测试过H3C S12500系列交换机,只支持10GE、40GE、100GE接口。
#Author: An
import ping3
import paramiko
import re
import time
import xlwt
import os
def Get_Info():
f = open('./hostlist.txt', 'r')
for line in f:
line = line.strip('\n')
SSH_Connect(line)
def SSH_Connect(ssh_ip):
print('正在通过SSH获取数据中,请稍等~')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_ping = ping3.ping(ssh_ip)
#SSH钱先进行PING测试设备
if ssh_ping == None:
print('%s 网络无法PING通,请检查' % ssh_ip)
else:
#通过SSH登录设备获取光衰信息
ssh.connect(ssh_ip, 22, 'username', 'password')
channel = ssh.invoke_shell()
time.sleep(1)
channel.send('screen-length disable\ndisplay curr | in sysname\ndisplay transceiver diagnosis interface\n')
time.sleep(60)
recv_temp = channel.recv(9999999).decode(errors='ignore')
#将获取的结果写入到临时文件中
f = open('./H3C_temp.log', 'w+')
f.write(recv_temp)
f.close()
def Get_Trans(POWER):
print('正在判断光衰值,请稍等~')
Trans_list = []
Trans_list_temp = []
Interface_ID = []
rx = 0
tx = 0
bias = 0
f = open('./H3C_temp.log', 'r')
for line in f:
#读取设备名称
if re.findall('sysname', line):
Hostname = line.split(' ')[2]
#读取接口ID
if re.match('Ten|Forty|Hundred', line):
Interface_ID = line.split(' ')
#判断接口为40GE和100GE的光衰情况
if len(Interface_ID) != 0 and re.match('Forty|Hundred', Interface_ID[0]):
#判断接口是否缺少模块
if re.match('The transceiver is absent', line):
print(Interface_ID[0] + '缺少模块')
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append('接口缺少模块')
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断是否40GE/100GE的光衰值行内容
if line != '\n' and re.match(r'\d', line[4]):
#取数据,由于re返回结果为list,因此用其他变量取值更明朗写,也可以简化写一下,但是re需要运行多次
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[2])
bias = float(temp[1])
tx = float(temp[3])
#判断收光大于设定值,并且bias不等于0,tx和rx不能相等的情况才是需要的rx值
#TX=RX相等的情况下说明接口被shutdown掉,不做处理。bias在H3C中不准确,不确定该值是否影响结果
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断10GE接口
if len(Interface_ID) != 0 and re.match('Ten', Interface_ID[0]):
if line != '\n' and re.match(r'\d', line[4]):
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[3])
bias = float(temp[2])
tx = float(temp[4])
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
print('.', end='')
f.close()
print('光衰值判断完毕,准备写入,请稍等~')
return Trans_list
def Write_Info(Trans_list):
workbook_result = xlwt.Workbook(encoding='utf-8')
sheet = workbook_result.add_sheet(u'sheet1', cell_overwrite_ok=True)
# 设置标题样式和内容样式
style_title = xlwt.XFStyle()
style_context = xlwt.XFStyle()
font = xlwt.Font()
align = xlwt.Alignment()
align.horz = 0x02
align.vert = 0x01
style_title.alignment = align
style_context.alignment = align
font.bold = True
style_title.font = font
# 设置列宽
sheet.col(0).width = 10000
sheet.col(1).width = 10000
sheet.col(2).width = 10000
# 写表格表头
sheet.write(0, 0, '交换机名称', style_title)
sheet.write(0, 1, '交换机端口ID', style_title)
sheet.write(0, 2, 'RX Power(dBm)', style_title)
# 循环写入表格
i = 0
for i in range(i, len(Trans_list)):
for j in range(3):
sheet.write(i + 1, j, Trans_list[i][j], style_context)
workbook_result.save('H3C_Trans_result.xlsx')
print('程序执行完毕~')
if __name__ == '__main__':
POWER = -4
Get_Info()
Trans_list = Get_Trans(POWER)
Write_Info(Trans_list)
os.remove('./H3C_temp.log')