Changeset 15
- Timestamp:
- 11/25/12 18:44:45 (12 years ago)
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/TODO
r11 r15 16 16 * analyze 17 17 - add options to override decoding chain and block size 18 19 20 * Consider using an SMT solver for certain problems (linear PRNGs?) 21 http://en.wikipedia.org/wiki/Satisfiability_Modulo_Theories 22 23 24 * Add tools for testing and conducting hash length-extension attacks. 25 One tool: https://github.com/bwall/HashPump -
lib/bletchley/PaddingOracle/DecryptionOracle.py
r12 r15 3 3 4 4 Copyright (C) 2010 ELOI SANFÈLIX 5 Copyright (C) 2012 Timothy D. Morgan 5 6 @author: Eloi Sanfelix < eloi AT limited-entropy.com > 7 @author: Timothy D. Morgan < tmorgan {a} vsecurity . com > 6 8 7 9 This program is free software: you can redistribute it and/or modify … … 32 34 33 35 _thread_result = None 36 _oracle = None 37 _ciphertext = None 38 _iv = None 39 _decrypted = None 34 40 max_threads = None 35 41 log_fh = None 36 37 def __init__(self, oracle, block_size =8, max_threads=1, log_file=None):42 43 def __init__(self, oracle, block_size, ciphertext, iv=None, max_threads=1, log_file=None): 38 44 ''' 39 45 Creates a new DecryptionOracle object. Receives an oracle function which returns True … … 41 47 parameter defining the cipher block size in bytes is also supported (default is 8). 42 48 ''' 43 self.oracle = oracle 44 self.block_size = block_size 49 if(len(ciphertext)%block_size != 0 or len(ciphertext) < block_size): 50 raise InvalidBlockError(block_size,len(ciphertext)) 51 if(iv != None and len(iv)%block_size != 0): 52 raise InvalidBlockError(block_size,len(iv)) 53 54 self._oracle = oracle 55 self._block_size = block_size 56 self._ciphertext = ciphertext 57 self._iv = iv 58 self._decrypted = '' 45 59 self.max_threads = max_threads 46 60 self.log_fh = log_file 47 61 48 62 49 def log_message(self, message):63 def log_message(self, s): 50 64 if self.log_fh != None: 51 self.log_fh.write(message+'\n') 52 53 54 def probe_padding(self, blob, iv=None): 55 final = blob[0-self.block_size:] 56 prior = blob[0-2*self.block_size:0-self.block_size] 57 if len(blob) <= self.block_size: 58 # If only one block present, then try to use an IV 59 if iv!=None: 60 self.log_message("Only one block present, using IV as scratch pad") 61 prior = iv 62 else: 63 self.log_message("Only one block present, using 0 block as scratch pad") 64 prior = '\x00'*self.block_size 65 65 self.log_fh.write(s+'\n') 66 67 68 def probe_padding(self, prior, final): 69 ''' 70 Attempts to verify that a CBC padding oracle exists and then determines the 71 pad value. Returns the pad string, or None on failure. 72 XXX: Currently only works for PKCS 5/7. 73 ''' 74 ret_val = None 66 75 # First probe for beginning of pad 67 for i in range(0-self. block_size,0):76 for i in range(0-self._block_size,0): 68 77 if i == -1: 69 78 break 70 79 tweaked = struct.unpack("B", prior[i])[0] ^ 0xFF 71 80 tweaked = struct.pack("B", tweaked) 72 if not self. oracle(blob+prior[:i]+tweaked+prior[i+1:]+final):81 if not self._oracle(self._ciphertext+prior[:i]+tweaked+prior[i+1:]+final): 73 82 break 74 83 … … 80 89 tweaked = struct.unpack("B", prior[-1])[0] ^ (pad_length^1) 81 90 tweaked = struct.pack("B", tweaked) 82 if self.oracle(blob+prior[:-1]+tweaked+final): 83 return pad_length 84 else: 85 return None 91 if self._oracle(self._ciphertext+prior[:-1]+tweaked+final): 92 ret_val = buffertools.pkcs7Pad(pad_length) 93 86 94 else: 87 95 # Verify by changing pad byte to 2 and brute-force changing … … 92 100 guess = struct.unpack("B", prior[-2])[0] ^ j 93 101 guess = struct.pack("B", guess) 94 if self.oracle(blob+prior[:-2]+guess+tweaked+final): 95 print("verified padding through decryption") 96 return pad_length 97 98 return None 99 100 101 def decrypt_last_bytes(self,block): 102 ''' 103 Decrypts the last bytes of block using the oracle. 104 ''' 105 if(len(block)!=self.block_size): 106 raise InvalidBlockError(self.block_size,len(block)) 107 108 #First we get some random bytes 109 #rand = [random.getrandbits(8) for i in range(self.block_size)] 110 rand = [0 for i in range(self.block_size)] 111 112 for b in range(256): 113 114 #XOR with current guess 115 rand[-1] ^= b 116 #Generate padding string 117 randStr = "".join([ struct.pack("B",i) for i in rand ] ) 118 if(self.oracle(randStr+block)): 119 break 120 else: 121 #Remove current guess 122 rand[-1] ^= b 123 124 #Now we have a correct padding, test how many bytes we got! 125 for i in range(self.block_size-1): 126 #Modify currently tested byte 127 rand[i] = rand[i]^0x01 128 randStr = "".join([ struct.pack("B",j) for j in rand ] ) 129 if(not self.oracle(randStr+block)): 130 #We got a hit! Byte i is also part of the padding 131 paddingLen = self.block_size-i 132 #Correct random i 133 rand[i] = rand[i]^0x01 134 #Return paddingLen final bytes 135 return "".join([ struct.pack("B",i^paddingLen) for i in rand[-paddingLen:]]) 136 137 #Nothing to do when there is no hit. This byte is useless then. 138 139 #Could only recover 1 byte. Return it. 140 return "".join(struct.pack("B",rand[-1]^0x01)) 141 142 143 def _test_value_set(self, prefix, base, suffix, value_set): 102 if self._oracle(self._ciphertext+prior[:-2]+guess+tweaked+final): 103 # XXX: Save the decrypted byte for later 104 ret_val = buffertools.pkcs7Pad(pad_length) 105 106 if ret_val: 107 self._decrypted = ret_val 108 109 return ret_val 110 111 112 # XXX: This could be generalized as a byte probe utility for a variety of attacks 113 def _test_value_set(self, prefix, suffix, value_set): 144 114 for b in value_set: 145 if(self.oracle(prefix+struct.pack("B",base^b)+suffix)): 146 self._thread_result = base^b 147 break 148 149 150 def decrypt_next_byte(self,block,known_bytes): 151 ''' 152 Given some known final bytes, decrypts the next byte using the padding oracle. 153 ''' 154 if(len(block)!=self.block_size): 115 if self._thread_result != None: 116 # Stop if another thread found the result 117 break 118 if self._oracle(str(prefix+struct.pack("B",b)+suffix)): 119 self._thread_result = b 120 break 121 122 123 def decrypt_next_byte(self, prior, block, known_bytes): 124 ''' 125 Given some known final bytes, decrypts the next byte using the padding oracle. 126 prior - 127 block - 128 known_bytes - 129 ''' 130 if(len(block)!=self._block_size): 155 131 raise InvalidBlockError 156 132 numKnownBytes = len(known_bytes) 157 133 158 if(numKnownBytes >= self. block_size):134 if(numKnownBytes >= self._block_size): 159 135 return known_bytes 160 136 161 #rand = [random.getrandbits(8) for i in range(self.block_size-numKnownBytes)]162 rand = [0 for i in range(self.block_size-numKnownBytes)]163 prefix = struct.pack("B"*len(rand[0:-1]),*rand[0:-1])164 suffix = list(struct.unpack("B"*numKnownBytes, known_bytes))137 prior_prefix = prior[0:self._block_size-numKnownBytes-1] 138 base = ord(prior[self._block_size-numKnownBytes-1]) 139 # Adjust known bytes to appear as a PKCS 7 pad 140 suffix = [0]*numKnownBytes 165 141 for i in range(0,numKnownBytes): 166 suffix[i] ^= numKnownBytes+1142 suffix[i] ^= ord(prior[0-numKnownBytes+i])^ord(known_bytes[i])^(numKnownBytes+1) 167 143 suffix = struct.pack("B"*len(suffix),*suffix)+block 168 144 169 # Now we do same trick again to find next byte. 145 # Each thread spawned searches a subset of the next byte's 146 # 256 possible values 170 147 self._thread_result = None 171 148 threads = [] 172 149 for i in range(0,self.max_threads): 173 150 t = threading.Thread(target=self._test_value_set, 174 args=( prefix, rand[-1], suffix, range(i,255,self.max_threads)))151 args=(self._ciphertext+prior_prefix, suffix, range(i,256,self.max_threads))) 175 152 t.start() 176 153 threads.append(t) … … 182 159 raise Exception 183 160 161 decrypted = struct.pack("B",self._thread_result^base^(numKnownBytes+1)) 162 self._decrypted = decrypted + self._decrypted 184 163 # Return previous bytes together with current byte 185 return struct.pack("B",self._thread_result^(numKnownBytes+1))+known_bytes 186 #return "".join([struct.pack("B",rand[i]^(numKnownBytes+1)) for i in range(self.block_size-numKnownBytes-1,self.block_size)]) 187 188 189 def decrypt_block(self,block): 164 return decrypted+known_bytes 165 166 167 def decrypt_block(self, prior, block, last_bytes=''): 190 168 ''' 191 169 Decrypts the block of ciphertext provided as a parameter. 192 170 ''' 193 bytes = self.decrypt_last_bytes(block) 194 while(len(bytes)!=self.block_size): 195 bytes = self.decrypt_next_byte(block,bytes) 196 return bytes 197 198 199 def decrypt_message(self,ctext, iv = None): 171 while(len(last_bytes)!=self._block_size): 172 last_bytes = self.decrypt_next_byte(prior, block, last_bytes) 173 return last_bytes 174 175 176 # XXX: Enable recovery in case of intermittent failure by storing state of 177 # partial decryption on object 178 # XXX: Add option to strip padding from message 179 def decrypt(self): 200 180 ''' 201 181 Decrypts a message using CBC mode. If the IV is not provided, it assumes a null IV. 202 182 ''' 203 #Recover first block 204 result = self.decrypt_block(ctext[0:self.block_size]) 205 206 #XOR IV if provided, else we assume zero IV. 207 if( iv != None): 208 result = self.xor_strings(result, iv) 209 210 #Recover block by block, XORing with previous ctext block 211 for i in range(self.block_size,len(ctext),self.block_size): 212 prev = ctext[i-self.block_size:i] 213 current = self.decrypt_block(ctext[i:i+self.block_size]) 214 result += self.xor_strings(prev,current) 215 return result 216 217 218 def xor_strings(self,s1,s2): 219 result = "" 220 for i in range(len(s1)): 221 result += struct.pack("B",ord(s1[i])^ord(s2[i])) 222 return result 223 224 225 def hex_string(self,data): 226 return "".join([ hex(ord(i))+" " for i in data]) 183 blocks = buffertools.splitBuffer(self._ciphertext, self._block_size) 184 185 final = blocks[-1] 186 iv = self._iv 187 if iv == None: 188 iv = '\x00'*self._block_size 189 if len(blocks) == 1: 190 # If only one block present, then try to use IV as prior 191 prior = iv 192 else: 193 prior = blocks[-2] 194 195 # Decrypt last block, starting with padding (quicker to decrypt) 196 pad_bytes = self.probe_padding(prior, final) 197 decrypted = self.decrypt_block(prior, final, pad_bytes) 198 print(repr(decrypted)) 199 200 # Now decrypt all other blocks except first block 201 for i in range(len(blocks)-2, 0, -1): 202 decrypted = self.decrypt_block(blocks[i-1], blocks[i]) + decrypted 203 204 # Finally decrypt first block 205 decrypted = self.decrypt_block(iv, blocks[0]) + decrypted 206 207 return decrypted 208 209 210 def encrypt_block(self, plaintext, ciphertext): 211 if len(plaintext) != self._block_size or len(plaintext) != len(ciphertext): 212 raise InvalidBlockError(self._block_size,len(plaintext)) 213 214 ptext = self.decrypt_block('\x00'*self._block_size, ciphertext) 215 prior = buffertools.xorBuffers(ptext, plaintext) 216 return prior,ciphertext 217 218 219 # XXX: Add option to encrypt only the last N blocks. Supplying a shorter 220 # plaintext and subsequent concatenation can easily achieve this as well... 221 def encrypt(self,plaintext): 222 blocks = buffertools.splitBuffer(buffertools.pkcs7PadBuffer(plaintext, self._block_size), 223 self._block_size) 224 225 if (len(self._decrypted) >= self._block_size 226 and len(self._ciphertext) >= 2*self._block_size): 227 # If possible, reuse work from prior decryption efforts on original 228 # message for last block 229 old_prior = self._ciphertext[0-self._block_size*2:0-self._block_size] 230 final_plaintext = self._decrypted[0-self._block_size:] 231 prior = buffertools.xorBuffers(old_prior, 232 buffertools.xorBuffers(final_plaintext, blocks[-1])) 233 ciphertext = self._ciphertext[0-self._block_size:] 234 else: 235 # Otherwise, select a random last block and generate the prior block 236 ciphertext = struct.pack("B"*self._block_size, 237 *[random.getrandbits(8) for i in range(self._block_size)]) 238 prior,ciphertext = self.encrypt_block(blocks[-1], ciphertext) 239 240 # Continue generating all prior blocks 241 for i in range(len(blocks)-2, -1, -1): 242 prior,cblock = self.encrypt_block(blocks[i],prior) 243 ciphertext = cblock+ciphertext 244 245 # prior as IV 246 return str(prior),str(ciphertext) -
lib/bletchley/buffertools.py
r7 r15 117 117 ''' 118 118 padding = block_size - (len(buf) % block_size) 119 return buf + (chr(padding) * padding) 119 return buf + pkcs7Pad(padding) 120 121 def pkcs7Pad(length): 122 return chr(length) * length
Note: See TracChangeset
for help on using the changeset viewer.