On big networks or in data centres, where you have to run a variety of command-line tests within minutes, this solution can help you.
- The project consists of Python code, a JSON config and test-case CSV files (you can write as many as needed).
- The code logs into machines/devices over SSH, using credentials predefined in the JSON config.
- How many machines/devices are processed in parallel is also defined in the JSON config (`maxhosts`).
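- A test-case file is a CSV file containing the command to be executed, the expected result to match against, and the match logic. The code reads each row, in order, as: command, expected result, match logic (`exact`, `exists`, `not_exists`, `min`, `max` or `record`), delay factor and tag.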
Here is the JSON config:
{
"maxhosts": 5,
"credsets": {
"testbot": { "username": "testbot", "password": "xxxx", "enpass": "xxxxx" },
"testbot2": { "username": "user1", "password": "xxxxx", "enpass": "xxxxxx" },
"testbot1": { "username": "testbot", "password": "xxxxxx" }
},
"hosts": {
"192.168.1.1": {"type": "linux", "credset": "testbot", "testfile": "/home/manish/test1.csv" },
"192.168.1.2": {"type": "linux", "credset": "testbot2", "testfile": "/home/manish/test1.csv" },
"192.168.1.3": {"type": "linux", "credset": "testbot1", "port": 22, "testfile": "/home/manish/test2.csv" }
},
"results": {
"output": "/home/manish/results.csv",
"enable": true
},
"logging": {
"logfile": "/home/manish/logs/test.logs",
"remote": {
"enable": true,
"LoggerName": "MKALogs",
"sysloghosts": {
"lab1": {
"sysloghost": "192.168.1.100",
"port": 514
},
"lab2": {
"sysloghost": "192.168.1.101",
"port": 514
}
}
}
},
"webhook": {
"enable": true,
"auth": {
"auth1": {
"username": "user1",
"password": "pass1"
},
"auth2": {
"username": "user2",
"password": "pass2"
}
},
"hooks": {
"hook1": {
"endpoint": "http://192.168.1.102:8080",
"auth": "auth1"
},
"hook2": {
"endpoint": "http://192.168.1.103:8090",
"auth": "auth2"
}
}
}
}
Python code:
#!/usr/local/bin/python3
from netmiko import ConnectHandler
from multiprocessing import Pool
import os
import json
from time import strftime, localtime
import sys
import csv
import base64
import logging
import logging.handlers
# Parse the single required command-line argument, --config=<path>, and load
# the JSON configuration into the module-level ``cfgdata`` dict that every
# function in this file reads.
try:
    # Split on the first '=' only, so a config path that itself contains
    # '=' is still accepted.
    param, value = sys.argv[1].split('=', 1)
except (IndexError, ValueError):
    # No argument at all, or an argument without '='.
    param, value = None, None
if param != "--config" or not value:
    print("Usage: testbot.py --config=<json config>")
    sys.exit()
playconf = value
with open(playconf) as cfg:
    cfgdata = json.load(cfg)
def RemoteSyslog(LogMessage):
    """Send LogMessage at INFO level to every configured remote syslog host.

    The logger name and the syslog targets are read from the module-level
    ``cfgdata`` under ``logging.remote.LoggerName`` and
    ``logging.remote.sysloghosts`` (each target has ``sysloghost`` and
    ``port``).

    ``logging.getLogger`` returns the same logger object for the same name on
    every call, so a handler is attached only for addresses that do not have
    one yet. Adding a fresh handler per call would deliver every message
    once per previous call and leak one socket each time.
    """
    remote_cfg = cfgdata.get('logging').get('remote')
    sysloghosts = remote_cfg.get('sysloghosts')
    LoggerName = remote_cfg.get('LoggerName')
    #print("In Syslog", LogMessage)
    Logger = logging.getLogger(LoggerName)
    Logger.setLevel(logging.INFO)
    # Addresses that already have a syslog handler on this logger.
    attached = {
        existing.address
        for existing in Logger.handlers
        if isinstance(existing, logging.handlers.SysLogHandler)
    }
    for syslogobj in sysloghosts.keys():
        print(syslogobj)
        target = sysloghosts.get(syslogobj)
        address = (target.get('sysloghost'), target.get('port'))
        if address in attached:
            continue
        Logger.addHandler(logging.handlers.SysLogHandler(address=address))
        attached.add(address)
    # Emit once, after all handlers are in place, so each host gets one copy.
    Logger.info(LogMessage)
def CommitLogs(LogMessage):
    """Append LogMessage plus a newline to the trace log file.

    The path is read from ``cfgdata['logging']['logfile']``. Logging is
    best-effort: a failure to open or write the file is reported on stdout
    and otherwise ignored, so it never interrupts a test run. Returns None.
    """
    logfile = cfgdata.get('logging').get('logfile')
    try:
        fopen = open(logfile, "a")
    except Exception:
        print("failed to open file", logfile)
        return
    try:
        # The context manager closes the handle even when the write fails.
        with fopen:
            fopen.write(LogMessage+"\n")
    except Exception:
        print("Failed to write ",LogMessage)
def CommitResults(message):
    """Append one result record plus a newline to the results file.

    The path is read from ``cfgdata['results']['output']``. Writing is
    best-effort: a failure to open or write the file is reported on stdout
    and otherwise ignored, so it never interrupts a test run. Returns None.
    """
    resultfile = cfgdata.get('results').get('output')
    try:
        fopen = open(resultfile, "a")
    except Exception:
        print("failed to open file", resultfile)
        return
    try:
        # The context manager closes the handle even when the write fails.
        with fopen:
            fopen.write(message+"\n")
    except Exception:
        print("Failed to write ",message)
def base64encode(message):
    """Return the Base64 text representation of the string *message*.

    The text is encoded as UTF-8 before Base64 encoding, so output containing
    non-ASCII characters no longer raises UnicodeEncodeError. For pure-ASCII
    input the result is identical to ASCII encoding.
    """
    message_bytes = message.encode("utf-8")
    # Base64 output is always ASCII, so decoding it as ASCII cannot fail.
    return base64.b64encode(message_bytes).decode("ascii")
def base64decode(base64_string):
    """Return the text encoded in the Base64 string *base64_string*.

    The decoded bytes are interpreted as UTF-8, so payloads containing
    non-ASCII characters no longer raise UnicodeDecodeError. Pure-ASCII
    payloads decode exactly as before.
    """
    base64_bytes = base64_string.encode("ascii")
    return base64.b64decode(base64_bytes).decode("utf-8")
def compare(output, expected_output, matchLogic):
    """Evaluate a command's output against the expected value.

    Args:
        output: text returned by the command.
        expected_output: value the test case expects.
        matchLogic: one of
            "exact"      - output must equal expected_output;
            "exists"     - expected_output must be a substring of output;
            "not_exists" - expected_output must not be a substring of output;
            "min"        - float(output) must be >= float(expected_output);
            "max"        - float(output) must be <= float(expected_output);
            "record"     - no check, the output is only recorded.

    Returns:
        The string "<expected>, <output>, <verdict>". The verdict is "Pass"
        or "Fail", "Check" for "record", or "Unsupported" for an unknown
        matchLogic. The output is Base64-encoded for "exists", "not_exists",
        "record" and unknown logic, and left raw for "exact", "min", "max".

    For "min"/"max", a value that cannot be parsed as a number yields "Fail"
    instead of raising, so the result is still recorded.
    """
    print(expected_output, matchLogic)

    def _row(shown_output, verdict):
        # Single place that defines the record layout.
        return "{}, {}, {}".format(expected_output, shown_output, verdict)

    def _verdict(passed):
        return "Pass" if passed else "Fail"

    if matchLogic == "exact":
        return _row(output, _verdict(expected_output == output))
    if matchLogic == "exists":
        return _row(base64encode(output), _verdict(expected_output in output))
    if matchLogic == "not_exists":
        return _row(base64encode(output), _verdict(expected_output not in output))
    if matchLogic in ("min", "max"):
        try:
            threshold, measured = float(expected_output), float(output)
        except ValueError:
            # Empty or non-numeric text cannot satisfy a numeric bound.
            return _row(output, "Fail")
        if matchLogic == "min":
            return _row(output, _verdict(threshold <= measured))
        return _row(output, _verdict(threshold >= measured))
    if matchLogic == "record":
        return _row(base64encode(output), "Check")
    # Unknown match logic: return an explicit verdict instead of None.
    return _row(base64encode(output), "Unsupported")
def injectcmds(device, testfile):
    """Log in to one device over SSH and run every test case in *testfile*.

    Args:
        device: keyword arguments for netmiko's ``ConnectHandler``. If it
            carries a 'secret', ``enable()`` is called after login.
        testfile: path of a comma-delimited CSV file. Each row is read as
            command, expected result, match logic, delay factor, tag.

    For each row the command is sent and its output is evaluated with
    ``compare``. The result record goes to ``CommitResults`` and
    ``RemoteSyslog`` when those are enabled in ``cfgdata``, and the command
    and its output are appended to the trace log. A login or enable failure
    is logged and ends the run for this host. Blank rows are skipped and
    malformed rows are logged and skipped. The SSH session is always
    disconnected before returning. Returns None.
    """
    def _report(text):
        # Timestamped, host-prefixed message to stdout and the trace log.
        LogMessage = strftime("%Y-%m-%d %H:%M:%S", localtime())+" "+device.get('host')+text
        print(LogMessage)
        CommitLogs(LogMessage)

    try:
        net_connect = ConnectHandler(**device)
    except Exception:
        _report(";SSH failed as {}".format(device.get('username')))
        return

    try:
        _report(" Logged in...")
        if device.get('secret') is not None:
            try:
                net_connect.enable()
            except Exception:
                _report(";Cannot gain elevated access....")
                return
            _report(" Enabled Elevated access....")

        print(testfile)
        with open(testfile, 'r') as testcases:
            for tc in csv.reader(testcases, delimiter=','):
                if not tc:
                    # csv.reader yields [] for blank lines.
                    continue
                try:
                    cmd, delayfac = tc[0], int(tc[3])
                    expected, logic, tag = tc[1].strip(), tc[2].strip(), tc[4]
                except (IndexError, ValueError):
                    # Too few columns or a non-integer delay factor: skip
                    # this row instead of aborting the whole host run.
                    _report(" Skipping malformed test case: {}".format(tc))
                    continue
                #print(cmd)
                try:
                    status = net_connect.send_command(cmd, delay_factor=delayfac)
                    LogMessage = strftime("%Y-%m-%d %H:%M:%S", localtime())+" "+device.get('host')+" Command:"+cmd+"\nOutput:"+status
                    compresult = compare(status, expected, logic)
                    resultrec = "{},{},{},{}, {}".format(device.get('host'),tag,tc[0],tc[2],compresult)
                    if cfgdata.get('results').get('enable') == True:
                        CommitResults(resultrec)
                    if cfgdata.get('logging').get('remote').get('enable') == True:
                        RemoteSyslog(resultrec)
                    CommitLogs(LogMessage)
                except Exception:
                    # One failing command must not stop the remaining tests.
                    _report(" "+cmd+" command failed")
    finally:
        # Release the SSH session on every exit path.
        net_connect.disconnect()
def hostcmd(host):
    """Build the connection settings for *host* and run its test file.

    Looks up the host entry and its credential set in ``cfgdata``, assembles
    the keyword dictionary that ``injectcmds`` passes to netmiko, and calls
    ``injectcmds`` with it. The port falls back to 22 when the host entry
    has none, and 'secret' is included only when the credential set provides
    a non-null "enpass".
    """
    hostdata = cfgdata.get('hosts').get(host)
    creds = cfgdata.get('credsets').get(hostdata.get('credset'))

    configured_port = hostdata.get('port')
    device = {
        'device_type': hostdata.get('type'),
        'host': host,
        'port': 22 if configured_port is None else configured_port,
        'username': creds.get("username"),
        'password': creds.get("password"),
    }

    # The enable password is optional.
    enable_password = creds.get("enpass")
    if enable_password is not None:
        device['secret'] = enable_password

    injectcmds(device, hostdata.get('testfile'))
if __name__ == "__main__":
    # Run hostcmd once per configured host; at most `maxhosts` worker
    # processes run at the same time.
    hostlist = [*cfgdata.get('hosts')]
    maxhosts = cfgdata.get('maxhosts')
    with Pool(processes=maxhosts) as worker_pool:
        worker_pool.map(hostcmd, hostlist)
Sample test cases:
"1"; "ls -l /tmp/test*"; "nso"; "exact"; 2; "tag1"
"2"; "uptime"; "average"; "exists"; 1; "tag2"
"3"; "cat /etc/redhat-release"; "8"; "not_exists"; 3; "tag2"
Test Results:
"1"; "192.168.1.3"; "tag1"; "ls -l /tmp/test*"; "exact"; "nso"; "-rw-rw-r-- 1 marya marya 1301 Aug 17 15:52 /tmp/test.txt"; "Fail"
"2"; "10.91.140.108"; "tag2"; "uptime"; "exists"; "average"; " 10:55:56 up 68 days, 23:56, 3 users, load average: 0.00, 0.01, 0.05"; "Pass"
"3"; "10.91.140.108"; "tag2"; "cat /etc/redhat-release"; "not_exists"; "8"; "Red Hat Enterprise Linux Server release 7.9 (Maipo)"; "Pass"
Trace logs:
2023-08-18 10:55:52 192.168.1.2;SSH failed as testbot
2023-08-18 10:55:54 192.168.1.3 Logged in...
2023-08-18 10:55:55 192.168.1.3 Command: ls -l /tmp/test*
Output:-rw-rw-r-- 1 marya marya 1301 Aug 17 15:52 /tmp/test.txt
2023-08-18 10:55:56 192.168.1.3 Command: uptime
Output: 10:55:56 up 68 days, 23:56, 3 users, load average: 0.00, 0.01, 0.05
2023-08-18 10:55:58 192.168.1.3 Command: cat /etc/redhat-release
Output:Red Hat Enterprise Linux Server release 7.9 (Maipo)