60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
|
import struct
|
||
|
|
||
|
|
||
|
class BrComMessage:
    """A framed message consisting of a header byte, operation code, address and value.

    ``gen_message`` produces a list of nine integers laid out as::

        [SOH, operation, addr (2 bytes), val (4 bytes), checksum]

    The address is encoded as a little-endian unsigned 16-bit integer and the
    value as a little-endian 32-bit float, which matches the little-endian
    float decoding performed by ``resolve_message``.
    """

    # First element of every frame; resolve_message rejects frames that do
    # not start with it.
    SOH = 0x0A

    # Operation codes (presumably requests and their responses -- confirm
    # against the peer implementation).
    OP_READ = 0
    OP_WRITE = 1
    RES_READ = 100
    RES_WRITE = 101

    # Explicit little-endian formats so encoding, checksum and decoding all
    # agree regardless of the host's native byte order.
    _ADDR_FMT = '<H'
    _VAL_FMT = '<f'

    # SOH + operation + 2 address bytes + 4 value bytes: the minimum number
    # of elements resolve_message needs in order to decode a frame.
    _MIN_DECODE_LEN = 8

    def __init__(self, operation, addr: int, val: float = 0.0):
        """Store the operation code, target address and value of the message."""
        self.operation = operation
        self.addr = addr
        self.val = val

    def _payload_bytes(self) -> list:
        """Return the encoded address and value bytes as a list of ints."""
        return list(struct.pack(self._ADDR_FMT, self.addr)) + \
            list(struct.pack(self._VAL_FMT, self.val))

    def gen_message(self) -> list:
        """Serialize the message into a list of integers.

        Layout: [SOH, operation_code, addr, val, checksum]

        Raises:
            struct.error: if ``addr`` does not fit an unsigned 16-bit integer
                or ``val`` cannot be packed as a 32-bit float.
        """
        return [self.SOH, self.operation] + \
            self._payload_bytes() + \
            [self.compute_checksum()]

    def compute_checksum(self) -> int:
        """Return the low 8 bits of the sum of every element before the checksum.

        The sum covers SOH, the operation code and the same address/value
        bytes that ``gen_message`` emits.
        """
        total = self.SOH + self.operation + sum(self._payload_bytes())
        return total & 0xFF

    # Class method: parse a list of message elements into a message object.
    @classmethod
    def resolve_message(cls, message: list):
        """Decode ``message`` into a message object.

        Returns:
            ``(True, message_object)`` on success, or the one-element tuple
            ``(False,)`` when the frame is too short to decode or does not
            start with ``SOH``.

        NOTE: the trailing checksum element is not verified here.
        """
        if len(message) < cls._MIN_DECODE_LEN or message[0] != cls.SOH:
            return (False,)

        # Operation code.
        op_code = message[1]
        # Target address (2 bytes).
        (addr,) = struct.unpack(cls._ADDR_FMT, bytes(message[2:4]))
        # Value (4 bytes).
        (val,) = struct.unpack(cls._VAL_FMT, bytes(message[4:8]))
        return True, cls(op_code, addr, val)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Demo 1: build a read request for address 100 and show its encoded form.
    outgoing = BrComMessage(BrComMessage.OP_READ, 100, val=1)
    encoded = outgoing.gen_message()
    print(encoded)

    # Demo 2: decode a hand-written frame and show the value it carries.
    raw_frame = [0x0A, 0, 1, 0, 0, 0, 128, 63, 100]
    ok, decoded = BrComMessage.resolve_message(raw_frame)
    print(decoded.val)
|