-
Notifications
You must be signed in to change notification settings - Fork 1
/
demo.py
337 lines (242 loc) · 9.52 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import base64
import random
import json
from typing import Tuple
from algosdk.v2client import algod
from algosdk.encoding import msgpack_encode
from algosdk.future.transaction import *
from sandbox import get_accounts
from app import get_clear_src, get_approval_src
from sig import get_sig_tmpl
token = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
url = "http://localhost:4001"
client = algod.AlgodClient(token, url)
# To delete accounts in seq loop
cleanup = False
app_id = None
seed_amt = int(1e9)
max_keys = 16
max_bytes_per_key = 127
bits_per_byte = 8
bits_per_key = max_bytes_per_key * bits_per_byte
max_bytes = max_bytes_per_key * max_keys
max_bits = bits_per_byte * max_bytes
# TmplSig is a class to hold the source of a template contract
# populate can be called to get a LogicSig with the variables replaced
class TmplSig:
def __init__(self, app_id, admin_addr, seed_amt):
# Get template logic sig
self.tmpl = get_sig_tmpl(
app_id=app_id, seed_amt=seed_amt, admin_addr=admin_addr
)
# Get map of bytecode positions for compiled version
self.map = json.loads(open("sig.tmpl.teal.map.json", "r").read())
# Just string replace the var in the contract and recompile
# This can be done with a compiled contract but we're lazy in
# this demo
def populate(self, vars):
src = self.tmpl
for k, v in vars.items():
src = src.replace(k, str(v))
res = client.compile(src)
return LogicSigAccount(base64.b64decode(res["result"]))
# This is _not_ meant to be generic
# we know we have 3 chunks
def get_bytecode_chunks(self) -> Tuple[str, str, str]:
bytecode_chunks = []
src = self.tmpl
for k, v in self.map["template_labels"].items():
if v["bytes"]:
src = src.replace(k, '""')
else:
src = src.replace(k, "0")
res = base64.b64decode(client.compile(src)["result"])
last = 0
for v in self.map["template_labels"].values():
bytecode_chunks.append(res[last : v["position"]].hex())
last = v["position"] + 1 # account for 0 byte
bytecode_chunks.append(res[last:].hex())
return tuple(bytecode_chunks)
def demo():
global app_id
# Get Account from sandbox
addr, sk = get_accounts()[0]
print("Using {}".format(addr))
# Create app if needed
if app_id is None:
# Dummy tmpl sig so we can get something before we create the app
tsig = TmplSig(1, addr, seed_amt)
app_id = create_app(addr, sk, seed_amt, tsig.get_bytecode_chunks())
print("Created app: {}".format(app_id))
# Instantiate once, has ref to sig
tsig = TmplSig(app_id, addr, seed_amt)
# No need for this when you're not debugging
update_app(app_id, addr, sk, tsig.get_bytecode_chunks())
print("Updated app: {}".format(app_id))
# Lazy cache accts we see
cache = {}
# Get some random sequence
seq = [random.randint(0, int(1e3)) for x in range(1000)]
emitter_id = "deadbeef" * 4
for seq_id in seq:
lsa = tsig.populate(
{
"TMPL_ADDR_IDX": get_addr_idx(seq_id),
"TMPL_EMITTER_ID": "0x" + emitter_id,
}
)
print("For seq {} address is {}".format(seq_id, lsa.address()))
sig_addr = lsa.address()
print(lsa.lsig.logic.hex())
if sig_addr not in cache and not account_exists(app_id, sig_addr):
# Create it
sp = client.suggested_params()
seed_txn = PaymentTxn(addr, sp, sig_addr, seed_amt)
optin_txn = ApplicationOptInTxn(sig_addr, sp, app_id)
assign_group_id([seed_txn, optin_txn])
signed_seed = seed_txn.sign(sk)
signed_optin = LogicSigTransaction(optin_txn, lsa)
send("create", [signed_seed, signed_optin])
try:
# Flip the bit
sp = client.suggested_params()
flip_txn = ApplicationNoOpTxn(
addr,
sp,
app_id,
["flip_bit", seq_id.to_bytes(8, "big"), bytes.fromhex(emitter_id)],
accounts=[sig_addr],
)
signed_flip = flip_txn.sign(sk)
result = send("flip_bit", [signed_flip])
if "logs" in result:
print(result["logs"])
bits = check_bits_set(app_id, get_start_bit(seq_id), sig_addr)
cache[sig_addr] = bits
print(
"Accounts: {}, Bits Flipped: {}".format(
len(cache), sum([len(v) for _, v in cache.items()])
)
)
except Exception as e:
print("failed to flip bit :( {}".format(e.with_traceback()))
if cleanup:
# destroy it
sp = client.suggested_params()
closeout_txn = ApplicationCloseOutTxn(sig_addr, sp, app_id)
close_txn = PaymentTxn(sig_addr, sp, addr, 0, close_remainder_to=addr)
assign_group_id([closeout_txn, close_txn])
signed_closeout = LogicSigTransaction(closeout_txn, lsa)
signed_close = LogicSigTransaction(close_txn, lsa)
send("destroy", [signed_closeout, signed_close])
del cache[sig_addr]
# We're calling out tot he chain here, but in practice you'd be able to store
# the already created accounts in some local cache
def account_exists(app_id, addr):
try:
ai = client.account_info(addr)
if "apps-local-state" not in ai:
return False
for app in ai["apps-local-state"]:
if app["id"] == app_id:
return True
except:
print("Failed to find account {}".format(addr))
return False
def check_bits_set(app_id, start, addr):
bits_set = {}
ai = client.account_info(addr)
for app in ai["apps-local-state"]:
if app["id"] == app_id:
app_state = app["key-value"]
for kv in app_state:
key = list(base64.b64decode(kv["key"]))[0]
v = list(base64.b64decode(kv["value"]["bytes"]))
for byte_idx, val in enumerate(v):
if val == 0:
continue
bits = list(format(val, "b").zfill(8))
bits.reverse()
for bit_idx, bit in enumerate(bits):
if bit == "0":
continue
byte_start = byte_idx + key * max_bytes_per_key
seq = start + byte_start * bits_per_byte + bit_idx
bits_set[seq] = True
return bits_set
def update_app(id, addr, sk, bytecode):
# Read in approval teal source && compile
print(bytecode)
app_result = client.compile(
get_approval_src(admin_addr=addr, seed_amt=seed_amt, tmpl_bytecode=bytecode)
)
app_bytes = base64.b64decode(app_result["result"])
# Read in clear teal source && compile
clear_result = client.compile(get_clear_src())
clear_bytes = base64.b64decode(clear_result["result"])
# Get suggested params from network
sp = client.suggested_params()
# Create the transaction
update_txn = ApplicationUpdateTxn(addr, sp, id, app_bytes, clear_bytes)
# Sign it
signed_txn = update_txn.sign(sk)
# Ship it
txid = client.send_transaction(signed_txn)
# Wait for the result so we can return the app id
print(wait_for_confirmation(client, txid, 4))
def create_app(addr, sk, seed_amt, bytecode):
# Read in approval teal source && compile
app_result = client.compile(
get_approval_src(admin_addr=addr, seed_amt=seed_amt, tmpl_bytecode=bytecode)
)
app_bytes = base64.b64decode(app_result["result"])
# Read in clear teal source && compile
clear_result = client.compile(get_clear_src())
clear_bytes = base64.b64decode(clear_result["result"])
gschema = StateSchema(0, 0)
lschema = StateSchema(0, 16)
# Get suggested params from network
sp = client.suggested_params()
# Create the transaction
create_txn = ApplicationCreateTxn(
addr, sp, 0, app_bytes, clear_bytes, gschema, lschema
)
# Sign it
signed_txn = create_txn.sign(sk)
# Ship it
txid = client.send_transaction(signed_txn)
# Wait for the result so we can return the app id
result = wait_for_confirmation(client, txid, 4)
return result["application-index"]
def send(name, signed_group, debug=False):
print("Sending Transaction for {}".format(name))
if debug:
with open(name + ".msgp", "wb") as f:
f.write(
base64.b64decode(msgpack_encode(create_dryrun(client, signed_group)))
)
with open(name + ".txns", "wb") as f:
for tx in signed_group:
f.write(base64.b64decode(msgpack_encode(tx)))
txid = client.send_transactions(signed_group)
return wait_for_confirmation(client, txid, 4)
# Sanity checks
def get_addr_idx(seq_id):
return int(seq_id / max_bits)
def get_byte_idx(seq_id):
return int(seq_id / bits_per_byte) % max_bytes
def get_byte_key(seq_id):
return int(get_byte_idx(seq_id) / max_bytes_per_key)
def get_bit_idx(seq_id):
return int(seq_id % max_bits)
def get_start_bit(seq_id):
return int(seq_id / max_bits) * max_bits
def debug_seq(s):
print("for seq id: {}".format(s))
print("\taddr idx: {}".format(get_addr_idx(s)))
print("\tStart Bit: {}".format(get_start_bit(s)))
print("\tbyte key: {}".format(get_byte_key(s)))
print("\tbyte offset: {}".format(get_byte_idx(s)))
print("\tbit offset: {}".format(get_bit_idx(s)))
if __name__ == "__main__":
demo()