From f922e396def2fd8bfab0cf8febdeb925ee53aaa7 Mon Sep 17 00:00:00 2001 From: pemontto Date: Thu, 25 Apr 2019 14:14:03 +0100 Subject: [PATCH] Add mock tests for qualys web --- configs/test.ini | 4 +- vulnwhisp/test/mock.py | 114 +++++++++++++++++++++++++++++------------ 2 files changed, 82 insertions(+), 36 deletions(-) diff --git a/configs/test.ini b/configs/test.ini index 1990aaf..6cd5424 100755 --- a/configs/test.ini +++ b/configs/test.ini @@ -22,7 +22,7 @@ verbose=true [qualys_web] #Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API -enabled = false +enabled = true hostname = qualys_web username = exampleuser password = examplepass @@ -34,7 +34,7 @@ verbose=true #Note, this applies only to failed connections and timeouts, never to requests where the server returns a response. max_retries = 10 # Template ID will need to be retrieved for each document. Please follow the reference guide above for instructions on how to get your template ID. -template_id = 126024 +template_id = 289109 [qualys_vuln] #Reference https://www.qualys.com/docs/qualys-was-api-user-guide.pdf to find your API diff --git a/vulnwhisp/test/mock.py b/vulnwhisp/test/mock.py index 4e668e9..c18e25e 100644 --- a/vulnwhisp/test/mock.py +++ b/vulnwhisp/test/mock.py @@ -28,28 +28,11 @@ class mockAPI(object): def get_directories(self, path): dir, subdirs, files = next(os.walk(path)) - return subdirs + return sorted(subdirs) def get_files(self, path): dir, subdirs, files = next(os.walk(path)) - return files - - def qualys_vuln_callback(self, request, uri, response_headers): - self.logger.debug('Simulating response for {} ({})'.format(uri, request.body)) - if 'list' in request.parsed_body['action']: - return [200, - response_headers, - open('{}/{}'.format(self.qualys_vuln_path, 'scans')).read()] - elif 'fetch' in request.parsed_body['action']: - try: - response_body = open('{}/{}'.format( - self.qualys_vuln_path, - request.parsed_body['scan_ref'][0].replace('/', '_')) - ).read() - except: - # Can't find the file, just send an empty response - response_body = '' - return [200, response_headers, response_body] + return sorted(files) def create_nessus_resource(self, framework): for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)): @@ -61,32 +44,91 @@ class mockAPI(object): body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read() ) + def qualys_vuln_callback(self, request, uri, response_headers): + self.logger.debug('Simulating response for {} ({})'.format(uri, request.body)) + if 'list' in request.parsed_body['action']: + return [200, + response_headers, + open(self.qualys_vuln_path + '/scans').read()] + elif 'fetch' in request.parsed_body['action']: + try: + response_body = open('{}/{}'.format( + self.qualys_vuln_path, + request.parsed_body['scan_ref'][0].replace('/', '_')) + ).read() + except: + # Can't find the file, just send an empty response + response_body = '' + return [200, response_headers, response_body] + def create_qualys_vuln_resource(self, framework): # Create health check endpoint - self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'GET', 'msp/about.php')) + self.logger.debug('Adding mocked {} endpoint GET msp/about.php'.format(framework)) httpretty.register_uri( httpretty.GET, - 'https://{}:443/{}'.format(framework, 'msp/about.php'), + 'https://{}:443/msp/about.php'.format(framework), body='') - + self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'api/2.0/fo/scan')) httpretty.register_uri( - httpretty.POST, 'https://{}:443/{}'.format(framework, 'api/2.0/fo/scan/'), + httpretty.POST, 'https://{}:443/api/2.0/fo/scan/'.format(framework), body=self.qualys_vuln_callback) - def create_openvas_resource(self, framework): + def qualys_web_callback(self, request, uri, response_headers): + self.logger.debug('Simulating response for {} ({})'.format(uri, request.body)) + report_id = request.parsed_body.split('')[1].split('<')[0] + response_body = open('{}/create_{}'.format(self.qualys_web_path, report_id)).read() + return [200, response_headers, response_body] + + def create_qualys_web_resource(self, framework): for filename in self.get_files('{}/{}'.format(self.mock_dir, framework)): - try: - method, status, resource = self.openvas_requests[filename] + if filename.startswith('POST') or filename.startswith('GET'): + method, resource = filename.split('_', 1) + resource = resource.replace('_', '/') self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource)) - except: - self.logger.error('Cound not find mocked {} endpoint for file {}/{}/{}'.format(framework, self.mock_dir, framework, filename)) - continue - httpretty.register_uri( - getattr(httpretty, method), 'https://{}:4000/{}'.format(framework, resource), - body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read(), - status=status - ) + httpretty.register_uri( + getattr(httpretty, method), 'https://{}:443/{}'.format(framework, resource), + body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read() + ) + + self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, 'POST', 'qps/rest/3.0/create/was/report')) + httpretty.register_uri( + httpretty.POST, 'https://{}:443/qps/rest/3.0/create/was/report'.format(framework), + body=self.qualys_web_callback) + + def openvas_callback(self, request, uri, response_headers): + self.logger.debug('Simulating response for {} ({})'.format(uri, request.body)) + if request.querystring['cmd'][0] in ['get_reports', 'get_report_formats']: + response_body = open('{}/{}'.format(self.openvas_path, request.querystring['cmd'][0])).read() + + if request.querystring['cmd'][0] == 'get_report': + response_body = open('{}/report_{}'.format(self.openvas_path, request.querystring['report_id'][0])).read() + + return [200, response_headers, response_body] + + def create_openvas_resource(self, framework): + # Create login endpoint + httpretty.register_uri( + httpretty.POST, 'https://{}:4000/omp'.format(framework), + body=open('{}/{}/{}'.format(self.mock_dir, framework, 'login')).read() + ) + + # Create GET requests endpoint + httpretty.register_uri( + httpretty.GET, 'https://{}:4000/omp'.format(framework), + body=self.openvas_callback + ) + # try: + # method, status, resource = self.openvas_requests[filename] + # self.logger.debug('Adding mocked {} endpoint {} {}'.format(framework, method, resource)) + # except: + # self.logger.error('Cound not find mocked {} endpoint for file {}/{}/{}'.format(framework, self.mock_dir, framework, filename)) + # continue + # httpretty.register_uri( + # getattr(httpretty, method), 'https://{}:4000/{}'.format(framework, resource), + # body=open('{}/{}/{}'.format(self.mock_dir, framework, filename)).read(), + # status=status + # ) def mock_endpoints(self): for framework in self.get_directories(self.mock_dir): @@ -95,6 +137,10 @@ class mockAPI(object): elif framework == 'qualys_vuln': self.qualys_vuln_path = self.mock_dir + '/' + framework self.create_qualys_vuln_resource(framework) + elif framework == 'qualys_web': + self.qualys_web_path = self.mock_dir + '/' + framework + self.create_qualys_web_resource(framework) elif framework == 'openvas': + self.openvas_path = self.mock_dir + '/' + framework self.create_openvas_resource(framework) httpretty.enable()