ABC-BMS (SOK LiFePO4 Bluetooth) #571
Replies: 9 comments 2 replies
-
I have reverse-engineered the ABC-BMS (SOK-BMS) protocol by tearing down their Android app and capturing the Bluetooth traffic between a mobile phone and the battery. I'll share a working PoC with @Louisvdw. |
Beta Was this translation helpful? Give feedback.
-
Awesome, thanks much! Looking forward to trying it out. |
Beta Was this translation helpful? Give feedback.
-
Hello, how's it coming along for this? |
Beta Was this translation helpful? Give feedback.
-
I would love to see this implemented! Currently I have a Home Assistant instance running in my RV, with ESPHome pulling in BLE sensor data. If the spec for the ABC-BMS were known, it would enable many other projects to take advantage of the BMS values. |
Beta Was this translation helpful? Give feedback.
-
@zuccaro I'm about to start reverse engineering the SOK bluetooth services, but if you've already done this would you mind sharing with me as well? |
Beta Was this translation helpful? Give feedback.
-
I don't have a full understanding of their protocol but it should help you get started! |
Beta Was this translation helpful? Give feedback.
-
#!/usr/bin/python3
# communicates with SOK BMS via BLE commands
import BLE_GATT
import logging
import argparse
import struct
import sys
import threading
import time
import statistics
# Default Bluetooth address of the battery; used as the default for --address.
mac_address = '00:00:01:AA:EE:DD'
# GATT UUIDs. notify_id is the characteristic subscribed to for responses and
# tx_id is the characteristic that commands are written to. svc_id is not
# referenced by the rest of the visible code (presumably the owning service
# -- TODO confirm).
svc_id = '0000ffe0-0000-1000-8000-00805f9b34fb'
notify_id = '0000ffe1-0000-1000-8000-00805f9b34fb'
tx_id = '0000ffe2-0000-1000-8000-00805f9b34fb'
# Command frames WITHOUT their trailing checksum byte; send() appends it.
# What each command requests is inferred from its name only -- TODO confirm
# against captured traffic.
cmd_name = [ 0xee, 0xc0, 0x00, 0x00, 0x00 ]
cmd_info = [ 0xee, 0xc1, 0x00, 0x00, 0x00 ]
cmd_detail = [ 0xee, 0xc2, 0x00, 0x00, 0x00 ]
cmd_setting = [ 0xee, 0xc3, 0x00, 0x00, 0x00 ]
cmd_protection = [ 0xee, 0xc4, 0x00, 0x00, 0x00 ]
cmd_break = [ 0xdd, 0xc0, 0x00, 0x00, 0x00 ]
def get_str(ubit, uuid):
    """Read characteristic `uuid` from `ubit` and return it decoded as UTF-8.

    `ubit` only needs a ``char_read(uuid)`` method returning byte values.
    """
    raw = bytes(ubit.char_read(uuid))
    return raw.decode('UTF-8')
def unpack(pattern, data):
    """Decode `data` with struct format `pattern`; return only the first field."""
    fields = struct.unpack(pattern, data)
    return fields[0]
def getBeUint4(data, offset):
    """Return the 4-byte big-endian unsigned integer at `data[offset]`."""
    window = bytes(data[offset:offset + 4])
    return unpack('>I', window)
def getBeUint3(data, offset):
    """Return the 3-byte big-endian unsigned integer at `data[offset]`.

    `data` may be a list of ints, ``bytes`` or ``bytearray``.

    Raises:
        struct.error: if fewer than three bytes are available at `offset`.
    """
    chunk = bytes(data[offset:offset + 3])
    # A zero byte in the most-significant position widens the value to the
    # four bytes that the '>I' format requires without changing it.
    return struct.unpack('>I', b'\x00' + chunk)[0]
def getLeInt3(data, offset):
    """Decode the three bytes at `data[offset]` as a little-endian signed value.

    `data` may be a list of ints, ``bytes`` or ``bytearray``.

    The three bytes are placed in the UPPER 24 bits of a 32-bit little-endian
    signed integer (a zero byte is put in the least-significant position), so
    the result is 256 times the plain 24-bit signed value; the sign comes
    from the top bit of the third byte.
    NOTE(review): existing callers scale the result with their own divisors,
    so the factor of 256 is kept as is -- confirm whether it is intended.

    Raises:
        struct.error: if fewer than three bytes are available at `offset`.
    """
    chunk = bytes(data[offset:offset + 3])
    return struct.unpack('<i', b'\x00' + chunk)[0]
def getLeShort(data, offset):
    """Return the 2-byte little-endian signed integer at `data[offset]`."""
    pair = bytes(data[offset:offset + 2])
    return unpack('<h', pair)
def getLeUShort(data, offset):
    """Return the 2-byte little-endian unsigned integer at `data[offset]`."""
    pair = bytes(data[offset:offset + 2])
    return unpack('<H', pair)
def send(device, data):
    """Write command `data` to the device's tx characteristic.

    The checksum byte is computed and appended here, so `data` must be the
    bare command (a list of ints) without a checksum of its own.
    """
    checksum = minicrc(data)
    frame = data + [checksum]
    hex_dump = bytes(frame).hex().upper()
    logging.debug(f'Sending [{hex_dump}]')
    device.char_write(tx_id, frame)
def minicrc(data):
    """Return the 8-bit checksum of `data` (an iterable of byte values).

    Bitwise, LSB-first CRC-8 starting from 0 with the reflected polynomial
    0x8C (140) and no final XOR -- the parameters of the Dallas/Maxim CRC-8.
    """
    crc = 0
    for byte in data:
        crc ^= byte & 0xFF
        for _ in range(8):
            lsb_set = crc & 1
            crc >>= 1
            if lsb_set:
                crc ^= 0x8C
    return crc
def send_commands(device, cmds, delay=3):
    """Send each command in `cmds` to `device`, pausing before every one.

    Args:
        device: connected BLE object handed on to send().
        cmds: iterable of command frames (lists of ints, without checksum).
        delay: seconds to sleep before each command; defaults to the
            previously hard-coded 3 seconds.
    """
    for command in cmds:
        # Pause BEFORE sending, so even the first command is delayed.
        time.sleep(delay)
        send(device, command)
class SokBms:
    """State holder and BLE client for one SOK (ABC-BMS) battery.

    Typical flow: connect(), then listen() or listen_async() to receive
    notifications, and refresh() to request data.  Every decoded notification
    updates the attributes set up in __init__ and then invokes the callback
    registered through refresh(), if there is one.
    """

    # Number of cell slots filled from a cell-detail (0xCCF4) message.
    _CELL_COUNT = 4

    def __init__(self):
        self.soc = 0        # state of charge, printed as a percentage
        self.volts = 0      # pack voltage in volts, derived from the cell readings
        self.ma = None      # current field of the status message (scaling unverified)
        self.name = None    # name given by manufacturer
        # Original author's note: "number of times battery has been power
        # cycled" -- presumably charge cycles; TODO confirm.
        self.cycles = None
        self.year = None    # year battery was manufactured
        self.rated = None   # rated capacity; printed with an "AH" suffix
        self.cells = None   # list of individual LiFePO4 cell readings (volts = sum / 1000)
        self.temp = None    # temperature; treated as Celsius by c2f()
        self.temp_f = None  # same temperature converted to Fahrenheit
        self.central = None  # BLE_GATT.Central, created by connect()
        # Optional callable invoked with this object after each notification.
        # Initialised here so the handler never hits a missing attribute.
        self.callback = None
        # Device-information strings, filled in by connect().
        self.manufacturer = None
        self.model = None
        self.firmware_ver = None
        self.serial_num = None

    def _notify_handler(self, value):
        """Callback run when the BLE device has data.

        `value` is the raw notification: two command bytes, the payload and
        a trailing checksum byte.  A notification that is too short is
        ignored; one with a bad checksum tears the connection down.
        """
        # Two command bytes plus the checksum is the smallest frame that can
        # be examined without raising on the slices below.
        if len(value) < 3:
            logging.warning(f'Ignoring short notification ({len(value)}b)')
            return
        cmd = unpack('>H', bytes(value[0:2]))
        logging.debug(f'Received {hex(cmd)[2:]} msg: {bytes(value).hex()} ({len(value)}b)')
        if value[-1] != minicrc(value[0:-1]):
            logging.critical('Received invalid checksum, aborting')
            self.central.cleanup()
            return
        if cmd == 0xCCF0:  # status
            # NOTE: bytes 2..4 look like a voltage field, but decoding them as
            # getLeInt3(value, 2) * 4 / 1000**2 was noted as inaccurate by the
            # original author, so volts is taken from the cell message instead.
            self.ma = getLeInt3(value, 5) / 1000**2
            self.cycles = getLeUShort(value, 14)
            self.soc = getLeUShort(value, 16)
            ema = getLeInt3(value, 8) / 1000
            curr = getLeInt3(value, 11) / 1000
            logging.debug(
                f'v={self.volts}({bytes(value[2:5]).hex()}) '
                f'ma={self.ma}({bytes(value[5:8]).hex()}) '
                f'edingma={ema}({bytes(value[8:11]).hex()}) '
                f'curma={curr}({bytes(value[11:14]).hex()})'
            )
        elif cmd == 0xCCF1:  # name
            self.name = bytes(value[2:10]).decode('utf-8').rstrip()
        elif cmd == 0xCCF2:  # temps
            self.temp = getLeShort(value, 5)
            self.temp_f = self.c2f(self.temp)
        elif cmd == 0xCCF3:  # year, mv, hot
            self.year = 2000 + value[2]
            self.rated = getBeUint3(value, 5) / 128
            hot = getLeUShort(value, 8)
            if hot:
                logging.warning('device heater is on!?')
        elif cmd == 0xCCF4:  # details of each cell
            # Four 4-byte records start at offset 2: a 1-based cell number
            # followed by a little-endian unsigned short reading.
            cells = [0] * self._CELL_COUNT
            for record in range(self._CELL_COUNT):
                base = 2 + record * 4
                cell = value[base]
                if not 1 <= cell <= self._CELL_COUNT:
                    # 0 would silently overwrite the last slot through index
                    # -1 and larger numbers would raise; skip them instead.
                    logging.warning(f'Ignoring unexpected cell index {cell}')
                    continue
                cells[cell - 1] = getLeUShort(value, base + 1)
            self.cells = cells
            # Same value as mean(cells) * 4 / 1000 for the four cell slots.
            self.volts = sum(cells) / 1000
        else:
            logging.warning('Do not understand this response, please investigate')
        if self.callback:
            self.callback(self)

    def receive_data(self):
        """Log the start and end of a wait; no waiting is implemented here."""
        logging.debug('waiting for notifications')
        logging.debug('no longer waiting for notifications')

    def refresh(self, callback=None):
        """Request info, name and detail data on a background thread.

        A truthy `callback` replaces the stored one; it is called with this
        object after every processed notification.
        """
        if callback:
            self.callback = callback
        logging.debug(f'Sending commands (callback={callback})')
        worker = threading.Thread(
            target=send_commands,
            args=(self.central, [cmd_info, cmd_name, cmd_detail]),
            daemon=True,
        )
        worker.start()

    def connect(self, mac_address):
        """Connect to the device at `mac_address` and read its identity strings."""
        self.central = BLE_GATT.Central(mac_address)
        self.central.connect()
        self.manufacturer = get_str(self.central, '00002a29-0000-1000-8000-00805f9b34fb')
        self.model = get_str(self.central, '00002a24-0000-1000-8000-00805f9b34fb')
        self.firmware_ver = get_str(self.central, '00002a26-0000-1000-8000-00805f9b34fb')
        self.serial_num = get_str(self.central, '00002a25-0000-1000-8000-00805f9b34fb')
        logging.info(f'Connected to {self.manufacturer} {self.model} [{self.serial_num}]')

    def listen_async(self):
        """Run listen() on a daemon thread and return that thread."""
        worker = threading.Thread(target=self.listen, daemon=True)
        worker.start()
        return worker

    def listen(self):
        """Subscribe to notifications and hand control to the BLE wait loop."""
        self.central.on_value_change(notify_id, self._notify_handler)
        logging.info(f'Registered for notifications on {notify_id}')
        self.central.wait_for_notifications()

    def is_complete(self):
        """Return True once name, cells, temperature and year are all known."""
        # temp is compared against None explicitly because 0 is a valid reading.
        return bool(
            self.name and self.cells and self.temp is not None and self.year
        )

    def c2f(self, temp):
        """Convert celsius to fahrenheit."""
        return (temp * (9 / 5)) + 32

    def __str__(self):
        """Return a one-line summary of every value known so far."""
        parts = [f'{self.name or "BMS"}:', f'SOC:{self.soc}%']
        if self.rated:
            parts.append(f'Rated:{self.rated:.2f}AH')
        if self.year:
            parts.append(f'Manufactured:{self.year}')
        if self.volts:
            parts.append(f'{self.volts:.2f}V')
        if self.cycles:
            parts.append(f'Cycles:{self.cycles}')
        # 0 degrees is a real temperature, so only an unset value is skipped.
        if self.temp is not None:
            parts.append(f'Temp:{self.c2f(self.temp):.1f}F')
        if self.cells:
            parts.append(f'Cells:{self.cells}')
        return ' '.join(parts)
if __name__ == '__main__':
    # Command-line entry point: parse options, set up logging on stderr,
    # connect to the battery, log a summary and disconnect.
    cli = argparse.ArgumentParser(
        prog='SOK BMS Reader',
        description='Communicates with weird BMS on SOK battery',
        epilog='Reverse engineered from ABC BMS Android app (com.sjty.sbs_bms)',
    )
    cli.add_argument('-a', '--address', type=str, default=mac_address)
    cli.add_argument('-d', '--debug', action='store_true', default=False)
    options = cli.parse_args()

    if options.debug:
        level = logging.DEBUG
    else:
        level = logging.INFO
    handler = logging.StreamHandler(stream=sys.stderr)
    handler.setLevel(level)
    logging.basicConfig(
        level=level,
        format='%(asctime)s %(levelname)s %(message)s',
        handlers=[handler],
    )

    bms = SokBms()
    bms.connect(options.address)
    # NOTE(review): neither listen()/listen_async() nor refresh() is called
    # here, so the summary below is logged before any notification could have
    # updated the object -- confirm whether a polling step is missing.
    logging.info(bms)
    logging.debug('Data collection complete, exiting')
    bms.central.cleanup()
|
Beta Was this translation helpful? Give feedback.
-
@zuccaro This is amazing! Do you mind if I build on this and contribute this BMS to https://github.com/fl4p/batmon-ha? I'd be glad to credit you in the pull request. |
Beta Was this translation helpful? Give feedback.
-
Hello, how's it coming along for this? |
Beta Was this translation helpful? Give feedback.
-
Please add support for ABC-BMS (the BMS for SOK LiFePO4 Bluetooth batteries).
Thanks in advance!
Beta Was this translation helpful? Give feedback.
All reactions