From 827dc1ef6be130ee358594fe325932d54b89f78c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 31 Oct 2023 16:02:49 +0800 Subject: [PATCH 1/3] build(deps): bump org.elasticsearch:elasticsearch from 7.17.5 to 7.17.13 (#434) Bumps [org.elasticsearch:elasticsearch](https://github.com/elastic/elasticsearch) from 7.17.5 to 7.17.13. - [Release notes](https://github.com/elastic/elasticsearch/releases) - [Changelog](https://github.com/elastic/elasticsearch/blob/main/CHANGELOG.md) - [Commits](https://github.com/elastic/elasticsearch/compare/v7.17.5...v7.17.13) --- updated-dependencies: - dependency-name: org.elasticsearch:elasticsearch dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7fc2dd95f..433c2c01f 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ Borca-SR2 3.0.2 4.1.97.Final - 7.17.5 + 7.17.13 3.7.0 1.2.83 2020.0.31 From 2814fac218731613b784874ac509b12976e0ff4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E5=91=A8?= Date: Tue, 31 Oct 2023 16:03:14 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=9C=A8=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E9=87=8C=E5=8D=95=E7=8B=AC=E5=AE=9A=E4=B9=89=E7=89=A9?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E6=97=B6,=E8=AE=A2=E9=98=85=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=A0=BC=E5=BC=8F=E4=B8=8D=E5=AF=B9=E9=97=AE?= =?UTF-8?q?=E9=A2=98.=20(#432)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: 修复在设备里单独定义物模型时,订阅的数据格式不对问题. * fix: 修复编译错误 --- .../measurements/DeviceDashboardObject.java | 82 ++++++++----- .../measurements/DeviceDynamicDashboard.java | 2 +- .../DevicePropertiesMeasurement.java | 111 ++++++++++++------ 3 files changed, 131 insertions(+), 64 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java index aee05203c..7b37da5c5 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java @@ -1,10 +1,12 @@ package org.jetlinks.community.device.measurements; +import lombok.Generated; import org.jetlinks.community.dashboard.DashboardObject; import org.jetlinks.community.dashboard.Measurement; import org.jetlinks.community.dashboard.ObjectDefinition; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.core.device.DeviceProductOperator; +import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.metadata.DeviceMetadata; import reactor.core.publisher.Flux; @@ -21,33 +23,40 @@ public class DeviceDashboardObject implements DashboardObject { private final DeviceDataService deviceDataService; + private final DeviceRegistry registry; + private DeviceDashboardObject(String id, String name, DeviceProductOperator productOperator, EventBus eventBus, - DeviceDataService dataService) { + DeviceDataService dataService, + DeviceRegistry registry) { this.id = id; this.name = name; this.productOperator = productOperator; this.eventBus = eventBus; this.deviceDataService = dataService; + this.registry = registry; } public static DeviceDashboardObject of(String id, String name, - DeviceProductOperator productOperator, - EventBus eventBus, - DeviceDataService dataService ) { - return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService); + DeviceProductOperator productOperator, + EventBus eventBus, + DeviceDataService dataService, + DeviceRegistry registry) { + return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService, registry); } @Override public ObjectDefinition getDefinition() { return new ObjectDefinition() { @Override + @Generated public String getId() { return id; } @Override + @Generated public String getName() { return name; } @@ -56,40 +65,57 @@ public String getName() { @Override public Flux getMeasurements() { - return Flux.concat( - - productOperator.getMetadata() - .flatMapIterable(DeviceMetadata::getEvents) - .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)), - - productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)), - - productOperator.getMetadata() - .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)), - - productOperator.getMetadata() - .flatMapIterable(DeviceMetadata::getProperties) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) - ); + return Flux + .concat( + + productOperator + .getMetadata() + .flatMapIterable(DeviceMetadata::getEvents) + .map(event -> new DeviceEventMeasurement(productOperator.getId(), + eventBus, + event, + deviceDataService)), + + Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), + eventBus, + deviceDataService, + registry)), + + productOperator + .getMetadata() + .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), + eventBus, + metadata, + deviceDataService)), + + productOperator + .getMetadata() + .flatMapIterable(DeviceMetadata::getProperties) + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), + eventBus, + event, + deviceDataService)) + ); } @Override public Mono getMeasurement(String id) { if ("properties".equals(id)) { - return productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)); + return Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, registry)); } if ("events".equals(id)) { - return productOperator.getMetadata() + return productOperator + .getMetadata() .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)); } - return productOperator.getMetadata() + return productOperator + .getMetadata() .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id))) .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) //事件没获取到则尝试获取属性 - .switchIfEmpty(productOperator.getMetadata() - .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); + .switchIfEmpty(productOperator + .getMetadata() + .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java index 98b574d08..58edcd519 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java @@ -53,6 +53,6 @@ public Mono getObject(String id) { protected Mono convertObject(DeviceProductEntity product) { return registry.getProduct(product.getId()) - .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService)); + .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService,registry)); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index 6bb0b2c46..c17e67233 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -1,22 +1,29 @@ package org.jetlinks.community.device.measurements; +import lombok.Generated; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.property.Property; import org.jetlinks.core.metadata.*; +import org.jetlinks.core.metadata.types.NumberType; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.metadata.unit.ValueUnit; import org.jetlinks.reactor.ql.utils.CastUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -24,21 +31,21 @@ class DevicePropertiesMeasurement extends StaticMeasurement { private final EventBus eventBus; - private final DeviceMetadata metadata; - private final DeviceDataService dataService; private final String productId; + private final DeviceRegistry registry; + public DevicePropertiesMeasurement(String productId, EventBus eventBus, DeviceDataService dataService, - DeviceMetadata deviceMetadata) { + DeviceRegistry registry) { super(MeasurementDefinition.of("properties", "属性记录")); this.productId = productId; this.eventBus = eventBus; - this.metadata = deviceMetadata; this.dataService = dataService; + this.registry = registry; addDimension(new RealTimeDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension()); @@ -55,29 +62,50 @@ Flux fromHistory(String deviceId, int history, Set createValue(String property, Object value) { + Map createValue(DeviceMetadata metadata, Property property) { return metadata - .getProperty(property) + .getProperty(property.getId()) .map(meta -> { Map values = new HashMap<>(); DataType type = meta.getValueType(); - Object val = type instanceof Converter ? ((Converter) type).convert(value) : value; + Object val; + if (type instanceof NumberType) { + NumberType numberType = ((NumberType) type); + val = NumberType.convertScaleNumber(property.getValue(), numberType.getScale(), numberType.getRound(), Function.identity()); + } else if (type instanceof Converter) { + val = ((Converter) type).convert(property.getValue()); + } else { + val = property.getValue(); + } values.put("formatValue", type.format(val)); values.put("value", val); - values.put("property", property); - + values.put("state", property.getState()); + values.put("property", property.getId()); + values.put("timestamp",property.getTimestamp()); + if (type instanceof UnitSupported) { + UnitSupported unitSupported = (UnitSupported) type; + values.put("unit", Optional.ofNullable(unitSupported.getUnit()) + .map(ValueUnit::getSymbol) + .orElse(null)); + } return values; }) .orElseGet(() -> { Map values = new HashMap<>(); - values.put("formatValue", value); - values.put("value", value); - values.put("property", property); + values.put("formatValue", property.getValue()); + values.put("value", property.getValue()); + values.put("state", property.getState()); + values.put("property", property.getId()); + values.put("timestamp",property.getTimestamp()); return values; }); } - Flux fromRealTime(String deviceId, Set properties) { + static Subscription.Feature[] clusterFeature = {Subscription.Feature.local, Subscription.Feature.broker}; + static Subscription.Feature[] nonClusterFeature = {Subscription.Feature.local}; + + + Flux fromRealTime(String deviceId, Set properties, boolean cluster) { Subscription subscription = Subscription.of( "realtime-device-properties-measurement", @@ -85,27 +113,32 @@ Flux fromRealTime(String deviceId, Set properties) { "/device/" + productId + "/" + deviceId + "/message/property/report", "/device/" + productId + "/" + deviceId + "/message/property/*/reply" }, - Subscription.Feature.local, Subscription.Feature.broker + cluster ? clusterFeature : nonClusterFeature ); - List props = metadata.getProperties(); - Map index = new HashMap<>(); - int idx = 0; - for (PropertyMetadata prop : props) { - if (properties.isEmpty() || properties.contains(prop.getId())) { - index.put(prop.getId(), idx++); - } - } - return - eventBus - .subscribe(subscription, DeviceMessage.class) - .flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg))) - .flatMap(map -> Flux - .fromIterable(map.entrySet()) - //对本次上报的属性进行排序 - .sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0)))) - .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis())) - .onErrorContinue((err, v) -> log.error(err.getMessage(), err)) - ; + return registry + .getDevice(deviceId) + .flatMap(DeviceOperator::getMetadata) + .flatMapMany(metadata -> { + List props = metadata.getProperties(); + Map index = new HashMap<>(); + int idx = 0; + for (PropertyMetadata prop : props) { + if (properties.isEmpty() || properties.contains(prop.getId())) { + index.put(prop.getId(), idx++); + } + } + return + eventBus + .subscribe(subscription, DeviceMessage.class) + .flatMap(msg -> Flux + .fromIterable(DeviceMessageUtils.tryGetCompleteProperties(msg)) + .filter(e -> index.containsKey(e.getId())) + //对本次上报的属性进行排序 + .sort(Comparator.comparingInt(e -> index.getOrDefault(e.getId(), 0))) + .map(e -> SimpleMeasurementValue.of(createValue(metadata, e), e.getTimestamp()))) + .onErrorContinue((err, v) -> log.error(err.getMessage(), err)) + ; + }); } static ConfigMetadata configMetadata = new DefaultConfigMetadata() @@ -127,11 +160,13 @@ static Set getPropertiesFromParameter(MeasurementParameter parameter) { private class HistoryDevicePropertyDimension implements MeasurementDimension { @Override + @Generated public DimensionDefinition getDefinition() { return CommonDimensionDefinition.history; } @Override + @Generated public DataType getValueType() { return new ObjectType() .addProperty("property", "属性", StringType.GLOBAL) @@ -140,11 +175,13 @@ public DataType getValueType() { } @Override + @Generated public ConfigMetadata getParams() { return configMetadata; } @Override + @Generated public boolean isRealTime() { return false; } @@ -167,11 +204,13 @@ public Flux getValue(MeasurementParameter parameter) { private class RealTimeDevicePropertyDimension implements MeasurementDimension { @Override + @Generated public DimensionDefinition getDefinition() { return CommonDimensionDefinition.realTime; } @Override + @Generated public DataType getValueType() { return new ObjectType() .addProperty("property", "属性", StringType.GLOBAL) @@ -180,11 +219,13 @@ public DataType getValueType() { } @Override + @Generated public ConfigMetadata getParams() { return configMetadata; } @Override + @Generated public boolean isRealTime() { return true; } @@ -196,12 +237,12 @@ public Flux getValue(MeasurementParameter parameter) { .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 - return Flux.concat( + return Flux.concat( //查询历史数据 fromHistory(deviceId, history, getPropertiesFromParameter(parameter)) , //从消息网关订阅实时事件消息 - fromRealTime(deviceId, getPropertiesFromParameter(parameter)) + fromRealTime(deviceId, getPropertiesFromParameter(parameter), parameter.getBoolean("cluster", true)) ); }); } From a1e88bc6086e6c01017e9e419943fce028416c55 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 31 Oct 2023 17:05:07 +0800 Subject: [PATCH 3/3] build(deps): bump org.json:json from 20230227 to 20231013 (#435) Bumps [org.json:json](https://github.com/douglascrockford/JSON-java) from 20230227 to 20231013. - [Release notes](https://github.com/douglascrockford/JSON-java/releases) - [Changelog](https://github.com/stleary/JSON-java/blob/master/docs/RELEASES.md) - [Commits](https://github.com/douglascrockford/JSON-java/commits) --- updated-dependencies: - dependency-name: org.json:json dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 433c2c01f..94c97a756 100644 --- a/pom.xml +++ b/pom.xml @@ -223,7 +223,7 @@ org.json json - 20230227 + 20231013