// UdsServer.cs
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Diagnostics;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using System.Threading;
  8. namespace NetCoreServer
  9. {
  10. /// <summary>
  11. /// Unix Domain Socket server is used to connect, disconnect and manage Unix Domain Socket sessions
  12. /// </summary>
  13. /// <remarks>Thread-safe</remarks>
  14. public class UdsServer : IDisposable
  15. {
  16. /// <summary>
  17. /// Initialize Unix Domain Socket server with a given socket path
  18. /// </summary>
  19. /// <param name="path">Socket path</param>
  20. public UdsServer(string path) : this(new UnixDomainSocketEndPoint(path)) {}
  21. /// <summary>
  22. /// Initialize Unix Domain Socket server with a given Unix Domain Socket endpoint
  23. /// </summary>
  24. /// <param name="endpoint">Unix Domain Socket endpoint</param>
  25. public UdsServer(UnixDomainSocketEndPoint endpoint)
  26. {
  27. Id = Guid.NewGuid();
  28. Endpoint = endpoint;
  29. }
  30. /// <summary>
  31. /// Server Id
  32. /// </summary>
  33. public Guid Id { get; }
  34. /// <summary>
  35. /// Endpoint
  36. /// </summary>
  37. public EndPoint Endpoint { get; private set; }
  38. /// <summary>
  39. /// Number of sessions connected to the server
  40. /// </summary>
  41. public long ConnectedSessions { get { return Sessions.Count; } }
  42. /// <summary>
  43. /// Number of bytes pending sent by the server
  44. /// </summary>
  45. public long BytesPending { get { return _bytesPending; } }
  46. /// <summary>
  47. /// Number of bytes sent by the server
  48. /// </summary>
  49. public long BytesSent { get { return _bytesSent; } }
  50. /// <summary>
  51. /// Number of bytes received by the server
  52. /// </summary>
  53. public long BytesReceived { get { return _bytesReceived; } }
  54. /// <summary>
  55. /// Option: acceptor backlog size
  56. /// </summary>
  57. /// <remarks>
  58. /// This option will set the listening socket's backlog size
  59. /// </remarks>
  60. public int OptionAcceptorBacklog { get; set; } = 1024;
  61. /// <summary>
  62. /// Option: receive buffer size
  63. /// </summary>
  64. public int OptionReceiveBufferSize { get; set; } = 8192;
  65. /// <summary>
  66. /// Option: send buffer size
  67. /// </summary>
  68. public int OptionSendBufferSize { get; set; } = 8192;
  69. #region Start/Stop server
  70. // Server acceptor
  71. private Socket _acceptorSocket;
  72. private SocketAsyncEventArgs _acceptorEventArg;
  73. // Server statistic
  74. internal long _bytesPending;
  75. internal long _bytesSent;
  76. internal long _bytesReceived;
  77. /// <summary>
  78. /// Is the server started?
  79. /// </summary>
  80. public bool IsStarted { get; private set; }
  81. /// <summary>
  82. /// Is the server accepting new clients?
  83. /// </summary>
  84. public bool IsAccepting { get; private set; }
  85. /// <summary>
  86. /// Create a new socket object
  87. /// </summary>
  88. /// <remarks>
  89. /// Method may be override if you need to prepare some specific socket object in your implementation.
  90. /// </remarks>
  91. /// <returns>Socket object</returns>
  92. protected virtual Socket CreateSocket()
  93. {
  94. return new Socket(Endpoint.AddressFamily, SocketType.Stream, ProtocolType.IP);
  95. }
  96. /// <summary>
  97. /// Start the server
  98. /// </summary>
  99. /// <returns>'true' if the server was successfully started, 'false' if the server failed to start</returns>
  100. public virtual bool Start()
  101. {
  102. Debug.Assert(!IsStarted, "Unix Domain Socket server is already started!");
  103. if (IsStarted)
  104. return false;
  105. // Setup acceptor event arg
  106. _acceptorEventArg = new SocketAsyncEventArgs();
  107. _acceptorEventArg.Completed += OnAsyncCompleted;
  108. // Create a new acceptor socket
  109. _acceptorSocket = CreateSocket();
  110. // Update the acceptor socket disposed flag
  111. IsSocketDisposed = false;
  112. // Bind the acceptor socket to the endpoint
  113. _acceptorSocket.Bind(Endpoint);
  114. // Refresh the endpoint property based on the actual endpoint created
  115. Endpoint = _acceptorSocket.LocalEndPoint;
  116. // Call the server starting handler
  117. OnStarting();
  118. // Start listen to the acceptor socket with the given accepting backlog size
  119. _acceptorSocket.Listen(OptionAcceptorBacklog);
  120. // Reset statistic
  121. _bytesPending = 0;
  122. _bytesSent = 0;
  123. _bytesReceived = 0;
  124. // Update the started flag
  125. IsStarted = true;
  126. // Call the server started handler
  127. OnStarted();
  128. // Perform the first server accept
  129. IsAccepting = true;
  130. StartAccept(_acceptorEventArg);
  131. return true;
  132. }
  133. /// <summary>
  134. /// Stop the server
  135. /// </summary>
  136. /// <returns>'true' if the server was successfully stopped, 'false' if the server is already stopped</returns>
  137. public virtual bool Stop()
  138. {
  139. Debug.Assert(IsStarted, "Unix Domain Socket server is not started!");
  140. if (!IsStarted)
  141. return false;
  142. // Stop accepting new clients
  143. IsAccepting = false;
  144. // Reset acceptor event arg
  145. _acceptorEventArg.Completed -= OnAsyncCompleted;
  146. // Call the server stopping handler
  147. OnStopping();
  148. try
  149. {
  150. // Close the acceptor socket
  151. _acceptorSocket.Close();
  152. // Dispose the acceptor socket
  153. _acceptorSocket.Dispose();
  154. // Dispose event arguments
  155. _acceptorEventArg.Dispose();
  156. // Update the acceptor socket disposed flag
  157. IsSocketDisposed = true;
  158. }
  159. catch (ObjectDisposedException) {}
  160. // Disconnect all sessions
  161. DisconnectAll();
  162. // Update the started flag
  163. IsStarted = false;
  164. // Call the server stopped handler
  165. OnStopped();
  166. return true;
  167. }
  168. /// <summary>
  169. /// Restart the server
  170. /// </summary>
  171. /// <returns>'true' if the server was successfully restarted, 'false' if the server failed to restart</returns>
  172. public virtual bool Restart()
  173. {
  174. if (!Stop())
  175. return false;
  176. while (IsStarted)
  177. Thread.Yield();
  178. return Start();
  179. }
  180. #endregion
  181. #region Accepting clients
  182. /// <summary>
  183. /// Start accept a new client connection
  184. /// </summary>
  185. private void StartAccept(SocketAsyncEventArgs e)
  186. {
  187. // Socket must be cleared since the context object is being reused
  188. e.AcceptSocket = null;
  189. // Async accept a new client connection
  190. if (!_acceptorSocket.AcceptAsync(e))
  191. ProcessAccept(e);
  192. }
  193. /// <summary>
  194. /// Process accepted client connection
  195. /// </summary>
  196. private void ProcessAccept(SocketAsyncEventArgs e)
  197. {
  198. if (e.SocketError == SocketError.Success)
  199. {
  200. // Create a new session to register
  201. var session = CreateSession();
  202. // Register the session
  203. RegisterSession(session);
  204. // Connect new session
  205. session.Connect(e.AcceptSocket);
  206. }
  207. else
  208. SendError(e.SocketError);
  209. // Accept the next client connection
  210. if (IsAccepting)
  211. StartAccept(e);
  212. }
  213. /// <summary>
  214. /// This method is the callback method associated with Socket.AcceptAsync()
  215. /// operations and is invoked when an accept operation is complete
  216. /// </summary>
  217. private void OnAsyncCompleted(object sender, SocketAsyncEventArgs e)
  218. {
  219. if (IsSocketDisposed)
  220. return;
  221. ProcessAccept(e);
  222. }
  223. #endregion
  224. #region Session factory
  225. /// <summary>
  226. /// Create Unix Domain Socket session factory method
  227. /// </summary>
  228. /// <returns>Unix Domain Socket session</returns>
  229. protected virtual UdsSession CreateSession() { return new UdsSession(this); }
  230. #endregion
  231. #region Session management
  232. /// <summary>
  233. /// Server sessions
  234. /// </summary>
  235. protected readonly ConcurrentDictionary<Guid, UdsSession> Sessions = new ConcurrentDictionary<Guid, UdsSession>();
  236. /// <summary>
  237. /// Disconnect all connected sessions
  238. /// </summary>
  239. /// <returns>'true' if all sessions were successfully disconnected, 'false' if the server is not started</returns>
  240. public virtual bool DisconnectAll()
  241. {
  242. if (!IsStarted)
  243. return false;
  244. // Disconnect all sessions
  245. foreach (var session in Sessions.Values)
  246. session.Disconnect();
  247. return true;
  248. }
  249. /// <summary>
  250. /// Find a session with a given Id
  251. /// </summary>
  252. /// <param name="id">Session Id</param>
  253. /// <returns>Session with a given Id or null if the session it not connected</returns>
  254. public UdsSession FindSession(Guid id)
  255. {
  256. // Try to find the required session
  257. return Sessions.TryGetValue(id, out UdsSession result) ? result : null;
  258. }
  259. /// <summary>
  260. /// Register a new session
  261. /// </summary>
  262. /// <param name="session">Session to register</param>
  263. internal void RegisterSession(UdsSession session)
  264. {
  265. // Register a new session
  266. Sessions.TryAdd(session.Id, session);
  267. }
  268. /// <summary>
  269. /// Unregister session by Id
  270. /// </summary>
  271. /// <param name="id">Session Id</param>
  272. internal void UnregisterSession(Guid id)
  273. {
  274. // Unregister session by Id
  275. Sessions.TryRemove(id, out UdsSession _);
  276. }
  277. #endregion
  278. #region Multicasting
  279. /// <summary>
  280. /// Multicast data to all connected sessions
  281. /// </summary>
  282. /// <param name="buffer">Buffer to multicast</param>
  283. /// <returns>'true' if the data was successfully multicasted, 'false' if the data was not multicasted</returns>
  284. public virtual bool Multicast(byte[] buffer) => Multicast(buffer.AsSpan());
  285. /// <summary>
  286. /// Multicast data to all connected clients
  287. /// </summary>
  288. /// <param name="buffer">Buffer to multicast</param>
  289. /// <param name="offset">Buffer offset</param>
  290. /// <param name="size">Buffer size</param>
  291. /// <returns>'true' if the data was successfully multicasted, 'false' if the data was not multicasted</returns>
  292. public virtual bool Multicast(byte[] buffer, long offset, long size) => Multicast(buffer.AsSpan((int)offset, (int)size));
  293. /// <summary>
  294. /// Multicast data to all connected clients
  295. /// </summary>
  296. /// <param name="buffer">Buffer to send as a span of bytes</param>
  297. /// <returns>'true' if the data was successfully multicasted, 'false' if the data was not multicasted</returns>
  298. public virtual bool Multicast(ReadOnlySpan<byte> buffer)
  299. {
  300. if (!IsStarted)
  301. return false;
  302. if (buffer.IsEmpty)
  303. return true;
  304. // Multicast data to all sessions
  305. foreach (var session in Sessions.Values)
  306. session.SendAsync(buffer);
  307. return true;
  308. }
  309. /// <summary>
  310. /// Multicast text to all connected clients
  311. /// </summary>
  312. /// <param name="text">Text string to multicast</param>
  313. /// <returns>'true' if the text was successfully multicasted, 'false' if the text was not multicasted</returns>
  314. public virtual bool Multicast(string text) => Multicast(Encoding.UTF8.GetBytes(text));
  315. /// <summary>
  316. /// Multicast text to all connected clients
  317. /// </summary>
  318. /// <param name="text">Text to multicast as a span of characters</param>
  319. /// <returns>'true' if the text was successfully multicasted, 'false' if the text was not multicasted</returns>
  320. public virtual bool Multicast(ReadOnlySpan<char> text) => Multicast(Encoding.UTF8.GetBytes(text.ToArray()));
  321. #endregion
  322. #region Server handlers
  323. /// <summary>
  324. /// Handle server starting notification
  325. /// </summary>
  326. protected virtual void OnStarting() {}
  327. /// <summary>
  328. /// Handle server started notification
  329. /// </summary>
  330. protected virtual void OnStarted() {}
  331. /// <summary>
  332. /// Handle server stopping notification
  333. /// </summary>
  334. protected virtual void OnStopping() {}
  335. /// <summary>
  336. /// Handle server stopped notification
  337. /// </summary>
  338. protected virtual void OnStopped() {}
  339. /// <summary>
  340. /// Handle session connecting notification
  341. /// </summary>
  342. /// <param name="session">Connecting session</param>
  343. protected virtual void OnConnecting(UdsSession session) {}
  344. /// <summary>
  345. /// Handle session connected notification
  346. /// </summary>
  347. /// <param name="session">Connected session</param>
  348. protected virtual void OnConnected(UdsSession session) {}
  349. /// <summary>
  350. /// Handle session disconnecting notification
  351. /// </summary>
  352. /// <param name="session">Disconnecting session</param>
  353. protected virtual void OnDisconnecting(UdsSession session) {}
  354. /// <summary>
  355. /// Handle session disconnected notification
  356. /// </summary>
  357. /// <param name="session">Disconnected session</param>
  358. protected virtual void OnDisconnected(UdsSession session) {}
  359. /// <summary>
  360. /// Handle error notification
  361. /// </summary>
  362. /// <param name="error">Socket error code</param>
  363. protected virtual void OnError(SocketError error) {}
  364. internal void OnConnectingInternal(UdsSession session) { OnConnecting(session); }
  365. internal void OnConnectedInternal(UdsSession session) { OnConnected(session); }
  366. internal void OnDisconnectingInternal(UdsSession session) { OnDisconnecting(session); }
  367. internal void OnDisconnectedInternal(UdsSession session) { OnDisconnected(session); }
  368. #endregion
  369. #region Error handling
  370. /// <summary>
  371. /// Send error notification
  372. /// </summary>
  373. /// <param name="error">Socket error code</param>
  374. private void SendError(SocketError error)
  375. {
  376. // Skip disconnect errors
  377. if ((error == SocketError.ConnectionAborted) ||
  378. (error == SocketError.ConnectionRefused) ||
  379. (error == SocketError.ConnectionReset) ||
  380. (error == SocketError.OperationAborted) ||
  381. (error == SocketError.Shutdown))
  382. return;
  383. OnError(error);
  384. }
  385. #endregion
  386. #region IDisposable implementation
  387. /// <summary>
  388. /// Disposed flag
  389. /// </summary>
  390. public bool IsDisposed { get; private set; }
  391. /// <summary>
  392. /// Acceptor socket disposed flag
  393. /// </summary>
  394. public bool IsSocketDisposed { get; private set; } = true;
  395. // Implement IDisposable.
  396. public void Dispose()
  397. {
  398. Dispose(true);
  399. GC.SuppressFinalize(this);
  400. }
  401. protected virtual void Dispose(bool disposingManagedResources)
  402. {
  403. // The idea here is that Dispose(Boolean) knows whether it is
  404. // being called to do explicit cleanup (the Boolean is true)
  405. // versus being called due to a garbage collection (the Boolean
  406. // is false). This distinction is useful because, when being
  407. // disposed explicitly, the Dispose(Boolean) method can safely
  408. // execute code using reference type fields that refer to other
  409. // objects knowing for sure that these other objects have not been
  410. // finalized or disposed of yet. When the Boolean is false,
  411. // the Dispose(Boolean) method should not execute code that
  412. // refer to reference type fields because those objects may
  413. // have already been finalized."
  414. if (!IsDisposed)
  415. {
  416. if (disposingManagedResources)
  417. {
  418. // Dispose managed resources here...
  419. Stop();
  420. }
  421. // Dispose unmanaged resources here...
  422. // Set large fields to null here...
  423. // Mark as disposed.
  424. IsDisposed = true;
  425. }
  426. }
  427. #endregion
  428. }
  429. }