Nessus简介
Nessus 是一款广泛使用的网络安全扫描工具,由 Tenable, Inc. 开发和维护。它专注于进行漏洞扫描和合规性审计,以帮助组织识别和解决其 IT 基础设施中的安全漏洞。Nessus 主要用于以下目的:
漏洞扫描:
扫描网络设备、服务器、虚拟机、数据库和其他 IT 资产中的已知漏洞。支持多种协议和服务的安全检查,包括 HTTP、FTP、SSH、DNS 等。
配置审计:
检测 IT 系统中的不安全配置,确保符合安全基准和最佳实践。
恶意软件扫描:
查找潜在的后门、恶意软件和其他安全威胁。
合规性审计:
评估系统是否符合特定的安全政策和法规,如 PCI DSS、HIPAA、NIST 等。
敏感数据检测:
扫描存储和传输中的敏感数据,如信用卡号和个人身份信息 (PII)。
工作原理
Nessus 使用插件(Plugins)系统进行扫描,每个插件包含检测特定漏洞或配置问题的指令。扫描时,Nessus 会连接到目标网络,并基于选定的扫描模板和策略, 发送请求和进行分析。Nessus 插件数据库经常更新,以确保它能够检测到最新的漏洞。
Nessus 主要有以下版本:
Nessus Essentials:免费版本,适用于个人和小型组织,功能有限。
Nessus Professional:付费版本,适用于专业安全测试人员和中小型企业,提供更广泛的功能和支持。
Nessus Expert:扩展了功能,如云环境扫描、容器化应用检测等。
Tenable.io 和 Tenable.sc:更高级的产品,提供企业级的漏洞管理解决方案和集成分析。
优点
易于使用:直观的界面和丰富的模板,用户可以快速上手。
插件更新:定期更新插件库,确保扫描工具始终能检测到最新的威胁。
定制化扫描:用户可以根据具体需求定制扫描任务和报告。
使用场景
企业定期进行的安全审计和风险评估。
安全团队用来识别和修复新发现的漏洞。
合规性检查以确保满足法规要求。
安全工程师和渗透测试人员进行的网络评估和测试。
Nessus 是全球最受欢迎的漏洞扫描工具之一,因其高效的检测能力和易用性,在安全行业中被广泛使用。
代码目录结构
|- export_report
|- config (配置文件目录)
| └── config.ini (核心的配置文件)
|- scripts (脚本目录)
| └── send_report.sh (发送生成文件的bash脚本)
|- main.py (通过NessusApi生成HTML的python文件)
|- nessus_api.py (NessusApi的函数python文件)
|- utils.py (工具python文件)
|- html (HTML存储目录)
相关代码内容
config
config.ini
[NessusInfo]
; nessus的访问站点
website = nessus的访问站点
; api的ak
accessKey = nessus生成的 accessKey
; api的ak
secretKey = nessus生成的 secretKey
[TaskInfo]
; 扫描任务的接口
get_scan_url=scans
; 导出的HTML的模板ID接口
get_template_url=reports/custom/templates
; 获取最新的扫描记录的接口
scan_url = scans/scan_id?limit=2500&includeHostDetailsForHostDiscovery=true
; 导出报告的接口
export_url = scans/scan_id/export?limit=2500&history_id=scan_result_id
; 下载报告的接口
download_url = scans/scan_id/export/fileId/download
; 是否可以导出的接口
export_status_url = tokens/token_id/status
utils.py
import sys
import requests
import os
import json
# 禁用安全请求警告
from requests.packages import urllib3
from datetime import datetime
import time
import configparser
import re
# 保存为 html的函数
def save_html(content, file_path):
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
# 确认目录是否存在
def ensure_directory_exists(directory_path):
# 检查目录是否存在
if not os.path.exists(directory_path):
# 如果目录不存在,创建它
os.makedirs(directory_path)
# 加载ini配置文件
def load_config(file, module_name):
config = configparser.ConfigParser()
config.read(file)
return config[module_name]
# 替换关键字符串的位置
def reg_replace(replace_string, origin_string, new_string):
new_url = re.sub(replace_string, str(new_string), origin_string)
return new_url
nessus_api.py
import requests
import json
# 获取scan_id的接口
def get_scan_id(scanApi,headers):
scan_id=-1
response = requests.get(scanApi,headers=headers, verify=False)
if response.status_code == 200:
scan_id = response.json()['scans'][0]['id']
return scan_id
# 获取自定义模板的接口
def get_template_id(templateApi, headers,template_name='Complete List of Vulnerabilities by Host'):
template_id=-1
response = requests.get(templateApi, headers=headers, verify=False)
if response.status_code == 200:
print(response.json())
for item in response.json():
if item['name'] == template_name:
template_id = item['id']
break
return template_id
def export_api(export_url,template_id,headers):
result=[]
# 导出的载荷模板
payload = json.dumps({
"format": "html",
"template_id": template_id,
"csvColumns": {},
"formattingOptions": {},
"extraFilters": {
"host_ids": [],
"plugin_ids": []
},
"plugin_detail_locale": "en"
})
export_res = requests.post(export_url, headers=headers, data=payload, verify=False)
if export_res.status_code == 200:
result = export_res
return result
else:
return result
main.py
import sys
import requests
import os
import json
# 禁用安全请求警告
from requests.packages import urllib3
from datetime import datetime,timedelta
import time
import configparser
import re
import subprocess
from utils import *
from nessus_api import get_scan_id,get_template_id,export_api
if __name__ == '__main__':
# 禁用安全请求警告
urllib3.disable_warnings()
file_path = os.path.dirname(os.path.realpath(__file__))
config_file = os.path.join(file_path, 'config', 'config.ini')
script_file = os.path.join(file_path, 'scripts', 'send_report.sh')
#集成告警媒介的一段代码
#dingding_notice_file = os.path.join(file_path, 'scripts', 'omc_dingding.py')
# 判断生成html 文件的目录是否有存在, 如果不存在就手动创建
ensure_directory_exists(os.path.join(file_path, "html"))
# 获取 ak sk
nessusInfo = load_config(config_file, "NessusInfo")
website=nessusInfo['website']
ak = nessusInfo['accessKey']
sk = nessusInfo['secretKey']
# 如果获取失败, 打印相关的ak sk 看看是否正常读取
# print(f"ak:{ak};sk:{sk}")
token = f"accessKey={ak}; secretKey={sk}"
# 请求头最新的数据
headers = {
"X-ApiKeys": token,
'Content-Type': 'application/json'
}
# 读取相关的配置文件的内容
TaskInfo_data = load_config(config_file, "TaskInfo")
get_scan_url = website+'/'+TaskInfo_data['get_scan_url']
get_template_url = website+'/'+TaskInfo_data['get_template_url']
scan_url = website+'/'+TaskInfo_data['scan_url']
download_url = website+'/'+TaskInfo_data['download_url']
export_url = website+'/'+TaskInfo_data['export_url']
export_status_url = website+'/'+TaskInfo_data['export_status_url']
scan_id=get_scan_id(get_scan_url,headers)
template_id=get_template_id(get_template_url,headers)
if scan_id == -1 or template_id == -1:
sys.exit(-1)
scan_url = reg_replace("scan_id", scan_url, scan_id)
# 发送POST请求导出报告
response = requests.get(scan_url, headers=headers, verify=False)
scan_result=[]
last_history_record={}
if response.status_code == 200 :
scan_result = json.loads(response.text)
if len(scan_result)>0:
if len(scan_result['history']) > 0:
# 获取 扫描记录的 last_history_record 判断 最新的扫描任务是否完成
if scan_result['history'][-1]['status'] == 'completed':
last_history_record = scan_result['history'][-1]
else:
# 说明还没有扫描完成
print("没有扫描完成")
sys.exit(-1)
else:
# 说明没有扫描记录文件
print("没有获取到报告")
sys.exit(-1)
print(last_history_record)
last_history_id=last_history_record['history_id']
# 获取最新的扫描记录的时间 , 并转换为北京时间
utc_time = datetime.utcfromtimestamp(last_history_record['last_modification_date'])
beijing_time = utc_time + timedelta(hours=8)
formatted_time = beijing_time.strftime('%Y%m%d')
# 发送POST请求导出报告
export_url=reg_replace("scan_result_id", export_url, last_history_id)
export_url = reg_replace("scan_id", export_url, scan_id)
print(f"export_url:{export_url}")
export_res=export_api(export_url, template_id,headers)
export_result=[]
fileId=000
if export_res.status_code == 200:
export_result =json.loads(export_res.text)
fileId=export_result['file']
token_id=export_result['token']
# 发送GET请求下载文件
download_url = reg_replace("fileId", download_url, fileId)
download_url = reg_replace("scan_id", download_url, scan_id)
export_status_url= reg_replace("token_id", export_status_url, token_id)
while True:
response = requests.get(export_status_url, headers=headers, verify=False)
if response.status_code == 200:
try:
data = json.loads(response.content)
if data.get('status') == 'ready':
print('下载已准备好!')
break
except json.JSONDecodeError:
print('返回的响应无法解析为 JSON。')
else:
print(f'请求失败,状态码:{response.status_code}')
print(f'响应内容:{response.content}')
# 等待 3 秒再进行下一次请求
time.sleep(3)
print('可以下载文件了。')
res = requests.get(download_url, headers=headers,verify=False)
filename = os.path.join(os.path.join(file_path, "html"), f"{formatted_time}.html")
# 检查响应状态码是否为200,表示请求成功
if res.status_code == 200:
with open(filename, 'wb') as f:
f.write(res.content)
print("文件已保存到:", filename)
else:
print("下载失败:", res.status_code)
# # 调用脚本将生成的html文件传送到相关的位置归档
subprocess.run(["/bin/bash", script_file, ])
scripts
send_report.sh
#!/bin/bash
# 传入的文件名称
file_name=${1}
# 远程传输的目录 远端的服务器需要做 SSH免密登录
remote_port= ssh端口
remote_host= ssh主机
remote_user= ssh用户
remote_path= ssh目录
if [ -d $file_dir ];then
if [ -f $file_name ];then
scp -P$remote_port $file_name $remote_user@$remote_host:$remote_path
echo "send success"
else
echo "$file_name not exist"
exit -2
fi
else
echo "$file_dir not exist"
exit -1
fi
附录:Nessus生成AKSK的方法


