diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java index 01fb11cc4..45dca6642 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java @@ -1,83 +1,62 @@ package org.jetlinks.community.things.data; -import com.github.benmanes.caffeine.cache.Caffeine; import io.netty.buffer.*; import io.netty.util.ReferenceCountUtil; -import lombok.Getter; -import lombok.Setter; -import lombok.SneakyThrows; +import lombok.*; import org.h2.mvstore.Cursor; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.BasicDataType; +import org.jetlinks.community.codec.Serializers; +import org.jetlinks.core.things.ThingEvent; import org.jetlinks.core.things.ThingProperty; import org.jetlinks.core.things.ThingsDataManager; import org.jetlinks.core.utils.SerializeUtils; import org.jetlinks.core.utils.StringBuilderUtils; -import org.jetlinks.community.codec.Serializers; +import org.jetlinks.supports.utils.MVStoreUtils; import reactor.core.publisher.Mono; -import reactor.function.Function3; +import reactor.function.Function4; import javax.annotation.Nonnull; import java.io.*; import java.lang.reflect.Array; import java.nio.ByteBuffer; -import java.time.Duration; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.BiConsumer; public class LocalFileThingsDataManager implements ThingsDataManager, ThingsDataWriter { - //单个属性最大缓存数量 java -Dthing.data.store.max-size=4 - private final static int DEFAULT_MAX_STORE_SIZE_EACH_KEY = Integer + //单个属性最大缓存数量 java -Dthings.data.store.max-size=8 + static int DEFAULT_MAX_STORE_SIZE_EACH_KEY = Integer .parseInt( - System.getProperty("thing.data.store.max-size", "4") + System.getProperty("jetlinks.things.data.store.max-size", "8") ); protected final MVStore mvStore; + //记录key的标签缓存,此方式决定了支持的最大(物实例数量+属性数量)为2^32(42亿) private final Map tagCache = new ConcurrentHashMap<>(); - private final Map historyCache = - Caffeine - .newBuilder() - .expireAfterAccess(Duration.ofMinutes(10)) - .build() - .asMap(); - private final MVMap tagStore; - private final MVMap history; - - private static MVStore open(String fileName) { - return new MVStore.Builder() - .fileName(fileName) - .autoCommitBufferSize(64 * 1024) - .compress() - .keysPerPage(1024) - .cacheSize(64) - .open(); - } + // 历史数据缓存 + // fixme 内存占用可能过多, 根据当前jvm内存来决定使用CaffeineCache还是ConcurrentHashMap + // key为什么不直接使用Long?因为tag的生成规则会导致hash冲突严重.(同一个物实例的所有属性tag hash值一样) + private final Map historyCache = new ConcurrentHashMap<>(); + private final MVMap historyStore; @SuppressWarnings("all") private static MVStore load(String fileName) { - File file = new File(fileName); - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - try { - return open(fileName); - } catch (Throwable err) { - if (file.exists()) { - file.renameTo(new File(fileName + "_load_err_" + System.currentTimeMillis())); - file.delete(); - return open(fileName); - } else { - throw err; - } - } + return MVStoreUtils + .open(new File(fileName), + "things-data-manager", + c -> { + return c.keysPerPage(1024) + .cacheSize(64); + }); } public LocalFileThingsDataManager(String fileName) { @@ -87,25 +66,25 @@ public LocalFileThingsDataManager(String fileName) { public LocalFileThingsDataManager(MVStore store) { this.mvStore = store; this.tagStore = mvStore.openMap("tags"); - this.history = mvStore + this.historyStore = mvStore .openMap("store", new MVMap .Builder() .valueType(new HistoryType())); } public void shutdown() { - for (Map.Entry entry : historyCache.entrySet()) { + for (Map.Entry entry : historyCache.entrySet()) { if (!entry.getValue().stored) { entry.getValue().stored = true; - history.put(entry.getKey(), entry.getValue()); + historyStore.put(entry.getKey().toTag(), entry.getValue()); } } - for (Map.Entry entry : history.entrySet()) { + for (Map.Entry entry : historyStore.entrySet()) { if (!entry.getValue().stored) { - history.put(entry.getKey(), entry.getValue()); + historyStore.put(entry.getKey(), entry.getValue()); } } - mvStore.compactMoveChunks(); + mvStore.compactFile(60_000); mvStore.close(60_000); } @@ -159,10 +138,10 @@ public Mono> getProperties(String thingType, String property, long baseTime) { return this.getProperties(thingType, - thingId, - property, - 0, - baseTime); + thingId, + property, + 0, + baseTime); } @Override @@ -181,12 +160,12 @@ public Mono> getProperties(String thingType, protected PropertyHistory getHistory(String thingType, String thingId, String property) { - long key = getPropertyStoreKey(thingType, thingId, property); + StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); PropertyHistory his = historyCache.get(key); if (his != null) { return his; } - his = history.get(key); + his = historyStore.get(key.toTag()); if (his != null) { historyCache.putIfAbsent(key, his); return his; @@ -200,7 +179,7 @@ public Mono getLastPropertyTime(String thingType, String thingId, long bas thingId, 0L, baseTime, - (init, arg, history) -> { + (init, arg, key, history) -> { Property store = history.getProperty(arg); if (store != null) { return Math.max(init, store.time); @@ -215,7 +194,7 @@ protected T scanProperty(String thingType, String thingId, T init, ARG arg, - Function3 historyConsumer) { + Function4 historyConsumer) { long thingTag = getThingTag(thingType, thingId); int tagSize = tagStore.size(); @@ -223,34 +202,31 @@ protected T scanProperty(String thingType, //左移32位表示物ID标记 long fromTag = thingTag << 32; //加上标签总大小,表示可能的所有属性key范围 - long toTag = fromTag + tagSize; + long toTag = fromTag + tagSize + 1; //获取搜索的key范围 - Long fromKey = history.higherKey(fromTag); + Long fromKey = historyStore.higherKey(fromTag); //没有key,说明没有数据 if (fromKey == null) { return init; } - Long toKey = history.lowerKey(toTag); + Long toKey = historyStore.lowerKey(toTag); //查找大于此标记的key,可能是同一个物的属性数据 - Cursor cursor = history.cursor(fromKey, toKey, false); + Cursor cursor = historyStore.cursor(fromKey, toKey, false); if (cursor == null) { return init; } - final int maxLoop = tagSize / 2; - int loop = maxLoop; //迭代游标来对比数据 - while (cursor.hasNext() && loop > 0) { - long _tag = cursor.getKey() >> 32; - if (_tag != thingTag) { - loop--; - cursor.next(); - continue; + while (cursor.hasNext()) { + long key = cursor.getKey(); + long tag = key >> 32; + + if (tag == thingTag) { + PropertyHistory propertyStore = cursor.getValue(); + init = historyConsumer.apply(init, arg, key, propertyStore); } - loop = maxLoop; - PropertyHistory propertyStore = cursor.getValue(); - init = historyConsumer.apply(init, arg, propertyStore); + cursor.next(); } return init; @@ -262,7 +238,7 @@ public Mono getFirstPropertyTime(String thingType, String thingId) { thingId, null, null, - (init, arg, history) -> { + (init, arg, key, history) -> { Property store = history.first; if (store != null) { if (init == null) { @@ -309,7 +285,7 @@ protected long getThingTag(String thingType, String thingId) { (a, b, sb) -> sb.append(a).append(':').append(b))); } - protected long getPropertyStoreKey(String thingType, String thingId, String property) { + protected long getPropertyStoreTag(String thingType, String thingId, String property) { long thingTag = getThingTag(thingType, thingId); @@ -319,6 +295,12 @@ protected long getPropertyStoreKey(String thingType, String thingId, String prop return (thingTag << 32) + propertyTag; } + protected StoreKey getPropertyStoreKeyObj(String thingType, String thingId, String property) { + long thingTag = getThingTag(thingType, thingId); + int propertyTag = getTag(property); + return new StoreKey(thingTag, propertyTag); + } + @Nonnull @Override public Mono updateProperty(@Nonnull String thingType, @@ -338,39 +320,144 @@ protected final void updateProperty0(String thingType, Object value, String state) { - long key = getPropertyStoreKey(thingType, thingId, property); + StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); PropertyHistory propertyStore = historyCache - .computeIfAbsent(key, k -> history.computeIfAbsent(k, k1 -> new PropertyHistory())); + .computeIfAbsent(key, k -> historyStore.computeIfAbsent(k.toTag(), k1 -> new PropertyHistory())); Property p = new Property(); p.setTime(timestamp); p.setValue(value); p.setState(state); propertyStore.update(p); - propertyStore.tryStore(key, history::put); + propertyStore.tryStore(key.toTag(), historyStore::put); } protected final void updateProperty(String thingType, String thingId, String property, PropertyHistory propertyHistory) { - long key = getPropertyStoreKey(thingType, thingId, property); - PropertyHistory propertyStore = history.computeIfAbsent(key, (ignore) -> new PropertyHistory()); + long key = getPropertyStoreTag(thingType, thingId, property); + PropertyHistory propertyStore = historyStore.computeIfAbsent(key, (ignore) -> new PropertyHistory()); if (propertyHistory.first != null) { propertyStore.update(propertyHistory.first); } - if (propertyHistory.refs != null) { - for (Property ref : propertyHistory.refs) { - propertyStore.update(ref); - } + for (Property ref : propertyHistory.refs.values()) { + propertyStore.update(ref); + } + } + + @Nonnull + @Override + public Mono updateEvent(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String eventId, + long timestamp, + @Nonnull Object data) { + return this.updateProperty(thingType, thingId, createEventProperty(eventId), timestamp, data, null); + } + + @Nonnull + @Override + public Mono removeProperties(@Nonnull String thingType, @Nonnull String thingId) { + + scanProperty(thingType, thingId, null, null, (init, arg, key, value) -> { + long thingTag = key >> 32; + int propertyTag = (int) (key - (key << 32)); + + historyStore.remove(key); + historyCache.remove(new StoreKey(thingTag, propertyTag)); + return null; + }); + + return Mono.empty(); + + } + + @Nonnull + @Override + public Mono removeEvent(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String eventId) { + return this.removeProperty(thingType, thingId, createEventProperty(eventId)); + } + + @Nonnull + @Override + public Mono removeProperty(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String property) { + StoreKey key = getPropertyStoreKeyObj(thingType, thingId, property); + + historyCache.remove(key); + + historyStore.remove(key.toTag()); + + return Mono.empty(); + } + + @Override + public Mono getLastEvent(String thingType, + String thingId, + String event, + long baseTime) { + String eventKey = createEventProperty(event); + PropertyHistory propertyStore = getHistory(thingType, thingId, eventKey); + if (propertyStore == null) { + return Mono.empty(); + } + Property pro = propertyStore.getProperty(baseTime); + if (pro == null) { + return Mono.empty(); + } + return pro + .toProperty(eventKey) + .map(PropertyThingEvent::new); + } + + protected String createEventProperty(String event) { + return "e@" + event; + } + + @AllArgsConstructor + private static class PropertyThingEvent implements ThingEvent { + private final ThingProperty property; + + @Override + public String getEvent() { + return property + .getProperty() + .substring(2); + } + + @Override + public long getTimestamp() { + return property.getTimestamp(); + } + + @Override + public Object getData() { + return property.getValue(); + } + } + + @AllArgsConstructor + @EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY) + protected static class StoreKey { + protected final long thingTag; + protected final int propertyTag; + + public long toTag() { + return (thingTag << 32) + propertyTag; } } public static class PropertyHistory implements Externalizable { + private Property first; - private Property[] refs; + private final NavigableMap refs = new ConcurrentSkipListMap<>(); + private long minTime = -1; private long elapsedTime; @@ -378,33 +465,29 @@ public static class PropertyHistory implements Externalizable { private boolean stored; public Property getProperty(long baseTime) { - if (refs == null) { - return null; - } - for (Property ref : refs) { - if (ref != null && ref.time <= baseTime) { - return ref; - } + Map.Entry ref = refs.floorEntry(baseTime); + if (ref != null) { + return ref.getValue(); } return null; } public List getProperties(String property, long from, long to) { - if (refs == null) { + if (refs.isEmpty()) { return Collections.emptyList(); } if (DEFAULT_MAX_STORE_SIZE_EACH_KEY == 0) { return Collections.emptyList(); } + List properties = new ArrayList<>(Math.min(32, DEFAULT_MAX_STORE_SIZE_EACH_KEY)); - for (Property ref : refs) { - if (ref != null && ref.time >= from && ref.time < to) { + refs.subMap(from, true, to, false) + .forEach((ts, ref) -> { ThingProperty prop = ref.toPropertyNow(property); if (prop != null) { properties.add(prop); } - } - } + }); return properties; } @@ -420,13 +503,9 @@ public void tryStore(long key, BiConsumer store) { } } - //更新,并返回距离上传更新的时间差 + //更新 public void update(Property ref) { - //init - if (refs == null) { - refs = new Property[0]; - } //更新首次时间 if (first == null || first.time >= ref.time) { first = ref; @@ -439,44 +518,20 @@ public void update(Property ref) { } } - boolean newEl = false; - if (refs.length < DEFAULT_MAX_STORE_SIZE_EACH_KEY) { - refs = Arrays.copyOf(refs, refs.length + 1); - newEl = true; - } - - Property last = refs[0]; - //fast - if (last == null || ref.time >= last.time || newEl) { - refs[refs.length - 1] = ref; - } - //slow - else { - for (int i = 1; i < refs.length; i++) { - last = refs[i]; - if (ref.time == last.time) { - refs[i] = ref; - } else if (ref.time > last.time) { - System.arraycopy(refs, i, refs, i + 1, refs.length - i - 1); - refs[i] = ref; - break; - } - } + refs.put(ref.time, ref); + if (refs.size() > DEFAULT_MAX_STORE_SIZE_EACH_KEY) { + refs.remove(refs.firstKey()); } + minTime = refs.firstKey(); - Arrays.sort(refs, Comparator.comparingLong(r -> r == null ? 0 : -r.time)); - minTime = refs[refs.length - 1].time; - - return; } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(refs.length); - for (Property ref : refs) { + out.writeShort(refs.size()); + for (Property ref : refs.values()) { ref.writeExternal(out); } - out.writeBoolean(first != null); if (first != null) { first.writeExternal(out); @@ -487,13 +542,11 @@ public void writeExternal(ObjectOutput out) throws IOException { public void readExternal(ObjectInput in) throws IOException { this.stored = true; int len = in.readShort(); - - refs = new Property[len]; for (int i = 0; i < len; i++) { - refs[i] = new Property(); - refs[i].readExternal(in); + Property property = new Property(); + property.readExternal(in); + refs.put(property.time, property); } - if (in.readBoolean()) { first = new Property(); first.readExternal(in); @@ -505,11 +558,9 @@ public int memory() { if (first != null) { i += first.memory(); } - if (refs != null) { - for (Property ref : refs) { - if (ref != null) { - i += ref.memory(); - } + for (Property ref : refs.values()) { + if (ref != null) { + i += ref.memory(); } } return i; @@ -576,16 +627,19 @@ private class HistoryType extends BasicDataType { @Override public int compare(PropertyHistory a, PropertyHistory b) { - if (a.refs == null && b.refs == null) { + Long aLastKey = a.refs.lastKey(); + Long bLastKey = b.refs.lastKey(); + + if (aLastKey == null && bLastKey == null) { return 0; } - if (a.refs == null) { + if (aLastKey == null) { return -1; } - if (b.refs == null) { + if (bLastKey == null) { return 1; } - return Long.compare(a.refs[0].time, b.refs[0].time); + return Long.compare(aLastKey, bLastKey); } @Override @@ -598,9 +652,11 @@ public int getMemory(PropertyHistory obj) { public void write(WriteBuffer buff, PropertyHistory data) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); try (ObjectOutput output = createOutput(buffer)) { + data.writeExternal(output); - buff.put(buffer.nioBuffer()); output.flush(); + + buff.put(buffer.nioBuffer()); } finally { ReferenceCountUtil.safeRelease(buffer); } @@ -612,11 +668,14 @@ public void write(WriteBuffer buff, PropertyHistory data) { public void write(WriteBuffer buff, Object obj, int len) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); try (ObjectOutput output = createOutput(buffer)) { + for (int i = 0; i < len; i++) { ((PropertyHistory) Array.get(obj, i)).writeExternal(output); } output.flush(); + buff.put(buffer.nioBuffer()); + } finally { ReferenceCountUtil.safeRelease(buffer); } diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java index 4e3618bf8..6132042e8 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java @@ -19,4 +19,25 @@ Mono updateProperty(@Nonnull String thingType, long timestamp, @Nonnull Object value, String state); + + @Nonnull + Mono updateEvent(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String eventId, + long timestamp, + @Nonnull Object data); + + @Nonnull + Mono removeProperty(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String property); + + @Nonnull + Mono removeProperties(@Nonnull String thingType, + @Nonnull String thingId); + + @Nonnull + Mono removeEvent(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String eventId); }