source: trunk/lib/bletchley/chosenct.py @ 71

Last change on this file since 71 was 71, checked in by tim, 10 years ago

chosenct and http2py convenience fixes

File size: 6.8 KB
Line 
1'''
2A collection of tools to assist in analyzing encrypted data
3through chosen ciphertext attacks.
4
5Copyright (C) 2012-2013 Virtual Security Research, LLC
6Author: Timothy D. Morgan
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License, version 3,
10 as published by the Free Software Foundation.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program.  If not, see <http://www.gnu.org/licenses/>.
19'''
20
21import sys
22import threading
23import struct
24import queue
25import hashlib
26
27# Wish Python had a better function for this that escaped more characters
28_html_escape_table = {
29    "&": "&amp;",
30    '"': "&quot;",
31    "'": "&apos;",
32    ">": "&gt;",
33    "<": "&lt;",
34    "\n": "&#x0a",
35    "\r": "&#x0d",
36    }
37
38def _html_escape(text):
39    return "".join(_html_escape_table.get(c,c) for c in text)
40
41
42class ProbeResults:
43    '''TODO
44    '''
45    _values = None
46    _raw_table = None #indexes are byte offset, then XORed value
47    _messages = None
48    _html_header = """<head>
49<script>
50function displayMessage(id)
51{
52  alert(document.getElementById(id).value);
53}
54</script>
55<style>
56td
57{
58  border-style: solid;
59  border-width: medium;
60  border-color: #FFFFFF;
61  border-spacing: 0px;
62  min-width: 100px;
63  max-width: 100px;
64  word-wrap: break-word;
65}
66</style></head>"""
67
68   
69    def __init__(self, ct_length, values):
70        self._ct_length = ct_length
71        self._values = values
72        self._raw_table = {}
73        self._messages = {}
74        return
75
76    def _generate_colors(self, s):
77        if isinstance(s, str):
78            s = s.encode('utf-8')
79        base=bytes(hashlib.md5(s).digest()[0:6])
80        color1 = "#%.2X%.2X%.2X" % tuple(base[:3])
81        color2 = "#%.2X%.2X%.2X" % tuple(base[3:])
82
83        return color1,color2
84
85
86    def toHTML(self):
87        maxlen = 20
88        ret_val = self._html_header
89        ret_val += '<table><tr><td>&nbsp;&nbsp;&nbsp;OFFSET<br /><br />VALUE</td>'
90
91        for offset in self._raw_table.keys():
92            ret_val += '<td>%d<br /><br /></td>' % offset
93        ret_val += '</tr>'
94
95        for v in self._values:
96            ret_val += '<tr><td>0x%.2X</td>' % v
97            for offset in range(0,self._ct_length):
98                message = self._raw_table[offset][v]
99                bg,fg = self._generate_colors(message)
100                if not isinstance(message, str):
101                    message = message.decode('utf-8')
102
103                truncated = message[0:maxlen]
104                if len(message) > maxlen:
105                    truncated += '...'
106                msg_id = 'cell_%.2X_%.2X' % (offset, v)
107                ret_val += ('''<td style="background-color:%s; border-color:%s" onclick="displayMessage('%s')">'''
108                            '''<input type="hidden" id="%s" value="%s" />%s</td>\n''')\
109                            % (bg,fg, msg_id, msg_id, _html_escape(message), _html_escape(truncated))
110            ret_val += '</tr>'
111           
112        ret_val += '</table>'
113
114        return ret_val
115
116
117def probe_bytes(checker, ciphertext, values, max_threads=1):
118    '''For each offset in the ciphertext, XORs each of the values with
119    it and sends it to the checker to determine what kind of response or
120    error message was generated.
121
122    Arguments:
123    checker -- A function which sends a specified ciphertext to the targeted
124               application and returns a string describing the kind of response
125               that was encountered.  This function should be thread-safe when
126               max_threads > 1.
127
128               This function should implement the prototype:
129                 def myChecker(ciphertext): ...
130
131               The function should return strings that are relevant to
132               the kind of overall response generated by the targeted
133               system or application.  For instance, if detailed error
134               messages are returned, then the important parts of those
135               errors should be returned.  If error messages are not
136               returned in some cases, then simple tokens that describe
137               the behavior of the response should suffice.  For
138               instance, if in some cases the application returns a
139               generic HTTP 500 error, in other cases it drops the TCP
140               connection, and still in other cases it doesn't return an
141               error, then the checker function could return "500",
142               "dropped", and "success" respectively for those cases.
143
144    ciphertext -- A ciphertext buffer (bytes/bytearray) that will be repeatedly
145               modified and tested using the checker function.
146
147    values --  A sequence of integers in the range [0..255].  These values
148               will be XORed with each byte in the ciphertext and tested, one
149               after another.  To make a single change to each byte in the
150               ciphertext, provide something like [1].  To flip every bit
151               in the entire ciphertext individually, supply: [1,2,4,8,16,32,64,128]
152
153    max_threads -- The maximum number of threads to run in parallel while
154               testing modified ciphertexts.
155    '''
156    if max_threads < 1:
157        return None
158
159    ciphertext = bytearray(ciphertext)
160    values = bytearray(values)
161
162    # XXX: Improve threading model
163    #      Instead of forking threads and joining them for each byte,
164    #      Generate all ciphertext variants up front, putting them in
165    #      a jobs queue, and then have persistent threads pull from
166    #      the jobs queue  (or use a generator, rather than a queue)
167    ret_val = ProbeResults(len(ciphertext), values)
168    num_threads = min(len(values),max_threads)
169    threads = []
170    for j in range(0,len(ciphertext)):
171        prefix = ciphertext[0:j]
172        target = ciphertext[j]
173        suffix = ciphertext[j+1:]
174        results = queue.Queue()
175        for i in range(0,num_threads):
176            subset = [values[s] for s in range(i,len(values),num_threads)]
177            t = threading.Thread(target=probe_worker, 
178                                 args=(checker, prefix, suffix, target,
179                                       subset, results))
180            t.start()
181            threads.append(t)
182
183        for t in threads:
184            t.join()
185
186        # XXX: add functions to ProbeResults class to add results here,
187        #      rather than accessing members directly.
188        ret_val._raw_table[j] = {}
189        while not results.empty():
190            ret_val._raw_table[j].update(results.get())
191
192    return ret_val
193
194
195def probe_worker(checker, prefix, suffix, target, value_subset, results):
196    for v in value_subset:
197        results.put({v:checker(prefix+bytearray((v^target,))+suffix)})
198
Note: See TracBrowser for help on using the repository browser.