From bd58df3f955ed49a67ef3f707dbb16883b771991 Mon Sep 17 00:00:00 2001 From: secynic Date: Tue, 3 Dec 2013 18:15:30 -0600 Subject: [PATCH] v0.5.1 --- .gitignore | 3 +- CHANGES.rst | 15 + ipwhois/__init__.py | 2 +- ipwhois/ipwhois.py | 1160 +++++++++++++++++++-------------- ipwhois/tests/test_ipwhois.py | 69 +- ipwhois/utils.py | 58 +- 6 files changed, 775 insertions(+), 532 deletions(-) diff --git a/.gitignore b/.gitignore index ad2ae15..690d6be 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,5 @@ nosetests.xml .project .pydevproject -MANIFEST \ No newline at end of file +MANIFEST +.idea \ No newline at end of file diff --git a/CHANGES.rst b/CHANGES.rst index 86d1abd..11750c5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,21 @@ Changelog ========= +0.5.1 (2013-12-03) +------------------ + +- Moved regex string literal declarations to NIC_WHOIS dict. +- Moved RWS parsing to own private functions. +- Moved base_net dict to global BASE_NET. +- More granular exception handling in lookup functions. +- Fixed email parsing for ARIN and RIPE RWS. +- Changed some 'if key in dict' statements to try/except for slight performance + increase in lookup functions. +- Removed generic exception handling (returned blank dict) on get_countries(). +- More PEP 8 reformatting. +- Minor docstring modifications. +- Added some unit tests to test_lookup() and test_lookup_rws(). + 0.5.0 (2013-11-20) ------------------ diff --git a/ipwhois/__init__.py b/ipwhois/__init__.py index e98c822..41da245 100644 --- a/ipwhois/__init__.py +++ b/ipwhois/__init__.py @@ -22,7 +22,7 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -__version__ = '0.5.0' +__version__ = '0.5.1' from .ipwhois import (IPWhois, IPDefinedError, ASNLookupError, WhoisLookupError, HostLookupError) diff --git a/ipwhois/ipwhois.py b/ipwhois/ipwhois.py index 27209e2..2fa1ba5 100644 --- a/ipwhois/ipwhois.py +++ b/ipwhois/ipwhois.py @@ -36,138 +36,144 @@ import dns.rdtypes.ANY.TXT # @UnusedImport IETF_RFC_REFERENCES = { - #IPv4 - 'RFC 1122, Section 3.2.1.3': - 'http://tools.ietf.org/html/rfc1122#section-3.2.1.3', - 'RFC 1918': 'http://tools.ietf.org/html/rfc1918', - 'RFC 3927': 'http://tools.ietf.org/html/rfc3927', - 'RFC 5736': 'http://tools.ietf.org/html/rfc5736', - 'RFC 5737': 'http://tools.ietf.org/html/rfc5737', - 'RFC 3068': 'http://tools.ietf.org/html/rfc3068', - 'RFC 2544': 'http://tools.ietf.org/html/rfc2544', - 'RFC 3171': 'http://tools.ietf.org/html/rfc3171', - 'RFC 919, Section 7': - 'http://tools.ietf.org/html/rfc919#section-7', - #IPv6 - 'RFC 4291, Section 2.7': - 'http://tools.ietf.org/html/rfc4291#section-2.7', - 'RFC 4291': 'http://tools.ietf.org/html/rfc4291', - 'RFC 4291, Section 2.5.2': - 'http://tools.ietf.org/html/rfc4291#section-2.5.2', - 'RFC 4291, Section 2.5.3': - 'http://tools.ietf.org/html/rfc4291#section-2.5.3', - 'RFC 4291, Section 2.5.6': - 'http://tools.ietf.org/html/rfc4291#section-2.5.6', - 'RFC 4291, Section 2.5.7': - 'http://tools.ietf.org/html/rfc4291#section-2.5.7', - 'RFC 4193': 'https://tools.ietf.org/html/rfc4193' - } + #IPv4 + 'RFC 1122, Section 3.2.1.3': + 'http://tools.ietf.org/html/rfc1122#section-3.2.1.3', + 'RFC 1918': 'http://tools.ietf.org/html/rfc1918', + 'RFC 3927': 'http://tools.ietf.org/html/rfc3927', + 'RFC 5736': 'http://tools.ietf.org/html/rfc5736', + 'RFC 5737': 'http://tools.ietf.org/html/rfc5737', + 'RFC 3068': 'http://tools.ietf.org/html/rfc3068', + 'RFC 2544': 'http://tools.ietf.org/html/rfc2544', + 'RFC 3171': 'http://tools.ietf.org/html/rfc3171', + 'RFC 919, Section 7': 'http://tools.ietf.org/html/rfc919#section-7', + #IPv6 + 'RFC 4291, Section 2.7': 'http://tools.ietf.org/html/rfc4291#section-2.7', + 'RFC 4291': 'http://tools.ietf.org/html/rfc4291', + 'RFC 4291, Section 2.5.2': + 'http://tools.ietf.org/html/rfc4291#section-2.5.2', + 'RFC 4291, Section 2.5.3': + 'http://tools.ietf.org/html/rfc4291#section-2.5.3', + 'RFC 4291, Section 2.5.6': + 'http://tools.ietf.org/html/rfc4291#section-2.5.6', + 'RFC 4291, Section 2.5.7': + 'http://tools.ietf.org/html/rfc4291#section-2.5.7', + 'RFC 4193': 'https://tools.ietf.org/html/rfc4193' +} NIC_WHOIS = { - 'arin': { - 'server': 'whois.arin.net', - 'url': ('http://whois.arin.net/rest/nets;q={0}?' - 'showDetails=true&showARIN=true'), - 'fields': { - 'name': '^(NetName):[^\S\n]+(?P.+)$', - 'description': - '^(OrgName|CustName):[^\S\n]+(?P.+)$', - 'country': '^(Country):[^\S\n]+(?P.+)$', - 'state': '^(StateProv):[^\S\n]+(?P.+)$', - 'city': '^(City):[^\S\n]+(?P.+)$', - 'address': '^(Address):[^\S\n]+(?P.+)$', - 'postal_code': - '^(PostalCode):[^\S\n]+(?P.+)$', - 'abuse_emails': - '^(OrgAbuseEmail):[^\S\n]+(?P.+)$', - 'tech_emails': - '^(OrgTechEmail):[^\S\n]+(?P.+)$', - 'created': '^(RegDate):[^\S\n]+(?P.+)$', - 'updated': '^(Updated):[^\S\n]+(?P.+)$' - }, - 'dt_format': '%Y-%m-%d', - 'dt_rws_format': '%Y-%m-%dT%H:%M:%S%z' - }, - 'ripencc': { - 'server': 'whois.ripe.net', - 'url': ('http://apps.db.ripe.net/whois/grs-search?' - 'query-string={0}&source=ripe-grs'), - 'fields': { - 'name': '^(netname):[^\S\n]+(?P.+)$', - 'description': '^(descr):[^\S\n]+(?P.+)$', - 'country': '^(country):[^\S\n]+(?P.+)$', - 'address': '^(address):[^\S\n]+(?P.+)$', - 'abuse_emails': ('^(abuse-mailbox:[^\S\n]+' - '(?P.+))|((?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*' - '@[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*)$'), - 'misc_emails': ('^(?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' - '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$') - } - }, - 'apnic': { - 'server': 'whois.apnic.net', - 'url': ('http://apps.db.ripe.net/whois/grs-search?' - 'query-string={0}&source=apnic-grs'), - 'fields': { - 'name': '^(netname):[^\S\n]+(?P.+)$', - 'description': '^(descr):[^\S\n]+(?P.+)$', - 'country': '^(country):[^\S\n]+(?P.+)$', - 'address': '^(address):[^\S\n]+(?P.+)$', - 'abuse_emails': ('^(abuse-mailbox:[^\S\n]+' - '(?P.+))|((?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*' - '@[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*)$'), - 'misc_emails': ('^(?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' - '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$'), - 'updated': - '^(changed):[^\S\n]+.*?(?P[0-9]{8})$' - }, - 'dt_format': '%Y%m%d' - }, - 'lacnic': { - 'server': 'whois.lacnic.net', - 'url': ('http://restfulwhoisv2.labs.lacnic.net/' - 'restfulwhois/ip/{0}'), - 'fields': { - 'description': '^(owner):[^\S\n]+(?P.+)$', - 'country': '^(country):[^\S\n]+(?P.+)$', - 'abuse_emails': ('^(abuse-mailbox:[^\S\n]+' - '(?P.+))|((?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*' - '@[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*)$'), - 'misc_emails': ('^(?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' - '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$'), - 'created': - '^(created):[^\S\n]+(?P[0-9]{8}).*$', - 'updated': - '^(changed):[^\S\n]+(?P[0-9]{8}).*$' - }, - 'dt_format': '%Y%m%d', - 'dt_rws_format': '%Y%m%d' - }, - 'afrinic': { - 'server': 'whois.afrinic.net', - 'url': ('http://apps.db.ripe.net/whois/grs-search?' - 'query-string={0}&source=afrinic-grs'), - 'fields': { - 'name': '^(netname):[^\S\n]+(?P.+)$', - 'description': '^(descr):[^\S\n]+(?P.+)$', - 'country': '^(country):[^\S\n]+(?P.+)$', - 'address': '^(address):[^\S\n]+(?P.+)$', - 'abuse_emails': ('^(abuse-mailbox:[^\S\n]+' - '(?P.+))|((?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*' - '@[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*)$'), - 'misc_emails': ('^(?!abuse-mailbox).+?:.*' - '[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' - '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$') - } - } - } + 'arin': { + 'server': 'whois.arin.net', + 'url': ( + 'http://whois.arin.net/rest/nets;q={0}?' + 'showDetails=true&showARIN=true' + ), + 'fields': { + 'name': r'^(NetName):[^\S\n]+(?P.+)$', + 'description': r'^(OrgName|CustName):[^\S\n]+(?P.+)$', + 'country': r'^(Country):[^\S\n]+(?P.+)$', + 'state': r'^(StateProv):[^\S\n]+(?P.+)$', + 'city': r'^(City):[^\S\n]+(?P.+)$', + 'address': r'^(Address):[^\S\n]+(?P.+)$', + 'postal_code': r'^(PostalCode):[^\S\n]+(?P.+)$', + 'abuse_emails': r'^(OrgAbuseEmail):[^\S\n]+(?P.+)$', + 'tech_emails': r'^(OrgTechEmail):[^\S\n]+(?P.+)$', + 'created': r'^(RegDate):[^\S\n]+(?P.+)$', + 'updated': r'^(Updated):[^\S\n]+(?P.+)$' + }, + 'dt_format': '%Y-%m-%d', + 'dt_rws_format': '%Y-%m-%dT%H:%M:%S%z' + }, + 'ripencc': { + 'server': 'whois.ripe.net', + 'url': ( + 'http://apps.db.ripe.net/whois/grs-search?' + 'query-string={0}&source=ripe-grs' + ), + 'fields': { + 'name': r'^(netname):[^\S\n]+(?P.+)$', + 'description': r'^(descr):[^\S\n]+(?P.+)$', + 'country': r'^(country):[^\S\n]+(?P.+)$', + 'address': r'^(address):[^\S\n]+(?P.+)$', + 'abuse_emails': ( + r'^(abuse-mailbox:[^\S\n]+(?P.+))|((?!abuse-mailbox).+?:' + '.*[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.[\w\-]' + '+)([^\S\n]+.*)*)$' + ), + 'misc_emails': ( + r'^(?!abuse-mailbox).+?:.*[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' + '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$' + ) + } + }, + 'apnic': { + 'server': 'whois.apnic.net', + 'url': ( + 'http://apps.db.ripe.net/whois/grs-search?' + 'query-string={0}&source=apnic-grs' + ), + 'fields': { + 'name': r'^(netname):[^\S\n]+(?P.+)$', + 'description': r'^(descr):[^\S\n]+(?P.+)$', + 'country': r'^(country):[^\S\n]+(?P.+)$', + 'address': r'^(address):[^\S\n]+(?P.+)$', + 'abuse_emails': ( + r'^(abuse-mailbox:[^\S\n]+(?P.+))|((?!abuse-mailbox).+?:' + '.*[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.[\w\-]' + '+)([^\S\n]+.*)*)$' + ), + 'misc_emails': ( + r'^(?!abuse-mailbox).+?:.*[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' + '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$' + ), + 'updated': r'^(changed):[^\S\n]+.*?(?P[0-9]{8})$' + }, + 'dt_format': '%Y%m%d' + }, + 'lacnic': { + 'server': 'whois.lacnic.net', + 'url': 'http://restfulwhoisv2.labs.lacnic.net/restfulwhois/ip/{0}', + 'fields': { + 'description': r'^(owner):[^\S\n]+(?P.+)$', + 'country': r'^(country):[^\S\n]+(?P.+)$', + 'abuse_emails': ( + r'^(abuse-mailbox:[^\S\n]+(?P.+))|((?!abuse-mailbox).+?:' + '.*[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.[\w\-]' + '+)([^\S\n]+.*)*)$' + ), + 'misc_emails': ( + r'^(?!abuse-mailbox).+?:.*[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' + '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$' + ), + 'created': r'^(created):[^\S\n]+(?P[0-9]{8}).*$', + 'updated': r'^(changed):[^\S\n]+(?P[0-9]{8}).*$' + }, + 'dt_format': '%Y%m%d', + 'dt_rws_format': '%Y%m%d' + }, + 'afrinic': { + 'server': 'whois.afrinic.net', + 'url': ( + 'http://apps.db.ripe.net/whois/grs-search?' + 'query-string={0}&source=afrinic-grs' + ), + 'fields': { + 'name': r'^(netname):[^\S\n]+(?P.+)$', + 'description': r'^(descr):[^\S\n]+(?P.+)$', + 'country': r'^(country):[^\S\n]+(?P.+)$', + 'address': r'^(address):[^\S\n]+(?P.+)$', + 'abuse_emails': ( + r'^(abuse-mailbox:[^\S\n]+(?P.+))|((?!abuse-mailbox).+?:' + '.*[^\S\n]+(?P[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.[\w\-]' + '+)([^\S\n]+.*)*)$' + ), + 'misc_emails': ( + r'^(?!abuse-mailbox).+?:.*[^\S\n]+(?P(?!abuse)[\w\-\.]+?@' + '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*)*$' + ) + } + } +} CYMRU_WHOIS = 'whois.cymru.com' @@ -175,6 +181,22 @@ IPV6_DNS_ZONE = '{0}.origin6.asn.cymru.com' +BASE_NET = { + 'cidr': None, + 'name': None, + 'description': None, + 'country': None, + 'state': None, + 'city': None, + 'address': None, + 'postal_code': None, + 'abuse_emails': None, + 'tech_emails': None, + 'misc_emails': None, + 'created': None, + 'updated': None +} + class IPDefinedError(Exception): """ @@ -210,6 +232,10 @@ class IPWhois(): timeout: The default timeout for socket connections in seconds. proxy_opener: The urllib.request.OpenerDirector request for proxy support or None. + + Raises: + IPDefinedError: The address provided is defined (does not need to be + resolved). """ def __init__(self, address, timeout=5, proxy_opener=None): @@ -243,10 +269,11 @@ def __init__(self, address, timeout=5, proxy_opener=None): if is_defined[0]: - raise IPDefinedError(('IPv4 address %r is already defined as ' - '%r via %r.') % (self.address_str, - is_defined[1], - is_defined[2])) + raise IPDefinedError( + 'IPv4 address %r is already defined as %r via %r.' % ( + self.address_str, is_defined[1], is_defined[2] + ) + ) #Reverse the IPv4Address for the DNS ASN query. split = self.address_str.split('.') @@ -262,10 +289,11 @@ def __init__(self, address, timeout=5, proxy_opener=None): if is_defined[0]: - raise IPDefinedError(('IPv6 address %r is already defined as ' - '%r via %r.') % (self.address_str, - is_defined[1], - is_defined[2])) + raise IPDefinedError( + 'IPv6 address %r is already defined as %r via %r.' % ( + self.address_str, is_defined[1], is_defined[2] + ) + ) #Explode the IPv6Address to fill in any missing 0's. exploded = self.address.exploded @@ -294,9 +322,9 @@ def __init__(self, address, timeout=5, proxy_opener=None): def __repr__(self): - return 'IPWhois(%r, %r, %r)' % (self.address_str, - self.timeout, - self.opener) + return 'IPWhois(%r, %r, %r)' % ( + self.address_str, self.timeout, self.opener + ) def get_asn_dns(self): """ @@ -310,6 +338,9 @@ def get_asn_dns(self): asn_registry (String) - The assigned ASN registry. asn_cidr (String) - The assigned ASN CIDR. asn_country_code (String) - The assigned ASN country code. + + Raises: + ASNLookupError: The ASN lookup failed. """ try: @@ -319,9 +350,7 @@ def get_asn_dns(self): #Parse out the ASN information. temp = str(data[0]).split('|') - ret = {} - - ret['asn_registry'] = temp[3].strip(' \n') + ret = {'asn_registry': temp[3].strip(' \n')} if ret['asn_registry'] not in NIC_WHOIS.keys(): @@ -336,8 +365,9 @@ def get_asn_dns(self): except: - raise ASNLookupError('ASN lookup failed for %r.' % - self.address_str) + raise ASNLookupError( + 'ASN lookup failed for %r.' % self.address_str + ) def get_asn_whois(self, retry_count=3): """ @@ -355,6 +385,9 @@ def get_asn_whois(self, retry_count=3): asn_registry (String) - The assigned ASN registry. asn_cidr (String) - The assigned ASN CIDR. asn_country_code (String) - The assigned ASN country code. + + Raises: + ASNLookupError: The ASN lookup failed. """ try: @@ -365,8 +398,9 @@ def get_asn_whois(self, retry_count=3): conn.connect((CYMRU_WHOIS, 43)) #Query the Cymru whois server, and store the results. - conn.send((' -r -a -c -p -f -o %s%s' % (self.address_str, - '\r\n')).encode()) + conn.send(( + ' -r -a -c -p -f -o %s%s' % (self.address_str, '\r\n') + ).encode()) data = '' while True: @@ -383,9 +417,7 @@ def get_asn_whois(self, retry_count=3): #Parse out the ASN information. temp = str(data).split('|') - ret = {} - - ret['asn_registry'] = temp[4].strip(' \n') + ret = {'asn_registry': temp[4].strip(' \n')} if ret['asn_registry'] not in NIC_WHOIS.keys(): @@ -406,13 +438,15 @@ def get_asn_whois(self, retry_count=3): else: - raise ASNLookupError('ASN lookup failed for %r.' % - self.address_str) + raise ASNLookupError( + 'ASN lookup failed for %r.' % self.address_str + ) except: - raise ASNLookupError('ASN lookup failed for %r.' % - self.address_str) + raise ASNLookupError( + 'ASN lookup failed for %r.' % self.address_str + ) def get_whois(self, asn_registry='arin', retry_count=3): """ @@ -426,6 +460,9 @@ def get_whois(self, asn_registry='arin', retry_count=3): Returns: String: The raw whois data. + + Raises: + WhoisLookupError: The whois lookup failed. """ try: @@ -442,7 +479,7 @@ def get_whois(self, asn_registry='arin', retry_count=3): query = 'n + %s' % query #Query the whois server, and store the results. - conn.send((query).encode()) + conn.send(query.encode()) response = '' while True: @@ -472,13 +509,15 @@ def get_whois(self, asn_registry='arin', retry_count=3): else: - raise WhoisLookupError('Whois lookup failed for %r.' % - self.address_str) + raise WhoisLookupError( + 'Whois lookup failed for %r.' % self.address_str + ) except: - raise WhoisLookupError('Whois lookup failed for %r.' % - self.address_str) + raise WhoisLookupError( + 'Whois lookup failed for %r.' % self.address_str + ) def get_rws(self, url=None, retry_count=3): """ @@ -492,6 +531,9 @@ def get_rws(self, url=None, retry_count=3): Returns: Dictionary: The whois data in Json format. + + Raises: + WhoisLookupError: The whois RWS lookup failed. """ try: @@ -527,6 +569,9 @@ def get_host(self, retry_count=3): Returns: Tuple: hostname, aliaslist, ipaddrlist + + Raises: + HostLookupError: The host lookup failed. """ try: @@ -553,13 +598,15 @@ def get_host(self, retry_count=3): else: - raise HostLookupError('Host lookup failed for %r.' % - self.address_str) + raise HostLookupError( + 'Host lookup failed for %r.' % self.address_str + ) except: - raise HostLookupError('Host lookup failed for %r.' % - self.address_str) + raise HostLookupError( + 'Host lookup failed for %r.' % self.address_str + ) def lookup(self, inc_raw=False, retry_count=3): """ @@ -599,9 +646,9 @@ def lookup(self, inc_raw=False, retry_count=3): #Create the return dictionary. results = { - 'query': self.address_str, - 'nets': [], - 'raw': None + 'query': self.address_str, + 'nets': [], + 'raw': None } #Add the ASN information to the return dictionary. @@ -615,47 +662,30 @@ def lookup(self, inc_raw=False, retry_count=3): results['raw'] = response - #Create the network dictionary template. The start and end fields will - #be removed in the final returned dictionary. - base_net = { - 'cidr': None, - 'name': None, - 'description': None, - 'country': None, - 'state': None, - 'city': None, - 'address': None, - 'postal_code': None, - 'abuse_emails': None, - 'tech_emails': None, - 'misc_emails': None, - 'created': None, - 'updated': None, - 'start': None, - 'end': None - } - nets = [] if results['asn_registry'] == 'arin': #Iterate through all of the networks found, storing the CIDR value #and the start and end positions. - for match in re.finditer(r'^CIDR:[^\S\n]+(.+?,[^\S\n].+|.+)$', - response, - re.MULTILINE): + for match in re.finditer( + r'^CIDR:[^\S\n]+(.+?,[^\S\n].+|.+)$', + response, + re.MULTILINE + ): try: - net = base_net.copy() - net['cidr'] = ', '.join([ipaddress.ip_network( - c.strip() - ).__str__() for c in match.group(1).split(', ')]) + net = BASE_NET.copy() + net['cidr'] = ', '.join( + [ipaddress.ip_network(c.strip()).__str__() + for c in match.group(1).split(', ')] + ) net['start'] = match.start() net['end'] = match.end() nets.append(net) - except: + except ValueError: pass @@ -663,8 +693,11 @@ def lookup(self, inc_raw=False, retry_count=3): #Iterate through all of the networks found, storing the CIDR value #and the start and end positions. - for match in re.finditer((r'^(inetnum|inet6num):[^\S\n]+' - '(.+?,[^\S\n].+|.+)$'), response, re.MULTILINE): + for match in re.finditer( + r'^(inetnum|inet6num):[^\S\n]+(.+?,[^\S\n].+|.+)$', + response, + re.MULTILINE + ): try: @@ -683,13 +716,13 @@ def lookup(self, inc_raw=False, retry_count=3): temp.append(ipaddress.ip_network( addr.strip()).__str__()) - net = base_net.copy() + net = BASE_NET.copy() net['cidr'] = ', '.join(temp) net['start'] = match.start() net['end'] = match.end() nets.append(net) - except: + except ValueError: pass @@ -697,8 +730,11 @@ def lookup(self, inc_raw=False, retry_count=3): #Iterate through all of the networks found, storing the CIDR value #and the start and end positions. - for match in re.finditer((r'^(inetnum|inet6num):[^\S\n]+' - '((.+?)[^\S\n]-[^\S\n](.+)|.+)$'), response, re.MULTILINE): + for match in re.finditer( + r'^(inetnum|inet6num):[^\S\n]+((.+?)[^\S\n]-[^\S\n](.+)|.+)$', + response, + re.MULTILINE + ): try: @@ -709,21 +745,23 @@ def lookup(self, inc_raw=False, retry_count=3): ipaddress.ip_address(match.group(3).strip()), ipaddress.ip_address(match.group(4).strip()))) - cidr = ', '.join([i.__str__() - for i in ipaddress.collapse_addresses(addrs)]) + cidr = ', '.join( + [i.__str__() + for i in ipaddress.collapse_addresses(addrs)] + ) else: cidr = ipaddress.ip_network( match.group(2).strip()).__str__() - net = base_net.copy() + net = BASE_NET.copy() net['cidr'] = cidr net['start'] = match.start() net['end'] = match.end() nets.append(net) - except: + except (ValueError, TypeError): pass @@ -738,11 +776,12 @@ def lookup(self, inc_raw=False, retry_count=3): for field in NIC_WHOIS[results['asn_registry']]['fields']: - pattern = re.compile(r'' + + pattern = re.compile( NIC_WHOIS[results['asn_registry']]['fields'][field], - re.MULTILINE) + re.MULTILINE + ) - if end: + if end is not None: match = pattern.finditer(response, net['end'], end) @@ -756,10 +795,11 @@ def lookup(self, inc_raw=False, retry_count=3): if sub_end: - if field not in ('abuse_emails', - 'tech_emails', - 'misc_emails') and ( - sub_end != (m.start() - 1)): + if field not in ( + 'abuse_emails', + 'tech_emails', + 'misc_emails' + ) and (sub_end != (m.start() - 1)): break @@ -767,7 +807,7 @@ def lookup(self, inc_raw=False, retry_count=3): values.append(m.group('val').strip()) - except: + except AttributeError: values.append(m.group('val2').strip()) @@ -785,16 +825,15 @@ def lookup(self, inc_raw=False, retry_count=3): value = datetime.strptime( values[0], - NIC_WHOIS[ - results['asn_registry']]['dt_format'] - ).isoformat('T') + NIC_WHOIS[results['asn_registry']] + ['dt_format']).isoformat('T') else: values = list(set(values)) value = '\n'.join(values) - except: + except ValueError: value = None pass @@ -809,447 +848,608 @@ def lookup(self, inc_raw=False, retry_count=3): return results - def lookup_rws(self, inc_raw=False, retry_count=3): + def _lookup_rws_arin(self, response=None, retry_count=3): """ - The function for retrieving and parsing whois information for an IP - address via HTTP (Whois-RWS). - - NOTE: This should be faster than IPWhois.lookup(), but may not be as - reliable. APNIC, LACNIC, and AFRINIC do not have a Whois-RWS - service yet. We have to rely on the Ripe RWS service, which does - not contain all of the data we need. + The function for retrieving and parsing whois information for an ARIN + IP address via HTTP (Whois-RWS). Args: - inc_raw: Boolean for whether to include the raw whois results in - the returned dictionary. + response: The dictionary containing whois information to parse. retry_count: The number of times to retry in case socket errors, timeouts, connection resets, etc. are encountered. Returns: - Dictionary: A dictionary containing the following keys: - query (String) - The IP address. - asn (String) - The Autonomous System Number. - asn_date (String) - The ASN Allocation date. - asn_registry (String) - The assigned ASN registry. - asn_cidr (String) - The assigned ASN CIDR. - asn_country_code (String) - The assigned ASN country code. - nets (List) - Dictionaries containing network information - which consists of the fields listed in the NIC_WHOIS - dictionary. Certain IPs have more granular network - listings, hence the need for a list object. - raw (Dictionary) - Whois results in Json format if the - inc_raw parameter is True. + List: Dictionaries containing network information which consists + of the fields listed in the NIC_WHOIS dictionary. Certain IPs + have more granular network listings, hence the need for a list + object. """ - #Attempt to resolve ASN info via Cymru. DNS is faster, try that first. + nets = [] + try: - asn_data = self.get_asn_dns() + net_list = response['nets']['net'] - except ASNLookupError: + if not isinstance(net_list, list): - asn_data = self.get_asn_whois(retry_count) + net_list = [net_list] - #Create the return dictionary. - results = { - 'query': self.address_str, - 'nets': [], - 'raw': None - } + except KeyError: - #Add the ASN information to the return dictionary. - results.update(asn_data) + net_list = [] - #Retrieve the whois data. - try: + for n in net_list: - response = self.get_rws(NIC_WHOIS[results['asn_registry']] - ['url'].format(self.address_str), retry_count) + if 'orgRef' in n and n['orgRef']['@handle'] in ('ARIN', 'VR-ARIN'): - #If the query failed, try the radb-grs source. - except WhoisLookupError: + continue - response = self.get_rws(('http://apps.db.ripe.net/whois/grs-search' - '?query-string={0}&source=radb-grs').format(self.address_str), - retry_count) + addrs = [] + net = BASE_NET.copy() - #If inc_raw parameter is True, add the response to return dictionary. - if inc_raw: + try: - results['raw'] = response + addrs.extend(ipaddress.summarize_address_range( + ipaddress.ip_address(n['startAddress']['$'].strip()), + ipaddress.ip_address(n['endAddress']['$'].strip()))) - #Create the network dictionary template. - base_net = { - 'cidr': None, - 'name': None, - 'description': None, - 'country': None, - 'state': None, - 'city': None, - 'address': None, - 'postal_code': None, - 'abuse_emails': None, - 'tech_emails': None, - 'misc_emails': None, - 'created': None, - 'updated': None - } + net['cidr'] = ', '.join( + [i.__str__() + for i in ipaddress.collapse_addresses(addrs)] + ) - nets = [] + except (KeyError, ValueError, TypeError): - if results['asn_registry'] == 'arin': + pass try: - net_list = response['nets']['net'] + net['created'] = n['registrationDate']['$'].strip() - if not isinstance(net_list, list): + except KeyError: - net_list = [net_list] + pass - for n in net_list: + try: - if 'orgRef' in n and n['orgRef']['@handle'] in ('ARIN', - 'VR-ARIN'): + net['updated'] = n['updateDate']['$'].strip() - continue + except KeyError: - addrs = [] - addrs.extend(ipaddress.summarize_address_range( - ipaddress.ip_address(n['startAddress']['$'].strip()), - ipaddress.ip_address(n['endAddress']['$'].strip()))) + pass - net = base_net.copy() - net['cidr'] = ', '.join([i.__str__() - for i in ipaddress.collapse_addresses(addrs)]) + try: - if 'registrationDate' in n: + net['name'] = n['name']['$'].strip() - net['created'] = n['registrationDate']['$'].strip() + except KeyError: - if 'updateDate' in n: + pass - net['updated'] = n['updateDate']['$'].strip() + ref = None + if 'customerRef' in n: - if 'name' in n: + ref = ['customerRef', 'customer'] - net['name'] = n['name']['$'].strip() + elif 'orgRef' in n: - ref = None - if 'customerRef' in n: + ref = ['orgRef', 'org'] - ref = ['customerRef', 'customer'] + if ref is not None: - elif 'orgRef' in n: + try: - ref = ['orgRef', 'org'] + net['description'] = n[ref[0]]['@name'].strip() - if ref is not None: + except KeyError: - net['description'] = n[ref[0]]['@name'].strip() - ref_url = n[ref[0]]['$'].strip() + '?showPocs=true' + pass - try: + try: - ref_response = self.get_rws(ref_url, retry_count) + ref_url = n[ref[0]]['$'].strip() + '?showPocs=true' + ref_response = self.get_rws(ref_url, retry_count) - except WhoisLookupError: + except (KeyError, WhoisLookupError): - nets.append(net) - continue + nets.append(net) + continue - if 'streetAddress' in ref_response[ref[1]]: + try: - addr_list = ref_response[ref[1]][ - 'streetAddress']['line'] + addr_list = ( + ref_response[ref[1]]['streetAddress']['line'] + ) - if not isinstance(addr_list, list): + if not isinstance(addr_list, list): - addr_list = [addr_list] + addr_list = [addr_list] - net['address'] = '\n'.join([line['$'].strip() - for line in addr_list]) + net['address'] = '\n'.join( + [line['$'].strip() for line in addr_list] + ) - if 'postalCode' in ref_response[ref[1]]: + except KeyError: - net['postal_code'] = ref_response[ref[1]][ - 'postalCode']['$'] + pass - if 'city' in ref_response[ref[1]]: + try: - net['city'] = ref_response[ref[1]]['city']['$'] + net['postal_code'] = ( + ref_response[ref[1]]['postalCode']['$'] + ) - if 'iso3166-1' in ref_response[ref[1]]: + except KeyError: - net['country'] = ref_response[ref[1]]['iso3166-1'][ - 'code2']['$'] + pass - if 'iso3166-2' in ref_response[ref[1]]: + try: - net['state'] = ref_response[ref[1]]['iso3166-2'][ - '$'] + net['city'] = ref_response[ref[1]]['city']['$'] - if 'pocs' in ref_response[ref[1]]: + except KeyError: - for poc in ref_response[ref[1]]['pocs'][ - 'pocLinkRef']: + pass - if poc['@description'] in ('Abuse', 'Tech'): + try: - try: + net['country'] = ( + ref_response[ref[1]]['iso3166-1']['code2']['$'] + ) - poc_url = poc['$'] - poc_response = self.get_rws(poc_url, - retry_count) + except KeyError: - net['%s_emails' % poc['@description' - ].lower()] = poc_response['poc'][ - 'emails']['email']['$'].strip() + pass - except WhoisLookupError: + try: - pass + net['state'] = ( + ref_response[ref[1]]['iso3166-2']['$'] + ) - nets.append(net) + except KeyError: - except: + pass - pass + try: - elif results['asn_registry'] == 'lacnic': + for poc in ( + ref_response[ref[1]]['pocs']['pocLinkRef'] + ): - try: + if poc['@description'] in ('Abuse', 'Tech'): - addrs = [] - addrs.extend(ipaddress.summarize_address_range( - ipaddress.ip_address(response['startAddress'].strip()), - ipaddress.ip_address(response['endAddress'].strip()))) + poc_url = poc['$'] + poc_response = self.get_rws( + poc_url, + retry_count + ) - net = base_net.copy() - net['cidr'] = ', '.join([i.__str__() - for i in ipaddress.collapse_addresses(addrs)]) + emails = poc_response['poc']['emails']['email'] - net['country'] = response['country'].strip() + if not isinstance(emails, list): - events = response['events'] + emails = [emails] - if not isinstance(events, list): + temp = [] - events = [events] + for e in emails: - for ev in events: + temp.append(e['$'].strip()) - if 'eventAction' in ev and 'eventDate' in ev: + key = '%s_emails' % poc['@description'].lower() - if ev['eventAction'] == 'registration': + net[key] = ( + '\n'.join(set(temp)) if len(temp) > 0 else None + ) - tmp = ev['eventDate'].strip() + except (KeyError, WhoisLookupError): - value = datetime.strptime( - tmp, - NIC_WHOIS[ - results['asn_registry']]['dt_rws_format'] - ).isoformat('T') + pass - net['created'] = value + nets.append(net) - elif ev['eventAction'] == 'last changed': + return nets - tmp = ev['eventDate'].strip() + def _lookup_rws_lacnic(self, response=None): + """ + The function for retrieving and parsing whois information for a LACNIC + IP address via HTTP (Whois-RWS). - value = datetime.strptime( - tmp, - NIC_WHOIS[ - results['asn_registry']]['dt_rws_format'] - ).isoformat('T') + Args: + response: The dictionary containing whois information to parse. + + Returns: + List: Dictionaries containing network information which consists + of the fields listed in the NIC_WHOIS dictionary. Certain IPs + have more granular network listings, hence the need for a list + object. + """ - net['updated'] = value + addrs = [] + net = BASE_NET.copy() - entities = response['entities'] + try: - if not isinstance(entities, list): + addrs.extend(ipaddress.summarize_address_range( + ipaddress.ip_address(response['startAddress'].strip()), + ipaddress.ip_address(response['endAddress'].strip()))) - entities = [entities] + net['cidr'] = ', '.join( + [i.__str__() + for i in ipaddress.collapse_addresses(addrs)] + ) - for en in entities: + except (KeyError, ValueError, TypeError): - if en['roles'][0] == 'registrant': + pass - temp = en['vcardArray'][1] + try: - for t in temp: + net['country'] = response['country'].strip() - if t[0] == 'fn': + except KeyError: - net['name'] = t[3].strip() + pass - elif t[0] == 'org': + try: - net['description'] = t[3][0].strip() + events = response['events'] - elif t[0] == 'adr': + if not isinstance(events, list): - net['address'] = t[1]['label'].strip() + events = [events] - elif t[0] == 'email': + except KeyError: - net['misc_emails'] = t[3].strip() + events = [] - elif en['roles'][0] == 'abuse': + for ev in events: - temp = en['vcardArray'][1] + try: - for t in temp: + if ev['eventAction'] == 'registration': - if t[0] == 'email': + tmp = ev['eventDate'].strip() - net['abuse_emails'] = t[3].strip() + value = datetime.strptime( + tmp, + NIC_WHOIS['lacnic']['dt_rws_format'] + ).isoformat('T') - elif en['roles'][0] == 'tech': + net['created'] = value - temp = en['vcardArray'][1] + elif ev['eventAction'] == 'last changed': - for t in temp: + tmp = ev['eventDate'].strip() - if t[0] == 'email': + value = datetime.strptime( + tmp, + NIC_WHOIS['lacnic']['dt_rws_format'] + ).isoformat('T') - net['tech_emails'] = t[3].strip() - nets.append(net) + net['updated'] = value - except: + except (KeyError, ValueError): pass - else: + try: + + entities = response['entities'] + + if not isinstance(entities, list): + + entities = [entities] + + except KeyError: + + entities = [] + + for en in entities: try: - object_list = response['whois-resources']['objects']['object'] + if en['roles'][0] == 'registrant': - if not isinstance(object_list, list): + temp = en['vcardArray'][1] - object_list = [object_list] + for t in temp: - ripe_abuse_emails = [] - ripe_misc_emails = [] + if t[0] == 'fn': - for n in object_list: + net['name'] = t[3].strip() - if n['type'] == 'organisation': + elif t[0] == 'org': - for attr in n['attributes']['attribute']: + net['description'] = t[3][0].strip() - if attr['name'] == 'abuse-mailbox': + elif t[0] == 'adr': - ripe_abuse_emails.append(attr['value'].strip()) + net['address'] = t[1]['label'].strip() - elif attr['name'] == 'e-mail': + elif t[0] == 'email': - ripe_misc_emails.append(attr['value'].strip()) + net['misc_emails'] = t[3].strip() - if n['type'] in ('inetnum', 'inet6num', 'route', 'route6'): + elif en['roles'][0] == 'abuse': - net = base_net.copy() + temp = en['vcardArray'][1] - for attr in n['attributes']['attribute']: + for t in temp: - if attr['name'] in ('inetnum', 'inet6num'): + if t[0] == 'email': - ipr = attr['value'].strip() - ip_range = ipr.split(' - ') + net['abuse_emails'] = t[3].strip() - try: + elif en['roles'][0] == 'tech': - if len(ip_range) > 1: + temp = en['vcardArray'][1] - addrs = [] - addrs.extend( - ipaddress.summarize_address_range( - ipaddress.ip_address( - ip_range[0]), - ipaddress.ip_address( - ip_range[1]))) + for t in temp: - cidr = ', '.join([i.__str__() - for i in ipaddress.\ - collapse_addresses(addrs)]) + if t[0] == 'email': - else: + net['tech_emails'] = t[3].strip() - cidr = ipaddress.ip_network( - ip_range[0]).__str__() + except (KeyError, IndexError): - net['cidr'] = cidr + pass - except: + return [net] - pass + def _lookup_rws_ripe(self, response=None): + """ + The function for retrieving and parsing whois information for a RIPE + IP address via HTTP (Whois-RWS). - elif attr['name'] in ('route', 'route6'): + Args: + response: The dictionary containing whois information to parse. - ipr = attr['value'].strip() - ip_ranges = ipr.split(', ') + Returns: + List: Dictionaries containing network information which consists + of the fields listed in the NIC_WHOIS dictionary. Certain IPs + have more granular network listings, hence the need for a list + object. + """ - try: + nets = [] - net['cidr'] = ', '.join( - ipaddress.ip_network(r).__str__() - for r in ip_ranges) + try: - except: + object_list = response['whois-resources']['objects']['object'] - pass + if not isinstance(object_list, list): - elif attr['name'] == 'netname': + object_list = [object_list] - net['name'] = attr['value'].strip() + except KeyError: - elif attr['name'] == 'descr': + object_list = [] - if net['description']: + ripe_abuse_emails = [] + ripe_misc_emails = [] - net['description'] += '\n%s' % ( - attr['value'].strip()) + for n in object_list: - else: + try: + + if n['type'] == 'organisation': + + for attr in n['attributes']['attribute']: + + if attr['name'] == 'abuse-mailbox': - net['description'] = attr['value'].strip() + ripe_abuse_emails.append(attr['value'].strip()) - elif attr['name'] == 'country': + elif attr['name'] == 'e-mail': - net['country'] = attr['value'].strip() + ripe_misc_emails.append(attr['value'].strip()) - elif attr['name'] == 'address': + elif n['type'] in ('inetnum', 'inet6num', 'route', 'route6'): - if net['address']: + net = BASE_NET.copy() - net['address'] += '\n%s' % ( - attr['value'].strip()) + for attr in n['attributes']['attribute']: + + if attr['name'] in ('inetnum', 'inet6num'): + + ipr = attr['value'].strip() + ip_range = ipr.split(' - ') + + try: + + if len(ip_range) > 1: + + addrs = [] + addrs.extend( + ipaddress.summarize_address_range( + ipaddress.ip_address( + ip_range[0]), + ipaddress.ip_address( + ip_range[1]) + ) + ) + + cidr = ', '.join( + [i.__str__() + for i in ipaddress. + collapse_addresses(addrs)] + ) else: - net['address'] = attr['value'].strip() + cidr = ipaddress.ip_network( + ip_range[0] + ).__str__() + + net['cidr'] = cidr + + except (ValueError, TypeError): + + pass + + elif attr['name'] in ('route', 'route6'): + + ipr = attr['value'].strip() + ip_ranges = ipr.split(', ') + + try: + + net['cidr'] = ', '.join( + ipaddress.ip_network(r).__str__() + for r in ip_ranges + ) + + except ValueError: + + pass + + elif attr['name'] == 'netname': + + net['name'] = attr['value'].strip() + + elif attr['name'] == 'descr': + + if net['description'] is not None: + + net['description'] += '\n%s' % ( + attr['value'].strip() + ) - nets.append(net) + else: - #This is nasty. Since RIPE RWS doesn't provide a granular - #contact to network relationship, we apply to all networks. - if len(ripe_abuse_emails) > 0 or len(ripe_misc_emails) > 0: + net['description'] = attr['value'].strip() - abuse = ('\n'.join(ripe_abuse_emails) if ripe_abuse_emails - else None) - misc = ('\n'.join(ripe_misc_emails) if ripe_misc_emails - else None) + elif attr['name'] == 'country': - for net in nets: + net['country'] = attr['value'].strip() - net['abuse_emails'] = abuse - net['misc_emails'] = misc + elif attr['name'] == 'address': - except: + if net['address'] is not None: + + net['address'] += '\n%s' % ( + attr['value'].strip() + ) + + else: + + net['address'] = attr['value'].strip() + + nets.append(net) + + except KeyError: pass + #This is nasty. Since RIPE RWS doesn't provide a granular + #contact to network relationship, we apply to all networks. + if len(ripe_abuse_emails) > 0 or len(ripe_misc_emails) > 0: + + abuse = ( + '\n'.join(set(ripe_abuse_emails)) + if len(ripe_abuse_emails) > 0 else None + ) + misc = ( + '\n'.join(set(ripe_misc_emails)) + if len(ripe_misc_emails) > 0 else None + ) + + for net in nets: + + net['abuse_emails'] = abuse + net['misc_emails'] = misc + + return nets + + def lookup_rws(self, inc_raw=False, retry_count=3): + """ + The function for retrieving and parsing whois information for an IP + address via HTTP (Whois-RWS). + + NOTE: This should be faster than IPWhois.lookup(), but may not be as + reliable. APNIC and AFRINIC do not have a Whois-RWS + service yet. We have to rely on the Ripe RWS service, which does + not contain all of the data we need. LACNIC RWS is in beta v2. + + Args: + inc_raw: Boolean for whether to include the raw whois results in + the returned dictionary. + retry_count: The number of times to retry in case socket errors, + timeouts, connection resets, etc. are encountered. + + Returns: + Dictionary: A dictionary containing the following keys: + query (String) - The IP address. + asn (String) - The Autonomous System Number. + asn_date (String) - The ASN Allocation date. + asn_registry (String) - The assigned ASN registry. + asn_cidr (String) - The assigned ASN CIDR. + asn_country_code (String) - The assigned ASN country code. + nets (List) - Dictionaries containing network information + which consists of the fields listed in the NIC_WHOIS + dictionary. Certain IPs have more granular network + listings, hence the need for a list object. + raw (Dictionary) - Whois results in Json format if the + inc_raw parameter is True. + """ + + #Attempt to resolve ASN info via Cymru. DNS is faster, try that first. + try: + + asn_data = self.get_asn_dns() + + except ASNLookupError: + + asn_data = self.get_asn_whois(retry_count) + + #Create the return dictionary. + results = { + 'query': self.address_str, + 'nets': [], + 'raw': None + } + + #Add the ASN information to the return dictionary. + results.update(asn_data) + + #Retrieve the whois data. + try: + + response = self.get_rws( + NIC_WHOIS[results['asn_registry']]['url'].format( + self.address_str), + retry_count + ) + + #If the query failed, try the radb-grs source. + except WhoisLookupError: + + response = self.get_rws(( + 'http://apps.db.ripe.net/whois/grs-search' + '?query-string={0}&source=radb-grs').format(self.address_str), + retry_count + ) + + #If inc_raw parameter is True, add the response to return dictionary. + if inc_raw: + + results['raw'] = response + + if results['asn_registry'] == 'arin': + + nets = self._lookup_rws_arin(response, retry_count) + + elif results['asn_registry'] == 'lacnic': + + nets = self._lookup_rws_lacnic(response) + + else: + + nets = self._lookup_rws_ripe(response) + #Add the networks to the return dictionary. results['nets'] = nets diff --git a/ipwhois/tests/test_ipwhois.py b/ipwhois/tests/test_ipwhois.py index a8c8783..1864646 100644 --- a/ipwhois/tests/test_ipwhois.py +++ b/ipwhois/tests/test_ipwhois.py @@ -87,27 +87,60 @@ def test_get_host(self): self.fail('Unexpected exception raised: %r' % e) def test_lookup(self): - result = IPWhois('74.125.225.229') - try: - self.assertIsInstance(result.lookup(), dict) - except (ASNLookupError, WhoisLookupError): - pass - except AssertionError as e: - raise e - except Exception as e: - self.fail('Unexpected exception raised: %r' % e) + + ips = [ + '74.125.225.229', # ARIN + '2001:4860:4860::8888', + '62.239.237.1', # RIPE + '2a00:2381:ffff::1', + '210.107.73.73', # APNIC + '2001:240:10c:1::ca20:9d1d', + '200.57.141.161', # LACNIC + '2801:10:c000::', + '196.11.240.215', # AFRINIC + '2001:43f8:7b0::' + ] + + for ip in ips: + + result = IPWhois(ip) + try: + self.assertIsInstance(result.lookup(), dict) + except (ASNLookupError, WhoisLookupError): + pass + except AssertionError as e: + raise e + except Exception as e: + self.fail('Unexpected exception raised: %r' % e) def test_lookup_rws(self): from urllib import request - result = IPWhois('74.125.225.229') - try: - self.assertIsInstance(result.lookup_rws(), dict) - except (ASNLookupError, WhoisLookupError): - pass - except AssertionError as e: - raise e - except Exception as e: - self.fail('Unexpected exception raised: %r' % e) + + ips = [ + '74.125.225.229', # ARIN + '2001:4860:4860::8888', + '62.239.237.1', # RIPE + '2a00:2381:ffff::1', + '210.107.73.73', # APNIC + '2001:240:10c:1::ca20:9d1d', + '200.57.141.161', # LACNIC + '2801:10:c000::', + '196.11.240.215', # AFRINIC + '2001:43f8:7b0::' + ] + + for ip in ips: + + result = IPWhois(ip) + try: + self.assertIsInstance(result.lookup_rws(), dict) + except (ASNLookupError, WhoisLookupError): + pass + except AssertionError as e: + raise e + except Exception as e: + self.fail('Unexpected exception raised: %r' % e) + handler = request.ProxyHandler({'http': 'http://0.0.0.0:80/'}) opener = request.build_opener(handler) result = IPWhois('74.125.225.229', 0, opener) diff --git a/ipwhois/utils.py b/ipwhois/utils.py index a8b3117..ea4c285 100644 --- a/ipwhois/utils.py +++ b/ipwhois/utils.py @@ -41,51 +41,45 @@ def get_countries(): #Initialize the countries dictionary. countries = {} - try: + #Set the data directory based on if the script is a frozen executable. + if sys.platform == 'win32' and getattr(sys, 'frozen', False): - #Set the data directory based on if the script is a frozen executable. - if sys.platform == 'win32' and getattr(sys, 'frozen', False): + data_dir = path.dirname(sys.executable) - data_dir = path.dirname(sys.executable) + else: - else: + data_dir = path.dirname(__file__) - data_dir = path.dirname(__file__) + #Create the country codes file object. + f = open(str(data_dir) + '/data/iso_3166-1_list_en.xml', 'r') - #Create the country codes file object. - f = open(str(data_dir) + '/data/iso_3166-1_list_en.xml', 'r') + #Read the file. + data = f.read() - #Read the file. - data = f.read() + #Check if there is data. + if not data: - #Check if there is data. - if not data: - - return {} - - #Parse the data to get the DOM. - dom = parseString(data) - - #Retrieve the country entries. - entries = dom.getElementsByTagName('ISO_3166-1_Entry') + return {} - #Iterate through the entries and add to the countries dictionary. - for entry in entries: + #Parse the data to get the DOM. + dom = parseString(data) - #Retrieve the country code and name from the DOM. - code = entry.getElementsByTagName( - 'ISO_3166-1_Alpha-2_Code_element')[0].firstChild.data - name = entry.getElementsByTagName( - 'ISO_3166-1_Country_name')[0].firstChild.data + #Retrieve the country entries. + entries = dom.getElementsByTagName('ISO_3166-1_Entry') - #Add to the countries dictionary. - countries[code] = name.title() + #Iterate through the entries and add to the countries dictionary. + for entry in entries: - return countries + #Retrieve the country code and name from the DOM. + code = entry.getElementsByTagName( + 'ISO_3166-1_Alpha-2_Code_element')[0].firstChild.data + name = entry.getElementsByTagName( + 'ISO_3166-1_Country_name')[0].firstChild.data - except: + #Add to the countries dictionary. + countries[code] = name.title() - return {} + return countries def ipv4_is_defined(address):