forked from OmniLayer/omniEngine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
omniEngine.py
282 lines (241 loc) · 8.81 KB
/
omniEngine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
from sql import *
import os
import sys
from datetime import datetime
from datetime import timedelta
from cacher import *
import config
USER=os.getenv("USER")
lockFile='/tmp/omniEngine.lock.'+str(USER)
now=datetime.now()
sys.argv.pop(0)
lastStatusUpdateTime=None
if os.path.isfile(lockFile):
#open the lock file to read pid and timestamp
file=open(lockFile,'r')
data=file.readline()
file.close()
pid=data.split(',')[0]
timestamp=data.split(',')[1]
#check if the pid is still running
if os.path.exists("/proc/"+str(pid)):
print "Exiting: OmniEngine already running with pid:", pid, " Last parse started at ", timestamp
else:
print "Stale OmniEngine found, no running pid:", pid, " Process last started at: ", timestamp
print "Removing lock file and waiting for restart"
os.remove(lockFile)
#exit program and wait for next run
exit(1)
else:
#start/create our lock file
file = open(lockFile, "w")
file.write(str(os.getpid())+','+str(now))
file.close()
#set our debug level, all outputs will be controlled by this
try:
if len(sys.argv) == 1:
#use debug level from cmd line
debuglevel = int(sys.argv[0])
else:
#invlid cmdline options use default value
debuglevel = 5
except:
#invlid cmdline options use default value
debuglevel = 5
setdebug(debuglevel)
printdebug(("Processing started at",now), 0)
#block with first MP transaction
if config.TESTNET:
firstMPtxBlock=263137
else:
firstMPtxBlock=252317
#get last known block processed from db
currentBlock=dbSelect("select max(blocknumber) from blocks", None)[0][0]
printdebug(("Current block is ",currentBlock), 0)
if currentBlock is not None:
currentBlock=currentBlock+1
else:
currentBlock=firstMPtxBlock
#Find most recent block mastercore has available
endBlock=getinfo()['result']['blocks']
#reorg protection/check go back 10 blocks from where we last parsed
checkBlock=max(currentBlock-10,firstMPtxBlock)
while checkBlock < currentBlock:
hash = getblockhash(checkBlock)['result']
dbhash=dbSelect('select blockhash from blocks where blocknumber=%s',[checkBlock])[0][0]
if hash == dbhash:
#everything looks good, go to next block
checkBlock+=1
else:
#reorg took place
try:
print "Reorg detected, Attempting roll back to ",checkBlock-1
reorgRollback(checkBlock-1)
currentBlock=checkBlock
dbCommit()
break
except Exception,e:
#Catch any issues and stop processing. Try to undo any incomplete changes
print "Problem with ", e
if dbRollback():
print "Database rolledback, last successful block", (currentBlock -1)
else:
print "Problem rolling database back, check block data for", currentBlock
exit(1)
if currentBlock > endBlock:
printdebug("Already up to date",0)
updateRan=False
else:
rExpireAllBalBTC()
updateRan=True
#get highest TxDBSerialNum (number of rows in the Transactions table)
#22111443 btc tx's before block 252317
TxDBSerialNum=dbSelect('select coalesce(max(txdbserialnum), 22111443) from transactions')[0][0]+1
#main loop, process new blocks
while currentBlock <= endBlock:
try:
hash = getblockhash(currentBlock)['result']
block_data = getblock(hash)
height = block_data['result']['height']
#don't waste resources looking for MP transactions before the first one occurred
if height >= firstMPtxBlock:
block_data_MP = listblocktransactions_MP(height)
else:
block_data_MP = {"error": None, "id": None, "result": []}
#status update
if lastStatusUpdateTime == None:
printdebug(("Block",height,"of",endBlock),1)
lastStatusUpdateTime=datetime.now()
else:
statusUpdateTime=datetime.now()
timeDelta = statusUpdateTime - lastStatusUpdateTime
blocksLeft = endBlock - currentBlock
projectedTime = str(timedelta(microseconds=timeDelta.microseconds * blocksLeft))
printdebug(("Block",height,"of",endBlock, "(took", timeDelta.microseconds, "microseconds, blocks left:", blocksLeft, ", eta", projectedTime,")"),1)
lastStatusUpdateTime=statusUpdateTime
#Process Bitcoin Transacations
Protocol="Bitcoin"
#Find number of tx's in block
txcount=len(block_data['result']['tx'])
printdebug((txcount,"BTC tx"), 1)
#Write the blocks table row
insertBlock(block_data, Protocol, height, txcount)
#check for pendingtx's to cleanup
checkPending(block_data['result']['tx'])
#count position in block
x=1
for tx in block_data['result']['tx']:
#rawtx=getrawtransaction(tx)
#serial=insertTx(rawtx, Protocol, height, x, TxDBSerialNum)
#serial=insertTx(rawtx, Protocol, height, x)
#insertTxAddr(rawtx, Protocol, serial, currentBlock)
#increment the number of transactions
TxDBSerialNum+=1
#increment tx sequence number in block
x+=1
#Process Mastercoin Transacations (if any)
Protocol="Omni"
#Find number of msc tx
y=len(block_data_MP['result'])
if y != 0:
printdebug((y,"OMNI tx"), 1)
#count position in block
x=1
#MP tx processing
for tx in block_data_MP['result']:
rawtx=gettransaction_MP(tx)
#Process the bare tx and insert it into the db
#TxDBSerialNum can be specified for explit insert or left out to auto assign from next value in db
serial=insertTx(rawtx, Protocol, height, x, TxDBSerialNum)
#serial=insertTx(rawtx, Protocol, height, x)
#Process all the addresses in the tx and insert them into db
#This also calls the functions that update the DEx, SmartProperty and AddressBalance tables
insertTxAddr(rawtx, Protocol, serial, currentBlock)
#increment the number of transactions
TxDBSerialNum+=1
#increment tx sequence number in block
x+=1
#Clean up any offers/crowdsales that expired in this block
#Run these after we processes the tx's in the block as tx in the current block would be valid
#expire the current active offers if block time has passed
expireAccepts(height)
#check active crowdsales and update json if the endtime has passed (based on block time)
expireCrowdsales(block_data['result']['time'], Protocol)
#exodus address generates dev msc, sync our balance to match the generated balanace
if config.TESTNET:
syncAddress('mpexoDuSkGGqvqrkrjiFng38QPkJQVFyqv', Protocol)
#upadate temp orderbook
#updateorderblob()
else:
syncAddress('1EXoDusjGwvnjZUyKkxZ4UHEf77z6A5S4P', Protocol)
#Also make sure we update the json data in SmartProperties table used by exchange view
updateProperty(1,"Omni")
updateProperty(2,"Omni")
#check any pending activations
checkPendingActivations()
#make sure we store the last serialnumber used
dbExecute("select setval('transactions_txdbserialnum_seq', %s)", [TxDBSerialNum-1])
#write db changes for entire block
dbCommit()
except Exception,e:
#Catch any issues and stop processing. Try to undo any incomplete changes
print "Problem with ", e
if dbRollback():
print "Database rolledback, last successful block", (currentBlock -1)
else:
print "Problem rolling database back, check block data for", currentBlock
os.remove(lockFile)
exit(1)
try:
#Also make sure we update the txstats data per block
printdebug("TxStats update started",0)
updateTxStats()
dbCommit()
printdebug("TxStats updated",0)
except:
pass
#increment/process next block if everything went clean
currentBlock += 1
#/while loop. Finished processing all current blocks.
try:
#Also make sure we update the json data in SmartProperties
updateProperty(0,"Bitcoin")
dbCommit()
except:
pass
try:
#Try to update consensushash of last block
updateConsensusHash()
dbCommit()
except:
pass
if config.TESTNET and updateRan:
try:
#Reset omni balances on testnet to account for moneyman transactions
resetbalances_MP([1,2])
dbCommit()
except:
pass
#check/add/update and pending tx in the database
try:
updateAddPending()
dbCommit()
printdebug("Pending List updated",0)
except Exception,e:
#Catch any issues and stop processing. Try to undo any incomplete changes
print "Problem updating pending ", e
if dbRollback():
print "Database rolledback"
else:
print "Problem rolling database back, check pending data"
os.remove(lockFile)
exit(1)
try:
#Also make sure we update the last run
updateLastRun()
dbCommit()
except:
pass
#remove the lock file and let ourself finish
os.remove(lockFile)
#/end else for lock file