Adding custom version of QualysAPI
This commit is contained in:
10
deps/qualysapi/qualysapi/__init__.py
vendored
Normal file
10
deps/qualysapi/qualysapi/__init__.py
vendored
Normal file
@ -0,0 +1,10 @@
|
||||
# This is the version string assigned to the entire egg post
|
||||
# setup.py install
|
||||
|
||||
# Ownership and Copyright Information.
|
||||
from __future__ import absolute_import
|
||||
__author__ = "Parag Baxi <parag.baxi@gmail.com>"
|
||||
__copyright__ = "Copyright 2011-2013, Parag Baxi"
|
||||
__license__ = "BSD-new"
|
||||
|
||||
from qualysapi.util import connect
|
181
deps/qualysapi/qualysapi/api_actions.py
vendored
Normal file
181
deps/qualysapi/qualysapi/api_actions.py
vendored
Normal file
@ -0,0 +1,181 @@
|
||||
from __future__ import absolute_import
|
||||
from lxml import objectify
|
||||
import qualysapi.api_objects
|
||||
from qualysapi.api_objects import *
|
||||
|
||||
|
||||
class QGActions(object):
|
||||
def getHost(host):
|
||||
call = '/api/2.0/fo/asset/host/'
|
||||
parameters = {'action': 'list', 'ips': host, 'details': 'All'}
|
||||
hostData = objectify.fromstring(self.request(call, parameters)).RESPONSE
|
||||
try:
|
||||
hostData = hostData.HOST_LIST.HOST
|
||||
return Host(hostData.DNS, hostData.ID, hostData.IP, hostData.LAST_VULN_SCAN_DATETIME, hostData.NETBIOS, hostData.OS, hostData.TRACKING_METHOD)
|
||||
except AttributeError:
|
||||
return Host("", "", host, "never", "", "", "")
|
||||
|
||||
def getHostRange(self, start, end):
|
||||
call = '/api/2.0/fo/asset/host/'
|
||||
parameters = {'action': 'list', 'ips': start + '-' + end}
|
||||
hostData = objectify.fromstring(self.request(call, parameters))
|
||||
hostArray = []
|
||||
for host in hostData.RESPONSE.HOST_LIST.HOST:
|
||||
hostArray.append(Host(host.DNS, host.ID, host.IP, host.LAST_VULN_SCAN_DATETIME, host.NETBIOS, host.OS, host.TRACKING_METHOD))
|
||||
|
||||
return hostArray
|
||||
|
||||
def listAssetGroups(self, groupName=''):
|
||||
call = 'asset_group_list.php'
|
||||
if groupName == '':
|
||||
agData = objectify.fromstring(self.request(call))
|
||||
else:
|
||||
agData = objectify.fromstring(self.request(call, 'title=' + groupName)).RESPONSE
|
||||
|
||||
groupsArray = []
|
||||
scanipsArray = []
|
||||
scandnsArray = []
|
||||
scannersArray = []
|
||||
for group in agData.ASSET_GROUP:
|
||||
try:
|
||||
for scanip in group.SCANIPS:
|
||||
scanipsArray.append(scanip.IP)
|
||||
except AttributeError:
|
||||
scanipsArray = [] # No IPs defined to scan.
|
||||
|
||||
try:
|
||||
for scanner in group.SCANNER_APPLIANCES.SCANNER_APPLIANCE:
|
||||
scannersArray.append(scanner.SCANNER_APPLIANCE_NAME)
|
||||
except AttributeError:
|
||||
scannersArray = [] # No scanner appliances defined for this group.
|
||||
|
||||
try:
|
||||
for dnsName in group.SCANDNS:
|
||||
scandnsArray.append(dnsName.DNS)
|
||||
except AttributeError:
|
||||
scandnsArray = [] # No DNS names assigned to group.
|
||||
|
||||
groupsArray.append(AssetGroup(group.BUSINESS_IMPACT, group.ID, group.LAST_UPDATE, scanipsArray, scandnsArray, scannersArray, group.TITLE))
|
||||
|
||||
return groupsArray
|
||||
|
||||
def listReportTemplates(self):
|
||||
call = 'report_template_list.php'
|
||||
rtData = objectify.fromstring(self.request(call))
|
||||
templatesArray = []
|
||||
|
||||
for template in rtData.REPORT_TEMPLATE:
|
||||
templatesArray.append(ReportTemplate(template.GLOBAL, template.ID, template.LAST_UPDATE, template.TEMPLATE_TYPE, template.TITLE, template.TYPE, template.USER))
|
||||
|
||||
return templatesArray
|
||||
|
||||
def listReports(self, id=0):
|
||||
call = '/api/2.0/fo/report'
|
||||
|
||||
if id == 0:
|
||||
parameters = {'action': 'list'}
|
||||
|
||||
repData = objectify.fromstring(self.request(call, parameters)).RESPONSE
|
||||
reportsArray = []
|
||||
|
||||
for report in repData.REPORT_LIST.REPORT:
|
||||
reportsArray.append(Report(report.EXPIRATION_DATETIME, report.ID, report.LAUNCH_DATETIME, report.OUTPUT_FORMAT, report.SIZE, report.STATUS, report.TYPE, report.USER_LOGIN))
|
||||
|
||||
return reportsArray
|
||||
|
||||
else:
|
||||
parameters = {'action': 'list', 'id': id}
|
||||
repData = objectify.fromstring(self.request(call, parameters)).RESPONSE.REPORT_LIST.REPORT
|
||||
return Report(repData.EXPIRATION_DATETIME, repData.ID, repData.LAUNCH_DATETIME, repData.OUTPUT_FORMAT, repData.SIZE, repData.STATUS, repData.TYPE, repData.USER_LOGIN)
|
||||
|
||||
def notScannedSince(self, days):
|
||||
call = '/api/2.0/fo/asset/host/'
|
||||
parameters = {'action': 'list', 'details': 'All'}
|
||||
hostData = objectify.fromstring(self.request(call, parameters))
|
||||
hostArray = []
|
||||
today = datetime.date.today()
|
||||
for host in hostData.RESPONSE.HOST_LIST.HOST:
|
||||
last_scan = str(host.LAST_VULN_SCAN_DATETIME).split('T')[0]
|
||||
last_scan = datetime.date(int(last_scan.split('-')[0]), int(last_scan.split('-')[1]), int(last_scan.split('-')[2]))
|
||||
if (today - last_scan).days >= days:
|
||||
hostArray.append(Host(host.DNS, host.ID, host.IP, host.LAST_VULN_SCAN_DATETIME, host.NETBIOS, host.OS, host.TRACKING_METHOD))
|
||||
|
||||
return hostArray
|
||||
|
||||
def addIP(self, ips, vmpc):
|
||||
# 'ips' parameter accepts comma-separated list of IP addresses.
|
||||
# 'vmpc' parameter accepts 'vm', 'pc', or 'both'. (Vulnerability Managment, Policy Compliance, or both)
|
||||
call = '/api/2.0/fo/asset/ip/'
|
||||
enablevm = 1
|
||||
enablepc = 0
|
||||
if vmpc == 'pc':
|
||||
enablevm = 0
|
||||
enablepc = 1
|
||||
elif vmpc == 'both':
|
||||
enablevm = 1
|
||||
enablepc = 1
|
||||
|
||||
parameters = {'action': 'add', 'ips': ips, 'enable_vm': enablevm, 'enable_pc': enablepc}
|
||||
self.request(call, parameters)
|
||||
|
||||
def listScans(self, launched_after="", state="", target="", type="", user_login=""):
|
||||
# 'launched_after' parameter accepts a date in the format: YYYY-MM-DD
|
||||
# 'state' parameter accepts "Running", "Paused", "Canceled", "Finished", "Error", "Queued", and "Loading".
|
||||
# 'title' parameter accepts a string
|
||||
# 'type' parameter accepts "On-Demand", and "Scheduled".
|
||||
# 'user_login' parameter accepts a user name (string)
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'list', 'show_ags': 1, 'show_op': 1, 'show_status': 1}
|
||||
if launched_after != "":
|
||||
parameters['launched_after_datetime'] = launched_after
|
||||
|
||||
if state != "":
|
||||
parameters['state'] = state
|
||||
|
||||
if target != "":
|
||||
parameters['target'] = target
|
||||
|
||||
if type != "":
|
||||
parameters['type'] = type
|
||||
|
||||
if user_login != "":
|
||||
parameters['user_login'] = user_login
|
||||
|
||||
scanlist = objectify.fromstring(self.request(call, parameters))
|
||||
scanArray = []
|
||||
for scan in scanlist.RESPONSE.SCAN_LIST.SCAN:
|
||||
try:
|
||||
agList = []
|
||||
for ag in scan.ASSET_GROUP_TITLE_LIST.ASSET_GROUP_TITLE:
|
||||
agList.append(ag)
|
||||
except AttributeError:
|
||||
agList = []
|
||||
|
||||
scanArray.append(Scan(agList, scan.DURATION, scan.LAUNCH_DATETIME, scan.OPTION_PROFILE.TITLE, scan.PROCESSED, scan.REF, scan.STATUS, scan.TARGET, scan.TITLE, scan.TYPE, scan.USER_LOGIN))
|
||||
|
||||
return scanArray
|
||||
|
||||
def launchScan(self, title, option_title, iscanner_name, asset_groups="", ip=""):
|
||||
# TODO: Add ability to scan by tag.
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'launch', 'scan_title': title, 'option_title': option_title, 'iscanner_name': iscanner_name, 'ip': ip, 'asset_groups': asset_groups}
|
||||
if ip == "":
|
||||
parameters.pop("ip")
|
||||
|
||||
if asset_groups == "":
|
||||
parameters.pop("asset_groups")
|
||||
|
||||
scan_ref = objectify.fromstring(self.request(call, parameters)).RESPONSE.ITEM_LIST.ITEM[1].VALUE
|
||||
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'list', 'scan_ref': scan_ref, 'show_status': 1, 'show_ags': 1, 'show_op': 1}
|
||||
|
||||
scan = objectify.fromstring(self.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN
|
||||
try:
|
||||
agList = []
|
||||
for ag in scan.ASSET_GROUP_TITLE_LIST.ASSET_GROUP_TITLE:
|
||||
agList.append(ag)
|
||||
except AttributeError:
|
||||
agList = []
|
||||
|
||||
return Scan(agList, scan.DURATION, scan.LAUNCH_DATETIME, scan.OPTION_PROFILE.TITLE, scan.PROCESSED, scan.REF, scan.STATUS, scan.TARGET, scan.TITLE, scan.TYPE, scan.USER_LOGIN)
|
155
deps/qualysapi/qualysapi/api_methods.py
vendored
Normal file
155
deps/qualysapi/qualysapi/api_methods.py
vendored
Normal file
@ -0,0 +1,155 @@
|
||||
from __future__ import absolute_import
|
||||
__author__ = 'pbaxi'
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
api_methods = defaultdict(set)
|
||||
api_methods['1'] = set([
|
||||
'about.php',
|
||||
'action_log_report.php',
|
||||
'asset_data_report.php',
|
||||
'asset_domain.php',
|
||||
'asset_domain_list.php',
|
||||
'asset_group_delete.php',
|
||||
'asset_group_list.php',
|
||||
'asset_ip_list.php',
|
||||
'asset_range_info.php',
|
||||
'asset_search.php',
|
||||
'get_host_info.php',
|
||||
'ignore_vuln.php',
|
||||
'iscanner_list.php',
|
||||
'knowledgebase_download.php',
|
||||
'map-2.php',
|
||||
'map.php',
|
||||
'map_report.php',
|
||||
'map_report_list.php',
|
||||
'password_change.php',
|
||||
'scan.php',
|
||||
'scan_cancel.php',
|
||||
'scan_options.php',
|
||||
'scan_report.php',
|
||||
'scan_report_delete.php',
|
||||
'scan_report_list.php',
|
||||
'scan_running_list.php',
|
||||
'scan_target_history.php',
|
||||
'scheduled_scans.php',
|
||||
'ticket_delete.php',
|
||||
'ticket_edit.php',
|
||||
'ticket_list.php',
|
||||
'ticket_list_deleted.php',
|
||||
'time_zone_code.php',
|
||||
'user.php',
|
||||
'user_list.php',
|
||||
])
|
||||
# API v1 POST methods.
|
||||
api_methods['1 post'] = set([
|
||||
'action_log_report.php',
|
||||
'asset_group.php',
|
||||
'asset_ip.php',
|
||||
'ignore_vuln.php',
|
||||
'knowledgebase_download.php',
|
||||
'map-2.php',
|
||||
'map.php',
|
||||
'password_change.php',
|
||||
'scan.php',
|
||||
'scan_report.php',
|
||||
'scan_target_history.php',
|
||||
'scheduled_scans.php',
|
||||
'ticket_delete.php',
|
||||
'ticket_edit.php',
|
||||
'ticket_list.php',
|
||||
'ticket_list_deleted.php',
|
||||
'user.php',
|
||||
'user_list.php',
|
||||
])
|
||||
# API v2 methods (they're all POST).
|
||||
api_methods['2'] = set([
|
||||
'api/2.0/fo/appliance/',
|
||||
'api/2.0/fo/asset/excluded_ip/',
|
||||
'api/2.0/fo/asset/excluded_ip/history/',
|
||||
'api/2.0/fo/asset/host/',
|
||||
'api/2.0/fo/asset/host/cyberscope/',
|
||||
'api/2.0/fo/asset/host/cyberscope/fdcc/policy/',
|
||||
'api/2.0/fo/asset/host/cyberscope/fdcc/scan/',
|
||||
'api/2.0/fo/asset/host/vm/detection/',
|
||||
'api/2.0/fo/asset/ip/',
|
||||
'api/2.0/fo/asset/ip/v4_v6/',
|
||||
'api/2.0/fo/asset/vhost/',
|
||||
'api/2.0/fo/auth/',
|
||||
# 'api/2.0/fo/auth/{type}/', # Added below.
|
||||
'api/2.0/fo/compliance/',
|
||||
'api/2.0/fo/compliance/control',
|
||||
'api/2.0/fo/compliance/fdcc/policy',
|
||||
'api/2.0/fo/compliance/policy/',
|
||||
'api/2.0/fo/compliance/posture/info/',
|
||||
'api/2.0/fo/compliance/scap/arf/',
|
||||
'api/2.0/fo/knowledge_base/vuln/',
|
||||
'api/2.0/fo/report/',
|
||||
'api/2.0/fo/report/scorecard/',
|
||||
'api/2.0/fo/scan/',
|
||||
'api/2.0/fo/scan/compliance/',
|
||||
'api/2.0/fo/session/',
|
||||
'api/2.0/fo/setup/restricted_ips/',
|
||||
])
|
||||
for auth_type in set([
|
||||
'ibm_db2',
|
||||
'ms_sql',
|
||||
'oracle',
|
||||
'oracle_listener',
|
||||
'snmp',
|
||||
'unix',
|
||||
'windows',
|
||||
]):
|
||||
api_methods['2'].add('api/2.0/fo/auth/%s/' % auth_type)
|
||||
# WAS GET methods when no POST data.
|
||||
api_methods['was no data get'] = set([
|
||||
'count/was/report',
|
||||
'count/was/wasscan',
|
||||
'count/was/wasscanschedule',
|
||||
'count/was/webapp',
|
||||
'download/was/report/',
|
||||
'download/was/wasscan/',
|
||||
])
|
||||
# WAS GET methods.
|
||||
api_methods['was get'] = set([
|
||||
'download/was/report/',
|
||||
'download/was/wasscan/',
|
||||
'get/was/report/',
|
||||
'get/was/wasscan/',
|
||||
'get/was/wasscanschedule/',
|
||||
'get/was/webapp/',
|
||||
'status/was/report/',
|
||||
'status/was/wasscan/',
|
||||
])
|
||||
# Asset Management GET methods.
|
||||
api_methods['am get'] = set([
|
||||
'count/am/asset',
|
||||
'count/am/hostasset',
|
||||
'count/am/tag',
|
||||
'get/am/asset/',
|
||||
'get/am/hostasset/',
|
||||
'get/am/tag/',
|
||||
])
|
||||
# Asset Management v2 GET methods.
|
||||
api_methods['am2 get'] = set([
|
||||
'get/am/asset/',
|
||||
'get/am/hostasset/',
|
||||
'get/am/tag/',
|
||||
'get/am/hostinstancevuln/',
|
||||
'get/am/assetdataconnector/',
|
||||
'get/am/awsassetdataconnector/',
|
||||
'get/am/awsauthrecord/',
|
||||
])
|
||||
# Keep track of methods with ending slashes to autocorrect user when they forgot slash.
|
||||
api_methods_with_trailing_slash = defaultdict(set)
|
||||
for method_group in set(['1', '2', 'was', 'am', 'am2']):
|
||||
for method in api_methods[method_group]:
|
||||
if method[-1] == '/':
|
||||
# Add applicable method with api_version preceding it.
|
||||
# Example:
|
||||
# WAS API has 'get/was/webapp/'.
|
||||
# method_group = 'was get'
|
||||
# method_group.split()[0] = 'was'
|
||||
# Take off slash to match user provided method.
|
||||
# api_methods_with_trailing_slash['was'] contains 'get/was/webapp'
|
||||
api_methods_with_trailing_slash[method_group.split()[0]].add(method[:-1])
|
120
deps/qualysapi/qualysapi/api_objects.py
vendored
Normal file
120
deps/qualysapi/qualysapi/api_objects.py
vendored
Normal file
@ -0,0 +1,120 @@
|
||||
from __future__ import absolute_import
|
||||
import datetime
|
||||
from lxml import objectify
|
||||
|
||||
|
||||
class Host(object):
|
||||
def __init__(self, dns, id, ip, last_scan, netbios, os, tracking_method):
|
||||
self.dns = str(dns)
|
||||
self.id = int(id)
|
||||
self.ip = str(ip)
|
||||
last_scan = str(last_scan).replace('T', ' ').replace('Z', '').split(' ')
|
||||
date = last_scan[0].split('-')
|
||||
time = last_scan[1].split(':')
|
||||
self.last_scan = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]))
|
||||
self.netbios = str(netbios)
|
||||
self.os = str(os)
|
||||
self.tracking_method = str(tracking_method)
|
||||
|
||||
|
||||
class AssetGroup(object):
|
||||
def __init__(self, business_impact, id, last_update, scanips, scandns, scanner_appliances, title):
|
||||
self.business_impact = str(business_impact)
|
||||
self.id = int(id)
|
||||
self.last_update = str(last_update)
|
||||
self.scanips = scanips
|
||||
self.scandns = scandns
|
||||
self.scanner_appliances = scanner_appliances
|
||||
self.title = str(title)
|
||||
|
||||
def addAsset(conn, ip):
|
||||
call = '/api/2.0/fo/asset/group/'
|
||||
parameters = {'action': 'edit', 'id': self.id, 'add_ips': ip}
|
||||
conn.request(call, parameters)
|
||||
self.scanips.append(ip)
|
||||
|
||||
def setAssets(conn, ips):
|
||||
call = '/api/2.0/fo/asset/group/'
|
||||
parameters = {'action': 'edit', 'id': self.id, 'set_ips': ips}
|
||||
conn.request(call, parameters)
|
||||
|
||||
|
||||
class ReportTemplate(object):
|
||||
def __init__(self, isGlobal, id, last_update, template_type, title, type, user):
|
||||
self.isGlobal = int(isGlobal)
|
||||
self.id = int(id)
|
||||
self.last_update = str(last_update).replace('T', ' ').replace('Z', '').split(' ')
|
||||
self.template_type = template_type
|
||||
self.title = title
|
||||
self.type = type
|
||||
self.user = user.LOGIN
|
||||
|
||||
|
||||
class Report(object):
|
||||
def __init__(self, expiration_datetime, id, launch_datetime, output_format, size, status, type, user_login):
|
||||
self.expiration_datetime = str(expiration_datetime).replace('T', ' ').replace('Z', '').split(' ')
|
||||
self.id = int(id)
|
||||
self.launch_datetime = str(launch_datetime).replace('T', ' ').replace('Z', '').split(' ')
|
||||
self.output_format = output_format
|
||||
self.size = size
|
||||
self.status = status.STATE
|
||||
self.type = type
|
||||
self.user_login = user_login
|
||||
|
||||
def download(self, conn):
|
||||
call = '/api/2.0/fo/report'
|
||||
parameters = {'action': 'fetch', 'id': self.id}
|
||||
if self.status == 'Finished':
|
||||
return conn.request(call, parameters)
|
||||
|
||||
|
||||
class Scan(object):
|
||||
def __init__(self, assetgroups, duration, launch_datetime, option_profile, processed, ref, status, target, title, type, user_login):
|
||||
self.assetgroups = assetgroups
|
||||
self.duration = str(duration)
|
||||
launch_datetime = str(launch_datetime).replace('T', ' ').replace('Z', '').split(' ')
|
||||
date = launch_datetime[0].split('-')
|
||||
time = launch_datetime[1].split(':')
|
||||
self.launch_datetime = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]))
|
||||
self.option_profile = str(option_profile)
|
||||
self.processed = int(processed)
|
||||
self.ref = str(ref)
|
||||
self.status = str(status.STATE)
|
||||
self.target = str(target).split(', ')
|
||||
self.title = str(title)
|
||||
self.type = str(type)
|
||||
self.user_login = str(user_login)
|
||||
|
||||
def cancel(self, conn):
|
||||
cancelled_statuses = ['Cancelled', 'Finished', 'Error']
|
||||
if any(self.status in s for s in cancelled_statuses):
|
||||
raise ValueError("Scan cannot be cancelled because its status is " + self.status)
|
||||
else:
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'cancel', 'scan_ref': self.ref}
|
||||
conn.request(call, parameters)
|
||||
|
||||
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
|
||||
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE
|
||||
|
||||
def pause(self, conn):
|
||||
if self.status != "Running":
|
||||
raise ValueError("Scan cannot be paused because its status is " + self.status)
|
||||
else:
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'pause', 'scan_ref': self.ref}
|
||||
conn.request(call, parameters)
|
||||
|
||||
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
|
||||
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE
|
||||
|
||||
def resume(self, conn):
|
||||
if self.status != "Paused":
|
||||
raise ValueError("Scan cannot be resumed because its status is " + self.status)
|
||||
else:
|
||||
call = '/api/2.0/fo/scan/'
|
||||
parameters = {'action': 'resume', 'scan_ref': self.ref}
|
||||
conn.request(call, parameters)
|
||||
|
||||
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
|
||||
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE
|
218
deps/qualysapi/qualysapi/config.py
vendored
Normal file
218
deps/qualysapi/qualysapi/config.py
vendored
Normal file
@ -0,0 +1,218 @@
|
||||
""" Module providing a single class (QualysConnectConfig) that parses a config
|
||||
file and provides the information required to build QualysGuard sessions.
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
import os
|
||||
import stat
|
||||
import getpass
|
||||
import logging
|
||||
from six.moves import input
|
||||
from six.moves.configparser import *
|
||||
|
||||
import qualysapi.settings as qcs
|
||||
# Setup module level logging.
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# try:
|
||||
# from requests_ntlm import HttpNtlmAuth
|
||||
# except ImportError, e:
|
||||
# logger.warning('Warning: Cannot support NTML authentication.')
|
||||
|
||||
|
||||
__author__ = "Parag Baxi <parag.baxi@gmail.com> & Colin Bell <colin.bell@uwaterloo.ca>"
|
||||
__updated_by__ = "Austin Taylor <vulnWhisperer@austintaylor.io>"
|
||||
__copyright__ = "Copyright 2011-2013, Parag Baxi & University of Waterloo"
|
||||
__license__ = "BSD-new"
|
||||
|
||||
|
||||
class QualysConnectConfig:
|
||||
""" Class to create a ConfigParser and read user/password details
|
||||
from an ini file.
|
||||
"""
|
||||
|
||||
def __init__(self, filename=qcs.default_filename, remember_me=False, remember_me_always=False):
|
||||
|
||||
self._cfgfile = None
|
||||
|
||||
# Prioritize local directory filename.
|
||||
# Check for file existence.
|
||||
if os.path.exists(filename):
|
||||
self._cfgfile = filename
|
||||
elif os.path.exists(os.path.join(os.path.expanduser("~"), filename)):
|
||||
# Set home path for file.
|
||||
self._cfgfile = os.path.join(os.path.expanduser("~"), filename)
|
||||
|
||||
# create ConfigParser to combine defaults and input from config file.
|
||||
self._cfgparse = ConfigParser(qcs.defaults)
|
||||
|
||||
if self._cfgfile:
|
||||
self._cfgfile = os.path.realpath(self._cfgfile)
|
||||
|
||||
mode = stat.S_IMODE(os.stat(self._cfgfile)[stat.ST_MODE])
|
||||
|
||||
# apply bitmask to current mode to check ONLY user access permissions.
|
||||
if (mode & (stat.S_IRWXG | stat.S_IRWXO)) != 0:
|
||||
logging.warning('%s permissions allows more than user access.' % (filename,))
|
||||
|
||||
self._cfgparse.read(self._cfgfile)
|
||||
|
||||
# if 'info' doesn't exist, create the section.
|
||||
if not self._cfgparse.has_section('info'):
|
||||
self._cfgparse.add_section('info')
|
||||
|
||||
# Use default hostname (if one isn't provided).
|
||||
if not self._cfgparse.has_option('info', 'hostname'):
|
||||
if self._cfgparse.has_option('DEFAULT', 'hostname'):
|
||||
hostname = self._cfgparse.get('DEFAULT', 'hostname')
|
||||
self._cfgparse.set('info', 'hostname', hostname)
|
||||
else:
|
||||
raise Exception("No 'hostname' set. QualysConnect does not know who to connect to.")
|
||||
|
||||
# Use default max_retries (if one isn't provided).
|
||||
if not self._cfgparse.has_option('info', 'max_retries'):
|
||||
self.max_retries = qcs.defaults['max_retries']
|
||||
else:
|
||||
self.max_retries = self._cfgparse.get('info', 'max_retries')
|
||||
try:
|
||||
self.max_retries = int(self.max_retries)
|
||||
except Exception:
|
||||
logger.error('Value max_retries must be an integer.')
|
||||
print('Value max_retries must be an integer.')
|
||||
exit(1)
|
||||
self._cfgparse.set('info', 'max_retries', str(self.max_retries))
|
||||
self.max_retries = int(self.max_retries)
|
||||
|
||||
#Get template ID... user will need to set this to pull back CSV reports
|
||||
if not self._cfgparse.has_option('report', 'template_id'):
|
||||
self.report_template_id = qcs.defaults['template_id']
|
||||
else:
|
||||
self.report_template_id = self._cfgparse.get('report', 'template_id')
|
||||
try:
|
||||
self.report_template_id = int(self.report_template_id)
|
||||
except Exception:
|
||||
logger.error('Report Template ID Must be set and be an integer')
|
||||
print('Value template ID must be an integer.')
|
||||
exit(1)
|
||||
self._cfgparse.set('report', 'template_id', str(self.max_retries))
|
||||
self.max_retries = int(self.max_retries)
|
||||
|
||||
# Proxy support
|
||||
proxy_config = proxy_url = proxy_protocol = proxy_port = proxy_username = proxy_password = None
|
||||
# User requires proxy?
|
||||
if self._cfgparse.has_option('proxy', 'proxy_url'):
|
||||
proxy_url = self._cfgparse.get('proxy', 'proxy_url')
|
||||
# Remove protocol prefix from url if included.
|
||||
for prefix in ('http://', 'https://'):
|
||||
if proxy_url.startswith(prefix):
|
||||
proxy_protocol = prefix
|
||||
proxy_url = proxy_url[len(prefix):]
|
||||
# Default proxy protocol is http.
|
||||
if not proxy_protocol:
|
||||
proxy_protocol = 'https://'
|
||||
# Check for proxy port request.
|
||||
if ':' in proxy_url:
|
||||
# Proxy port already specified in url.
|
||||
# Set proxy port.
|
||||
proxy_port = proxy_url[proxy_url.index(':') + 1:]
|
||||
# Remove proxy port from proxy url.
|
||||
proxy_url = proxy_url[:proxy_url.index(':')]
|
||||
if self._cfgparse.has_option('proxy', 'proxy_port'):
|
||||
# Proxy requires specific port.
|
||||
if proxy_port:
|
||||
# Warn that a proxy port was already specified in the url.
|
||||
proxy_port_url = proxy_port
|
||||
proxy_port = self._cfgparse.get('proxy', 'proxy_port')
|
||||
logger.warning('Proxy port from url overwritten by specified proxy_port from config:')
|
||||
logger.warning('%s --> %s' % (proxy_port_url, proxy_port))
|
||||
else:
|
||||
proxy_port = self._cfgparse.get('proxy', 'proxy_port')
|
||||
if not proxy_port:
|
||||
# No proxy port specified.
|
||||
if proxy_protocol == 'http://':
|
||||
# Use default HTTP Proxy port.
|
||||
proxy_port = '8080'
|
||||
else:
|
||||
# Use default HTTPS Proxy port.
|
||||
proxy_port = '443'
|
||||
|
||||
# Check for proxy authentication request.
|
||||
if self._cfgparse.has_option('proxy', 'proxy_username'):
|
||||
# Proxy requires username & password.
|
||||
proxy_username = self._cfgparse.get('proxy', 'proxy_username')
|
||||
proxy_password = self._cfgparse.get('proxy', 'proxy_password')
|
||||
# Not sure if this use case below is valid.
|
||||
# # Support proxy with username and empty password.
|
||||
# try:
|
||||
# proxy_password = self._cfgparse.get('proxy','proxy_password')
|
||||
# except NoOptionError, e:
|
||||
# # Set empty password.
|
||||
# proxy_password = ''
|
||||
# Sample proxy config:f
|
||||
# 'http://user:pass@10.10.1.10:3128'
|
||||
if proxy_url:
|
||||
# Proxy requested.
|
||||
proxy_config = proxy_url
|
||||
if proxy_port:
|
||||
# Proxy port requested.
|
||||
proxy_config += ':' + proxy_port
|
||||
if proxy_username:
|
||||
# Proxy authentication requested.
|
||||
proxy_config = proxy_username + ':' + proxy_password + '@' + proxy_config
|
||||
# Prefix by proxy protocol.
|
||||
proxy_config = proxy_protocol + proxy_config
|
||||
# Set up proxy if applicable.
|
||||
if proxy_config:
|
||||
self.proxies = {'https': proxy_config}
|
||||
else:
|
||||
self.proxies = None
|
||||
|
||||
# ask username (if one doesn't exist)
|
||||
if not self._cfgparse.has_option('info', 'username'):
|
||||
username = input('QualysGuard Username: ')
|
||||
self._cfgparse.set('info', 'username', username)
|
||||
|
||||
# ask password (if one doesn't exist)
|
||||
if not self._cfgparse.has_option('info', 'password'):
|
||||
password = getpass.getpass('QualysGuard Password: ')
|
||||
self._cfgparse.set('info', 'password', password)
|
||||
|
||||
|
||||
|
||||
logging.debug(self._cfgparse.items('info'))
|
||||
|
||||
if remember_me or remember_me_always:
|
||||
# Let's create that config file for next time...
|
||||
# Where to store this?
|
||||
if remember_me:
|
||||
# Store in current working directory.
|
||||
config_path = filename
|
||||
if remember_me_always:
|
||||
# Store in home directory.
|
||||
config_path = os.path.expanduser("~")
|
||||
if not os.path.exists(config_path):
|
||||
# Write file only if it doesn't already exists.
|
||||
# http://stackoverflow.com/questions/5624359/write-file-with-specific-permissions-in-python
|
||||
mode = stat.S_IRUSR | stat.S_IWUSR # This is 0o600 in octal and 384 in decimal.
|
||||
umask_original = os.umask(0)
|
||||
try:
|
||||
config_file = os.fdopen(os.open(config_path, os.O_WRONLY | os.O_CREAT, mode), 'w')
|
||||
finally:
|
||||
os.umask(umask_original)
|
||||
# Add the settings to the structure of the file, and lets write it out...
|
||||
self._cfgparse.write(config_file)
|
||||
config_file.close()
|
||||
|
||||
def get_config_filename(self):
|
||||
return self._cfgfile
|
||||
|
||||
def get_config(self):
|
||||
return self._cfgparse
|
||||
|
||||
def get_auth(self):
|
||||
''' Returns username from the configfile. '''
|
||||
return (self._cfgparse.get('info', 'username'), self._cfgparse.get('info', 'password'))
|
||||
|
||||
def get_hostname(self):
|
||||
''' Returns hostname. '''
|
||||
return self._cfgparse.get('info', 'hostname')
|
357
deps/qualysapi/qualysapi/connector.py
vendored
Normal file
357
deps/qualysapi/qualysapi/connector.py
vendored
Normal file
@ -0,0 +1,357 @@
|
||||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
|
||||
__copyright__ = 'Copyright 2013, Parag Baxi'
|
||||
__license__ = 'Apache License 2.0'
|
||||
|
||||
""" Module that contains classes for setting up connections to QualysGuard API
|
||||
and requesting data from it.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
import urllib.parse
|
||||
from collections import defaultdict
|
||||
|
||||
import requests
|
||||
|
||||
import qualysapi.version
|
||||
import qualysapi.api_methods
|
||||
|
||||
import qualysapi.api_actions
|
||||
import qualysapi.api_actions as api_actions
|
||||
|
||||
# Setup module level logging.
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
from lxml import etree
|
||||
except ImportError as e:
|
||||
logger.warning(
|
||||
'Warning: Cannot consume lxml.builder E objects without lxml. Send XML strings for AM & WAS API calls.')
|
||||
|
||||
|
||||
class QGConnector(api_actions.QGActions):
|
||||
""" Qualys Connection class which allows requests to the QualysGuard API using HTTP-Basic Authentication (over SSL).
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, auth, server='qualysapi.qualys.com', proxies=None, max_retries=3):
|
||||
# Read username & password from file, if possible.
|
||||
self.auth = auth
|
||||
# Remember QualysGuard API server.
|
||||
self.server = server
|
||||
# Remember rate limits per call.
|
||||
self.rate_limit_remaining = defaultdict(int)
|
||||
# api_methods: Define method algorithm in a dict of set.
|
||||
# Naming convention: api_methods[api_version optional_blah] due to api_methods_with_trailing_slash testing.
|
||||
self.api_methods = qualysapi.api_methods.api_methods
|
||||
#
|
||||
# Keep track of methods with ending slashes to autocorrect user when they forgot slash.
|
||||
self.api_methods_with_trailing_slash = qualysapi.api_methods.api_methods_with_trailing_slash
|
||||
self.proxies = proxies
|
||||
logger.debug('proxies = \n%s' % proxies)
|
||||
# Set up requests max_retries.
|
||||
logger.debug('max_retries = \n%s' % max_retries)
|
||||
self.session = requests.Session()
|
||||
http_max_retries = requests.adapters.HTTPAdapter(max_retries=max_retries)
|
||||
https_max_retries = requests.adapters.HTTPAdapter(max_retries=max_retries)
|
||||
self.session.mount('http://', http_max_retries)
|
||||
self.session.mount('https://', https_max_retries)
|
||||
|
||||
def __call__(self):
|
||||
return self
|
||||
|
||||
def format_api_version(self, api_version):
|
||||
""" Return QualysGuard API version for api_version specified.
|
||||
|
||||
"""
|
||||
# Convert to int.
|
||||
if type(api_version) == str:
|
||||
api_version = api_version.lower()
|
||||
if api_version[0] == 'v' and api_version[1].isdigit():
|
||||
# Remove first 'v' in case the user typed 'v1' or 'v2', etc.
|
||||
api_version = api_version[1:]
|
||||
# Check for input matching Qualys modules.
|
||||
if api_version in ('asset management', 'assets', 'tag', 'tagging', 'tags'):
|
||||
# Convert to Asset Management API.
|
||||
api_version = 'am'
|
||||
elif api_version in ('am2'):
|
||||
# Convert to Asset Management API v2
|
||||
api_version = 'am2'
|
||||
elif api_version in ('webapp', 'web application scanning', 'webapp scanning'):
|
||||
# Convert to WAS API.
|
||||
api_version = 'was'
|
||||
elif api_version in ('pol', 'pc'):
|
||||
# Convert PC module to API number 2.
|
||||
api_version = 2
|
||||
else:
|
||||
api_version = int(api_version)
|
||||
return api_version
|
||||
|
||||
def which_api_version(self, api_call):
|
||||
""" Return QualysGuard API version for api_call specified.
|
||||
|
||||
"""
|
||||
# Leverage patterns of calls to API methods.
|
||||
if api_call.endswith('.php'):
|
||||
# API v1.
|
||||
return 1
|
||||
elif api_call.startswith('api/2.0/'):
|
||||
# API v2.
|
||||
return 2
|
||||
elif '/am/' in api_call:
|
||||
# Asset Management API.
|
||||
return 'am'
|
||||
elif '/was/' in api_call:
|
||||
# WAS API.
|
||||
return 'was'
|
||||
return False
|
||||
|
||||
def url_api_version(self, api_version):
|
||||
""" Return base API url string for the QualysGuard api_version and server.
|
||||
|
||||
"""
|
||||
# Set base url depending on API version.
|
||||
if api_version == 1:
|
||||
# QualysGuard API v1 url.
|
||||
url = "https://%s/msp/" % (self.server,)
|
||||
elif api_version == 2:
|
||||
# QualysGuard API v2 url.
|
||||
url = "https://%s/" % (self.server,)
|
||||
elif api_version == 'was':
|
||||
# QualysGuard REST v3 API url (Portal API).
|
||||
url = "https://%s/qps/rest/3.0/" % (self.server,)
|
||||
elif api_version == 'am':
|
||||
# QualysGuard REST v1 API url (Portal API).
|
||||
url = "https://%s/qps/rest/1.0/" % (self.server,)
|
||||
elif api_version == 'am2':
|
||||
# QualysGuard REST v1 API url (Portal API).
|
||||
url = "https://%s/qps/rest/2.0/" % (self.server,)
|
||||
else:
|
||||
raise Exception("Unknown QualysGuard API Version Number (%s)" % (api_version,))
|
||||
logger.debug("Base url =\n%s" % (url))
|
||||
return url
|
||||
|
||||
def format_http_method(self, api_version, api_call, data):
|
||||
""" Return QualysGuard API http method, with POST preferred..
|
||||
|
||||
"""
|
||||
# Define get methods for automatic http request methodology.
|
||||
#
|
||||
# All API v2 requests are POST methods.
|
||||
if api_version == 2:
|
||||
return 'post'
|
||||
elif api_version == 1:
|
||||
if api_call in self.api_methods['1 post']:
|
||||
return 'post'
|
||||
else:
|
||||
return 'get'
|
||||
elif api_version == 'was':
|
||||
# WAS API call.
|
||||
# Because WAS API enables user to GET API resources in URI, let's chop off the resource.
|
||||
# '/download/was/report/18823' --> '/download/was/report/'
|
||||
api_call_endpoint = api_call[:api_call.rfind('/') + 1]
|
||||
if api_call_endpoint in self.api_methods['was get']:
|
||||
return 'get'
|
||||
# Post calls with no payload will result in HTTPError: 415 Client Error: Unsupported Media Type.
|
||||
if not data:
|
||||
# No post data. Some calls change to GET with no post data.
|
||||
if api_call_endpoint in self.api_methods['was no data get']:
|
||||
return 'get'
|
||||
else:
|
||||
return 'post'
|
||||
else:
|
||||
# Call with post data.
|
||||
return 'post'
|
||||
else:
|
||||
# Asset Management API call.
|
||||
if api_call in self.api_methods['am get']:
|
||||
return 'get'
|
||||
else:
|
||||
return 'post'
|
||||
|
||||
def preformat_call(self, api_call):
|
||||
""" Return properly formatted QualysGuard API call.
|
||||
|
||||
"""
|
||||
# Remove possible starting slashes or trailing question marks in call.
|
||||
api_call_formatted = api_call.lstrip('/')
|
||||
api_call_formatted = api_call_formatted.rstrip('?')
|
||||
if api_call != api_call_formatted:
|
||||
# Show difference
|
||||
logger.debug('api_call post strip =\n%s' % api_call_formatted)
|
||||
return api_call_formatted
|
||||
|
||||
def format_call(self, api_version, api_call):
|
||||
""" Return properly formatted QualysGuard API call according to api_version etiquette.
|
||||
|
||||
"""
|
||||
# Remove possible starting slashes or trailing question marks in call.
|
||||
api_call = api_call.lstrip('/')
|
||||
api_call = api_call.rstrip('?')
|
||||
logger.debug('api_call post strip =\n%s' % api_call)
|
||||
# Make sure call always ends in slash for API v2 calls.
|
||||
if (api_version == 2 and api_call[-1] != '/'):
|
||||
# Add slash.
|
||||
logger.debug('Adding "/" to api_call.')
|
||||
api_call += '/'
|
||||
if api_call in self.api_methods_with_trailing_slash[api_version]:
|
||||
# Add slash.
|
||||
logger.debug('Adding "/" to api_call.')
|
||||
api_call += '/'
|
||||
return api_call
|
||||
|
||||
def format_payload(self, api_version, data):
|
||||
""" Return appropriate QualysGuard API call.
|
||||
|
||||
"""
|
||||
# Check if payload is for API v1 or API v2.
|
||||
if (api_version in (1, 2)):
|
||||
# Check if string type.
|
||||
if type(data) == str:
|
||||
# Convert to dictionary.
|
||||
logger.debug('Converting string to dict:\n%s' % data)
|
||||
# Remove possible starting question mark & ending ampersands.
|
||||
data = data.lstrip('?')
|
||||
data = data.rstrip('&')
|
||||
# Convert to dictionary.
|
||||
data = urllib.parse.parse_qs(data)
|
||||
logger.debug('Converted:\n%s' % str(data))
|
||||
elif api_version in ('am', 'was', 'am2'):
|
||||
if type(data) == etree._Element:
|
||||
logger.debug('Converting lxml.builder.E to string')
|
||||
data = etree.tostring(data)
|
||||
logger.debug('Converted:\n%s' % data)
|
||||
return data
|
||||
|
||||
def request(self, api_call, data=None, api_version=None, http_method=None, concurrent_scans_retries=0,
|
||||
concurrent_scans_retry_delay=0):
|
||||
""" Return QualysGuard API response.
|
||||
|
||||
"""
|
||||
logger.debug('api_call =\n%s' % api_call)
|
||||
logger.debug('api_version =\n%s' % api_version)
|
||||
logger.debug('data %s =\n %s' % (type(data), str(data)))
|
||||
logger.debug('http_method =\n%s' % http_method)
|
||||
logger.debug('concurrent_scans_retries =\n%s' % str(concurrent_scans_retries))
|
||||
logger.debug('concurrent_scans_retry_delay =\n%s' % str(concurrent_scans_retry_delay))
|
||||
concurrent_scans_retries = int(concurrent_scans_retries)
|
||||
concurrent_scans_retry_delay = int(concurrent_scans_retry_delay)
|
||||
#
|
||||
# Determine API version.
|
||||
# Preformat call.
|
||||
api_call = self.preformat_call(api_call)
|
||||
if api_version:
|
||||
# API version specified, format API version inputted.
|
||||
api_version = self.format_api_version(api_version)
|
||||
else:
|
||||
# API version not specified, determine automatically.
|
||||
api_version = self.which_api_version(api_call)
|
||||
#
|
||||
# Set up base url.
|
||||
url = self.url_api_version(api_version)
|
||||
#
|
||||
# Set up headers.
|
||||
headers = {"X-Requested-With": "Parag Baxi QualysAPI (python) v%s" % (qualysapi.version.__version__,)}
|
||||
logger.debug('headers =\n%s' % (str(headers)))
|
||||
# Portal API takes in XML text, requiring custom header.
|
||||
if api_version in ('am', 'was', 'am2'):
|
||||
headers['Content-type'] = 'text/xml'
|
||||
#
|
||||
# Set up http request method, if not specified.
|
||||
if not http_method:
|
||||
http_method = self.format_http_method(api_version, api_call, data)
|
||||
logger.debug('http_method =\n%s' % http_method)
|
||||
#
|
||||
# Format API call.
|
||||
api_call = self.format_call(api_version, api_call)
|
||||
logger.debug('api_call =\n%s' % (api_call))
|
||||
# Append api_call to url.
|
||||
url += api_call
|
||||
#
|
||||
# Format data, if applicable.
|
||||
if data is not None:
|
||||
data = self.format_payload(api_version, data)
|
||||
# Make request at least once (more if concurrent_retry is enabled).
|
||||
retries = 0
|
||||
#
|
||||
# set a warning threshold for the rate limit
|
||||
rate_warn_threshold = 10
|
||||
while retries <= concurrent_scans_retries:
|
||||
# Make request.
|
||||
logger.debug('url =\n%s' % (str(url)))
|
||||
logger.debug('data =\n%s' % (str(data)))
|
||||
logger.debug('headers =\n%s' % (str(headers)))
|
||||
if http_method == 'get':
|
||||
# GET
|
||||
logger.debug('GET request.')
|
||||
request = self.session.get(url, params=data, auth=self.auth, headers=headers, proxies=self.proxies)
|
||||
else:
|
||||
# POST
|
||||
logger.debug('POST request.')
|
||||
# Make POST request.
|
||||
request = self.session.post(url, data=data, auth=self.auth, headers=headers, proxies=self.proxies)
|
||||
logger.debug('response headers =\n%s' % (str(request.headers)))
|
||||
#
|
||||
# Remember how many times left user can make against api_call.
|
||||
try:
|
||||
self.rate_limit_remaining[api_call] = int(request.headers['x-ratelimit-remaining'])
|
||||
logger.debug('rate limit for api_call, %s = %s' % (api_call, self.rate_limit_remaining[api_call]))
|
||||
if (self.rate_limit_remaining[api_call] > rate_warn_threshold):
|
||||
logger.debug('rate limit for api_call, %s = %s' % (api_call, self.rate_limit_remaining[api_call]))
|
||||
elif (self.rate_limit_remaining[api_call] <= rate_warn_threshold) and (self.rate_limit_remaining[api_call] > 0):
|
||||
logger.warning('Rate limit is about to being reached (remaining api calls = %s)' % self.rate_limit_remaining[api_call])
|
||||
elif self.rate_limit_remaining[api_call] <= 0:
|
||||
logger.critical('ATTENTION! RATE LIMIT HAS BEEN REACHED (remaining api calls = %s)!' % self.rate_limit_remaining[api_call])
|
||||
except KeyError as e:
|
||||
# Likely a bad api_call.
|
||||
logger.debug(e)
|
||||
pass
|
||||
except TypeError as e:
|
||||
# Likely an asset search api_call.
|
||||
logger.debug(e)
|
||||
pass
|
||||
# Response received.
|
||||
response = str(request.content)
|
||||
logger.debug('response text =\n%s' % (response))
|
||||
# Keep track of how many retries.
|
||||
retries += 1
|
||||
# Check for concurrent scans limit.
|
||||
if not ('<responseCode>INVALID_REQUEST</responseCode>' in response and
|
||||
'<errorMessage>You have reached the maximum number of concurrent running scans' in response and
|
||||
'<errorResolution>Please wait until your previous scans have completed</errorResolution>' in response):
|
||||
# Did not hit concurrent scan limit.
|
||||
break
|
||||
else:
|
||||
# Hit concurrent scan limit.
|
||||
logger.critical(response)
|
||||
# If trying again, delay next try by concurrent_scans_retry_delay.
|
||||
if retries <= concurrent_scans_retries:
|
||||
logger.warning('Waiting %d seconds until next try.' % concurrent_scans_retry_delay)
|
||||
time.sleep(concurrent_scans_retry_delay)
|
||||
# Inform user of how many retries.
|
||||
logger.critical('Retry #%d' % retries)
|
||||
else:
|
||||
# Ran out of retries. Let user know.
|
||||
print('Alert! Ran out of concurrent_scans_retries!')
|
||||
logger.critical('Alert! Ran out of concurrent_scans_retries!')
|
||||
return False
|
||||
# Check to see if there was an error.
|
||||
try:
|
||||
request.raise_for_status()
|
||||
except requests.HTTPError as e:
|
||||
# Error
|
||||
print('Error! Received a 4XX client error or 5XX server error response.')
|
||||
print('Content = \n', response)
|
||||
logger.error('Content = \n%s' % response)
|
||||
print('Headers = \n', request.headers)
|
||||
logger.error('Headers = \n%s' % str(request.headers))
|
||||
request.raise_for_status()
|
||||
if '<RETURN status="FAILED" number="2007">' in response:
|
||||
print('Error! Your IP address is not in the list of secure IPs. Manager must include this IP (QualysGuard VM > Users > Security).')
|
||||
print('Content = \n', response)
|
||||
logger.error('Content = \n%s' % response)
|
||||
print('Headers = \n', request.headers)
|
||||
logger.error('Headers = \n%s' % str(request.headers))
|
||||
return False
|
||||
return response
|
290
deps/qualysapi/qualysapi/contrib.py
vendored
Normal file
290
deps/qualysapi/qualysapi/contrib.py
vendored
Normal file
@ -0,0 +1,290 @@
|
||||
# File for 3rd party contributions.
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
import six
|
||||
from six.moves import range
|
||||
|
||||
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
|
||||
__license__ = 'Apache License 2.0'
|
||||
|
||||
import logging
|
||||
import time
|
||||
import types
|
||||
import unicodedata
|
||||
from collections import defaultdict
|
||||
|
||||
from lxml import etree, objectify
|
||||
|
||||
|
||||
# Set module level logger.
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_vm_report(self, report_details, startup_delay=60, polling_delay=30, max_checks=10):
|
||||
''' Spool and download QualysGuard VM report.
|
||||
|
||||
startup_delay: Time in seconds to wait before initially checking.
|
||||
polling_delay: Time in seconds to wait between checks.
|
||||
max_checks: Maximum number of times to check for report spooling completion.
|
||||
|
||||
'''
|
||||
# Merge parameters.
|
||||
report_details['action'] = 'launch'
|
||||
logger.debug(report_details)
|
||||
xml_output = qualysapi_instance.request(2, 'report', report_details)
|
||||
report_id = etree.XML(xml_output).find('.//VALUE').text
|
||||
logger.debug('report_id: %s' % (report_id))
|
||||
# Wait for report to finish spooling.
|
||||
# Maximum number of times to check for report. About 10 minutes.
|
||||
MAX_CHECKS = 10
|
||||
logger.info('Report sent to spooler. Checking for report in %s seconds.' % (startup_delay))
|
||||
time.sleep(startup_delay)
|
||||
for n in range(0, max_checks):
|
||||
# Check to see if report is done.
|
||||
xml_output = qualysapi_instance.request(2, 'report', {'action': 'list', 'id': report_id})
|
||||
tag_status = etree.XML(xml_output).findtext(".//STATE")
|
||||
logger.debug('tag_status: %s' % (tag_status))
|
||||
tag_status = etree.XML(xml_output).findtext(".//STATE")
|
||||
logger.debug('tag_status: %s' % (tag_status))
|
||||
if tag_status is not None:
|
||||
# Report is showing up in the Report Center.
|
||||
if tag_status == 'Finished':
|
||||
# Report creation complete.
|
||||
break
|
||||
# Report not finished, wait.
|
||||
logger.info('Report still spooling. Trying again in %s seconds.' % (polling_delay))
|
||||
time.sleep(polling_delay)
|
||||
# We now have to fetch the report. Use the report id.
|
||||
report_xml = qualysapi_instance.request(2, 'report', {'action': 'fetch', 'id': report_id})
|
||||
return report_xml
|
||||
|
||||
|
||||
def qg_html_to_ascii(qg_html_text):
|
||||
"""Convert and return QualysGuard's quasi HTML text to ASCII text."""
|
||||
text = qg_html_text
|
||||
# Handle tagged line breaks (<p>, <br>)
|
||||
text = re.sub(r'(?i)<br>[ ]*', '\n', text)
|
||||
text = re.sub(r'(?i)<p>[ ]*', '\n', text)
|
||||
# Remove consecutive line breaks
|
||||
text = re.sub(r"^\s+", "", text, flags=re.MULTILINE)
|
||||
# Remove empty lines at the end.
|
||||
text = re.sub('[\n]+$', '$', text)
|
||||
# Store anchor tags href attribute
|
||||
links = list(lxml.html.iterlinks(text))
|
||||
# Remove anchor tags
|
||||
html_element = lxml.html.fromstring(text)
|
||||
# Convert anchor tags to "link_text (link: link_url )".
|
||||
logging.debug('Converting anchor tags...')
|
||||
text = html_element.text_content().encode('ascii', 'ignore')
|
||||
# Convert each link.
|
||||
for l in links:
|
||||
# Find and replace each link.
|
||||
link_text = l[0].text_content().encode('ascii', 'ignore').strip()
|
||||
link_url = l[2].strip()
|
||||
# Replacing link_text
|
||||
if link_text != link_url:
|
||||
# Link text is different, most likely a description.
|
||||
text = string.replace(text, link_text, '%s (link: %s )' % (link_text, link_url))
|
||||
else:
|
||||
# Link text is the same as the href. No need to duplicate link.
|
||||
text = string.replace(text, link_text, '%s' % (link_url))
|
||||
logging.debug('Done.')
|
||||
return text
|
||||
|
||||
|
||||
def qg_parse_informational_qids(xml_report):
|
||||
"""Return vulnerabilities of severity 1 and 2 levels due to a restriction of
|
||||
QualysGuard's inability to report them in the internal ticketing system.
|
||||
"""
|
||||
# asset_group's vulnerability data map:
|
||||
# {'qid_number': {
|
||||
# # CSV info
|
||||
# 'hosts': [{'ip': '10.28.0.1', 'dns': 'hostname', 'netbios': 'blah', 'vuln_id': 'remediation_ticket_number'}, {'ip': '10.28.0.3', 'dns': 'hostname2', 'netbios': '', 'vuln_id': 'remediation_ticket_number'}, ...],
|
||||
# 'solution': '',
|
||||
# 'impact': '',
|
||||
# 'threat': '',
|
||||
# 'severity': '',
|
||||
# }
|
||||
# 'qid_number2': ...
|
||||
# }
|
||||
# Add all vulnerabilities to list of dictionaries.
|
||||
# Use defaultdict in case a new QID is encountered.
|
||||
info_vulns = defaultdict(dict)
|
||||
# Parse vulnerabilities in xml string.
|
||||
tree = objectify.fromstring(xml_report)
|
||||
# Write IP, DNS, & Result into each QID CSV file.
|
||||
logging.debug('Parsing report...')
|
||||
# TODO: Check against c_args.max to prevent creating CSV content for QIDs that we won't use.
|
||||
for host in tree.HOST_LIST.HOST:
|
||||
# Extract possible extra hostname information.
|
||||
try:
|
||||
netbios = unicodedata.normalize('NFKD', six.text_type(host.NETBIOS)).encode('ascii', 'ignore').strip()
|
||||
except AttributeError:
|
||||
netbios = ''
|
||||
try:
|
||||
dns = unicodedata.normalize('NFKD', six.text_type(host.DNS)).encode('ascii', 'ignore').strip()
|
||||
except AttributeError:
|
||||
dns = ''
|
||||
ip = unicodedata.normalize('NFKD', six.text_type(host.IP)).encode('ascii', 'ignore').strip()
|
||||
# Extract vulnerabilities host is affected by.
|
||||
for vuln in host.VULN_INFO_LIST.VULN_INFO:
|
||||
try:
|
||||
result = unicodedata.normalize('NFKD', six.text_type(vuln.RESULT)).encode('ascii', 'ignore').strip()
|
||||
except AttributeError:
|
||||
result = ''
|
||||
qid = unicodedata.normalize('NFKD', six.text_type(vuln.QID)).encode('ascii', 'ignore').strip()
|
||||
# Attempt to add host to QID's list of affected hosts.
|
||||
try:
|
||||
info_vulns[qid]['hosts'].append({'ip': '%s' % (ip),
|
||||
'dns': '%s' % (dns),
|
||||
'netbios': '%s' % (netbios),
|
||||
'vuln_id': '',
|
||||
# Informational QIDs do not have vuln_id numbers. This is a flag to write the CSV file.
|
||||
'result': '%s' % (result), })
|
||||
except KeyError:
|
||||
# New QID.
|
||||
logging.debug('New QID found: %s' % (qid))
|
||||
info_vulns[qid]['hosts'] = []
|
||||
info_vulns[qid]['hosts'].append({'ip': '%s' % (ip),
|
||||
'dns': '%s' % (dns),
|
||||
'netbios': '%s' % (netbios),
|
||||
'vuln_id': '',
|
||||
# Informational QIDs do not have vuln_id numbers. This is a flag to write the CSV file.
|
||||
'result': '%s' % (result), })
|
||||
# All vulnerabilities added.
|
||||
# Add all vulnerabilty information.
|
||||
for vuln_details in tree.GLOSSARY.VULN_DETAILS_LIST.VULN_DETAILS:
|
||||
qid = unicodedata.normalize('NFKD', six.text_type(vuln_details.QID)).encode('ascii', 'ignore').strip()
|
||||
info_vulns[qid]['title'] = unicodedata.normalize('NFKD', six.text_type(vuln_details.TITLE)).encode('ascii',
|
||||
'ignore').strip()
|
||||
info_vulns[qid]['severity'] = unicodedata.normalize('NFKD', six.text_type(vuln_details.SEVERITY)).encode('ascii',
|
||||
'ignore').strip()
|
||||
info_vulns[qid]['solution'] = qg_html_to_ascii(
|
||||
unicodedata.normalize('NFKD', six.text_type(vuln_details.SOLUTION)).encode('ascii', 'ignore').strip())
|
||||
info_vulns[qid]['threat'] = qg_html_to_ascii(
|
||||
unicodedata.normalize('NFKD', six.text_type(vuln_details.THREAT)).encode('ascii', 'ignore').strip())
|
||||
info_vulns[qid]['impact'] = qg_html_to_ascii(
|
||||
unicodedata.normalize('NFKD', six.text_type(vuln_details.IMPACT)).encode('ascii', 'ignore').strip())
|
||||
# Ready to report informational vulnerabilities.
|
||||
return info_vulns
|
||||
|
||||
|
||||
# TODO: Implement required function qg_remediation_tickets(asset_group, status, qids)
|
||||
# TODO: Remove static 'report_template' value. Parameterize and document required report template.
|
||||
def qg_ticket_list(asset_group, severity, qids=None):
|
||||
"""Return dictionary of each vulnerability reported against asset_group of severity."""
|
||||
global asset_group_details
|
||||
# All vulnerabilities imported to list of dictionaries.
|
||||
vulns = qg_remediation_tickets(asset_group, 'OPEN', qids) # vulns now holds all open remediation tickets.
|
||||
if not vulns:
|
||||
# No tickets to report.
|
||||
return False
|
||||
#
|
||||
# Sort the vulnerabilities in order of prevalence -- number of hosts affected.
|
||||
vulns = OrderedDict(sorted(list(vulns.items()), key=lambda t: len(t[1]['hosts'])))
|
||||
logging.debug('vulns sorted = %s' % (vulns))
|
||||
#
|
||||
# Remove QIDs that have duplicate patches.
|
||||
#
|
||||
# Read in patch report.
|
||||
# TODO: Allow for lookup of report_template.
|
||||
# Report template is Patch report "Sev 5 confirmed patchable".
|
||||
logging.debug('Retrieving patch report from QualysGuard.')
|
||||
print('Retrieving patch report from QualysGuard.')
|
||||
report_template = '1063695'
|
||||
# Call QualysGuard for patch report.
|
||||
csv_output = qg_command(2, 'report', {'action': 'launch', 'output_format': 'csv',
|
||||
'asset_group_ids': asset_group_details['qg_asset_group_id'],
|
||||
'template_id': report_template,
|
||||
'report_title': 'QGIR Patch %s' % (asset_group)})
|
||||
logging.debug('csv_output =')
|
||||
logging.debug(csv_output)
|
||||
logging.debug('Improving remediation efficiency by removing unneeded, redundant patches.')
|
||||
print('Improving remediation efficiency by removing unneeded, redundant patches.')
|
||||
# Find the line for Patches by Host data.
|
||||
logging.debug('Header found at %s.' % (csv_output.find('Patch QID, IP, DNS, NetBIOS, OS, Vulnerability Count')))
|
||||
|
||||
starting_pos = csv_output.find('Patch QID, IP, DNS, NetBIOS, OS, Vulnerability Count') + 52
|
||||
logging.debug('starting_pos = %s' % str(starting_pos))
|
||||
# Data resides between line ending in 'Vulnerability Count' and a blank line.
|
||||
patches_by_host = csv_output[starting_pos:csv_output[starting_pos:].find(
|
||||
'Host Vulnerabilities Fixed by Patch') + starting_pos - 3]
|
||||
logging.debug('patches_by_host =')
|
||||
logging.debug(patches_by_host)
|
||||
# Read in string patches_by_host csv to a dictionary.
|
||||
f = patches_by_host.split(os.linesep)
|
||||
reader = csv.DictReader(f, ['Patch QID', 'IP', 'DNS', 'NetBIOS', 'OS', 'Vulnerability Count'], delimiter=',')
|
||||
# Mark Patch QIDs that fix multiple vulnerabilities with associated IP addresses.
|
||||
redundant_qids = defaultdict(list)
|
||||
for row in reader:
|
||||
if int(row['Vulnerability Count']) > 1:
|
||||
# Add to list of redundant QIDs.
|
||||
redundant_qids[row['Patch QID']].append(row['IP'])
|
||||
logging.debug('%s, %s, %s, %s' % (
|
||||
row['Patch QID'],
|
||||
row['IP'],
|
||||
int(row['Vulnerability Count']),
|
||||
redundant_qids[row['Patch QID']]))
|
||||
# Log for debugging.
|
||||
logging.debug('len(redundant_qids) = %s, redundant_qids =' % (len(redundant_qids)))
|
||||
for patch_qid in list(redundant_qids.keys()):
|
||||
logging.debug('%s, %s' % (str(patch_qid), str(redundant_qids[patch_qid])))
|
||||
# Extract redundant QIDs with associated IP addresses.
|
||||
# Find the line for Patches by Host data.
|
||||
starting_pos = csv_output.find('Patch QID, IP, QID, Severity, Type, Title, Instance, Last Detected') + 66
|
||||
# Data resides between line ending in 'Vulnerability Count' and end of string.
|
||||
host_vulnerabilities_fixed_by_patch = csv_output[starting_pos:]
|
||||
# Read in string host_vulnerabilities_fixed_by_patch csv to a dictionary.
|
||||
f = host_vulnerabilities_fixed_by_patch.split(os.linesep)
|
||||
reader = csv.DictReader(f, ['Patch QID', 'IP', 'QID', 'Severity', 'Type', 'Title', 'Instance', 'Last Detected'],
|
||||
delimiter=',')
|
||||
# Remove IP addresses associated with redundant QIDs.
|
||||
qids_to_remove = defaultdict(list)
|
||||
for row in reader:
|
||||
# If the row's IP address's Patch QID was found to have multiple vulnerabilities...
|
||||
if len(redundant_qids[row['Patch QID']]) > 0 and redundant_qids[row['Patch QID']].count(row['IP']) > 0:
|
||||
# Add the QID column to the list of dictionaries {QID: [IP address, IP address, ...], QID2: [IP address], ...}
|
||||
qids_to_remove[row['QID']].append(row['IP'])
|
||||
# Log for debugging.
|
||||
logging.debug('len(qids_to_remove) = %s, qids_to_remove =' % (len(qids_to_remove)))
|
||||
for a_qid in list(qids_to_remove.keys()):
|
||||
logging.debug('%s, %s' % (str(a_qid), str(qids_to_remove[a_qid])))
|
||||
#
|
||||
# Diff vulns against qids_to_remove and against open incidents.
|
||||
#
|
||||
vulns_length = len(vulns)
|
||||
# Iterate over list of keys rather than original dictionary as some keys may be deleted changing the size of the dictionary.
|
||||
for a_qid in list(vulns.keys()):
|
||||
# Debug log original qid's hosts.
|
||||
logging.debug('Before diffing vulns[%s] =' % (a_qid))
|
||||
logging.debug(vulns[a_qid]['hosts'])
|
||||
# Pop each host.
|
||||
# The [:] returns a "slice" of x, which happens to contain all its elements, and is thus effectively a copy of x.
|
||||
for host in vulns[a_qid]['hosts'][:]:
|
||||
# If the QID for the host is a dupe or if a there is an open Reaction incident.
|
||||
if qids_to_remove[a_qid].count(host['ip']) > 0 or reaction_open_issue(host['vuln_id']):
|
||||
# Remove the host from the QID's list of target hosts.
|
||||
logging.debug('Removing remediation ticket %s.' % (host['vuln_id']))
|
||||
vulns[a_qid]['hosts'].remove(host)
|
||||
else:
|
||||
# Do not remove this vuln
|
||||
logging.debug('Will report remediation %s.' % (host['vuln_id']))
|
||||
# Debug log diff'd qid's hosts.
|
||||
logging.debug('After diffing vulns[%s]=' % (a_qid))
|
||||
logging.debug(vulns[a_qid]['hosts'])
|
||||
# If there are no more hosts left to patch for the qid.
|
||||
if len(vulns[a_qid]['hosts']) == 0:
|
||||
# Remove the QID.
|
||||
logging.debug('Deleting vulns[%s].' % (a_qid))
|
||||
del vulns[a_qid]
|
||||
# Diff completed
|
||||
if not vulns_length == len(vulns):
|
||||
print('A count of %s vulnerabilities have been consolidated to %s vulnerabilities, a reduction of %s%%.' % (
|
||||
int(vulns_length),
|
||||
int(len(vulns)),
|
||||
int(round((int(vulns_length) - int(len(vulns))) / float(vulns_length) * 100))))
|
||||
# Return vulns to report.
|
||||
logging.debug('vulns =')
|
||||
logging.debug(vulns)
|
||||
return vulns
|
21
deps/qualysapi/qualysapi/settings.py
vendored
Normal file
21
deps/qualysapi/qualysapi/settings.py
vendored
Normal file
@ -0,0 +1,21 @@
|
||||
''' Module to hold global settings reused throughout qualysapi. '''
|
||||
|
||||
from __future__ import absolute_import
|
||||
__author__ = "Colin Bell <colin.bell@uwaterloo.ca>"
|
||||
__copyright__ = "Copyright 2011-2013, University of Waterloo"
|
||||
__license__ = "BSD-new"
|
||||
|
||||
import os
|
||||
|
||||
global defaults
|
||||
global default_filename
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
default_filename = "config.ini"
|
||||
else:
|
||||
default_filename = ".qcrc"
|
||||
|
||||
defaults = {'hostname': 'qualysapi.qualys.com',
|
||||
'max_retries': '3',
|
||||
'template_id': '00000'}
|
29
deps/qualysapi/qualysapi/util.py
vendored
Normal file
29
deps/qualysapi/qualysapi/util.py
vendored
Normal file
@ -0,0 +1,29 @@
|
||||
""" A set of utility functions for QualysConnect module. """
|
||||
from __future__ import absolute_import
|
||||
import logging
|
||||
|
||||
import qualysapi.config as qcconf
|
||||
import qualysapi.connector as qcconn
|
||||
import qualysapi.settings as qcs
|
||||
|
||||
__author__ = "Parag Baxi <parag.baxi@gmail.com> & Colin Bell <colin.bell@uwaterloo.ca>"
|
||||
__copyright__ = "Copyright 2011-2013, Parag Baxi & University of Waterloo"
|
||||
__license__ = 'Apache License 2.0'
|
||||
|
||||
# Set module level logger.
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def connect(config_file=qcs.default_filename, remember_me=False, remember_me_always=False):
|
||||
""" Return a QGAPIConnect object for v1 API pulling settings from config
|
||||
file.
|
||||
"""
|
||||
# Retrieve login credentials.
|
||||
conf = qcconf.QualysConnectConfig(filename=config_file, remember_me=remember_me,
|
||||
remember_me_always=remember_me_always)
|
||||
connect = qcconn.QGConnector(conf.get_auth(),
|
||||
conf.get_hostname(),
|
||||
conf.proxies,
|
||||
conf.max_retries)
|
||||
logger.info("Finished building connector.")
|
||||
return connect
|
3
deps/qualysapi/qualysapi/version.py
vendored
Normal file
3
deps/qualysapi/qualysapi/version.py
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
|
||||
__pkgname__ = 'qualysapi'
|
||||
__version__ = '4.1.0'
|
Reference in New Issue
Block a user