Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c3): add disinfect and fix tbh set error #340

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions midealocal/devices/c3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
MessageC3Response,
MessageQuery,
MessageQueryBasic,
MessageQueryDisinfect,
MessageQueryECO,
MessageQueryHMIPara,
MessageQueryInstall,
MessageQuerySilence,
MessageQueryUnitPara,
MessageSet,
MessageSetDisinfect,
MessageSetECO,
MessageSetSilent,
)
Expand Down Expand Up @@ -117,7 +123,12 @@ def build_query(self) -> list[MessageQuery]:
"""Midea C3 device build query."""
return [
MessageQueryBasic(self._message_protocol_version),
MessageQueryDisinfect(self._message_protocol_version),
MessageQuerySilence(self._message_protocol_version),
MessageQueryECO(self._message_protocol_version),
MessageQueryUnitPara(self._message_protocol_version),
MessageQueryHMIPara(self._message_protocol_version),
MessageQueryInstall(self._message_protocol_version),
]

def process_message(self, msg: bytes) -> dict[str, Any]:
Expand Down Expand Up @@ -221,30 +232,33 @@ def make_message_set(self) -> MessageSet:
message.room_target_temp = self._attributes[DeviceAttributes.room_target_temp]
message.zone1_curve = self._attributes[DeviceAttributes.zone1_curve]
message.zone2_curve = self._attributes[DeviceAttributes.zone2_curve]
message.disinfect = self._attributes[DeviceAttributes.disinfect]
message.tbh = self._attributes[DeviceAttributes.tbh]
message.fast_dhw = self._attributes[DeviceAttributes.fast_dhw]
return message

def set_attribute(self, attr: str, value: bool | int | str) -> None:
"""Midea C3 device set attribute."""
message: MessageSet | MessageSetECO | MessageSetSilent | None = None
message: (
MessageSet | MessageSetECO | MessageSetSilent | MessageSetDisinfect | None
) = None
if attr in [
DeviceAttributes.zone1_power,
DeviceAttributes.zone2_power,
DeviceAttributes.dhw_power,
DeviceAttributes.zone1_curve,
DeviceAttributes.zone2_curve,
DeviceAttributes.disinfect,
DeviceAttributes.tbh,
DeviceAttributes.fast_dhw,
DeviceAttributes.dhw_target_temp,
DeviceAttributes.tbh,
]:
message = self.make_message_set()
setattr(message, str(attr), value)
elif attr == DeviceAttributes.eco_mode:
message = MessageSetECO(self._message_protocol_version)
setattr(message, str(attr), value)
elif attr == DeviceAttributes.disinfect:
message = MessageSetDisinfect(self._message_protocol_version)
setattr(message, str(attr), value)
elif attr in [
DeviceAttributes.silent_mode.value,
DeviceAttributes.SILENT_LEVEL.value,
Expand Down
224 changes: 205 additions & 19 deletions midealocal/devices/c3/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,46 @@ def __init__(self, protocol_version: int) -> None:
super().__init__(protocol_version, ListTypes.X05)


class MessageQueryECO(MessageQuery):
"""C3 Message query ECO."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message query silence."""
super().__init__(protocol_version, ListTypes.X07)


class MessageQueryInstall(MessageQuery):
"""C3 Message query INSTALL."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message query silence."""
super().__init__(protocol_version, ListTypes.X08)


class MessageQueryDisinfect(MessageQuery):
"""C3 Message query Disinfect."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message query silence."""
super().__init__(protocol_version, ListTypes.X09)


class MessageQueryUnitPara(MessageQuery):
"""C3 Message query UNITPARA."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message query silence."""
super().__init__(protocol_version, ListTypes.X10)


class MessageQueryHMIPara(MessageQuery):
"""C3 Message query HMIPARA."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message query silence."""
super().__init__(protocol_version, ListTypes.X0A)


class MessageSet(MessageC3Base):
"""C3 message set."""

Expand All @@ -87,7 +127,6 @@ def __init__(self, protocol_version: int) -> None:
self.room_target_temp = 25.0
self.zone1_curve = False
self.zone2_curve = False
self.disinfect = False
self.fast_dhw = False
self.tbh = False

Expand All @@ -100,7 +139,7 @@ def _body(self) -> bytearray:
# Byte 7
zone1_curve = 0x01 if self.zone1_curve else 0x00
zone2_curve = 0x02 if self.zone2_curve else 0x00
disinfect = 0x04 if self.disinfect or self.tbh else 0x00
tbh = 0x04 if self.tbh else 0x00
fast_dhw = 0x08 if self.fast_dhw else 0x00
room_target_temp = int(self.room_target_temp * 2)
zone1_target_temp = int(self.zone_target_temp[0])
Expand All @@ -114,7 +153,7 @@ def _body(self) -> bytearray:
zone2_target_temp,
dhw_target_temp,
room_target_temp,
zone1_curve | zone2_curve | disinfect | fast_dhw,
zone1_curve | zone2_curve | tbh | fast_dhw,
],
)

Expand Down Expand Up @@ -168,31 +207,65 @@ def _body(self) -> bytearray:
return bytearray([eco_mode, 0x00, 0x00, 0x00, 0x00, 0x00])


class C3MessageBody(MessageBody):
"""C3 message body."""
class MessageSetDisinfect(MessageC3Base):
"""C3 message set Disinfect."""

def __init__(self, protocol_version: int) -> None:
"""Initialize C3 message set eco."""
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
body_type=ListTypes.X09,
)
self.disinfect = False

@property
def _body(self) -> bytearray:
disinfect = 0x01 if self.disinfect else 0

return bytearray([disinfect, 0x00, 0x00, 0x00])


class C3BasicBody(MessageBody):
"""C3 Basic message body."""

def __init__(self, body: bytearray, data_offset: int = 0) -> None:
"""Initialize C3 message body."""
super().__init__(body)
# BodyBytes 1
self.zone1_power = body[data_offset + 0] & 0x01 > 0
self.zone2_power = body[data_offset + 0] & 0x02 > 0
self.dhw_power = body[data_offset + 0] & 0x04 > 0
self.zone1_curve = body[data_offset + 0] & 0x08 > 0
self.zone2_curve = body[data_offset + 0] & 0x10 > 0
self.disinfect = body[data_offset + 0] & 0x20 > 0
self.tbh = body[data_offset + 0] & 0x20 > 0
self.fast_dhw = body[data_offset + 0] & 0x40 > 0
self.remote_onoff = body[data_offset + 0] & 0x80 > 0
# BodyBytes 2
self.heat = body[data_offset + 1] & 0x01 > 0
self.cool = body[data_offset + 1] & 0x02 > 0
self.dhw = body[data_offset + 1] & 0x04 > 0
self.double_zone = body[data_offset + 1] & 0x08 > 0
self.zone_temp_type = [
body[data_offset + 1] & 0x10 > 0,
body[data_offset + 1] & 0x20 > 0,
]
self.room_thermal_support = body[data_offset + 1] & 0x40 > 0
self.room_thermal_state = body[data_offset + 1] & 0x80 > 0
# BodyBytes 3
self.time_set = body[data_offset + 2] & 0x01 > 0
self.silent_mode = body[data_offset + 2] & 0x02 > 0
self.holiday_on = body[data_offset + 2] & 0x04 > 0
self.eco_mode = body[data_offset + 2] & 0x08 > 0
self.zone_terminal_type = body[data_offset + 2]
# BodyBytes 4
self.mode = body[data_offset + 3]
self.mode_auto = body[data_offset + 4]
# zone1, zone2
self.zone_target_temp = [body[data_offset + 5], body[data_offset + 6]]
self.dhw_target_temp = body[data_offset + 7]
self.room_target_temp = body[data_offset + 8] / 2
# zone1, zone2
self.zone_heating_temp_max = [body[data_offset + 9], body[data_offset + 13]]
self.zone_heating_temp_min = [body[data_offset + 10], body[data_offset + 14]]
self.zone_cooling_temp_max = [body[data_offset + 11], body[data_offset + 15]]
Expand All @@ -203,27 +276,34 @@ def __init__(self, body: bytearray, data_offset: int = 0) -> None:
self.dhw_temp_min = body[data_offset + 20]
self.tank_actual_temperature = body[data_offset + 21]
self.error_code = body[data_offset + 22]
self.tbh_control = body[data_offset + 23] & 0x80 > 0


class C3Notify1MessageBody(MessageBody):
"""C3 notify1 message body."""
class C3EnergyBody(MessageBody):
"""C3 Energy MSG_TYPE_UP_POWER4 message body."""

def __init__(self, body: bytearray, data_offset: int = 0) -> None:
"""Initialize C3 notify1 message body."""
super().__init__(body)
status_byte = body[data_offset]
self.status_tbh = (status_byte & 0x08) > 0
self.status_dhw = (status_byte & 0x04) > 0
self.status_ibh = (status_byte & 0x02) > 0
# bit0
self.status_heating = (status_byte & 0x01) > 0

# bit1
self.status_cool = (status_byte & 0x02) > 0
# bit2
self.status_dhw = (status_byte & 0x04) > 0
# bit3
self.status_tbh = (status_byte & 0x08) > 0
# bit4
self.status_ibh = (status_byte & 0x10) > 0
# total_energy_consumption
self.total_energy_consumption = (
(body[data_offset + 1] << 32)
+ (body[data_offset + 2] << 16)
+ (body[data_offset + 3] << 8)
+ (body[data_offset + 4])
)

# total_produced_energy
self.total_produced_energy = (
(body[data_offset + 5] << 32)
+ (body[data_offset + 6] << 16)
Expand All @@ -234,13 +314,18 @@ def __init__(self, body: bytearray, data_offset: int = 0) -> None:
self.outdoor_temperature = (
(base_value - 256) if base_value > TEMP_NEG_VALUE else base_value
)
self.t4 = body[data_offset + 9]
self.zone1_temp_set = body[data_offset + 10]
self.zone2_temp_set = body[data_offset + 11]
self.t5s = body[data_offset + 12]
self.tas = body[data_offset + 13]


class C3QuerySilenceMessageBody(MessageBody):
"""C3 Query silence message body."""
class C3SilenceBody(MessageBody):
"""C3 Silence message body."""

def __init__(self, body: bytearray, data_offset: int = 0) -> None:
"""Initialize C3 notify1 message body."""
"""Initialize C3 query silence message body."""
super().__init__(body)
self.silent_mode = body[data_offset] & 0x1 > 0
self.silent_level = C3SilentLevel(
Expand All @@ -263,6 +348,103 @@ def __init__(self, body: bytearray, data_offset: int = 0) -> None:
# silence_timer2_endmin: Byte 9


class C3DisinfectBody(MessageBody):
"""C3 Disinfect message body."""

def __init__(self, body: bytearray, data_offset: int = 0) -> None:
"""Initialize C3 Disinfect message body."""
super().__init__(body)
self.disinfect = body[data_offset] & 0x01 > 0
self.disinfect_run = body[data_offset] & 0x02 > 0
self.disinfect_set_weekday = body[data_offset + 1]
self.disinfect_start_hour = body[data_offset + 2]
self.disinfect_start_minutes = body[data_offset + 3]


class C3UnitParaBody(MessageBody):
"""C3 UnitPara message body."""

def __init__(self, body: bytearray, data_offset: int = 0) -> None:
"""Initialize C3 UnitPara message body."""
super().__init__(body)
self.comp_run_freq = body[data_offset]
self.unit_mode_run = body[data_offset + 1]
self.fan_speed = body[data_offset + 3] * 10
self.fg_capacity_need = body[data_offset + 5]
self.temp_t3 = body[data_offset + 6]
self.temp_t4 = body[data_offset + 7]
self.temp_tp = body[data_offset + 8]
self.temp_tw_in = body[data_offset + 9]
self.temp_tw_out = body[data_offset + 10]
self.temp_tsolar = body[data_offset + 11]
self.hydbox_subtype = body[data_offset + 12]
self.fg_usb_info_connect = body[data_offset + 13]
# self.usb_index_max body[data_offset + 14]
# self.odu_comp_current body[data_offset + 16]
self.odu_voltage = body[data_offset + 17] * 256 + body[data_offset + 18]
self.exv_current = body[data_offset + 19] * 256 + body[data_offset + 20]
self.odu_model = body[data_offset + 21]
# self.unit_online_num body[data_offset + 22]
# self.current_code body[data_offset + 23]
self.temp_t1 = body[data_offset + 33]
self.temp_tw2 = body[data_offset + 34]
self.temp_t2 = body[data_offset + 35]
self.temp_t2b = body[data_offset + 36]
self.temp_t5 = body[data_offset + 37]
self.temp_ta = body[data_offset + 38]
self.temp_tb_t1 = body[data_offset + 39]
self.temp_tb_t2 = body[data_offset + 40]
self.hydrobox_capacity = body[data_offset + 41]
self.pressure_high = body[data_offset + 42] * 256 + body[data_offset + 43]
self.pressure_low = body[data_offset + 44] * 256 + body[data_offset + 45]
self.temp_th = body[data_offset + 46]
self.machine_type = body[data_offset + 47]
self.odu_target_fre = body[data_offset + 48]
self.dc_current = body[data_offset + 49]
self.temp_tf = body[data_offset + 51]
self.idu_t1s1 = body[data_offset + 52]
self.idu_t1s2 = body[data_offset + 53]
self.water_flower = body[data_offset + 54] * 256 + body[data_offset + 55]
self.odu_plan_vol_lmt = body[data_offset + 56]
self.current_unit_capacity = body[data_offset + 57]
self.sphera_ahs_voltage = body[data_offset + 59]
self.temp_t4a_ver = body[data_offset + 60]
self.water_pressure = body[data_offset + 61] * 256 + body[data_offset + 62]
self.room_rel_hum = body[data_offset + 63]
self.pwm_pump_out = body[data_offset + 63]
self.total_electricity0 = (
(body[data_offset + 66] << 32)
+ (body[data_offset + 67] << 16)
+ (body[data_offset + 68] << 8)
+ (body[data_offset + 69])
)
self.total_thermal0 = (
(body[data_offset + 70] << 32)
+ (body[data_offset + 71] << 16)
+ (body[data_offset + 72] << 8)
+ (body[data_offset + 73])
)
self.heat_elec_total_consum0 = (
(body[data_offset + 74] << 32)
+ (body[data_offset + 75] << 16)
+ (body[data_offset + 76] << 8)
+ (body[data_offset + 77])
)
self.heat_elec_total_capacity0 = (
(body[data_offset + 78] << 32)
+ (body[data_offset + 79] << 16)
+ (body[data_offset + 80] << 8)
+ (body[data_offset + 81])
)
self.instant_power0 = (body[data_offset + 82] << 8) + (body[data_offset + 83])
self.instant_renew_power0 = (body[data_offset + 84] << 8) + (
body[data_offset + 85]
)
self.total_renew_power0 = (body[data_offset + 84] << 8) + (
body[data_offset + 85]
)


class MessageC3Response(MessageResponse):
"""C3 message response."""

Expand All @@ -274,11 +456,15 @@ def __init__(self, message: bytes) -> None:
in [MessageType.set, MessageType.notify1, MessageType.query]
and self.body_type == ListTypes.X01
) or self.message_type == MessageType.notify2:
self.set_body(C3MessageBody(super().body, data_offset=1))
self.set_body(C3BasicBody(super().body, data_offset=1))
elif (
self.message_type == MessageType.notify1 and self.body_type == ListTypes.X04
):
self.set_body(C3Notify1MessageBody(super().body, data_offset=1))
self.set_body(C3EnergyBody(super().body, data_offset=1))
elif self.message_type == MessageType.query and self.body_type == ListTypes.X05:
self.set_body(C3QuerySilenceMessageBody(super().body, data_offset=1))
self.set_body(C3SilenceBody(super().body, data_offset=1))
elif self.body_type == ListTypes.X09:
self.set_body(C3DisinfectBody(super().body, data_offset=1))
elif self.body_type == ListTypes.X10:
self.set_body(C3UnitParaBody(super().body, data_offset=1))
self.set_attr()
Loading