Add mock tests for qualys web
This commit is contained in:
@ -22,7 +22,7 @@ verbose=true
|
|||||||
|
|
||||||
[qualys_web]
|
[qualys_web]
|
||||||
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
|
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
|
||||||
enabled = false
|
enabled = true
|
||||||
hostname = qualys_web
|
hostname = qualys_web
|
||||||
username = exampleuser
|
username = exampleuser
|
||||||
password = examplepass
|
password = examplepass
|
||||||
@ -34,7 +34,7 @@ verbose=true
|
|||||||
#Note, this applies only to failed connections and timeouts, never to requests where the server returns a response.
|
#Note, this applies only to failed connections and timeouts, never to requests where the server returns a response.
|
||||||
max_retries = 10
|
max_retries = 10
|
||||||
# Template ID will need to be retrieved for each document. Please follow the reference guide above for instructions on how to get your template ID.
|
# Template ID will need to be retrieved for each document. Please follow the reference guide above for instructions on how to get your template ID.
|
||||||
template_id = 126024
|
template_id = 289109
|
||||||
|
|
||||||
[qualys_vuln]
|
[qualys_vuln]
|
||||||
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
|
#Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API
|
||||||
|
@ -28,28 +28,11 @@ class mockAPI(object):
|
|||||||
|
|
||||||
def get_directories(self, path):
|
def get_directories(self, path):
|
||||||
dir, subdirs, files = next(os.walk(path))
|
dir, subdirs, files = next(os.walk(path))
|
||||||
return subdirs
|
return sorted(subdirs)
|
||||||
|
|
||||||
def get_files(self, path):
|
def get_files(self, path):
|
||||||
dir, subdirs, files = next(os.walk(path))
|
dir, subdirs, files = next(os.walk(path))
|
||||||
return files
|
return sorted(files)
|
||||||
|
|
||||||
def qualys_vuln_callback(self, request, uri, response_headers):
|
|
||||||
self.logger.debug('Simulating response for {} ({})'.format(uri, request.body))
|
|
||||||
if 'list' in request.parsed_body['action']:
|
|
||||||
return [200,
|
|
||||||
response_headers,
|
|
||||||
open('{}/{}'.format(self.qualys_vuln_path, 'scans')).read()]
|
|
||||||
elif 'fetch' in request.parsed_body['action']:
|
|
||||||
try:
|
|
||||||
response_body = open('{}/{}'.format(
|
|
||||||
self.qualys_vuln_path,
|
|
||||||
request.parsed_body['scan_ref'][0].replace('/', '_'))
|
|
||||||
).read()
|
|
||||||
except:
|
|
||||||
# Can't find the file, just send an empty response
|
|
||||||
response_body = ''
|
|
||||||
return [200, response_headers, response_body]
|
|
||||||
|
|
||||||
def create_nessus_resource(self, framework):
|
def create_nessus_resource(self, framework):
|
||||||
for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)):
|
for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)):
|
||||||
@ -61,33 +44,92 @@ class mockAPI(object):
|
|||||||
body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read()
|
body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def qualys_vuln_callback(self, request, uri, response_headers):
|
||||||
|
self.logger.debug('Simulating response for {} ({})'.format(uri, request.body))
|
||||||
|
if 'list' in request.parsed_body['action']:
|
||||||
|
return [200,
|
||||||
|
response_headers,
|
||||||
|
open(self.qualys_vuln_path + '/scans').read()]
|
||||||
|
elif 'fetch' in request.parsed_body['action']:
|
||||||
|
try:
|
||||||
|
response_body = open('{}/{}'.format(
|
||||||
|
self.qualys_vuln_path,
|
||||||
|
request.parsed_body['scan_ref'][0].replace('/', '_'))
|
||||||
|
).read()
|
||||||
|
except:
|
||||||
|
# Can't find the file, just send an empty response
|
||||||
|
response_body = ''
|
||||||
|
return [200, response_headers, response_body]
|
||||||
|
|
||||||
def create_qualys_vuln_resource(self, framework):
|
def create_qualys_vuln_resource(self, framework):
|
||||||
# Create health check endpoint
|
# Create health check endpoint
|
||||||
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'GET', 'msp/about.php'))
|
self.logger.debug('Adding mocked {} endpoint GET msp/about.php'.format(framework))
|
||||||
httpretty.register_uri(
|
httpretty.register_uri(
|
||||||
httpretty.GET,
|
httpretty.GET,
|
||||||
'https://{}:443/{}'.format(framework, 'msp/about.php'),
|
'https://{}:443/msp/about.php'.format(framework),
|
||||||
body='')
|
body='')
|
||||||
|
|
||||||
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'api/2.0/fo/scan'))
|
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'api/2.0/fo/scan'))
|
||||||
httpretty.register_uri(
|
httpretty.register_uri(
|
||||||
httpretty.POST, 'https://{}:443/{}'.format(framework, 'api/2.0/fo/scan/'),
|
httpretty.POST, 'https://{}:443/api/2.0/fo/scan/'.format(framework),
|
||||||
body=self.qualys_vuln_callback)
|
body=self.qualys_vuln_callback)
|
||||||
|
|
||||||
def create_openvas_resource(self, framework):
|
def qualys_web_callback(self, request, uri, response_headers):
|
||||||
|
self.logger.debug('Simulating response for {} ({})'.format(uri, request.body))
|
||||||
|
report_id = request.parsed_body.split('<WasScan><id>')[1].split('<')[0]
|
||||||
|
response_body = open('{}/create_{}'.format(self.qualys_web_path, report_id)).read()
|
||||||
|
return [200, response_headers, response_body]
|
||||||
|
|
||||||
|
def create_qualys_web_resource(self, framework):
|
||||||
for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)):
|
for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)):
|
||||||
try:
|
if filename.startswith('POST') or filename.startswith('GET'):
|
||||||
method, status, resource = self.openvas_requests[filename]
|
method, resource = filename.split('_', 1)
|
||||||
|
resource = resource.replace('_', '/')
|
||||||
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource))
|
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource))
|
||||||
except:
|
|
||||||
self.logger.error('Cound not find mocked {} endpoint for file {}/{}/{}'.format(framework, self.mock_dir, framework, filename))
|
|
||||||
continue
|
|
||||||
httpretty.register_uri(
|
httpretty.register_uri(
|
||||||
getattr(httpretty, method), 'https://{}:4000/{}'.format(framework, resource),
|
getattr(httpretty, method), 'https://{}:443/{}'.format(framework, resource),
|
||||||
body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read(),
|
body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read()
|
||||||
status=status
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'qps/rest/3.0/create/was/report'))
|
||||||
|
httpretty.register_uri(
|
||||||
|
httpretty.POST, 'https://{}:443/qps/rest/3.0/create/was/report'.format(framework),
|
||||||
|
body=self.qualys_web_callback)
|
||||||
|
|
||||||
|
def openvas_callback(self, request, uri, response_headers):
|
||||||
|
self.logger.debug('Simulating response for {} ({})'.format(uri, request.body))
|
||||||
|
if request.querystring['cmd'][0] in ['get_reports', 'get_report_formats']:
|
||||||
|
response_body = open('{}/{}'.format(self.openvas_path, request.querystring['cmd'][0])).read()
|
||||||
|
|
||||||
|
if request.querystring['cmd'][0] == 'get_report':
|
||||||
|
response_body = open('{}/report_{}'.format(self.openvas_path, request.querystring['report_id'][0])).read()
|
||||||
|
|
||||||
|
return [200, response_headers, response_body]
|
||||||
|
|
||||||
|
def create_openvas_resource(self, framework):
|
||||||
|
# Create login endpoint
|
||||||
|
httpretty.register_uri(
|
||||||
|
httpretty.POST, 'https://{}:4000/omp'.format(framework),
|
||||||
|
body=open('{}/{}/{}'.format(self.mock_dir, framework, 'login')).read()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create GET requests endpoint
|
||||||
|
httpretty.register_uri(
|
||||||
|
httpretty.GET, 'https://{}:4000/omp'.format(framework),
|
||||||
|
body=self.openvas_callback
|
||||||
|
)
|
||||||
|
# try:
|
||||||
|
# method, status, resource = self.openvas_requests[filename]
|
||||||
|
# self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource))
|
||||||
|
# except:
|
||||||
|
# self.logger.error('Cound not find mocked {} endpoint for file {}/{}/{}'.format(framework, self.mock_dir, framework, filename))
|
||||||
|
# continue
|
||||||
|
# httpretty.register_uri(
|
||||||
|
# getattr(httpretty, method), 'https://{}:4000/{}'.format(framework, resource),
|
||||||
|
# body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read(),
|
||||||
|
# status=status
|
||||||
|
# )
|
||||||
|
|
||||||
def mock_endpoints(self):
|
def mock_endpoints(self):
|
||||||
for framework in self.get_directories(self.mock_dir):
|
for framework in self.get_directories(self.mock_dir):
|
||||||
if framework in ['nessus', 'tenable']:
|
if framework in ['nessus', 'tenable']:
|
||||||
@ -95,6 +137,10 @@ class mockAPI(object):
|
|||||||
elif framework == 'qualys_vuln':
|
elif framework == 'qualys_vuln':
|
||||||
self.qualys_vuln_path = self.mock_dir + '/' + framework
|
self.qualys_vuln_path = self.mock_dir + '/' + framework
|
||||||
self.create_qualys_vuln_resource(framework)
|
self.create_qualys_vuln_resource(framework)
|
||||||
|
elif framework == 'qualys_web':
|
||||||
|
self.qualys_web_path = self.mock_dir + '/' + framework
|
||||||
|
self.create_qualys_web_resource(framework)
|
||||||
elif framework == 'openvas':
|
elif framework == 'openvas':
|
||||||
|
self.openvas_path = self.mock_dir + '/' + framework
|
||||||
self.create_openvas_resource(framework)
|
self.create_openvas_resource(framework)
|
||||||
httpretty.enable()
|
httpretty.enable()
|
||||||
|
Reference in New Issue
Block a user