VulnWhisperer/vulnwhisp/frameworks/qualys.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'Austin Taylor'

import csv
import os
import sys
from functools import reduce  # a builtin on Python 2; the explicit import keeps Python 3 compatibility

import dateutil.parser as dp
import pandas as pd
import qualysapi
import qualysapi.config as qcconf
import requests
import xml.etree.ElementTree as ET
from lxml import objectify
from lxml.builder import E
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


class qualysWhisper(object):

    COUNT = '/count/was/webapp'
    DELETE_REPORT = '/delete/was/report/{report_id}'
    GET_WEBAPP_DETAILS = '/get/was/webapp/{was_id}'
    QPS_REST_3 = '/qps/rest/3.0'
    REPORT_DETAILS = '/get/was/report/{report_id}'
    REPORT_STATUS = '/status/was/report/{report_id}'
    REPORT_CREATE = '/create/was/report'
    REPORT_DOWNLOAD = '/download/was/report/{report_id}'
    SEARCH_REPORTS = '/search/was/report'
    SEARCH_WEB_APPS = '/search/was/webapp'
    SEARCH_WAS_SCAN = '/search/was/wasscan'
    VERSION = '/qps/rest/portal/version'

    def __init__(self, config=None):
        self.config = config
        try:
            self.qgc = qualysapi.connect(config)
            print('[SUCCESS] - Connected to Qualys at %s' % self.qgc.server)
        except Exception as e:
            print('[ERROR] Could not connect to Qualys - %s' % e)
        self.headers = {'content-type': 'text/xml'}
        self.config_parse = qcconf.QualysConnectConfig(config)
        try:
            self.template_id = self.config_parse.get_template_id()
        except Exception:
            print('[ERROR] Could not retrieve template ID')
            sys.exit(2)
    def request(self, path, method='get', data=None):
        methods = {'get': requests.get, 'post': requests.post}
        base = 'https://' + self.qgc.server + path
        req = methods[method](base, auth=self.qgc.auth, data=data,
                              headers=self.headers).content
        return req
    def get_version(self):
        return self.request(self.VERSION)

    def get_scan_count(self, scan_name):
        parameters = E.ServiceRequest(
            E.filters(E.Criteria(scan_name, field='name',
                                 operator='CONTAINS')))
        xml_output = self.qgc.request(self.COUNT, parameters)
        root = objectify.fromstring(xml_output)
        return root.count.text

    def get_reports(self):
        return self.qgc.request(self.SEARCH_REPORTS)

    def xml_parser(self, xml, dupfield=None):
        """Flatten a ServiceResponse XML payload into a list of record dicts.

        A nested <id> tag would collide with the record's own 'id', so it is
        stored under `dupfield` instead.
        """
        all_records = []
        root = ET.XML(xml)
        for child in root:
            for subchild in child:
                record = {}
                for p in subchild:
                    record[p.tag] = p.text
                    for o in p:
                        if o.tag == 'id':
                            record[dupfield] = o.text
                        else:
                            record[o.tag] = o.text
                all_records.append(record)
        return pd.DataFrame(all_records)
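
    # For example (response shape assumed from the Qualys WAS v3 API), given:
    #
    #   <ServiceResponse>
    #     <data>
    #       <WebApp>
    #         <id>123</id>
    #         <name>portal</name>
    #         <owner><id>42</id></owner>
    #       </WebApp>
    #     </data>
    #   </ServiceResponse>
    #
    # xml_parser(xml, dupfield='user_id') yields
    # {'id': '123', 'name': 'portal', 'user_id': '42', ...}: direct tags map
    # straight through (the wrapper tag 'owner' keeps its empty text), while
    # the nested <id> is remapped to `dupfield` so it does not clobber the
    # WebApp's own id.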

    def get_report_list(self):
        """Returns a dataframe of reports"""
        return self.xml_parser(self.get_reports(), dupfield='user_id')

    def get_web_apps(self):
        """Returns webapps available for account"""
        return self.qgc.request(self.SEARCH_WEB_APPS)

    def get_web_app_list(self):
        """Returns dataframe of webapps"""
        return self.xml_parser(self.get_web_apps(), dupfield='user_id')

    def get_web_app_details(self, was_id):
        """Get webapp details - use to retrieve app ID tag"""
        return self.qgc.request(self.GET_WEBAPP_DETAILS.format(was_id=was_id))

    def get_scans_by_app_id(self, app_id):
        data = self.generate_app_id_scan_XML(app_id)
        return self.qgc.request(self.SEARCH_WAS_SCAN, data)

    def get_report_details(self, report_id):
        return self.qgc.request(self.REPORT_DETAILS.format(report_id=report_id))

    def get_report_status(self, report_id):
        return self.qgc.request(self.REPORT_STATUS.format(report_id=report_id))

    def download_report(self, report_id):
        return self.qgc.request(self.REPORT_DOWNLOAD.format(report_id=report_id))

    def generate_webapp_report_XML(self, app_id):
        """Generates a CSV report request for a webapp, based on the template
        defined in the .ini file."""
        report_xml = E.ServiceRequest(
            E.data(
                E.Report(
                    E.name('<![CDATA[API Web Application Report generated by VulnWhisperer]]>'),
                    E.description('<![CDATA[CSV WebApp report for VulnWhisperer]]>'),
                    E.format('CSV'),
                    E.template(E.id(self.template_id)),
                    E.config(
                        E.webAppReport(
                            E.target(E.webapps(E.WebApp(E.id(app_id)))))))))
        return report_xml
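
    # Serialized (lxml entity-escapes the literal CDATA markers in the text
    # nodes), the request body looks roughly like:
    #
    #   <ServiceRequest><data><Report>
    #     <name>...</name><description>...</description>
    #     <format>CSV</format>
    #     <template><id>TEMPLATE_ID</id></template>
    #     <config><webAppReport><target><webapps>
    #       <WebApp><id>APP_ID</id></WebApp>
    #     </webapps></target></webAppReport></config>
    #   </Report></data></ServiceRequest>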

    def generate_app_id_scan_XML(self, app_id):
        report_xml = E.ServiceRequest(
            E.filters(E.Criteria({'field': 'webApp.id',
                                  'operator': 'EQUALS'}, app_id)))
        return report_xml

    def create_report(self, app_id):
        """Request creation of a CSV report for a webapp (the id here is a
        webapp id, which generate_webapp_report_XML expects)."""
        data = self.generate_webapp_report_XML(app_id)
        return self.qgc.request(self.REPORT_CREATE, data)

    def delete_report(self, report_id):
        return self.qgc.request(self.DELETE_REPORT.format(report_id=report_id))
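

# A minimal usage sketch for qualysWhisper (the .ini path and column name are
# illustrative assumptions, not part of this module):
#
#   qw = qualysWhisper(config='/path/to/qualys.ini')
#   print(qw.get_version())
#   webapps = qw.get_web_app_list()       # pandas DataFrame of webapps
#   qw.create_report(webapps['id'][0])    # queue a CSV report for one webapp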


class qualysWebAppReport(object):

    CATEGORIES = ['VULNERABILITY', 'SENSITIVE CONTENT',
                  'INFORMATION GATHERED']

    # URL Vulnerability Information
    WEB_APP_VULN_BLOCK = [
        'Web Application Name',
        CATEGORIES[0],
        'ID',
        'QID',
        'Url',
        'Param',
        'Function',
        'Form Entry Point',
        'Access Path',
        'Authentication',
        'Ajax Request',
        'Ajax Request ID',
        'Status',
        'Ignored',
        'Ignore Reason',
        'Ignore Date',
        'Ignore User',
        'Ignore Comments',
        'First Time Detected',
        'Last Time Detected',
        'Last Time Tested',
        'Times Detected',
        'Payload #1',
        'Request Method #1',
        'Request URL #1',
        'Request Headers #1',
        'Response #1',
        'Evidence #1',
    ]

    WEB_APP_VULN_HEADER = list(WEB_APP_VULN_BLOCK)
    WEB_APP_VULN_HEADER[WEB_APP_VULN_BLOCK.index(CATEGORIES[0])] = \
        'Vulnerability Category'

    WEB_APP_SENSITIVE_HEADER = list(WEB_APP_VULN_HEADER)
    WEB_APP_SENSITIVE_HEADER.insert(WEB_APP_SENSITIVE_HEADER.index('Url'),
                                    'Content')

    WEB_APP_SENSITIVE_BLOCK = list(WEB_APP_SENSITIVE_HEADER)
    WEB_APP_SENSITIVE_BLOCK[WEB_APP_SENSITIVE_BLOCK.index(
        'Vulnerability Category')] = CATEGORIES[1]

    WEB_APP_INFO_HEADER = [
        'Web Application Name',
        'Vulnerability Category',
        'ID',
        'QID',
        'Response #1',
        'Last Time Detected',
    ]
    WEB_APP_INFO_BLOCK = [
        'Web Application Name',
        CATEGORIES[2],
        'ID',
        'QID',
        'Results',
        'Detection Date',
    ]

    QID_HEADER = [
        'QID',
        'Id',
        'Title',
        'Category',
        'Severity Level',
        'Groups',
        'OWASP',
        'WASC',
        'CWE',
        'CVSS Base',
        'CVSS Temporal',
        'Description',
        'Impact',
        'Solution',
    ]
    GROUP_HEADER = ['GROUP', 'Name', 'Category']
    OWASP_HEADER = ['OWASP', 'Code', 'Name']
    WASC_HEADER = ['WASC', 'Code', 'Name']
    CATEGORY_HEADER = ['Category', 'Severity', 'Level', 'Description']
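
    # These lists mirror the literal header rows of each section in the Qualys
    # WAS CSV export: grab_section() matches them by set-equality to find where
    # a section starts and ends, and the *_HEADER variants double as DataFrame
    # column names.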

    def __init__(self, config=None, file_in=None, file_stream=False,
                 delimiter=',', quotechar='"'):
        self.file_in = file_in
        self.file_stream = file_stream
        self.report = None
        if config:
            try:
                self.qw = qualysWhisper(config=config)
            except Exception as e:
                print('[ERROR] Could not load config! Please check settings. Error: %s' % e)
        if file_stream:
            self.open_file = file_in.splitlines()
        elif file_in:
            self.open_file = open(file_in, 'rb')
        self.downloaded_file = None

    def get_hostname(self, report):
        """Return the webapp name from the row following the
        'Web Application Name' header row in the CSV report."""
        host = ''
        with open(report, 'rb') as csvfile:
            q_report = csv.reader(csvfile, delimiter=',', quotechar='"')
            for x in q_report:
                if x and 'Web Application Name' in x[0]:
                    host = next(q_report)[0]
        return host
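
    # The report opens with a block like (layout inferred from the lookup
    # above; values are illustrative):
    #
    #   "Web Application Name"
    #   "My Web App"
    #
    # so the webapp name sits on the row immediately after its header row.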

    def grab_section(self, report, section, end=None, pop_last=False):
        """Collect the rows of one CSV section: skip ahead to the section's
        header row, then read rows until a row matching one of `end` appears."""
        end = end or []
        temp_list = []
        with open(report, 'rb') as csvfile:
            q_report = csv.reader(csvfile, delimiter=',', quotechar='"')
            # Skip ahead to the header row of the requested section
            for line in q_report:
                if set(line) == set(section):
                    break
            # Keep reading until the start of the next block
            for line in q_report:
                temp_list.append(line)
                if line in end:
                    break
        # The matched end row belongs to the next section; drop it
        if pop_last and len(temp_list) > 1:
            temp_list.pop(-1)
        return temp_list
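
    # The export is a stack of sections, each introduced by its header row,
    # e.g. (illustrative values, not real data):
    #
    #   "Web Application Name","VULNERABILITY","ID","QID",...
    #   "My Web App","VULNERABILITY","12345","150001",...
    #   ...
    #   "QID","Id","Title",...
    #
    # so grab_section(report, WEB_APP_VULN_BLOCK, end=[QID_HEADER],
    # pop_last=True) would return only the vulnerability rows.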

    def iso_to_epoch(self, dt):
        # Note: '%s' (seconds since the epoch) is a platform extension
        # (glibc); it is not available everywhere, e.g. on Windows.
        return dp.parse(dt).strftime('%s')

    def cleanser(self, _data):
        """Replace delimiters and newlines that would corrupt downstream
        CSV output."""
        repls = (('\n', '|||'), ('\r', '|||'), (',', ';'), ('\t', '|||'))
        if _data:
            _data = reduce(lambda a, kv: a.replace(*kv), repls, _data)
        return _data
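
    # For instance, cleanser('a,b\nc') -> 'a;b|||c': each replacement pair is
    # applied in order via reduce, so commas become semicolons and any
    # newline, carriage return, or tab becomes the '|||' marker.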

    def grab_sections(self, report):
        """Split the downloaded CSV report into one DataFrame per section."""
        all_dataframes = []
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.WEB_APP_VULN_BLOCK,
                              end=[self.WEB_APP_SENSITIVE_BLOCK,
                                   self.WEB_APP_INFO_BLOCK],
                              pop_last=True),
            columns=self.WEB_APP_VULN_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.WEB_APP_SENSITIVE_BLOCK,
                              end=[self.WEB_APP_INFO_BLOCK,
                                   self.WEB_APP_SENSITIVE_BLOCK],
                              pop_last=True),
            columns=self.WEB_APP_SENSITIVE_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.WEB_APP_INFO_BLOCK,
                              end=[self.QID_HEADER], pop_last=True),
            columns=self.WEB_APP_INFO_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.QID_HEADER,
                              end=[self.GROUP_HEADER], pop_last=True),
            columns=self.QID_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.GROUP_HEADER,
                              end=[self.OWASP_HEADER], pop_last=True),
            columns=self.GROUP_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.OWASP_HEADER,
                              end=[self.WASC_HEADER], pop_last=True),
            columns=self.OWASP_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.WASC_HEADER,
                              end=[['APPENDIX']], pop_last=True),
            columns=self.WASC_HEADER))
        all_dataframes.append(pd.DataFrame(
            self.grab_section(report, self.CATEGORY_HEADER),
            columns=self.CATEGORY_HEADER))
        return all_dataframes
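
    # Index map of the returned list: 0 vulnerabilities, 1 sensitive content,
    # 2 information gathered, 3 QID metadata, 4 groups, 5 OWASP, 6 WASC,
    # 7 category appendix. data_normalizer() consumes entries 0-3.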

    def data_normalizer(self, dataframes):
        """
        Merge and clean data
        :param dataframes:
        :return:
        """
        # Stack the three finding sections, then join in QID metadata
        merged_df = pd.concat([dataframes[0], dataframes[1], dataframes[2]],
                              axis=0, ignore_index=False).fillna('')
        merged_df = pd.merge(merged_df, dataframes[3], left_on='QID',
                             right_on='Id')
        if 'Content' not in merged_df:
            merged_df['Content'] = ''
        columns_to_cleanse = ['Payload #1', 'Request Method #1', 'Request URL #1',
                              'Request Headers #1', 'Response #1', 'Evidence #1',
                              'Description', 'Impact', 'Solution', 'Url', 'Content']
        for col in columns_to_cleanse:
            merged_df[col] = merged_df[col].apply(self.cleanser)
        # Both frames carried a 'QID' column, so the merge suffixed them;
        # drop the duplicates and promote 'Id' to the canonical 'QID'.
        merged_df = merged_df.drop(['QID_y', 'QID_x'], axis=1)
        merged_df = merged_df.rename(columns={'Id': 'QID'})
        # Crawl bookkeeping entries are noise, not findings
        try:
            merged_df = merged_df[~merged_df.Title.str.contains(
                'Links Crawled|External Links Discovered')]
        except Exception as e:
            print(e)
        return merged_df

    def download_file(self, file_id):
        """Download a finished report and write it to <file_id>.csv."""
        report = self.qw.download_report(file_id)
        filename = str(file_id) + '.csv'
        with open(filename, 'w') as file_out:
            for line in report.splitlines():
                file_out.write(line + '\n')
        print('[ACTION] - File written to %s' % filename)
        return filename

    def remove_file(self, filename):
        os.remove(filename)

    def process_data(self, file_id, cleanup=True):
        """Downloads a report from Qualys and normalizes it"""
        print('[ACTION] - Downloading file ID: %s' % file_id)
        download_file = self.download_file(file_id)
        report_data = self.grab_sections(download_file)
        merged_data = self.data_normalizer(report_data)
        # TODO cleanup old data (delete)
        return merged_data


# Qualys response fields can exceed csv's default field size limit; raise the
# limit to the largest value the platform's C long accepts.
maxInt = sys.maxsize
decrement = True
while decrement:
    decrement = False
    try:
        csv.field_size_limit(maxInt)
    except OverflowError:
        maxInt = int(maxInt / 10)
        decrement = True
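

# A minimal end-to-end sketch (config path and report id are hypothetical,
# shown for illustration only):
#
#   if __name__ == '__main__':
#       qwr = qualysWebAppReport(config='/path/to/qualys.ini')
#       findings = qwr.process_data(file_id='1234567')
#       findings.to_csv('qualys_webapp_findings.csv', index=False)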