Ignore:
Timestamp:
03/09/16 17:39:55 (9 years ago)
Author:
tim
Message:

moved general purpose SSL/TLS and certificate functions to a library module
minor improvements to error reporting

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/bin/bletchley-clonecertchain

    r75 r87  
    2424
    2525
    26 Copyright (C) 2014 Blindspot Security LLC
     26Copyright (C) 2014,2016 Blindspot Security LLC
    2727Author: Timothy D. Morgan
    2828
     
    4444import traceback
    4545import socket
     46from bletchley import ssltls
    4647try:
    4748    import OpenSSL
     
    5253    sys.stderr.write('NOTE: pyOpenSSL version 0.14 or later is required!\n')
    5354    sys.exit(2)
    54 
    55 
    56 def createClientContext(method=SSL.SSLv3_METHOD):
    57     tlsClientContext = SSL.Context(method)
    58     tlsClientContext.set_verify(SSL.VERIFY_NONE, (lambda a,b,c,d,e: True))
    59     return tlsClientContext
    60 
    61 
    62 def fetchCertificateChain(host, port):
    63     protocols = [SSL.SSLv23_METHOD, SSL.TLSv1_METHOD,
    64                  SSL.TLSv1_1_METHOD, SSL.TLSv1_2_METHOD,
    65                  SSL.SSLv3_METHOD, SSL.SSLv2_METHOD]
    66 
    67     chain = None
    68     for p in protocols:
    69         serverSock = socket.socket()
    70         serverSock.connect((host,port))
    71    
    72         try:
    73             server = SSL.Connection(createClientContext(p), serverSock)
    74             server.set_connect_state()
    75             server.do_handshake()
    76         except Exception as e:
    77             sys.stderr.write("Exception during handshake with server: \n")
    78             traceback.print_exc(file=sys.stderr)
    79             sys.stderr.write("\nThis could happen because the server requires "
    80                              "certain SSL/TLS versions or a client certificiate."
    81                              "  Have no fear, we'll keep trying...\n\n")
    82 
    83         chain = server.get_peer_cert_chain()
    84         if chain:
    85             return chain
    86 
    87     return chain
    88 
    89 
    90 def normalizeCertificateName(cert_name):
    91     n = cert_name.get_components()
    92     n.sort()
    93     return tuple(n)
    94 
    95 
    96 def normalizeCertificateChain(chain):
    97     # Organize certificates by subject and issuer for quick lookups
    98     subject_table = {}
    99     issuer_table = {}
    100     for c in chain:
    101         subject_table[normalizeCertificateName(c.get_subject())] = c
    102         issuer_table[normalizeCertificateName(c.get_issuer())] = c
    103 
    104     # Now find root or highest-level intermediary
    105     root = None
    106     for c in chain:
    107         i = normalizeCertificateName(c.get_issuer())
    108         s = normalizeCertificateName(c.get_subject())
    109         if (i == s) or (i not in subject_table):
    110             if root != None:
    111                 sys.stderr.write("WARN: Multiple root certificates found or broken certificate chain detected.")
    112             else:
    113                 # Go with the first identified "root", since that's more likely to link up with the server cert
    114                 root = c
    115 
    116     # Finally, build the chain from the top-down in the correct order
    117     new_chain = []
    118     nxt = root
    119     while nxt != None:
    120         new_chain = [nxt] + new_chain
    121         s = normalizeCertificateName(nxt.get_subject())
    122         nxt = issuer_table.get(s)
    123    
    124     return new_chain
    125    
    126 
    127 def genFakeKey(certificate):
    128     fake_key = OpenSSL.crypto.PKey()
    129     old_pubkey = certificate.get_pubkey()
    130     fake_key.generate_key(old_pubkey.type(), old_pubkey.bits())
    131 
    132     return fake_key
    133 
    134 
    135 def getDigestAlgorithm(certificate):
    136     # XXX: ugly hack because pyopenssl API for this is limited
    137     if b'md5' in certificate.get_signature_algorithm():
    138         return 'md5'
    139     else:
    140         return 'sha1'
    141 
    142 
    143 def deleteExtension(certificate, index):
    144     import cffi
    145     from cffi import FFI
    146     ffi = FFI()
    147     ffi.cdef('''void* X509_delete_ext(void* x, int loc);''')
    148     libssl = ffi.dlopen('libssl.so')
    149     ext = libssl.X509_delete_ext(certificate._x509, index)
    150     #XXX: supposed to free ext here
    151 
    152 
    153 def removePeskyExtensions(certificate):
    154     #for index in range(0,certificate.get_extension_count()):
    155     #    e = certificate.get_extension(index)
    156     #    print("extension %d: %s\n" % (index, e.get_short_name()), e)
    157 
    158     index = 0
    159     while index < certificate.get_extension_count():
    160         e = certificate.get_extension(index)
    161         if e.get_short_name() in (b'subjectKeyIdentifier', b'authorityKeyIdentifier'):
    162             deleteExtension(certificate, index)
    163             #XXX: would be nice if each of these extensions were re-added with appropriate values
    164             index -= 1
    165         index += 1
    166    
    167     #for index in range(0,certificate.get_extension_count()):
    168     #    e = certificate.get_extension(index)
    169     #    print("extension %d: %s\n" % (index, e.get_short_name()), e)
    170 
    171 
    172 def genFakeCertificateChain(cert_chain):
    173     ret_val = []
    174     cert_chain.reverse() # start with highest level authority
    175 
    176     c = cert_chain[0]
    177     i = normalizeCertificateName(c.get_issuer())
    178     s = normalizeCertificateName(c.get_subject())
    179     if s != i:
    180         # XXX: consider retrieving root locally and including a forged version instead
    181         c.set_issuer(c.get_subject())
    182     k = genFakeKey(c)
    183     c.set_pubkey(k)
    184     removePeskyExtensions(c)
    185     c.sign(k, getDigestAlgorithm(c))
    186     ret_val.append(c)
    187 
    188     prev = k
    189     for c in cert_chain[1:]:
    190         k = genFakeKey(c)
    191         c.set_pubkey(k)
    192         removePeskyExtensions(c)
    193         c.sign(prev, getDigestAlgorithm(c))
    194         prev = k
    195         ret_val.append(c)
    196 
    197     ret_val.reverse()
    198     return k,ret_val
    19955
    20056
     
    22480
    22581#print("REAL CHAIN:")
    226 chain = fetchCertificateChain(options.host[0],options.port)
     82connection = ssltls.ConnectSSLTLS(options.host[0],options.port)
     83chain = ssltls.fetchCertificateChain(connection)
    22784#for c in chain:
    22885#    print(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, c).decode('utf-8'))
     
    23794    sys.exit(2)
    23895
    239 fake_key, fake_chain = genFakeCertificateChain(chain)
     96fake_key, fake_chain = ssltls.genFakeCertificateChain(chain)
    24097print(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, fake_key).decode('utf-8'))
    24198for c in fake_chain:
Note: See TracChangeset for help on using the changeset viewer.