forked from SeattleTestbed/seattlelib_v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
encasementlib.r2py
242 lines (180 loc) · 6.68 KB
/
encasementlib.r2py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""
Author: Armon Dadgar
Description:
This module is designed to help construct security layers.
Its main mechanism is secure_dispatch_module() which takes a context
definition and executes the next layer within a secure context.
"""
# Snapshot of _context taken at this point. _import_wrapper() evaluates
# wrapper.r2py inside a copy of this snapshot.
CLEAN_CONTEXT = _context.copy()
# The snapshot refers to itself under "_context" and gets its own, empty
# "mycontext" dictionary.
CLEAN_CONTEXT["_context"] = CLEAN_CONTEXT
CLEAN_CONTEXT["mycontext"] = {}
# Names that _build_INIT_DEFINITION() looks up in _context and copies,
# unchanged, into INIT_DEFINITION.
COPIED_API = ["gethostbyname", "getmyip", "sendmessage", "listenformessage",
"openconnection", "listenforconnection", "openfile", "listfiles",
"removefile", "exitall", "createlock", "getruntime", "randombytes",
"createthread", "sleep", "log", "getthreadname", "createvirtualnamespace",
"getresources", "getlasterror",]
# Context definition passed to the first security layer. It starts out empty
# and is filled in by _build_INIT_DEFINITION().
INIT_DEFINITION = {}
# Maximum number of bytes _layer_code() reads from a layer file. A read that
# returns exactly this many bytes is reported through log(); the load is not
# aborted.
MAX_FILE_SIZE = 100000
# Keys and values used when building "copy this reference as-is" entries of a
# context definition.
TYPE = "type"
ANY = "any"
TARGET = "target"
#### Helper functions ####
# Helper that fills in INIT_DEFINITION
def _build_INIT_DEFINITION():
  """
  <Purpose>
    Populates INIT_DEFINITION, the context definition that is handed to
    secure_dispatch() for the first security layer.

  <Arguments>
    None.

  <Exceptions>
    KeyError if a name listed in COPIED_API is missing from _context.

  <Side Effects>
    Adds one entry per name in COPIED_API, plus a "wrap_references" entry,
    to the module-level INIT_DEFINITION dictionary.

  <Returns>
    None.
  """
  # Each copied API call is mapped to type "any", targeting the reference
  # currently stored in _context.
  for api_name in COPIED_API:
    passthrough = {}
    passthrough[TYPE] = ANY
    passthrough[TARGET] = _context[api_name]
    INIT_DEFINITION[api_name] = passthrough

  # wrap_references is exposed as well, described as a function with a
  # single dict argument, a dict return value and RepyArgumentError as its
  # exception.
  wrap_entry = {
    "type": "func",
    "args": (dict,),
    "return": dict,
    "exceptions": RepyArgumentError,
    "target": wrap_references,
  }
  INIT_DEFINITION["wrap_references"] = wrap_entry
# Loads wrapper.r2py and exports its wrap_references function
def _import_wrapper():
  """
  <Purpose>
    Evaluates the wrapper module and makes its wrap_references function
    available through _context.

  <Arguments>
    None.

  <Exceptions>
    Whatever _layer_code() raises for "wrapper.r2py", and whatever the
    evaluation of the module raises.

  <Side Effects>
    Stores the module's wrap_references under _context["wrap_references"].

  <Returns>
    None.
  """
  # Compile the wrapper module
  wrapper_namespace = _layer_code("wrapper.r2py")

  # Run it in its own copy of the clean context so that its definitions
  # end up in that copy
  wrapper_context = CLEAN_CONTEXT.copy()
  wrapper_namespace.evaluate(wrapper_context)

  # Pull the single function we need out of the evaluated context
  _context["wrap_references"] = wrapper_context["wrap_references"]
# Gets the code object for a layer
def _layer_code(layer):
  """
  <Purpose>
    Reads the source of a security layer from a file and compiles it into
    a virtual namespace.

  <Arguments>
    layer: The name of the file that holds the layer's code.

  <Exceptions>
    FileNotFoundError if openfile() raises FileNotFoundError or
    ResourceExhaustedError for the file.
    CodeUnsafeError if createvirtualnamespace() raises it; the error is
    re-raised with the layer name added to the message.

  <Side Effects>
    Opens, reads and closes the file. Writes a message through log() when
    the read returns exactly MAX_FILE_SIZE bytes.

  <Returns>
    The virtual namespace created from the layer's code.
  """
  # Try to get a file handle to the module
  try:
    fileh = openfile(layer, False)
  except FileNotFoundError:
    raise FileNotFoundError("Cannot load security layer '"+layer+"'. File not found.")
  except ResourceExhaustedError:
    raise FileNotFoundError("Cannot load security layer '"+layer+"'. No file resources.")

  # Read in the code. The handle is released even if the read fails, so a
  # failed load does not leak a file handle.
  try:
    code = fileh.readat(MAX_FILE_SIZE, 0)
  finally:
    fileh.close()

  # NOTE(review): this only reports the condition; the (possibly truncated)
  # code is still compiled below.
  if len(code) == MAX_FILE_SIZE:
    log("Failed to read all of security layer '"+layer+"'! File size exceeds 100K bytes!")

  # Create a new virtual namespace
  try:
    virt = createvirtualnamespace(code, layer)
  except CodeUnsafeError as e:
    raise CodeUnsafeError("Compiling security layer '"+layer+"' failed! Got exception: '" + str(e) + "'")

  # Return the new namespace
  return virt
# Builds the execution context for the next layer
def _layer_context(caller_context, context_def):
  """
  <Purpose>
    Builds the context in which the next security layer is evaluated.

  <Arguments>
    caller_context:
      The context of the layer doing the dispatch. Its "callfunc" and
      "callargs" entries are forwarded when they pass the type checks.
    context_def:
      The context definition that is handed to wrap_references().

  <Exceptions>
    RepyArgumentError if the definition is invalid.

  <Returns>
    A SafeDict holding the wrapped references plus "_context",
    "mycontext", "CHILD_CONTEXT_DEF" and "secure_dispatch_module", and
    "callfunc" / "callargs" when they were forwarded.
  """
  # Wrap the references and put them into a SafeDict
  layer_context = SafeDict(wrap_references(context_def))

  # The context can reach itself and starts with an empty mycontext
  layer_context["_context"] = layer_context
  layer_context["mycontext"] = {}

  # callfunc is forwarded only when it is exactly a str
  if "callfunc" in caller_context:
    if type(caller_context["callfunc"]) is str:
      layer_context["callfunc"] = caller_context["callfunc"]

  # callargs is forwarded without its first entry (the name of the layer
  # being dispatched), and only when it is a list whose remaining entries
  # are all exactly str
  if "callargs" in caller_context:
    if type(caller_context["callargs"]) is list:
      remaining_args = caller_context["callargs"][1:]

      only_strings = True
      for arg in remaining_args:
        if type(arg) is not str:
          only_strings = False
          break

      if only_strings:
        layer_context["callargs"] = remaining_args

  # The child gets a definition that simply copies what it was given
  child_def = _layer_generic_def(layer_context, context_def)
  layer_context["CHILD_CONTEXT_DEF"] = child_def

  # ...and a secure_dispatch_module bound to its own context
  layer_context["secure_dispatch_module"] = _layer_instance_dispatch(layer_context, child_def)

  return layer_context
# Takes the definition that was used to generate a context
# and produces a generic definition that can be used
# to just copy the references that already exist
def _layer_generic_def(built_context, context_def):
  """
  <Purpose>
    Derives a context definition that copies the references found in an
    already built context.

  <Arguments>
    built_context: The context the references are read from.
    context_def: The definition whose keys select the references.

  <Exceptions>
    Whatever built_context raises when a key of context_def cannot be
    looked up in it.

  <Returns>
    A new dict with one entry per key of context_def. Each entry has type
    "any" and targets built_context[key].
  """
  generic_def = {}

  # Type "any" means the reference is copied rather than wrapped again
  for name in context_def:
    entry = {}
    entry[TYPE] = ANY
    entry[TARGET] = built_context[name]
    generic_def[name] = entry

  return generic_def
# Returns a new function which calls secure_dispatch
# using the specified caller, and a default context
# definition dictionary.
def _layer_instance_dispatch(caller, default_definition):
def _secure_dispatch_closure(context_def=None):
# Check if we should use the default definition
if context_def is None:
context_def = default_definition
# Call secure dispatch
secure_dispatch(caller, context_def)
# Return this new closure
return _secure_dispatch_closure
#### Support for dispatch ####
# Runs the next security layer inside a freshly built context
def secure_dispatch(caller_context, context_def):
  """
  <Purpose>
    Wraps the references a security layer provides and evaluates the next
    security layer with them.

  <Arguments>
    caller_context:
      The context of the caller. The first entry of its "callargs" names
      the next layer.
    context_def:
      The definition of the context for the next layer.

  <Exceptions>
    Anything raised by the module being evaluated. An exception is also
    raised when the module cannot be set up in a VirtualNamespace because
    of safety or syntax problems, or when the module does not exist.
    A RepyArgumentError is raised if the context definition is invalid.

  <Side Effects>
    Execution switches to the next module.

  <Returns>
    True if a next layer was evaluated. False if caller_context has no
    "callargs" entry or that entry is empty.
  """
  # Without a next module there is nothing to dispatch
  if not "callargs" in caller_context:
    return False
  if len(caller_context["callargs"]) == 0:
    return False

  # The first call argument names the layer to run
  layer_name = caller_context["callargs"][0]

  # Compile the layer first, then build the context it will run in
  layer_namespace = _layer_code(layer_name)
  layer_context = _layer_context(caller_context, context_def)

  # Hand control to the new security layer
  layer_namespace.evaluate(layer_context)

  return True
#### Dispatch the next module ####
# Load wrapper.r2py first: _import_wrapper() stores wrap_references in
# _context, and _build_INIT_DEFINITION() uses wrap_references.
_import_wrapper()
# Fill INIT_DEFINITION with the COPIED_API entries and wrap_references
_build_INIT_DEFINITION()
# Dispatch the first security layer, with this module's own context as the
# caller; the layer to run is the first entry of _context["callargs"]
secure_dispatch(_context, INIT_DEFINITION)