qualysapi dependency removal

This commit is contained in:
Quim
2018-11-05 15:06:29 +01:00
parent b7d6d6207f
commit 0d5b6479ac
23 changed files with 1 additions and 1866 deletions

View File

@ -1,47 +0,0 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Mac
.DS_Store
# Authenticatin configuration
*.qcrc
config.qcrc
config.ini
# PyCharm
.idea
.qcrc.swp

View File

@ -1,2 +0,0 @@
include README.md
recursive-include examples *.py

View File

@ -1,107 +0,0 @@
qualysapi
=========
Python package, qualysapi, that makes calling any Qualys API very simple. Qualys API versions v1, v2, & WAS & AM (asset management) are all supported.
My focus was making the API super easy to use. The only parameters the user needs to provide is the call, and data (optional). It automates the following:
* Automatically identifies API version through the call requested.
* Automatically identifies url from the above step.
* Automatically identifies http method as POST or GET for the request per Qualys documentation.
Usage
=====
Check out the example scripts in the [/examples directory](https://github.com/paragbaxi/qualysapi/blob/master/examples/).
Example
-------
Detailed example found at [qualysapi-example.py](https://github.com/paragbaxi/qualysapi/blob/master/examples/qualysapi-example.py).
Sample example below.
```python
>>> import qualysapi
>>> a = qualysapi.connect()
QualysGuard Username: my_username
QualysGuard Password:
>>> print a.request('about.php')
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE ABOUT SYSTEM "https://qualysapi.qualys.com/about.dtd">
<ABOUT>
<API-VERSION MAJOR="1" MINOR="4" />
<WEB-VERSION>7.10.61-1</WEB-VERSION>
<SCANNER-VERSION>7.1.10-1</SCANNER-VERSION>
<VULNSIGS-VERSION>2.2.475-2</VULNSIGS-VERSION>
</ABOUT>
<!-- Generated for username="my_username" date="2013-07-03T10:31:57Z" -->
<!-- CONFIDENTIAL AND PROPRIETARY INFORMATION. Qualys provides the QualysGuard Service "As Is," without any warranty of any kind. Qualys makes no warranty that the information contained in this report is complete or error-free. Copyright 2013, Qualys, Inc. //-->
```
Installation
============
Use pip to install:
```Shell
pip install qualysapi
```
NOTE: If you would like to experiment without installing globally, look into 'virtualenv'.
Requirements
------------
* requests (http://docs.python-requests.org)
* lxml (http://lxml.de/)
Tested successfully on Python 2.7.
Configuration
=============
By default, the package will ask at the command prompt for username and password. By default, the package connects to the Qualys documented host (qualysapi.qualys.com).
You can override these settings and prevent yourself from typing credentials by doing any of the following:
1. By running the following Python, `qualysapi.connect(remember_me=True)`. This automatically generates a .qcrc file in your current working directory, scoping the configuration to that directory.
2. By running the following Python, `qualysapi.connect(remember_me_always=True)`. This automatically generates a .qcrc file in your home directory, scoping the configuratoin to all calls to qualysapi, regardless of the directory.
3. By creating a file called '.qcrc' (for Windows, the default filename is 'config.ini') in your home directory or directory of the Python script.
4. This supports multiple configuration files. Just add the filename in your call to qualysapi.connect('config.txt').
Example config file
-------------------
```INI
; Note, it should be possible to omit any of these entries.
[info]
hostname = qualysapi.serviceprovider.com
username = jerry
password = I<3Elaine
# Set the maximum number of retries each connection should attempt. Note, this applies only to failed connections and timeouts, never to requests where the server returns a response.
max_retries = 10
[proxy]
; This section is optional. Leave it out if you're not using a proxy.
; You can use environmental variables as well: http://www.python-requests.org/en/latest/user/advanced/#proxies
; proxy_protocol set to https, if not specified.
proxy_url = proxy.mycorp.com
; proxy_port will override any port specified in proxy_url
proxy_port = 8080
; proxy authentication
proxy_username = kramer
proxy_password = giddy up!
```
License
=======
Apache License, Version 2.0
http://www.apache.org/licenses/LICENSE-2.0.html
Acknowledgements
================
Special thank you to Colin Bell for qualysconnect.

View File

@ -1,12 +0,0 @@
3.5.0
- Retooled authentication.
3.4.0
- Allows choice of configuration filenames. Easy to support those with multiple Qualys accounts, and need to automate tasks.
3.3.0
- Remove curl capability. Requests 2.0 and latest urllib3 can handle https proxy.
- Workaround for audience that does not have lxml. Warning: cannot handle lxml.builder E objects for AM & WAS APIs.
3.0.0
Proxy support.

View File

@ -1 +0,0 @@
__author__ = 'pbaxi'

View File

@ -1,113 +0,0 @@
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
__license__ = 'Apache License 2.0'
import qualysapi
from lxml import objectify
from lxml.builder import E
# Setup connection to QualysGuard API.
qgc = qualysapi.connect('config.txt')
#
# API v1 call: Scan the New York & Las Vegas asset groups
# The call is our request's first parameter.
call = 'scan.php'
# The parameters to append to the url is our request's second parameter.
parameters = {'scan_title': 'Go big or go home', 'asset_groups': 'New York&Las Vegas', 'option': 'Initial+Options'}
# Note qualysapi will automatically convert spaces into plus signs for API v1 & v2.
# Let's call the API and store the result in xml_output.
xml_output = qgc.request(call, parameters, concurrent_scans_retries=2, concurrent_scans_retry_delay=600)
# concurrent_retries: Retry the call this many times if your subscription hits the concurrent scans limit.
# concurrent_retries: Delay in seconds between retrying when subscription hits the concurrent scans limit.
# Example XML response when this happens below:
# <?xml version="1.0" encoding="UTF-8"?>
# <ServiceResponse xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://localhost:50205/qps/rest/app//xsd/3.0/was/wasscan.xsd">
# <responseCode>INVALID_REQUEST</responseCode>
# <responseErrorDetails>
# <errorMessage>You have reached the maximum number of concurrent running scans (10) for your account</errorMessage>
# <errorResolution>Please wait until your previous scans have completed</errorResolution>
# </responseErrorDetails>
#
print(xml_output)
#
# API v1 call: Print out all IPs associated with asset group "Looneyville Texas".
# Note that the question mark at the end is optional.
call = 'asset_group_list.php?'
# We can still use strings for the data (not recommended).
parameters = 'title=Looneyville Texas'
# Let's call the API and store the result in xml_output.
xml_output = qgc.request(call, parameters)
# Let's objectify the xml_output string.
root = objectify.fromstring(xml_output)
# Print out the IPs.
print(root.ASSET_GROUP.SCANIPS.IP.text)
# Prints out:
# 10.0.0.102
#
# API v2 call: Print out DNS name for a range of IPs.
call = '/api/2.0/fo/asset/host/'
parameters = {'action': 'list', 'ips': '10.0.0.10-10.0.0.11'}
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Iterate hosts and print out DNS name.
for host in root.RESPONSE.HOST_LIST.HOST:
print(host.IP.text, host.DNS.text)
# Prints out:
# 10.0.0.10 mydns1.qualys.com
# 10.0.0.11 mydns2.qualys.com
#
# API v3 WAS call: Print out number of webapps.
call = '/count/was/webapp'
# Note that this call does not have a payload so we don't send any data parameters.
xml_output = qgc.request(call)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 89
#
# API v3 WAS call: Print out number of webapps containing title 'Supafly'.
call = '/count/was/webapp'
# We can send a string XML for the data.
parameters = '<ServiceRequest><filters><Criteria operator="CONTAINS" field="name">Supafly</Criteria></filters></ServiceRequest>'
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 3
#
# API v3 WAS call: Print out number of webapps containing title 'Lightsabertooth Tiger'.
call = '/count/was/webapp'
# We can also send an lxml.builder E object.
parameters = (
E.ServiceRequest(
E.filters(
E.Criteria('Lightsabertooth Tiger', field='name',operator='CONTAINS'))))
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 0
# Too bad, because that is an awesome webapp name!
#
# API v3 Asset Management call: Count tags.
call = '/count/am/tag'
xml_output = qgc.request(call)
root = objectify.fromstring(xml_output)
# We can use XPATH to find the count.
print(root.xpath('count')[0].text)
# Prints out:
# 840
#
# API v3 Asset Management call: Find asset by name.
call = '/search/am/tag'
parameters = '''<ServiceRequest>
<preferences>
<limitResults>10</limitResults>
</preferences>
<filters>
<Criteria field="name" operator="CONTAINS">PB</Criteria>
</filters>
</ServiceRequest>'''
xml_output = qgc.request(call, parameters)

View File

@ -1,113 +0,0 @@
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
__license__ = 'Apache License 2.0'
import qualysapi
from lxml import objectify
from lxml.builder import E
# Setup connection to QualysGuard API.
qgc = qualysapi.connect('config.txt', 'qualys_vuln')
#
# API v1 call: Scan the New York & Las Vegas asset groups
# The call is our request's first parameter.
call = 'scan.php'
# The parameters to append to the url is our request's second parameter.
parameters = {'scan_title': 'Go big or go home', 'asset_groups': 'New York&Las Vegas', 'option': 'Initial+Options'}
# Note qualysapi will automatically convert spaces into plus signs for API v1 & v2.
# Let's call the API and store the result in xml_output.
xml_output = qgc.request(call, parameters, concurrent_scans_retries=2, concurrent_scans_retry_delay=600)
# concurrent_retries: Retry the call this many times if your subscription hits the concurrent scans limit.
# concurrent_retries: Delay in seconds between retrying when subscription hits the concurrent scans limit.
# Example XML response when this happens below:
# <?xml version="1.0" encoding="UTF-8"?>
# <ServiceResponse xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://localhost:50205/qps/rest/app//xsd/3.0/was/wasscan.xsd">
# <responseCode>INVALID_REQUEST</responseCode>
# <responseErrorDetails>
# <errorMessage>You have reached the maximum number of concurrent running scans (10) for your account</errorMessage>
# <errorResolution>Please wait until your previous scans have completed</errorResolution>
# </responseErrorDetails>
#
print(xml_output)
#
# API v1 call: Print out all IPs associated with asset group "Looneyville Texas".
# Note that the question mark at the end is optional.
call = 'asset_group_list.php?'
# We can still use strings for the data (not recommended).
parameters = 'title=Looneyville Texas'
# Let's call the API and store the result in xml_output.
xml_output = qgc.request(call, parameters)
# Let's objectify the xml_output string.
root = objectify.fromstring(xml_output)
# Print out the IPs.
print(root.ASSET_GROUP.SCANIPS.IP.text)
# Prints out:
# 10.0.0.102
#
# API v2 call: Print out DNS name for a range of IPs.
call = '/api/2.0/fo/asset/host/'
parameters = {'action': 'list', 'ips': '10.0.0.10-10.0.0.11'}
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Iterate hosts and print out DNS name.
for host in root.RESPONSE.HOST_LIST.HOST:
print(host.IP.text, host.DNS.text)
# Prints out:
# 10.0.0.10 mydns1.qualys.com
# 10.0.0.11 mydns2.qualys.com
#
# API v3 WAS call: Print out number of webapps.
call = '/count/was/webapp'
# Note that this call does not have a payload so we don't send any data parameters.
xml_output = qgc.request(call)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 89
#
# API v3 WAS call: Print out number of webapps containing title 'Supafly'.
call = '/count/was/webapp'
# We can send a string XML for the data.
parameters = '<ServiceRequest><filters><Criteria operator="CONTAINS" field="name">Supafly</Criteria></filters></ServiceRequest>'
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 3
#
# API v3 WAS call: Print out number of webapps containing title 'Lightsabertooth Tiger'.
call = '/count/was/webapp'
# We can also send an lxml.builder E object.
parameters = (
E.ServiceRequest(
E.filters(
E.Criteria('Lightsabertooth Tiger', field='name',operator='CONTAINS'))))
xml_output = qgc.request(call, parameters)
root = objectify.fromstring(xml_output)
# Print out count of webapps.
print(root.count.text)
# Prints out:
# 0
# Too bad, because that is an awesome webapp name!
#
# API v3 Asset Management call: Count tags.
call = '/count/am/tag'
xml_output = qgc.request(call)
root = objectify.fromstring(xml_output)
# We can use XPATH to find the count.
print(root.xpath('count')[0].text)
# Prints out:
# 840
#
# API v3 Asset Management call: Find asset by name.
call = '/search/am/tag'
parameters = '''<ServiceRequest>
<preferences>
<limitResults>10</limitResults>
</preferences>
<filters>
<Criteria field="name" operator="CONTAINS">PB</Criteria>
</filters>
</ServiceRequest>'''
xml_output = qgc.request(call, parameters)

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
import sys
import logging
import qualysapi
# Questions? See:
# https://bitbucket.org/uWaterloo_IST_ISS/python-qualysconnect
if __name__ == '__main__':
# Basic command line processing.
if len(sys.argv) != 2:
print('A single IPv4 address is expected as the only argument')
sys.exit(2)
# Set the MAXIMUM level of log messages displayed @ runtime.
logging.basicConfig(level=logging.INFO)
# Call helper that creates a connection w/ HTTP-Basic to QualysGuard API.
qgs=qualysapi.connect()
# Logging must be set after instanciation of connector class.
logger = logging.getLogger('qualysapi.connector')
logger.setLevel(logging.DEBUG)
# Log to sys.out.
logger_console = logging.StreamHandler()
logger_console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
logging.getLogger(__name__).addHandler(logger)
# Formulate a request to the QualysGuard V1 API.
# docs @
# https://community.qualys.com/docs/DOC-1324
# http://www.qualys.com/docs/QualysGuard_API_User_Guide.pdf
#
# Old way still works:
# ret = qgs.request(1,'asset_search.php', "target_ips=%s&"%(sys.argv[1]))
# New way is cleaner:
ret = qgs.request(1,'asset_search.php', {'target_ips': sys.argv[1]})
print(ret)

View File

@ -1,37 +0,0 @@
#!/usr/bin/env python
import sys
import logging
import qualysapi
if __name__ == '__main__':
# Basic command line processing.
if len(sys.argv) != 3:
print('A report template and scan reference respectively are expected as the only arguments.')
sys.exit(2)
# Set the MAXIMUM level of log messages displayed @ runtime.
logging.basicConfig(level=logging.DEBUG)
# Call helper that creates a connection w/ HTTP-Basic to QualysGuard v1 API
qgs=qualysapi.connect()
# Logging must be set after instanciation of connector class.
logger = logging.getLogger('qualysapi.connector')
logger.setLevel(logging.DEBUG)
# Log to sys.out.
logger_console = logging.StreamHandler()
logger_console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
logging.getLogger(__name__).addHandler(logger)
# Formulate a request to the QualysGuard V1 API
# docs @
# https://community.qualys.com/docs/DOC-1324
# http://www.qualys.com/docs/QualysGuard_API_User_Guide.pdf
#
ret = qgs.request('/api/2.0/fo/report',{'action': 'launch', 'report_refs': sys.argv[2], 'output_format': 'xml', 'template_id': sys.argv[1], 'report_type': 'Scan'})
print(ret)

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python
import sys
import logging
import qualysapi
# Questions? See:
# https://bitbucket.org/uWaterloo_IST_ISS/python-qualysconnect
if __name__ == '__main__':
# Basic command line processing.
if len(sys.argv) != 2:
print('A single IPv4 address is expected as the only argument.')
sys.exit(2)
# Set the MAXIMUM level of log messages displayed @ runtime.
logging.basicConfig(level=logging.INFO)
# Call helper that creates a connection w/ HTTP-Basic to QualysGuard v1 API
qgs=qualysapi.connect()
# Logging must be set after instanciation of connector class.
logger = logging.getLogger('qualysapi.connector')
logger.setLevel(logging.DEBUG)
# Log to sys.out.
logger_console = logging.StreamHandler()
logger_console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
logging.getLogger(__name__).addHandler(logger)
# Formulate a request to the QualysGuard V1 API
# docs @
# https://community.qualys.com/docs/DOC-1324
# http://www.qualys.com/docs/QualysGuard_API_User_Guide.pdf
#
# Old way still works:
# ret = qgs.request(2, "asset/host","?action=list&ips=%s&"%(sys.argv[1]))
# New way is cleaner:
ret = qgs.request('/api/2.0/fo/asset/host',{'action': 'list', 'ips': sys.argv[1]})
print(ret)

201
deps/qualysapi/license vendored
View File

@ -1,201 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017 Parag Baxi
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,10 +0,0 @@
# This is the version string assigned to the entire egg post
# setup.py install
# Ownership and Copyright Information.
from __future__ import absolute_import
__author__ = "Parag Baxi <parag.baxi@gmail.com>"
__copyright__ = "Copyright 2011-2013, Parag Baxi"
__license__ = "BSD-new"
from qualysapi.util import connect

View File

@ -1,181 +0,0 @@
from __future__ import absolute_import
from lxml import objectify
import qualysapi.api_objects
from qualysapi.api_objects import *
class QGActions(object):
def getHost(self, host):
call = '/api/2.0/fo/asset/host/'
parameters = {'action': 'list', 'ips': host, 'details': 'All'}
hostData = objectify.fromstring(self.request(call, parameters)).RESPONSE
try:
hostData = hostData.HOST_LIST.HOST
return Host(hostData.DNS, hostData.ID, hostData.IP, hostData.LAST_VULN_SCAN_DATETIME, hostData.NETBIOS, hostData.OS, hostData.TRACKING_METHOD)
except AttributeError:
return Host("", "", host, "never", "", "", "")
def getHostRange(self, start, end):
call = '/api/2.0/fo/asset/host/'
parameters = {'action': 'list', 'ips': start + '-' + end}
hostData = objectify.fromstring(self.request(call, parameters))
hostArray = []
for host in hostData.RESPONSE.HOST_LIST.HOST:
hostArray.append(Host(host.DNS, host.ID, host.IP, host.LAST_VULN_SCAN_DATETIME, host.NETBIOS, host.OS, host.TRACKING_METHOD))
return hostArray
def listAssetGroups(self, groupName=''):
call = 'asset_group_list.php'
if groupName == '':
agData = objectify.fromstring(self.request(call))
else:
agData = objectify.fromstring(self.request(call, 'title=' + groupName)).RESPONSE
groupsArray = []
scanipsArray = []
scandnsArray = []
scannersArray = []
for group in agData.ASSET_GROUP:
try:
for scanip in group.SCANIPS:
scanipsArray.append(scanip.IP)
except AttributeError:
scanipsArray = [] # No IPs defined to scan.
try:
for scanner in group.SCANNER_APPLIANCES.SCANNER_APPLIANCE:
scannersArray.append(scanner.SCANNER_APPLIANCE_NAME)
except AttributeError:
scannersArray = [] # No scanner appliances defined for this group.
try:
for dnsName in group.SCANDNS:
scandnsArray.append(dnsName.DNS)
except AttributeError:
scandnsArray = [] # No DNS names assigned to group.
groupsArray.append(AssetGroup(group.BUSINESS_IMPACT, group.ID, group.LAST_UPDATE, scanipsArray, scandnsArray, scannersArray, group.TITLE))
return groupsArray
def listReportTemplates(self):
call = 'report_template_list.php'
rtData = objectify.fromstring(self.request(call))
templatesArray = []
for template in rtData.REPORT_TEMPLATE:
templatesArray.append(ReportTemplate(template.GLOBAL, template.ID, template.LAST_UPDATE, template.TEMPLATE_TYPE, template.TITLE, template.TYPE, template.USER))
return templatesArray
def listReports(self, id=0):
call = '/api/2.0/fo/report'
if id == 0:
parameters = {'action': 'list'}
repData = objectify.fromstring(self.request(call, parameters)).RESPONSE
reportsArray = []
for report in repData.REPORT_LIST.REPORT:
reportsArray.append(Report(report.EXPIRATION_DATETIME, report.ID, report.LAUNCH_DATETIME, report.OUTPUT_FORMAT, report.SIZE, report.STATUS, report.TYPE, report.USER_LOGIN))
return reportsArray
else:
parameters = {'action': 'list', 'id': id}
repData = objectify.fromstring(self.request(call, parameters)).RESPONSE.REPORT_LIST.REPORT
return Report(repData.EXPIRATION_DATETIME, repData.ID, repData.LAUNCH_DATETIME, repData.OUTPUT_FORMAT, repData.SIZE, repData.STATUS, repData.TYPE, repData.USER_LOGIN)
def notScannedSince(self, days):
call = '/api/2.0/fo/asset/host/'
parameters = {'action': 'list', 'details': 'All'}
hostData = objectify.fromstring(self.request(call, parameters))
hostArray = []
today = datetime.date.today()
for host in hostData.RESPONSE.HOST_LIST.HOST:
last_scan = str(host.LAST_VULN_SCAN_DATETIME).split('T')[0]
last_scan = datetime.date(int(last_scan.split('-')[0]), int(last_scan.split('-')[1]), int(last_scan.split('-')[2]))
if (today - last_scan).days >= days:
hostArray.append(Host(host.DNS, host.ID, host.IP, host.LAST_VULN_SCAN_DATETIME, host.NETBIOS, host.OS, host.TRACKING_METHOD))
return hostArray
def addIP(self, ips, vmpc):
# 'ips' parameter accepts comma-separated list of IP addresses.
# 'vmpc' parameter accepts 'vm', 'pc', or 'both'. (Vulnerability Managment, Policy Compliance, or both)
call = '/api/2.0/fo/asset/ip/'
enablevm = 1
enablepc = 0
if vmpc == 'pc':
enablevm = 0
enablepc = 1
elif vmpc == 'both':
enablevm = 1
enablepc = 1
parameters = {'action': 'add', 'ips': ips, 'enable_vm': enablevm, 'enable_pc': enablepc}
self.request(call, parameters)
def listScans(self, launched_after="", state="", target="", type="", user_login=""):
# 'launched_after' parameter accepts a date in the format: YYYY-MM-DD
# 'state' parameter accepts "Running", "Paused", "Canceled", "Finished", "Error", "Queued", and "Loading".
# 'title' parameter accepts a string
# 'type' parameter accepts "On-Demand", and "Scheduled".
# 'user_login' parameter accepts a user name (string)
call = '/api/2.0/fo/scan/'
parameters = {'action': 'list', 'show_ags': 1, 'show_op': 1, 'show_status': 1}
if launched_after != "":
parameters['launched_after_datetime'] = launched_after
if state != "":
parameters['state'] = state
if target != "":
parameters['target'] = target
if type != "":
parameters['type'] = type
if user_login != "":
parameters['user_login'] = user_login
scanlist = objectify.fromstring(self.request(call, parameters))
scanArray = []
for scan in scanlist.RESPONSE.SCAN_LIST.SCAN:
try:
agList = []
for ag in scan.ASSET_GROUP_TITLE_LIST.ASSET_GROUP_TITLE:
agList.append(ag)
except AttributeError:
agList = []
scanArray.append(Scan(agList, scan.DURATION, scan.LAUNCH_DATETIME, scan.OPTION_PROFILE.TITLE, scan.PROCESSED, scan.REF, scan.STATUS, scan.TARGET, scan.TITLE, scan.TYPE, scan.USER_LOGIN))
return scanArray
def launchScan(self, title, option_title, iscanner_name, asset_groups="", ip=""):
# TODO: Add ability to scan by tag.
call = '/api/2.0/fo/scan/'
parameters = {'action': 'launch', 'scan_title': title, 'option_title': option_title, 'iscanner_name': iscanner_name, 'ip': ip, 'asset_groups': asset_groups}
if ip == "":
parameters.pop("ip")
if asset_groups == "":
parameters.pop("asset_groups")
scan_ref = objectify.fromstring(self.request(call, parameters)).RESPONSE.ITEM_LIST.ITEM[1].VALUE
call = '/api/2.0/fo/scan/'
parameters = {'action': 'list', 'scan_ref': scan_ref, 'show_status': 1, 'show_ags': 1, 'show_op': 1}
scan = objectify.fromstring(self.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN
try:
agList = []
for ag in scan.ASSET_GROUP_TITLE_LIST.ASSET_GROUP_TITLE:
agList.append(ag)
except AttributeError:
agList = []
return Scan(agList, scan.DURATION, scan.LAUNCH_DATETIME, scan.OPTION_PROFILE.TITLE, scan.PROCESSED, scan.REF, scan.STATUS, scan.TARGET, scan.TITLE, scan.TYPE, scan.USER_LOGIN)

View File

@ -1,155 +0,0 @@
from __future__ import absolute_import
__author__ = 'pbaxi'
from collections import defaultdict
api_methods = defaultdict(set)
api_methods['1'] = set([
'about.php',
'action_log_report.php',
'asset_data_report.php',
'asset_domain.php',
'asset_domain_list.php',
'asset_group_delete.php',
'asset_group_list.php',
'asset_ip_list.php',
'asset_range_info.php',
'asset_search.php',
'get_host_info.php',
'ignore_vuln.php',
'iscanner_list.php',
'knowledgebase_download.php',
'map-2.php',
'map.php',
'map_report.php',
'map_report_list.php',
'password_change.php',
'scan.php',
'scan_cancel.php',
'scan_options.php',
'scan_report.php',
'scan_report_delete.php',
'scan_report_list.php',
'scan_running_list.php',
'scan_target_history.php',
'scheduled_scans.php',
'ticket_delete.php',
'ticket_edit.php',
'ticket_list.php',
'ticket_list_deleted.php',
'time_zone_code.php',
'user.php',
'user_list.php',
])
# API v1 POST methods.
api_methods['1 post'] = set([
'action_log_report.php',
'asset_group.php',
'asset_ip.php',
'ignore_vuln.php',
'knowledgebase_download.php',
'map-2.php',
'map.php',
'password_change.php',
'scan.php',
'scan_report.php',
'scan_target_history.php',
'scheduled_scans.php',
'ticket_delete.php',
'ticket_edit.php',
'ticket_list.php',
'ticket_list_deleted.php',
'user.php',
'user_list.php',
])
# API v2 methods (they're all POST).
api_methods['2'] = set([
'api/2.0/fo/appliance/',
'api/2.0/fo/asset/excluded_ip/',
'api/2.0/fo/asset/excluded_ip/history/',
'api/2.0/fo/asset/host/',
'api/2.0/fo/asset/host/cyberscope/',
'api/2.0/fo/asset/host/cyberscope/fdcc/policy/',
'api/2.0/fo/asset/host/cyberscope/fdcc/scan/',
'api/2.0/fo/asset/host/vm/detection/',
'api/2.0/fo/asset/ip/',
'api/2.0/fo/asset/ip/v4_v6/',
'api/2.0/fo/asset/vhost/',
'api/2.0/fo/auth/',
# 'api/2.0/fo/auth/{type}/', # Added below.
'api/2.0/fo/compliance/',
'api/2.0/fo/compliance/control',
'api/2.0/fo/compliance/fdcc/policy',
'api/2.0/fo/compliance/policy/',
'api/2.0/fo/compliance/posture/info/',
'api/2.0/fo/compliance/scap/arf/',
'api/2.0/fo/knowledge_base/vuln/',
'api/2.0/fo/report/',
'api/2.0/fo/report/scorecard/',
'api/2.0/fo/scan/',
'api/2.0/fo/scan/compliance/',
'api/2.0/fo/session/',
'api/2.0/fo/setup/restricted_ips/',
])
for auth_type in set([
'ibm_db2',
'ms_sql',
'oracle',
'oracle_listener',
'snmp',
'unix',
'windows',
]):
api_methods['2'].add('api/2.0/fo/auth/%s/' % auth_type)
# WAS GET methods when no POST data.
api_methods['was no data get'] = set([
'count/was/report',
'count/was/wasscan',
'count/was/wasscanschedule',
'count/was/webapp',
'download/was/report/',
'download/was/wasscan/',
])
# WAS GET methods.
api_methods['was get'] = set([
'download/was/report/',
'download/was/wasscan/',
'get/was/report/',
'get/was/wasscan/',
'get/was/wasscanschedule/',
'get/was/webapp/',
'status/was/report/',
'status/was/wasscan/',
])
# Asset Management GET methods.
api_methods['am get'] = set([
'count/am/asset',
'count/am/hostasset',
'count/am/tag',
'get/am/asset/',
'get/am/hostasset/',
'get/am/tag/',
])
# Asset Management v2 GET methods.
api_methods['am2 get'] = set([
'get/am/asset/',
'get/am/hostasset/',
'get/am/tag/',
'get/am/hostinstancevuln/',
'get/am/assetdataconnector/',
'get/am/awsassetdataconnector/',
'get/am/awsauthrecord/',
])
# Keep track of methods with ending slashes to autocorrect user when they forgot slash.
api_methods_with_trailing_slash = defaultdict(set)
for method_group in set(['1', '2', 'was', 'am', 'am2']):
for method in api_methods[method_group]:
if method[-1] == '/':
# Add applicable method with api_version preceding it.
# Example:
# WAS API has 'get/was/webapp/'.
# method_group = 'was get'
# method_group.split()[0] = 'was'
# Take off slash to match user provided method.
# api_methods_with_trailing_slash['was'] contains 'get/was/webapp'
api_methods_with_trailing_slash[method_group.split()[0]].add(method[:-1])

View File

@ -1,120 +0,0 @@
from __future__ import absolute_import
import datetime
from lxml import objectify
class Host(object):
def __init__(self, dns, id, ip, last_scan, netbios, os, tracking_method):
self.dns = str(dns)
self.id = int(id)
self.ip = str(ip)
last_scan = str(last_scan).replace('T', ' ').replace('Z', '').split(' ')
date = last_scan[0].split('-')
time = last_scan[1].split(':')
self.last_scan = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]))
self.netbios = str(netbios)
self.os = str(os)
self.tracking_method = str(tracking_method)
class AssetGroup(object):
def __init__(self, business_impact, id, last_update, scanips, scandns, scanner_appliances, title):
self.business_impact = str(business_impact)
self.id = int(id)
self.last_update = str(last_update)
self.scanips = scanips
self.scandns = scandns
self.scanner_appliances = scanner_appliances
self.title = str(title)
def addAsset(self, conn, ip):
call = '/api/2.0/fo/asset/group/'
parameters = {'action': 'edit', 'id': self.id, 'add_ips': ip}
conn.request(call, parameters)
self.scanips.append(ip)
def setAssets(self, conn, ips):
call = '/api/2.0/fo/asset/group/'
parameters = {'action': 'edit', 'id': self.id, 'set_ips': ips}
conn.request(call, parameters)
class ReportTemplate(object):
def __init__(self, isGlobal, id, last_update, template_type, title, type, user):
self.isGlobal = int(isGlobal)
self.id = int(id)
self.last_update = str(last_update).replace('T', ' ').replace('Z', '').split(' ')
self.template_type = template_type
self.title = title
self.type = type
self.user = user.LOGIN
class Report(object):
def __init__(self, expiration_datetime, id, launch_datetime, output_format, size, status, type, user_login):
self.expiration_datetime = str(expiration_datetime).replace('T', ' ').replace('Z', '').split(' ')
self.id = int(id)
self.launch_datetime = str(launch_datetime).replace('T', ' ').replace('Z', '').split(' ')
self.output_format = output_format
self.size = size
self.status = status.STATE
self.type = type
self.user_login = user_login
def download(self, conn):
call = '/api/2.0/fo/report'
parameters = {'action': 'fetch', 'id': self.id}
if self.status == 'Finished':
return conn.request(call, parameters)
class Scan(object):
def __init__(self, assetgroups, duration, launch_datetime, option_profile, processed, ref, status, target, title, type, user_login):
self.assetgroups = assetgroups
self.duration = str(duration)
launch_datetime = str(launch_datetime).replace('T', ' ').replace('Z', '').split(' ')
date = launch_datetime[0].split('-')
time = launch_datetime[1].split(':')
self.launch_datetime = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]))
self.option_profile = str(option_profile)
self.processed = int(processed)
self.ref = str(ref)
self.status = str(status.STATE)
self.target = str(target).split(', ')
self.title = str(title)
self.type = str(type)
self.user_login = str(user_login)
def cancel(self, conn):
cancelled_statuses = ['Cancelled', 'Finished', 'Error']
if any(self.status in s for s in cancelled_statuses):
raise ValueError("Scan cannot be cancelled because its status is " + self.status)
else:
call = '/api/2.0/fo/scan/'
parameters = {'action': 'cancel', 'scan_ref': self.ref}
conn.request(call, parameters)
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE
def pause(self, conn):
if self.status != "Running":
raise ValueError("Scan cannot be paused because its status is " + self.status)
else:
call = '/api/2.0/fo/scan/'
parameters = {'action': 'pause', 'scan_ref': self.ref}
conn.request(call, parameters)
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE
def resume(self, conn):
if self.status != "Paused":
raise ValueError("Scan cannot be resumed because its status is " + self.status)
else:
call = '/api/2.0/fo/scan/'
parameters = {'action': 'resume', 'scan_ref': self.ref}
conn.request(call, parameters)
parameters = {'action': 'list', 'scan_ref': self.ref, 'show_status': 1}
self.status = objectify.fromstring(conn.request(call, parameters)).RESPONSE.SCAN_LIST.SCAN.STATUS.STATE

View File

@ -1,201 +0,0 @@
""" Module providing a single class (QualysConnectConfig) that parses a config
file and provides the information required to build QualysGuard sessions.
"""
from __future__ import absolute_import
from __future__ import print_function
import os
import stat
import getpass
import logging
from six.moves import input
from six.moves.configparser import *
import qualysapi.settings as qcs
# Setup module level logging.
logger = logging.getLogger(__name__)
# try:
# from requests_ntlm import HttpNtlmAuth
# except ImportError, e:
# logger.warning('Warning: Cannot support NTML authentication.')
__author__ = "Parag Baxi <parag.baxi@gmail.com> & Colin Bell <colin.bell@uwaterloo.ca>"
__copyright__ = "Copyright 2011-2013, Parag Baxi & University of Waterloo"
__license__ = "BSD-new"
class QualysConnectConfig:
""" Class to create a ConfigParser and read user/password details
from an ini file.
"""
def __init__(self, filename=qcs.default_filename, section='info', remember_me=False, remember_me_always=False):
self._cfgfile = None
self._section = section
# Prioritize local directory filename.
# Check for file existence.
if os.path.exists(filename):
self._cfgfile = filename
elif os.path.exists(os.path.join(os.path.expanduser("~"), filename)):
# Set home path for file.
self._cfgfile = os.path.join(os.path.expanduser("~"), filename)
# create ConfigParser to combine defaults and input from config file.
self._cfgparse = ConfigParser(qcs.defaults)
if self._cfgfile:
self._cfgfile = os.path.realpath(self._cfgfile)
mode = stat.S_IMODE(os.stat(self._cfgfile)[stat.ST_MODE])
# apply bitmask to current mode to check ONLY user access permissions.
if (mode & (stat.S_IRWXG | stat.S_IRWXO)) != 0:
logger.warning('%s permissions allows more than user access.' % (filename,))
self._cfgparse.read(self._cfgfile)
# if 'info'/ specified section doesn't exist, create the section.
if not self._cfgparse.has_section(self._section):
self._cfgparse.add_section(self._section)
# Use default hostname (if one isn't provided).
if not self._cfgparse.has_option(self._section, 'hostname'):
if self._cfgparse.has_option('DEFAULT', 'hostname'):
hostname = self._cfgparse.get('DEFAULT', 'hostname')
self._cfgparse.set(self._section, 'hostname', hostname)
else:
raise Exception("No 'hostname' set. QualysConnect does not know who to connect to.")
# Use default max_retries (if one isn't provided).
if not self._cfgparse.has_option(self._section, 'max_retries'):
self.max_retries = qcs.defaults['max_retries']
else:
self.max_retries = self._cfgparse.get(self._section, 'max_retries')
try:
self.max_retries = int(self.max_retries)
except Exception:
logger.error('Value max_retries must be an integer.')
print('Value max_retries must be an integer.')
exit(1)
self._cfgparse.set(self._section, 'max_retries', str(self.max_retries))
self.max_retries = int(self.max_retries)
# Proxy support
proxy_config = proxy_url = proxy_protocol = proxy_port = proxy_username = proxy_password = None
# User requires proxy?
if self._cfgparse.has_option('proxy', 'proxy_url'):
proxy_url = self._cfgparse.get('proxy', 'proxy_url')
# Remove protocol prefix from url if included.
for prefix in ('http://', 'https://'):
if proxy_url.startswith(prefix):
proxy_protocol = prefix
proxy_url = proxy_url[len(prefix):]
# Default proxy protocol is http.
if not proxy_protocol:
proxy_protocol = 'https://'
# Check for proxy port request.
if ':' in proxy_url:
# Proxy port already specified in url.
# Set proxy port.
proxy_port = proxy_url[proxy_url.index(':') + 1:]
# Remove proxy port from proxy url.
proxy_url = proxy_url[:proxy_url.index(':')]
if self._cfgparse.has_option('proxy', 'proxy_port'):
# Proxy requires specific port.
if proxy_port:
# Warn that a proxy port was already specified in the url.
proxy_port_url = proxy_port
proxy_port = self._cfgparse.get('proxy', 'proxy_port')
logger.warning('Proxy port from url overwritten by specified proxy_port from config:')
logger.warning('%s --> %s' % (proxy_port_url, proxy_port))
else:
proxy_port = self._cfgparse.get('proxy', 'proxy_port')
if not proxy_port:
# No proxy port specified.
if proxy_protocol == 'http://':
# Use default HTTP Proxy port.
proxy_port = '8080'
else:
# Use default HTTPS Proxy port.
proxy_port = '443'
# Check for proxy authentication request.
if self._cfgparse.has_option('proxy', 'proxy_username'):
# Proxy requires username & password.
proxy_username = self._cfgparse.get('proxy', 'proxy_username')
proxy_password = self._cfgparse.get('proxy', 'proxy_password')
# Not sure if this use case below is valid.
# # Support proxy with username and empty password.
# try:
# proxy_password = self._cfgparse.get('proxy','proxy_password')
# except NoOptionError, e:
# # Set empty password.
# proxy_password = ''
# Sample proxy config:f
# 'http://user:pass@10.10.1.10:3128'
if proxy_url:
# Proxy requested.
proxy_config = proxy_url
if proxy_port:
# Proxy port requested.
proxy_config += ':' + proxy_port
if proxy_username:
# Proxy authentication requested.
proxy_config = proxy_username + ':' + proxy_password + '@' + proxy_config
# Prefix by proxy protocol.
proxy_config = proxy_protocol + proxy_config
# Set up proxy if applicable.
if proxy_config:
self.proxies = {'https': proxy_config}
else:
self.proxies = None
# ask username (if one doesn't exist)
if not self._cfgparse.has_option(self._section, 'username'):
username = input('QualysGuard Username: ')
self._cfgparse.set(self._section, 'username', username)
# ask password (if one doesn't exist)
if not self._cfgparse.has_option(self._section, 'password'):
password = getpass.getpass('QualysGuard Password: ')
self._cfgparse.set(self._section, 'password', password)
logger.debug(self._cfgparse.items(self._section))
if remember_me or remember_me_always:
# Let's create that config file for next time...
# Where to store this?
if remember_me:
# Store in current working directory.
config_path = filename
if remember_me_always:
# Store in home directory.
config_path = os.path.expanduser("~")
if not os.path.exists(config_path):
# Write file only if it doesn't already exists.
# http://stackoverflow.com/questions/5624359/write-file-with-specific-permissions-in-python
mode = stat.S_IRUSR | stat.S_IWUSR # This is 0o600 in octal and 384 in decimal.
umask_original = os.umask(0)
try:
config_file = os.fdopen(os.open(config_path, os.O_WRONLY | os.O_CREAT, mode), 'w')
finally:
os.umask(umask_original)
# Add the settings to the structure of the file, and lets write it out...
self._cfgparse.write(config_file)
config_file.close()
def get_config_filename(self):
return self._cfgfile
def get_config(self):
return self._cfgparse
def get_auth(self):
''' Returns username from the configfile. '''
return (self._cfgparse.get(self._section, 'username'), self._cfgparse.get(self._section, 'password'))
def get_hostname(self):
''' Returns hostname. '''
return self._cfgparse.get(self._section, 'hostname')

View File

@ -1,362 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
__copyright__ = 'Copyright 2013, Parag Baxi'
__license__ = 'Apache License 2.0'
""" Module that contains classes for setting up connections to QualysGuard API
and requesting data from it.
"""
import logging
import time
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from collections import defaultdict
import requests
import qualysapi.version
import qualysapi.api_methods
import qualysapi.api_actions
import qualysapi.api_actions as api_actions
# Setup module level logging.
logger = logging.getLogger(__name__)
try:
from lxml import etree
except ImportError as e:
logger.warning(
'Warning: Cannot consume lxml.builder E objects without lxml. Send XML strings for AM & WAS API calls.')
class QGConnector(api_actions.QGActions):
""" Qualys Connection class which allows requests to the QualysGuard API using HTTP-Basic Authentication (over SSL).
"""
def __init__(self, auth, server='qualysapi.qualys.com', proxies=None, max_retries=3):
# Read username & password from file, if possible.
self.auth = auth
# Remember QualysGuard API server.
self.server = server
# Remember rate limits per call.
self.rate_limit_remaining = defaultdict(int)
# api_methods: Define method algorithm in a dict of set.
# Naming convention: api_methods[api_version optional_blah] due to api_methods_with_trailing_slash testing.
self.api_methods = qualysapi.api_methods.api_methods
#
# Keep track of methods with ending slashes to autocorrect user when they forgot slash.
self.api_methods_with_trailing_slash = qualysapi.api_methods.api_methods_with_trailing_slash
self.proxies = proxies
logger.debug('proxies = \n%s' % proxies)
# Set up requests max_retries.
logger.debug('max_retries = \n%s' % max_retries)
self.session = requests.Session()
http_max_retries = requests.adapters.HTTPAdapter(max_retries=max_retries)
https_max_retries = requests.adapters.HTTPAdapter(max_retries=max_retries)
self.session.mount('http://', http_max_retries)
self.session.mount('https://', https_max_retries)
def __call__(self):
return self
def format_api_version(self, api_version):
""" Return QualysGuard API version for api_version specified.
"""
# Convert to int.
if type(api_version) == str:
api_version = api_version.lower()
if api_version[0] == 'v' and api_version[1].isdigit():
# Remove first 'v' in case the user typed 'v1' or 'v2', etc.
api_version = api_version[1:]
# Check for input matching Qualys modules.
if api_version in ('asset management', 'assets', 'tag', 'tagging', 'tags'):
# Convert to Asset Management API.
api_version = 'am'
elif api_version in ('am2'):
# Convert to Asset Management API v2
api_version = 'am2'
elif api_version in ('webapp', 'web application scanning', 'webapp scanning'):
# Convert to WAS API.
api_version = 'was'
elif api_version in ('pol', 'pc'):
# Convert PC module to API number 2.
api_version = 2
else:
api_version = int(api_version)
return api_version
def which_api_version(self, api_call):
""" Return QualysGuard API version for api_call specified.
"""
# Leverage patterns of calls to API methods.
if api_call.endswith('.php'):
# API v1.
return 1
elif api_call.startswith('api/2.0/'):
# API v2.
return 2
elif '/am/' in api_call:
# Asset Management API.
return 'am'
elif '/was/' in api_call:
# WAS API.
return 'was'
return False
def url_api_version(self, api_version):
""" Return base API url string for the QualysGuard api_version and server.
"""
# Set base url depending on API version.
if api_version == 1:
# QualysGuard API v1 url.
url = "https://%s/msp/" % (self.server,)
elif api_version == 2:
# QualysGuard API v2 url.
url = "https://%s/" % (self.server,)
elif api_version == 'was':
# QualysGuard REST v3 API url (Portal API).
url = "https://%s/qps/rest/3.0/" % (self.server,)
elif api_version == 'am':
# QualysGuard REST v1 API url (Portal API).
url = "https://%s/qps/rest/1.0/" % (self.server,)
elif api_version == 'am2':
# QualysGuard REST v1 API url (Portal API).
url = "https://%s/qps/rest/2.0/" % (self.server,)
else:
raise Exception("Unknown QualysGuard API Version Number (%s)" % (api_version,))
logger.debug("Base url =\n%s" % (url))
return url
def format_http_method(self, api_version, api_call, data):
""" Return QualysGuard API http method, with POST preferred..
"""
# Define get methods for automatic http request methodology.
#
# All API v2 requests are POST methods.
if api_version == 2:
return 'post'
elif api_version == 1:
if api_call in self.api_methods['1 post']:
return 'post'
else:
return 'get'
elif api_version == 'was':
# WAS API call.
# Because WAS API enables user to GET API resources in URI, let's chop off the resource.
# '/download/was/report/18823' --> '/download/was/report/'
api_call_endpoint = api_call[:api_call.rfind('/') + 1]
if api_call_endpoint in self.api_methods['was get']:
return 'get'
# Post calls with no payload will result in HTTPError: 415 Client Error: Unsupported Media Type.
if not data:
# No post data. Some calls change to GET with no post data.
if api_call_endpoint in self.api_methods['was no data get']:
return 'get'
else:
return 'post'
else:
# Call with post data.
return 'post'
else:
# Asset Management API call.
if api_call in self.api_methods['am get']:
return 'get'
else:
return 'post'
def preformat_call(self, api_call):
""" Return properly formatted QualysGuard API call.
"""
# Remove possible starting slashes or trailing question marks in call.
api_call_formatted = api_call.lstrip('/')
api_call_formatted = api_call_formatted.rstrip('?')
if api_call != api_call_formatted:
# Show difference
logger.debug('api_call post strip =\n%s' % api_call_formatted)
return api_call_formatted
def format_call(self, api_version, api_call):
""" Return properly formatted QualysGuard API call according to api_version etiquette.
"""
# Remove possible starting slashes or trailing question marks in call.
api_call = api_call.lstrip('/')
api_call = api_call.rstrip('?')
logger.debug('api_call post strip =\n%s' % api_call)
# Make sure call always ends in slash for API v2 calls.
if (api_version == 2 and api_call[-1] != '/'):
# Add slash.
logger.debug('Adding "/" to api_call.')
api_call += '/'
if api_call in self.api_methods_with_trailing_slash[api_version]:
# Add slash.
logger.debug('Adding "/" to api_call.')
api_call += '/'
return api_call
def format_payload(self, api_version, data):
""" Return appropriate QualysGuard API call.
"""
# Check if payload is for API v1 or API v2.
if (api_version in (1, 2)):
# Check if string type.
if type(data) == str:
# Convert to dictionary.
logger.debug('Converting string to dict:\n%s' % data)
# Remove possible starting question mark & ending ampersands.
data = data.lstrip('?')
data = data.rstrip('&')
# Convert to dictionary.
data = urlparse.parse_qs(data)
logger.debug('Converted:\n%s' % str(data))
elif api_version in ('am', 'was', 'am2'):
if type(data) == etree._Element:
logger.debug('Converting lxml.builder.E to string')
data = etree.tostring(data)
logger.debug('Converted:\n%s' % data)
return data
def request(self, api_call, data=None, api_version=None, http_method=None, concurrent_scans_retries=0,
concurrent_scans_retry_delay=0):
""" Return QualysGuard API response.
"""
logger.debug('api_call =\n%s' % api_call)
logger.debug('api_version =\n%s' % api_version)
logger.debug('data %s =\n %s' % (type(data), str(data)))
logger.debug('http_method =\n%s' % http_method)
logger.debug('concurrent_scans_retries =\n%s' % str(concurrent_scans_retries))
logger.debug('concurrent_scans_retry_delay =\n%s' % str(concurrent_scans_retry_delay))
concurrent_scans_retries = int(concurrent_scans_retries)
concurrent_scans_retry_delay = int(concurrent_scans_retry_delay)
#
# Determine API version.
# Preformat call.
api_call = self.preformat_call(api_call)
if api_version:
# API version specified, format API version inputted.
api_version = self.format_api_version(api_version)
else:
# API version not specified, determine automatically.
api_version = self.which_api_version(api_call)
#
# Set up base url.
url = self.url_api_version(api_version)
#
# Set up headers.
headers = {"X-Requested-With": "Parag Baxi QualysAPI (python) v%s" % (qualysapi.version.__version__,)}
logger.debug('headers =\n%s' % (str(headers)))
# Portal API takes in XML text, requiring custom header.
if api_version in ('am', 'was', 'am2'):
headers['Content-type'] = 'text/xml'
#
# Set up http request method, if not specified.
if not http_method:
http_method = self.format_http_method(api_version, api_call, data)
logger.debug('http_method =\n%s' % http_method)
#
# Format API call.
api_call = self.format_call(api_version, api_call)
logger.debug('api_call =\n%s' % (api_call))
# Append api_call to url.
url += api_call
#
# Format data, if applicable.
if data is not None:
data = self.format_payload(api_version, data)
# Make request at least once (more if concurrent_retry is enabled).
retries = 0
#
# set a warning threshold for the rate limit
rate_warn_threshold = 10
while retries <= concurrent_scans_retries:
# Make request.
logger.debug('url =\n%s' % (str(url)))
logger.debug('data =\n%s' % (str(data)))
logger.debug('headers =\n%s' % (str(headers)))
if http_method == 'get':
# GET
logger.debug('GET request.')
request = self.session.get(url, params=data, auth=self.auth, headers=headers, proxies=self.proxies)
else:
# POST
logger.debug('POST request.')
# Make POST request.
request = self.session.post(url, data=data, auth=self.auth, headers=headers, proxies=self.proxies)
logger.debug('response headers =\n%s' % (str(request.headers)))
#
# Remember how many times left user can make against api_call.
try:
self.rate_limit_remaining[api_call] = int(request.headers['x-ratelimit-remaining'])
logger.debug('rate limit for api_call, %s = %s' % (api_call, self.rate_limit_remaining[api_call]))
if (self.rate_limit_remaining[api_call] > rate_warn_threshold):
logger.debug('rate limit for api_call, %s = %s' % (api_call, self.rate_limit_remaining[api_call]))
elif (self.rate_limit_remaining[api_call] <= rate_warn_threshold) and (self.rate_limit_remaining[api_call] > 0):
logger.warning('Rate limit is about to being reached (remaining api calls = %s)' % self.rate_limit_remaining[api_call])
elif self.rate_limit_remaining[api_call] <= 0:
logger.critical('ATTENTION! RATE LIMIT HAS BEEN REACHED (remaining api calls = %s)!' % self.rate_limit_remaining[api_call])
except KeyError as e:
# Likely a bad api_call.
logger.debug(e)
pass
except TypeError as e:
# Likely an asset search api_call.
logger.debug(e)
pass
# Response received.
response = request.text
logger.debug('response text =\n%s' % (response))
# Keep track of how many retries.
retries += 1
# Check for concurrent scans limit.
if not ('<responseCode>INVALID_REQUEST</responseCode>' in response and
'<errorMessage>You have reached the maximum number of concurrent running scans' in response and
'<errorResolution>Please wait until your previous scans have completed</errorResolution>' in response):
# Did not hit concurrent scan limit.
break
else:
# Hit concurrent scan limit.
logger.critical(response)
# If trying again, delay next try by concurrent_scans_retry_delay.
if retries <= concurrent_scans_retries:
logger.warning('Waiting %d seconds until next try.' % concurrent_scans_retry_delay)
time.sleep(concurrent_scans_retry_delay)
# Inform user of how many retries.
logger.critical('Retry #%d' % retries)
else:
# Ran out of retries. Let user know.
print('Alert! Ran out of concurrent_scans_retries!')
logger.critical('Alert! Ran out of concurrent_scans_retries!')
return False
# Check to see if there was an error.
try:
request.raise_for_status()
except requests.HTTPError as e:
# Error
print('Error! Received a 4XX client error or 5XX server error response.')
print('Content = \n', response)
logger.error('Content = \n%s' % response)
print('Headers = \n', request.headers)
logger.error('Headers = \n%s' % str(request.headers))
request.raise_for_status()
if '<RETURN status="FAILED" number="2007">' in response:
print('Error! Your IP address is not in the list of secure IPs. Manager must include this IP (QualysGuard VM > Users > Security).')
print('Content = \n', response)
logger.error('Content = \n%s' % response)
print('Headers = \n', request.headers)
logger.error('Headers = \n%s' % str(request.headers))
return False
return response

View File

@ -1,20 +0,0 @@
''' Module to hold global settings reused throughout qualysapi. '''
from __future__ import absolute_import
__author__ = "Colin Bell <colin.bell@uwaterloo.ca>"
__copyright__ = "Copyright 2011-2013, University of Waterloo"
__license__ = "BSD-new"
import os
global defaults
global default_filename
if os.name == 'nt':
default_filename = "config.ini"
else:
default_filename = ".qcrc"
defaults = {'hostname': 'qualysapi.qualys.com',
'max_retries': '3'}

View File

@ -1,29 +0,0 @@
""" A set of utility functions for QualysConnect module. """
from __future__ import absolute_import
import logging
import qualysapi.config as qcconf
import qualysapi.connector as qcconn
import qualysapi.settings as qcs
__author__ = "Parag Baxi <parag.baxi@gmail.com> & Colin Bell <colin.bell@uwaterloo.ca>"
__copyright__ = "Copyright 2011-2013, Parag Baxi & University of Waterloo"
__license__ = 'Apache License 2.0'
# Set module level logger.
logger = logging.getLogger(__name__)
def connect(config_file=qcs.default_filename, section='info', remember_me=False, remember_me_always=False):
""" Return a QGAPIConnect object for v1 API pulling settings from config
file.
"""
# Retrieve login credentials.
conf = qcconf.QualysConnectConfig(filename=config_file, section=section, remember_me=remember_me,
remember_me_always=remember_me_always)
connect = qcconn.QGConnector(conf.get_auth(),
conf.get_hostname(),
conf.proxies,
conf.max_retries)
logger.info("Finished building connector.")
return connect

View File

@ -1,3 +0,0 @@
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
__pkgname__ = 'qualysapi'
__version__ = '5.0.3'

View File

@ -1,12 +0,0 @@
[metadata]
# This includes the license file in the wheel.
license_file = license
[bdist_wheel]
# This flag says to generate wheels that support both Python 2 and Python
# 3. If your code will not run unchanged on both Python 2 and 3, you will
# need to generate separate wheels for each Python version that you
# support. Removing this line (or setting universal to 0) will prevent
# bdist_wheel from trying to make a universal wheel. For more see:
# https://packaging.python.org/tutorials/distributing-packages/#wheels
universal=1

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
from __future__ import absolute_import
import os
import sys
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
__author__ = 'Parag Baxi <parag.baxi@gmail.com>'
__copyright__ = 'Copyright 2011-2018, Parag Baxi'
__license__ = 'BSD-new'
# Make pyflakes happy.
__pkgname__ = None
__version__ = None
exec(compile(open('qualysapi/version.py').read(), 'qualysapi/version.py', 'exec'))
# A utility function to read the README file into the long_description field.
def read(fname):
""" Takes a filename and returns the contents of said file relative to
the current directory.
"""
return open(os.path.join(os.path.dirname(__file__), fname)).read()
setup(name=__pkgname__,
version=__version__,
author='Parag Baxi',
author_email='parag.baxi@gmail.com',
description='QualysGuard(R) Qualys API Package',
license='BSD-new',
keywords='Qualys QualysGuard API helper network security',
url='https://github.com/paragbaxi/qualysapi',
package_dir={'': '.'},
packages=['qualysapi', ],
# package_data={'qualysapi':['LICENSE']},
# scripts=['src/scripts/qhostinfo.py', 'src/scripts/qscanhist.py', 'src/scripts/qreports.py'],
long_description=read('README.md'),
classifiers=[
'Development Status :: 5 - Production/Stable',
'Topic :: Utilities',
'License :: OSI Approved :: Apache Software License',
'Intended Audience :: Developers',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
],
install_requires=[
'requests',
],
)

View File

@ -2,7 +2,7 @@ pandas==0.20.3
setuptools==40.4.3
pytz==2017.2
Requests==2.18.3
#qualysapi==4.1.0
qualysapi==5.0.4
lxml==4.1.1
bs4
jira