Add external API mocking and travis tests (#164)

* Fix closing logging handlers

* Fix *some* unicode issues for nessus and qualys

* Prevent multiple requests to nessus scans endpoint

* More unicode fixes

* Remove unnecessary call

* Fix whitespace

* Add mock module and argument

* Add test config and data

* Fix whitespace again

* Disable qualys_web until data is available

* Use logging module

* Delete report_tracker.db

* Cleanup mock calls

* Add httpretty to requirements

* Refactor into a class

* Updates travis tests

* Fix exit codes

* Remove print statements

* Remove test

* Add test directory as submodule
This commit is contained in:
pemontto
2019-04-05 19:57:39 +11:00
committed by Quim Montal
parent a30a22ab98
commit 71352aee57
12 changed files with 221 additions and 19 deletions

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "test"]
path = test
url = https://github.com/HASecuritySolutions/VulnWhisperer-tests

View File

@ -18,7 +18,8 @@ before_script:
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --exit-zero --exclude=deps/qualysapi --max-complexity=10 --max-line-length=127 --statistics
script:
- true # pytest --capture=sys # add other tests here
- python setup.py install
- vuln_whisperer -c configs/test.ini --mock --mock_dir test
notifications:
on_success: change
on_failure: change # `always` will be the setting once code changes slow down

View File

@ -5,6 +5,7 @@ __author__ = 'Austin Taylor'
from vulnwhisp.vulnwhisp import vulnWhisperer
from vulnwhisp.base.config import vwConfig
from vulnwhisp.test.mock import mockAPI
import os
import argparse
import sys
@ -34,6 +35,9 @@ def main():
parser.add_argument('-p', '--password', dest='password', required=False, default=None, type=lambda x: x.strip(), help='The NESSUS password')
parser.add_argument('-F', '--fancy', action='store_true', help='Enable colourful logging output')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debugging messages')
parser.add_argument('--mock', action='store_true', help='Enable mocked API responses')
parser.add_argument('--mock_dir', dest='mock_dir', required=False, default='test',
help='Path of test directory')
args = parser.parse_args()
# First setup logging
@ -54,9 +58,12 @@ def main():
import coloredlogs
coloredlogs.install(level='DEBUG' if args.debug else 'INFO')
if args.mock:
mock_api = mockAPI(args.mock_dir, args.verbose)
mock_api.mock_endpoints()
try:
if args.config and not args.section:
# this remains a print since we are in the main binary
print('WARNING: {warning}'.format(warning='No section was specified, vulnwhisperer will scrape enabled modules from config file. \
\nPlease specify a section using -s. \
@ -74,10 +81,10 @@ def main():
source=args.source,
scanname=args.scanname)
vw.whisper_vulnerabilities()
exit_code = vw.whisper_vulnerabilities()
# TODO: fix this to NOT be exit 1 unless in error
close_logging_handlers(logger)
sys.exit(1)
sys.exit(exit_code)
else:
logger.info('Running vulnwhisperer for section {}'.format(args.section))
@ -89,10 +96,10 @@ def main():
source=args.source,
scanname=args.scanname)
vw.whisper_vulnerabilities()
exit_code = vw.whisper_vulnerabilities()
# TODO: fix this to NOT be exit 1 unless in error
close_logging_handlers(logger)
sys.exit(1)
sys.exit(exit_code)
except Exception as e:
if args.verbose:

110
configs/test.ini Executable file
View File

@ -0,0 +1,110 @@
[nessus]
enabled=true
hostname=nessus
port=443
username=nessus_username
password=nessus_password
write_path=/tmp/VulnWhisperer/data/nessus/
db_path=/tmp/VulnWhisperer/data/database
trash=false
verbose=true
[tenable]
enabled=true
hostname=tenable
port=443
username=tenable.io_username
password=tenable.io_password
write_path=/tmp/VulnWhisperer/data/tenable/
db_path=/tmp/VulnWhisperer/data/database
trash=false
verbose=true
[qualys_web]
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
enabled = false
hostname = qualys_web
username = exampleuser
password = examplepass
write_path=/tmp/VulnWhisperer/data/qualys/
db_path=/tmp/VulnWhisperer/data/database
verbose=true
# Set the maximum number of retries each connection should attempt.
#Note, this applies only to failed connections and timeouts, never to requests where the server returns a response.
max_retries = 10
# Template ID will need to be retrieved for each document. Please follow the reference guide above for instructions on how to get your template ID.
template_id = 126024
[qualys_vuln]
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
enabled = true
hostname = qualys_vuln
username = exampleuser
password = examplepass
write_path=/tmp/VulnWhisperer/data/qualys/
db_path=/tmp/VulnWhisperer/data/database
verbose=true
# Set the maximum number of retries each connection should attempt.
#Note, this applies only to failed connections and timeouts, never to requests where the server returns a response.
max_retries = 10
# Template ID will need to be retrieved for each document. Please follow the reference guide above for instructions on how to get your template ID.
template_id = 126024
[detectify]
#Reference https://developer.detectify.com/
enabled = false
hostname = detectify
#username variable used as apiKey
username = exampleuser
#password variable used as secretKey
password = examplepass
write_path =/tmp/VulnWhisperer/data/detectify/
db_path = /tmp/VulnWhisperer/data/database
verbose = true
[openvas]
enabled = false
hostname = openvas
port = 4000
username = exampleuser
password = examplepass
write_path=/tmp/VulnWhisperer/data/openvas/
db_path=/tmp/VulnWhisperer/data/database
verbose=true
#[proxy]
; This section is optional. Leave it out if you're not using a proxy.
; You can use environmental variables as well: http://www.python-requests.org/en/latest/user/advanced/#proxies
; proxy_protocol set to https, if not specified.
#proxy_url = proxy.mycorp.com
; proxy_port will override any port specified in proxy_url
#proxy_port = 8080
; proxy authentication
#proxy_username = proxyuser
#proxy_password = proxypass
[jira]
enabled = false
hostname = jira-host
username = username
password = password
write_path = /tmp/VulnWhisperer/data/jira/
db_path = /tmp/VulnWhisperer/data/database
verbose = true
dns_resolv = False
#Sample jira report scan, will automatically be created for existent scans
#[jira.qualys_vuln.test_scan]
#source = qualys_vuln
#scan_name = Test Scan
#jira_project = PROJECT
; if multiple components, separate by "," = None
#components =
; minimum criticality to report (low, medium, high or critical) = None
#min_critical_to_report = high

View File

@ -9,3 +9,4 @@ jira
bottle
coloredlogs
qualysapi>=5.1.0
httpretty

View File

@ -14,7 +14,7 @@ until [ "`curl -I "$kibana_url"/status | head -n1 |cut -d$' ' -f2`" == "200" ];
echo "Waiting for Kibana"
sleep 5
done
echo "Loading VulnWhisperer Saved Objects"
echo $add_saved_objects$saved_objects_file
eval $(echo $add_saved_objects$saved_objects_file)

1
test Submodule

Submodule test added at 606b8bcbe3

View File

@ -26,16 +26,16 @@ class vwConfig(object):
return self.config.getboolean(section, option)
def get_sections_with_attribute(self, attribute):
sections = []
sections = []
# TODO: does this not also need the "yes" case?
check = ["true", "True", "1"]
for section in self.config.sections():
check = ["true", "True", "1"]
for section in self.config.sections():
try:
if self.get(section, attribute) in check:
sections.append(section)
except:
self.logger.warn("Section {} has no option '{}'".format(section, attribute))
return sections
return sections
def exists_jira_profiles(self, profiles):
# get list of profiles source_scanner.scan_name

View File

@ -64,8 +64,8 @@ class qualysWhisperAPI(object):
# First two columns are metadata we already have
# Last column corresponds to "target_distribution_across_scanner_appliances" element
# which doesn't follow the schema and breaks the pandas data manipulation
return pd.read_json(scan_json).iloc[2:-1]
# which doesn't follow the schema and breaks the pandas data manipulation
return pd.read_json(scan_json).iloc[2:-1]
class qualysUtils:
def __init__(self):

View File

75
vulnwhisp/test/mock.py Normal file
View File

@ -0,0 +1,75 @@
import os
import logging
import httpretty
class mockAPI(object):
def __init__(self, mock_dir=None, debug=False):
self.mock_dir = mock_dir
if not self.mock_dir:
# Try to guess the mock_dir if python setup.py develop was used
self.mock_dir = '/'.join(__file__.split('/')[:-3]) + '/test'
self.logger = logging.getLogger('mockAPI')
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.info('mockAPI initialised, API requests will be mocked'.format(self.mock_dir))
self.logger.debug('Test path resolved as {}'.format(self.mock_dir))
def get_directories(self, path):
dir, subdirs, files = next(os.walk(path))
return subdirs
def get_files(self, path):
dir, subdirs, files = next(os.walk(path))
return files
def qualys_vuln_callback(self, request, uri, response_headers):
self.logger.debug('Simulating response for {} ({})'.format(uri, request.body))
if 'list' in request.parsed_body['action']:
return [ 200,
response_headers,
open('{}/{}'.format(self.qualys_vuln_path, 'scans')).read()]
elif 'fetch' in request.parsed_body['action']:
try:
response_body = open('{}/{}'.format(
self.qualys_vuln_path,
request.parsed_body['scan_ref'][0].replace('/', '_'))
).read()
except:
# Can't find the file, just send an empty response
response_body = ''
return [200, response_headers, response_body]
def create_nessus_resource(self, framework):
for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)):
method, resource = filename.split('_',1)
resource = resource.replace('_', '/')
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource))
httpretty.register_uri(
getattr(httpretty, method), 'https://{}:443/{}'.format(framework, resource),
body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read()
)
def create_qualys_vuln_resource(self, framework):
# Create health check endpoint
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'GET', 'msp/about.php'))
httpretty.register_uri(
httpretty.GET,
'https://{}:443/{}'.format(framework, 'msp/about.php'),
body='')
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'api/2.0/fo/scan'))
httpretty.register_uri(
httpretty.POST, 'https://{}:443/{}'.format(framework, 'api/2.0/fo/scan/'),
body=self.qualys_vuln_callback)
def mock_endpoints(self):
for framework in self.get_directories(self.mock_dir):
if framework in ['nessus', 'tenable']:
self.create_nessus_resource(framework)
elif framework == 'qualys_vuln':
self.qualys_vuln_path = self.mock_dir + '/' + framework
self.create_qualys_vuln_resource(framework)
httpretty.enable()

View File

@ -478,8 +478,9 @@ class vulnWhispererNessus(vulnWhispererBase):
self.conn.close()
self.logger.info('Scan aggregation complete! Connection to database closed.')
else:
self.logger.error('Failed to use scanner at {host}:{port}'.format(host=self.hostname, port=self.nessus_port))
return 1
return 0
class vulnWhispererQualys(vulnWhispererBase):
@ -1244,6 +1245,7 @@ class vulnWhisperer(object):
self.verbose = verbose
self.source = source
self.scanname = scanname
self.exit_code = 0
def whisper_vulnerabilities(self):
@ -1254,15 +1256,15 @@ class vulnWhisperer(object):
password=self.password,
verbose=self.verbose,
profile=self.profile)
vw.whisper_nessus()
self.exit_code += vw.whisper_nessus()
elif self.profile == 'qualys_web':
vw = vulnWhispererQualys(config=self.config)
vw.process_web_assets()
self.exit_code += vw.process_web_assets()
elif self.profile == 'openvas':
vw_openvas = vulnWhispererOpenVAS(config=self.config)
vw_openvas.process_openvas_scans()
self.exit_code += vw_openvas.process_openvas_scans()
elif self.profile == 'tenable':
vw = vulnWhispererNessus(config=self.config,
@ -1270,11 +1272,11 @@ class vulnWhisperer(object):
password=self.password,
verbose=self.verbose,
profile=self.profile)
vw.whisper_nessus()
self.exit_code += vw.whisper_nessus()
elif self.profile == 'qualys_vuln':
vw = vulnWhispererQualysVuln(config=self.config)
vw.process_vuln_scans()
self.exit_code += vw.process_vuln_scans()
elif self.profile == 'jira':
#first we check config fields are created, otherwise we create them
@ -1288,3 +1290,5 @@ class vulnWhisperer(object):
return 0
else:
vw.jira_sync(self.source, self.scanname)
return self.exit_code