Changeset 87
- Timestamp:
- 03/09/16 17:39:55 (9 years ago)
- Location:
- trunk
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/bin/bletchley-clonecertchain
r75 r87 24 24 25 25 26 Copyright (C) 2014 Blindspot Security LLC26 Copyright (C) 2014,2016 Blindspot Security LLC 27 27 Author: Timothy D. Morgan 28 28 … … 44 44 import traceback 45 45 import socket 46 from bletchley import ssltls 46 47 try: 47 48 import OpenSSL … … 52 53 sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n') 53 54 sys.exit(2) 54 55 56 def createClientContext(method=SSL.SSLv3_METHOD):57 tlsClientContext = SSL.Context(method)58 tlsClientContext.set_verify(SSL.VERIFY_NONE, (lambda a,b,c,d,e: True))59 return tlsClientContext60 61 62 def fetchCertificateChain(host, port):63 protocols = [SSL.SSLv23_METHOD, SSL.TLSv1_METHOD,64 SSL.TLSv1_1_METHOD, SSL.TLSv1_2_METHOD,65 SSL.SSLv3_METHOD, SSL.SSLv2_METHOD]66 67 chain = None68 for p in protocols:69 serverSock = socket.socket()70 serverSock.connect((host,port))71 72 try:73 server = SSL.Connection(createClientContext(p), serverSock)74 server.set_connect_state()75 server.do_handshake()76 except Exception as e:77 sys.stderr.write("Exception during handshake with server: \n")78 traceback.print_exc(file=sys.stderr)79 sys.stderr.write("\nThis could happen because the server requires "80 "certain SSL/TLS versions or a client certificiate."81 " Have no fear, we'll keep trying...\n\n")82 83 chain = server.get_peer_cert_chain()84 if chain:85 return chain86 87 return chain88 89 90 def normalizeCertificateName(cert_name):91 n = cert_name.get_components()92 n.sort()93 return tuple(n)94 95 96 def normalizeCertificateChain(chain):97 # Organize certificates by subject and issuer for quick lookups98 subject_table = {}99 issuer_table = {}100 for c in chain:101 subject_table[normalizeCertificateName(c.get_subject())] = c102 issuer_table[normalizeCertificateName(c.get_issuer())] = c103 104 # Now find root or highest-level intermediary105 root = None106 for c in chain:107 i = normalizeCertificateName(c.get_issuer())108 s = normalizeCertificateName(c.get_subject())109 if (i == s) or (i not in subject_table):110 if root != None:111 sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.")112 else:113 # Go with the first identified "root", since that's more likely to link up with the server cert114 root = c115 116 # Finally, build the chain from the top-down in the correct order117 new_chain = []118 nxt = root119 while nxt != None:120 new_chain = [nxt] + new_chain121 s = normalizeCertificateName(nxt.get_subject())122 nxt = issuer_table.get(s)123 124 return new_chain125 126 127 def genFakeKey(certificate):128 fake_key = OpenSSL.crypto.PKey()129 old_pubkey = certificate.get_pubkey()130 fake_key.generate_key(old_pubkey.type(), old_pubkey.bits())131 132 return fake_key133 134 135 def getDigestAlgorithm(certificate):136 # XXX: ugly hack because pyopenssl API for this is limited137 if b'md5' in certificate.get_signature_algorithm():138 return 'md5'139 else:140 return 'sha1'141 142 143 def deleteExtension(certificate, index):144 import cffi145 from cffi import FFI146 ffi = FFI()147 ffi.cdef('''void* X509_delete_ext(void* x, int loc);''')148 libssl = ffi.dlopen('libssl.so')149 ext = libssl.X509_delete_ext(certificate._x509, index)150 #XXX: supposed to free ext here151 152 153 def removePeskyExtensions(certificate):154 #for index in range(0,certificate.get_extension_count()):155 # e = certificate.get_extension(index)156 # print("extension %d: %s\n" % (index, e.get_short_name()), e)157 158 index = 0159 while index < certificate.get_extension_count():160 e = certificate.get_extension(index)161 if e.get_short_name() in (b'subjectKeyIdentifier', b'authorityKeyIdentifier'):162 deleteExtension(certificate, index)163 #XXX: would be nice if each of these extensions were re-added with appropriate values164 index -= 1165 index += 1166 167 #for index in range(0,certificate.get_extension_count()):168 # e = certificate.get_extension(index)169 # print("extension %d: %s\n" % (index, e.get_short_name()), e)170 171 172 def genFakeCertificateChain(cert_chain):173 ret_val = []174 cert_chain.reverse() # start with highest level authority175 176 c = cert_chain[0]177 i = normalizeCertificateName(c.get_issuer())178 s = normalizeCertificateName(c.get_subject())179 if s != i:180 # XXX: consider retrieving root locally and including a forged version instead181 c.set_issuer(c.get_subject())182 k = genFakeKey(c)183 c.set_pubkey(k)184 removePeskyExtensions(c)185 c.sign(k, getDigestAlgorithm(c))186 ret_val.append(c)187 188 prev = k189 for c in cert_chain[1:]:190 k = genFakeKey(c)191 c.set_pubkey(k)192 removePeskyExtensions(c)193 c.sign(prev, getDigestAlgorithm(c))194 prev = k195 ret_val.append(c)196 197 ret_val.reverse()198 return k,ret_val199 55 200 56 … … 224 80 225 81 #print("REAL CHAIN:") 226 chain = fetchCertificateChain(options.host[0],options.port) 82 connection = ssltls.ConnectSSLTLS(options.host[0],options.port) 83 chain = ssltls.fetchCertificateChain(connection) 227 84 #for c in chain: 228 85 # print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8')) … … 237 94 sys.exit(2) 238 95 239 fake_key, fake_chain = genFakeCertificateChain(chain)96 fake_key, fake_chain = ssltls.genFakeCertificateChain(chain) 240 97 print(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, fake_key).decode('utf-8')) 241 98 for c in fake_chain:
Note: See TracChangeset
for help on using the changeset viewer.