# Copyright (c) 2013, 2014, 2015 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
if sys.version_info >= (3, 3):
from ipaddress import (ip_address,
IPv4Address,
IPv6Address,
ip_network,
summarize_address_range,
collapse_addresses)
else:
from ipaddr import (IPAddress as ip_address,
IPv4Address,
IPv6Address,
IPNetwork as ip_network,
summarize_address_range,
collapse_address_list as collapse_addresses)
import socket
import dns.resolver
import re
import copy
import json
from .utils import ipv4_is_defined, ipv6_is_defined, unique_everseen
try:
from urllib.request import (OpenerDirector,
ProxyHandler,
build_opener,
Request)
except ImportError:
from urllib2 import (OpenerDirector,
ProxyHandler,
build_opener,
Request)
from time import sleep
from datetime import datetime
# Import the dnspython3 rdtypes to fix the dynamic import problem when frozen.
import dns.rdtypes.ANY.TXT # @UnusedImport
# Mapping of IETF RFC reference labels (as returned by the
# ipv4_is_defined/ipv6_is_defined helpers in .utils) to their canonical
# URLs on tools.ietf.org. Used to annotate addresses that are defined by
# an RFC and therefore do not need whois resolution.
IETF_RFC_REFERENCES = {
    # IPv4
    'RFC 1122, Section 3.2.1.3':
    'http://tools.ietf.org/html/rfc1122#section-3.2.1.3',
    'RFC 1918': 'http://tools.ietf.org/html/rfc1918',
    'RFC 3927': 'http://tools.ietf.org/html/rfc3927',
    'RFC 5736': 'http://tools.ietf.org/html/rfc5736',
    'RFC 5737': 'http://tools.ietf.org/html/rfc5737',
    'RFC 3068': 'http://tools.ietf.org/html/rfc3068',
    'RFC 2544': 'http://tools.ietf.org/html/rfc2544',
    'RFC 3171': 'http://tools.ietf.org/html/rfc3171',
    'RFC 919, Section 7': 'http://tools.ietf.org/html/rfc919#section-7',
    # IPv6
    'RFC 4291, Section 2.7': 'http://tools.ietf.org/html/rfc4291#section-2.7',
    'RFC 4291': 'http://tools.ietf.org/html/rfc4291',
    'RFC 4291, Section 2.5.2':
    'http://tools.ietf.org/html/rfc4291#section-2.5.2',
    'RFC 4291, Section 2.5.3':
    'http://tools.ietf.org/html/rfc4291#section-2.5.3',
    'RFC 4291, Section 2.5.6':
    'http://tools.ietf.org/html/rfc4291#section-2.5.6',
    'RFC 4291, Section 2.5.7':
    'http://tools.ietf.org/html/rfc4291#section-2.5.7',
    'RFC 4193': 'https://tools.ietf.org/html/rfc4193'
}
# Per-registry (RIR) lookup configuration. For each NIC:
#   server        -- the port-43 whois server to query
#   url           -- HTTP template for the RWS/RDAP service ({0} = IP)
#   fields        -- mapping of result field name -> regex; each pattern
#                    captures the value in the named group 'val' (the
#                    email patterns may alternatively capture 'val2')
#   dt_format     -- datetime format of port-43 whois date fields
#   dt_rws_format -- datetime format of RWS/RDAP date fields
# Note: the 'afrinic' entry defines no date fields/formats.
NIC_WHOIS = {
    'arin': {
        'server': 'whois.arin.net',
        'url': (
            'http://whois.arin.net/rest/nets;q={0}?'
            'showDetails=true&showARIN=true'
        ),
        'fields': {
            'name': r'(NetName):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(NetHandle):[^\S\n]+(?P<val>.+?)\n',
            # Multi-line value: capture until the next non-indented line.
            'description': r'(OrgName|CustName):[^\S\n]+(?P<val>.+?)'
                           '(?=(\n\S):?)',
            'country': r'(Country):[^\S\n]+(?P<val>.+?)\n',
            'state': r'(StateProv):[^\S\n]+(?P<val>.+?)\n',
            'city': r'(City):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(Address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'postal_code': r'(PostalCode):[^\S\n]+(?P<val>.+?)\n',
            'abuse_emails': r'(OrgAbuseEmail):[^\S\n]+(?P<val>.+?)\n',
            'tech_emails': r'(OrgTechEmail):[^\S\n]+(?P<val>.+?)\n',
            'created': r'(RegDate):[^\S\n]+(?P<val>.+?)\n',
            'updated': r'(Updated):[^\S\n]+(?P<val>.+?)\n',
        },
        'dt_format': '%Y-%m-%d',
        'dt_rws_format': '%Y-%m-%dT%H:%M:%S%z'
    },
    'ripencc': {
        'server': 'whois.ripe.net',
        'url': 'http://rest.db.ripe.net/search.json?query-string={0}',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            # Either an explicit abuse-mailbox attribute ('val') or any
            # other attribute whose value contains an abuse address ('val2').
            'abuse_emails': (
                r'(abuse-mailbox:[^\S\n]+(?P<val>.+?))\n|(((?!abuse-mailbox).+'
                '?:.*?[^\S\n]+(?P<val2>[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.'
                '[\w\-]+)([^\S\n]+.*?)*?)\n)'
            ),
            'misc_emails': (
                r'(?!abuse-mailbox).+?:.*?[^\S\n]+(?P<val>(?!abuse)[\w\-\.]+?@'
                '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
            ),
            'created': (
                r'(created):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]'
                '{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            ),
            'updated': (
                r'(last-modified):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T'
                '[0-9]{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            )
        },
        'dt_format': '%Y-%m-%dT%H:%M:%SZ',
        'dt_rws_format': '%Y-%m-%dT%H:%M:%SZ'
    },
    'apnic': {
        'server': 'whois.apnic.net',
        'url': 'http://rdap.apnic.net/ip/{0}',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'abuse_emails': (
                r'(abuse-mailbox:[^\S\n]+(?P<val>.+?))\n|(((?!abuse-mailbox).+'
                '?:.*?[^\S\n]+(?P<val2>[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.'
                '[\w\-]+)([^\S\n]+.*?)*?)\n)'
            ),
            'misc_emails': (
                r'(?!abuse-mailbox).+?:.*?[^\S\n]+(?P<val>(?!abuse)[\w\-\.]+?@'
                '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
            ),
            'updated': r'(changed):[^\S\n]+.*(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d',
        'dt_rws_format': '%Y-%m-%dT%H:%M:%SZ'
    },
    'lacnic': {
        'server': 'whois.lacnic.net',
        'url': 'http://rdap.lacnic.net/rdap/ip/{0}',
        'fields': {
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(owner):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'abuse_emails': (
                r'(abuse-mailbox:[^\S\n]+(?P<val>.+?))\n|(((?!abuse-mailbox).+'
                '?:.*?[^\S\n]+(?P<val2>[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.'
                '[\w\-]+)([^\S\n]+.*?)*?)\n)'
            ),
            'misc_emails': (
                r'(?!abuse-mailbox).+?:.*?[^\S\n]+(?P<val>(?!abuse)[\w\-\.]+?@'
                '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
            ),
            'created': r'(created):[^\S\n]+(?P<val>[0-9]{8}).*?\n',
            'updated': r'(changed):[^\S\n]+(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d',
        'dt_rws_format': '%Y-%m-%dT%H:%M:%SZ'
    },
    'afrinic': {
        'server': 'whois.afrinic.net',
        'url': 'http://rest.db.ripe.net/search.json?query-string={0}',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'abuse_emails': (
                r'(abuse-mailbox:[^\S\n]+(?P<val>.+?))\n|(((?!abuse-mailbox).+'
                '?:.*?[^\S\n]+(?P<val2>[\w\-\.]*abuse[\w\-\.]*@[\w\-\.]+\.'
                '[\w\-]+)([^\S\n]+.*?)*?)\n)'
            ),
            'misc_emails': (
                r'(?!abuse-mailbox).+?:.*?[^\S\n]+(?P<val>(?!abuse)[\w\-\.]+?@'
                '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
            )
        }
    }
}
# Field extraction regexes for referral (rwhois) server responses. Same
# named-group convention as NIC_WHOIS: the value is captured in 'val'.
RWHOIS = {
    'fields': {
        'cidr': r'(network:IP-Network):(?P<val>.+?)\n',
        'name': r'(network:ID):(?P<val>.+?)\n',
        'description': (
            r'(network:(Org-Name|Organization(;I)?)):(?P<val>.+?)\n'
        ),
        'country': r'(network:(Country|Country-Code)):(?P<val>.+?)\n',
        'state': r'(network:State):(?P<val>.+?)\n',
        'city': r'(network:City):(?P<val>.+?)\n',
        'address': r'(network:Street-Address):(?P<val>.+?)\n',
        'postal_code': r'(network:Postal-Code):(?P<val>.+?)\n',
        'abuse_emails': (
            r'(network:Abuse-Contact(;I)?):[^\S\n]+(?P<val>[\w\-\.]+?@'
            '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
        ),
        'tech_emails': (
            r'(network:Tech-Contact(;I)?):[^\S\n]+(?P<val>[\w\-\.]+?@'
            '[\w\-\.]+\.[\w\-]+)([^\S\n]+.*?)*?\n'
        ),
        'misc_emails': (
            r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)'
            '([^\S\n]+.*?)*?\n'
        ),
        'created': r'(network:Created):(?P<val>.+?)\n',
        'updated': r'(network:Updated):(?P<val>.+?)\n'
    }
}
# Whois servers that must never be queried.
BLACKLIST = [
    'root.rwhois.net'
]
# Mapping of ReferralServer whois URIs (as found in ARIN responses) to the
# corresponding RIR registry identifier.
ASN_REFERRALS = {
    'whois://whois.ripe.net': 'ripencc',
    'whois://whois.apnic.net': 'apnic',
    'whois://whois.lacnic.net': 'lacnic',
    'whois://whois.afrinic.net': 'afrinic',
}
# Team Cymru's IP-to-ASN whois server (queried on port 43).
CYMRU_WHOIS = 'whois.cymru.com'
# DNS zone templates for Cymru's IP-to-ASN origin lookups; formatted with
# the reversed IP address (see IPWhois.__init__).
IPV4_DNS_ZONE = '{0}.origin.asn.cymru.com'
IPV6_DNS_ZONE = '{0}.origin6.asn.cymru.com'
# Template for a single parsed network result. Deep-copied for every
# network found, then populated by the field parsers; fields that cannot
# be parsed remain None.
BASE_NET = {
    'cidr': None,
    'name': None,
    'handle': None,
    'range': None,
    'description': None,
    'country': None,
    'state': None,
    'city': None,
    'address': None,
    'postal_code': None,
    'abuse_emails': None,
    'tech_emails': None,
    'misc_emails': None,
    'created': None,
    'updated': None
}
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class IPDefinedError(Exception):
    """
    An Exception for when the IP is defined (does not need to be resolved).
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class ASNLookupError(Exception):
    """
    An Exception for when the ASN lookup failed.
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class ASNRegistryError(Exception):
    """
    An Exception for when the ASN registry does not match one of the five
    expected values (arin, ripencc, apnic, lacnic, afrinic).
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class WhoisLookupError(Exception):
    """
    An Exception for when the whois lookup failed.
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class HostLookupError(Exception):
    """
    An Exception for when the host lookup failed.
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class BlacklistError(Exception):
    """
    An Exception for when the server is in a blacklist.
    """
# Fix: removed the '[docs]' Sphinx-HTML scrape artifact fused onto the
# class statement, which made the file syntactically invalid.
class IPWhois:
    """
    The class for performing ASN/whois lookups and parsing for IPv4 and IPv6
    addresses.

    Args:
        address: An IPv4 or IPv6 address as a string, integer, IPv4Address,
            or IPv6Address.
        timeout: The default timeout for socket connections in seconds.
        proxy_opener: The urllib.request.OpenerDirector request for proxy
            support or None.

    Raises:
        IPDefinedError: The address provided is defined (does not need to be
            resolved).
    """
def __init__(self, address, timeout=5, proxy_opener=None):
    """Resolve the target address and precompute lookup helpers."""
    # Pre-constructed address objects are used as-is; anything else is fed
    # through ip_address(), which raises on invalid input.
    if isinstance(address, (IPv4Address, IPv6Address)):
        self.address = address
    else:
        self.address = ip_address(address)

    # Socket timeout (seconds) applied to every lookup.
    self.timeout = timeout

    # Honor a caller-supplied proxy opener; otherwise build a default
    # opener from the environment proxy settings.
    if isinstance(proxy_opener, OpenerDirector):
        self.opener = proxy_opener
    else:
        self.opener = build_opener(ProxyHandler())

    # String form used in queries, plus the IP version (4 or 6).
    self.address_str = str(self.address)
    self.version = self.address.version

    if self.version == 4:
        # Addresses defined by an RFC never need ASN/whois resolution.
        is_defined = ipv4_is_defined(self.address_str)
        if is_defined[0]:
            raise IPDefinedError(
                'IPv4 address %r is already defined as %r via '
                '%r.' % (
                    self.address_str, is_defined[1], is_defined[2]
                )
            )

        # Reverse the dotted quad for the Cymru origin DNS zone.
        octets = self.address_str.split('.')
        self.reversed = '.'.join(reversed(octets))
        self.dns_zone = IPV4_DNS_ZONE.format(self.reversed)
    else:
        # Addresses defined by an RFC never need ASN/whois resolution.
        is_defined = ipv6_is_defined(self.address_str)
        if is_defined[0]:
            raise IPDefinedError(
                'IPv6 address %r is already defined as %r via '
                '%r.' % (
                    self.address_str, is_defined[1], is_defined[2]
                )
            )

        # Expand to the full form, then drop trailing all-zero groups --
        # Cymru tends to time out when they are present.
        groups = self.address.exploded.split(':')
        while groups and groups[-1] == '0000':
            groups.pop()

        # Reverse the remaining nibbles, dot-separated, for the DNS zone.
        nibbles = ':'.join(groups).replace(':', '')
        self.reversed = '.'.join(reversed(nibbles))
        self.dns_zone = IPV6_DNS_ZONE.format(self.reversed)
def __repr__(self):
return 'IPWhois(%r, %r, %r)' % (
self.address_str, self.timeout, self.opener
)
# Fixes: removed the '[docs]' scrape artifact; narrowed the bare 'except:'
# to 'except Exception:' so SystemExit/KeyboardInterrupt propagate;
# membership test against the dict directly instead of .keys().
def get_asn_dns(self):
    """
    The function for retrieving ASN information for an IP address from
    Cymru via port 53 (DNS).

    Returns:
        Dictionary: A dictionary containing the following keys:
                asn (String) - The Autonomous System Number.
                asn_date (String) - The ASN Allocation date.
                asn_registry (String) - The assigned ASN registry.
                asn_cidr (String) - The assigned ASN CIDR.
                asn_country_code (String) - The assigned ASN country code.

    Raises:
        ASNRegistryError: The ASN registry is not known.
        ASNLookupError: The ASN lookup failed.
    """
    try:
        data = dns.resolver.query(self.dns_zone, 'TXT')

        # The TXT record is a single pipe-delimited string; index 3 is the
        # registry, 0 the ASN, 1 the CIDR, 2 the country, 4 the date.
        temp = str(data[0]).split('|')

        ret = {'asn_registry': temp[3].strip(' \n')}

        if ret['asn_registry'] not in NIC_WHOIS:
            raise ASNRegistryError(
                'ASN registry %r is not known.' % ret['asn_registry']
            )

        ret['asn'] = temp[0].strip(' "\n')
        ret['asn_cidr'] = temp[1].strip(' \n')
        ret['asn_country_code'] = temp[2].strip(' \n').upper()
        ret['asn_date'] = temp[4].strip(' "\n')

        return ret

    except ASNRegistryError:
        raise

    except Exception:
        # Any resolution/parsing failure is reported uniformly.
        raise ASNLookupError(
            'ASN lookup failed for %r.' % self.address_str
        )
# Fixes: removed the '[docs]' scrape artifact; the socket is now closed in
# a finally block (the original leaked it when connect/recv raised);
# narrowed the bare 'except:'; membership test against the dict directly.
def get_asn_whois(self, retry_count=3):
    """
    The function for retrieving ASN information for an IP address from
    Cymru via port 43 (WHOIS).

    Args:
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.

    Returns:
        Dictionary: A dictionary containing the following keys:
                asn (String) - The Autonomous System Number.
                asn_date (String) - The ASN Allocation date.
                asn_registry (String) - The assigned ASN registry.
                asn_cidr (String) - The assigned ASN CIDR.
                asn_country_code (String) - The assigned ASN country code.

    Raises:
        ASNRegistryError: The ASN registry is not known.
        ASNLookupError: The ASN lookup failed.
    """
    try:
        # Create the connection for the Cymru whois query.
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(self.timeout)

        try:
            conn.connect((CYMRU_WHOIS, 43))

            # Query the Cymru whois server, and store the results.
            conn.send((
                ' -r -a -c -p -f -o %s%s' % (self.address_str, '\r\n')
            ).encode())

            data = ''
            while True:
                d = conn.recv(4096).decode()
                data += d
                if not d:
                    break
        finally:
            # Always release the socket, even when connect/recv fails.
            conn.close()

        # Pipe-delimited response; index 4 is the registry, 0 the ASN,
        # 2 the CIDR, 3 the country, 5 the date.
        temp = str(data).split('|')

        ret = {'asn_registry': temp[4].strip(' \n')}

        if ret['asn_registry'] not in NIC_WHOIS:
            raise ASNRegistryError(
                'ASN registry %r is not known.' % ret['asn_registry']
            )

        ret['asn'] = temp[0].strip(' \n')
        ret['asn_cidr'] = temp[2].strip(' \n')
        ret['asn_country_code'] = temp[3].strip(' \n').upper()
        ret['asn_date'] = temp[5].strip(' \n')

        return ret

    except (socket.timeout, socket.error):
        if retry_count > 0:
            return self.get_asn_whois(retry_count - 1)
        else:
            raise ASNLookupError(
                'ASN lookup failed for %r.' % self.address_str
            )

    except ASNRegistryError:
        raise

    except Exception:
        raise ASNLookupError(
            'ASN lookup failed for %r.' % self.address_str
        )
# Fixes: removed the '[docs]' scrape artifact. Bug 1: the blacklist check
# was 'server in (BLACKLIST, extra_bl)', which compares the server against
# the two list objects themselves and can never match a server name. Bug 2:
# the rate-limit retry recursed with an undecremented retry_count, allowing
# unbounded recursion. Bug 3: a raised BlacklistError was swallowed by the
# trailing bare 'except:' and re-raised as WhoisLookupError, contradicting
# the documented contract; it is now re-raised unchanged.
def get_whois(self, asn_registry='arin', retry_count=3, server=None,
              port=43, extra_blacklist=None):
    """
    The function for retrieving whois or rwhois information for an IP
    address via any port. Defaults to port 43 (WHOIS).

    Args:
        asn_registry: The NIC to run the query against.
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.
        server: An optional server to connect to. If provided, asn_registry
            will be ignored.
        port: The network port to connect on.
        extra_blacklist: A list of blacklisted whois servers in addition to
            the global BLACKLIST.

    Returns:
        String: The raw whois data.

    Raises:
        BlacklistError: Raised if the whois server provided is in the
            global BLACKLIST or extra_blacklist.
        WhoisLookupError: The whois lookup failed.
    """
    try:
        extra_bl = extra_blacklist if extra_blacklist else []

        # Check the explicit server against both blacklists (bug fix: the
        # original membership test could never match).
        if server is not None and (
                server in BLACKLIST or server in extra_bl):
            raise BlacklistError(
                'The server %r is blacklisted.' % server
            )

        if server is None:
            server = NIC_WHOIS[asn_registry]['server']

        # Create the connection for the whois query.
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(self.timeout)

        try:
            conn.connect((server, port))

            # Prep the query. The ARIN 'n + ' prefix requests the full
            # network display.
            query = self.address_str + '\r\n'
            if asn_registry == 'arin':
                query = 'n + %s' % query

            # Query the whois server, and store the results.
            conn.send(query.encode())

            response = ''
            while True:
                d = conn.recv(4096).decode('ascii', 'ignore')
                response += d
                if not d:
                    break
        finally:
            conn.close()

        if 'Query rate limit exceeded' in response:
            # Throttled: wait and retry, consuming one retry (bug fix:
            # the original never decremented and could recurse forever).
            if retry_count > 0:
                sleep(1)
                return self.get_whois(asn_registry, retry_count - 1,
                                      server, port, extra_blacklist)
            raise WhoisLookupError(
                'Whois lookup failed for %r.' % self.address_str
            )
        elif 'error 501' in response or 'error 230' in response:
            raise ValueError

        return str(response)

    except (socket.timeout, socket.error):
        if retry_count > 0:
            return self.get_whois(asn_registry, retry_count - 1, server,
                                  port, extra_blacklist)
        else:
            raise WhoisLookupError(
                'Whois lookup failed for %r.' % self.address_str
            )

    except (BlacklistError, WhoisLookupError):
        raise

    except Exception:
        raise WhoisLookupError(
            'Whois lookup failed for %r.' % self.address_str
        )
# Fixes: removed the '[docs]' scrape artifact; narrowed the bare 'except:'
# to 'except Exception:'.
def get_rws(self, url=None, retry_count=3):
    """
    The function for retrieving Whois-RWS information for an IP address
    via HTTP (Whois-RWS).

    Args:
        url: The URL to retrieve.
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.

    Returns:
        Dictionary: The whois data in Json format.

    Raises:
        WhoisLookupError: The whois RWS lookup failed.
    """
    try:
        # Create the connection for the whois query.
        conn = Request(url, headers={'Accept': 'application/json'})
        data = self.opener.open(conn, timeout=self.timeout)

        try:
            # Early Python 3 HTTPResponse exposed readall(); other
            # versions use read().
            d = json.loads(data.readall().decode())
        except AttributeError:
            d = json.loads(data.read().decode('ascii', 'ignore'))

        return d

    except (socket.timeout, socket.error):
        if retry_count > 0:
            return self.get_rws(url, retry_count - 1)
        else:
            raise WhoisLookupError('Whois RWS lookup failed for %r.' %
                                   url)

    except Exception:
        raise WhoisLookupError('Whois RWS lookup failed for %r.' % url)
# Fixes: removed the '[docs]' scrape artifact; the global default socket
# timeout is now restored in a finally block (the original left it
# modified when gethostbyaddr raised); narrowed the bare 'except:'.
def get_host(self, retry_count=3):
    """
    The function for retrieving host information for an IP address.

    Args:
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.

    Returns:
        Tuple: hostname, aliaslist, ipaddrlist

    Raises:
        HostLookupError: The host lookup failed.
    """
    try:
        default_timeout_set = False

        # gethostbyaddr honors only the process-wide default timeout, so
        # set it temporarily if the caller has not set one.
        if not socket.getdefaulttimeout():
            socket.setdefaulttimeout(self.timeout)
            default_timeout_set = True

        try:
            ret = socket.gethostbyaddr(self.address_str)
        finally:
            # Restore the global default even when the lookup raises.
            if default_timeout_set:
                socket.setdefaulttimeout(None)

        return ret

    except (socket.timeout, socket.error):
        if retry_count > 0:
            return self.get_host(retry_count - 1)
        else:
            raise HostLookupError(
                'Host lookup failed for %r.' % self.address_str
            )

    except Exception:
        raise HostLookupError(
            'Host lookup failed for %r.' % self.address_str
        )
def _parse_fields(self, response, fields_dict, net_start=None,
                  net_end=None, dt_format=None):
    """
    The function for parsing whois fields from a data input.

    Args:
        response: The response from the whois/rwhois server.
        fields_dict: The dictionary of fields -> regex search values.
        net_start: The starting point of the network (if parsing multiple
            networks).
        net_end: The ending point of the network (if parsing multiple
            networks).
        dt_format: The format of datetime fields if known.

    Returns:
        Dictionary: A dictionary of fields provided in fields_dict.
    """
    ret = {}

    for field in fields_dict:

        # DOTALL lets the multi-line patterns (description/address) span
        # newlines via their lookaheads.
        pattern = re.compile(
            str(fields_dict[field]),
            re.DOTALL
        )

        # NOTE(review): despite the parameter names, net_end is passed as
        # the scan *start* position and net_start as the scan *end*
        # position -- presumably the caller supplies the current net's end
        # offset and the next net's start offset; confirm against lookup().
        if net_start is not None:
            match = pattern.finditer(response, net_end, net_start)
        elif net_end is not None:
            match = pattern.finditer(response, net_end)
        else:
            match = pattern.finditer(response)

        values = []
        sub_section_end = None
        for m in match:

            if sub_section_end:
                # For non-email fields, only accept matches that continue
                # directly from the previous one (consecutive positions);
                # a gap terminates the sub-section.
                if field not in (
                    'abuse_emails',
                    'tech_emails',
                    'misc_emails'
                ) and (sub_section_end != (m.start() - 1)):

                    break

            try:
                values.append(m.group('val').strip())
            except AttributeError:
                # Some email patterns capture into 'val2' instead of 'val'.
                values.append(m.group('val2').strip())

            sub_section_end = m.end()

        if len(values) > 0:

            value = None
            try:

                if field == 'country':
                    # Country codes are normalized to upper case.
                    value = values[0].upper()
                elif field in ['created', 'updated'] and dt_format:
                    # Normalize dates to ISO 8601.
                    value = datetime.strptime(
                        values[0],
                        str(dt_format)).isoformat('T')
                else:
                    # De-duplicate (order-preserving) and join multi-line
                    # values with newlines.
                    values = unique_everseen(values)
                    value = '\n'.join(values)

            except ValueError:
                # Unparseable dates leave the field as None.
                pass

            ret[field] = value

    return ret
# Fixes: removed the '[docs]' scrape artifact. Bug 1: the cached-response
# test used "is not 'arin'" -- an identity comparison against a str literal
# that only works because CPython interns short strings; replaced with !=.
# Bug 2: the LACNIC octet-count test used "count is not 0" -- identity
# comparison against an int literal; replaced with a range test. Also
# corrected the docstring, which claimed server/port keys are added to the
# referral dictionary (the code does not do that).
def lookup(self, inc_raw=False, retry_count=3, get_referral=False,
           extra_blacklist=None, ignore_referral_errors=False):
    """
    The function for retrieving and parsing whois information for an IP
    address via port 43 (WHOIS).

    Args:
        inc_raw: Boolean for whether to include the raw whois results in
            the returned dictionary.
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.
        get_referral: Boolean for whether to retrieve referral whois
            information, if available.
        extra_blacklist: A list of blacklisted whois servers in addition to
            the global BLACKLIST.
        ignore_referral_errors: Boolean for whether to ignore and continue
            when an exception is encountered on referral whois lookups.

    Returns:
        Dictionary:

        :query: The IP address (String)
        :asn: The Autonomous System Number (String)
        :asn_date: The ASN Allocation date (String)
        :asn_registry: The assigned ASN registry (String)
        :asn_cidr: The assigned ASN CIDR (String)
        :asn_country_code: The assigned ASN country code (String)
        :nets: Dictionaries containing network information which consists
            of the fields listed in the NIC_WHOIS dictionary. (List)
        :raw: Raw whois results if the inc_raw parameter is True. (String)
        :referral: Dictionary of referral whois information if get_referral
            is True and the server isn't blacklisted. Consists of fields
            listed in the RWHOIS dictionary. (Dictionary)
        :raw_referral: Raw referral whois results if the inc_raw parameter
            is True. (String)
    """
    # Initialize the whois response.
    response = None

    # Attempt to resolve ASN info via Cymru. DNS is faster, try that first.
    try:
        asn_data = self.get_asn_dns()
    except (ASNLookupError, ASNRegistryError):
        try:
            asn_data = self.get_asn_whois(retry_count)
        except (ASNLookupError, ASNRegistryError):
            # Lets attempt to get the ASN registry information from ARIN.
            response = self.get_whois('arin', retry_count)

            asn_data = {
                'asn_registry': None,
                'asn': None,
                'asn_cidr': None,
                'asn_country_code': None,
                'asn_date': None
            }

            # ARIN reports a ReferralServer when the network is delegated
            # to another RIR; map it to a registry identifier.
            matched = False
            for match in re.finditer(
                r'^ReferralServer:[^\S\n]+(.+)$',
                response,
                re.MULTILINE
            ):
                matched = True
                try:
                    referral = match.group(1)
                    referral = referral.replace(':43', '')
                    asn_data['asn_registry'] = ASN_REFERRALS[referral]
                except KeyError:
                    raise ASNRegistryError('ASN registry lookup failed.')
                break

            if not matched:
                asn_data['asn_registry'] = 'arin'

    # Create the return dictionary.
    results = {
        'query': self.address_str,
        'nets': [],
        'raw': None,
        'referral': None,
        'raw_referral': None
    }

    # Add the ASN information to the return dictionary.
    results.update(asn_data)

    # The referral server and port. Only used if get_referral is True.
    referral_server = None
    referral_port = 0

    # Only fetch the response if we haven't already (the ARIN fallback
    # above may have fetched it). Bug fix: != instead of "is not".
    if response is None or results['asn_registry'] != 'arin':

        # Retrieve the whois data.
        response = self.get_whois(results['asn_registry'], retry_count,
                                  extra_blacklist=extra_blacklist)

    if get_referral:

        # Search for a referral server.
        for match in re.finditer(
            r'^ReferralServer:[^\S\n]+(.+:[0-9]+)$',
            response,
            re.MULTILINE
        ):
            try:
                temp = match.group(1)
                if 'rwhois://' not in temp:
                    raise ValueError
                temp = temp.replace('rwhois://', '').split(':')
                if int(temp[1]) > 65535:
                    raise ValueError
                referral_server = temp[0]
                referral_port = int(temp[1])
            except (ValueError, KeyError):
                continue
            break

    # Retrieve the referral whois data.
    if get_referral and referral_server:

        response_ref = None
        if ignore_referral_errors:
            try:
                response_ref = self.get_whois('',
                                              retry_count,
                                              referral_server,
                                              referral_port,
                                              extra_blacklist)
            except (BlacklistError, WhoisLookupError):
                pass
        else:
            response_ref = self.get_whois('', retry_count,
                                          referral_server, referral_port,
                                          extra_blacklist)

        if response_ref:
            if inc_raw:
                results['raw_referral'] = response_ref

            temp_rnet = self._parse_fields(
                response_ref,
                RWHOIS['fields']
            )

            # Add the networks to the return dictionary.
            results['referral'] = temp_rnet

    # If inc_raw parameter is True, add the response to return dictionary.
    if inc_raw:
        results['raw'] = response

    nets = []

    if results['asn_registry'] == 'arin':

        # Find the first NetRange value.
        pattern = re.compile(
            r'^NetRange:[^\S\n]+(.+)$',
            re.MULTILINE
        )
        temp = pattern.search(response)
        net_range = None
        net_range_start = None
        if temp is not None:
            net_range = temp.group(1).strip()
            net_range_start = temp.start()

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^CIDR:[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):
            try:
                net = copy.deepcopy(BASE_NET)

                if len(nets) > 0:
                    temp = pattern.search(response, match.start())
                    net_range = None
                    net_range_start = None
                    if temp is not None:
                        net_range = temp.group(1).strip()
                        net_range_start = temp.start()

                if net_range is not None:
                    if net_range_start < match.start() or len(nets) > 0:
                        net['range'] = net_range

                net['cidr'] = ', '.join(
                    [ip_network(c.strip()).__str__()
                     for c in match.group(1).split(', ')]
                )
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)
            except ValueError:
                pass

    elif results['asn_registry'] == 'lacnic':

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):
            try:
                net = copy.deepcopy(BASE_NET)
                net['range'] = match.group(2).strip()

                # LACNIC may truncate IPv4 networks (e.g. '10.0/16');
                # pad the missing octets with zeros. Bug fix: range test
                # instead of "count is not 0".
                temp = []
                for addr in match.group(2).strip().split(', '):
                    count = addr.count('.')
                    if 0 < count < 4:
                        addr_split = addr.strip().split('/')
                        for i in range(count + 1, 4):
                            addr_split[0] += '.0'
                        addr = '/'.join(addr_split)
                    temp.append(ip_network(addr.strip()).__str__())

                net['cidr'] = ', '.join(temp)
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)
            except ValueError:
                pass

    else:

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+((.+?)[^\S\n]-[^\S\n](.+)|'
            '.+)$',
            response,
            re.MULTILINE
        ):
            try:
                net = copy.deepcopy(BASE_NET)
                net['range'] = match.group(2)

                # Either a 'start - end' range (groups 3 and 4) or a
                # single CIDR/network value (group 2).
                if match.group(3) and match.group(4):
                    addrs = []
                    addrs.extend(summarize_address_range(
                        ip_address(match.group(3).strip()),
                        ip_address(match.group(4).strip())))
                    cidr = ', '.join(
                        [i.__str__() for i in collapse_addresses(addrs)]
                    )
                else:
                    cidr = ip_network(match.group(2).strip()).__str__()

                net['cidr'] = cidr
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)
            except (ValueError, TypeError):
                pass

    # Iterate through all of the network sections and parse out the
    # appropriate fields for each.
    for index, net in enumerate(nets):

        section_end = None
        if index + 1 < len(nets):
            section_end = nets[index + 1]['start']

        try:
            dt_format = NIC_WHOIS[results['asn_registry']]['dt_format']
        except KeyError:
            # afrinic defines no dt_format.
            dt_format = None

        temp_net = self._parse_fields(
            response,
            NIC_WHOIS[results['asn_registry']]['fields'],
            section_end,
            net['end'],
            dt_format
        )

        # Merge the net dictionaries.
        net.update(temp_net)

        # The start and end values are no longer needed.
        del net['start'], net['end']

    # Add the networks to the return dictionary.
    results['nets'] = nets

    return results
def _lookup_rws_arin(self, response=None, retry_count=3):
    """
    The function for retrieving and parsing whois information for an ARIN
    IP address via HTTP (Whois-RWS).

    Args:
        response: The dictionary containing whois information to parse.
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.

    Returns:
        List: Dictionaries containing network information which consists
            of the fields listed in the NIC_WHOIS dictionary. Certain IPs
            have more granular network listings, hence the need for a list
            object.
    """
    nets = []

    try:
        # A single net comes back as a dict; normalize to a list.
        net_list = response['nets']['net']
        if not isinstance(net_list, list):
            net_list = [net_list]
    except KeyError:
        net_list = []

    for n in net_list:

        # Skip ARIN's own placeholder org entries.
        if 'orgRef' in n and n['orgRef']['@handle'] in ('ARIN', 'VR-ARIN'):
            continue

        addrs = []
        net = copy.deepcopy(BASE_NET)

        try:
            # Collapse the start-end address range into CIDR notation.
            addrs.extend(summarize_address_range(
                ip_address(n['startAddress']['$'].strip()),
                ip_address(n['endAddress']['$'].strip())))
            net['cidr'] = ', '.join(
                [i.__str__() for i in collapse_addresses(addrs)]
            )
            net['range'] = (n['startAddress']['$'].strip() + ' - ' +
                            n['endAddress']['$'].strip())
        except (KeyError, ValueError, TypeError):
            pass

        # Direct field-to-key mappings on the net record itself.
        for k, v in {
            'created': 'registrationDate',
            'updated': 'updateDate',
            'name': 'name'
        }.items():
            try:
                net[k] = str(n[v]['$']).strip()
            except KeyError:
                pass

        if 'handle' in n:
            net['handle'] = n['handle']['$'].strip()

        # Prefer the customer reference over the org reference for the
        # contact/address details.
        ref = None
        if 'customerRef' in n:
            ref = ['customerRef', 'customer']
        elif 'orgRef' in n:
            ref = ['orgRef', 'org']

        if ref is not None:

            try:
                net['description'] = str(n[ref[0]]['@name']).strip()
            except KeyError:
                pass

            try:
                # Fetch the referenced customer/org record (with POCs).
                ref_url = n[ref[0]]['$'].strip() + '?showPocs=true'
                ref_response = self.get_rws(ref_url, retry_count)
            except (KeyError, WhoisLookupError):
                # Keep the net with whatever was parsed so far.
                nets.append(net)
                continue

            try:
                addr_list = (
                    ref_response[ref[1]]['streetAddress']['line']
                )
                if not isinstance(addr_list, list):
                    addr_list = [addr_list]
                net['address'] = '\n'.join(
                    [str(line['$']).strip() for line in addr_list]
                )
            except KeyError:
                pass

            for k, v in {
                'postal_code': 'postalCode',
                'city': 'city',
                'state': 'iso3166-2'
            }.items():
                try:
                    net[k] = str(ref_response[ref[1]][v]['$'])
                except KeyError:
                    pass

            try:
                net['country'] = (
                    str(ref_response[ref[1]]['iso3166-1']['code2']['$'])
                ).upper()
            except KeyError:
                pass

            try:
                # Follow each Abuse/Tech point-of-contact link and collect
                # its email addresses into abuse_emails/tech_emails.
                for poc in (
                    ref_response[ref[1]]['pocs']['pocLinkRef']
                ):
                    if poc['@description'] in ('Abuse', 'Tech'):
                        poc_url = poc['$']
                        poc_response = self.get_rws(
                            poc_url,
                            retry_count
                        )
                        emails = poc_response['poc']['emails']['email']
                        if not isinstance(emails, list):
                            emails = [emails]
                        temp = []
                        for e in emails:
                            temp.append(str(e['$']).strip())
                        key = '%s_emails' % poc['@description'].lower()
                        net[key] = (
                            '\n'.join(unique_everseen(temp))
                            if len(temp) > 0 else None
                        )
            except (KeyError, WhoisLookupError):
                pass

        nets.append(net)

    return nets
def _lookup_rws_ripe(self, response=None):
    """
    The function for retrieving and parsing whois information for a RIPE
    IP address via HTTP (Whois-RWS).

    Args:
        response: The dictionary containing whois information to parse.

    Returns:
        List: Dictionaries containing network information which consists
            of the fields listed in the NIC_WHOIS dictionary. Certain IPs
            have more granular network listings, hence the need for a list
            object.
    """
    nets = []

    try:
        object_list = response['objects']['object']
    except KeyError:
        object_list = []

    # Emails collected from 'role' objects; applied to every net below
    # because RIPE RWS has no per-network contact linkage.
    ripe_abuse_emails = []
    ripe_misc_emails = []

    net = copy.deepcopy(BASE_NET)

    for n in object_list:

        try:

            if n['type'] == 'role':
                # Role objects carry the contact emails and address.
                for attr in n['attributes']['attribute']:

                    if attr['name'] == 'abuse-mailbox':
                        ripe_abuse_emails.append(str(
                            attr['value']
                        ).strip())
                    elif attr['name'] == 'e-mail':
                        ripe_misc_emails.append(str(attr['value']).strip())
                    elif attr['name'] == 'address':
                        # Multiple address attributes are newline-joined.
                        if net['address'] is not None:
                            net['address'] += '\n%s' % (
                                str(attr['value']).strip()
                            )
                        else:
                            net['address'] = str(attr['value']).strip()

            elif n['type'] in ('inetnum', 'inet6num'):
                # The network object itself: range, name, description,
                # country and timestamps.
                for attr in n['attributes']['attribute']:

                    if attr['name'] in ('inetnum', 'inet6num'):
                        net['range'] = str(attr['value']).strip()
                        ipr = str(attr['value']).strip()
                        ip_range = ipr.split(' - ')
                        try:
                            # 'start - end' ranges are collapsed to CIDR;
                            # otherwise the value is already a network.
                            if len(ip_range) > 1:
                                addrs = []
                                addrs.extend(
                                    summarize_address_range(
                                        ip_address(ip_range[0]),
                                        ip_address(ip_range[1])
                                    )
                                )
                                cidr = ', '.join(
                                    [i.__str__()
                                     for i in collapse_addresses(addrs)]
                                )
                            else:
                                cidr = ip_network(ip_range[0]).__str__()
                            net['cidr'] = cidr
                        except (ValueError, TypeError):
                            pass
                    elif attr['name'] == 'netname':
                        net['name'] = str(attr['value']).strip()
                    elif attr['name'] == 'nic-hdl':
                        net['handle'] = str(attr['value']).strip()
                    elif attr['name'] == 'descr':
                        if net['description'] is not None:
                            net['description'] += '\n%s' % (
                                str(attr['value']).strip()
                            )
                        else:
                            net['description'] = str(attr['value']).strip()
                    elif attr['name'] == 'country':
                        net['country'] = str(attr['value']).strip().upper()
                    elif attr['name'] == 'created':
                        # Normalize to ISO 8601.
                        tmp = str(attr['value']).strip()
                        value = datetime.strptime(
                            tmp,
                            str(NIC_WHOIS['ripencc']['dt_rws_format'])
                        ).isoformat('T')
                        net['created'] = value
                    elif attr['name'] == 'last-modified':
                        tmp = str(attr['value']).strip()
                        value = datetime.strptime(
                            tmp,
                            str(NIC_WHOIS['ripencc']['dt_rws_format'])
                        ).isoformat('T')
                        net['updated'] = value

        except KeyError:
            pass

    nets.append(net)

    # This is nasty. Since RIPE RWS doesn't provide a granular
    # contact to network relationship, we apply to all networks.
    if len(ripe_abuse_emails) > 0 or len(ripe_misc_emails) > 0:

        abuse = (
            '\n'.join(unique_everseen(ripe_abuse_emails))
            if len(ripe_abuse_emails) > 0 else None
        )
        misc = (
            '\n'.join(unique_everseen(ripe_misc_emails))
            if len(ripe_misc_emails) > 0 else None
        )

        for net in nets:
            net['abuse_emails'] = abuse
            net['misc_emails'] = misc

    return nets
def _lookup_rws_apnic(self, response=None):
    """
    Parse whois data for an APNIC IP address returned via HTTP
    (Whois-RWS).

    Args:
        response: The dictionary containing whois information to parse.

    Returns:
        List: A single-element list holding one dictionary of network
            information with the fields listed in the NIC_WHOIS
            dictionary.
    """

    net = copy.deepcopy(BASE_NET)

    # Derive CIDR notation and the raw range from the start/end
    # addresses; skip silently when they are absent or unparsable.
    try:
        start = response['startAddress'].strip()
        end = response['endAddress'].strip()
        summarized = list(summarize_address_range(
            ip_address(start),
            ip_address(end)
        ))
        net['cidr'] = ', '.join(
            str(block) for block in collapse_addresses(summarized)
        )
        net['range'] = start + ' - ' + end
    except (KeyError, ValueError, TypeError):
        pass

    try:
        net['country'] = str(response['country']).strip().upper()
    except KeyError:
        pass

    # RWS may return a single event object instead of a list.
    try:
        events = response['events']
    except KeyError:
        events = []
    else:
        if not isinstance(events, list):
            events = [events]

    for event in events:
        try:
            action = event['eventAction']
            if action in ('registration', 'last changed'):
                stamp = datetime.strptime(
                    str(event['eventDate']).strip(),
                    str(NIC_WHOIS['apnic']['dt_rws_format'])
                ).isoformat('T')
                if action == 'registration':
                    net['created'] = stamp
                else:
                    net['updated'] = stamp
        except (KeyError, ValueError):
            pass

    # Entities carry the handle, contact name/address and emails,
    # encoded as vCard arrays.
    try:
        entities = response['entities']
    except KeyError:
        entities = []
    else:
        if not isinstance(entities, list):
            entities = [entities]

    for entity in entities:
        try:
            if 'handle' in entity:
                net['handle'] = entity['handle']

            for field in entity['vcardArray'][1]:
                is_admin = 'administrative' in entity['roles']
                if is_admin and field[0] == 'fn':
                    net['name'] = str(field[3]).strip()
                elif is_admin and field[0] == 'adr':
                    try:
                        net['address'] = str(field[1]['label']).strip()
                    except KeyError:
                        pass
                elif field[0] == 'email':
                    roles = entity['roles']
                    # Multi-role or administrative contacts are filed
                    # under misc; otherwise map the single role.
                    if len(roles) > 1 or roles[0] == 'administrative':
                        key = 'misc_emails'
                    elif roles[0] == 'abuse':
                        key = 'abuse_emails'
                    elif roles[0] == 'technical':
                        key = 'tech_emails'
                    else:
                        key = None

                    if key is not None:
                        email = str(field[3]).strip()
                        if net[key] is None:
                            net[key] = email
                        else:
                            net[key] += '\n' + email
        except (KeyError, IndexError):
            pass

    try:
        remarks = response['remarks']
    except KeyError:
        remarks = []
    else:
        if not isinstance(remarks, list):
            remarks = [remarks]

    for remark in remarks:
        try:
            if remark['title'] == 'description':
                net['description'] = '\n'.join(remark['description'])
        except (KeyError, IndexError):
            pass

    return [net]
def _lookup_rws_lacnic(self, response=None):
    """
    Parse whois data for a LACNIC IP address returned via HTTP
    (Whois-RWS).

    Args:
        response: The dictionary containing whois information to parse.

    Returns:
        List: A single-element list holding one dictionary of network
            information with the fields listed in the NIC_WHOIS
            dictionary.
    """

    net = copy.deepcopy(BASE_NET)

    # Derive CIDR notation and the raw range from the start/end
    # addresses; skip silently when they are absent or unparsable.
    try:
        start = response['startAddress'].strip()
        end = response['endAddress'].strip()
        summarized = list(summarize_address_range(
            ip_address(start),
            ip_address(end)
        ))
        net['cidr'] = ', '.join(
            str(block) for block in collapse_addresses(summarized)
        )
        net['range'] = start + ' - ' + end
    except (KeyError, ValueError, TypeError):
        pass

    try:
        net['country'] = str(response['country']).strip().upper()
    except KeyError:
        pass

    # RWS may return a single event object instead of a list.
    try:
        events = response['events']
    except KeyError:
        events = []
    else:
        if not isinstance(events, list):
            events = [events]

    for event in events:
        try:
            action = event['eventAction']
            if action in ('registration', 'last changed'):
                stamp = datetime.strptime(
                    str(event['eventDate']).strip(),
                    str(NIC_WHOIS['lacnic']['dt_rws_format'])
                ).isoformat('T')
                if action == 'registration':
                    net['created'] = stamp
                else:
                    net['updated'] = stamp
        except (KeyError, ValueError):
            pass

    # Entities carry the handle, contact name/address and emails,
    # encoded as vCard arrays.
    try:
        entities = response['entities']
    except KeyError:
        entities = []
    else:
        if not isinstance(entities, list):
            entities = [entities]

    for entity in entities:
        try:
            if 'handle' in entity:
                net['handle'] = entity['handle']

            for field in entity['vcardArray'][1]:
                is_admin = 'administrative' in entity['roles']
                if is_admin and field[0] == 'fn':
                    net['name'] = str(field[3]).strip()
                elif is_admin and field[0] == 'adr':
                    try:
                        net['address'] = str(field[1]['label']).strip()
                    except KeyError:
                        pass
                elif field[0] == 'email':
                    roles = entity['roles']
                    # Multi-role or administrative contacts are filed
                    # under misc; otherwise map the single role.
                    if len(roles) > 1 or roles[0] == 'administrative':
                        key = 'misc_emails'
                    elif roles[0] == 'abuse':
                        key = 'abuse_emails'
                    elif roles[0] == 'technical':
                        key = 'tech_emails'
                    else:
                        key = None

                    if key is not None:
                        email = str(field[3]).strip()
                        if net[key] is None:
                            net[key] = email
                        else:
                            net[key] += '\n' + email
        except (KeyError, IndexError):
            pass

    try:
        remarks = response['remarks']
    except KeyError:
        remarks = []
    else:
        if not isinstance(remarks, list):
            remarks = [remarks]

    for remark in remarks:
        try:
            if remark['title'] == 'description':
                net['description'] = '\n'.join(remark['description'])
        except (KeyError, IndexError):
            pass

    return [net]
def lookup_rws(self, inc_raw=False, retry_count=3):
    """
    The function for retrieving and parsing whois information for an IP
    address via HTTP (Whois-RWS).

    **This should be faster than IPWhois.lookup(), but may not be as
    reliable. AFRINIC does not have a Whois-RWS service yet. We have to
    rely on the Ripe RWS service, which does not contain all of the data
    we need. LACNIC RWS is in beta.**

    Args:
        inc_raw: Boolean for whether to include the raw whois results in
            the returned dictionary.
        retry_count: The number of times to retry in case socket errors,
            timeouts, connection resets, etc. are encountered.

    Returns:
        Dictionary:

        :query: The IP address (String)
        :asn: The Autonomous System Number (String)
        :asn_date: The ASN Allocation date (String)
        :asn_registry: The assigned ASN registry (String)
        :asn_cidr: The assigned ASN CIDR (String)
        :asn_country_code: The assigned ASN country code (String)
        :nets: Dictionaries containing network information which consists
            of the fields listed in the NIC_WHOIS dictionary. (List)
        :raw: (Dictionary) - Whois results in Json format if the
            inc_raw parameter is True.

    Raises:
        ASNRegistryError: The ASN registry could not be determined from
            the fallback ARIN response.
    """

    # Initialize the response.
    response = None

    # Attempt to resolve ASN info via Cymru. DNS is faster, try that first.
    try:
        asn_data = self.get_asn_dns()
    except (ASNLookupError, ASNRegistryError):
        try:
            asn_data = self.get_asn_whois(retry_count)
        except (ASNLookupError, ASNRegistryError):
            # Lets attempt to get the ASN registry information from ARIN.
            response = self.get_rws(
                str(NIC_WHOIS['arin']['url']).format(self.address_str),
                retry_count
            )

            asn_data = {
                'asn_registry': None,
                'asn': None,
                'asn_cidr': None,
                'asn_country_code': None,
                'asn_date': None
            }

            try:
                net_list = response['nets']['net']
                if not isinstance(net_list, list):
                    net_list = [net_list]
            except KeyError:
                net_list = []

            # Only the first network entry is inspected (note the
            # unconditional break); its org handle determines which
            # registry to query next.
            for n in net_list:
                try:
                    handle = n['orgRef']['@handle']
                    if handle in ('ARIN', 'VR-ARIN'):
                        asn_data['asn_registry'] = 'arin'
                    elif handle == 'RIPE':
                        asn_data['asn_registry'] = 'ripencc'
                    else:
                        registry = handle.lower()
                        # An org handle that doesn't map to a known
                        # registry cannot be resolved any further.
                        if registry not in NIC_WHOIS:
                            raise ASNRegistryError(
                                'ASN registry lookup failed.'
                            )
                        asn_data['asn_registry'] = registry
                except KeyError:
                    raise ASNRegistryError('ASN registry lookup failed.')
                break

    # Create the return dictionary.
    results = {
        'query': self.address_str,
        'nets': [],
        'raw': None
    }

    # Add the ASN information to the return dictionary.
    results.update(asn_data)

    # Only fetch the response if we haven't already. The pre-fetched
    # response (if any) came from ARIN, so any other registry must be
    # re-queried. BUG FIX: this previously used the identity test
    # "is not 'arin'" against a string literal, which is
    # implementation-dependent; value equality (!=) is the correct
    # comparison.
    if response is None or results['asn_registry'] != 'arin':
        # Retrieve the whois data.
        response = self.get_rws(
            str(NIC_WHOIS[results['asn_registry']]['url']).format(
                self.address_str),
            retry_count
        )

    # If inc_raw parameter is True, add the response to return dictionary.
    if inc_raw:
        results['raw'] = response

    # Dispatch to the registry-specific RWS parser.
    if results['asn_registry'] in ('ripencc', 'afrinic'):
        nets = self._lookup_rws_ripe(response)
    elif results['asn_registry'] == 'arin':
        nets = self._lookup_rws_arin(response, retry_count)
    elif results['asn_registry'] == 'apnic':
        nets = self._lookup_rws_apnic(response)
    else:
        nets = self._lookup_rws_lacnic(response)

    # Add the networks to the return dictionary.
    results['nets'] = nets

    return results