VulnWhisperer/vulnwhisp/frameworks/openvas.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
__author__ = 'Austin Taylor'
import datetime as dt
import io
import logging
import pandas as pd
import requests
from bs4 import BeautifulSoup
class OpenVAS_API(object):
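    """Thin client for the OpenVAS / Greenbone Security Assistant (GSA) web UI.

    Authenticates against the /omp endpoint, lists finished scan reports and
    downloads them as CSV for further processing by VulnWhisperer.
    """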
OMP = '/omp'
def __init__(self,
hostname=None,
port=None,
username=None,
password=None,
report_format_id=None,
verbose=True):
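        """Store connection settings, authenticate, and pre-fetch report metadata."""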
self.logger = logging.getLogger('OpenVAS_API')
if verbose:
self.logger.setLevel(logging.DEBUG)
if username is None or password is None:
raise Exception('ERROR: Missing username or password.')
self.username = username
self.password = password
self.base = 'https://{hostname}:{port}'.format(hostname=hostname, port=port)
self.verbose = verbose
self.processed_reports = 0
self.report_format_id = report_format_id
self.headers = {
'Origin': self.base,
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.8',
'User-Agent': 'VulnWhisperer for OpenVAS',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
'Cache-Control': 'max-age=0',
'Referer': self.base,
'X-Requested-With': 'XMLHttpRequest',
'Connection': 'keep-alive',
}
self.login()
self.openvas_reports = self.get_reports()
self.report_formats = self.get_report_formats()
def login(self):
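        """Authenticate against GSA and cache the session token and cookies."""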
resp = self.get_token()
        if resp.status_code == 200:  # compare values, not identity
xml_response = BeautifulSoup(resp.content, 'lxml')
self.token = xml_response.find(attrs={'id': 'gsa-token'}).text
self.cookies = resp.cookies.get_dict()
else:
raise Exception('[FAIL] Could not login to OpenVAS')
def request(self, url, data=None, params=None, headers=None, cookies=None, method='POST', download=False,
json=False):
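        """Issue an HTTP request against the GSA web UI.

        Re-authenticates and retries when the server answers HTTP 401; returns
        the raw response, its decoded JSON when json=True, or the body bytes
        when download=True.
        """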
if headers is None:
headers = self.headers
if cookies is None:
cookies = self.cookies
timeout = 0
success = False
url = self.base + url
methods = {'GET': requests.get,
'POST': requests.post,
'DELETE': requests.delete}
while (timeout <= 10) and (not success):
            data = methods[method](url,
                                   data=data,
                                   headers=headers,  # honour caller-supplied headers instead of always self.headers
                                   params=params,
                                   cookies=cookies,
                                   verify=False)
if data.status_code == 401:
try:
self.login()
timeout += 1
self.logger.info(' Token refreshed')
except Exception as e:
self.logger.error('Could not refresh token\nReason: {}'.format(str(e)))
else:
success = True
if json:
data = data.json()
if download:
return data.content
return data
def get_token(self):
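        """POST the login form to /omp and return the raw response containing the session token."""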
data = [
('cmd', 'login'),
('text', '/omp?r=1'),
('login', self.username),
('password', self.password),
]
token = requests.post(self.base + self.OMP, data=data, verify=False)
return token
def get_report_formats(self):
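        """Scrape the report-format table and map format names (e.g. 'CSV Results') to their IDs."""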
params = (
('cmd', 'get_report_formats'),
('token', self.token)
)
self.logger.info('Retrieving available report formats')
data = self.request(url=self.OMP, method='GET', params=params)
bs = BeautifulSoup(data.content, "lxml")
table_body = bs.find('tbody')
rows = table_body.find_all('tr')
format_mapping = {}
for row in rows:
cols = row.find_all('td')
for x in cols:
for y in x.find_all('a'):
if y.get_text() != '':
format_mapping[y.get_text()] = \
[h.split('=')[1] for h in y['href'].split('&') if 'report_format_id' in h][0]
return format_mapping
def get_reports(self, complete=True):
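        """List scan reports from GSA and return them as a DataFrame.

        Scrapes the get_reports HTML table, extracts each report's ID, epoch
        timestamp and severity, and (when complete=True) keeps only reports
        whose status is 'Done'.
        """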
        self.logger.info('Retrieving OpenVAS report data...')
params = (('cmd', 'get_reports'),
('token', self.token),
('max_results', 1),
('ignore_pagination', 1),
('filter', 'apply_overrides=1 min_qod=70 autofp=0 first=1 rows=0 levels=hml sort-reverse=severity'),
)
reports = self.request(self.OMP, params=params, method='GET')
soup = BeautifulSoup(reports.text, 'lxml')
data = []
links = []
table = soup.find('table', attrs={'class': 'gbntable'})
table_body = table.find('tbody')
rows = table_body.find_all('tr')
for row in rows:
cols = row.find_all('td')
links.extend([a['href'] for a in row.find_all('a', href=True) if 'get_report' in str(a)])
cols = [ele.text.strip() for ele in cols]
data.append([ele for ele in cols if ele])
report = pd.DataFrame(data, columns=['date', 'status', 'task', 'scan_severity', 'high', 'medium', 'low', 'log',
'false_pos'])
        if report.shape[0] != 0:
            report['links'] = links
            report['report_ids'] = report.links.str.extract('.*report_id=([a-z-0-9]*)', expand=False)
            report['epoch'] = (pd.to_datetime(report['date']) - dt.datetime(1970, 1, 1)).dt.total_seconds().astype(int)
        else:
            raise Exception("Could not retrieve OpenVAS Reports - Please check your settings and try again")
if complete:
report = report[report.status == 'Done']
        severity_extraction = report.scan_severity.str.extract(r'([0-9.]*) \(([\w]+)\)', expand=True)  # two groups, so return a DataFrame
severity_extraction.columns = ['scan_highest_severity', 'severity_rate']
report_with_severity = pd.concat([report, severity_extraction], axis=1)
return report_with_severity
def process_report(self, report_id):
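        """Download a single report as CSV and merge it with its report metadata."""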
params = (
('token', self.token),
('cmd', 'get_report'),
('report_id', report_id),
('filter', 'apply_overrides=0 min_qod=70 autofp=0 levels=hml first=1 rows=0 sort-reverse=severity'),
('ignore_pagination', '1'),
('report_format_id', '{report_format_id}'.format(report_format_id=self.report_formats['CSV Results'])),
('submit', 'Download'),
)
self.logger.info('Retrieving {}'.format(report_id))
req = self.request(self.OMP, params=params, method='GET')
report_df = pd.read_csv(io.BytesIO(req.text.encode('utf-8')))
report_df['report_ids'] = report_id
self.processed_reports += 1
merged_df = pd.merge(report_df, self.openvas_reports, on='report_ids').reset_index().drop('index', axis=1)
return merged_df
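# Example usage (illustrative sketch only; host, port and credentials below are
# placeholders, not values taken from this repository):
#
#     openvas = OpenVAS_API(hostname='openvas.example.local', port=9392,
#                           username='admin', password='changeme')
#     for report_id in openvas.openvas_reports['report_ids']:
#         findings = openvas.process_report(report_id)
#         print(findings.head())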