using System;
using System.Linq;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using UnityEngine;
using System.Threading.Tasks;
using pb = global::Google.Protobuf;
using MapServer;
using System.Diagnostics;
using System.Text;
public class PeerBase
{
///
/// 任务取消
///
protected readonly CancellationTokenSource cts = new CancellationTokenSource();
protected readonly CancellationToken ct;
public Thread RecvloopTask { get; private set; }
private Thread SendLoopTask { get; set; }
private Task PingPongLoopTask;
protected Socket Sock { get; private set; } // 连接对象
private volatile bool SocketConnected = false;
///
/// 连接断开事件
///
public event Action OnConnectClosed;
///
/// 接收buffer
///
private readonly ConcurrentQueue recvDataBuffer = new ConcurrentQueue();
///
/// 发送buffer
///
private readonly ConcurrentQueue sendDataBuffer = new ConcurrentQueue();
///
/// 消息分发
///
protected ConcurrentDictionary> callbacks = new ConcurrentDictionary>();
///
/// ping/pong 轮回延时(毫秒)
///
public volatile int tts = 0;
///
/// 主动连接型Peer(客户端Peer)
///
///
///
///
public PeerBase(string Ip, int Port)
{
ct = cts.Token;
_ = InitScoket(Ip, Port);
}
public void Log(string msg)
{
LogHelper.Log("Socket: " + msg);
}
/////
///// 断线事件(如果处理函数中返回true,代表已经处理妥善,不需要继续退出)
/////
//public event Func OnDisconnect;
protected async Task InitScoket(string IP, int Port)
{
if (SocketConnected)
{
LogHelper.Log("连接正常无需重连!!");
return true;
}
try
{
var endPoint = new IPEndPoint(IPAddress.Parse(IP), Port);
Sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await Sock.ConnectAsync(endPoint);
Sock.NoDelay = true;
//Sock.Blocking = false;
SocketConnected = true;
}
catch (Exception e)
{
throw new Exception($"{this} 初始化失败!({e.Message})");
}
InitDispatch(); // 初始化逻辑派发
SendLoopTask = new Thread(WriteLooper);
SendLoopTask.IsBackground = true;
SendLoopTask.Start();
RecvloopTask = new Thread(RecvLooper);
RecvloopTask.IsBackground = true;
RecvloopTask.Start();
PingPongLoopTask = Task.Run(PingPongLoop);
return true;
}
///
/// 被动建立连接型Peer(服务端peer)
///
///
[Obsolete("服务端/p2p时才可以使用 -gwang", true)]
public PeerBase(Socket socket)
{
ct = cts.Token;
Sock = socket;
InitDispatch(); // 初始化逻辑派发
RecvloopTask = new Thread(RecvLooper);
RecvloopTask.IsBackground = true;
RecvloopTask.Start();
}
///
/// 初始化消息到业务逻辑之间的派发执行映射
///
protected virtual void InitDispatch()
{
}
async Task PingPongLoop()
{
int ms = 1 * 1000;
while (true)
{
try
{
await Task.Delay(ms, ct);
Ping();
if (cts.IsCancellationRequested)
{
break;
}
}
catch
{
break;
}
}
}
void OnPong(byte[] data)
{
var msg = Pong.Parser.ParseFrom(data);
long now = DateTimeOffset.Now.ToUnixTimeMilliseconds();
this.tts = (int)(now - msg.PingMs);
}
public void Ping()
{
var msg = new MapServer.Ping() { PingMs = DateTimeOffset.Now.ToUnixTimeMilliseconds() };
var sdata = sSocketData.FromIMsg(eProtocalCommand.Ping, msg);
sendDataBuffer.Enqueue(sdata);
}
///
/// 向服务端端发送消息
///
///
///
protected void SendMsg(eProtocalCommand msgType, pb::IMessage msg)
{
var sdata = sSocketData.FromIMsg(msgType, msg);
sendDataBuffer.Enqueue(sdata);
}
///
/// 将向所有后台任务发送结束或取消信号(注: MockLogic的实现方法需要自己编码关注ct.CancellationRequested状态主动退出)
///
public virtual void Close()
{
LogHelper.Log("关闭连接: " + TraceBack());
cts.Cancel(); // 向recv协程发送取消消息
var endMsg = sSocketData.FromBytes(eProtocalCommand.Noop, new byte[] { });
sendDataBuffer.Enqueue(endMsg); // 向send协程发送结束消息
recvDataBuffer.Enqueue(endMsg); // 向read协程发送结束消息
if (RecvloopTask.ThreadState == System.Threading.ThreadState.Running)
{
RecvloopTask.Abort(); // 停止接收线程
}
LogHelper.Log("关闭连接: 发送关闭事件" + TraceBack());
this.OnConnectClosed?.Invoke(); // 需不需要重新救活一下?
}
static string TraceBack() // 打印调用到这里的栈信息
{
var sb = new StringBuilder();
StackTrace st = new StackTrace();
StackFrame[] sf = st.GetFrames();
sf.ToList().ForEach(curSf => sb.AppendLine($"{curSf}"));
return sb.ToString();
}
#region 后台任务
virtual public void Update()
{
Dispatch(); // 分发业务逻辑
//WriteToserver(); // 向服务端写消息
}
///
/// 接收服务端发来的信
///
void RecvLooper()
{
Sock.ReceiveTimeout = 18000; // 接收等待超时时间设为800毫秒
var _databuffer = new DataBuffer();
byte[] arrServerRecMsg = new byte[4096]; // 创建一个内存缓冲区,其大小为4k字节
while (true)
{
try
{
//var length = Sock.ReceiveAsync(new ArraySegment(arrServerRecMsg), SocketFlags.None, ct).Result; // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
var length = Sock.Receive(arrServerRecMsg); // 将接收到的信息存入到内存缓冲区,并返回其字节数组的长度
if (length <= 0) // 视为客户端已经close连接
{
Log("recv 捕获对端连接已关闭信号, 开始有序退出");
Close();
break;
}
_databuffer.AddBuffer(arrServerRecMsg, length); //将收到的数据添加到缓存器中
while (_databuffer.GetData(out sSocketData _socketData)) //取出一条完整数据
{
recvDataBuffer.Enqueue(_socketData); // 放入channel
}
Array.Clear(arrServerRecMsg, 0, length);
}
catch (OperationCanceledException ce)
{ // 捕获取消信号
Log("recv 捕获取消任务信号, 开始有序退出");
Close();
break;
}
catch (SocketException se)
{
if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
{
//Log("捕获等待超时信号,继续recv.");
continue; // 继续等待
}
if (se.ErrorCode == 10035)
{
Log("捕获recv空返回信号,继续recv.");
Thread.Sleep(0);
continue;
}
LogHelper.LogError("recv捕获Socket连接(断开)错误, 开始有序退出");
Close(); // 关闭之前accept出来的和客户端进行通信的套接字
break;
}
catch (ThreadAbortException ta)
{
LogHelper.LogError($"recv 捕获退出运行信息: socket立即关闭");
Close();
}
catch (Exception e)
{
LogHelper.LogError($"{e}");
LogHelper.LogError($"recv 捕获未知错误: {e.Message} 开始有序退出");
Close(); // 关闭之前accept出来的和客户端进行通信的套接字
break;
}
}
Log("recv 任务退出ing");
Sock.Close(); // 关闭之前accept出来的和客户端进行通信的套接字
SocketConnected = false;
}
///
/// 向服务端写入消息
///
void WriteLooper()
{
while (SocketConnected)
{
try
{
if (sendDataBuffer.TryDequeue(out sSocketData msg))
{
if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
{
LogHelper.Log("[send]收到退出信号");
break;
}
Sock.Send(new ArraySegment(msg.ToBytes()), SocketFlags.None);
}
else
{
Thread.Sleep(0); // sleep(0)即可
}
}
catch (OperationCanceledException ce)
{ // 捕获取消信号
Log("捕获取消任务信号, 开始有序退出");
Close();
break;
}
catch (SocketException se)
{
if (se.ErrorCode == 10060) // 超时的时候错误号码是10060
{
Log("捕获等待超时信号,继续");
continue; // 继续等待
}
LogHelper.LogError($"捕获Socket连接(断开)错误{se.Message} \r\n 开始有序退出");
Close(); // 关闭之前accept出来的和客户端进行通信的套接字
break;
}
catch (Exception e)
{
LogHelper.LogError($"捕获未知错误: {e.Message} \r\n 开始有序退出");
Close(); // 关闭之前accept出来的和客户端进行通信的套接字
break;
}
}
}
///
/// 服务端消息到业务逻辑的分发执行
///
///
void Dispatch()
{
while (recvDataBuffer.TryDequeue(out sSocketData msg))
{
if (msg.ProtocallType == eProtocalCommand.Noop) // 无意义操作,用作退出信号
{
Log("[dispatch]收到退出信号: " + msg.ProtocallType);
break;
}
else if (msg.ProtocallType == eProtocalCommand.Pong)
{
OnPong(msg.Data);
}
else if (callbacks.ContainsKey(msg.ProtocallType))
{
Log("收到消息: " + msg.ProtocallType);
callbacks[msg.ProtocallType](msg.Data);
}
else
{
LogHelper.LogError("未注册的消息类型:" + msg.ProtocallType.ToString());
}
}
}
#endregion
}