diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableCreatorImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableCreatorImpl.java
index fe745db0b2a..76e643d4dcd 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableCreatorImpl.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableCreatorImpl.java
@@ -8,8 +8,8 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableFactory;
-import io.deephaven.engine.table.impl.util.AppendOnlyArrayBackedMutableTable;
-import io.deephaven.engine.table.impl.util.KeyedArrayBackedMutableTable;
+import io.deephaven.engine.table.impl.util.AppendOnlyArrayBackedInputTable;
+import io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.EmptyTable;
@@ -163,14 +163,14 @@ public static UpdatableTable of(InputTable inputTable) {
@Override
public UpdatableTable visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly) {
final TableDefinition definition = DefinitionAdapter.of(inMemoryAppendOnly.schema());
- return AppendOnlyArrayBackedMutableTable.make(definition);
+ return AppendOnlyArrayBackedInputTable.make(definition);
}
@Override
public UpdatableTable visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
final TableDefinition definition = DefinitionAdapter.of(inMemoryKeyBacked.schema());
final String[] keyColumnNames = inMemoryKeyBacked.keys().toArray(String[]::new);
- return KeyedArrayBackedMutableTable.make(definition, keyColumnNames);
+ return KeyedArrayBackedInputTable.make(definition, keyColumnNames);
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedMutableTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedInputTable.java
similarity index 59%
rename from engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedMutableTable.java
rename to engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedInputTable.java
index f40908ed679..a65210dc3ea 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedMutableTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/AppendOnlyArrayBackedInputTable.java
@@ -9,7 +9,6 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSequenceFactory;
-import io.deephaven.engine.util.config.InputTableStatusListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.ChunkSink;
@@ -18,15 +17,13 @@
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
/**
* An in-memory table that allows you to add rows as if it were an InputTable, which can be updated on the UGP.
*
* The table is not keyed, all rows are added to the end of the table. Deletions and edits are not permitted.
*/
-public class AppendOnlyArrayBackedMutableTable extends BaseArrayBackedMutableTable {
+public class AppendOnlyArrayBackedInputTable extends BaseArrayBackedInputTable {
static final String DEFAULT_DESCRIPTION = "Append Only In-Memory Input Table";
/**
@@ -36,64 +33,40 @@ public class AppendOnlyArrayBackedMutableTable extends BaseArrayBackedMutableTab
*
* @return an empty AppendOnlyArrayBackedMutableTable with the given definition
*/
- public static AppendOnlyArrayBackedMutableTable make(@NotNull TableDefinition definition) {
- return make(definition, Collections.emptyMap());
- }
-
- /**
- * Create an empty AppendOnlyArrayBackedMutableTable with the given definition.
- *
- * @param definition the definition of the new table.
- * @param enumValues a map of column names to enumeration values
- *
- * @return an empty AppendOnlyArrayBackedMutableTable with the given definition
- */
- public static AppendOnlyArrayBackedMutableTable make(@NotNull TableDefinition definition,
- final Map enumValues) {
+ public static AppendOnlyArrayBackedInputTable make(
+ @NotNull TableDefinition definition) {
// noinspection resource
return make(new QueryTable(definition, RowSetFactory.empty().toTracking(),
- NullValueColumnSource.createColumnSourceMap(definition)), enumValues);
- }
-
- /**
- * Create an AppendOnlyArrayBackedMutableTable with the given initial data.
- *
- * @param initialTable the initial values to copy into the AppendOnlyArrayBackedMutableTable
- *
- * @return an empty AppendOnlyArrayBackedMutableTable with the given definition
- */
- public static AppendOnlyArrayBackedMutableTable make(final Table initialTable) {
- return make(initialTable, Collections.emptyMap());
+ NullValueColumnSource.createColumnSourceMap(definition)));
}
/**
* Create an AppendOnlyArrayBackedMutableTable with the given initial data.
*
* @param initialTable the initial values to copy into the AppendOnlyArrayBackedMutableTable
- * @param enumValues a map of column names to enumeration values
*
* @return an empty AppendOnlyArrayBackedMutableTable with the given definition
*/
- public static AppendOnlyArrayBackedMutableTable make(final Table initialTable,
- final Map enumValues) {
- final AppendOnlyArrayBackedMutableTable result = new AppendOnlyArrayBackedMutableTable(
- initialTable.getDefinition(), enumValues, new ProcessPendingUpdater());
+ public static AppendOnlyArrayBackedInputTable make(final Table initialTable) {
+ final AppendOnlyArrayBackedInputTable result =
+ new AppendOnlyArrayBackedInputTable(
+ initialTable.getDefinition(), new ProcessPendingUpdater());
result.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
+ result.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
result.setFlat();
processInitial(initialTable, result);
return result;
}
- private AppendOnlyArrayBackedMutableTable(@NotNull TableDefinition definition,
- final Map enumValues, final ProcessPendingUpdater processPendingUpdater) {
+ private AppendOnlyArrayBackedInputTable(@NotNull TableDefinition definition,
+ final ProcessPendingUpdater processPendingUpdater) {
// noinspection resource
super(RowSetFactory.empty().toTracking(), makeColumnSourceMap(definition),
- enumValues, processPendingUpdater);
+ processPendingUpdater);
}
@Override
- protected void processPendingTable(Table table, boolean allowEdits, RowSetChangeRecorder rowSetChangeRecorder,
- Consumer errorNotifier) {
+ protected void processPendingTable(Table table, RowSetChangeRecorder rowSetChangeRecorder) {
try (final RowSet addRowSet = table.getRowSet().copy()) {
final long firstRow = nextRow;
final long lastRow = firstRow + addRowSet.intSize() - 1;
@@ -135,28 +108,15 @@ protected List getKeyNames() {
}
@Override
- ArrayBackedMutableInputTable makeHandler() {
- return new AppendOnlyArrayBackedMutableInputTable();
+ ArrayBackedInputTableUpdater makeUpdater() {
+ return new Updater();
}
- private class AppendOnlyArrayBackedMutableInputTable extends ArrayBackedMutableInputTable {
- @Override
- public void setRows(@NotNull Table defaultValues, int[] rowArray, Map[] valueArray,
- InputTableStatusListener listener) {
- throw new UnsupportedOperationException();
- }
+ private class Updater extends ArrayBackedInputTableUpdater {
@Override
public void validateDelete(Table tableToDelete) {
throw new UnsupportedOperationException("Table doesn't support delete operation");
}
-
- @Override
- public void addRows(Map[] valueArray, boolean allowEdits, InputTableStatusListener listener) {
- if (allowEdits) {
- throw new UnsupportedOperationException();
- }
- super.addRows(valueArray, allowEdits, listener);
- }
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedMutableTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedInputTable.java
similarity index 62%
rename from engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedMutableTable.java
rename to engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedInputTable.java
index fc1c75d69df..f74f4b82907 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedMutableTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BaseArrayBackedInputTable.java
@@ -4,9 +4,6 @@
package io.deephaven.engine.table.impl.util;
import io.deephaven.base.verify.Assert;
-import io.deephaven.base.verify.Require;
-import io.deephaven.datastructures.util.CollectionUtil;
-import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingRowSet;
@@ -15,9 +12,8 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
-import io.deephaven.engine.util.config.InputTableStatusListener;
-import io.deephaven.engine.util.config.MutableInputTable;
-import io.deephaven.engine.table.impl.QueryTable;
+import io.deephaven.engine.util.input.InputTableStatusListener;
+import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.engine.table.impl.UpdatableTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.annotations.TestUseOnly;
@@ -26,11 +22,8 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-abstract class BaseArrayBackedMutableTable extends UpdatableTable {
-
- private static final Object[] BOOLEAN_ENUM_ARRAY = new Object[] {true, false, null};
+abstract class BaseArrayBackedInputTable extends UpdatableTable {
/**
* Queue of pending changes. Only synchronized access is permitted.
@@ -45,30 +38,27 @@ abstract class BaseArrayBackedMutableTable extends UpdatableTable {
*/
private long processedSequence = 0L;
- private final Map enumValues;
-
private String description = getDefaultDescription();
private Runnable onPendingChange = updateGraph::requestRefresh;
long nextRow = 0;
private long pendingProcessed = -1L;
- public BaseArrayBackedMutableTable(TrackingRowSet rowSet, Map> nameToColumnSource,
- Map enumValues, ProcessPendingUpdater processPendingUpdater) {
+ public BaseArrayBackedInputTable(TrackingRowSet rowSet, Map> nameToColumnSource,
+ ProcessPendingUpdater processPendingUpdater) {
super(rowSet, nameToColumnSource, processPendingUpdater);
- this.enumValues = enumValues;
- MutableInputTable mutableInputTable = makeHandler();
- setAttribute(Table.INPUT_TABLE_ATTRIBUTE, mutableInputTable);
+ InputTableUpdater inputTableUpdater = makeUpdater();
+ setAttribute(Table.INPUT_TABLE_ATTRIBUTE, inputTableUpdater);
setRefreshing(true);
processPendingUpdater.setThis(this);
}
- public MutableInputTable mutableInputTable() {
- return (MutableInputTable) getAttribute(Table.INPUT_TABLE_ATTRIBUTE);
+ public InputTableUpdater inputTable() {
+ return (InputTableUpdater) getAttribute(Table.INPUT_TABLE_ATTRIBUTE);
}
public Table readOnlyCopy() {
- return copy(BaseArrayBackedMutableTable::applicableForReadOnly);
+ return copy(BaseArrayBackedInputTable::applicableForReadOnly);
}
private static boolean applicableForReadOnly(String attributeName) {
@@ -84,9 +74,9 @@ private static boolean applicableForReadOnly(String attributeName) {
return resultMap;
}
- static void processInitial(Table initialTable, BaseArrayBackedMutableTable result) {
+ static void processInitial(Table initialTable, BaseArrayBackedInputTable result) {
final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
- result.processPendingTable(initialTable, true, new RowSetChangeRecorder() {
+ result.processPendingTable(initialTable, new RowSetChangeRecorder() {
@Override
public void addRowKey(long key) {
builder.appendKey(key);
@@ -101,14 +91,13 @@ public void removeRowKey(long key) {
public void modifyRowKey(long key) {
throw new UnsupportedOperationException();
}
- }, (e) -> {
});
result.getRowSet().writableCast().insert(builder.build());
result.getRowSet().writableCast().initializePreviousValue();
result.getUpdateGraph().addSource(result);
}
- public BaseArrayBackedMutableTable setDescription(String newDescription) {
+ public BaseArrayBackedInputTable setDescription(String newDescription) {
this.description = newDescription;
return this;
}
@@ -132,8 +121,7 @@ private void processPending(RowSetChangeRecorder rowSetChangeRecorder) {
if (pendingChange.delete) {
processPendingDelete(pendingChange.table, rowSetChangeRecorder);
} else {
- processPendingTable(pendingChange.table, pendingChange.allowEdits, rowSetChangeRecorder,
- (e) -> pendingChange.error = e);
+ processPendingTable(pendingChange.table, rowSetChangeRecorder);
}
pendingProcessed = pendingChange.sequence;
}
@@ -154,8 +142,7 @@ public void run() {
}
}
- protected abstract void processPendingTable(Table table, boolean allowEdits,
- RowSetChangeRecorder rowSetChangeRecorder, Consumer errorNotifier);
+ protected abstract void processPendingTable(Table table, RowSetChangeRecorder rowSetChangeRecorder);
protected abstract void processPendingDelete(Table table, RowSetChangeRecorder rowSetChangeRecorder);
@@ -164,74 +151,73 @@ protected abstract void processPendingTable(Table table, boolean allowEdits,
protected abstract List getKeyNames();
protected static class ProcessPendingUpdater implements Updater {
- private BaseArrayBackedMutableTable baseArrayBackedMutableTable;
+ private BaseArrayBackedInputTable baseArrayBackedInputTable;
@Override
public void accept(RowSetChangeRecorder rowSetChangeRecorder) {
- baseArrayBackedMutableTable.processPending(rowSetChangeRecorder);
+ baseArrayBackedInputTable.processPending(rowSetChangeRecorder);
}
- public void setThis(BaseArrayBackedMutableTable keyedArrayBackedMutableTable) {
- this.baseArrayBackedMutableTable = keyedArrayBackedMutableTable;
+ public void setThis(BaseArrayBackedInputTable keyedArrayBackedMutableTable) {
+ this.baseArrayBackedInputTable = keyedArrayBackedMutableTable;
}
}
private final class PendingChange {
final boolean delete;
+ @NotNull
final Table table;
final long sequence;
- final boolean allowEdits;
String error;
- private PendingChange(Table table, boolean delete, boolean allowEdits) {
+ private PendingChange(@NotNull Table table, boolean delete) {
Assert.holdsLock(pendingChanges, "pendingChanges");
+ Assert.neqNull(table, "table");
this.table = table;
this.delete = delete;
- this.allowEdits = allowEdits;
this.sequence = ++enqueuedSequence;
}
}
- ArrayBackedMutableInputTable makeHandler() {
- return new ArrayBackedMutableInputTable();
+ ArrayBackedInputTableUpdater makeUpdater() {
+ return new ArrayBackedInputTableUpdater();
}
- protected class ArrayBackedMutableInputTable implements MutableInputTable {
+ protected class ArrayBackedInputTableUpdater implements InputTableUpdater {
@Override
public List getKeyNames() {
- return BaseArrayBackedMutableTable.this.getKeyNames();
+ return BaseArrayBackedInputTable.this.getKeyNames();
}
@Override
public TableDefinition getTableDefinition() {
- return BaseArrayBackedMutableTable.this.getDefinition();
+ return BaseArrayBackedInputTable.this.getDefinition();
}
@Override
public void add(@NotNull final Table newData) throws IOException {
checkBlockingEditSafety();
- PendingChange pendingChange = enqueueAddition(newData, true);
+ PendingChange pendingChange = enqueueAddition(newData);
blockingContinuation(pendingChange);
}
@Override
public void addAsync(
@NotNull final Table newData,
- final boolean allowEdits,
@NotNull final InputTableStatusListener listener) {
checkAsyncEditSafety(newData);
- final PendingChange pendingChange = enqueueAddition(newData, allowEdits);
+ final PendingChange pendingChange = enqueueAddition(newData);
asynchronousContinuation(pendingChange, listener);
}
- private PendingChange enqueueAddition(@NotNull final Table newData, final boolean allowEdits) {
+ private PendingChange enqueueAddition(@NotNull final Table newData) {
validateAddOrModify(newData);
// we want to get a clean copy of the table; that can not change out from under us or result in long reads
// during our UGP run
final Table newDataSnapshot = snapshotData(newData);
final PendingChange pendingChange;
synchronized (pendingChanges) {
- pendingChange = new PendingChange(newDataSnapshot, false, allowEdits);
+ pendingChange = new PendingChange(newDataSnapshot, false);
pendingChanges.add(pendingChange);
}
onPendingChange.run();
@@ -239,38 +225,33 @@ private PendingChange enqueueAddition(@NotNull final Table newData, final boolea
}
@Override
- public void delete(@NotNull final Table table, @NotNull final TrackingRowSet rowsToDelete) throws IOException {
+ public void delete(@NotNull final Table table) throws IOException {
checkBlockingEditSafety();
- final PendingChange pendingChange = enqueueDeletion(table, rowsToDelete);
+ final PendingChange pendingChange = enqueueDeletion(table);
blockingContinuation(pendingChange);
}
@Override
public void deleteAsync(
@NotNull final Table table,
- @NotNull final TrackingRowSet rowsToDelete,
@NotNull final InputTableStatusListener listener) {
checkAsyncEditSafety(table);
- final PendingChange pendingChange = enqueueDeletion(table, rowsToDelete);
+ final PendingChange pendingChange = enqueueDeletion(table);
asynchronousContinuation(pendingChange, listener);
}
- private PendingChange enqueueDeletion(@NotNull final Table table, @NotNull final TrackingRowSet rowsToDelete) {
+ private PendingChange enqueueDeletion(@NotNull final Table table) {
validateDelete(table);
- final Table oldDataSnapshot = snapshotData(table, rowsToDelete);
+ final Table oldDataSnapshot = snapshotData(table);
final PendingChange pendingChange;
synchronized (pendingChanges) {
- pendingChange = new PendingChange(oldDataSnapshot, true, false);
+ pendingChange = new PendingChange(oldDataSnapshot, true);
pendingChanges.add(pendingChange);
}
onPendingChange.run();
return pendingChange;
}
- private Table snapshotData(@NotNull final Table data, @NotNull final TrackingRowSet rowSet) {
- return snapshotData(data.getSubTable(rowSet));
- }
-
private Table snapshotData(@NotNull final Table data) {
Table dataSnapshot;
if (data.isRefreshing()) {
@@ -333,7 +314,7 @@ void waitForSequence(long sequence) {
// in order to allow updates.
while (processedSequence < sequence) {
try {
- BaseArrayBackedMutableTable.this.awaitUpdate();
+ BaseArrayBackedInputTable.this.awaitUpdate();
} catch (InterruptedException ignored) {
}
}
@@ -350,84 +331,6 @@ void waitForSequence(long sequence) {
}
}
- @Override
- public void setRows(@NotNull Table defaultValues, int[] rowArray, Map[] valueArray,
- InputTableStatusListener listener) {
- Assert.neqNull(defaultValues, "defaultValues");
- if (defaultValues.isRefreshing()) {
- updateGraph.checkInitiateSerialTableOperation();
- }
-
- final List> columnDefinitions = getTableDefinition().getColumns();
- final Map> sources =
- buildSourcesMap(valueArray.length, columnDefinitions);
- final String[] kabmtColumns =
- getTableDefinition().getColumnNames().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY);
- // noinspection unchecked
- final WritableColumnSource