Skip to content
44 changes: 43 additions & 1 deletion packages/modules/devices/varta/varta/bat_modbus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env python3
from typing import TypedDict, Any
from typing import TypedDict, Any, Optional
import logging
import struct

from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
Expand All @@ -12,6 +14,8 @@
from modules.common.utils.peak_filter import PeakFilter
from modules.common.component_type import ComponentType

log = logging.getLogger(__name__)


class KwargsDict(TypedDict):
device_id: int
Expand All @@ -30,6 +34,7 @@ def initialize(self) -> None:
self.client: ModbusTcpClient_ = self.kwargs['client']
self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
self.store = get_bat_value_store(self.component_config.id)
self.last_mode = 'Undefined'
self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
self.peak_filter = PeakFilter(ComponentType.BAT, self.component_config.id, self.fault_state)

Expand All @@ -49,5 +54,42 @@ def set_state(self, state: BatState) -> None:
state.imported, state.exported = self.sim_counter.sim_count(state.power)
self.store.set(state)

def set_power_limit(self, power_limit: Optional[int]) -> None:
unit = self.__modbus_id
log.debug(f'last_mode: {self.last_mode}')

Comment on lines +59 to +60
if power_limit is None:
Comment on lines +57 to +61
log.debug("Keine Batteriesteuerung, Selbstregelung durch Wechselrichter")
if self.last_mode is not None:
# hier muss die maximale Entladeleistung des Systems einmalig gesetzt werden
# Wir nehmen default -4000W an. Nach 120s setzt sich das Register
# automatisch zurück
max_discharge_w = -4000
uint16_value = struct.unpack(">H", struct.pack(">h", max_discharge_w))[0]
self.client.write_register(1074, uint16_value, data_type=ModbusDataType.UINT_16, unit=unit)
self.last_mode = None
elif power_limit < 0:
# Das Register muss kontinuierlich geschrieben werden, da der Speicher
# sonst nach 120s die Steuerung aufhebt.
log.debug(f"Aktive Batteriesteuerung. Batterie darf mit {power_limit} W entladen werden "
"für den Hausverbrauch")
uint16_value = struct.unpack(">H", struct.pack(">h", power_limit))[0]
self.client.write_register(1074, uint16_value, data_type=ModbusDataType.INT_16, unit=unit)
self.last_mode = 'discharge'
else:
# Das Register muss kontinuierlich geschrieben werden, da der Speicher
# sonst nach 120s die Steuerung aufhebt.
if power_limit == 0:
log.debug("Aktive Batteriesteuerung, Speicher wird auf Stop gesetzt.")
else:
log.debug(f"Aktive Batteriesteuerung, übergebene Leistung: {power_limit}W. "
"Aktive Ladung nicht möglich. Speicher wird auf Stop gesetzt.")

self.client.write_register(1074, 0, data_type=ModbusDataType.INT_16, unit=unit)
self.last_mode = 'stop'

Comment on lines +57 to +90
def power_limit_controllable(self) -> bool:
return True


component_descriptor = ComponentDescriptor(configuration_factory=VartaBatModbusSetup)
Loading