Addition of OpenVas -- ready for alpha
This commit is contained in:
@ -5,6 +5,7 @@ __author__ = 'Austin Taylor'
|
||||
from base.config import vwConfig
|
||||
from frameworks.nessus import NessusAPI
|
||||
from frameworks.qualys import qualysScanReport
|
||||
from frameworks.openvas import OpenVAS_API
|
||||
from utils.cli import bcolors
|
||||
import pandas as pd
|
||||
from lxml import objectify
|
||||
@ -44,6 +45,7 @@ class vulnWhispererBase(object):
|
||||
self.purge = purge
|
||||
self.develop = develop
|
||||
|
||||
|
||||
if config is not None:
|
||||
self.config = vwConfig(config_in=config)
|
||||
self.enabled = self.config.get(self.CONFIG_SECTION, 'enabled')
|
||||
@ -160,6 +162,18 @@ class vulnWhispererBase(object):
|
||||
results = []
|
||||
return results
|
||||
|
||||
def directory_check(self):
|
||||
if not os.path.exists(self.write_path):
|
||||
os.makedirs(self.write_path)
|
||||
self.vprint('{info} Directory created at {scan} - Skipping creation'.format(
|
||||
scan=self.write_path, info=bcolors.INFO))
|
||||
else:
|
||||
os.path.exists(self.write_path)
|
||||
self.vprint('{info} Directory already exist for {scan} - Skipping creation'.format(
|
||||
scan=self.write_path, info=bcolors.INFO))
|
||||
|
||||
|
||||
|
||||
class vulnWhispererNessus(vulnWhispererBase):
|
||||
|
||||
CONFIG_SECTION = 'nessus'
|
||||
@ -469,24 +483,13 @@ class vulnWhispererQualys(vulnWhispererBase):
|
||||
password=None,
|
||||
):
|
||||
|
||||
super(vulnWhispererQualys, self).__init__(config=config, )
|
||||
super(vulnWhispererQualys, self).__init__(config=config)
|
||||
|
||||
self.qualys_scan = qualysScanReport(config=config)
|
||||
self.latest_scans = self.qualys_scan.qw.get_all_scans()
|
||||
self.directory_check()
|
||||
self.scans_to_process = None
|
||||
|
||||
|
||||
def directory_check(self):
|
||||
if not os.path.exists(self.write_path):
|
||||
os.makedirs(self.write_path)
|
||||
self.vprint('{info} Directory created at {scan} - Skipping creation'.format(
|
||||
scan=self.write_path, info=bcolors.INFO))
|
||||
else:
|
||||
os.path.exists(self.write_path)
|
||||
self.vprint('{info} Directory already exist for {scan} - Skipping creation'.format(
|
||||
scan=self.write_path, info=bcolors.INFO))
|
||||
|
||||
def whisper_reports(self,
|
||||
report_id=None,
|
||||
launched_date=None,
|
||||
@ -609,6 +612,135 @@ class vulnWhispererQualys(vulnWhispererBase):
|
||||
exit(0)
|
||||
|
||||
|
||||
class vulnWhispererOpenVAS(vulnWhispererBase):
|
||||
CONFIG_SECTION = 'openvas'
|
||||
COLUMN_MAPPING = {'IP': 'asset',
|
||||
'Hostname': 'hostname',
|
||||
'Port': 'port',
|
||||
'Port Protocol': 'protocol',
|
||||
'CVSS': 'cvss',
|
||||
'Severity': 'severity',
|
||||
'Solution Type': 'category',
|
||||
'NVT Name': 'plugin_name',
|
||||
'Summary': 'synopsis',
|
||||
'Specific Result': 'plugin_output',
|
||||
'NVT OID': 'nvt_oid',
|
||||
'Task ID': 'task_id',
|
||||
'Task Name': 'task_name',
|
||||
'Timestamp': 'timestamp',
|
||||
'Result ID': 'result_id',
|
||||
'Impact': 'description',
|
||||
'Solution': 'solution',
|
||||
'Affected Software/OS': 'affected_software',
|
||||
'Vulnerability Insight': 'vulnerability_insight',
|
||||
'Vulnerability Detection Method': 'vulnerability_detection_method',
|
||||
'Product Detection Result': 'product_detection_result',
|
||||
'BIDs': 'bids',
|
||||
'CERTs': 'certs',
|
||||
'Other References': 'see_also'
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config=None,
|
||||
db_name='report_tracker.db',
|
||||
purge=False,
|
||||
verbose=None,
|
||||
debug=False,
|
||||
username=None,
|
||||
password=None,
|
||||
):
|
||||
super(vulnWhispererOpenVAS, self).__init__(config=config)
|
||||
|
||||
self.port = int(self.config.get(self.CONFIG_SECTION, 'port'))
|
||||
self.template_id = self.config.get(self.CONFIG_SECTION, 'report_format_id')
|
||||
self.develop = True
|
||||
self.purge = purge
|
||||
self.scans_to_process = None
|
||||
self.openvas_api = OpenVAS_API(hostname=self.hostname, port=self.port, username=self.username,
|
||||
password=self.password)
|
||||
|
||||
def whisper_reports(self, output_format='json', launched_date=None, report_id=None, cleanup=True):
|
||||
report = None
|
||||
if report_id:
|
||||
print('Processing report ID: %s' % report_id)
|
||||
|
||||
vuln_ready = self.openvas_api.process_report(report_id=report_id)
|
||||
scan_name = report_id.replace('-', '')
|
||||
vuln_ready['scan_name'] = scan_name
|
||||
vuln_ready['scan_reference'] = report_id
|
||||
vuln_ready.rename(columns=self.COLUMN_MAPPING, inplace=True)
|
||||
report_name = 'openvas_scan_{scan_name}_{last_updated}.{extension}'.format(scan_name=scan_name,
|
||||
last_updated=launched_date,
|
||||
extension=output_format)
|
||||
relative_path_name = self.path_check(report_name)
|
||||
scan_reference = report_id
|
||||
print relative_path_name
|
||||
|
||||
if os.path.isfile(relative_path_name):
|
||||
# TODO Possibly make this optional to sync directories
|
||||
file_length = len(open(relative_path_name).readlines())
|
||||
record_meta = (
|
||||
scan_name,
|
||||
scan_reference,
|
||||
launched_date,
|
||||
report_name,
|
||||
time.time(),
|
||||
file_length,
|
||||
self.CONFIG_SECTION,
|
||||
report_id,
|
||||
1,
|
||||
)
|
||||
self.record_insert(record_meta)
|
||||
self.vprint('{info} File {filename} already exist! Updating database'.format(info=bcolors.INFO,
|
||||
filename=relative_path_name))
|
||||
|
||||
record_meta = (
|
||||
scan_name,
|
||||
scan_reference,
|
||||
launched_date,
|
||||
report_name,
|
||||
time.time(),
|
||||
vuln_ready.shape[0],
|
||||
self.CONFIG_SECTION,
|
||||
report_id,
|
||||
1,
|
||||
)
|
||||
|
||||
vuln_ready.port = vuln_ready.port.fillna(0).astype(int)
|
||||
if output_format == 'json':
|
||||
with open(relative_path_name, 'w') as f:
|
||||
f.write(vuln_ready.to_json(orient='records', lines=True))
|
||||
print('{success} - Report written to %s'.format(success=bcolors.SUCCESS) \
|
||||
% report_name)
|
||||
|
||||
return report
|
||||
|
||||
def identify_scans_to_process(self):
|
||||
if self.uuids:
|
||||
self.scans_to_process = self.openvas_api.openvas_reports[
|
||||
~self.openvas_api.openvas_reports.report_ids.isin(self.uuids)]
|
||||
else:
|
||||
self.scans_to_process = self.openvas_api.openvas_reports
|
||||
self.vprint('{info} Identified {new} scans to be processed'.format(info=bcolors.INFO,
|
||||
new=len(self.scans_to_process)))
|
||||
|
||||
def process_openvas_scans(self):
|
||||
counter = 0
|
||||
self.identify_scans_to_process()
|
||||
if self.scans_to_process.shape[0]:
|
||||
for scan in self.scans_to_process.iterrows():
|
||||
counter += 1
|
||||
info = scan[1]
|
||||
print(
|
||||
'[INFO] Processing %s/%s - Report ID: %s' % (counter, len(self.scans_to_process), info['report_ids']))
|
||||
self.whisper_reports(report_id=info['report_ids'],
|
||||
launched_date=info['epoch'])
|
||||
self.vprint('{info} Processing complete!'.format(info=bcolors.INFO))
|
||||
else:
|
||||
self.vprint('{info} No new scans to process. Exiting...'.format(info=bcolors.INFO))
|
||||
self.conn.close()
|
||||
exit(0)
|
||||
|
||||
|
||||
|
||||
@ -639,4 +771,8 @@ class vulnWhisperer(object):
|
||||
|
||||
elif self.profile == 'qualys':
|
||||
vw = vulnWhispererQualys(config=self.config)
|
||||
vw.process_web_assets()
|
||||
vw.process_web_assets()
|
||||
|
||||
elif self.profile == 'openvas':
|
||||
vw_openvas = vulnWhispererOpenVAS(config=self.config)
|
||||
vw_openvas.process_openvas_scans()
|
Reference in New Issue
Block a user