Skip to content

Commit

Permalink
chore: typing fc (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
chemelli74 authored Jun 4, 2024
1 parent ccb5cf9 commit 25d9e9a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 33 deletions.
21 changes: 11 additions & 10 deletions midealocal/devices/fc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import sys
from typing import Any

from .message import MessageFCResponse, MessageQuery, MessageSet

Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(
model: str,
subtype: int,
customize: str,
):
) -> None:
super().__init__(
name=name,
device_id=device_id,
Expand Down Expand Up @@ -91,25 +92,25 @@ def __init__(
self.set_customize(customize)

@property
def modes(self):
def modes(self) -> list[str]:
return list(MideaFCDevice._modes.values())

@property
def fan_speeds(self):
def fan_speeds(self) -> list[str]:
return list(MideaFCDevice._speeds.values())

@property
def screen_displays(self):
def screen_displays(self) -> list[str]:
return list(MideaFCDevice._screen_displays.values())

@property
def detect_modes(self):
def detect_modes(self) -> list[str]:
return self._detect_modes

def build_query(self):
def build_query(self) -> list[MessageQuery]:
return [MessageQuery(self._protocol_version)]

def process_message(self, msg):
def process_message(self, msg: bytes) -> dict[str, Any]:
message = MessageFCResponse(msg)
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
new_status = {}
Expand Down Expand Up @@ -143,7 +144,7 @@ def process_message(self, msg):
new_status[str(status)] = self._attributes[status]
return new_status

def make_message_set(self):
def make_message_set(self) -> MessageSet:
message = MessageSet(self._protocol_version)
message.power = self._attributes[DeviceAttributes.power]
message.child_lock = self._attributes[DeviceAttributes.child_lock]
Expand Down Expand Up @@ -188,7 +189,7 @@ def make_message_set(self):
message.standby_detect = self._standby_detect
return message

def set_attribute(self, attr, value):
def set_attribute(self, attr: str, value: Any) -> None:
if attr == DeviceAttributes.prompt_tone:
self._attributes[DeviceAttributes.prompt_tone] = value
self.update_all({DeviceAttributes.prompt_tone.value: value})
Expand Down Expand Up @@ -220,7 +221,7 @@ def set_attribute(self, attr, value):
setattr(message, str(attr), value)
self.build_send(message)

def set_customize(self, customize):
def set_customize(self, customize: str) -> None:
self._standby_detect = self._standby_detect_default
if customize and len(customize) > 0:
try:
Expand Down
44 changes: 21 additions & 23 deletions midealocal/devices/fc/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
class MessageFCBase(MessageRequest):
_message_serial = 0

def __init__(self, protocol_version, message_type, body_type):
def __init__(
self, protocol_version: int, message_type: int, body_type: int
) -> None:
super().__init__(
device_type=0xFC,
protocol_version=protocol_version,
Expand All @@ -23,26 +25,26 @@ def __init__(self, protocol_version, message_type, body_type):
self._message_id = MessageFCBase._message_serial

@property
def _body(self):
def _body(self) -> bytearray:
raise NotImplementedError

@property
def body(self):
def body(self) -> bytearray:
body = bytearray([self.body_type]) + self._body + bytearray([self._message_id])
body.append(calculate(body))
return body


class MessageQuery(MessageFCBase):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.query,
body_type=0x41,
)

@property
def _body(self):
def _body(self) -> bytearray:
return bytearray(
[
0x00,
Expand All @@ -69,7 +71,7 @@ def _body(self):


class MessageSet(MessageFCBase):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
Expand All @@ -87,7 +89,7 @@ def __init__(self, protocol_version):
self.standby_detect = [40, 20]

@property
def _body(self):
def _body(self) -> bytearray:
# byte1 power
power = 0x01 if self.power else 0x00
detect = 0x08 if self.detect_mode > 0 else 0x00
Expand Down Expand Up @@ -136,20 +138,20 @@ def _body(self):


class FCGeneralMessageBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
self.power = (body[1] & 0x01) > 0
self.mode = body[2] & 0xF0
self.fan_speed = body[3] & 0x7F
self.screen_display = body[9] & 0x07
self.pm25: int | None = None
self.tvoc: int | None = None
self.hcho: int | None = None

if len(body) > 14 and body[14] != 0xFF:
self.pm25 = body[13] + (body[14] << 8)
else:
self.pm25 = None
if len(body) > 15 and body[15] != 0xFF:
self.tvoc = body[15]
else:
self.tvoc = None
self.anion = (body[19] & 0x40 > 0) if len(body) > 19 else False
self.standby = ((body[34] & 0xFF) == 0x14) if len(body) > 34 else False
self.child_lock = (body[8] & 0x80 > 0) if len(body) > 8 else False
Expand All @@ -164,25 +166,23 @@ def __init__(self, body):
self.detect_mode = 0
if len(body) > 38 and body[38] != 0xFF:
self.hcho = body[37] + (body[38] << 8)
else:
self.hcho = None


class FCNotifyMessageBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
self.power = (body[1] & 0x01) > 0
self.mode = body[2] & 0xF0
self.fan_speed = body[3] & 0x7F
self.screen_display = body[9] & 0x07
self.pm25: int | None = None
self.tvoc: int | None = None
self.hcho: int | None = None

if len(body) > 14 and body[14] != 0xFF:
self.pm25 = body[13] + (body[14] << 8)
else:
self.pm25 = None
if len(body) > 15 and body[15] != 0xFF:
self.tvoc = body[15]
else:
self.tvoc = None
self.anion = (body[10] & 0x20 > 0) if len(body) > 10 else False
self.standby = (body[27] & 0x14 == 0xFF) if len(body) > 27 else False
self.child_lock = (body[10] & 0x10 > 0) if len(body) > 10 else False
Expand All @@ -193,13 +193,11 @@ def __init__(self, body):
self.detect_mode = 0
if len(body) > 31 and body[31] != 0xFF:
self.hcho = body[30] + (body[31] << 8)
else:
self.hcho = None


class MessageFCResponse(MessageResponse):
def __init__(self, message):
super().__init__(message)
def __init__(self, message: bytes) -> None:
super().__init__(bytearray(message))
if self.body_type in [0xB0, 0xB1]:
pass
elif (
Expand Down

0 comments on commit 25d9e9a

Please sign in to comment.