Skip to content

Commit

Permalink
feat(b8): notify message
Browse files Browse the repository at this point in the history
  • Loading branch information
rokam committed Jul 23, 2024
1 parent 248c2a0 commit da41b0e
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 63 deletions.
16 changes: 10 additions & 6 deletions midealocal/devices/b8/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
B8CleanMode,
B8ControlType,
B8DeviceAttributes,
B8ErrorCanFixDescription,
B8ErrorType,
B8FanLevel,
B8FunctionType,
B8MopState,
B8Moviment,
B8Speed,
B8WaterLevel,
B8WorkMode,
B8WorkStatus,
)
from midealocal.devices.b8.message import (
MessageB8Response,
Expand Down Expand Up @@ -55,18 +59,18 @@ def __init__(
model=model,
subtype=subtype,
attributes={
B8DeviceAttributes.WORK_STATUS: None,
B8DeviceAttributes.FUNCTION_TYPE: None,
B8DeviceAttributes.WORK_STATUS: B8WorkStatus.NONE.name.lower(),
B8DeviceAttributes.FUNCTION_TYPE: B8FunctionType.NONE.name.lower(),
B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE.name.lower(),
B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE.name.lower(),
B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE.name.lower(),
B8DeviceAttributes.FAN_LEVEL: B8FanLevel.OFF.name.lower(),
B8DeviceAttributes.AREA: None,
B8DeviceAttributes.AREA: 0,
B8DeviceAttributes.WATER_LEVEL: B8WaterLevel.OFF.name.lower(),
B8DeviceAttributes.VOICE_VOLUME: 0,
B8DeviceAttributes.MOP: B8MopState.OFF,
B8DeviceAttributes.MOP: B8MopState.OFF.name.lower(),
B8DeviceAttributes.CARPET_SWITCH: False,
B8DeviceAttributes.SPEED: None,
B8DeviceAttributes.SPEED: B8Speed.HIGH.name.lower(),
B8DeviceAttributes.HAVE_RESERVE_TASK: False,
B8DeviceAttributes.BATTERY_PERCENT: 0,
B8DeviceAttributes.WORK_TIME: 0,
Expand All @@ -75,7 +79,7 @@ def __init__(
B8DeviceAttributes.VOICE_SWITCH: False,
B8DeviceAttributes.COMMAND_SOURCE: False,
B8DeviceAttributes.ERROR_TYPE: B8ErrorType.NO.name.lower(),
B8DeviceAttributes.ERROR_DESC: None,
B8DeviceAttributes.ERROR_DESC: B8ErrorCanFixDescription.NO.name.lower(),
B8DeviceAttributes.DEVICE_ERROR: False,
B8DeviceAttributes.BOARD_COMMUNICATION_ERROR: False,
B8DeviceAttributes.LASER_SENSOR_SHELTER: False,
Expand Down
16 changes: 9 additions & 7 deletions midealocal/devices/b8/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class B8WorkMode(IntEnum):
class B8WorkStatus(IntEnum):
"""Midea B8 work status."""

NONE = 0x00
CHARGE = 0x01
WORK = 0x02
STOP = 0x03
Expand All @@ -68,6 +69,7 @@ class B8WorkStatus(IntEnum):
class B8FunctionType(IntEnum):
"""Midea B8 function type."""

NONE = 0x00
DUST_BOX_CLEANING = 0x01
WATER_TANK_CLEANING = 0x02

Expand Down Expand Up @@ -127,19 +129,19 @@ class B8WaterLevel(IntEnum):
HIGH = 0x3


class B8MopState(StrEnum):
class B8MopState(IntEnum):
"""Midea B8 mop state."""

OFF = "off"
ON = "on"
LACK_WATER = "lack_water"
OFF = 0x0
ON = 0x1
LACK_WATER = 0x2


class B8Speed(StrEnum):
class B8Speed(IntEnum):
"""Midea B8 speed."""

LOW = "low"
HIGH = "high"
LOW = 0x1
HIGH = 0x0


class B8ErrorType(IntEnum):
Expand Down
173 changes: 128 additions & 45 deletions midealocal/devices/b8/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from midealocal.devices.b8.const import (
B8CleanMode,
B8ControlType,
B8DeviceAttributes,
B8ErrorCanFixDescription,
B8ErrorRebootDescription,
B8ErrorType,
Expand All @@ -19,6 +20,9 @@
)
from midealocal.message import (
BodyType,
BoolParser,
IntEnumParser,
IntParser,
MessageBody,
MessageRequest,
MessageResponse,
Expand Down Expand Up @@ -119,56 +123,133 @@ def _body(self) -> bytearray:
)


class MessageB8WorkStatusBody(MessageBody):
"""B8 message work status body."""
class MessageB8GenericBody(MessageBody):
"""B8 message generic body."""

def __init__(self, body: bytearray) -> None:
"""Initialize B8 message work status body."""
def __init__(self, body: bytearray, offset: int) -> None:
"""Initialize B8 message generic body."""
super().__init__(body)
self.work_status = B8WorkStatus(body[2])
self.function_type = B8FunctionType(body[3])
self.control_type = B8ControlType(body[4])
self.move_direction = B8Moviment(body[5])
self.clean_mode = B8CleanMode(body[6])
self.fan_level = B8FanLevel(body[7])
self.area = body[8]
self.water_level = B8WaterLevel(body[9])
self.voice_volume = body[10]
mop = body[17]
if mop == 0:
self.mop = B8MopState.OFF
elif mop == 1:
self.mop = B8MopState.ON
else:
self.mop = B8MopState.LACK_WATER
self.carpet_switch = body[18] == 1
self.speed = B8Speed.LOW if body[20] == 1 else B8Speed.HIGH
self.have_reserve_task = body[11] != 0
self.battery_percent = body[12]
self.work_time = body[13]
err_user_high = body[19]
status_summary = body[14]
self.error_type = B8ErrorType(body[15])
self.uv_switch = status_summary & 0x01 > 0
self.wifi_switch = status_summary & 0x02 > 0
self.voice_switch = status_summary & 0x04 > 0
self.command_source = status_summary & 0x40 > 0
self.device_error = status_summary & 0x80 > 0
self.board_communication_error = err_user_high & 0x4 > 0
self.laser_sensor_shelter = err_user_high & 0x2 > 0
self.laser_sensor_error = err_user_high & 0x1 > 0
self.parser_list.extend(
[
IntEnumParser[B8WorkStatus](
B8DeviceAttributes.WORK_STATUS,
1 + offset,
B8WorkStatus,
),
IntEnumParser[B8FunctionType](
B8DeviceAttributes.FUNCTION_TYPE,
2 + offset,
B8FunctionType,
),
IntEnumParser[B8ControlType](
B8DeviceAttributes.CONTROL_TYPE,
3 + offset,
B8ControlType,
),
IntEnumParser[B8Moviment](
B8DeviceAttributes.MOVE_DIRECTION,
4 + offset,
B8Moviment,
),
IntEnumParser[B8CleanMode](
B8DeviceAttributes.CLEAN_MODE,
5 + offset,
B8CleanMode,
),
IntEnumParser[B8FanLevel](
B8DeviceAttributes.FAN_LEVEL,
6 + offset,
B8FanLevel,
),
IntParser(B8DeviceAttributes.AREA, 7 + offset),
IntEnumParser[B8WaterLevel](
B8DeviceAttributes.WATER_LEVEL,
8 + offset,
B8WaterLevel,
),
IntParser(B8DeviceAttributes.VOICE_VOLUME, 9 + offset, max_value=100),
BoolParser(
B8DeviceAttributes.HAVE_RESERVE_TASK,
10 + offset,
),
IntParser(
B8DeviceAttributes.BATTERY_PERCENT,
11 + offset,
max_value=100,
),
IntParser(B8DeviceAttributes.WORK_TIME, 12 + offset),
BoolParser(B8DeviceAttributes.UV_SWITCH, 13 + offset, bit=0),
BoolParser(B8DeviceAttributes.WIFI_SWITCH, 13 + offset, bit=1),
BoolParser(B8DeviceAttributes.VOICE_SWITCH, 13 + offset, bit=2),
BoolParser(B8DeviceAttributes.COMMAND_SOURCE, 13 + offset, bit=6),
BoolParser(B8DeviceAttributes.DEVICE_ERROR, 13 + offset, bit=7),
IntEnumParser[B8ErrorType](
B8DeviceAttributes.ERROR_TYPE,
14 + offset,
B8ErrorType,
),
IntEnumParser[B8MopState](
B8DeviceAttributes.MOP,
16 + offset,
B8MopState,
default_value=B8MopState.LACK_WATER,
),
BoolParser(B8DeviceAttributes.CARPET_SWITCH, 17 + offset),
BoolParser(
B8DeviceAttributes.LASER_SENSOR_ERROR,
18 + offset,
bit=0,
),
BoolParser(
B8DeviceAttributes.LASER_SENSOR_SHELTER,
18 + offset,
bit=1,
),
BoolParser(
B8DeviceAttributes.BOARD_COMMUNICATION_ERROR,
18 + offset,
bit=2,
),
IntEnumParser[B8Speed](B8DeviceAttributes.SPEED, 19 + offset, B8Speed),
],
)
self.parse_all()

# Error description without parser
self.error_desc: (
B8ErrorCanFixDescription
| B8ErrorRebootDescription
| B8ErrorWarningDescription
| None
) = None
if self.error_type == B8ErrorType.CAN_FIX:
self.error_desc = B8ErrorCanFixDescription(body[16])
elif self.error_type == B8ErrorType.REBOOT:
self.error_desc = B8ErrorRebootDescription(body[16])
elif self.error_type == B8ErrorType.WARNING:
self.error_desc = B8ErrorWarningDescription(body[16])
) = B8ErrorCanFixDescription.NO
error_type = getattr(self, B8DeviceAttributes.ERROR_TYPE, B8ErrorType.NO)
if error_type == B8ErrorType.CAN_FIX:
self.error_desc = B8ErrorCanFixDescription(
self.read_byte(body, 15 + offset),
)
elif error_type == B8ErrorType.REBOOT:
self.error_desc = B8ErrorRebootDescription(
self.read_byte(body, 15 + offset),
)
elif error_type == B8ErrorType.WARNING:
self.error_desc = B8ErrorWarningDescription(
self.read_byte(body, 15 + offset),
)


class MessageB8WorkStatusBody(MessageB8GenericBody):
"""B8 message work status body."""

def __init__(self, body: bytearray) -> None:
"""Initialize B8 message work status body."""
super().__init__(body, 1)


class MessageB8NotifyBody(MessageB8GenericBody):
"""B8 message notify body."""

def __init__(self, body: bytearray) -> None:
"""Initialize B8 message notify body."""
super().__init__(body, 0)


class MessageB8Response(MessageResponse):
Expand All @@ -183,7 +264,7 @@ def __init__(self, message: bytes) -> None:
)
if body is not None:
self.set_body(body)
self.set_attr()
self.set_attr()

@staticmethod
def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None:
Expand All @@ -196,4 +277,6 @@ def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None
and status_type == B8StatusType.X01
):
return MessageB8WorkStatusBody(body)
if message_type == MessageType.notify1 and body_type == BodyType.X42:
return MessageB8NotifyBody(body)
return None
1 change: 1 addition & 0 deletions midealocal/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class BodyType(IntEnum):
X31 = 0x31
X32 = 0x32
X41 = 0x41
X42 = 0x42
X80 = 0x80


Expand Down
22 changes: 17 additions & 5 deletions tests/devices/b8/device_b8_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
B8DeviceAttributes,
B8ErrorType,
B8FanLevel,
B8FunctionType,
B8MopState,
B8Moviment,
B8Speed,
B8WaterLevel,
B8WorkStatus,
)
from midealocal.devices.b8.message import (
MessageQuery,
Expand Down Expand Up @@ -43,8 +46,14 @@ def _setup_device(self) -> None:

def test_initial_attributes(self) -> None:
"""Test initial attributes."""
assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] is None
assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] is None
assert (
self.device.attributes[B8DeviceAttributes.WORK_STATUS]
== B8WorkStatus.NONE.name.lower()
)
assert (
self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE]
== B8FunctionType.NONE.name.lower()
)
assert (
self.device.attributes[B8DeviceAttributes.CONTROL_TYPE]
== B8ControlType.NONE.name.lower()
Expand All @@ -61,7 +70,7 @@ def test_initial_attributes(self) -> None:
self.device.attributes[B8DeviceAttributes.FAN_LEVEL]
== B8FanLevel.OFF.name.lower()
)
assert self.device.attributes[B8DeviceAttributes.AREA] is None
assert self.device.attributes[B8DeviceAttributes.AREA] == 0
assert (
self.device.attributes[B8DeviceAttributes.WATER_LEVEL]
== B8WaterLevel.OFF.name.lower()
Expand All @@ -72,7 +81,10 @@ def test_initial_attributes(self) -> None:
== B8MopState.OFF.name.lower()
)
assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False
assert self.device.attributes[B8DeviceAttributes.SPEED] is None
assert (
self.device.attributes[B8DeviceAttributes.SPEED]
== B8Speed.HIGH.name.lower()
)
assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False
assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 0
assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 0
Expand All @@ -84,7 +96,7 @@ def test_initial_attributes(self) -> None:
self.device.attributes[B8DeviceAttributes.ERROR_TYPE]
== B8ErrorType.NO.name.lower()
)
assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] is None
assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] == "no"
assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False
assert (
self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR]
Expand Down

0 comments on commit da41b0e

Please sign in to comment.