From e8b6f27afef90b52bde5f97c9a63911f2d9e7eb2 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Tue, 10 Sep 2024 17:06:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96LocalFileThingsDataMa?= =?UTF-8?q?nager=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/LocalFileThingsDataManager.java | 837 ++++++++++++------ 1 file changed, 557 insertions(+), 280 deletions(-) diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java index 4e225bf15..5d673f943 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java @@ -10,6 +10,8 @@ import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.BasicDataType; import org.jetlinks.community.codec.Serializers; +import org.jetlinks.community.utils.TimeUtils; +import org.jetlinks.core.message.property.SimplePropertyValue; import org.jetlinks.core.things.ThingEvent; import org.jetlinks.core.things.ThingProperty; import org.jetlinks.core.things.ThingTag; @@ -19,19 +21,28 @@ import org.jetlinks.core.utils.SerializeUtils; import org.jetlinks.core.utils.StringBuilderUtils; import org.jetlinks.supports.utils.MVStoreUtils; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.function.Function4; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; import javax.annotation.Nonnull; import java.io.*; import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; @Slf4j public class LocalFileThingsDataManager implements ThingsDataManager, ThingsDataWriter { @@ -45,6 +56,13 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData System.getProperty("jetlinks.things.data.store.max-size", "8") ); + static Duration FLUSH_INTERVAL = TimeUtils + .parse( + System.getProperty("jetlinks.things.data.store.flush-interval", "30s") + ); + + static int CACHE_SIZE = (int) Math.max(64, Runtime.getRuntime().maxMemory() / 1024 / 1024 / 64); + protected final MVStore mvStore; //记录key的标签缓存,此方式决定了支持的最大(物实例数量+属性数量)为2^32(42亿) @@ -52,55 +70,95 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData private final MVMap tagStore; // 历史数据缓存 - // fixme 内存占用可能过多, 根据当前jvm内存来决定使用CaffeineCache还是ConcurrentHashMap // key为什么不直接使用Long?因为tag的生成规则会导致hash冲突严重.(同一个物实例的所有属性tag hash值一样) - private final Map historyCache = new ConcurrentHashMap<>(); + private final Map l1Cache = new ConcurrentHashMap<>(); private final MVMap historyStore; - + private final Scheduler writerScheduler = Schedulers.newSingle("things-data-writer"); + private final Scheduler readerScheduler = Schedulers.newSingle("things-data-reader"); + private final Disposable.Composite disposable = Disposables.composite(); private volatile int tagInc; - @SuppressWarnings("all") - private static MVStore load(String fileName) { - return MVStoreUtils + public LocalFileThingsDataManager(String fileName) { + + Tuple3, + MVMap> tp3 = MVStoreUtils .open(new File(fileName), "things-data-manager", - c -> { - return c.keysPerPage(1024) - .cacheSize(64); - }); + c -> c.keysPerPage(1024) + .cacheSize(CACHE_SIZE), + store -> Tuples.of( + store, + MVStoreUtils.openMap(store, "tags", new MVMap.Builder<>()), + MVStoreUtils.openMap(store, "store", new MVMap + .Builder() + .valueType(new HistoryType())))); + + this.mvStore = tp3.getT1(); + this.tagStore = tp3.getT2(); + this.historyStore = tp3.getT3(); + this.tagInc = this.tagStore.size(); + init(); } - public LocalFileThingsDataManager(String fileName) { - this(load(fileName)); + public void shutdown() { + flushNow(); + mvStore.close(10_000); + this.disposable.dispose(); } - public LocalFileThingsDataManager(MVStore store) { - this.mvStore = store; - this.tagStore = mvStore.openMap("tags"); - this.tagInc = this.tagStore.size(); - this.historyStore = MVStoreUtils - .openMap( - mvStore, - "store", - new MVMap - .Builder() - .valueType(new HistoryType()) - ); + private void init() { + disposable.add( + Flux.interval(FLUSH_INTERVAL) + .onBackpressureDrop(dropped -> log.info("flush thing data too slow! in memory size:{}", l1Cache.size())) + .concatMap(ignore -> flushAsync(), 1) + .subscribe()); + disposable.add(this.writerScheduler); + disposable.add(this.readerScheduler); } - public void shutdown() { - for (Map.Entry entry : historyCache.entrySet()) { - if (!entry.getValue().stored) { - entry.getValue().stored = true; - historyStore.put(entry.getKey().toTag(), entry.getValue()); + static final MVMap.DecisionMaker MERGE = new MVMap.DecisionMaker() { + @Override + public MVMap.Decision decide(PropertyHistory existingValue, PropertyHistory providedValue) { + if (existingValue != null && providedValue.isDirty()) { + providedValue + .merge(existingValue) + .setDirty(false); } + providedValue.setStored(true); + return MVMap.Decision.PUT; } - for (Map.Entry entry : historyStore.entrySet()) { - if (!entry.getValue().stored) { - historyStore.put(entry.getKey(), entry.getValue()); + }; + + synchronized void flushNow() { + long ms = System.currentTimeMillis(); + log.info("flushing thing data, in memory size:{}", l1Cache.size()); + for (Map.Entry entry : l1Cache.entrySet()) { + StoreKey key = entry.getKey(); + PropertyHistory history = entry.getValue(); + if (!history.isStored()) { + historyStore.operate(key.toTag(), history, MERGE); + } + //上一次被标记为空闲,则本次移除 + if (history.isIdle()) { + l1Cache.remove(key); + } + //标记为空闲 + else { + history.setIdle(true); } } - mvStore.close(-1); + log.info("flushing thing data complete {}ms, in memory size:{}", System.currentTimeMillis() - ms, l1Cache.size()); + } + + protected Mono flushAsync() { + return Mono + .fromRunnable(this::flushNow) + .subscribeOn(writerScheduler) + .onErrorResume(err -> { + log.warn("write thing data error. in memory size:{}", l1Cache.size(), err); + return Mono.empty(); + }); } @Override @@ -108,15 +166,18 @@ public Mono getLastProperty(String thingType, String thingId, String property, long baseTime) { - PropertyHistory propertyStore = getHistory(thingType, thingId, property); - if (propertyStore == null) { - return lastPropertyNotFound(thingType, thingId, property, baseTime); - } - Property pro = propertyStore.getProperty(baseTime); - if (pro == null) { - return lastPropertyNotFound(thingType, thingId, property, baseTime); - } - return pro.toProperty(property); + return this.getHistory( + thingType, + thingId, + property, + prop -> { + Property p = prop.getProperty(baseTime); + if (p == null) { + return null; + } + return p.toProperty(property); + }); + } protected Mono lastPropertyNotFound(String thingType, @@ -136,15 +197,17 @@ protected Mono firstPropertyNotFound(String thingType, public Mono getFirstProperty(String thingType, String thingId, String property) { - PropertyHistory propertyStore = getHistory(thingType, thingId, property); - if (propertyStore == null) { - return firstPropertyNotFound(thingType, thingId, property); - } - Property pro = propertyStore.first; - if (pro == null) { - return firstPropertyNotFound(thingType, thingId, property); - } - return pro.toProperty(property); + return this.getHistory( + thingType, + thingId, + property, + prop -> { + Property p = prop.first; + if (p == null) { + return null; + } + return p.toProperty(property); + }); } @Override @@ -165,44 +228,63 @@ public Mono> getProperties(String thingType, String property, long from, long to) { - PropertyHistory propertyStore = getHistory(thingType, thingId, property); - if (propertyStore == null) { - return Mono.empty(); - } - return Mono.just(propertyStore.getProperties(property, from, to)); + return this.getHistory( + thingType, + thingId, + property, + prop -> prop.getProperties(property, from, to)); } - protected PropertyHistory getHistory(String thingType, - String thingId, - String property) { + protected Mono getHistory(String thingType, + String thingId, + String property, + Function mapper) { StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); - PropertyHistory his = historyCache.get(key); - if (his != null) { - return his; + PropertyHistory his = l1Cache.get(key); + //fast path + if (his != null && !his.isDirty()) { + return Mono.justOrEmpty(mapper.apply(his)); } - his = historyStore.get(key.toTag()); - if (his != null) { - historyCache.putIfAbsent(key, his); - return his; - } - return null; + // slow path + return Mono + .fromCallable(() -> { + PropertyHistory _his = historyStore.get(key.toTag()); + if (_his != null) { + + PropertyHistory l1 = l1Cache.computeIfAbsent( + key, _ignore -> new PropertyHistory().setDirty(true)); + if (l1.isDirty() && _his != l1) { + l1.merge(_his); + } + return mapper.apply(l1); + + } + if (his != null) { + return mapper.apply(his); + } + return null; + }) + .subscribeOn(readerScheduler); } @Override public Mono getLastPropertyTime(String thingType, String thingId, long baseTime) { - long time = scanProperty(thingType, - thingId, - 0L, - baseTime, - (init, arg, key, history) -> { - Property store = history.getProperty(arg); - if (store != null) { - return Math.max(init, store.time); - } - return init; - - }); - return time == 0 ? Mono.empty() : Mono.just(time); + return Mono.fromCallable(() -> { + long time = scanProperty(thingType, + thingId, + 0L, + baseTime, + (init, arg, key, history) -> { + Property store = history.getProperty(arg); + if (store != null) { + return Math.max(init, store.time); + } + return init; + + }); + return time == 0 ? null : time; + }) + .subscribeOn(readerScheduler); } protected T scanProperty(String thingType, @@ -210,7 +292,7 @@ protected T scanProperty(String thingType, T init, ARG arg, Function4 historyConsumer) { - long thingTag = getThingTag(thingType, thingId); + long thingTag = getThingIndex(thingType, thingId); int tagSize = tagStore.size(); @@ -239,6 +321,12 @@ protected T scanProperty(String thingType, if (tag == thingTag) { PropertyHistory propertyStore = cursor.getValue(); + //与一级缓存进行合并 + PropertyHistory l1 = l1Cache.get(new StoreKey(thingTag, (int) (key - (key << 32)))); + if (l1 != null && l1.isDirty()) { + l1.merge(propertyStore); + propertyStore = l1; + } init = historyConsumer.apply(init, arg, key, propertyStore); } @@ -249,25 +337,28 @@ protected T scanProperty(String thingType, @Override public Mono getFirstPropertyTime(String thingType, String thingId) { - Long time = scanProperty(thingType, - thingId, - null, - null, - (init, arg, key, history) -> { - Property store = history.first; - if (store != null) { - if (init == null) { - return store.time; - } - return Math.min(init, store.time); - } - return init; - - }); - return time == null ? Mono.empty() : Mono.just(time); - } - - protected final int getTag(String key) { + return Mono + .fromCallable( + () -> scanProperty( + thingType, + thingId, + (Long) null, + null, + (init, arg, key, history) -> { + Property store = history.first; + if (store != null) { + if (init == null) { + return store.time; + } + return Math.min(init, store.time); + } + return init; + + })) + .subscribeOn(readerScheduler); + } + + protected final int getIndex(String key) { return tagCache .computeIfAbsent(key, _key -> tagStore.computeIfAbsent(_key, k -> TAG_INC.incrementAndGet(this))); @@ -294,25 +385,25 @@ public final Mono updateProperty(@Nonnull String thingType, @Nonnull Strin property.getState()); } - protected long getThingTag(String thingType, String thingId) { - return getTag(StringBuilderUtils.buildString( + protected long getThingIndex(String thingType, String thingId) { + return getIndex(StringBuilderUtils.buildString( thingType, thingId, (a, b, sb) -> sb.append(a).append(':').append(b))); } - protected long getPropertyStoreTag(String thingType, String thingId, String property) { + protected long getPropertyStoreIndex(String thingType, String thingId, String property) { - long thingTag = getThingTag(thingType, thingId); + long thingTag = getThingIndex(thingType, thingId); - int propertyTag = getTag(property); + int propertyTag = getIndex(property); //物ID对应的tag左移32位和属性tag相加,表示 一个物的某个属性. return (thingTag << 32) + propertyTag; } protected StoreKey getPropertyStoreKeyObj(String thingType, String thingId, String property) { - long thingTag = getThingTag(thingType, thingId); - int propertyTag = getTag(property); + long thingTag = getThingIndex(thingType, thingId); + int propertyTag = getIndex(property); return new StoreKey(thingTag, propertyTag); } @@ -324,8 +415,7 @@ public Mono updateProperty(@Nonnull String thingType, long timestamp, @Nonnull Object value, String state) { - updateProperty0(thingType, thingId, property, timestamp, value, state); - return Mono.empty(); + return Mono.fromRunnable(() -> updateProperty0(thingType, thingId, property, timestamp, value, state)); } protected final void updateProperty0(String thingType, @@ -335,31 +425,31 @@ protected final void updateProperty0(String thingType, Object value, String state) { - StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); - PropertyHistory propertyStore = historyCache - .computeIfAbsent(key, k -> historyStore.computeIfAbsent(k.toTag(), k1 -> new PropertyHistory())); + StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); + PropertyHistory propertyStore = l1Cache.computeIfAbsent(key, k -> new PropertyHistory().setDirty(true)); Property p = new Property(); p.setTime(timestamp); p.setValue(tryIntern(value)); p.setState(RecyclerUtils.intern(state)); propertyStore.update(p); - propertyStore.tryStore(key.toTag(), historyStore::put); } - protected final void updateProperty(String thingType, - String thingId, - String property, - PropertyHistory propertyHistory) { - long key = getPropertyStoreTag(thingType, thingId, property); - PropertyHistory propertyStore = historyStore.computeIfAbsent(key, (ignore) -> new PropertyHistory()); - if (propertyHistory.first != null) { - propertyStore.update(propertyHistory.first); - } - for (Property ref : propertyHistory.refs.values()) { - propertyStore.update(ref); - } + protected final Mono updateProperty(String thingType, + String thingId, + String property, + PropertyHistory propertyHistory) { + StoreKey storeKey = getPropertyStoreKeyObj(thingType, thingId, property); + + PropertyHistory history = l1Cache + .computeIfAbsent(storeKey, (ignore) -> new PropertyHistory().setDirty(true)); + + //直接合并1级缓存 + history.merge(propertyHistory); + + return Mono.empty(); + } @Nonnull @@ -386,16 +476,23 @@ public Mono updateTag(@Nonnull String thingType, @Override public Mono removeProperties(@Nonnull String thingType, @Nonnull String thingId) { - scanProperty(thingType, thingId, null, null, (init, arg, key, value) -> { - long thingTag = key >> 32; - int propertyTag = (int) (key - (key << 32)); - historyStore.remove(key); - historyCache.remove(new StoreKey(thingTag, propertyTag)); - return null; - }); + return Mono + .fromCallable(() -> this + .scanProperty( + thingType, + thingId, + null, + null, + (init, arg, key, value) -> { + long thingTag = key >> 32; + int propertyTag = (int) (key - (key << 32)); - return Mono.empty(); + historyStore.remove(key); + l1Cache.remove(new StoreKey(thingTag, propertyTag)); + return null; + })) + .subscribeOn(writerScheduler); } @@ -422,11 +519,12 @@ public Mono removeProperty(@Nonnull String thingType, @Nonnull String property) { StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); - historyCache.remove(key); - - historyStore.remove(key.toTag()); + l1Cache.remove(key); - return Mono.empty(); + return Mono + .fromRunnable(() -> historyStore.remove(key.toTag())) + .subscribeOn(writerScheduler) + .then(); } @Override @@ -435,17 +533,17 @@ public Mono getLastEvent(String thingType, String event, long baseTime) { String eventKey = createEventProperty(event); - PropertyHistory propertyStore = getHistory(thingType, thingId, eventKey); - if (propertyStore == null) { - return Mono.empty(); - } - Property pro = propertyStore.getProperty(baseTime); - if (pro == null) { - return Mono.empty(); - } - return pro - .toProperty(eventKey) - .map(PropertyThingEvent::new); + return this.getHistory( + thingType, + thingId, + eventKey, + prop -> { + Property p = prop.getProperty(baseTime); + if (p == null) { + return null; + } + return new PropertyThingEvent(event, p.time, p.value); + }); } protected String createEventProperty(String event) { @@ -453,25 +551,12 @@ protected String createEventProperty(String event) { } @AllArgsConstructor + @Getter private static class PropertyThingEvent implements ThingEvent { - private final ThingProperty property; - - @Override - public String getEvent() { - return property - .getProperty() - .substring(2); - } + private final String event; + private final long timestamp; + private final Object data; - @Override - public long getTimestamp() { - return property.getTimestamp(); - } - - @Override - public Object getData() { - return property.getValue(); - } } @Override @@ -479,18 +564,18 @@ public Mono getLastTag(String thingType, String thingId, String tag, long baseTime) { - String eventKey = createTagProperty(tag); - PropertyHistory propertyStore = getHistory(thingType, thingId, eventKey); - if (propertyStore == null) { - return Mono.empty(); - } - Property pro = propertyStore.getProperty(baseTime); - if (pro == null) { - return Mono.empty(); - } - return pro - .toProperty(eventKey) - .map(PropertyThingTag::new); + String key = createTagProperty(tag); + return this.getHistory( + thingType, + thingId, + key, + prop -> { + Property p = prop.getProperty(baseTime); + if (p == null) { + return null; + } + return new PropertyThingTag(tag, p.time, p.value); + }); } protected String createTagProperty(String tag) { @@ -498,25 +583,12 @@ protected String createTagProperty(String tag) { } @AllArgsConstructor + @Getter private static class PropertyThingTag implements ThingTag { - private final ThingProperty property; - - @Override - public String getTag() { - return property - .getProperty() - .substring(2); - } - - @Override - public long getTimestamp() { - return property.getTimestamp(); - } + private final String tag; + private final long timestamp; + private final Object value; - @Override - public Object getValue() { - return property.getValue(); - } } @AllArgsConstructor @@ -531,82 +603,174 @@ public long toTag() { } public static class PropertyHistory implements Externalizable { + private static final byte + STORED = 1, //已经持久化 + IDLE = 1 << 1, //空闲的 + DIRTY = 1 << 2; //脏数据,等待和磁盘中的数据合并 + private static final Property NULL = new Property(); + + static { + NULL.setTime(Long.MIN_VALUE); + } private Property first; + private volatile Property last; + + // private long minTime = -1; + + private transient volatile byte state; + + PropertyHistory merge(PropertyHistory another) { + if (another == null) { + return this; + } + synchronized (this) { + if (this.first == null) { + this.first = another.first.copy(); + } else if (another.first != null && another.first.time < this.first.time) { + this.first = another.first.copy(); + } + if (another.last != null) { + another.last.forEach(pro -> updateLastUnsafe(pro.copy())); + } + } + return this; + } + + @SuppressWarnings("all") + void setStateUnsafe(boolean state, byte flag) { + if (state) { + this.state |= flag; + } else { + this.state &= ~flag; + } + } + + boolean isStored() { + synchronized (this) { + return (state & STORED) != 0; + } + } + + void setStored(boolean stored) { + synchronized (this) { + setStateUnsafe(stored, STORED); + } + } - private final NavigableMap refs = new ConcurrentSkipListMap<>(); + boolean isIdle() { + synchronized (this) { + return (state & IDLE) != 0; + } + } - private long minTime = -1; + void setIdle(boolean idle) { + synchronized (this) { + setStateUnsafe(idle, IDLE); + } + } - private long elapsedTime; + boolean isDirty() { + synchronized (this) { + return (state & DIRTY) != 0; + } + } + + PropertyHistory setDirty(boolean dirty) { + synchronized (this) { + setStateUnsafe(dirty, DIRTY); + } + return this; + } - private boolean stored; public Property getProperty(long baseTime) { - Map.Entry ref = refs.floorEntry(baseTime); - if (ref != null) { - return ref.getValue(); + setIdle(false); + if (last == null) { + return null; } - return null; + synchronized (this) { + return last.getProperty(baseTime); + } + } + + public Long lastKey() { + setIdle(false); + if (last == null) { + return null; + } + return last.time; } public List getProperties(String property, long from, long to) { - if (refs.isEmpty()) { + if (last == null) { return Collections.emptyList(); } if (DEFAULT_MAX_STORE_SIZE_EACH_KEY == 0) { return Collections.emptyList(); } - List properties = new ArrayList<>(Math.min(32, DEFAULT_MAX_STORE_SIZE_EACH_KEY)); - refs.subMap(from, true, to, false) - .forEach((ts, ref) -> { - ThingProperty prop = ref.toPropertyNow(property); - if (prop != null) { - properties.add(prop); + synchronized (this) { + setStateUnsafe(false, IDLE); + Property prop = last; + while (prop != null) { + if (prop.time >= from && prop.time < to) { + properties.add(prop.toProperty(property)); + prop = prop.older; + } else { + break; } - }); + } + } return properties; } - public void tryStore(long key, BiConsumer store) { - long now = System.currentTimeMillis(); - long elapsed = elapsedTime; - elapsedTime = now; - if (now - elapsed >= 5_000) { - stored = true; - store.accept(key, this); + + @SuppressWarnings("all") + public void updateLastUnsafe(Property ref) { + if (last == null) { + last = ref; } else { - stored = false; + last = last.update(ref, 1); } } - //更新 - public void update(Property ref) { - + @SuppressWarnings("all") + public void updateUnsafe(Property ref) { //更新首次时间 - if (first == null || first.time >= ref.time) { + if (first == null) { first = ref; + } else if (first.time >= ref.time) { + first.time = ref.time; + first.state = ref.state; + first.value = ref.value; } + updateLastUnsafe(ref); + setStateUnsafe(false, IDLE); + } - if (minTime > 0) { - //时间回退? - if (ref.time < minTime) { - return; - } + Collection values(Collection properties) { + for (Property prop = last; prop != null; prop = prop.older) { + properties.add(prop); } + return properties; + } - refs.put(ref.time, ref); - if (refs.size() > DEFAULT_MAX_STORE_SIZE_EACH_KEY) { - refs.remove(refs.firstKey()); - } - minTime = refs.firstKey(); + Collection values() { + return values(new ArrayList<>(DEFAULT_MAX_STORE_SIZE_EACH_KEY)); + } + //更新 + public void update(Property ref) { + synchronized (this) { + updateUnsafe(ref); + setStateUnsafe(false, STORED); + } } @Override public void writeExternal(ObjectOutput out) throws IOException { - Collection properties = refs.values(); + Collection properties = this.values(); int size = Math.min(properties.size(), DEFAULT_MAX_STORE_SIZE_EACH_KEY); out.writeShort(size); for (Property ref : properties) { @@ -615,6 +779,11 @@ public void writeExternal(ObjectOutput out) throws IOException { } ref.writeExternal(out); } + if (size > 0) { + for (int i = 0; i < size; i++) { + NULL.writeExternal(out); + } + } out.writeBoolean(first != null); if (first != null) { first.writeExternal(out); @@ -623,39 +792,59 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException { - this.stored = true; + this.setStored(true); int len = in.readShort(); - for (int i = 0; i < len; i++) { - Property property = new Property(); - property.readExternal(in); - refs.put(property.time, property); - } - if (in.readBoolean()) { - first = new Property(); - first.readExternal(in); + synchronized (this) { + for (int i = 0; i < len; i++) { + Property property = new Property(); + property.readExternal(in); + if (property.time == NULL.time) { + continue; + } + this.updateLastUnsafe(property); + } + if (in.readBoolean()) { + this.first = new Property(); + first.readExternal(in); + } } } public int memory() { - int i = 0; - if (first != null) { - i += first.memory(); + if (last != null) { + return last.memory(); } - for (Property ref : refs.values()) { - if (ref != null) { - i += ref.memory(); - } - } - return i; + return 16; } } public static T tryIntern(T val) { - + //针对数字类型进行常量化 if (val instanceof Number) { - if (NumberUtils.isIntNumber(((Number) val))) { + if (val instanceof BigDecimal) { + BigDecimal decimal = ((BigDecimal) val); + //整数 + if (decimal.scale() == 0) { + int intVal = decimal.intValue(); + if (intVal > Short.MIN_VALUE && intVal < 65535) { + return RecyclerUtils.intern(val); + } + } + //2位小数 + else if (decimal.scale() <= 2) { + double intVal = decimal.doubleValue() * Math.pow(10, decimal.scale()); + if (intVal > Short.MIN_VALUE && intVal < 65535) { + return RecyclerUtils.intern(val); + } + } + } else if (val instanceof BigInteger) { + long value = ((BigInteger) val).longValue(); + if (value > Short.MIN_VALUE && value < 65535) { + return RecyclerUtils.intern(val); + } + } else if (NumberUtils.isIntNumber(((Number) val))) { int v = ((Number) val).intValue(); - if (v > Short.MIN_VALUE && v < Short.MAX_VALUE) { + if (v > Short.MIN_VALUE && v < 65535) { return RecyclerUtils.intern(val); } } else { @@ -669,6 +858,12 @@ public static T tryIntern(T val) { } } + //字符串长度小于64常量化 + if (val instanceof String) { + if (((String) val).length() < 64) { + return RecyclerUtils.intern(val); + } + } return val; } @@ -678,37 +873,119 @@ public static class Property implements Externalizable { private long time; private String state; private Object value; + private transient Property older; + + int olderSize() { + int i = 0; + Property _o = older; + while (_o != null) { + i++; + _o = _o.older; + } + return i; + } + + public Property copy() { + Property property = new Property(); + property.time = this.time; + property.state = this.state; + property.value = this.value; + return property; + } + + protected void forEach(Consumer consumer) { + for (Property that = this; + that != null; + that = that.older) { + consumer.accept(that); + } + } - private volatile Mono _temp; + private Property getProperty(long baseTime) { + if (time <= baseTime) { + return this; + } + if (older != null) { + return older.getProperty(baseTime); + } + return null; + } + + private void computeCapacity(int deep) { + int itr = DEFAULT_MAX_STORE_SIZE_EACH_KEY - deep; + if (itr <= 0) { + this.older = null; + return; + } + Property node = this.older; + while (node != null && --itr > 0) { + node = node.older; + } + if (node != null) { + node.older = null; + } + } + + public Property update(Property property) { + return update(property, 1); + } - public Mono toProperty(String property) { - if (_temp == null) { - _temp = Mono.just(ThingProperty.of(property, value, time, state)); + private Property update(Property property, int deep) { + if (this == property) { + return this; } - return _temp; + //新的数据,更新节点 + if (property.time > this.time) { + property.older = this; + property.computeCapacity(deep); + return property; + } + //旧的数据? + if (property.time < this.time) { + if (older != null) { + older = older.update(property, deep + 1); + } else { + property.older = null; + older = property; + } + computeCapacity(deep); + return this; + } + //相同时间,直接更新值 + this.state = property.state; + this.value = property.value; + return this; + } + + public ThingProperty toProperty(String property) { + return SimplePropertyValue.of(property, value, time, state); } @SneakyThrows @SuppressWarnings("all") public ThingProperty toPropertyNow(String property) { - if (_temp == null) { - return ThingProperty.of(property, value, time, state); - } - if (_temp instanceof Callable) { - return ((Callable) _temp).call(); - } - return _temp.toFuture().getNow(null); + return SimplePropertyValue.of(property, value, time, state); } public int memory() { - int i = 8; - if (state != null) { - i += state.length() * 2; - } - if (value instanceof Number) { - i += 8; - } else { - i += 64; + // 对象固定大小 this (32) + time (8) + int i = 40; + + for (Property property = this; + property != null; + property = property.older) { + Object value = property.value; + //数字,固定8 + if (value instanceof Number) { + i += 8; + } else if (value instanceof String) { + i += ((String) value).length() * 2; + } else { + i += 64; + } + if (property.state != null) { + i += property.state.length() * 2; + } } return i; } @@ -737,8 +1014,8 @@ private class HistoryType extends BasicDataType { @Override public int compare(PropertyHistory a, PropertyHistory b) { - Long aLastKey = a.refs.lastKey(); - Long bLastKey = b.refs.lastKey(); + Long aLastKey = a.lastKey(); + Long bLastKey = b.lastKey(); if (aLastKey == null && bLastKey == null) { return 0;