123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- using System;
- using System.Linq;
- using System.Collections.Concurrent;
- using System.IO;
- using System.Net;
- using System.Net.Sockets;
- using System.Threading;
- using UnityEngine;
- using System.Threading.Tasks;
- using pb = global::Google.Protobuf;
- using MapServer;
- using System.Diagnostics;
- using System.Text;
- public class PeerBase
- {
- /// <summary>
- /// 任务取消
- /// </summary>
- protected readonly CancellationTokenSource cts = new CancellationTokenSource();
- protected readonly CancellationToken ct;
- public Thread RecvloopTask { get; private set; }
- private Thread SendLoopTask { get; set; }
- private Task PingPongLoopTask;
- protected Socket Sock { get; private set; } // 连接对象
- private volatile bool SocketConnected = false;
- /// <summary>
- /// 连接断开事件
- /// </summary>
- public event Action OnConnectClosed;
- /// <summary>
- /// 接收buffer
- /// </summary>
- private readonly ConcurrentQueue<sSocketData> recvDataBuffer = new ConcurrentQueue<sSocketData>();
- /// <summary>
- /// 发送buffer
- /// </summary>
- private readonly ConcurrentQueue<sSocketData> sendDataBuffer = new ConcurrentQueue<sSocketData>();
- /// <summary>
- /// 消息分发
- /// </summary>
- protected ConcurrentDictionary<eProtocalCommand, Action<byte[]>> callbacks = new ConcurrentDictionary<eProtocalCommand, Action<byte[]>>();
- /// <summary>
- /// ping/pong 轮回延时(毫秒)
- /// </summary>
- public volatile int tts = 0;
- /// <summary>
- /// 主动连接型Peer(客户端Peer)
- /// </summary>
- /// <param name="Ip"></param>
- /// <param name="Port"></param>
- /// <exception cref="Exception"></exception>
- public PeerBase(string Ip, int Port)
- {
- ct = cts.Token;
- _ = InitScoket(Ip, Port);
- }
- public void Log(string msg)
- {
- LogHelper.Log("Socket: " + msg);
- }
- ///// <summary>
- ///// 断线事件(如果处理函数中返回true,代表已经处理妥善,不需要继续退出)
- ///// </summary>
- //public event Func<bool> OnDisconnect;
- protected async Task<bool> InitScoket(string IP, int Port)
- {
-
- if (SocketConnected)
- {
- LogHelper.Log("连接正常无需重连!!");
- return true;
- }
- try
- {
- var endPoint = new IPEndPoint(IPAddress.Parse(IP), Port);
- Sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- await Sock.ConnectAsync(endPoint);
- Sock.NoDelay = true;
- //Sock.Blocking = false;
- SocketConnected = true;
- }
- catch (Exception e)
- {
- throw new Exception($"{this} 初始化失败!({e.Message})");
- }
- InitDispatch(); // 初始化逻辑派发
- SendLoopTask = new Thread(WriteLooper);
- SendLoopTask.IsBackground = true;
- SendLoopTask.Start();
- RecvloopTask = new Thread(RecvLooper);
- RecvloopTask.IsBackground = true;
- RecvloopTask.Start();
- PingPongLoopTask = Task.Run(PingPongLoop);
- return true;
- }
- /// <summary>
- /// 被动建立连接型Peer(服务端peer)
- /// </summary>
- /// <param name="socket"></param>
- [Obsolete("服务端/p2p时才可以使用 -gwang", true)]
- public PeerBase(Socket socket)
- {
- ct = cts.Token;
- Sock = socket;
- InitDispatch(); // 初始化逻辑派发
- RecvloopTask = new Thread(RecvLooper);
- RecvloopTask.IsBackground = true;
- RecvloopTask.Start();
- }
- /// <summary>
- /// 初始化消息到业务逻辑之间的派发执行映射
- /// </summary>
- protected virtual void InitDispatch()
- {
- }
- async Task PingPongLoop()
- {
- int ms = 1 * 1000;
- while (true)
- {
- try
- {
- await Task.Delay(ms, ct);
- Ping();
- if (cts.IsCancellationRequested)
- {
- break;
- }
- }
- catch
- {
- break;
- }
- }
- }
- void OnPong(byte[] data)
- {
- var msg = Pong.Parser.ParseFrom(data);
- long now = DateTimeOffset.Now.ToUnixTimeMilliseconds();
- this.tts = (int)(now - msg.PingMs);
- }
- public void Ping()
- {
- var msg = new MapServer.Ping() { PingMs = DateTimeOffset.Now.ToUnixTimeMilliseconds() };
- var sdata = sSocketData.FromIMsg(eProtocalCommand.Ping, msg);
- sendDataBuffer.Enqueue(sdata);
- }
- /// <summary>
- /// 向服务端端发送消息
- /// </summary>
- /// <param name="msgType"></param>
- /// <param name="msg"></param>
- protected void SendMsg(eProtocalCommand msgType, pb::IMessage msg)
- {
- var sdata = sSocketData.FromIMsg(msgType, msg);
- sendDataBuffer.Enqueue(sdata);
- }
- /// <summary>
- /// 将向所有后台任务发送结束或取消信号(注: MockLogic的实现方法需要自己编码关注ct.CancellationRequested状态主动退出)
- /// </summary>
- public virtual void Close()
- {
- LogHelper.Log("关闭连接: " + TraceBack());
- cts.Cancel(); // 向recv协程发送取消消息
- var endMsg = sSocketData.FromBytes(eProtocalCommand.Noop, new byte[] { });
- sendDataBuffer.Enqueue(endMsg); // 向send协程发送结束消息
- recvDataBuffer.Enqueue(endMsg); // 向read协程发送结束消息
- if (RecvloopTask.ThreadState == System.Threading.ThreadState.Running)
- {
- RecvloopTask.Abort(); // 停止接收线程
- }
- LogHelper.Log("关闭连接: 发送关闭事件" + TraceBack());
- this.OnConnectClosed?.Invoke(); // 需不需要重新救活一下?
- }
- static string TraceBack() // 打印调用到这里的栈信息
- {
- var sb = new StringBuilder();
- StackTrace st = new StackTrace();
- StackFrame[] sf = st.GetFrames();
- sf.ToList().ForEach(curSf => sb.AppendLine($"{curSf}"));
- return sb.ToString();
- }
- #region 后台任务
- virtual public void Update()
- {
- Dispatch(); // 分发业务逻辑
- //WriteToserver(); // 向服务端写消息
- }
- /// <summary>
- /// 接收服务端发来的信
- /// </summary>
- void RecvLooper()
- {
- Sock.ReceiveTimeout = 18000; // 接收等待超时时间设为800毫秒
- var _databuffer = new DataBuffer();
- byte[] arrServerRecMsg = new byte[4096]; // 创建一个内存缓冲区,其大小为4k字节
- while (true)
- {
- try
- {
- //var length = Sock.ReceiveAsync(new ArraySegment<byte>(arrServerRecMsg), SocketFlags.None, ct).Result; // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
- var length = Sock.Receive(arrServerRecMsg); // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
- if (length <= 0) // 视为客户端已经close连接
- {
- Log("recv 捕获对端连接已关闭信号, 开始有序退出");
- Close();
- break;
- }
- _databuffer.AddBuffer(arrServerRecMsg, length); //将收到的数据添加到缓存器中
- while (_databuffer.GetData(out sSocketData _socketData)) //取出一条完整数据
- {
- recvDataBuffer.Enqueue(_socketData); // 放入channel
- }
- Array.Clear(arrServerRecMsg, 0, length);
- }
- catch (OperationCanceledException ce)
- { // 捕获取消信号
- Log("recv 捕获取消任务信号, 开始有序退出");
- Close();
- break;
- }
- catch (SocketException se)
- {
- if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
- {
- //Log("捕获等待超时信号,继续recv.");
- continue; // 继续等待
- }
- if (se.ErrorCode == 10035)
- {
- Log("捕获recv空返回信号,继续recv.");
- Thread.Sleep(0);
- continue;
- }
- LogHelper.LogError("recv捕获Socket连接(断开)错误, 开始有序退出");
- Close(); // 关闭之前accept出来的和客户端进行通信的套接字
- break;
- }
- catch (ThreadAbortException ta)
- {
- LogHelper.LogError($"recv 捕获退出运行信息: socket立即关闭");
- Close();
- }
- catch (Exception e)
- {
- LogHelper.LogError($"{e}");
- LogHelper.LogError($"recv 捕获未知错误: {e.Message} 开始有序退出");
- Close(); // 关闭之前accept出来的和客户端进行通信的套接字
- break;
- }
- }
- Log("recv 任务退出ing");
- Sock.Close(); // 关闭之前accept出来的和客户端进行通信的套接字
- SocketConnected = false;
- }
- /// <summary>
- /// 向服务端写入消息
- /// </summary>
- void WriteLooper()
- {
- while (SocketConnected)
- {
- try
- {
- if (sendDataBuffer.TryDequeue(out sSocketData msg))
- {
- if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
- {
- LogHelper.Log("[send]收到退出信号");
- break;
- }
- Sock.Send(new ArraySegment<byte>(msg.ToBytes()), SocketFlags.None);
- }
- else
- {
- Thread.Sleep(0); // sleep(0)即可
- }
- }
- catch (OperationCanceledException ce)
- { // 捕获取消信号
- Log("捕获取消任务信号, 开始有序退出");
- Close();
- break;
- }
- catch (SocketException se)
- {
- if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
- {
- Log("捕获等待超时信号,继续");
- continue; // 继续等待
- }
- LogHelper.LogError($"捕获Socket连接(断开)错误{se.Message} \r\n 开始有序退出");
- Close(); // 关闭之前accept出来的和客户端进行通信的套接字
- break;
- }
- catch (Exception e)
- {
- LogHelper.LogError($"捕获未知错误: {e.Message} \r\n 开始有序退出");
- Close(); // 关闭之前accept出来的和客户端进行通信的套接字
- break;
- }
- }
- }
- /// <summary>
- /// 服务端消息到业务逻辑的分发执行
- /// </summary>
- /// <returns></returns>
- void Dispatch()
- {
- while (recvDataBuffer.TryDequeue(out sSocketData msg))
- {
- if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
- {
- Log("[dispatch]收到退出信号: " + msg.ProtocallType);
- break;
- }
- else if (msg.ProtocallType == eProtocalCommand.Pong)
- {
- OnPong(msg.Data);
- }
- else if (callbacks.ContainsKey(msg.ProtocallType))
- {
- Log("收到消息: " + msg.ProtocallType);
- callbacks[msg.ProtocallType](msg.Data);
- }
- else
- {
- LogHelper.LogError("未注册的消息类型:" + msg.ProtocallType.ToString());
- }
- }
- }
- #endregion
- }
|