Skip to content

Commit

Permalink
chore: typing b6
Browse files Browse the repository at this point in the history
  • Loading branch information
chemelli74 committed Jun 3, 2024
1 parent 4f18938 commit 173202c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 31 deletions.
17 changes: 9 additions & 8 deletions midealocal/devices/b6/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import sys
from typing import Any

from .message import MessageB6Response, MessageQuery, MessageSet

Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
model: str,
subtype: int,
customize: str,
):
) -> None:
super().__init__(
name=name,
device_id=device_id,
Expand Down Expand Up @@ -66,17 +67,17 @@ def __init__(
self.set_customize(customize)

@property
def speed_count(self):
def speed_count(self) -> int:
return len(self._speeds) - 1

@property
def preset_modes(self):
def preset_modes(self) -> list[str]:
return list(self._speeds.values())

def build_query(self):
def build_query(self) -> list[MessageQuery]:
return [MessageQuery(self._protocol_version)]

def process_message(self, msg):
def process_message(self, msg: bytes) -> dict[str, Any]:
message = MessageB6Response(msg)
self._protocol_version = message.protocol_version
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
Expand Down Expand Up @@ -105,7 +106,7 @@ def process_message(self, msg):
new_status[str(status)] = self._attributes[status]
return new_status

def set_attribute(self, attr, value):
def set_attribute(self, attr: str, value: Any) -> None:
message = None
if attr == DeviceAttributes.fan_speed:
if value < len(self._speeds):
Expand All @@ -130,7 +131,7 @@ def set_attribute(self, attr, value):
if message is not None:
self.build_send(message)

def turn_on(self, fan_speed=None, mode=None):
def turn_on(self, fan_speed: int | None = None, mode: str | None = None) -> None:
message = MessageSet(self._protocol_version)
message.power = True
if fan_speed is not None and fan_speed < len(self._speeds):
Expand All @@ -143,7 +144,7 @@ def turn_on(self, fan_speed=None, mode=None):
]
self.build_send(message)

def set_customize(self, customize):
def set_customize(self, customize: str) -> None:
self._speeds = self._default_speeds
self._power_speed = self._default_power_speed
if customize and len(customize) > 0:
Expand Down
46 changes: 23 additions & 23 deletions midealocal/devices/b6/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


class MessageB6Base(MessageRequest):
def __init__(self, protocol_version, message_type, body_type):
def __init__(
self, protocol_version: int, message_type: int, body_type: int
) -> None:
super().__init__(
device_type=0xB6,
protocol_version=protocol_version,
Expand All @@ -16,49 +18,49 @@ def __init__(self, protocol_version, message_type, body_type):
)

@property
def _body(self):
def _body(self) -> bytearray:
raise NotImplementedError


class MessageQuery(MessageB6Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.query,
body_type=0x11 if protocol_version == 2 else 0x31,
)

@property
def _body(self):
def _body(self) -> bytearray:
return bytearray([])


class MessageQueryTips(MessageB6Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.query,
body_type=0x02,
)

@property
def _body(self):
def _body(self) -> bytearray:
return bytearray([0x01])


class MessageSet(MessageB6Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
body_type=0x22 if protocol_version in [0x00, 0x01] else 0x11,
)
self.light = None
self.power = None
self.fan_level = None
self.light: int | None = None
self.power: bool | None = None
self.fan_level: int | None = None

@property
def _body(self):
def _body(self) -> bytearray:
if self.protocol_version in [0x00, 0x01]:
light = 0xFF
value2 = 0xFF
Expand Down Expand Up @@ -116,22 +118,22 @@ def _body(self):


class B6FeedbackBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)


class B6GeneralBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
if body[1] != 0xFF:
self.light = body[1] > 0x00
self.power = False
fan_level = None
fan_level: int = 0
if body[2] != 0xFF:
self.power = body[2] in [0x02, 0x06, 0x07, 0x14, 0x15, 0x16]
if body[2] in [0x14, 0x16]:
fan_level = 0x16
if fan_level is None and body[3] != 0xFF:
if fan_level == 0 and body[3] != 0xFF:
fan_level = body[3]
if fan_level > 100:
if fan_level < 130:
Expand All @@ -142,15 +144,13 @@ def __init__(self, body):
fan_level = 3
else:
fan_level = 4
else:
self.fan_level = fan_level
self.fan_level = 0 if fan_level is None else fan_level
self.fan_level = fan_level
self.oilcup_full = (body[5] & 0x01) > 0
self.cleaning_reminder = (body[5] & 0x02) > 0


class B6NewProtocolBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
if body[1] == 0x01:
pack_bytes = body[3 : 3 + body[2]]
Expand All @@ -166,7 +166,7 @@ def __init__(self, body):


class B6SpecialBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
if body[2] != 0xFF:
self.light = body[2] > 0x00
Expand All @@ -178,13 +178,13 @@ def __init__(self, body):


class B6ExceptionBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)


class MessageB6Response(MessageResponse):
def __init__(self, message):
super().__init__(message)
def __init__(self, message: bytes) -> None:
super().__init__(bytearray(message))
if (
self.message_type == MessageType.set
and self.body_type == 0x22
Expand Down

0 comments on commit 173202c

Please sign in to comment.