Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Nov 1, 2023
2 parents 865b0ec + a1e88bc commit edd1b9a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.jetlinks.community.device.measurements;

import lombok.Generated;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.community.dashboard.ObjectDefinition;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.DeviceMetadata;
import reactor.core.publisher.Flux;
Expand All @@ -21,33 +23,40 @@ public class DeviceDashboardObject implements DashboardObject {

private final DeviceDataService deviceDataService;

private final DeviceRegistry registry;

private DeviceDashboardObject(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService) {
DeviceDataService dataService,
DeviceRegistry registry) {
this.id = id;
this.name = name;
this.productOperator = productOperator;
this.eventBus = eventBus;
this.deviceDataService = dataService;
this.registry = registry;
}

public static DeviceDashboardObject of(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService ) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService);
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService,
DeviceRegistry registry) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService, registry);
}

@Override
public ObjectDefinition getDefinition() {
return new ObjectDefinition() {
@Override
@Generated
public String getId() {
return id;
}

@Override
@Generated
public String getName() {
return name;
}
Expand All @@ -56,40 +65,57 @@ public String getName() {

@Override
public Flux<Measurement> getMeasurements() {
return Flux.concat(

productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)),

productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)),

productOperator.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)),

productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getProperties)
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
);
return Flux
.concat(

productOperator
.getMetadata()
.flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService)),

Mono.just(new DevicePropertiesMeasurement(productOperator.getId(),
eventBus,
deviceDataService,
registry)),

productOperator
.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(),
eventBus,
metadata,
deviceDataService)),

productOperator
.getMetadata()
.flatMapIterable(DeviceMetadata::getProperties)
.map(event -> new DevicePropertyMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService))
);
}

@Override
public Mono<Measurement> getMeasurement(String id) {
if ("properties".equals(id)) {
return productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata));
return Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, registry));
}
if ("events".equals(id)) {
return productOperator.getMetadata()
return productOperator
.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService));
}
return productOperator.getMetadata()
return productOperator
.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
.<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
//事件没获取到则尝试获取属性
.switchIfEmpty(productOperator.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
.switchIfEmpty(productOperator
.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public Mono<DashboardObject> getObject(String id) {

protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) {
return registry.getProduct(product.getId())
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService));
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService,registry));
}
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,51 @@
package org.jetlinks.community.device.measurements;

import lombok.Generated;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.property.Property;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.metadata.unit.ValueUnit;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
class DevicePropertiesMeasurement extends StaticMeasurement {

private final EventBus eventBus;

private final DeviceMetadata metadata;

private final DeviceDataService dataService;

private final String productId;

private final DeviceRegistry registry;

public DevicePropertiesMeasurement(String productId,
EventBus eventBus,
DeviceDataService dataService,
DeviceMetadata deviceMetadata) {
DeviceRegistry registry) {
super(MeasurementDefinition.of("properties", "属性记录"));
this.productId = productId;
this.eventBus = eventBus;
this.metadata = deviceMetadata;
this.dataService = dataService;
this.registry = registry;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());

Expand All @@ -55,57 +62,83 @@ Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history, Set<Strin
.sort(MeasurementValue.sort());
}

Map<String, Object> createValue(String property, Object value) {
Map<String, Object> createValue(DeviceMetadata metadata, Property property) {
return metadata
.getProperty(property)
.getProperty(property.getId())
.map(meta -> {
Map<String, Object> values = new HashMap<>();
DataType type = meta.getValueType();
Object val = type instanceof Converter ? ((Converter<?>) type).convert(value) : value;
Object val;
if (type instanceof NumberType) {
NumberType<?> numberType = ((NumberType<?>) type);
val = NumberType.convertScaleNumber(property.getValue(), numberType.getScale(), numberType.getRound(), Function.identity());
} else if (type instanceof Converter) {
val = ((Converter<?>) type).convert(property.getValue());
} else {
val = property.getValue();
}
values.put("formatValue", type.format(val));
values.put("value", val);
values.put("property", property);

values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
if (type instanceof UnitSupported) {
UnitSupported unitSupported = (UnitSupported) type;
values.put("unit", Optional.ofNullable(unitSupported.getUnit())
.map(ValueUnit::getSymbol)
.orElse(null));
}
return values;
})
.orElseGet(() -> {
Map<String, Object> values = new HashMap<>();
values.put("formatValue", value);
values.put("value", value);
values.put("property", property);
values.put("formatValue", property.getValue());
values.put("value", property.getValue());
values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
return values;
});
}

Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties) {
static Subscription.Feature[] clusterFeature = {Subscription.Feature.local, Subscription.Feature.broker};
static Subscription.Feature[] nonClusterFeature = {Subscription.Feature.local};


Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties, boolean cluster) {

Subscription subscription = Subscription.of(
"realtime-device-properties-measurement",
new String[]{
"/device/" + productId + "/" + deviceId + "/message/property/report",
"/device/" + productId + "/" + deviceId + "/message/property/*/reply"
},
Subscription.Feature.local, Subscription.Feature.broker
cluster ? clusterFeature : nonClusterFeature
);
List<PropertyMetadata> props = metadata.getProperties();
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
.flatMap(map -> Flux
.fromIterable(map.entrySet())
//对本次上报的属性进行排序
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0))))
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getMetadata)
.flatMapMany(metadata -> {
List<PropertyMetadata> props = metadata.getProperties();
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> Flux
.fromIterable(DeviceMessageUtils.tryGetCompleteProperties(msg))
.filter(e -> index.containsKey(e.getId()))
//对本次上报的属性进行排序
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getId(), 0)))
.<MeasurementValue>map(e -> SimpleMeasurementValue.of(createValue(metadata, e), e.getTimestamp())))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
});
}

static ConfigMetadata configMetadata = new DefaultConfigMetadata()
Expand All @@ -127,11 +160,13 @@ static Set<String> getPropertiesFromParameter(MeasurementParameter parameter) {
private class HistoryDevicePropertyDimension implements MeasurementDimension {

@Override
@Generated
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.history;
}

@Override
@Generated
public DataType getValueType() {
return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL)
Expand All @@ -140,11 +175,13 @@ public DataType getValueType() {
}

@Override
@Generated
public ConfigMetadata getParams() {
return configMetadata;
}

@Override
@Generated
public boolean isRealTime() {
return false;
}
Expand All @@ -167,11 +204,13 @@ public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
private class RealTimeDevicePropertyDimension implements MeasurementDimension {

@Override
@Generated
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.realTime;
}

@Override
@Generated
public DataType getValueType() {
return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL)
Expand All @@ -180,11 +219,13 @@ public DataType getValueType() {
}

@Override
@Generated
public ConfigMetadata getParams() {
return configMetadata;
}

@Override
@Generated
public boolean isRealTime() {
return true;
}
Expand All @@ -196,12 +237,12 @@ public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history, getPropertiesFromParameter(parameter))
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId, getPropertiesFromParameter(parameter))
fromRealTime(deviceId, getPropertiesFromParameter(parameter), parameter.getBoolean("cluster", true))
);
});
}
Expand Down
Loading

0 comments on commit edd1b9a

Please sign in to comment.