Overview
The CustomSerialPort library (available at flyfire.CustomSerialPort) builds upon the SerialPortStream library to provide protocol-agnostic complete frame reception with cross-platform support. While the library offers general-purpose serial port functionality, the implementation contains several architectural issues that warrant attention.
Note: The latest codebase is maintained in the SerialPortStreamHelper repository on Gitee. All subsequent references refer to this implementation.
Identified Issues
Package Coalescing
The library accumulates received serial data and only dispatches it after a 128ms inactivity period. This approach creates a vulnerability: if multiple distinct packets arrive within the same 128ms window, they merge into a single event emission, causing package coalescing problems that can corrupt higher-level protocol parsing.
Thread Safety Violations
The data reception thread and the processing thread share mutable state without proper synchronization. This race condition can lead to inconsistent buffer states and unpredictable behavior during concurrent read/write operations.
Thread Creation Overhead
The current design spawns a new worker thread for each processing cycle (every 128ms), resulting in excessive thread creation and destruction. This thrashing wastes system resources and increases garbage collection pressure, degrading overall performance.
Architectural Improvements
Design Principles
The refactored implementation addresses these concerns through several key decisions:
-
Semaphore-Based Coordination: Replace the timeout-driven thread spawning model with a semaphore-waiting pattern that maintains a single persistent worker thread.
-
Lock-Free Data Transfer: Isolate the data reception pipeline from the processing pipeline using an internal queue with explicit synchronization.
-
Observer Pattern: Introduce a data observer interface to decouple processing logic from the serial port abstraction.
-
Timeout Toggle: Provide an option to disable the timeout mechanism for scenarios requiring immediate data forwarding.
Component Structure
The solution comprises three primary components:
- ISerialPortDataObserver: Defines the contract for receiving processed serial data.
- SerialPortDataHelper: Manages the persistent worker thread and coordinates data flow via semaphores and queues.
- SerialPortStreamHelper: Encapsulates the SerialPortStream instance and orchestrates the entire pipeline.
Implementation
Data Observer Interface
namespace Xhubobo.IO.Ports
{
public interface ISerialPortDataObserver
{
void OnDataReceived(byte[] data);
}
}
Data Processing Helper
The helper class manages a background thread that waits on a semaphore. Incoming data enters a queue and signals the semaphore, awakening the worker to process accumulated data when the timeout elapses.
using System;
using System.Collections.Generic;
using System.Threading;
namespace Xhubobo.IO.Ports
{
internal class SerialPortDataHelper
{
private Thread _workerThread;
private volatile bool _isRunning;
private readonly object _stateLock = new object();
private readonly Queue<byte[]> _dataQueue;
private readonly Semaphore _dataSignal;
private byte[] _receiveBuffer;
private int _bufferPosition;
private int _lastDataTimestamp;
private readonly int _idleThreshold;
private readonly ISerialPortDataObserver _observer;
public SerialPortDataHelper(
ISerialPortDataObserver observer,
int idleTimeout = 128,
int bufferCapacity = 4096)
{
_observer = observer;
_idleThreshold = idleTimeout;
_receiveBuffer = new byte[bufferCapacity];
_dataQueue = new Queue<byte[]>();
_dataSignal = new Semaphore(0, byte.MaxValue);
}
public void Start()
{
_isRunning = true;
_workerThread = new Thread(ProcessLoop)
{
IsBackground = true,
Name = "SerialPortDataWorker"
};
_workerThread.Start();
}
public void Stop()
{
_isRunning = false;
Enqueue(null); // Signal to exit
_workerThread?.Join();
_workerThread = null;
ClearQueue();
}
public void Enqueue(byte[] chunk)
{
if (!_isRunning) return;
lock (_dataQueue)
{
_dataQueue.Enqueue(chunk);
}
_dataSignal.Release();
}
private byte[] Dequeue()
{
lock (_dataQueue)
{
if (_dataQueue.Count == 0)
return null;
return _dataQueue.Dequeue();
}
}
private void ClearQueue()
{
lock (_dataQueue)
{
_dataQueue.Clear();
}
}
private void ProcessLoop()
{
while (_isRunning)
{
if (_dataSignal.WaitOne(1))
{
var chunk = Dequeue();
if (chunk != null)
{
AccumulateData(chunk);
}
}
CheckTimeoutAndDispatch();
}
}
private void AccumulateData(byte[] chunk)
{
_lastDataTimestamp = Environment.TickCount;
if (_bufferPosition + chunk.Length > _receiveBuffer.Length)
{
ResetBuffer();
return;
}
Buffer.BlockCopy(chunk, 0, _receiveBuffer, _bufferPosition, chunk.Length);
_bufferPosition += chunk.Length;
}
private void CheckTimeoutAndDispatch()
{
if (_bufferPosition > 0 && Environment.TickCount - _lastDataTimestamp > _idleThreshold)
{
var completed = new byte[_bufferPosition];
Buffer.BlockCopy(_receiveBuffer, 0, completed, 0, _bufferPosition);
_observer?.OnDataReceived(completed);
ResetBuffer();
}
}
private void ResetBuffer()
{
_bufferPosition = 0;
}
}
}
Serial Port Wrapper
This wrapper consolidates configuration parameters, exposes read-only properties for port attributes, and provides simplified open/close/dispose lifecycle management.
using RJCP.IO.Ports;
using System;
using System.Linq;
namespace Xhubobo.IO.Ports
{
public class SerialPortStreamHelper : ISerialPortDataObserver
{
public event Action<string, byte[]> DataReceived = (port, data) => { };
public string LastError { get; private set; }
public string PortName => _stream.PortName;
public int BaudRate => _stream.BaudRate;
public Parity Parity => _stream.Parity;
public int DataBits => _stream.DataBits;
public StopBits StopBits => _stream.StopBits;
public bool IsOpen => _stream.IsOpen;
public bool DtrEnable
{
get => _stream.DtrEnable;
set => _stream.DtrEnable = value;
}
public bool RtsEnable
{
get => _stream.RtsEnable;
set => _stream.RtsEnable = value;
}
private readonly bool _useTimeout;
private readonly SerialPortStream _stream;
private readonly SerialPortDataHelper _processor;
public SerialPortStreamHelper(bool useTimeout = true, int idleTimeout = 128, int bufferSize = 4096)
{
_useTimeout = useTimeout;
_stream = new SerialPortStream
{
DtrEnable = true,
RtsEnable = true
};
_stream.DataReceived += HandleIncomingData;
_processor = new SerialPortDataHelper(this, idleTimeout, bufferSize);
}
public static string[] GetPortNames() => SerialPortStream.GetPortNames();
public static string FormatHex(byte[] data) =>
string.Join(" ", data.Select(b => b.ToString("X2")));
public bool Open(
string port,
int baud = 115200,
Parity parity = Parity.None,
int dataBits = 8,
StopBits stop = StopBits.One)
{
_stream.PortName = port;
_stream.BaudRate = baud;
_stream.Parity = parity;
_stream.DataBits = dataBits;
_stream.StopBits = stop;
try
{
_stream.Open();
_processor.Start();
return true;
}
catch (Exception ex)
{
LastError = ex.Message;
return false;
}
}
public void Close()
{
if (_stream.IsOpen)
{
_processor.Stop();
_stream.Close();
}
}
public void Dispose()
{
_processor.Stop();
_stream.Dispose();
}
public void OnDataReceived(byte[] data)
{
DataReceived?.Invoke(PortName, data);
}
private void HandleIncomingData(object sender, SerialDataReceivedEventArgs args)
{
if (_useTimeout)
{
while (_stream.BytesToRead > 0)
{
int available = _stream.BytesToRead;
byte[] buffer = new byte[available];
int read = _stream.Read(buffer, 0, available);
if (read != available)
{
throw new InvalidOperationException("Read operation incomplete");
}
_processor.Enqueue(buffer);
}
}
else
{
int available = _stream.BytesToRead;
if (available == 0) return;
byte[] buffer = new byte[available];
int position = 0;
while (position < available)
{
position += _stream.Read(buffer, position, available - position);
}
DataReceived?.Invoke(PortName, buffer);
}
}
public void Write(byte[] data) => Write(data, 0, data.Length);
public void Write(byte[] data, int offset, int count)
{
if (IsOpen)
_stream.Write(data, offset, count);
}
public void Write(string text)
{
if (IsOpen)
_stream.Write(text);
}
public void WriteLine(string text)
{
if (IsOpen)
_stream.WriteLine(text);
}
}
}
Key Differences from Original Implementation
The refactored version eliminates the per-cycle thread spawning by maintaining a persistent worker thread. The semaphore-based blocking replaces polling-based timeouts. Queue operations are now protected by locks, ensuring thread-safe enqueue and dequeue operations. Additional, the timeout mechanism can be disabled entirely for applications that require immediate data forwarding without buffering.