source: trunk/lib/bletchley/chosenct.py @ 65

Last change on this file since 65 was 65, checked in by tmorgan, 11 years ago

added result set object and HTML generator to chosenct probe routine

Python3 fixes and code reorganization

File size: 6.7 KB
Line 
1'''
2A collection of tools to assist in analyzing encrypted data
3through chosen ciphertext attacks.
4
5Copyright (C) 2012-2013 Virtual Security Research, LLC
6Author: Timothy D. Morgan
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License, version 3,
10 as published by the Free Software Foundation.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program.  If not, see <http://www.gnu.org/licenses/>.
19'''
20
21import sys
22import threading
23import struct
24import queue
25import hashlib
26
27# Wish Python had a better function for this that escaped more characters
28_html_escape_table = {
29    "&": "&amp;",
30    '"': "&quot;",
31    "'": "&apos;",
32    ">": "&gt;",
33    "<": "&lt;",
34    "\n": "&#x0a",
35    "\r": "&#x0d",
36    }
37
38def _html_escape(text):
39    return "".join(_html_escape_table.get(c,c) for c in text)
40
41
42class ProbeResults:
43    '''TODO
44    '''
45    _values = None
46    _raw_table = None #indexes are byte offset, then XORed value
47    _messages = None
48    _html_header = """<head>
49<script>
50function displayMessage(id)
51{
52  alert(document.getElementById(id).value);
53}
54</script>
55<style>
56td
57{
58  border-style: solid;
59  border-width: medium;
60  border-color: #FFFFFF;
61  border-spacing: 0px;
62  min-width: 100px;
63  max-width: 100px;
64  word-wrap: break-word;
65}
66</style></head>"""
67
68   
69    def __init__(self, ct_length, values):
70        self._ct_length = ct_length
71        self._values = values
72        self._raw_table = {}
73        self._messages = {}
74        return
75
76    def _generate_colors(self, s):
77        base=bytes(hashlib.md5(s).digest()[0:6])
78        color1 = "#%.2X%.2X%.2X" % tuple(base[:3])
79        color2 = "#%.2X%.2X%.2X" % tuple(base[3:])
80
81        return color1,color2
82
83
84    def toHTML(self):
85        maxlen = 20
86        ret_val = self._html_header
87        ret_val += '<table><tr><td>&nbsp;&nbsp;&nbsp;OFFSET<br /><br />VALUE</td>'
88
89        for offset in self._raw_table.keys():
90            ret_val += '<td>%d<br /><br /></td>' % offset
91        ret_val += '</tr>'
92
93        for v in self._values:
94            ret_val += '<tr><td>0x%.2X</td>' % v
95            for offset in range(0,self._ct_length):
96                message = self._raw_table[offset][v]
97                bg,fg = self._generate_colors(message)
98                message = message.decode('utf-8')
99
100                truncated = message[0:maxlen]
101                if len(message) > maxlen:
102                    truncated += '...'
103                msg_id = 'cell_%.2X_%.2X' % (offset, v)
104                ret_val += ('''<td style="background-color:%s; border-color:%s" onclick="displayMessage('%s')">'''
105                            '''<input type="hidden" id="%s" value="%s" />%s</td>\n''')\
106                            % (bg,fg, msg_id, msg_id, _html_escape(message), _html_escape(truncated))
107            ret_val += '</tr>'
108           
109        ret_val += '</table>'
110
111        return ret_val
112
113
114def probe_bytes(checker, ciphertext, values, max_threads=1):
115    '''For each offset in the ciphertext, XORs each of the values with
116    it and sends it to the checker to determine what kind of response or
117    error message was generated.
118
119    Arguments:
120    checker -- A function which sends a specified ciphertext to the targeted
121               application and returns a string describing the kind of response
122               that was encountered.  This function should be thread-safe when
123               max_threads > 1.
124
125               This function should implement the prototype:
126                 def myChecker(ciphertext): ...
127
128               The function should return strings that are relevant to
129               the kind of overall response generated by the targeted
130               system or application.  For instance, if detailed error
131               messages are returned, then the important parts of those
132               errors should be returned.  If error messages are not
133               returned in some cases, then simple tokens that describe
134               the behavior of the response should suffice.  For
135               instance, if in some cases the application returns a
136               generic HTTP 500 error, in other cases it drops the TCP
137               connection, and still in other cases it doesn't return an
138               error, then the checker function could return "500",
139               "dropped", and "success" respectively for those cases.
140
141    ciphertext -- A ciphertext buffer (bytes/bytearray) that will be repeatedly
142               modified and tested using the checker function.
143
144    values --  A sequence of integers in the range [0..255].  These values
145               will be XORed with each byte in the ciphertext and tested, one
146               after another.  To make a single change to each byte in the
147               ciphertext, provide something like [1].  To flip every bit
148               in the entire ciphertext individually, supply: [1,2,4,8,16,32,64,128]
149
150    max_threads -- The maximum number of threads to run in parallel while
151               testing modified ciphertexts.
152    '''
153    if max_threads < 1:
154        return None
155
156    ciphertext = bytearray(ciphertext)
157    values = bytearray(values)
158
159    # XXX: Improve threading model
160    #      Instead of forking threads and joining them for each byte,
161    #      Generate all ciphertext variants up front, putting them in
162    #      a jobs queue, and then have persistent threads pull from
163    #      the jobs queue  (or use a generator, rather than a queue)
164    ret_val = ProbeResults(len(ciphertext), values)
165    num_threads = min(len(values),max_threads)
166    threads = []
167    for j in range(0,len(ciphertext)):
168        prefix = ciphertext[0:j]
169        target = ciphertext[j]
170        suffix = ciphertext[j+1:]
171        results = queue.Queue()
172        for i in range(0,num_threads):
173            subset = [values[s] for s in range(i,len(values),num_threads)]
174            t = threading.Thread(target=probe_worker, 
175                                 args=(checker, prefix, suffix, target,
176                                       subset, results))
177            t.start()
178            threads.append(t)
179
180        for t in threads:
181            t.join()
182
183        # XXX: add functions to ProbeResults class to add results here,
184        #      rather than accessing members directly.
185        ret_val._raw_table[j] = {}
186        while not results.empty():
187            ret_val._raw_table[j].update(results.get())
188
189    return ret_val
190
191
192def probe_worker(checker, prefix, suffix, target, value_subset, results):
193    for v in value_subset:
194        results.put({v:checker(prefix+bytearray((v^target,))+suffix)})
195
Note: See TracBrowser for help on using the repository browser.