Files
VulnWhisperer/vulnwhisp/frameworks/qualys_vuln.py
pemontto 1ef67d48be Feature error codes (#165)
* Use error codes for failed scans

* Fix indentations

* Fix more indentation

* Continue after failed download

* Add tests for failed scans

* Add more tests

* move definition

* Update nessus.py

This function was used by `print_scans`, which was itself unused and had already been deleted in this PR.
2019-04-05 11:36:13 +02:00

121 lines
4.3 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'Nathan Young'
import logging
import sys
import time
import xml.etree.ElementTree as ET

import dateutil.parser as dp
import pandas as pd
import qualysapi
class qualysWhisperAPI(object):
    """Wrapper around a ``qualysapi`` connection for listing and fetching scans.

    The connection is opened (and verified with a test request) in
    ``__init__``; the process exits with status 1 when that fails.
    """

    # API path used for both the scan list and the per-scan fetch requests.
    SCANS = 'api/2.0/fo/scan'

    # Columns of the frame produced by scan_xml_parser; used to give an empty
    # result the same shape as a populated one.
    _SCAN_COLUMNS = ['name', 'id', 'date', 'type', 'duration', 'status']

    def __init__(self, config=None):
        """Connect to Qualys using the ``qualys_vuln`` section of *config*.

        Args:
            config: configuration handed straight to ``qualysapi.connect``.

        Raises:
            SystemExit: with code 1 when the connection or the initial
                ``about.php`` request fails.
        """
        self.logger = logging.getLogger('qualysWhisperAPI')
        self.config = config
        try:
            self.qgc = qualysapi.connect(config, 'qualys_vuln')
            # Fail early if we can't make a request or auth is incorrect
            self.qgc.request('about.php')
            self.logger.info('Connected to Qualys at {}'.format(self.qgc.server))
        except Exception as e:
            self.logger.error('Could not connect to Qualys: {}'.format(str(e)))
            # sys.exit is always available, unlike the interactive ``exit``
            # helper that the ``site`` module injects.
            sys.exit(1)

    @staticmethod
    def _text(node, path):
        """Return the text of the first element matching *path*, or None if absent."""
        found = node.find(path)
        return None if found is None else found.text

    def scan_xml_parser(self, xml):
        """Convert a scan-list XML response into a DataFrame.

        Args:
            xml: the XML document, as text or as already-encoded bytes.

        Returns:
            pandas.DataFrame with one row per child of ``SCAN_LIST`` and the
            columns ``name``, ``id``, ``date``, ``type``, ``duration`` and
            ``status``. Missing child elements yield ``None``. When the
            response contains no ``SCAN_LIST`` (or it is empty) an empty
            frame with those columns is returned.
        """
        # Only encode text; bytes are handed to the parser untouched.
        if not isinstance(xml, bytes):
            xml = xml.encode("utf-8")
        root = ET.XML(xml)

        scan_list = root.find('.//SCAN_LIST')
        if scan_list is None:
            # No SCAN_LIST element in the response: nothing to iterate.
            return pd.DataFrame(columns=self._SCAN_COLUMNS)

        all_records = [
            {
                'name': self._text(child, 'TITLE'),
                'id': self._text(child, 'REF'),
                'date': self._text(child, 'LAUNCH_DATETIME'),
                'type': self._text(child, 'TYPE'),
                'duration': self._text(child, 'DURATION'),
                'status': self._text(child, './/STATE'),
            }
            for child in scan_list
        ]
        if not all_records:
            return pd.DataFrame(columns=self._SCAN_COLUMNS)
        return pd.DataFrame(all_records)

    def get_all_scans(self):
        """Request the scan list and return it parsed by ``scan_xml_parser``."""
        parameters = {
            'action': 'list',
            'echo_request': 0,
            'show_op': 0,
            # NOTE(review): presumably the earliest accepted date, so that no
            # scan is filtered out -- confirm against the Qualys API docs.
            'launched_after_datetime': '0001-01-01'
        }
        scans_xml = self.qgc.request(self.SCANS, parameters)
        return self.scan_xml_parser(scans_xml)

    def get_scan_details(self, scan_id=None):
        """Fetch one scan's results as a DataFrame.

        Args:
            scan_id: scan reference sent as the ``scan_ref`` parameter.

        Returns:
            pandas.DataFrame parsed from the ``json_extended`` response with
            the first two rows and the last row removed.
        """
        parameters = {
            'action': 'fetch',
            'echo_request': 0,
            'output_format': 'json_extended',
            'mode': 'extended',
            'scan_ref': scan_id
        }
        scan_json = self.qgc.request(self.SCANS, parameters)
        # ``.iloc[2:-1]`` slices rows. Per the original note, the first two
        # entries are metadata we already have, and the last one corresponds
        # to the "target_distribution_across_scanner_appliances" element,
        # which doesn't follow the schema and breaks the pandas data
        # manipulation.
        return pd.read_json(scan_json).iloc[2:-1]
class qualysUtils:
    """Date helpers used by the Qualys scan processing code."""

    def __init__(self):
        self.logger = logging.getLogger('qualysUtils')

    def iso_to_epoch(self, dt):
        """Convert a date/time string to epoch seconds.

        Args:
            dt: a date/time string understood by ``dateutil.parser.parse``.

        Returns:
            str: whole seconds since the epoch, as a decimal string.
        """
        parsed = dp.parse(dt)
        # ``strftime('%s')`` is a platform-specific directive; on the
        # platforms that implement it, it is mktime() applied to the broken-
        # down time, which is what is done here explicitly and portably.
        # NOTE(review): like the previous '%s' form, this reads the clock
        # fields as host-local time and ignores any UTC offset in ``dt``;
        # confirm whether callers expect a true UTC epoch.
        out = str(int(time.mktime(parsed.timetuple())))
        self.logger.info('Converted {} to {}'.format(dt, out))
        return out
class qualysVulnScan:
    """Downloads a Qualys scan through ``qualysWhisperAPI`` and normalizes it."""

    def __init__(
        self,
        config=None,
        file_in=None,
        file_stream=False,
        delimiter=',',
        quotechar='"',
    ):
        """Set up logging, the optional API client and the optional input file.

        Args:
            config: when truthy, used to build ``self.qw``; a failure is
                logged and ``self.qw`` is then left unset.
            file_in: text (when ``file_stream`` is true) or a path to open.
            file_stream: treat ``file_in`` as in-memory text and split it
                into lines instead of opening it as a file.
            delimiter: accepted for interface compatibility; not used here.
            quotechar: accepted for interface compatibility; not used here.
        """
        self.logger = logging.getLogger('qualysVulnScan')
        self.file_in = file_in
        self.file_stream = file_stream
        self.report = None
        self.utils = qualysUtils()

        if config:
            try:
                self.qw = qualysWhisperAPI(config=config)
            except Exception as e:
                self.logger.error('Could not load config! Please check settings. Error: {}'.format(str(e)))

        if file_stream:
            self.open_file = file_in.splitlines()
        elif file_in:
            # NOTE(review): this handle is never closed in the visible code;
            # left as-is because callers outside this view may read it.
            self.open_file = open(file_in, 'rb')

        self.downloaded_file = None

    def process_data(self, scan_id=None):
        """Downloads a file from Qualys and normalizes it

        Args:
            scan_id: scan reference passed to ``self.qw.get_scan_details``.

        Returns:
            pandas.DataFrame: the unmodified (empty) frame when the scan has
            no rows; otherwise the frame restricted to the known columns,
            with ``severity`` and ``qid`` converted to integer strings.
        """
        self.logger.info('Downloading scan ID: {}'.format(scan_id))
        scan_report = self.qw.get_scan_details(scan_id=scan_id)

        if scan_report.empty:
            # Logger.warn is a deprecated alias; warning() is the supported name.
            self.logger.warning('Scan ID {} has no vulnerabilities, skipping.'.format(scan_id))
            return scan_report

        keep_columns = [
            'category', 'cve_id', 'cvss3_base', 'cvss3_temporal', 'cvss_base',
            'cvss_temporal', 'dns', 'exploitability', 'fqdn', 'impact', 'ip',
            'ip_status', 'netbios', 'os', 'pci_vuln', 'port', 'protocol',
            'qid', 'results', 'severity', 'solution', 'ssl', 'threat',
            'title', 'type', 'vendor_reference',
        ]
        # filter() keeps only those listed columns that are actually present.
        scan_report = scan_report.filter(keep_columns)
        # Go through int first so the values are stored as plain integer strings.
        scan_report['severity'] = scan_report['severity'].astype(int).astype(str)
        scan_report['qid'] = scan_report['qid'].astype(int).astype(str)
        return scan_report