source: trunk/lib/bletchley/ssltls.py @ 123

Last change on this file since 123 was 123, checked in by tim, 7 years ago

.

File size: 9.8 KB
Line 
1'''
2Utilities for manipulating certificates and SSL/TLS connections.
3
4Copyright (C) 2014,2016 Blindspot Security LLC
5Author: Timothy D. Morgan
6
7 This program is free software: you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License, version 3,
9 as published by the Free Software Foundation.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program.  If not, see <http://www.gnu.org/licenses/>.
18'''
19
20import sys
21import argparse
22import traceback
23import random
24import socket
25try:
26    import OpenSSL
27    from OpenSSL import SSL
28except:
29    sys.stderr.write('ERROR: Could not locate pyOpenSSL module.  Under Debian-based systems, try:\n')
30    sys.stderr.write('       # apt-get install python3-openssl\n')
31    sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n')
32    sys.exit(2)
33try:
34    import cffi
35except:
36    sys.stderr.write('ERROR: Could not locate cffi module.  Under Debian-based systems, try:\n')
37    sys.stderr.write('       # apt-get install python3-cffi\n')
38    sys.stderr.write('NOTE: This is a requirement because pyOpenSSL does not provide '
39                     'certificate extension removal procedures.  Consider lobbying for the '
40                     'implementation of this:\n  https://github.com/pyca/pyopenssl/issues/152\n')
41    sys.exit(2)
42
43
def createContext(method=SSL.TLSv1_METHOD, key=None, certChain=[]):
    '''
    Build a pyOpenSSL context for the given protocol method.

    The context is configured with SSL.VERIFY_NONE and a verification
    callback that always returns True.

    When both a key and a non-empty certChain are supplied, the key is
    installed as the private key, certChain[0] as the certificate, and
    every remaining entry as an extra chain certificate.

    Returns the configured SSL.Context.
    '''
    def accept_anything(a, b, c, d, e):
        # Every verification query is answered affirmatively.
        return True

    ctx = SSL.Context(method)
    ctx.set_verify(SSL.VERIFY_NONE, accept_anything)

    # Without both a key and at least one certificate there is nothing to install.
    if not key or len(certChain) == 0:
        return ctx

    ctx.use_privatekey(key)
    ctx.use_certificate(certChain[0])
    for extra in certChain[1:]:
        ctx.add_extra_chain_cert(extra)

    return ctx
55
56
def startSSLTLS(sock, mode='client', protocol=SSL.TLSv1_METHOD, key=None, certChain=[], cipher_list=b'DES-CBC3-SHA:RC4-MD5:RC4-SHA:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES256-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES128-SHA:DHE-RSA-AES128-SHA:RSA-PSK-AES256-GCM-SHA384:DHE-PSK-AES256-GCM-SHA384:RSA-PSK-CHACHA20-POLY1305:DHE-PSK-CHACHA20-POLY1305:ECDHE-PSK-CHACHA20-POLY1305:AES256-GCM-SHA384:PSK-AES256-GCM-SHA384:PSK-CHACHA20-POLY1305:RSA-PSK-AES128-GCM-SHA256:DHE-PSK-AES128-GCM-SHA256:AES128-GCM-SHA256:PSK-AES128-GCM-SHA256:AES256-SHA256:AES128-SHA256:ECDHE-PSK-AES256-CBC-SHA384:ECDHE-PSK-AES256-CBC-SHA:SRP-RSA-AES-256-CBC-SHA:SRP-AES-256-CBC-SHA:RSA-PSK-AES256-CBC-SHA384:DHE-PSK-AES256-CBC-SHA384:RSA-PSK-AES256-CBC-SHA:DHE-PSK-AES256-CBC-SHA:AES256-SHA:PSK-AES256-CBC-SHA384:PSK-AES256-CBC-SHA:ECDHE-PSK-AES128-CBC-SHA256:ECDHE-PSK-AES128-CBC-SHA:SRP-RSA-AES-128-CBC-SHA:SRP-AES-128-CBC-SHA:RSA-PSK-AES128-CBC-SHA256:DHE-PSK-AES128-CBC-SHA256:RSA-PSK-AES128-CBC-SHA:DHE-PSK-AES128-CBC-SHA:AES128-SHA:PSK-AES128-CBC-SHA256:PSK-AES128-CBC-SHA:ADH-AES256-GCM-SHA384'):
    '''
    Wrap an existing socket in a pyOpenSSL connection.

    A context is created via createContext() from protocol, key and
    certChain.  A non-empty cipher_list is applied to that context.

    With mode == 'client' the connection is put in connect state and the
    handshake is performed before returning.  Any other mode puts the
    connection in accept state and no handshake is performed here.

    The default cipher_list names were drawn from:
      openssl ciphers -v "ALL:@SECLEVEL=0"

    Returns the SSL.Connection.
    '''
    ctx = createContext(protocol, key=key, certChain=certChain)
    if cipher_list:
        ctx.set_cipher_list(cipher_list)
    # Options left disabled by the original author:
    #if not key and mode == 'server':
    #ctx.set_options(OpenSSL.SSL.OP_SINGLE_DH_USE)
    #ctx.set_options(OpenSSL.SSL.OP_EPHEMERAL_RSA)

    tls = SSL.Connection(ctx, sock)

    if mode != 'client':
        # Everything that is not 'client' is treated as the accepting side.
        tls.set_accept_state()
        return tls

    tls.set_connect_state()
    tls.do_handshake()
    return tls
78
79
def ConnectSSLTLS(host, port, cipher_list=None, handshake_callback=None, verbose=True):
    '''
    Connect to host:port and negotiate SSL/TLS, trying several protocol
    methods in turn until one handshake succeeds.

    A fresh TCP socket is opened for every attempt.  If handshake_callback
    is supplied it is called with the connected socket before the SSL/TLS
    handshake; a false return value or an exception from the callback
    aborts the whole operation.

    Sockets belonging to aborted or failed attempts are closed before
    moving on, so that they are not leaked.

    host, port         -- address passed to socket.connect()
    cipher_list        -- forwarded to startSSLTLS()
    handshake_callback -- optional callable taking the connected socket
    verbose            -- when true, report unsupported protocols and
                          handshake failures on stderr

    Returns the connection from startSSLTLS() for the first protocol that
    works, or None if the callback aborted or every protocol failed.
    Errors raised while connecting the TCP socket are propagated.
    '''
    protocols = [("SSL 2/3", SSL.SSLv23_METHOD),
                 ("TLS 1.0", SSL.TLSv1_METHOD),
                 ("TLS 1.1", SSL.TLSv1_1_METHOD),
                 ("TLS 1.2", SSL.TLSv1_2_METHOD),
                 ("SSL 3.0", SSL.SSLv3_METHOD),
                 ("SSL 2.0", SSL.SSLv2_METHOD)]

    conn = None
    for pname,p in protocols:
        serverSock = socket.socket()
        try:
            serverSock.connect((host,port))
        except Exception:
            # Do not leak the descriptor when the TCP connect itself fails.
            serverSock.close()
            raise

        try:
            proceed = (not handshake_callback) or handshake_callback(serverSock)
        except Exception as e:
            traceback.print_exc(file=sys.stderr)
            proceed = False
        if not proceed:
            serverSock.close()
            return None

        try:
            conn = startSSLTLS(serverSock, mode='client', protocol=p, cipher_list=cipher_list)
            # Success: the socket is now owned by the returned connection.
            break
        except ValueError as e:
            if verbose:
                sys.stderr.write("%s protocol not supported by your openssl library, trying others...\n" % pname)
        except SSL.Error as e:
            if verbose:
                sys.stderr.write("Exception during %s handshake with server." % pname)
                sys.stderr.write("\nThis could happen because the server requires "
                                 "certain SSL/TLS versions or a client certificiate."
                                 "  Have no fear, we'll keep trying...\n")
        except Exception as e:
            sys.stderr.write("Unknown exception during %s handshake with server: \n" % pname)
            traceback.print_exc(file=sys.stderr)

        # Only reached when this attempt failed; the next one uses a new socket.
        serverSock.close()

    return conn
118
119
def fetchCertificateChain(connection):
    '''
    Return the peer certificate chain reported by
    connection.get_peer_cert_chain(), or None when that result is empty
    or otherwise false.
    '''
    return connection.get_peer_cert_chain() or None
125
126
def normalizeCertificateName(cert_name):
    '''
    Return the components of cert_name (as given by get_components()) as
    a sorted tuple, so that names can be compared and used as dictionary
    keys regardless of component order.
    '''
    return tuple(sorted(cert_name.get_components()))
131
132
def normalizeCertificateChain(chain):
    '''
    Re-order a certificate chain by following subject/issuer links.

    The first certificate that is self-signed, or whose issuer does not
    appear as a subject in the chain, is taken as the top of the chain.
    The result is then built by repeatedly looking up the certificate
    issued by the current one.

    chain -- iterable of certificates providing get_subject() and
             get_issuer()

    Returns a new list ordered from the lowest certificate found to the
    top-most one (top-most last).  Returns an empty list when no top
    certificate can be identified (including for an empty chain).
    Certificates that are not linked to the chosen top are left out.
    '''
    # Organize certificates by subject and issuer for quick lookups
    subject_table = {}
    issuer_table = {}
    for c in chain:
        s = normalizeCertificateName(c.get_subject())
        i = normalizeCertificateName(c.get_issuer())
        subject_table[s] = c
        # A self-signed certificate must not be recorded as its own child:
        # it could displace the real child stored under the same issuer
        # name and make the walk below revisit the root forever.
        if i != s:
            issuer_table[i] = c

    # Now find root or highest-level intermediary
    root = None
    for c in chain:
        i = normalizeCertificateName(c.get_issuer())
        s = normalizeCertificateName(c.get_subject())
        if (i == s) or (i not in subject_table):
            if root is not None:
                sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.")
            else:
                # Go with the first identified "root", since that's more likely to link up with the server cert
                root = c

    # Finally, build the chain from the top-down in the correct order
    new_chain = []
    visited = set()
    nxt = root
    # The visited check stops the walk if duplicated subject names make
    # the issuer links loop back onto a certificate already placed.
    while nxt is not None and id(nxt) not in visited:
        visited.add(id(nxt))
        new_chain.insert(0, nxt)
        s = normalizeCertificateName(nxt.get_subject())
        nxt = issuer_table.get(s)

    return new_chain
162   
163
def genFakeKey(certificate):
    '''
    Generate a fresh key pair with the same type and bit length as the
    public key of the given certificate.

    Returns the new OpenSSL.crypto.PKey.
    '''
    original = certificate.get_pubkey()
    replacement = OpenSSL.crypto.PKey()
    replacement.generate_key(original.type(), original.bits())
    return replacement
170
171
def getDigestAlgorithm(certificate):
    '''
    Derive the digest name from a certificate's signature algorithm.

    The value of certificate.get_signature_algorithm() (bytes) is parsed
    for the two naming shapes OpenSSL uses:

      <digest>With<key>         e.g. sha256WithRSAEncryption -> 'sha256'
      <key>-with-<digest>       e.g. ecdsa-with-SHA256       -> 'SHA256'
      <key>_with_<digest>       e.g. dsa_with_SHA256         -> 'SHA256'

    Returns the digest name as a str, or None when the algorithm name
    matches none of these shapes.
    '''
    # XXX: ugly hack because openssl API for this is limited
    algo = certificate.get_signature_algorithm()
    if b'With' in algo:
        return algo.split(b'With', 1)[0].decode('utf-8')
    # Lower-case separators put the digest after the key algorithm.
    for separator in (b'-with-', b'_with_'):
        if separator in algo:
            return algo.split(separator, 1)[1].decode('utf-8')
    return None
178
179
def deleteExtension(certificate, index):
    '''
    A dirty hack until this is implemented in pyOpenSSL. See:
    https://github.com/pyca/pyopenssl/issues/152

    Removes the extension at position index from the certificate by
    calling X509_delete_ext() through cffi on the certificate's
    underlying _x509 handle.

    Raises OSError if none of the candidate libssl shared objects can be
    loaded.
    '''
    ffi = cffi.FFI()
    ffi.cdef('''void* X509_delete_ext(void* x, int loc);''')

    # Try to load libssl using several recent names because package
    # maintainers have the blinders on and don't have a universal
    # symlink to the most recent version.
    candidates = ('libssl.so','libssl.so.1.0.2', 'libssl.so.1.0.1', 'libssl.so.1.0.0','libssl.so.0.9.8')
    libssl = None
    last_error = None
    for libname in candidates:
        try:
            libssl = ffi.dlopen(libname)
            break
        except OSError as e:
            # Remember the failure so it can be reported if nothing loads.
            last_error = e

    if libssl is None:
        # Fail with the real cause instead of an attribute error on None.
        raise OSError('Could not load libssl (tried: %s)' % ', '.join(candidates)) from last_error

    ext = libssl.X509_delete_ext(certificate._x509, index)
    #XXX: memory leak.  supposed to free ext here
201
202
def removePeskyExtensions(certificate):
    '''
    Strip the subjectKeyIdentifier and authorityKeyIdentifier extensions
    from the certificate, in place, using deleteExtension().
    '''
    unwanted = (b'subjectKeyIdentifier', b'authorityKeyIdentifier')

    position = 0
    while position < certificate.get_extension_count():
        short_name = certificate.get_extension(position).get_short_name()
        if short_name in unwanted:
            deleteExtension(certificate, position)
            #XXX: would be nice if each of these extensions were re-added with appropriate values
            # Stay on the same position: it is examined again after the deletion.
            continue
        position += 1
220
221
def randomizeSerialNumber(certificate):
    '''
    Give the certificate a new serial number drawn with random.randint()
    from the inclusive range 0 .. 2**64.
    '''
    upper_bound = 2**64
    new_serial = random.randint(0, upper_bound)
    certificate.set_serial_number(new_serial)
224   
def genFakeCertificateChain(cert_chain):
    '''
    Forge a certificate chain that mirrors cert_chain but is signed with
    freshly generated keys.

    cert_chain is processed from its last element (treated as the highest
    level authority) to its first.  Each certificate is modified in
    place: it receives a new key from genFakeKey(), loses its
    subject/authority key identifier extensions, gets a random serial
    number and is re-signed.  The top certificate is made self-issued
    (if it is not already) and signs itself; every other certificate is
    signed with the new key of the one processed before it.

    The certificate objects are modified, but the cert_chain list itself
    is left in its original order.

    Returns a tuple (key, chain): the new key of cert_chain[0] and the
    forged certificates in the same order as cert_chain.
    Raises IndexError if cert_chain is empty.
    '''
    # Work on a reversed copy so the caller's list is not reordered.
    ordered = list(reversed(cert_chain)) # start with highest level authority

    top = ordered[0]
    i = normalizeCertificateName(top.get_issuer())
    s = normalizeCertificateName(top.get_subject())
    if s != i:
        # XXX: consider retrieving root locally and including a forged version instead
        top.set_issuer(top.get_subject())

    ret_val = []
    prev = None
    for c in ordered:
        k = genFakeKey(c)
        c.set_pubkey(k)
        removePeskyExtensions(c)
        randomizeSerialNumber(c)
        # The top certificate has no parent in the chain, so it signs itself.
        signer = k if prev is None else prev
        c.sign(signer, getDigestAlgorithm(c))
        prev = k
        ret_val.append(c)

    ret_val.reverse()
    return k,ret_val
Note: See TracBrowser for help on using the repository browser.