- Timestamp:
- 12/05/12 20:26:26 (12 years ago)
- Location:
- lib/bletchley
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/bletchley/CBC/__init__.py
r17 r18 26 26 from .Exceptions import * 27 27 28 class DecryptionOracle: 29 ''' 30 This class implements a decryption oracle based on a given padding oracle. 31 The attacked padding scheme is the one defined in PKCS#5 and RFC2040, and maybe other places. 32 The attack was first described in the "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC, WTLS... by Serge Vaudenay" 33 ''' 34 28 class POA: 29 """This class implements padding oracle attacks given a ciphertext and 30 function that acts as a padding oracle. 31 32 The padding scheme is assumed to be PKCS#5/#7, also defined in RFC2040. 33 This attack was first described in: 34 "Security Flaws Induced by CBC Padding. Applications to SSL, IPSEC, 35 WTLS" by Serge Vaudenay (2002) 36 37 POA objects are not thread-safe. If multiple threads need to work 38 simultaneously on the same ciphertext and oracle, create a 39 separate instance. 40 41 """ 42 43 ## private 35 44 _thread_result = None 36 45 _oracle = None 37 46 _ciphertext = None 38 47 _iv = None 39 _decrypted = None 40 max_threads = None 48 49 ## protected (reading ok, changing not ok) 50 block_size = None 51 52 ## public (r/w ok) 53 decrypted = None 54 threads = None 41 55 log_fh = None 42 56 43 def __init__(self, oracle, block_size, ciphertext, iv=None, max_threads=1, log_file=None): 44 ''' 45 Creates a new DecryptionOracle object. Receives an oracle function which returns True 46 if the given ciphertext results in a correct padding and False otherwise. A second 47 parameter defining the cipher block size in bytes is also supported (default is 8). 48 ''' 57 def __init__(self, oracle, block_size, ciphertext, iv=None, 58 threads=1, decrypted='', log_file=None): 59 """Creates a new padding oracle attack (POA) object. 60 61 Arguments: 62 oracle -- A function which returns True if the given ciphertext 63 results in a correct padding upon decryption and False 64 otherwise. This function should implement the prototype: 65 def myOracle(ciphertext, iv) 66 If the initialization vector (iv) is unknown is not included in 67 the ciphertext message, it can be ignored in the oracle 68 implementation (though some limitations will result from this). 69 70 block_size -- The block size of the ciphertext being attacked. 71 Is almost always 8 or 16. 72 73 ciphertext -- The ciphertext to be decrypted 74 75 iv -- The initialization vector associated with the ciphertext. 76 If none provided, it is assumed to be a block of 0's 77 78 threads -- The maximum number of parallel threads to use during 79 decryption. If more than one thread is used, then the oracle 80 function will be called in parallel. It should implement any 81 internal locking necessary to prevent race conditions where 82 applicable. 83 84 decrypted -- If a portion of the plaintext is already known (due 85 to a prior, partially successful decryption attempt), then this 86 may be used to restart the decryption process where it was 87 previously left off. This argument is assumed to contain the 88 final N bytes (for an N-byte argument) of the plaintext; that 89 is, the tail of the plaintext. 90 91 log_file -- A Python file object where log messages will be 92 written. 93 94 """ 95 49 96 if(len(ciphertext)%block_size != 0 or len(ciphertext) < block_size): 50 97 raise InvalidBlockError(block_size,len(ciphertext)) … … 53 100 54 101 self._oracle = oracle 55 self._block_size = block_size56 102 self._ciphertext = ciphertext 57 103 self._iv = iv 58 self._decrypted = '' 59 self.max_threads = max_threads 104 self.block_size = block_size 105 self.decrypted = decrypted 106 self.threads = threads 60 107 self.log_fh = log_file 61 108 … … 67 114 68 115 def probe_padding(self, prior, final): 69 ''' 70 Attempts to verify that a CBC padding oracle exists and then determines the 71 pad value. Returns the pad string, or None on failure. 116 """Attempts to verify that a CBC padding oracle exists and then determines the 117 pad value. 118 119 Returns the pad string, or None on failure. 72 120 XXX: Currently only works for PKCS 5/7. 73 ''' 121 """ 122 74 123 ret_val = None 75 124 # First probe for beginning of pad 76 for i in range(0-self. _block_size,0):125 for i in range(0-self.block_size,0): 77 126 if i == -1: 78 127 break … … 105 154 106 155 if ret_val: 107 self. _decrypted = ret_val156 self.decrypted = ret_val 108 157 109 158 return ret_val … … 122 171 123 172 def decrypt_next_byte(self, prior, block, known_bytes): 124 ''' 125 Given some known final bytes, decrypts the next byte using the padding oracle. 126 prior - 127 block - 128 known_bytes - 129 ''' 130 if(len(block)!=self._block_size): 173 """Decrypts one byte of ciphertext by modifying the prior 174 ciphertext block at the same relative offset. 175 176 Arguments: 177 prior -- Ciphertext block appearing prior to the current target 178 block -- Currently targeted ciphertext block 179 known_bytes -- Bytes in this block already decrypted 180 181 """ 182 183 if(len(block)!=self.block_size): 131 184 raise InvalidBlockError 132 185 numKnownBytes = len(known_bytes) 133 186 134 if(numKnownBytes >= self. _block_size):187 if(numKnownBytes >= self.block_size): 135 188 return known_bytes 136 189 137 prior_prefix = prior[0:self. _block_size-numKnownBytes-1]138 base = ord(prior[self. _block_size-numKnownBytes-1])190 prior_prefix = prior[0:self.block_size-numKnownBytes-1] 191 base = ord(prior[self.block_size-numKnownBytes-1]) 139 192 # Adjust known bytes to appear as a PKCS 7 pad 140 193 suffix = [0]*numKnownBytes … … 147 200 self._thread_result = None 148 201 threads = [] 149 for i in range(0,self. max_threads):202 for i in range(0,self.threads): 150 203 t = threading.Thread(target=self._test_value_set, 151 args=(self._ciphertext+prior_prefix, suffix, range(i,256,self. max_threads)))204 args=(self._ciphertext+prior_prefix, suffix, range(i,256,self.threads))) 152 205 t.start() 153 206 threads.append(t) … … 160 213 161 214 decrypted = struct.pack("B",self._thread_result^base^(numKnownBytes+1)) 162 self. _decrypted = decrypted + self._decrypted215 self.decrypted = decrypted + self.decrypted 163 216 # Return previous bytes together with current byte 164 217 return decrypted+known_bytes … … 166 219 167 220 def decrypt_block(self, prior, block, last_bytes=''): 168 ''' 169 Decrypts the block of ciphertext provided as a parameter. 170 ''' 171 while(len(last_bytes)!=self._block_size): 221 """Decrypts the block of ciphertext provided as a parameter. 222 223 """ 224 225 while(len(last_bytes)!=self.block_size): 172 226 last_bytes = self.decrypt_next_byte(prior, block, last_bytes) 173 227 return last_bytes 174 228 175 229 176 # XXX: Enable recovery in case of intermittent failure by storing state of 177 # partial decryption on object 178 # XXX: Add option to strip padding from message 230 # XXX: Add logic to begin where decryption previously left off 179 231 def decrypt(self): 180 ''' 181 Decrypts a message using CBC mode. If the IV is not provided, it assumes a null IV. 182 ''' 183 blocks = buffertools.splitBuffer(self._ciphertext, self._block_size) 232 """Decrypts a message using CBC mode. If the IV is not provided, 233 it assumes a null IV. 234 235 """ 236 237 blocks = buffertools.splitBuffer(self._ciphertext, self.block_size) 184 238 185 239 final = blocks[-1] 186 240 iv = self._iv 187 241 if iv == None: 188 iv = '\x00'*self. _block_size242 iv = '\x00'*self.block_size 189 243 if len(blocks) == 1: 190 244 # If only one block present, then try to use IV as prior … … 196 250 pad_bytes = self.probe_padding(prior, final) 197 251 decrypted = self.decrypt_block(prior, final, pad_bytes) 198 print(repr(decrypted))199 252 200 253 # Now decrypt all other blocks except first block … … 205 258 decrypted = self.decrypt_block(iv, blocks[0]) + decrypted 206 259 207 return decrypted260 return buffertools.stripPKCS7Pad(decrypted) 208 261 209 262 210 263 def encrypt_block(self, plaintext, ciphertext): 211 if len(plaintext) != self. _block_size or len(plaintext) != len(ciphertext):212 raise InvalidBlockError(self. _block_size,len(plaintext))213 214 ptext = self.decrypt_block('\x00'*self. _block_size, ciphertext)264 if len(plaintext) != self.block_size or len(plaintext) != len(ciphertext): 265 raise InvalidBlockError(self.block_size,len(plaintext)) 266 267 ptext = self.decrypt_block('\x00'*self.block_size, ciphertext) 215 268 prior = buffertools.xorBuffers(ptext, plaintext) 216 269 return prior,ciphertext … … 220 273 # plaintext and subsequent concatenation can easily achieve this as well... 221 274 def encrypt(self,plaintext): 222 blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self. _block_size),223 self. _block_size)224 225 if (len(self. _decrypted) >= self._block_size226 and len(self._ciphertext) >= 2*self. _block_size):275 blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self.block_size), 276 self.block_size) 277 278 if (len(self.decrypted) >= self.block_size 279 and len(self._ciphertext) >= 2*self.block_size): 227 280 # If possible, reuse work from prior decryption efforts on original 228 281 # message for last block 229 old_prior = self._ciphertext[0-self. _block_size*2:0-self._block_size]230 final_plaintext = self. _decrypted[0-self._block_size:]282 old_prior = self._ciphertext[0-self.block_size*2:0-self.block_size] 283 final_plaintext = self.decrypted[0-self.block_size:] 231 284 prior = buffertools.xorBuffers(old_prior, 232 285 buffertools.xorBuffers(final_plaintext, blocks[-1])) 233 ciphertext = self._ciphertext[0-self. _block_size:]286 ciphertext = self._ciphertext[0-self.block_size:] 234 287 else: 235 288 # Otherwise, select a random last block and generate the prior block 236 ciphertext = struct.pack("B"*self. _block_size,237 *[random.getrandbits(8) for i in range(self. _block_size)])289 ciphertext = struct.pack("B"*self.block_size, 290 *[random.getrandbits(8) for i in range(self.block_size)]) 238 291 prior,ciphertext = self.encrypt_block(blocks[-1], ciphertext) 239 292 -
lib/bletchley/buffertools.py
r15 r18 121 121 def pkcs7Pad(length): 122 122 return chr(length) * length 123 124 def stripPKCS7Pad(decrypted, block_size=16): 125 ''' 126 Validates a plaintext containing a PKCS5/7 pad, then returns the plaintext 127 without the pad. 128 ''' 129 if len(decrypted) % block_size != 0: 130 return None 131 132 length = ord(decrypted[-1]) 133 if length > block_size: 134 return None 135 136 if decrypted[0-length:] != pkcs7Pad(length): 137 return None 138 139 return decrypted[0:0-length]
Note: See TracChangeset
for help on using the changeset viewer.