using System; using System.Linq; using System.Collections.Concurrent; using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; using UnityEngine; using System.Threading.Tasks; using pb = global::Google.Protobuf; using MapServer; using System.Diagnostics; using System.Text; public class PeerBase { /// /// 任务取消 /// protected readonly CancellationTokenSource cts = new CancellationTokenSource(); protected readonly CancellationToken ct; public Thread RecvloopTask { get; private set; } private Thread SendLoopTask { get; set; } private Task PingPongLoopTask; protected Socket Sock { get; private set; } // 连接对象 private volatile bool SocketConnected = false; /// /// 连接断开事件 /// public event Action OnConnectClosed; /// /// 接收buffer /// private readonly ConcurrentQueue recvDataBuffer = new ConcurrentQueue(); /// /// 发送buffer /// private readonly ConcurrentQueue sendDataBuffer = new ConcurrentQueue(); /// /// 消息分发 /// protected ConcurrentDictionary> callbacks = new ConcurrentDictionary>(); /// /// ping/pong 轮回延时(毫秒) /// public volatile int tts = 0; /// /// 主动连接型Peer(客户端Peer) /// /// /// /// public PeerBase(string Ip, int Port) { ct = cts.Token; _ = InitScoket(Ip, Port); } public void Log(string msg) { LogHelper.Log("Socket: " + msg); } ///// ///// 断线事件(如果处理函数中返回true,代表已经处理妥善,不需要继续退出) ///// //public event Func OnDisconnect; protected async Task InitScoket(string IP, int Port) { if (SocketConnected) { LogHelper.Log("连接正常无需重连!!"); return true; } try { var endPoint = new IPEndPoint(IPAddress.Parse(IP), Port); Sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await Sock.ConnectAsync(endPoint); Sock.NoDelay = true; //Sock.Blocking = false; SocketConnected = true; } catch (Exception e) { throw new Exception($"{this} 初始化失败!({e.Message})"); } InitDispatch(); // 初始化逻辑派发 SendLoopTask = new Thread(WriteLooper); SendLoopTask.IsBackground = true; SendLoopTask.Start(); RecvloopTask = new Thread(RecvLooper); RecvloopTask.IsBackground = true; RecvloopTask.Start(); PingPongLoopTask = Task.Run(PingPongLoop); return true; } /// /// 被动建立连接型Peer(服务端peer) /// /// [Obsolete("服务端/p2p时才可以使用 -gwang", true)] public PeerBase(Socket socket) { ct = cts.Token; Sock = socket; InitDispatch(); // 初始化逻辑派发 RecvloopTask = new Thread(RecvLooper); RecvloopTask.IsBackground = true; RecvloopTask.Start(); } /// /// 初始化消息到业务逻辑之间的派发执行映射 /// protected virtual void InitDispatch() { } async Task PingPongLoop() { int ms = 1 * 1000; while (true) { try { await Task.Delay(ms, ct); Ping(); if (cts.IsCancellationRequested) { break; } } catch { break; } } } void OnPong(byte[] data) { var msg = Pong.Parser.ParseFrom(data); long now = DateTimeOffset.Now.ToUnixTimeMilliseconds(); this.tts = (int)(now - msg.PingMs); } public void Ping() { var msg = new MapServer.Ping() { PingMs = DateTimeOffset.Now.ToUnixTimeMilliseconds() }; var sdata = sSocketData.FromIMsg(eProtocalCommand.Ping, msg); sendDataBuffer.Enqueue(sdata); } /// /// 向服务端端发送消息 /// /// /// protected void SendMsg(eProtocalCommand msgType, pb::IMessage msg) { var sdata = sSocketData.FromIMsg(msgType, msg); sendDataBuffer.Enqueue(sdata); } /// /// 将向所有后台任务发送结束或取消信号(注: MockLogic的实现方法需要自己编码关注ct.CancellationRequested状态主动退出) /// public virtual void Close() { LogHelper.Log("关闭连接: " + TraceBack()); cts.Cancel(); // 向recv协程发送取消消息 var endMsg = sSocketData.FromBytes(eProtocalCommand.Noop, new byte[] { }); sendDataBuffer.Enqueue(endMsg); // 向send协程发送结束消息 recvDataBuffer.Enqueue(endMsg); // 向read协程发送结束消息 if (RecvloopTask.ThreadState == System.Threading.ThreadState.Running) { RecvloopTask.Abort(); // 停止接收线程 } LogHelper.Log("关闭连接: 发送关闭事件" + TraceBack()); this.OnConnectClosed?.Invoke(); // 需不需要重新救活一下? } static string TraceBack() // 打印调用到这里的栈信息 { var sb = new StringBuilder(); StackTrace st = new StackTrace(); StackFrame[] sf = st.GetFrames(); sf.ToList().ForEach(curSf => sb.AppendLine($"{curSf}")); return sb.ToString(); } #region 后台任务 virtual public void Update() { Dispatch(); // 分发业务逻辑 //WriteToserver(); // 向服务端写消息 } /// /// 接收服务端发来的信 /// void RecvLooper() { Sock.ReceiveTimeout = 18000; // 接收等待超时时间设为800毫秒 var _databuffer = new DataBuffer(); byte[] arrServerRecMsg = new byte[4096]; // 创建一个内存缓冲区,其大小为4k字节 while (true) { try { //var length = Sock.ReceiveAsync(new ArraySegment(arrServerRecMsg), SocketFlags.None, ct).Result; // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度 var length = Sock.Receive(arrServerRecMsg); // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度 if (length <= 0) // 视为客户端已经close连接 { Log("recv 捕获对端连接已关闭信号, 开始有序退出"); Close(); break; } _databuffer.AddBuffer(arrServerRecMsg, length); //将收到的数据添加到缓存器中 while (_databuffer.GetData(out sSocketData _socketData)) //取出一条完整数据 { recvDataBuffer.Enqueue(_socketData); // 放入channel } Array.Clear(arrServerRecMsg, 0, length); } catch (OperationCanceledException ce) { // 捕获取消信号 Log("recv 捕获取消任务信号, 开始有序退出"); Close(); break; } catch (SocketException se) { if (se.ErrorCode == 10060) // 超时的时候错误号码是10060 { //Log("捕获等待超时信号,继续recv."); continue; // 继续等待 } if (se.ErrorCode == 10035) { Log("捕获recv空返回信号,继续recv."); Thread.Sleep(0); continue; } LogHelper.LogError("recv捕获Socket连接(断开)错误, 开始有序退出"); Close(); // 关闭之前accept出来的和客户端进行通信的套接字 break; } catch (ThreadAbortException ta) { LogHelper.LogError($"recv 捕获退出运行信息: socket立即关闭"); Close(); } catch (Exception e) { LogHelper.LogError($"{e}"); LogHelper.LogError($"recv 捕获未知错误: {e.Message} 开始有序退出"); Close(); // 关闭之前accept出来的和客户端进行通信的套接字 break; } } Log("recv 任务退出ing"); Sock.Close(); // 关闭之前accept出来的和客户端进行通信的套接字 SocketConnected = false; } /// /// 向服务端写入消息 /// void WriteLooper() { while (SocketConnected) { try { if (sendDataBuffer.TryDequeue(out sSocketData msg)) { if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号 { LogHelper.Log("[send]收到退出信号"); break; } Sock.Send(new ArraySegment(msg.ToBytes()), SocketFlags.None); } else { Thread.Sleep(0); // sleep(0)即可 } } catch (OperationCanceledException ce) { // 捕获取消信号 Log("捕获取消任务信号, 开始有序退出"); Close(); break; } catch (SocketException se) { if (se.ErrorCode == 10060) // 超时的时候错误号码是10060 { Log("捕获等待超时信号,继续"); continue; // 继续等待 } LogHelper.LogError($"捕获Socket连接(断开)错误{se.Message} \r\n 开始有序退出"); Close(); // 关闭之前accept出来的和客户端进行通信的套接字 break; } catch (Exception e) { LogHelper.LogError($"捕获未知错误: {e.Message} \r\n 开始有序退出"); Close(); // 关闭之前accept出来的和客户端进行通信的套接字 break; } } } /// /// 服务端消息到业务逻辑的分发执行 /// /// void Dispatch() { while (recvDataBuffer.TryDequeue(out sSocketData msg)) { if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号 { Log("[dispatch]收到退出信号: " + msg.ProtocallType); break; } else if (msg.ProtocallType == eProtocalCommand.Pong) { OnPong(msg.Data); } else if (callbacks.ContainsKey(msg.ProtocallType)) { Log("收到消息: " + msg.ProtocallType); callbacks[msg.ProtocallType](msg.Data); } else { LogHelper.LogError("未注册的消息类型:" + msg.ProtocallType.ToString()); } } } #endregion }