VulnWhisperer/vulnwhisp/frameworks/qualys_vm.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'Nathan Young'
import logging
import sys
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import dateutil.parser as dp
import pandas as pd
import qualysapi


class qualysWhisperAPI(object):
    SCANS = 'api/2.0/fo/scan'

    def __init__(self, config=None):
        self.logger = logging.getLogger('qualysWhisperAPI')
        self.config = config
        try:
            self.qgc = qualysapi.connect(config, 'qualys_vm')
            # Fail early if we can't make a request or auth is incorrect
            self.qgc.request('about.php')
            self.logger.info('Connected to Qualys at {}'.format(self.qgc.server))
        except Exception as e:
            self.logger.error('Could not connect to Qualys: {}'.format(str(e)))
            sys.exit(1)

    def scan_xml_parser(self, xml):
        """Parse the scan-list XML returned by the Qualys API into a pandas DataFrame."""
        all_records = []
        root = ET.XML(xml.encode('utf-8'))
        scan_list = root.find('.//SCAN_LIST')
        # A missing or empty SCAN_LIST element means there are no scans to report
        if scan_list is None or len(scan_list) == 0:
            return pd.DataFrame(columns=['id', 'status'])
        for child in scan_list:
            all_records.append({
                'name': child.find('TITLE').text,
                'id': child.find('REF').text,
                'date': child.find('LAUNCH_DATETIME').text,
                'type': child.find('TYPE').text,
                'duration': child.find('DURATION').text,
                'status': child.find('.//STATE').text,
            })
        return pd.DataFrame(all_records)

    def get_all_scans(self, days=None):
        """List finished scans, optionally limited to those launched in the last `days` days."""
        if days is None:
            self.launched_date = '0001-01-01'
        else:
            self.launched_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        parameters = {
            'action': 'list',
            'echo_request': 0,
            'show_op': 0,
            'state': 'Finished',
            'launched_after_datetime': self.launched_date
        }
        scans_xml = self.qgc.request(self.SCANS, parameters)
        return self.scan_xml_parser(scans_xml)

    def get_scan_details(self, scan_id=None):
        """Fetch the full json_extended report for a single scan reference."""
        parameters = {
            'action': 'fetch',
            'echo_request': 0,
            'output_format': 'json_extended',
            'mode': 'extended',
            'scan_ref': scan_id
        }
        scan_json = self.qgc.request(self.SCANS, parameters)
        # The first two entries (rows) are metadata we already have, and the last entry is the
        # "target_distribution_across_scanner_appliances" element, which doesn't follow the
        # schema and breaks the pandas data manipulation, so both are dropped here.
        return pd.read_json(scan_json).iloc[2:-1]
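
# Usage sketch (illustrative only, not part of the original module): the two calls above
# are meant to be chained. get_all_scans() returns a DataFrame of finished scans whose
# 'id' column holds scan references that get_scan_details() accepts. The config path and
# the 30-day window below are placeholder values; the example is kept in comments so it
# does not execute at import time.
#
#     qw = qualysWhisperAPI(config='config.ini')
#     scans = qw.get_all_scans(days=30)
#     for ref in scans['id']:
#         details = qw.get_scan_details(scan_id=ref)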


class qualysUtils:
    def __init__(self):
        self.logger = logging.getLogger('qualysUtils')

    def iso_to_epoch(self, dt):
        """Convert an ISO 8601 timestamp string into a Unix epoch string."""
        # NOTE: '%s' is a platform-specific strftime directive (POSIX/glibc); it is not portable
        out = dp.parse(dt).strftime('%s')
        self.logger.info('Converted {} to {}'.format(dt, out))
        return out


class qualysVulnScan:
    COLUMN_MAPPING = {
        'cve_id': 'cve',
        'impact': 'synopsis',
        'ip_status': 'state',
        'os': 'operating_system',
        'qid': 'signature_id',
        'results': 'plugin_output',
        'threat': 'description',
        'title': 'signature'
    }

    def __init__(
        self,
        config=None,
        file_in=None,
        file_stream=False,
        delimiter=',',
        quotechar='"',
    ):
        self.logger = logging.getLogger('qualysVulnScan')
        self.file_in = file_in
        self.file_stream = file_stream
        self.report = None
        self.utils = qualysUtils()

        if config:
            try:
                self.qw = qualysWhisperAPI(config=config)
            except Exception as e:
                self.logger.error('Could not load config! Please check settings. Error: {}'.format(str(e)))

        if file_stream:
            self.open_file = file_in.splitlines()
        elif file_in:
            self.open_file = open(file_in, 'rb')

        self.downloaded_file = None

    def process_data(self, scan_id=None):
        """Download a scan report from Qualys and filter it down to the columns we keep."""
        self.logger.info('Downloading scan ID: {}'.format(scan_id))
        scan_report = self.qw.get_scan_details(scan_id=scan_id)
        if not scan_report.empty:
            keep_columns = ['category', 'cve_id', 'cvss3_base', 'cvss3_temporal', 'cvss_base',
                            'cvss_temporal', 'dns', 'exploitability', 'fqdn', 'impact', 'ip', 'ip_status',
                            'netbios', 'os', 'pci_vuln', 'port', 'protocol', 'qid', 'results', 'severity',
                            'solution', 'ssl', 'threat', 'title', 'type', 'vendor_reference']
            scan_report = scan_report.filter(keep_columns)
            scan_report['severity'] = scan_report['severity'].astype(int).astype(str)
            scan_report['qid'] = scan_report['qid'].astype(int).astype(str)
        else:
            self.logger.warning('Scan ID {} has no vulnerabilities, skipping.'.format(scan_id))
        return scan_report

    def normalise(self, df):
        self.logger.debug('Normalising data')
        df = self.map_fields(df)
        df = self.transform_values(df)
        return df

    def map_fields(self, df):
        self.logger.info('Mapping fields')
        # Lowercase and map fields from COLUMN_MAPPING
        df.columns = [x.lower() for x in df.columns]
        df.rename(columns=self.COLUMN_MAPPING, inplace=True)
        df.columns = [x.replace(' ', '_') for x in df.columns]
        return df

    def transform_values(self, df):
        self.logger.info('Transforming values')
        df.fillna('', inplace=True)

        # Upper/lowercase fields
        self.logger.info('Changing case of fields')
        df['cve'] = df['cve'].str.upper()
        df['protocol'] = df['protocol'].str.lower()

        # Construct the CVSS columns: Qualys reports the score followed by the
        # vector in parentheses, so split them into separate columns
        self.logger.info('Extracting CVSS components')
        df['cvss2_vector'] = df['cvss_base'].str.extract(r'\((.*)\)', expand=False)
        df['cvss2_base'] = df['cvss_base'].str.extract(r'^(\d+(?:\.\d+)?)', expand=False)
        df['cvss2_temporal_vector'] = df['cvss_temporal'].str.extract(r'\((.*)\)', expand=False)
        df['cvss2_temporal'] = df['cvss_temporal'].str.extract(r'^(\d+(?:\.\d+)?)', expand=False)
        df.drop('cvss_base', axis=1, inplace=True, errors='ignore')
        df.drop('cvss_temporal', axis=1, inplace=True, errors='ignore')

        # Use the IP address as the asset identifier
        df['asset'] = df['ip']

        # Convert Qualys severity (1-5) to a zero-based standardised risk number
        df['risk_number'] = df['severity'].astype(int) - 1

        df.fillna('', inplace=True)
        return df
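

# A minimal end-to-end sketch (illustrative; the original module does not ship a
# __main__ entry point). 'config.ini' is a placeholder path to a qualysapi
# configuration file and the 30-day lookback window is an example value.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    scanner = qualysVulnScan(config='config.ini')
    # List finished scans from the last 30 days, then download and normalise each one
    for scan_id in scanner.qw.get_all_scans(days=30)['id']:
        report = scanner.process_data(scan_id=scan_id)
        if not report.empty:
            print(scanner.normalise(report).head())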