1 """ndg_httpsclient - module containing SSL peer verification class.
2 """
3 __author__ = "P J Kershaw (STFC)"
4 __date__ = "09/12/11"
5 __copyright__ = "(C) 2012 Science and Technology Facilities Council"
6 __license__ = "BSD - see LICENSE file in top-level directory"
7 __contact__ = "Philip.Kershaw@stfc.ac.uk"
8 __revision__ = '$Id$'
9 import re
10 import logging
11 log = logging.getLogger(__name__)
12
# subjectAltName parsing needs two optional imports.  Record whether they are
# available in SUBJ_ALT_NAME_SUPPORT and warn once at import time if not.
try:
    from ndg.httpsclient.subj_alt_name import SubjectAltName
    from pyasn1.codec.der import decoder as der_decoder
except ImportError:
    import warnings

    SUBJ_ALT_NAME_SUPPORT = False
    SUBJ_ALT_NAME_SUPPORT_MSG = ('SubjectAltName support is disabled - check '
                                 'pyasn1 package installation to enable')
    warnings.warn(SUBJ_ALT_NAME_SUPPORT_MSG)
else:
    SUBJ_ALT_NAME_SUPPORT = True
29 """Check server identity. If hostname doesn't match, allow match of
30 host's Distinguished Name against server DN setting"""
# Look-up table pairing long-form Distinguished Name field names with their
# abbreviated equivalents
DN_LUT = dict(
    commonName='CN',
    organisationalUnitName='OU',
    organisation='O',
    countryName='C',
    emailAddress='EMAILADDRESS',
    localityName='L',
    stateOrProvinceName='ST',
    streetAddress='STREET',
    domainComponent='DC',
    userid='UID',
)

# Name that a certificate extension must carry to be treated as a
# subjectAltName extension
SUBJ_ALT_NAME_EXT_NAME = 'subjectAltName'

# Regular expression matching a "/<field name>=" separator, where the field
# name is any long or abbreviated name from DN_LUT.  The name is captured so
# that split() returns field names and values alternately.
PARSER_RE_STR = '/(%s)=' % '|'.join(list(DN_LUT) + list(DN_LUT.values()))
PARSER_RE = re.compile(PARSER_RE_STR)

# Instances hold only these three private attributes
__slots__ = ('__hostname', '__certDN', '__subj_alt_name_match')
48
# NOTE(review): the "def" line is absent from the text under review; the
# signature below is reconstructed from the docstring and the body.  The
# default for "subj_alt_name_match" in particular is an assumption - confirm.
def __init__(self, certDN=None, hostname=None, subj_alt_name_match=True):
    """Override parent class __init__ to enable setting of certDN
    setting

    @type certDN: string
    @param certDN: Set the expected Distinguished Name of the
    server to avoid errors matching hostnames.  This is useful
    where the hostname is not fully qualified
    @type hostname: string
    @param hostname: hostname to match against peer certificate
    subjectAltNames or subject common name
    @type subj_alt_name_match: bool
    @param subj_alt_name_match: flag to enable/disable matching of hostname
    against peer certificate subjectAltNames.  Nb. A setting of True will
    be ignored if the pyasn1 package is not installed
    """
    self.__certDN = None
    self.__hostname = None

    # Assign through the properties so that their setters validate the input
    if certDN is not None:
        self.certDN = certDN
    if hostname is not None:
        self.hostname = hostname

    if not subj_alt_name_match:
        log.debug('Disabling peer verification with subject '
                  'subjectAltNames!')
        self.__subj_alt_name_match = False
        return

    if SUBJ_ALT_NAME_SUPPORT:
        self.__subj_alt_name_match = True
    else:
        # Requested but the optional imports failed: fall back to disabled
        log.warning('Overriding "subj_alt_name_match" keyword setting: '
                    'peer verification with subjectAltNames is disabled')
        self.__subj_alt_name_match = False
85
def __call__(self, connection, peerCert, errorStatus, errorDepth,
             preverifyOK):
    """Verify server certificate

    Any expired certificate in the chain is rejected.  Certificates other
    than the end entity certificate are otherwise passed through with the
    upstream status.  The end entity certificate (errorDepth 0) is checked
    against "certDN" if that is set; otherwise against "hostname", first
    using the subjectAltName entries (if enabled) and then the subject
    common name.

    @type connection: OpenSSL.SSL.Connection
    @param connection: SSL connection object
    @type peerCert: OpenSSL.crypto.X509
    @param peerCert: server host certificate
    @type errorStatus: int
    @param errorStatus: error status passed from caller.  This is the value
    returned by the OpenSSL C function X509_STORE_CTX_get_error().  Look-up
    x509_vfy.h in the OpenSSL source to get the meanings of the different
    codes.  PyOpenSSL doesn't help you!
    @type errorDepth: int
    @param errorDepth: a non-negative integer representing where in the
    certificate chain the error occurred.  If it is zero it occurred in the
    end entity certificate, one if it is the certificate which signed the
    end entity certificate and so on.

    @type preverifyOK: int
    @param preverifyOK: the error status - 0 = Error, 1 = OK of the current
    SSL context irrespective of any verification checks done here.  If this
    function yields an OK status, it should enforce the preverifyOK value
    so that any error set upstream overrides and is honoured.
    @rtype: int
    @return: status code - 0/False = Error, 1/True = OK
    """
    def _as_text(item):
        # Subject components arrive as byte strings from pyOpenSSL whereas
        # the expected DN is held as text; decode so the two can be compared
        if isinstance(item, bytes):
            return item.decode('utf-8', 'replace')
        return item

    if peerCert.has_expired():
        log.error('Certificate %r in peer certificate chain has expired',
                  peerCert.get_subject())
        return False

    if errorDepth != 0:
        # Identity checks apply to the end entity certificate only
        return preverifyOK

    peerCertSubj = peerCert.get_subject()

    if self.certDN is not None:
        # Compare order-independently: the expected DN is kept sorted
        peerCertDN = sorted(
            (_as_text(name), _as_text(value))
            for name, value in peerCertSubj.get_components()
        )
        if peerCertDN == self.certDN:
            return preverifyOK

        log.error('Peer certificate DN %r doesn\'t match the '
                  'expected DN %r', peerCertDN, self.certDN)
        return False

    if self.hostname is None:
        log.error('No "hostname" or "certDN" set to check peer '
                  'certificate against')
        return False

    # Check for subject alternative names first ...
    if self.__subj_alt_name_match:
        dns_names = self._get_subj_alt_name(peerCert)
        if self.hostname in dns_names:
            return preverifyOK

    # ... then fall back to the subject common name
    if peerCertSubj.commonName == self.hostname:
        return preverifyOK

    log.error('Peer certificate CN %r doesn\'t match the '
              'expected CN %r', peerCertSubj.commonName,
              self.hostname)
    return False
158
def verify_server_cert(connection, peerCert, errorStatus, errorDepth,
                       preverifyOK):
    """Plain-function form of the verification callback

    Forwards all five arguments unchanged to self.__call__ and returns its
    status.  "self" is a free variable here - presumably bound by the
    enclosing method, whose header is not part of the text under review.
    """
    status = self.__call__(connection, peerCert, errorStatus,
                           errorDepth, preverifyOK)
    return status
164
165 return verify_server_cert
166
@classmethod
def _get_subj_alt_name(cls, peer_cert):
    '''Extract subjectAltName settings from certificate extensions

    Every extension whose short name equals SUBJ_ALT_NAME_EXT_NAME is DER
    decoded and the string form of each of its components is collected.
    Components are not filtered by general-name type.

    @param peer_cert: peer certificate in SSL connection.  subjectAltName
    settings if any will be extracted from this
    @type peer_cert: OpenSSL.crypto.X509
    @rtype: list
    @return: subjectAltName values as strings; empty if the certificate
    has no subjectAltName extension
    '''
    dns_name = []
    general_names = SubjectAltName()

    for i in range(peer_cert.get_extension_count()):
        ext = peer_cert.get_extension(i)

        # pyOpenSSL reports the short name as a byte string; decode it so
        # the comparison with the text constant can succeed on Python 3
        ext_name = ext.get_short_name()
        if isinstance(ext_name, bytes):
            ext_name = ext_name.decode('ascii', 'replace')

        if ext_name != cls.SUBJ_ALT_NAME_EXT_NAME:
            continue

        # PyOpenSSL returns extension data in ASN.1 encoded form
        ext_dat = ext.get_data()
        decoded_dat = der_decoder.decode(ext_dat,
                                         asn1Spec=general_names)

        # Only the SubjectAltName items of the decoder result are read
        for name in decoded_dat:
            if isinstance(name, SubjectAltName):
                for entry in range(len(name)):
                    component = name.getComponentByPosition(entry)
                    dns_name.append(str(component.getComponent()))

    return dns_name
194
197
def _setCertDN(self, val):
    """Set the expected server certificate Distinguished Name

    The value is stored as a sorted list of (field name, field value)
    tuples so that it can be compared order-independently.

    @type val: string, list or tuple
    @param val: either a DN string of "/<field>=<value>" elements,
    optionally wrapped in double quotes, or a sequence of two element
    field name, field value pairs
    @raise TypeError: DN string can't be parsed, a pair doesn't have
    exactly two elements or val is of an unsupported type
    """
    if isinstance(val, str):
        # Allow for quoted DN
        certDN = val.strip('"')

        # split() yields ['', name1, value1, name2, value2, ...]
        dnFields = self.__class__.PARSER_RE.split(certDN)
        if len(dnFields) < 2:
            raise TypeError('Error parsing DN string: "%s"' % certDN)

        self.__certDN = sorted(zip(dnFields[1::2], dnFields[2::2]))

    elif isinstance(val, (list, tuple)):
        for field in val:
            if len(field) != 2:
                raise TypeError('Expecting list of two element DN field, '
                                'DN field value pairs for "certDN" '
                                'attribute')

        # Normalise to the same representation as the string branch
        self.__certDN = sorted(tuple(field) for field in val)
    else:
        raise TypeError('Expecting list or string type for "certDN" '
                        'attribute')
220
# Managed attribute: reads go through _getCertDN, writes through _setCertDN
certDN = property(_getCertDN, _setCertDN,
                  doc="Distinguished Name for Server Certificate")
224
225
228
def _setHostname(self, val):
    """Set the hostname to check the peer certificate against

    @type val: string
    @param val: hostname of server
    @raise TypeError: val is not a string
    """
    if isinstance(val, str):
        self.__hostname = val
        return

    raise TypeError("Expecting string type for hostname "
                    "attribute")
234
# Managed attribute: reads go through _getHostname, writes through
# _setHostname
hostname = property(_getHostname, _setHostname,
                    doc="hostname of server")
238