Skip to content

Commit

Permalink
Reverted back to 'unmaterialized' records in source operators and exp…
Browse files Browse the repository at this point in the history
…licit copy() operation for HeapSortOperator, LimitingHeapSortOperator and MergeLimitingHeapSortOperator.
  • Loading branch information
Ralph Gasser committed Feb 23, 2022
1 parent 2e8c5f3 commit f207891
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ open class HeapSortOperator(parent: Operator, sortOn: List<Pair<ColumnDef<*>, So
val parentFlow = this.parent.toFlow(context)
return flow {
val queue = ObjectHeapPriorityQueue(this@HeapSortOperator.queueSize, this@HeapSortOperator.comparator)
parentFlow.collect { queue.enqueue(it) } /* Important: Materialization! */
parentFlow.collect { queue.enqueue(it.copy()) } /* Important: Materialization! */
while (!queue.isEmpty) {
emit(queue.dequeue())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class LimitingHeapSortOperator(parent: Operator, sortOn: List<Pair<ColumnDef<*>,
val parentFlow = this.parent.toFlow(context)
return flow {
val selection = HeapSelection(this@LimitingHeapSortOperator.limit + this@LimitingHeapSortOperator.skip, this@LimitingHeapSortOperator.comparator)
parentFlow.collect { selection.offer(it) } /* Important: Materialization! */
parentFlow.collect { selection.offer(it.copy()) } /* Important: Materialization! */
for (i in this@LimitingHeapSortOperator.skip until selection.size) {
emit(selection[i])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,19 @@ class MergeLimitingHeapSortOperator(parents: List<Operator>, val context: Bindin
* @param context The [TransactionContext] used for execution
* @return [Flow] representing this [MergeLimitingHeapSortOperator]
*/
override fun toFlow(context: TransactionContext): Flow<Record> {
val parents = this.parents.map { it.toFlow(context) }.toTypedArray()
return flow {
/* Collect incoming flows. */
val selection = HeapSelection(this@MergeLimitingHeapSortOperator.limit, this@MergeLimitingHeapSortOperator.comparator)
flowOf(*parents).flattenMerge(parents.size).collect {
selection.offer(it)
}
override fun toFlow(context: TransactionContext): Flow<Record> = flow {
/* Collect incoming flows. */
val parents = this@MergeLimitingHeapSortOperator.parents.map { it.toFlow(context) }.toTypedArray()
val selection = HeapSelection(this@MergeLimitingHeapSortOperator.limit, this@MergeLimitingHeapSortOperator.comparator)
flowOf(*parents).flattenMerge(parents.size).collect {
selection.offer(it.copy()) /* Important: Materialization! */
}

/* Emit sorted and limited values. */
for (i in 0 until selection.size) {
val rec = selection[i]
this@MergeLimitingHeapSortOperator.context.update(rec)
emit(rec)
}
/* Emit sorted and limited values. */
for (i in 0 until selection.size) {
val rec = selection[i]
this@MergeLimitingHeapSortOperator.context.update(rec)
emit(rec)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.vitrivr.cottontail.core.database.ColumnDef
import org.vitrivr.cottontail.core.queries.GroupId
import org.vitrivr.cottontail.core.queries.binding.Binding
import org.vitrivr.cottontail.core.recordset.StandaloneRecord
import org.vitrivr.cottontail.core.values.types.Value
import org.vitrivr.cottontail.dbms.entity.Entity
import org.vitrivr.cottontail.dbms.entity.EntityTx
import org.vitrivr.cottontail.dbms.execution.TransactionContext
Expand All @@ -31,17 +32,17 @@ class EntitySampleOperator(groupId: GroupId, val entity: EntityTx, val fetch: Li
* @param context The [QueryContext] used for execution.
* @return [Flow] representing this [EntitySampleOperator].
*/
override fun toFlow(context: TransactionContext): Flow<Record> {
val fetch = this.fetch.map { it.second }.toTypedArray()
val columns = this.fetch.map { it.first.column }.toTypedArray()
return flow {
val random = SplittableRandom(this@EntitySampleOperator.seed)
for (record in this@EntitySampleOperator.entity.scan(fetch)) {
if (random.nextDouble(0.0, 1.0) <= this@EntitySampleOperator.p) {
val rec = StandaloneRecord(record.tupleId, columns, Array(this@EntitySampleOperator.fetch.size) { record[it] })
this@EntitySampleOperator.fetch.first().first.context.update(rec)
emit(rec)
}
override fun toFlow(context: TransactionContext): Flow<Record> = flow {
val fetch = this@EntitySampleOperator.fetch.map { it.second }.toTypedArray()
val columns = this@EntitySampleOperator.fetch.map { it.first.column }.toTypedArray()
val values = arrayOfNulls<Value>(columns.size)
val random = SplittableRandom(this@EntitySampleOperator.seed)
for (record in this@EntitySampleOperator.entity.scan(fetch)) {
if (random.nextDouble(0.0, 1.0) <= this@EntitySampleOperator.p) {
for (i in values.indices) values[i] = record[i]
val rec = StandaloneRecord(record.tupleId, columns, values)
this@EntitySampleOperator.fetch.first().first.context.update(rec)
emit(rec)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.vitrivr.cottontail.core.database.ColumnDef
import org.vitrivr.cottontail.core.queries.GroupId
import org.vitrivr.cottontail.core.queries.binding.Binding
import org.vitrivr.cottontail.core.recordset.StandaloneRecord
import org.vitrivr.cottontail.core.values.types.Value
import org.vitrivr.cottontail.dbms.entity.Entity
import org.vitrivr.cottontail.dbms.entity.EntityTx
import org.vitrivr.cottontail.dbms.execution.TransactionContext
Expand All @@ -29,15 +30,15 @@ class EntityScanOperator(groupId: GroupId, val entity: EntityTx, val fetch: List
* @param context The [TransactionContext] used for execution
* @return [Flow] representing this [EntityScanOperator]
*/
override fun toFlow(context: TransactionContext): Flow<Record> {
val fetch = this.fetch.map { it.second }.toTypedArray()
val columns = this.fetch.map { it.first.column }.toTypedArray()
return flow {
for (record in this@EntityScanOperator.entity.scan(fetch, this@EntityScanOperator.partitionIndex, this@EntityScanOperator.partitions)) {
val rec = StandaloneRecord(record.tupleId, columns, Array(this@EntityScanOperator.fetch.size) { record[it] })
this@EntityScanOperator.fetch.first().first.context.update(rec)
emit(rec)
}
override fun toFlow(context: TransactionContext): Flow<Record> = flow {
val fetch = this@EntityScanOperator.fetch.map { it.second }.toTypedArray()
val columns = this@EntityScanOperator.fetch.map { it.first.column }.toTypedArray()
val values = arrayOfNulls<Value>(columns.size)
for (record in this@EntityScanOperator.entity.scan(fetch, this@EntityScanOperator.partitionIndex, this@EntityScanOperator.partitions)) {
for (i in values.indices) values[i] = record[i]
val rec = StandaloneRecord(record.tupleId, columns, values)
this@EntityScanOperator.fetch.first().first.context.update(rec)
emit(rec)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.vitrivr.cottontail.core.queries.GroupId
import org.vitrivr.cottontail.core.queries.binding.Binding
import org.vitrivr.cottontail.core.queries.predicates.Predicate
import org.vitrivr.cottontail.core.recordset.StandaloneRecord
import org.vitrivr.cottontail.core.values.types.Value
import org.vitrivr.cottontail.dbms.execution.TransactionContext
import org.vitrivr.cottontail.dbms.execution.operators.basics.Operator
import org.vitrivr.cottontail.dbms.index.Index
Expand Down Expand Up @@ -40,19 +41,19 @@ class IndexScanOperator(
* @param context The [TransactionContext] used for execution.
* @return [Flow] representing this [IndexScanOperator]
*/
override fun toFlow(context: TransactionContext): Flow<Record> {
val columns = this.fetch.map { it.first.column }.toTypedArray()
return flow {
val iterator = if (this@IndexScanOperator.partitions == 1) {
this@IndexScanOperator.index.filter(this@IndexScanOperator.predicate)
} else {
this@IndexScanOperator.index.filterRange(this@IndexScanOperator.predicate, this@IndexScanOperator.partitionIndex, this@IndexScanOperator.partitions)
}
for (record in iterator) {
val rec = StandaloneRecord(record.tupleId, columns, Array(fetch.size) { record[it] })
this@IndexScanOperator.fetch.first().first.context.update(rec)
emit(rec)
}
override fun toFlow(context: TransactionContext): Flow<Record> = flow {
val columns = this@IndexScanOperator.fetch.map { it.first.column }.toTypedArray()
val values = arrayOfNulls<Value>(columns.size)
val iterator = if (this@IndexScanOperator.partitions == 1) {
this@IndexScanOperator.index.filter(this@IndexScanOperator.predicate)
} else {
this@IndexScanOperator.index.filterRange(this@IndexScanOperator.predicate, this@IndexScanOperator.partitionIndex, this@IndexScanOperator.partitions)
}
for (record in iterator) {
for (i in values.indices) values[i] = record[i]
val rec = StandaloneRecord(record.tupleId, columns, values)
this@IndexScanOperator.fetch.first().first.context.update(rec)
emit(rec)
}
}
}

0 comments on commit f207891

Please sign in to comment.