#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from six.moves import range
from functools import reduce
__author__ = 'Austin Taylor'
from lxml import objectify
from lxml.builder import E
import xml.etree.ElementTree as ET
import pandas as pd
import qualysapi
import qualysapi.config as qcconf
import requests
import sys
import os
import csv
import logging
import dateutil.parser as dp
csv.field_size_limit(sys.maxsize)
class qualysWhisperAPI(object):
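    # Qualys WAS REST endpoint templates; the {report_id} placeholders are
    # filled in per request before the call is handed to qualysapi.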
COUNT_WASSCAN = '/count/was/wasscan'
DELETE_REPORT = '/delete/was/report/{report_id}'
REPORT_STATUS = '/status/was/report/{report_id}'
REPORT_CREATE = '/create/was/report'
REPORT_DOWNLOAD = '/download/was/report/{report_id}'
SEARCH_WAS_SCAN = '/search/was/wasscan'
def __init__(self, config=None):
self.logger = logging.getLogger('qualysWhisperAPI')
self.config = config
try:
self.qgc = qualysapi.connect(config, 'qualys_web')
self.logger.info('Connected to Qualys at {}'.format(self.qgc.server))
except Exception as e:
self.logger.error('Could not connect to Qualys: {}'.format(str(e)))
self.config_parse = qcconf.QualysConnectConfig(config, 'qualys_web')
try:
self.template_id = self.config_parse.get_template_id()
        except Exception as e:
            self.logger.error('Could not retrieve template ID: {}'.format(str(e)))
####
#### GET SCANS TO PROCESS
####
    def get_was_scan_count(self, status):
        """
        Returns the number of WAS scans with the given status; used to
        page requests within the Qualys API limits.
        """
parameters = (
E.ServiceRequest(
E.filters(
E.Criteria({'field': 'status', 'operator': 'EQUALS'}, status))))
xml_output = self.qgc.request(self.COUNT_WASSCAN, parameters)
root = objectify.fromstring(xml_output.encode('utf-8'))
return root.count.text
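    # generate_scan_result_XML builds a ServiceRequest of the form:
    #   <ServiceRequest>
    #     <filters><Criteria field="status" operator="EQUALS">FINISHED</Criteria></filters>
    #     <preferences><startFromOffset>1</startFromOffset><limitResults>1000</limitResults></preferences>
    #   </ServiceRequest>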
def generate_scan_result_XML(self, limit=1000, offset=1, status='FINISHED'):
report_xml = E.ServiceRequest(
E.filters(E.Criteria({'field': 'status', 'operator': 'EQUALS'}, status)),
E.preferences(E.startFromOffset(str(offset)), E.limitResults(str(limit))),
)
return report_xml
def get_scan_info(self, limit=1000, offset=1, status='FINISHED'):
""" Returns XML of ALL WAS Scans"""
data = self.generate_scan_result_XML(limit=limit, offset=offset, status=status)
return self.qgc.request(self.SEARCH_WAS_SCAN, data)
    def xml_parser(self, xml):
        """
        Flattens the two levels of records under the XML root into a
        DataFrame, suffixing repeated tags (_1, _2, ...) to avoid clobbering.
        """
        all_records = []
        root = ET.XML(xml)
        for child in root:
for subchild in child:
record = {}
dup_tracker = 0
for p in subchild:
record[p.tag] = p.text
for o in p:
if o.tag in record:
dup_tracker += 1
record[o.tag + '_%s' % dup_tracker] = o.text
else:
record[o.tag] = o.text
all_records.append(record)
return pd.DataFrame(all_records)
def get_all_scans(self, limit=1000, offset=1, status='FINISHED'):
qualys_api_limit = limit
dataframes = []
_records = []
try:
total = int(self.get_was_scan_count(status=status))
            self.logger.debug('Retrieved WAS scan count')
self.logger.info('Retrieving information for {} scans'.format(total))
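            # Page through the scan list `limit` records at a time; the
            # final page is shrunk so we never request past `total`.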
for i in range(0, total):
if i % limit == 0:
if (total - i) < limit:
qualys_api_limit = total - i
self.logger.info('Making a request with a limit of {} at offset {}'
.format((str(qualys_api_limit)), str(i + 1)))
scan_info = self.get_scan_info(
limit=qualys_api_limit, offset=i + 1, status=status)
_records.append(scan_info)
self.logger.debug('Converting XML to DataFrame')
dataframes = [self.xml_parser(xml) for xml in _records]
except Exception as e:
self.logger.error("Couldn't process all scans: {}".format(e))
        if not dataframes:
            return pd.DataFrame()
        return pd.concat(dataframes, axis=0).reset_index(drop=True)
####
#### CREATE VULNERABILITY REPORT AND DOWNLOAD IT
####
def get_report_status(self, report_id):
return self.qgc.request(self.REPORT_STATUS.format(report_id=report_id))
def download_report(self, report_id):
return self.qgc.request(
self.REPORT_DOWNLOAD.format(report_id=report_id), http_method='get')
def generate_scan_report_XML(self, scan_id):
"""Generates a CSV report for an asset based on template defined in .ini file"""
report_xml = E.ServiceRequest(
E.data(
E.Report(
E.name('<![CDATA[API Scan Report generated by VulnWhisperer]]>'),
E.description('<![CDATA[CSV Scanning report for VulnWhisperer]]>'),
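                    # note: lxml.builder escapes these literal CDATA markers on
                    # serialization; the Qualys API appears to tolerate the escaped form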
E.format('CSV'),
                    # the template already defines the report type, but we set it explicitly as well
E.type('WAS_SCAN_REPORT'),
E.template(E.id(self.template_id)),
E.config(E.scanReport(E.target(E.scans(E.WasScan(E.id(scan_id))))))
)
)
)
return report_xml
    def create_report(self, report_id, kind='scan'):
        mapper = {'scan': self.generate_scan_report_XML}
        try:
            data = mapper[kind](report_id)
        except Exception as e:
            self.logger.error('Error creating report: {}'.format(str(e)))
            raise
        return self.qgc.request(self.REPORT_CREATE, data).encode('utf-8')
def delete_report(self, report_id):
return self.qgc.request(self.DELETE_REPORT.format(report_id=report_id))
class qualysUtils(object):
def __init__(self):
self.logger = logging.getLogger('qualysUtils')
def grab_section(self, report, section, end=[], pop_last=False):
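        """
        Scans `report` for the row matching the `section` header, then
        collects rows until one matches an entry in `end` (or until EOF).
        `pop_last` drops the captured terminator row from the result.
        """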
temp_list = []
max_col_count = 0
with open(report, 'rt') as csvfile:
q_report = csv.reader(csvfile, delimiter=',', quotechar='"')
for line in q_report:
if set(line) == set(section):
break
# Reads text until the end of the block:
for line in q_report: # This keeps reading the file
temp_list.append(line)
if line in end:
break
if pop_last and len(temp_list) > 1:
temp_list.pop(-1)
return temp_list
    def iso_to_epoch(self, dt):
        # '%s' (seconds since epoch) is a platform-specific strftime extension
        return dp.parse(dt).strftime('%s')
def cleanser(self, _data):
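        """Replaces newlines and tabs with '|||' and commas with ';' so field contents survive CSV re-serialization"""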
repls = (('\n', '|||'), ('\r', '|||'), (',', ';'), ('\t', '|||'))
if _data:
_data = reduce(lambda a, kv: a.replace(*kv), repls, str(_data))
return _data
class qualysScanReport(object):
CATEGORIES = ['VULNERABILITY', 'SENSITIVECONTENT', 'INFORMATION_GATHERED']
WEB_SCAN_BLOCK = [
"ID", "Detection ID", "QID", "Url", "Param/Cookie", "Function",
"Form Entry Point", "Access Path", "Authentication", "Ajax Request",
"Ajax Request ID", "Ignored", "Ignore Reason", "Ignore Date", "Ignore User",
"Ignore Comments", "Detection Date", "Payload #1", "Request Method #1",
"Request URL #1", "Request Headers #1", "Response #1", "Evidence #1",
"Unique ID", "Flags", "Protocol", "Virtual Host", "IP", "Port", "Result",
"Info#1", "CVSS V3 Base", "CVSS V3 Temporal", "CVSS V3 Attack Vector",
"Request Body #1"
]
WEB_SCAN_VULN_BLOCK = [CATEGORIES[0]] + WEB_SCAN_BLOCK
WEB_SCAN_SENSITIVE_BLOCK = [CATEGORIES[1]] + WEB_SCAN_BLOCK
WEB_SCAN_HEADER = ["Vulnerability Category"] + WEB_SCAN_BLOCK
WEB_SCAN_INFO_BLOCK = [
"INFORMATION_GATHERED", "ID", "Detection ID", "QID", "Results", "Detection Date",
"Unique ID", "Flags", "Protocol", "Virtual Host", "IP", "Port", "Result",
"Info#1"
]
WEB_SCAN_INFO_HEADER = [
"Vulnerability Category", "ID", "Detection ID", "QID", "Results", "Last Time Detected",
"Unique ID", "Flags", "Protocol", "Virtual Host", "IP", "Port", "Result",
"Info#1"
]
QID_HEADER = [
"QID", "Id", "Title", "Category", "Severity Level", "Groups", "OWASP", "WASC",
"CWE", "CVSS Base", "CVSS Temporal", "Description", "Impact", "Solution",
"CVSS V3 Base", "CVSS V3 Temporal", "CVSS V3 Attack Vector"
]
GROUP_HEADER = ['GROUP', 'Name', 'Category']
OWASP_HEADER = ['OWASP', 'Code', 'Name']
WASC_HEADER = ['WASC', 'Code', 'Name']
SCAN_META = [
"Web Application Name", "URL", "Owner", "Scope", "ID", "Tags",
"Custom Attributes"
]
CATEGORY_HEADER = ['Category', 'Severity', 'Level', 'Description']
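    # The lists above mirror the section header rows of the Qualys WAS CSV
    # report; grab_sections() uses them both to locate each block and to
    # label the resulting DataFrame columns.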
def __init__(
self,
config=None,
file_in=None,
file_stream=False,
delimiter=',',
quotechar='"',
):
self.logger = logging.getLogger('qualysScanReport')
self.file_in = file_in
self.file_stream = file_stream
self.report = None
self.utils = qualysUtils()
if config:
try:
self.qw = qualysWhisperAPI(config=config)
except Exception as e:
self.logger.error(
'Could not load config! Please check settings. Error: {}'.format(
str(e)))
if file_stream:
self.open_file = file_in.splitlines()
elif file_in:
self.open_file = open(file_in, 'rb')
self.downloaded_file = None
def grab_sections(self, report):
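        """Slices the downloaded CSV report into one DataFrame per section, keyed by section name"""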
return {
'WEB_SCAN_VULN_BLOCK': pd.DataFrame(
self.utils.grab_section(
report,
self.WEB_SCAN_VULN_BLOCK,
end=[self.WEB_SCAN_SENSITIVE_BLOCK, self.WEB_SCAN_INFO_BLOCK],
pop_last=True),
columns=self.WEB_SCAN_HEADER),
'WEB_SCAN_SENSITIVE_BLOCK': pd.DataFrame(
self.utils.grab_section(report,
self.WEB_SCAN_SENSITIVE_BLOCK,
end=[self.WEB_SCAN_INFO_BLOCK, self.WEB_SCAN_SENSITIVE_BLOCK],
pop_last=True),
columns=self.WEB_SCAN_HEADER),
'WEB_SCAN_INFO_BLOCK': pd.DataFrame(
self.utils.grab_section(
report,
self.WEB_SCAN_INFO_BLOCK,
end=[self.QID_HEADER],
pop_last=True),
columns=self.WEB_SCAN_INFO_HEADER),
'QID_HEADER': pd.DataFrame(
self.utils.grab_section(
report,
self.QID_HEADER,
end=[self.GROUP_HEADER],
pop_last=True),
columns=self.QID_HEADER),
'GROUP_HEADER': pd.DataFrame(
self.utils.grab_section(
report,
self.GROUP_HEADER,
end=[self.OWASP_HEADER],
pop_last=True),
columns=self.GROUP_HEADER),
'OWASP_HEADER': pd.DataFrame(
self.utils.grab_section(
report,
self.OWASP_HEADER,
end=[self.WASC_HEADER],
pop_last=True),
columns=self.OWASP_HEADER),
'WASC_HEADER': pd.DataFrame(
self.utils.grab_section(
report,
self.WASC_HEADER,
end=[['APPENDIX']],
pop_last=True),
columns=self.WASC_HEADER),
'SCAN_META': pd.DataFrame(
self.utils.grab_section(report,
self.SCAN_META,
end=[self.CATEGORY_HEADER],
pop_last=True),
columns=self.SCAN_META),
'CATEGORY_HEADER': pd.DataFrame(
self.utils.grab_section(report,
self.CATEGORY_HEADER),
columns=self.CATEGORY_HEADER)
}
def data_normalizer(self, dataframes):
"""
Merge and clean data
:param dataframes:
:return:
"""
df_dict = dataframes
merged_df = pd.concat([
df_dict['WEB_SCAN_VULN_BLOCK'],
df_dict['WEB_SCAN_SENSITIVE_BLOCK'],
df_dict['WEB_SCAN_INFO_BLOCK']
], axis=0, ignore_index=False)
merged_df = pd.merge(
merged_df,
df_dict['QID_HEADER'].drop(
#these columns always seem to be the same as what we're merging into
['CVSS V3 Attack Vector', 'CVSS V3 Base', 'CVSS V3 Temporal'],
axis=1),
left_on='QID', right_on='Id'
)
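        # Ensure the Content column exists so the cleanse loop below never KeyErrors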
if 'Content' not in merged_df:
merged_df['Content'] = ''
columns_to_cleanse = ['Payload #1', 'Request Method #1', 'Request URL #1',
'Request Headers #1', 'Response #1', 'Evidence #1',
'Description', 'Impact', 'Solution', 'Url', 'Content']
for col in columns_to_cleanse:
merged_df[col] = merged_df[col].apply(self.utils.cleanser)
merged_df = merged_df.drop(['QID_y', 'QID_x'], axis=1)
merged_df = merged_df.rename(columns={'Id': 'QID'})
merged_df = merged_df.assign(**df_dict['SCAN_META'].to_dict(orient='records')[0])
merged_df = pd.merge(
merged_df, df_dict['CATEGORY_HEADER'],
how='left', left_on=['Category', 'Severity Level'],
right_on=['Category', 'Severity'], suffixes=('Severity', 'CatSev')
)
merged_df = merged_df.replace('N/A', '').fillna('')
try:
merged_df = \
merged_df[~merged_df.Title.str.contains('Links Crawled|External Links Discovered')]
except Exception as e:
self.logger.error('Error normalizing: {}'.format(str(e)))
return merged_df
    def download_file(self, path='', file_id=None):
        report = self.qw.download_report(file_id)
        filename = os.path.join(path, str(file_id) + '.csv')
        with open(filename, 'w') as file_out:
            for line in report.splitlines():
                file_out.write(line + '\n')
        self.logger.info('File written to {}'.format(filename))
        return filename
    def process_data(self, path='', file_id=None, cleanup=True):
        """Downloads a report file from Qualys and normalizes it"""
        self.logger.info('Downloading file ID: {}'.format(file_id))
        download_file = self.download_file(path=path, file_id=file_id)
report_data = self.grab_sections(download_file)
merged_data = self.data_normalizer(report_data)
merged_data.sort_index(axis=1, inplace=True)
return merged_data
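
# Example usage (a minimal sketch; 'frameworks.ini' is a hypothetical path to
# a VulnWhisperer-style config containing a [qualys_web] section):
#
#   report = qualysScanReport(config='frameworks.ini')
#   scans = report.qw.get_all_scans()            # DataFrame of FINISHED WAS scans
#   xml = report.qw.create_report(scan_id)       # queue a CSV report for a scan id
#   # ...poll report.qw.get_report_status(report_id) until the report is ready,
#   # then download and normalize it:
#   df = report.process_data(path='/tmp/', file_id=report_id)
#   report.qw.delete_report(report_id)           # optional cleanup on the Qualys side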