add option to list available scans

This commit is contained in:
pemontto
2019-05-10 15:57:39 +01:00
parent aa9fa5b652
commit 32b54391e5
2 changed files with 108 additions and 20 deletions

View File

@ -32,6 +32,8 @@ def main():
help='Regex filter to limit to matching scan names')
parser.add_argument('--days', dest='days', type=int, required=False,
help='Only import scans in the last X days')
parser.add_argument('-l', '--list', dest='list_scans', required=False, action="store_true",
help='List available scans')
parser.add_argument('--source', dest='source', required=False,
help='JIRA required only! Source scanner to report')
parser.add_argument('-n', '--scanname', dest='scanname', required=False,
@ -93,7 +95,8 @@ def main():
source=args.source,
scan_filter=args.scan_filter,
days=args.days,
scanname=args.scanname)
scanname=args.scanname,
list_scans=args.list_scans)
exit_code += vw.whisper_vulnerabilities()
else:
logger.info('Running vulnwhisperer for section {}'.format(args.section))
@ -104,7 +107,8 @@ def main():
source=args.source,
scan_filter=args.scan_filter,
days=args.days,
scanname=args.scanname)
scanname=args.scanname,
list_scans=args.list_scans)
exit_code += vw.whisper_vulnerabilities()
close_logging_handlers(logger)

View File

@ -6,11 +6,12 @@ import io
import json
import logging
import os
import re
import socket
import sqlite3
import sys
import time
import re
from datetime import datetime
import numpy as np
import pandas as pd
@ -330,6 +331,24 @@ class vulnWhispererBase(object):
return df
def print_available_scans(self, scan_list):
"""
Takes a list of dicts with fields 'time', 'scan_name', 'imported' and 'status' and prints a table
"""
output_string = '| {time} | {scan_name}\t| {imported}\t| {status} |'
print '-' * 118
print output_string.format(time='Time'.ljust(19), scan_name='Scan Name'.ljust(60), imported='Imported'.ljust(8), status='Status'.ljust(10))
print '-' * 118
for scan in sorted(scan_list, key=lambda k: k['time']):
scan['imported'] = scan['imported'].ljust(8)
scan['scan_name'] = scan['scan_name'].encode('utf-8')[:60].ljust(60)
scan['time'] = scan['time'][:19].ljust(19)
scan['status'] = scan['status'][:10].ljust(10)
print output_string.format(**scan)
print '-' * 118
return 0
class vulnWhispererNessus(vulnWhispererBase):
@ -345,6 +364,7 @@ class vulnWhispererNessus(vulnWhispererBase):
profile='nessus',
scan_filter=None,
days=None,
list_scans=None,
):
self.CONFIG_SECTION=profile
@ -357,6 +377,7 @@ class vulnWhispererNessus(vulnWhispererBase):
self.develop = True
self.purge = purge
self.list_scans = list_scans
try:
self.nessus_port = self.config.get(self.CONFIG_SECTION, 'port')
@ -449,6 +470,17 @@ class vulnWhispererNessus(vulnWhispererBase):
folders = scan_data['folders']
scans = scan_data['scans'] if scan_data['scans'] else []
all_scans = self.scan_count(scans)
if self.list_scans:
scan_list = []
for scan in all_scans:
scan['imported'] = 'Yes' if scan['uuid'] in self.uuids else 'No'
scan['time'] = datetime.utcfromtimestamp(scan['norm_time']).isoformat()
scan_list.append(scan)
print 'Available {} scans:'.format(self.CONFIG_SECTION)
self.print_available_scans(scan_list)
return 0
if self.uuids:
scan_list = [
scan for scan in all_scans
@ -600,6 +632,7 @@ class vulnWhispererQualysWAS(vulnWhispererBase):
debug=False,
scan_filter=None,
days=None,
list_scans=None,
):
super(vulnWhispererQualysWAS, self).__init__(config=config, verbose=verbose, debug=debug, scan_filter=scan_filter, days=days)
@ -612,6 +645,7 @@ class vulnWhispererQualysWAS(vulnWhispererBase):
self.latest_scans = self.qualys_scan.qw.get_all_scans(days=self.days)
self.directory_check()
self.scans_to_process = None
self.list_scans = list_scans
def whisper_reports(self,
report_id=None,
@ -714,11 +748,6 @@ class vulnWhispererQualysWAS(vulnWhispererBase):
def identify_scans_to_process(self):
if self.scan_filter:
self.logger.info('Filtering scans that match "{}"'.format(self.scan_filter))
self.latest_scans = self.latest_scans.loc[
self.latest_scans["name"].str.contains(self.scan_filter, case=False)
]
if self.uuids:
self.scans_to_process = self.latest_scans[~self.latest_scans['id'].isin(self.uuids)]
else:
@ -728,6 +757,24 @@ class vulnWhispererQualysWAS(vulnWhispererBase):
def process_web_assets(self):
counter = 0
if self.scan_filter:
self.logger.info('Filtering scans that match "{}"'.format(self.scan_filter))
self.latest_scans = self.latest_scans.loc[
self.latest_scans["name"].str.contains(self.scan_filter, case=False)
]
if self.list_scans:
if self.uuids:
self.latest_scans.loc[self.latest_scans['id'].isin(self.uuids), 'imported'] = 'Yes'
else:
self.latest_scans['imported'] = 'No'
self.latest_scans['imported'].fillna('No', inplace=True)
self.latest_scans.rename(columns={'launchedDate': 'time', 'name': 'scan_name'}, inplace=True)
print 'Available {} scans:'.format(self.CONFIG_SECTION)
self.print_available_scans(self.latest_scans[['time', 'scan_name', 'imported', 'status']].to_dict(orient='records'))
return 0
self.identify_scans_to_process()
if self.scans_to_process.shape[0]:
for app in self.scans_to_process.iterrows():
@ -756,6 +803,7 @@ class vulnWhispererOpenVAS(vulnWhispererBase):
debug=False,
scan_filter=None,
days=None,
list_scans=None,
):
super(vulnWhispererOpenVAS, self).__init__(config=config, verbose=verbose, debug=debug, scan_filter=scan_filter, days=days)
self.logger = logging.getLogger('vulnWhispererOpenVAS')
@ -768,6 +816,7 @@ class vulnWhispererOpenVAS(vulnWhispererBase):
self.develop = True
self.purge = purge
self.scans_to_process = None
self.list_scans = list_scans
self.openvas_api = OpenVAS_API(hostname=self.hostname,
port=self.port,
username=self.username,
@ -850,6 +899,20 @@ class vulnWhispererOpenVAS(vulnWhispererBase):
def process_openvas_scans(self):
counter = 0
if self.list_scans:
self.scans_to_process = self.openvas_api.openvas_reports
if self.uuids:
self.scans_to_process.loc[self.scans_to_process['report_ids'].isin(self.uuids), 'imported'] = 'Yes'
else:
self.scans_to_process['imported'] = 'No'
self.scans_to_process['imported'].fillna('No', inplace=True)
self.scans_to_process['time'] = pd.to_datetime(self.scans_to_process['epoch'], unit='s').astype(str)
self.scans_to_process.rename(columns={'task': 'scan_name'}, inplace=True)
print 'Available {} scans:'.format(self.CONFIG_SECTION)
self.print_available_scans(self.scans_to_process[['time', 'scan_name', 'imported', 'status']].to_dict(orient='records'))
return 0
self.identify_scans_to_process()
if self.scans_to_process.shape[0]:
for scan in self.scans_to_process.iterrows():
@ -878,6 +941,7 @@ class vulnWhispererQualysVM(vulnWhispererBase):
debug=False,
scan_filter=None,
days=None,
list_scans=None,
):
super(vulnWhispererQualysVM, self).__init__(config=config, verbose=verbose, debug=debug, scan_filter=scan_filter, days=days)
@ -889,6 +953,8 @@ class vulnWhispererQualysVM(vulnWhispererBase):
self.qualys_scan = qualysVulnScan(config=config)
self.directory_check()
self.scans_to_process = None
self.list_scans = list_scans
self.latest_scans = self.qualys_scan.qw.get_all_scans(days=self.days)
def whisper_reports(self,
report_id=None,
@ -970,12 +1036,6 @@ class vulnWhispererQualysVM(vulnWhispererBase):
return self.exit_code
def identify_scans_to_process(self):
self.latest_scans = self.qualys_scan.qw.get_all_scans(days=self.days)
if self.scan_filter:
self.logger.info('Filtering scans that match "{}"'.format(self.scan_filter))
self.latest_scans = self.latest_scans.loc[
self.latest_scans["name"].str.contains(self.scan_filter, case=False)
]
if self.uuids:
self.scans_to_process = self.latest_scans.loc[
(~self.latest_scans['id'].isin(self.uuids))
@ -987,6 +1047,23 @@ class vulnWhispererQualysVM(vulnWhispererBase):
def process_vuln_scans(self):
counter = 0
if self.scan_filter:
self.logger.info('Filtering scans that match "{}"'.format(self.scan_filter))
self.latest_scans = self.latest_scans.loc[
self.latest_scans["name"].str.contains(self.scan_filter, case=False)
]
if self.list_scans:
if self.uuids:
self.latest_scans.loc[self.latest_scans['id'].isin(self.uuids), 'imported'] = 'Yes'
else:
self.latest_scans['imported'] = 'No'
self.latest_scans['imported'].fillna('No', inplace=True)
self.latest_scans.rename(columns={'date': 'time', 'name': 'scan_name'}, inplace=True)
print 'Available {} scans:'.format(self.CONFIG_SECTION)
self.print_available_scans(self.latest_scans[['time', 'scan_name', 'imported', 'status']].to_dict(orient='records'))
return 0
self.identify_scans_to_process()
if self.scans_to_process.shape[0]:
for app in self.scans_to_process.iterrows():
@ -1297,7 +1374,8 @@ class vulnWhisperer(object):
source=None,
scan_filter=None,
days=None,
scanname=None):
scanname=None,
list_scans=None):
self.logger = logging.getLogger('vulnWhisperer')
self.logger.setLevel(logging.DEBUG if debug else logging.INFO if verbose else logging.WARNING)
@ -1307,6 +1385,7 @@ class vulnWhisperer(object):
self.config = config
self.source = source
self.scan_filter = scan_filter
self.list_scans = list_scans
self.days = days
self.scanname = scanname
self.exit_code = 0
@ -1320,7 +1399,8 @@ class vulnWhisperer(object):
scan_filter=self.scan_filter,
days=self.days,
verbose=self.verbose,
debug=self.debug)
debug=self.debug,
list_scans=self.list_scans)
self.exit_code += vw.whisper_nessus()
elif self.profile == 'qualys_was':
@ -1328,7 +1408,8 @@ class vulnWhisperer(object):
scan_filter=self.scan_filter,
days=self.days,
verbose=self.verbose,
debug=self.debug)
debug=self.debug,
list_scans=self.list_scans)
self.exit_code += vw.process_web_assets()
elif self.profile == 'openvas':
@ -1336,7 +1417,8 @@ class vulnWhisperer(object):
scan_filter=self.scan_filter,
days=self.days,
verbose=self.verbose,
debug=self.debug)
debug=self.debug,
list_scans=self.list_scans)
self.exit_code += vw_openvas.process_openvas_scans()
elif self.profile == 'tenable':
@ -1345,7 +1427,8 @@ class vulnWhisperer(object):
scan_filter=self.scan_filter,
days=self.days,
verbose=self.verbose,
debug=self.debug)
debug=self.debug,
list_scans=self.list_scans)
self.exit_code += vw.whisper_nessus()
elif self.profile == 'qualys_vm':
@ -1353,7 +1436,8 @@ class vulnWhisperer(object):
scan_filter=self.scan_filter,
days=self.days,
verbose=self.verbose,
debug=self.debug)
debug=self.debug,
list_scans=self.list_scans)
self.exit_code += vw.process_vuln_scans()
elif self.profile == 'jira':