txn.h
#pragma once
#include <stdint.h>
#include <sys/types.h>
#include <sparsehash/dense_hash_map>
#include <vector>
#include "dbcore/ddl.h"
#include "dbcore/dlog-tx.h"
#include "dbcore/dlog.h"
#include "dbcore/sm-config.h"
#include "dbcore/sm-object.h"
#include "dbcore/sm-oid.h"
#include "dbcore/sm-rc.h"
#include "dbcore/xid.h"
#include "macros.h"
#include "masstree/masstree_btree.h"
#include "str_arena.h"
#include "tuple.h"
using google::dense_hash_map;
namespace ermia {
struct schema_record;
#if defined(SSN) || defined(SSI)
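// Atomically advance tuple->xstamp to s: retry until either the observed
// xstamp is already >= s or the CAS from the observed value to s succeeds,
// so xstamp only ever moves forward under concurrent updates.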
#define set_tuple_xstamp(tuple, s) \
{ \
uint64_t x; \
do { \
x = volatile_read(tuple->xstamp); \
} while (x < s and \
not __sync_bool_compare_and_swap(&tuple->xstamp, x, s)); \
}
#endif
// A write-set entry is essentially a pointer to the OID array entry
// being updated. The write-set is naturally de-duplicated: repeated
// updates of the same record leave only the entry created by the first
// update. Dereferencing the entry pointer yields a fat_ptr to the new
// object.
struct write_record_t {
fat_ptr *entry;
FID fid;
OID oid;
uint64_t size;
bool is_insert;
write_record_t(fat_ptr *entry, FID fid, OID oid, uint64_t size, bool insert)
: entry(entry), fid(fid), oid(oid), size(size), is_insert(insert) {}
write_record_t()
: entry(nullptr), fid(0), oid(0), size(0), is_insert(false) {}
inline Object *get_object() { return (Object *)entry->offset(); }
};
struct write_set_t {
static const uint32_t kMaxEntries = 256;
uint32_t num_entries;
write_record_t entries[kMaxEntries];
#ifdef LAZYDDL
write_record_t *_entries = &entries[0];
uint32_t len = kMaxEntries;
#endif
write_set_t() : num_entries(0) {}
inline void emplace_back(fat_ptr *oe, FID fid, OID oid, uint32_t size, bool insert) {
#ifdef LAZYDDL
// For some DDL workloads 256 entries are not enough; grow once to a
// heap-allocated array of 2048 entries.
if (num_entries >= len && len != 2048) {
write_record_t *new_entries = new write_record_t[2048];
memcpy(new_entries, _entries, sizeof(write_record_t) * len);
len = 2048;
_entries = new_entries;
}
new (&_entries[num_entries]) write_record_t(oe, fid, oid, size, insert);
++num_entries;
#else
ALWAYS_ASSERT(num_entries < kMaxEntries);
new (&entries[num_entries]) write_record_t(oe, fid, oid, size, insert);
++num_entries;
ASSERT(entries[num_entries - 1].entry == oe);
#endif
}
inline uint32_t size() { return num_entries; }
inline void clear() { num_entries = 0; }
inline write_record_t &operator[](uint32_t idx) {
#ifdef LAZYDDL
return _entries[idx];
#else
return entries[idx];
#endif
}
};
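// A minimal usage sketch of write_set_t (the variable names below are
// illustrative, not part of this header). operator[] transparently
// switches to the heap-backed array under LAZYDDL once the inline
// capacity has been exceeded:
//
//   write_set_t ws;
//   ws.emplace_back(oid_entry, fid, oid, tuple_size, /*insert=*/true);
//   for (uint32_t i = 0; i < ws.size(); ++i) {
//     Object *obj = ws[i].get_object();  // fat_ptr -> Object
//   }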
#if defined(SIDDL) || defined(BLOCKDDL)
struct write_record_block {
static const uint32_t kMaxEntries = 256;
write_record_t entries[kMaxEntries];
uint32_t num_entries;
write_record_block *next;
write_record_block() : num_entries(0), next(nullptr) {}
inline void emplace_back(fat_ptr *oe, FID fid, OID oid, uint32_t size,
bool insert) {
ALWAYS_ASSERT(num_entries < kMaxEntries);
new (&entries[num_entries]) write_record_t(oe, fid, oid, size, insert);
++num_entries;
ASSERT(entries[num_entries - 1].entry == oe);
}
inline uint32_t size() { return num_entries; }
inline void clear() { num_entries = 0; }
inline write_record_t &operator[](uint32_t idx) { return entries[idx]; }
};
struct write_record_block_info {
write_record_block *first_block;
write_record_block *cur_block;
uint32_t total_entries;
uint64_t log_size;
write_record_block_info() : total_entries(0), log_size(0) {
first_block = new ermia::write_record_block();
cur_block = first_block;
}
};
struct ddl_write_set_t {
std::vector<write_record_block_info *> write_record_block_info_vec;
inline void init() {
for (uint32_t i = 0; i < config::scan_threads + config::cdc_threads; i++) {
write_record_block_info_vec.push_back(new write_record_block_info());
}
}
inline void emplace_back(fat_ptr *oe, FID fid, OID oid, uint32_t size,
bool insert, uint64_t logrec_size, int wid) {
write_record_block_info *tmp = write_record_block_info_vec[wid];
if (tmp->cur_block->size() == write_record_block::kMaxEntries) {
tmp->cur_block->next = new write_record_block();
tmp->cur_block = tmp->cur_block->next;
}
tmp->cur_block->emplace_back(oe, fid, oid, size, insert);
tmp->total_entries++;
tmp->log_size += logrec_size;
}
inline uint32_t size() {
uint32_t total = 0;
for (std::vector<write_record_block_info *>::const_iterator it =
write_record_block_info_vec.begin();
it != write_record_block_info_vec.end(); ++it) {
total += (*it)->total_entries;
}
return total;
}
};
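// Design note: each scan/CDC worker appends to its own chain of
// write_record_blocks (selected by wid), so concurrent DDL writers never
// contend on a shared array; size() simply sums the per-worker counts.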
#endif
class transaction {
friend class ConcurrentMasstreeIndex;
friend struct sm_oid_mgr;
public:
typedef TXN::txn_state txn_state;
#if defined(SSN) || defined(SSI) || defined(MVOCC)
typedef std::vector<dbtuple *> read_set_t;
#endif
enum lock_type { INVALID, SHARED, EXCLUSIVE };
struct table_record_t {
TableDescriptor *table_desc;
FID table_fid;
OID schema_oid;
uint32_t version;
bool schema_ready;
TableDescriptor *old_table_desc;
table_record_t()
: table_desc(nullptr), table_fid(0), schema_oid(0), version(0),
schema_ready(false), old_table_desc(nullptr) {}
table_record_t(TableDescriptor *td, FID fid, OID oid, uint32_t version,
bool schema_ready, TableDescriptor *old_td)
: table_desc(td), table_fid(fid), schema_oid(oid), version(version),
schema_ready(schema_ready), old_table_desc(old_td) {}
};
struct table_set_t {
static const uint64_t kMaxEntries = 256;
table_record_t entries[kMaxEntries];
uint32_t num_entries;
table_set_t() : num_entries(0) {}
~table_set_t() {}
inline void emplace(TableDescriptor *td, FID fid, OID schema_oid, uint32_t version,
bool schema_ready, TableDescriptor *old_td) {
// Ensure there are no duplicates - it's the caller's responsibility
// (e.g., under blocking DDL) to make sure the table lock type is correct.
if (find(fid)) {
return;
}
uint32_t idx = num_entries++;
LOG_IF(FATAL, num_entries > kMaxEntries);
new (&entries[idx]) table_record_t(td, fid, schema_oid, version, schema_ready, old_td);
}
inline table_record_t *find(FID fid) {
for (uint32_t i = 0; i < num_entries; ++i) {
if (entries[i].table_fid == fid) {
return &entries[i];
}
}
return nullptr;
}
};
enum {
// use the low-level scan protocol for checking scan consistency,
// instead of keeping track of absent ranges
TXN_FLAG_LOW_LEVEL_SCAN = 0x1,
// true to mark a read-only transaction: if a txn marked read-only
// does a write, it is aborted. SSN uses it to implement the safe
// snapshot (safesnap). No bookkeeping is done with SSN if this is
// enabled for a tx.
TXN_FLAG_READ_ONLY = 0x2,
TXN_FLAG_READ_MOSTLY = 0x3,
// A context-switch transaction doesn't enter/exit thread during
// construct/destruct.
TXN_FLAG_CSWITCH = 0x8,
TXN_FLAG_DML = 0x10,
TXN_FLAG_DDL = 0x20,
};
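// Note: flags are bit-tested by the is_*() helpers below, and
// TXN_FLAG_READ_MOSTLY (0x3) overlaps TXN_FLAG_LOW_LEVEL_SCAN (0x1) and
// TXN_FLAG_READ_ONLY (0x2), so a read-mostly transaction also tests as
// read-only.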
inline bool is_read_mostly() { return flags & TXN_FLAG_READ_MOSTLY; }
inline bool is_read_only() { return flags & TXN_FLAG_READ_ONLY; }
inline bool is_dml() { return flags & TXN_FLAG_DML; }
inline bool is_ddl() { return flags & TXN_FLAG_DDL; }
protected:
inline txn_state state() const { return xc->state; }
// the absent set is a mapping from (masstree node -> version_number).
typedef dense_hash_map<const ConcurrentMasstree::node_opaque_t *, uint64_t>
MasstreeAbsentSet;
MasstreeAbsentSet masstree_absent_set;
public:
transaction(uint64_t flags) : flags(flags) {}
transaction(uint64_t flags, str_arena &sa, uint32_t coro_batch_idx,
ddl::ddl_executor *ddl_exe);
~transaction() {
#ifdef LAZYDDL
// _entries points to a heap allocation only if the write set grew beyond
// the inline array; deleting the inline array would be undefined behavior.
if (write_set._entries != &write_set.entries[0]) {
delete[] write_set._entries;
}
#endif
}
void uninitialize();
inline void ensure_active() {
volatile_write(xc->state, TXN::TXN_ACTIVE);
ASSERT(state() == TXN::TXN_ACTIVE);
}
rc_t commit(ddl::ddl_executor *ddl_exe = nullptr);
#ifdef SSN
rc_t parallel_ssn_commit();
rc_t ssn_read(dbtuple *tuple);
#elif defined SSI
rc_t parallel_ssi_commit();
rc_t ssi_read(dbtuple *tuple);
#elif defined MVOCC
rc_t mvocc_commit();
rc_t mvocc_read(dbtuple *tuple);
#else
rc_t si_commit(ddl::ddl_executor *ddl_exe = nullptr);
#endif
bool DMLConsistencyHandler();
bool MasstreeCheckPhantom();
void Abort(ddl::ddl_executor *ddl_exe = nullptr);
// Insert a record into the underlying table
OID Insert(TableDescriptor *td, varstr *value, dbtuple **out_tuple = nullptr);
#ifdef DDL
#ifdef COPYDDL
#if defined(LAZYDDL) && !defined(OPTLAZYDDL)
// DDL insert used for unoptimized lazy DDL
OID LazyDDLInsert(TableDescriptor *td, varstr *value, uint64_t tuple_csn,
fat_ptr **out_entry = nullptr);
#endif
// DDL CDC update
PROMISE(rc_t)
DDLUpdate(TableDescriptor *td, OID oid, varstr *value, uint64_t tuple_csn,
bool allow_write_set = false);
#endif
// DDL CDC insert
PROMISE(rc_t)
DDLInsert(TableDescriptor *td, OID oid, varstr *value, uint64_t tuple_csn,
bool allow_write_set = false, int wid = -1, ddl::ddl_executor *ddl_exe = nullptr);
// DML & DDL overlap check
PROMISE(bool)
OverlapCheck(TableDescriptor *new_td, TableDescriptor *old_td, OID oid);
#endif
// Set DDL schema state
PROMISE(rc_t)
SetSchemaState(TableDescriptor *td, OID oid, varstr *value, bool set_csn);
PROMISE(rc_t)
Update(TableDescriptor *td, OID oid, const varstr *k, varstr *v,
int wid = -1, ddl::ddl_executor *ddl_exe = nullptr);
// Same as Update but without support for logging the key
inline PROMISE(rc_t)
Update(TableDescriptor *td, OID oid, varstr *v, int wid = -1,
ddl::ddl_executor *ddl_exe = nullptr) {
auto rc = AWAIT Update(td, oid, nullptr, v, wid, ddl_exe);
RETURN rc;
}
void LogIndexInsert(OrderedIndex *index, OID oid, const varstr *key);
// Table scan for single record
PROMISE(rc_t)
table_scan_single(TableDescriptor *td, const varstr *key, OID &oid);
// Table scan for multiple records
PROMISE(void)
table_scan_multi(
TableDescriptor *td, const varstr *start_key, const varstr *end_key,
ConcurrentMasstree::low_level_search_range_callback &callback);
// Table reverse scan for multiple records
PROMISE(void)
table_rscan_multi(
TableDescriptor *td, const varstr *start_key, const varstr *end_key,
ConcurrentMasstree::low_level_search_range_callback &callback);
public:
// Reads the contents of tuple into out_v within this transaction context
rc_t DoTupleRead(dbtuple *tuple, varstr *out_v);
// expected public overrides
inline str_arena &string_allocator() { return *sa; }
inline void add_to_write_set(bool is_allowed, fat_ptr *entry, FID fid,
OID oid, uint64_t size, bool insert, int wid = -1,
ddl::ddl_executor * ddl_exe = nullptr) {
#ifndef NDEBUG
for (uint32_t i = 0; i < write_set.size(); ++i) {
auto &w = write_set[i];
ASSERT(w.entry);
ASSERT(w.entry != entry);
}
#endif
if (!is_ddl() || is_allowed) {
// Work out the encoded size to be added to the log block later
auto logrec_size =
align_up(size + sizeof(dbtuple) + sizeof(dlog::log_record));
#if defined(SIDDL) || defined(BLOCKDDL)
if (wid >= 0 && is_ddl()) {
auto *ddl_write_set = ddl_exe->get_ddl_write_set();
ddl_write_set->emplace_back(entry, fid, oid, size + sizeof(dbtuple),
insert, logrec_size, wid);
return;
}
#endif
log_size += logrec_size;
// Each write-set entry still records only the size of the actual "data"
// to be inserted into the log, excluding dlog::log_record, which will be
// prepended by log_insert/log_update etc.
write_set.emplace_back(entry, fid, oid, size + sizeof(dbtuple), insert);
}
}
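// Worked example of the size accounting above (hypothetical sizes, for
// illustration only): with a 100-byte value, sizeof(dbtuple) == 24,
// sizeof(dlog::log_record) == 16 and 16-byte alignment in align_up(),
// logrec_size = align_up(140) = 144 bytes are added to log_size, while
// the write-set entry records 100 + 24 = 124 bytes.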
inline TXN::xid_context *GetXIDContext() { return xc; }
inline void SetXIDContext(TXN::xid_context *_xc) { xc = _xc; }
inline void SetWaitForNewSchema(bool _wait_for_new_schema) {
wait_for_new_schema = _wait_for_new_schema;
}
inline bool IsWaitForNewSchema() { return wait_for_new_schema; }
inline dlog::tls_log *get_log() { return log; }
inline table_set_t *get_table_set() { return &table_set; }
inline write_set_t &get_write_set() { return write_set; }
inline uint64_t get_log_size() { return log_size; }
inline void add_to_table_set(TableDescriptor *td, FID table_fid, OID schema_oid, uint32_t version,
bool schema_ready, TableDescriptor *old_td) {
ALWAYS_ASSERT(td);
table_set.emplace(td, table_fid, schema_oid, version, schema_ready, old_td);
}
inline table_record_t *find_in_table_set(FID table_fid) {
return table_set.find(table_fid);
}
#ifdef BLOCKDDL
inline void UnlockAll() {
for (uint32_t i = 0; i < table_set.num_entries; ++i) {
table_set.entries[i].table_desc->UnlockSchema();
}
}
#endif
protected:
const uint64_t flags;
XID xid;
TXN::xid_context *xc;
dlog::tls_log *log;
uint64_t log_size;
str_arena *sa;
uint32_t coro_batch_idx; // its index in the batch
table_set_t table_set;
bool wait_for_new_schema;
util::timer timer;
write_set_t write_set;
#if defined(SSN) || defined(SSI) || defined(MVOCC)
read_set_t read_set;
#endif
};
} // namespace ermia
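// Typical DML lifecycle (a minimal sketch; arena and TableDescriptor setup
// is elided, and the flag value and varstr construction are illustrative):
//
//   ermia::str_arena *arena = ...;  // per-thread string arena
//   ermia::transaction t(ermia::transaction::TXN_FLAG_DML, *arena,
//                        0 /* coro_batch_idx */, nullptr /* no DDL */);
//   ermia::varstr *v = ...;              // encoded record payload
//   ermia::OID oid = t.Insert(td, v);    // insert into table td
//   rc_t rc = t.commit();                // or t.Abort() on failure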