PeerBase.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. using System;
  2. using System.Linq;
  3. using System.Collections.Concurrent;
  4. using System.IO;
  5. using System.Net;
  6. using System.Net.Sockets;
  7. using System.Threading;
  8. using UnityEngine;
  9. using System.Threading.Tasks;
  10. using pb = global::Google.Protobuf;
  11. using MapServer;
  12. using System.Diagnostics;
  13. using System.Text;
  14. public class PeerBase
  15. {
  16. /// <summary>
  17. /// 任务取消
  18. /// </summary>
  19. protected readonly CancellationTokenSource cts = new CancellationTokenSource();
  20. protected readonly CancellationToken ct;
  21. public Thread RecvloopTask { get; private set; }
  22. private Thread SendLoopTask { get; set; }
  23. private Task PingPongLoopTask;
  24. protected Socket Sock { get; private set; } // 连接对象
  25. private volatile bool SocketConnected = false;
  26. /// <summary>
  27. /// 连接断开事件
  28. /// </summary>
  29. public event Action OnConnectClosed;
  30. /// <summary>
  31. /// 接收buffer
  32. /// </summary>
  33. private readonly ConcurrentQueue<sSocketData> recvDataBuffer = new ConcurrentQueue<sSocketData>();
  34. /// <summary>
  35. /// 发送buffer
  36. /// </summary>
  37. private readonly ConcurrentQueue<sSocketData> sendDataBuffer = new ConcurrentQueue<sSocketData>();
  38. /// <summary>
  39. /// 消息分发
  40. /// </summary>
  41. protected ConcurrentDictionary<eProtocalCommand, Action<byte[]>> callbacks = new ConcurrentDictionary<eProtocalCommand, Action<byte[]>>();
  42. /// <summary>
  43. /// ping/pong 轮回延时(毫秒)
  44. /// </summary>
  45. public volatile int tts = 0;
  46. /// <summary>
  47. /// 主动连接型Peer(客户端Peer)
  48. /// </summary>
  49. /// <param name="Ip"></param>
  50. /// <param name="Port"></param>
  51. /// <exception cref="Exception"></exception>
  52. public PeerBase(string Ip, int Port)
  53. {
  54. ct = cts.Token;
  55. _ = InitScoket(Ip, Port);
  56. }
  57. public void Log(string msg)
  58. {
  59. LogHelper.Log("Socket: " + msg);
  60. }
  61. ///// <summary>
  62. ///// 断线事件(如果处理函数中返回true,代表已经处理妥善,不需要继续退出)
  63. ///// </summary>
  64. //public event Func<bool> OnDisconnect;
  65. protected async Task<bool> InitScoket(string IP, int Port)
  66. {
  67. if (SocketConnected)
  68. {
  69. LogHelper.Log("连接正常无需重连!!");
  70. return true;
  71. }
  72. try
  73. {
  74. var endPoint = new IPEndPoint(IPAddress.Parse(IP), Port);
  75. Sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  76. await Sock.ConnectAsync(endPoint);
  77. Sock.NoDelay = true;
  78. //Sock.Blocking = false;
  79. SocketConnected = true;
  80. }
  81. catch (Exception e)
  82. {
  83. throw new Exception($"{this} 初始化失败!({e.Message})");
  84. }
  85. InitDispatch(); // 初始化逻辑派发
  86. SendLoopTask = new Thread(WriteLooper);
  87. SendLoopTask.IsBackground = true;
  88. SendLoopTask.Start();
  89. RecvloopTask = new Thread(RecvLooper);
  90. RecvloopTask.IsBackground = true;
  91. RecvloopTask.Start();
  92. PingPongLoopTask = Task.Run(PingPongLoop);
  93. return true;
  94. }
  95. /// <summary>
  96. /// 被动建立连接型Peer(服务端peer)
  97. /// </summary>
  98. /// <param name="socket"></param>
  99. [Obsolete("服务端/p2p时才可以使用 -gwang", true)]
  100. public PeerBase(Socket socket)
  101. {
  102. ct = cts.Token;
  103. Sock = socket;
  104. InitDispatch(); // 初始化逻辑派发
  105. RecvloopTask = new Thread(RecvLooper);
  106. RecvloopTask.IsBackground = true;
  107. RecvloopTask.Start();
  108. }
  109. /// <summary>
  110. /// 初始化消息到业务逻辑之间的派发执行映射
  111. /// </summary>
  112. protected virtual void InitDispatch()
  113. {
  114. }
  115. async Task PingPongLoop()
  116. {
  117. int ms = 1 * 1000;
  118. while (true)
  119. {
  120. try
  121. {
  122. await Task.Delay(ms, ct);
  123. Ping();
  124. if (cts.IsCancellationRequested)
  125. {
  126. break;
  127. }
  128. }
  129. catch
  130. {
  131. break;
  132. }
  133. }
  134. }
  135. void OnPong(byte[] data)
  136. {
  137. var msg = Pong.Parser.ParseFrom(data);
  138. long now = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  139. this.tts = (int)(now - msg.PingMs);
  140. }
  141. public void Ping()
  142. {
  143. var msg = new MapServer.Ping() { PingMs = DateTimeOffset.Now.ToUnixTimeMilliseconds() };
  144. var sdata = sSocketData.FromIMsg(eProtocalCommand.Ping, msg);
  145. sendDataBuffer.Enqueue(sdata);
  146. }
  147. /// <summary>
  148. /// 向服务端端发送消息
  149. /// </summary>
  150. /// <param name="msgType"></param>
  151. /// <param name="msg"></param>
  152. protected void SendMsg(eProtocalCommand msgType, pb::IMessage msg)
  153. {
  154. var sdata = sSocketData.FromIMsg(msgType, msg);
  155. sendDataBuffer.Enqueue(sdata);
  156. }
  157. /// <summary>
  158. /// 将向所有后台任务发送结束或取消信号(注: MockLogic的实现方法需要自己编码关注ct.CancellationRequested状态主动退出)
  159. /// </summary>
  160. public virtual void Close()
  161. {
  162. LogHelper.Log("关闭连接: " + TraceBack());
  163. cts.Cancel(); // 向recv协程发送取消消息
  164. var endMsg = sSocketData.FromBytes(eProtocalCommand.Noop, new byte[] { });
  165. sendDataBuffer.Enqueue(endMsg); // 向send协程发送结束消息
  166. recvDataBuffer.Enqueue(endMsg); // 向read协程发送结束消息
  167. if (RecvloopTask.ThreadState == System.Threading.ThreadState.Running)
  168. {
  169. RecvloopTask.Abort(); // 停止接收线程
  170. }
  171. LogHelper.Log("关闭连接: 发送关闭事件" + TraceBack());
  172. this.OnConnectClosed?.Invoke(); // 需不需要重新救活一下?
  173. }
  174. static string TraceBack() // 打印调用到这里的栈信息
  175. {
  176. var sb = new StringBuilder();
  177. StackTrace st = new StackTrace();
  178. StackFrame[] sf = st.GetFrames();
  179. sf.ToList().ForEach(curSf => sb.AppendLine($"{curSf}"));
  180. return sb.ToString();
  181. }
  182. #region 后台任务
  183. virtual public void Update()
  184. {
  185. Dispatch(); // 分发业务逻辑
  186. //WriteToserver(); // 向服务端写消息
  187. }
  188. /// <summary>
  189. /// 接收服务端发来的信
  190. /// </summary>
  191. void RecvLooper()
  192. {
  193. Sock.ReceiveTimeout = 18000; // 接收等待超时时间设为800毫秒
  194. var _databuffer = new DataBuffer();
  195. byte[] arrServerRecMsg = new byte[4096]; // 创建一个内存缓冲区,其大小为4k字节
  196. while (true)
  197. {
  198. try
  199. {
  200. //var length = Sock.ReceiveAsync(new ArraySegment<byte>(arrServerRecMsg), SocketFlags.None, ct).Result; // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
  201. var length = Sock.Receive(arrServerRecMsg); // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
  202. if (length <= 0) // 视为客户端已经close连接
  203. {
  204. Log("recv 捕获对端连接已关闭信号, 开始有序退出");
  205. Close();
  206. break;
  207. }
  208. _databuffer.AddBuffer(arrServerRecMsg, length); //将收到的数据添加到缓存器中
  209. while (_databuffer.GetData(out sSocketData _socketData)) //取出一条完整数据
  210. {
  211. recvDataBuffer.Enqueue(_socketData); // 放入channel
  212. }
  213. Array.Clear(arrServerRecMsg, 0, length);
  214. }
  215. catch (OperationCanceledException ce)
  216. { // 捕获取消信号
  217. Log("recv 捕获取消任务信号, 开始有序退出");
  218. Close();
  219. break;
  220. }
  221. catch (SocketException se)
  222. {
  223. if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
  224. {
  225. //Log("捕获等待超时信号,继续recv.");
  226. continue; // 继续等待
  227. }
  228. if (se.ErrorCode == 10035)
  229. {
  230. Log("捕获recv空返回信号,继续recv.");
  231. Thread.Sleep(0);
  232. continue;
  233. }
  234. LogHelper.LogError("recv捕获Socket连接(断开)错误, 开始有序退出");
  235. Close(); // 关闭之前accept出来的和客户端进行通信的套接字
  236. break;
  237. }
  238. catch (ThreadAbortException ta)
  239. {
  240. LogHelper.LogError($"recv 捕获退出运行信息: socket立即关闭");
  241. Close();
  242. }
  243. catch (Exception e)
  244. {
  245. LogHelper.LogError($"{e}");
  246. LogHelper.LogError($"recv 捕获未知错误: {e.Message} 开始有序退出");
  247. Close(); // 关闭之前accept出来的和客户端进行通信的套接字
  248. break;
  249. }
  250. }
  251. Log("recv 任务退出ing");
  252. Sock.Close(); // 关闭之前accept出来的和客户端进行通信的套接字
  253. SocketConnected = false;
  254. }
  255. /// <summary>
  256. /// 向服务端写入消息
  257. /// </summary>
  258. void WriteLooper()
  259. {
  260. while (SocketConnected)
  261. {
  262. try
  263. {
  264. if (sendDataBuffer.TryDequeue(out sSocketData msg))
  265. {
  266. if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
  267. {
  268. LogHelper.Log("[send]收到退出信号");
  269. break;
  270. }
  271. Sock.Send(new ArraySegment<byte>(msg.ToBytes()), SocketFlags.None);
  272. }
  273. else
  274. {
  275. Thread.Sleep(0); // sleep(0)即可
  276. }
  277. }
  278. catch (OperationCanceledException ce)
  279. { // 捕获取消信号
  280. Log("捕获取消任务信号, 开始有序退出");
  281. Close();
  282. break;
  283. }
  284. catch (SocketException se)
  285. {
  286. if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
  287. {
  288. Log("捕获等待超时信号,继续");
  289. continue; // 继续等待
  290. }
  291. LogHelper.LogError($"捕获Socket连接(断开)错误{se.Message} \r\n 开始有序退出");
  292. Close(); // 关闭之前accept出来的和客户端进行通信的套接字
  293. break;
  294. }
  295. catch (Exception e)
  296. {
  297. LogHelper.LogError($"捕获未知错误: {e.Message} \r\n 开始有序退出");
  298. Close(); // 关闭之前accept出来的和客户端进行通信的套接字
  299. break;
  300. }
  301. }
  302. }
  303. /// <summary>
  304. /// 服务端消息到业务逻辑的分发执行
  305. /// </summary>
  306. /// <returns></returns>
  307. void Dispatch()
  308. {
  309. while (recvDataBuffer.TryDequeue(out sSocketData msg))
  310. {
  311. if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
  312. {
  313. Log("[dispatch]收到退出信号: " + msg.ProtocallType);
  314. break;
  315. }
  316. else if (msg.ProtocallType == eProtocalCommand.Pong)
  317. {
  318. OnPong(msg.Data);
  319. }
  320. else if (callbacks.ContainsKey(msg.ProtocallType))
  321. {
  322. Log("收到消息: " + msg.ProtocallType);
  323. callbacks[msg.ProtocallType](msg.Data);
  324. }
  325. else
  326. {
  327. LogHelper.LogError("未注册的消息类型:" + msg.ProtocallType.ToString());
  328. }
  329. }
  330. }
  331. #endregion
  332. }