using LFP_Manager.DataStructure;
using LFP_Manager.Function;
using LFP_Manager.Utils;
using Microsoft.Extensions.Logging;
using System;
using System.Data;
using System.Data.SQLite;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace LFP_Manager.Threads
{
    /// <summary>
    /// Background worker that periodically writes module data to the log database and
    /// records alarm transitions derived from two 16-bit status words.
    /// </summary>
    /// <remarks>
    /// The worker loop is started with <see cref="StartAsync"/> and stopped with
    /// <see cref="StopAsync"/> or <see cref="Dispose()"/>. Status and data snapshots are
    /// pushed in through <see cref="UpdateStatusAsync"/> / <see cref="UpdateDataAsync"/>
    /// (or their blocking wrappers) and are guarded by semaphores.
    /// </remarks>
    public class DbAsyncTask : IDisposable
    {
        #region FIELDS

        // Delay between two iterations of the worker loop.
        private const int LoopDelayMilliseconds = 100;

        // Error backoff: BaseDelay * 2^min(MaxExponent, consecutiveErrors), capped at MaxDelay.
        private const int BackoffBaseDelayMilliseconds = 1000;
        private const int BackoffMaxDelayMilliseconds = 10000;
        private const int BackoffMaxExponent = 5;

        private readonly CommConfig _config;
        private DeviceSystemData[] _systemData;
        private readonly short[] _status = new short[2];
        private readonly short[] _oldStatus = new short[2];
        private readonly int _moduleId;
        private readonly int _moduleQuantity;
        private readonly ILogger _logger;

        // Only set when this instance had to create its own logger; disposed with the instance.
        private readonly ILoggerFactory _ownedLoggerFactory;

        // Not readonly: a cancelled source cannot be reused, so a restart needs a new one.
        private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private Task _dbTask;
        private readonly SemaphoreSlim _dataLock = new SemaphoreSlim(1, 1);
        private readonly SemaphoreSlim _statusLock = new SemaphoreSlim(1, 1);
        private volatile bool _isRunning;
        private volatile bool _disposed;

        // Connection used for alarm inserts; opened on start and reopened on demand.
        private SQLiteConnection _connection;

        /// <summary>Raised with human-readable progress and error messages.</summary>
        public event DbDataPrint OnPrint;

        #endregion

        #region CONSTRUCTORS

        /// <summary>
        /// Creates a worker for the given module.
        /// </summary>
        /// <param name="moduleId">1-based module identifier; must be positive.</param>
        /// <param name="config">Communication configuration.</param>
        /// <param name="systemData">Initial system data array.</param>
        /// <param name="logger">Optional logger; a console/debug logger is created when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> or <paramref name="systemData"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="moduleId"/> is not positive.</exception>
        public DbAsyncTask(int moduleId, CommConfig config, DeviceSystemData[] systemData, ILogger logger = null)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            if (systemData == null)
            {
                throw new ArgumentNullException(nameof(systemData));
            }

            if (moduleId <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(moduleId), "Module ID must be positive");
            }

            _moduleId = moduleId;
            _config = config;
            _systemData = systemData;

            if (logger != null)
            {
                _logger = logger;
            }
            else
            {
                // Keep the factory so its providers can be disposed together with this instance.
                _ownedLoggerFactory = CreateDefaultLoggerFactory();
                _logger = _ownedLoggerFactory.CreateLogger<DbAsyncTask>();
            }

            _moduleQuantity = CalculateModuleQuantity(_config.CommType, _config.ModuleQty);
            _logger.LogInformation("DbAsyncTask initialized for Module {0} with {1} modules", _moduleId, _moduleQuantity);
        }

        /// <summary>Builds the fallback logger factory (console + debug, Information level).</summary>
        private static ILoggerFactory CreateDefaultLoggerFactory()
        {
            return LoggerFactory.Create(builder =>
            {
                builder.AddConsole()
                       .AddDebug()
                       .SetMinimumLevel(LogLevel.Information);
            });
        }

        #endregion

        #region PUBLIC METHODS

        /// <summary>
        /// Creates the database if needed, opens the connection and starts the worker loop.
        /// </summary>
        /// <param name="cancellationToken">Cancels the initialization phase only; the loop is stopped via <see cref="StopAsync"/>.</param>
        /// <returns><c>true</c> when the worker is running (or already was); <c>false</c> when startup failed.</returns>
        /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
        public async Task<bool> StartAsync(CancellationToken cancellationToken)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(DbAsyncTask));
            }

            if (_isRunning)
            {
                _logger.LogWarning("DB thread for Module {0} is already running", _moduleId);
                return true;
            }

            try
            {
                _logger.LogInformation("Starting DB thread for Module {0}", _moduleId);

                await InitializeDatabaseAsync(cancellationToken).ConfigureAwait(false);
                await InitializeConnectionAsync(cancellationToken).ConfigureAwait(false);

                // A previous StopAsync leaves the source cancelled; replace it so the loop can run again.
                if (_cancellationTokenSource.IsCancellationRequested)
                {
                    _cancellationTokenSource.Dispose();
                    _cancellationTokenSource = new CancellationTokenSource();
                }

                // The flag must be set BEFORE the loop starts: the loop condition reads it
                // on entry and would otherwise exit immediately.
                _isRunning = true;
                CancellationToken loopToken = _cancellationTokenSource.Token;
                _dbTask = Task.Run(() => DbThreadProcessAsync(loopToken), loopToken);

                _logger.LogInformation("DB thread started successfully for Module {0}", _moduleId);
                OnPrint?.Invoke(this, string.Format("DB thread started for Module {0}", _moduleId));
                return true;
            }
            catch (Exception ex)
            {
                _isRunning = false;
                _logger.LogError(ex, "Failed to start DB thread for Module {0}", _moduleId);
                OnPrint?.Invoke(this, string.Format("Failed to start DB thread: {0}", ex.Message));
                await CleanupResourcesAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Signals the worker loop to stop, waits for it to finish and releases the connection.
        /// Does nothing when the worker is not running or the instance is disposed.
        /// </summary>
        /// <param name="cancellationToken">Currently not observed; kept for interface compatibility.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            if (_disposed || !_isRunning)
            {
                return;
            }

            _logger.LogInformation("Stopping DB thread for Module {0}", _moduleId);
            _isRunning = false;
            _cancellationTokenSource.Cancel();

            if (_dbTask != null)
            {
                try
                {
                    await _dbTask.ConfigureAwait(false);
                    _logger.LogInformation("DB thread stopped successfully for Module {0}", _moduleId);
                }
                catch (OperationCanceledException)
                {
                    _logger.LogDebug("DB thread was cancelled for Module {0}", _moduleId);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error stopping DB thread for Module {0}", _moduleId);
                    OnPrint?.Invoke(this, string.Format("Error stopping DB thread: {0}", ex.Message));
                }
            }

            await CleanupResourcesAsync().ConfigureAwait(false);
        }

        /// <summary>Stores the two latest status words for the alarm check.</summary>
        /// <param name="status1">First status word.</param>
        /// <param name="status2">Second status word.</param>
        /// <param name="cancellationToken">Cancels waiting for the status lock.</param>
        public async Task UpdateStatusAsync(short status1, short status2, CancellationToken cancellationToken)
        {
            await _statusLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                _status[0] = status1;
                _status[1] = status2;
            }
            finally
            {
                _statusLock.Release();
            }

            _logger.LogTrace("Status updated for Module {ModuleId}: Status1={Status1}, Status2={Status2}", _moduleId, status1, status2);
        }

        /// <summary>Replaces the stored system data with a copy of <paramref name="systemData"/>.</summary>
        /// <param name="moduleId">Module identifier; used for logging only.</param>
        /// <param name="systemData">New data; the array is copied element-wise.</param>
        /// <param name="cancellationToken">Cancels waiting for the data lock.</param>
        /// <exception cref="ArgumentNullException"><paramref name="systemData"/> is null.</exception>
        public async Task UpdateDataAsync(int moduleId, DeviceSystemData[] systemData, CancellationToken cancellationToken)
        {
            if (systemData == null)
            {
                throw new ArgumentNullException(nameof(systemData));
            }

            await _dataLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                _systemData = new DeviceSystemData[systemData.Length];
                Array.Copy(systemData, _systemData, systemData.Length);
            }
            finally
            {
                _dataLock.Release();
            }

            _logger.LogDebug("System data updated for Module {ModuleId}", moduleId);
        }

        // Backward compatibility methods

        /// <summary>Blocking wrapper around <see cref="UpdateStatusAsync"/>.</summary>
        public void UpdateStatus(short status1, short status2)
        {
            UpdateStatusAsync(status1, status2, CancellationToken.None).GetAwaiter().GetResult();
        }

        /// <summary>Blocking wrapper around <see cref="UpdateDataAsync"/>.</summary>
        public void UpdateData(int moduleId, DeviceSystemData[] systemData)
        {
            UpdateDataAsync(moduleId, systemData, CancellationToken.None).GetAwaiter().GetResult();
        }

        #endregion

        #region PRIVATE METHODS

        /// <summary>
        /// Returns the configured module count (at least 1) for RS485, otherwise 1.
        /// </summary>
        private static int CalculateModuleQuantity(int commType, int configModuleQty)
        {
            if (commType == csConstData.CommType.COMM_RS485)
            {
                return Math.Max(1, configModuleQty);
            }

            return 1;
        }

        /// <summary>Runs <c>csHistoryFunction.DbCreate</c> for the executable's directory on a pool thread.</summary>
        /// <exception cref="InvalidOperationException">The executable directory cannot be determined.</exception>
        private async Task InitializeDatabaseAsync(CancellationToken cancellationToken)
        {
            string dbPath = System.IO.Path.GetDirectoryName(Application.ExecutablePath);
            if (string.IsNullOrEmpty(dbPath))
            {
                throw new InvalidOperationException("Unable to determine executable path");
            }

            await Task.Run(() => csHistoryFunction.DbCreate(dbPath), cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Opens a new connection to the history database, disposing any previous one first.
        /// </summary>
        /// <param name="cancellationToken">Cancels opening the connection.</param>
        private async Task InitializeConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            // Release a stale/broken connection instead of leaking it on reconnect.
            SQLiteConnection stale = _connection;
            _connection = null;
            if (stale != null)
            {
                stale.Dispose();
            }

            // NOTE(review): plain concatenation presumably relies on FileName starting with a
            // path separator - confirm before switching to Path.Combine.
            string dbFilename = System.IO.Path.GetDirectoryName(Application.ExecutablePath) + csDbConstData.DataBase.FileName;
            string connectionString = string.Format("Data Source={0};Pooling=true;Max Pool Size=10;", dbFilename);

            var connection = new SQLiteConnection(connectionString);
            try
            {
                await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                connection.Dispose();
                throw;
            }

            _connection = connection;
        }

        /// <summary>
        /// Worker loop: logs data every <c>DbLogPeriod</c> seconds (minimum 1), checks alarms on
        /// every iteration and backs off exponentially after consecutive failures.
        /// </summary>
        private async Task DbThreadProcessAsync(CancellationToken cancellationToken)
        {
            DateTime lastLogTime = DateTime.MinValue;
            int logPeriodSeconds = Math.Max(1, _config.DbLogPeriod);
            int consecutiveErrors = 0;

            _logger.LogDebug("DB thread process started for Module {ModuleId}", _moduleId);

            try
            {
                while (!cancellationToken.IsCancellationRequested && _isRunning)
                {
                    try
                    {
                        DateTime currentTime = DateTime.Now;
                        if (ShouldLogData(lastLogTime, currentTime, logPeriodSeconds))
                        {
                            await ProcessDatabaseLoggingAsync(currentTime, cancellationToken).ConfigureAwait(false);
                            lastLogTime = currentTime;
                        }

                        // Check for alarms
                        await CheckAlarmAsync(cancellationToken).ConfigureAwait(false);
                        await Task.Delay(LoopDelayMilliseconds, cancellationToken).ConfigureAwait(false);

                        // A clean iteration ends the current failure streak.
                        consecutiveErrors = 0;
                    }
                    catch (OperationCanceledException)
                    {
                        _logger.LogDebug("DB thread process cancelled for Module {ModuleId}", _moduleId);
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "DB thread error for Module {ModuleId}", _moduleId);
                        OnPrint?.Invoke(this, string.Format("DB thread error: {0}", ex.Message));

                        // Exponential backoff for error recovery: 2 s, 4 s, 8 s, then capped at 10 s.
                        consecutiveErrors++;
                        await Task.Delay(ComputeBackoffDelay(consecutiveErrors), cancellationToken).ConfigureAwait(false);
                    }
                }
            }
            finally
            {
                _logger.LogDebug("DB thread process completed for Module {ModuleId}", _moduleId);
            }
        }

        /// <summary>Returns the retry delay in milliseconds for the given failure streak length.</summary>
        private static int ComputeBackoffDelay(int consecutiveErrors)
        {
            int exponent = Math.Min(BackoffMaxExponent, Math.Max(0, consecutiveErrors));
            return Math.Min(BackoffMaxDelayMilliseconds, BackoffBaseDelayMilliseconds * (1 << exponent));
        }

        /// <summary>
        /// True on the first call (<paramref name="lastTime"/> is <see cref="DateTime.MinValue"/>)
        /// or once at least <paramref name="periodSeconds"/> seconds have elapsed.
        /// </summary>
        private static bool ShouldLogData(DateTime lastTime, DateTime currentTime, int periodSeconds)
        {
            if (lastTime == DateTime.MinValue)
            {
                return true;
            }

            return (currentTime - lastTime).TotalSeconds >= periodSeconds;
        }

        /// <summary>
        /// Calls <c>csDbUtils.LogDbCreate</c> for the configured model, snapshots the system data
        /// under the data lock and writes it according to the communication type.
        /// </summary>
        private async Task ProcessDatabaseLoggingAsync(DateTime currentTime, CancellationToken cancellationToken)
        {
            string modelName = csConstData.MODEL_STR[_config.UartModelIndex];
            await Task.Run(() => csDbUtils.LogDbCreate(modelName), cancellationToken).ConfigureAwait(false);

            DeviceSystemData[] systemDataCopy;
            await _dataLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                systemDataCopy = new DeviceSystemData[_systemData.Length];
                Array.Copy(_systemData, systemDataCopy, _systemData.Length);
            }
            finally
            {
                _dataLock.Release();
            }

            await LogDataByCommTypeAsync(modelName, systemDataCopy, currentTime, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Dispatches to single- or multi-module logging based on <c>_config.CommType</c>.</summary>
        private async Task LogDataByCommTypeAsync(string modelName, DeviceSystemData[] systemData, DateTime currentTime, CancellationToken cancellationToken)
        {
            switch (_config.CommType)
            {
                case csConstData.CommType.COMM_UART:
                case csConstData.CommType.COMM_SNMP:
                    await LogSingleModuleAsync(modelName, systemData, currentTime, cancellationToken).ConfigureAwait(false);
                    break;

                case csConstData.CommType.COMM_RS485:
                    await LogMultipleModulesAsync(modelName, systemData, currentTime, cancellationToken).ConfigureAwait(false);
                    break;

                default:
                    _logger.LogWarning("Unknown communication type: {0}", _config.CommType);
                    break;
            }
        }

        /// <summary>
        /// Inserts one log row for this instance's module; skipped silently when the module
        /// index is outside <paramref name="systemData"/>.
        /// </summary>
        private async Task LogSingleModuleAsync(string modelName, DeviceSystemData[] systemData, DateTime currentTime, CancellationToken cancellationToken)
        {
            if (_moduleId > 0 && _moduleId <= systemData.Length)
            {
                systemData[_moduleId - 1].mNo = _moduleId;
                await Task.Run(() => csDbUtils.BmsLogDataInsert(modelName, ref systemData[_moduleId - 1], currentTime, 1000), cancellationToken)
                    .ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Inserts one log row per module, running at most <see cref="Environment.ProcessorCount"/>
        /// inserts concurrently. Falls back to single-module logging when only one module is configured.
        /// </summary>
        private async Task LogMultipleModulesAsync(string modelName, DeviceSystemData[] systemData, DateTime currentTime, CancellationToken cancellationToken)
        {
            if (_moduleQuantity <= 1)
            {
                await LogSingleModuleAsync(modelName, systemData, currentTime, cancellationToken).ConfigureAwait(false);
                return;
            }

            using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount))
            {
                var tasks = new Task[Math.Min(_moduleQuantity, systemData.Length)];
                for (int i = 0; i < tasks.Length; i++)
                {
                    tasks[i] = ProcessModuleAsync(semaphore, modelName, systemData, i, currentTime, cancellationToken);
                }

                await Task.WhenAll(tasks).ConfigureAwait(false);
            }
        }

        /// <summary>Inserts the log row for module <paramref name="index"/> + 1 while holding a semaphore slot.</summary>
        private async Task ProcessModuleAsync(SemaphoreSlim semaphore, string modelName, DeviceSystemData[] systemData, int index, DateTime currentTime, CancellationToken cancellationToken)
        {
            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                systemData[index].mNo = index + 1;
                await Task.Run(() => csDbUtils.BmsLogDataInsert(modelName, ref systemData[index], currentTime, 1000), cancellationToken)
                    .ConfigureAwait(false);
            }
            finally
            {
                semaphore.Release();
            }
        }

        /// <summary>
        /// Compares the current status words with the last processed ones and records alarm
        /// transitions when they differ.
        /// </summary>
        private async Task CheckAlarmAsync(CancellationToken cancellationToken)
        {
            short[] currentStatus = new short[2];
            short[] previousStatus = new short[2];

            await _statusLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                Array.Copy(_status, currentStatus, 2);
                Array.Copy(_oldStatus, previousStatus, 2);
            }
            finally
            {
                _statusLock.Release();
            }

            if (!HasStatusChanged(currentStatus, previousStatus))
            {
                return;
            }

            await ProcessAlarmChangesAsync(currentStatus, previousStatus, cancellationToken).ConfigureAwait(false);

            await _statusLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                // Remember the snapshot that was actually processed, not the live _status:
                // an update that arrived meanwhile must still be detected on the next pass.
                Array.Copy(currentStatus, _oldStatus, 2);
            }
            finally
            {
                _statusLock.Release();
            }
        }

        /// <summary>True when either status word differs between the two snapshots.</summary>
        private static bool HasStatusChanged(short[] current, short[] previous)
        {
            return current[0] != previous[0] || current[1] != previous[1];
        }

        /// <summary>Expands both snapshots into bit arrays and runs the cell-voltage alarm check.</summary>
        private async Task ProcessAlarmChangesAsync(short[] currentStatus, short[] previousStatus, CancellationToken cancellationToken)
        {
            bool[] currentBits1 = csUtils.Int16ToBitArray(currentStatus[0]);
            bool[] currentBits2 = csUtils.Int16ToBitArray(currentStatus[1]);
            bool[] previousBits1 = csUtils.Int16ToBitArray(previousStatus[0]);
            bool[] previousBits2 = csUtils.Int16ToBitArray(previousStatus[1]);

            _logger.LogTrace("Processing alarm changes for Module {ModuleId}", _moduleId);

            await CheckCellVoltageAlarmAsync(currentBits1, currentBits2, previousBits1, previousBits2, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Records a CELL_UNDER_VOLTAGE alarm row when bit 0 of either status word changed:
        /// word 2 bit 0 set gives FLAG_TRIP, otherwise word 1 bit 0 selects warning or release.
        /// </summary>
        private async Task CheckCellVoltageAlarmAsync(bool[] currentBits1, bool[] currentBits2, bool[] previousBits1, bool[] previousBits2, CancellationToken cancellationToken)
        {
            // Cell Over/Under Voltage Check
            if (previousBits2[0] != currentBits2[0])
            {
                if (currentBits2[0])
                {
                    await InsertAlarmRecordAsync(csDbConstData.DB_ALARM.CELL_UNDER_VOLTAGE, csDbConstData.DB_ALARM.FLAG_TRIP, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    await HandleVoltageWarningOrReleaseAsync(currentBits1, previousBits1, cancellationToken).ConfigureAwait(false);
                }
            }
            else if (previousBits1[0] != currentBits1[0])
            {
                await HandleVoltageWarningOrReleaseAsync(currentBits1, previousBits1, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>Inserts a CELL_UNDER_VOLTAGE row flagged as warning (bit set) or release (bit clear).</summary>
        private async Task HandleVoltageWarningOrReleaseAsync(bool[] currentBits1, bool[] previousBits1, CancellationToken cancellationToken)
        {
            int flag = currentBits1[0]
                ? csDbConstData.DB_ALARM.FLAG_WARNING
                : csDbConstData.DB_ALARM.FLAG_RELEASE;

            await InsertAlarmRecordAsync(csDbConstData.DB_ALARM.CELL_UNDER_VOLTAGE, flag, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Inserts one alarm row inside a transaction, reopening the connection first when it
        /// is missing or not open. The transaction is rolled back and the error rethrown on failure.
        /// </summary>
        private async Task InsertAlarmRecordAsync(int alarmCode, int flagCode, CancellationToken cancellationToken)
        {
            if (_connection == null || _connection.State != ConnectionState.Open)
            {
                await InitializeConnectionAsync(cancellationToken).ConfigureAwait(false);
            }

            using (var transaction = _connection.BeginTransaction())
            {
                try
                {
                    using (var command = _connection.CreateCommand())
                    {
                        command.Transaction = transaction;
                        command.CommandText = string.Format(
                            "INSERT INTO {0} (HTime, model, sno, alarm_name, alarm_code, flag_name, flag, param1, param2) " +
                            "VALUES (@HTime, @model, @sno, @alarm_name, @alarm_code, @flag_name, @flag, @param1, @param2)",
                            csDbConstData.DataBase.TableName);

                        AddParameters(command, alarmCode, flagCode);

                        await Task.Run(() => command.ExecuteNonQuery(), cancellationToken).ConfigureAwait(false);
                    }

                    transaction.Commit();
                }
                catch
                {
                    transaction.Rollback();
                    throw;
                }
            }
        }

        /// <summary>Binds the alarm row parameters to <paramref name="command"/>.</summary>
        /// <remarks>
        /// NOTE(review): <c>@model</c> ("AAA") and <c>@sno</c> (1) are hard-coded and look like
        /// placeholders - confirm whether the model name and module id should be stored instead.
        /// </remarks>
        private static void AddParameters(SQLiteCommand command, int alarmCode, int flagCode)
        {
            var parameters = new[]
            {
                new SQLiteParameter("@HTime", DbType.DateTime) { Value = DateTime.Now },
                new SQLiteParameter("@model", DbType.String) { Value = "AAA" },
                new SQLiteParameter("@sno", DbType.Int16) { Value = 1 },
                new SQLiteParameter("@alarm_name", DbType.String) { Value = csDbConstData.DB_ALARM.ALARM_NAME[alarmCode] },
                new SQLiteParameter("@alarm_code", DbType.Int16) { Value = alarmCode },
                new SQLiteParameter("@flag_name", DbType.String) { Value = csDbConstData.DB_ALARM.FLAG_NAME[flagCode] },
                new SQLiteParameter("@flag", DbType.Int16) { Value = flagCode },
                new SQLiteParameter("@param1", DbType.Decimal) { Value = 0.0m },
                new SQLiteParameter("@param2", DbType.Decimal) { Value = 0.0m }
            };

            command.Parameters.AddRange(parameters);
        }

        /// <summary>Closes and disposes the connection; failures are logged, never thrown.</summary>
        private async Task CleanupResourcesAsync()
        {
            SQLiteConnection connection = _connection;
            if (connection != null)
            {
                try
                {
                    if (connection.State == ConnectionState.Open)
                    {
                        connection.Close();
                    }

                    connection.Dispose();
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error cleaning up database connection for Module {0}", _moduleId);
                }
                finally
                {
                    // Drop the reference even when Close/Dispose failed so it is never reused.
                    _connection = null;
                }
            }

            await Task.CompletedTask.ConfigureAwait(false);
        }

        #endregion

        #region DISPOSE PATTERN

        /// <summary>Stops the worker and releases all owned resources.</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the worker (blocking) and disposes the token source, semaphores, connection and,
        /// when this instance created it, the default logger factory.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed || !disposing)
            {
                return;
            }

            try
            {
                StopAsync(CancellationToken.None).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during disposal for Module {0}", _moduleId);
            }

            _cancellationTokenSource?.Dispose();
            _dataLock?.Dispose();
            _statusLock?.Dispose();
            _connection?.Dispose();
            _connection = null;

            // Dispose last: the logger is still used by the steps above.
            _ownedLoggerFactory?.Dispose();

            _disposed = true;
        }

        #endregion
    }
}