C# ConcurrentQueue对列的基本使用方式

03-03 1800阅读

前言

        队列(Queue)代表了一个先进先出的对象集合。当您需要对各项进行先进先出的访问时,则使用队列。当您在列表中添加一项,称为入队,当您从列表中移除一项时,称为出队。

C# ConcurrentQueue对列的基本使用方式
(图片来源网络,侵删)

  ConcurrentQueue队列是一个高效的线程安全的队列,是.Net Framework 4.0,System.Collections.Concurrent命名空间下的一个数据结构。

应用场景

        本地对列的应用场景:高效API日志的异步写入,API中有回调其他接口推送数据,并发API数据的写入等其他可以交给第二个人去做的事。

        上位机开发应用场景: 双机协作的TCP通讯,异步获取数据并显示在图表控件上。

代码

对列初始化:
ConcurrentQueue Queue = new ConcurrentQueue();
/// 
/// 单条入队列
/// 
/// 入列模型
Queue.Enqueue(model);
/// 
/// 多条入队
/// 
/// 
List list = new List();
list.add(new model{});
list.add(new model{});
list.add(new model{});
list.add(new model{});
list.ForEach(t => Enqueue(t));
/// 
/// 单条出队
/// 
/// 
ApiLogContract apiLog = null;
Queue.TryDequeue(out apiLog);
/// 
/// 多条出队
/// 
/// 数量
/// 
var logs = new List();
if (Queue.Count > 0)
{
  for (int i = 0; i  
/// 
/// 获取对列数量
/// 
/// 
Queue.Count
/// 
/// 确定序列是否包含任何元素[用于判断对列是否有要处理的数据]这个方法的性能比Count()方法快
/// 
/// 
Queue.Any()

以上就是对列的基本使用方法。

对列的使用场景有很多。所有要异步处理的都可以使用对列的方式。如接口日志异步处理,邮件、短信异步发送等。对列一般配合单例设计模式和线程一起使用更佳。

示例

接口日志异步写入。不影响接口性能。

/// 
    /// 接口日志对列
    /// 
    public sealed class ApiLogQueueService
    {
        #region 属性成员
        /// 
        /// 单例
        /// 
        private static ApiLogQueueService _Instance = new ApiLogQueueService();
        /// 
        /// 上次写入时间
        /// 
        public DateTime LastWriteTime = DateTime.Now;
        /// 
        /// 是否写入运行线程日志
        /// 
        public bool IsWriteInitLog = false;
        /// 
        /// 是否刷新缓存
        /// 
        public bool IsRefreshCache = false;
        /// 
        /// 上次刷新时间
        /// 
        public DateTime LastRefreshTime = DateTime.Now;
        /// 
        /// 是否写入监控线程日志
        /// 
        public bool IsWriteMonitorLog = false;
        /// 
        /// 接口日志服务类对象
        /// 
        private IApiLogService logsService = ServiceFactory.ApiLogs;
        /// 
        /// 队列
        /// 
        private static readonly ConcurrentQueue Queue = new ConcurrentQueue();
        #endregion
        /// 
        /// 私有默认构造
        /// 
        private ApiLogQueueService()
        {
            Init();
            InitMonitoring();
        }
        /// 
        /// 单例只读属性
        /// 
        public static ApiLogQueueService Instance
        {
            get
            {
                return _Instance;
            }
        }
        #region 操作对列方法
        /// 
        /// 单条入队列
        /// 
        /// 入列模型
        public void Enqueue(ApiLogServiceModel model)
        {
            //判断是否使用日志对列
            if (Queue.Count  Enqueue(t));
        }
        /// 
        /// 单条出队
        /// 
        /// 
        public ApiLogServiceModel Dequeue()
        {
            ApiLogServiceModel apiLog;
            Queue.TryDequeue(out apiLog);
            return apiLog;
        }
        /// 
        /// 多条出队
        /// 
        /// 数量
        /// 
        public List Dequeue(int count)
        {
            var logs = new List();
            if (Queue.Count > 0)
            {
                for (int i = 0; i 
            {
                logsService.AddLogQueue("Init", InterfaceName.ApiLogQueue, "运行线程日志Init-->StartNew", "Init", 0, CommonHelp.ServiceIp);
                while (true)
                {
                    //判断消费队列是否小于某个值
                    try
                    {
                        var queueCount = GetCount();
                        //批量获取日志写入数据库
                        //logsService.AddModelBatch(Dequeue(SystemConst.Queue.ApiLogQueueCount));
                        if (queueCount >= SystemConst.Queue.ApiLogQueueCount && queueCount = LastWriteTime.AddMinutes(1))
                        {
                            LastWriteTime = DateTime.Now;
                            var apiLogs = Dequeue(queueCount);
                            if (apiLogs.Count > 0)
                            {
                                //批量获取日志写入数据库
                                logsService.AddModelBatch(apiLogs);
                                apiLogs.Clear();
                            }
                        }
                        #region 自己的运行日志
                        if (DateTime.Now.Minute == SystemConst.Queue.ApiWriteLogMinute && !IsWriteInitLog)
                        {
                            logsService.AddLogQueue("Init", InterfaceName.ApiLogQueue,
                                string.Format("运行线程日志Init-->StartNew-->while,当前对列数量:[{0}],时间:{1},IsWriteInitLog:{2}", queueCount, DateTime.Now.Minute, !IsWriteInitLog),
                                "Init", queueCount, CommonHelp.ServiceIp);
                            IsWriteInitLog = true;
                        }
                        else if (DateTime.Now.Minute > SystemConst.Queue.ApiWriteLogMinute && IsWriteInitLog)
                        {
                            IsWriteInitLog = false;
                        }
                        #endregion
                    }
                    catch (Exception ex)
                    {
                        logsService.AddLogQueue(ApiLogType.Error, 0, InterfaceName.ApiLogQueue, ex.ToString(), string.Empty, "Init", 0, CommonHelp.ServiceIp);
                    }
                    //Thread.Sleep(100);
                    Thread.Sleep(10);
                }
            }, null);
        }
        /// 
        /// 初始化监控方法
        /// 
        private void InitMonitoring()
        {
            logsService.AddLogQueue("InitMonitoring", InterfaceName.ApiLogQueue, "监控线程日志InitMonitoring", "InitMonitoring", 0, CommonHelp.ServiceIp);
            Task.Factory.StartNew(t =>
            {
                logsService.AddLogQueue("InitMonitoring", InterfaceName.ApiLogQueue, "监控线程日志InitMonitoring-->StartNew", "InitMonitoring", 0, CommonHelp.ServiceIp);
                while (true)
                {
                    //判断消费队列是否小于某个值
                    try
                    {
                        var queueCount = GetCount();
                        if (queueCount > SystemConst.Queue.ApiLogQueueWarningCount)
                        {
                            var content = string.Format("接口日志对列已到达报警值{0},当前对列数量为:[{1}].", SystemConst.Queue.ApiLogQueueWarningCount, queueCount);
                            //记录报警日志
                            logsService.AddLogQueue("InitMonitoring", InterfaceName.ApiLogQueue, content, "InitMonitoring", 0, CommonHelp.ServiceIp);
                            //发报警邮件
                            //new EmailUtils().SendWarningEmail("ApiLogQueueService", "InitMonitoring", content, LogType.Warning);
                            LastWriteTime = DateTime.Now;
                            logsService.AddModelBatch(Dequeue(queueCount));
                        }
                        #region 接口日志对列已到达危险值报警
                        if (queueCount > SystemConst.Queue.ApiLogQueueDangerCount)//ApiLogQueueDangerCount
                        {
                            var content = string.Format("接口日志对列已到达危险值{0},当前对列数量为:[{1}].", SystemConst.Queue.ApiLogQueueDangerCount, queueCount);
                            //记录报警日志
                            logsService.AddLogQueue("InitMonitoring", InterfaceName.ApiLogQueue, content, "InitMonitoring", 0, CommonHelp.ServiceIp);
                            //发报警邮件
                            //new EmailUtils().SendWarningEmail("ApiLogQueueService", "InitMonitoring", content, LogType.Error);
                        }
                        #endregion
                        #region 监控线程自己运行日志
                        if (DateTime.Now.Hour == SystemConst.Queue.ApiWriteLogHour && !IsWriteMonitorLog)
                        {
                            logsService.AddLogQueue("InitMonitoring", InterfaceName.ApiLogQueue, string.Format("监控线程日志InitMonitoring-->StartNew-->while,当前对列数量:[{0}]", queueCount),
                                "InitMonitoring", queueCount, CommonHelp.ServiceIp);
                            IsWriteMonitorLog = true;
                        }
                        else if (DateTime.Now.Hour > SystemConst.Queue.ApiWriteLogHour && IsWriteInitLog)
                        {
                            IsWriteMonitorLog = false;
                        }
                        #endregion
                    }
                    catch (Exception ex)
                    {
                        logsService.AddLogQueue(ApiLogType.Error, 0, InterfaceName.ApiLogQueue, ex.ToString(), string.Empty, "InitMonitoring", 0, CommonHelp.ServiceIp);
                    }
                    Thread.Sleep(1000);//1秒监控一次
                }
            }, null);
        }
        #endregion
        public void AddLog()
        {
            logsService.AddLogQueue("AddLog", InterfaceName.ApiLogQueue, "对列测试日志AddLog1", "AddLog", 0, CommonHelp.ServiceIp);
            logsService.AddModelBatch(Dequeue(SystemConst.Queue.ApiLogQueueCount));
            logsService.AddLogQueue("AddLog", InterfaceName.ApiLogQueue, "对列测试日志AddLog1", "AddLog", 0, CommonHelp.ServiceIp);
        }
        public void AddLog(string className, string method, ApiLogType logType, int projectId, int code, string requestData, string responseData, string remark, int resultCode, long elapsedTime, string createdBy, string search1 = "", string search2 = "")
        {
            logsService.AddLog(className, method, logType, projectId, code, requestData, responseData, remark, resultCode, elapsedTime, createdBy, search1, search2);
        }
    }

调用

BaseController

/// 
        /// 添加日志
        /// 
        /// 
        /// 
        /// 
        /// 请求参数对象
        /// 请求参数对象
        /// 请求参数对象
        /// 计时器对象
        /// 查询条件1
        /// 查询条件2
        /// 日志类型
        protected void AddLogElapsedTime(InterfaceName interfaceName, string className, string method, object reqModel, object rspModel, int resultCode, Stopwatch sw, Exception ex, string search1 = "", string search2 = "", ApiLogType logType = ApiLogType.Common)
        {
            sw.Stop();
            ApiLogQueueService.Instance.Enqueue(new ApiLogServiceModel()
            {
                Search1 = search1.SubstringExtend(100),
                Search2 = search2.SubstringExtend(100),
                ClassName = className.SubstringExtend(100),
                Method = method.SubstringExtend(100),
                Type = ex == null ? ApiLogType.Common.GetHashCode() : ApiLogType.Error.GetHashCode(),
                RequestModel = reqModel,
                ResponseModel = rspModel,
                ElapsedTime = sw.ElapsedMilliseconds,
                Code = interfaceName.GetHashCode(),
                InterfaceNameEnum = interfaceName,
                ProjectId = ProjectId.GetHashCode(),
                ResultCode = resultCode,
                Remark = ex != null ? string.Format("errorMsg:{0},Exception:{1}", ex.Message, ex.ToString()) : string.Empty,
                RowStatus = TrueOrFalseEnum.True.GetHashCode(),
                CreatedBy = CurrentUserId,
                CreatedOn = DateTime.Now,
                UpdatedBy = CommonHelp.ServiceIp,
            });
        }

net core API 过滤器调用

/// 
    /// api 日志跟踪类
    /// 
    public class ApiTrackerAttribute : ActionFilterAttribute
    {
        private string _prefix = "Tracker_";
        /// 
        /// 控制器方法执行之前执行此方法
        /// 
        /// 
        /// 
        /// 
        public override Task OnActionExecutionAsync(ActionExecutingContext context, ActionExecutionDelegate next)
        {
            var sw = new Stopwatch();
            sw.Start();
            context.HttpContext.Items.Add(_prefix + "params", context.ActionArguments);//请求开始实际带上
            var executeStartTime = DateTime.Now;
            context.HttpContext.Items.Add(_prefix + "executeStartTime", executeStartTime.ToString("yyyy-MM-dd HH:mm:ss:ffff"));
            //context.HttpContext.Items.Add(_prefix + "executeStartTimeffff", executeStartTime.ToString("yyyy-MM-dd HH:mm:ss:ffff"));
            context.HttpContext.Items.Add(_prefix + "sw", sw);
            context.HttpContext.Items.Add(_prefix + "ControllerName", ((ControllerActionDescriptor)context.ActionDescriptor).ControllerName);
            context.HttpContext.Items.Add(_prefix + "ActionName", ((ControllerActionDescriptor)context.ActionDescriptor).ActionName);
            return base.OnActionExecutionAsync(context, next);
        }
        /// 
        /// 控制器操作结果执行之前调用此方法
        /// 
        /// 
        /// 
        /// 
        public override Task OnResultExecutionAsync(ResultExecutingContext context, ResultExecutionDelegate next)
        {
            WriteLogAsyncResult(context);
            return base.OnResultExecutionAsync(context, next);
        }
        /// 
        /// 控制器操作结果执行之后调用此方法
        /// 
        /// 
        public override void OnActionExecuted(ActionExecutedContext context)
        {
            //WriteLogAsync(context);
            base.OnActionExecuted(context);
        }
        /// 
        /// 写日志
        /// 
        /// 
        /// 
        private bool WriteLogAsyncResult(ResultExecutingContext actionContext)
        {
            var isSucceed = false;
            var log = "0";
            try
            {
                ObjectResult resultObj = null;
                if (actionContext.Result != null)
                {
                    resultObj = actionContext.Result as Microsoft.AspNetCore.Mvc.ObjectResult;
                }
                //var resultJosn = JsonConvert.SerializeObject(resultObj.Value);
                log = "1";
                var items = actionContext.HttpContext.Items;
                log = "2";
                Stopwatch sw = null;
                if (items.ContainsKey(_prefix + "sw"))
                {
                    sw = items[_prefix + "sw"] as Stopwatch;
                }
                log = "3";
                var executeStartTimefffStr = string.Empty;
                if (items.ContainsKey(_prefix + "executeStartTime"))
                {
                    executeStartTimefffStr = items[_prefix + "executeStartTime"].ToString();
                }
                log = "4";
                var executeStartTimeStr = string.Empty;
                var fffStr = string.Empty;
                var executeStartTime = DateTime.Now;
                if (!string.IsNullOrEmpty(executeStartTimefffStr))
                {
                    if (executeStartTimefffStr.Length >= 19)
                    {
                        executeStartTimeStr = executeStartTimefffStr.Substring(0, 19);
                    }
                    if (executeStartTimefffStr.Length >= 24)
                    {
                        log = "5";
                        fffStr = executeStartTimefffStr.Substring(20, 4);
                    }
                    executeStartTime = DateTime.Parse(executeStartTimeStr);
                }                
                //log = "6";
                var fffStr_int = fffStr.ToInt();
                //log = "7";
                var controllerName = string.Empty;
                if (items.ContainsKey(_prefix + "ControllerName"))
                {
                    controllerName = items[_prefix + "ControllerName"].ToString();
                }
                var method = string.Empty;
                if (items.ContainsKey(_prefix + "ActionName"))
                {
                    method = items[_prefix + "ActionName"].ToString();
                }
                //log = "8";
                //var executeStartTime = DateTime.Parse(items[_prefix + "executeStartTime"].ToString());
                //var executeStartTimefff = items[_prefix + "executeStartTimeffff"].ToString();
                //executeStartTime.AddMilliseconds(fffStr.ToInt());
                //var aa = 0;
                #region MyRegion
                //WebApiLogModel logModel = new WebApiLogModel();
                //logModel.ExecuteStartTime = executeStartTime;
                //logModel.ExecuteEndTime = DateTime.Now;
                获取Action 参数
                //logModel.ActionParams = new Dictionary(actionContext.ActionArguments);
                //logModel.HttpRequestHeaders = actionContext.HttpContext.Request.Headers.ToString();
                //logModel.HttpRequestPath = actionContext.HttpContext.Request.Path;
                //logModel.HttpMethod = actionContext.HttpContext.Request.Method;
                //logModel.ActionName = ((ControllerActionDescriptor)actionContext.ActionDescriptor).ActionName;
                //logModel.ControllerName = ((ControllerActionDescriptor)actionContext.ActionDescriptor).ControllerName;
                //logModel.TotalSeconds = (logModel.ExecuteEndTime - logModel.ExecuteStartTime).TotalSeconds;
                //logModel.IP = CommonHttpContext.Current.Connection.RemoteIpAddress.ToString(); 
                #endregion
                IDictionary actionArguments = null;
                if (items.ContainsKey(_prefix + "params"))
                {
                    actionArguments = (IDictionary)items[_prefix + "params"];
                    log = "9";
                }
                var logModel = new ApiLogServiceModel()
                {
                    Search1 = string.Empty,
                    Search2 = string.Empty,
                    ClassName = controllerName,
                    Method = method,
                    Type = ApiLogType.Common.GetHashCode(),
                    RequestModel = actionArguments,
                    ResponseModel = resultObj != null ? resultObj.Value : null,
                    Code = 0,
                    InterfaceNameEnum = InterfaceName.All,
                    ProjectId = ProjectIdEnum.LDApi.GetHashCode(),
                    ResultCode = 200,
                    ExecuteStartTime = executeStartTime,//DateTime.Parse(items[_prefix + "executeStartTime"].ToString()),
                    ExecuteEndTime = DateTime.Now,
                    Remark = string.Empty,
                    RowStatus = TrueOrFalseEnum.True.GetHashCode(),
                    CreatedBy = CommonHelp.ServiceIp,//CommonHttpContext.Current.Connection.RemoteIpAddress.ToString(),
                    CreatedOn = DateTime.Now,
                    UpdatedBy = CommonHelp.ServiceIp
                };
                log = "10";
                var totalMilliseconds = (logModel.ExecuteEndTime - logModel.ExecuteStartTime).TotalMilliseconds;
                log = "11";
                if ((totalMilliseconds - fffStr_int) >= 0)
                {
                    logModel.TimeDifference = totalMilliseconds - fffStr_int;
                }
                else
                {
                    logModel.TimeDifference = totalMilliseconds;
                }
                log = "12";
                logModel.Remark = string.Format("ExecuteStartTime:{0},executeStartTimefff:{1},ExecuteEndTime:{2},fffStr:{3}", executeStartTime.ToString("yyyy-MM-dd HH:mm:ss:fffff"), executeStartTimefffStr, logModel.ExecuteEndTime.ToString("yyyy-MM-dd HH:mm:ss:fffff"), fffStr);
                log = "13";
                if (sw != null)
                {
                    sw.Stop();
                    log = "14";
                    logModel.ElapsedTime = sw.ElapsedMilliseconds;
                }                
                log = "15";
                ApiLogQueueService.Instance.Enqueue(logModel);
                log = "16";
                isSucceed = true;
                log = "17";
            }
            catch (Exception ex)
            {
                var logModel = new ApiLogServiceModel()
                {
                    Search1 = string.Empty,
                    Search2 = string.Empty,
                    ClassName = "ApiTrackerAttribute",
                    Method = "WriteLogAsync",
                    Type = ApiLogType.Error.GetHashCode(),
                    RequestModel = null,
                    RequestData = String.Format("log:{0}", log),
                    ResponseModel = null,
                    ElapsedTime = 0,
                    Code = 0,
                    InterfaceNameEnum = InterfaceName.All,
                    ProjectId = ProjectIdEnum.LDApi.GetHashCode(),
                    ResultCode = 0,
                    TimeDifference = 0,
                    ExecuteStartTime = SystemConst.DateTimeDefault,
                    ExecuteEndTime = SystemConst.DateTimeDefault,
                    Remark = string.Format("errorMsg:{0},Exception:{1}", ex.Message, ex.ToString()),
                    RowStatus = TrueOrFalseEnum.True.GetHashCode(),
                    CreatedBy = CommonHelp.ServiceIp,
                    CreatedOn = DateTime.Now,
                    UpdatedBy = CommonHelp.ServiceIp,
                };
                ApiLogQueueService.Instance.Enqueue(logModel);
            }
            return isSucceed;
        }
        /// 
        /// 写日志
        /// 
        /// 
        /// 
        private async Task WriteLogAsync(ActionExecutedContext actionContext)
        {
            try
            {
                var items = actionContext.HttpContext.Items;
                var sw = items[_prefix + "sw"] as Stopwatch;
                var executeStartTime = DateTime.Parse(items[_prefix + "executeStartTime"].ToString());
                #region MyRegion
                //WebApiLogModel logModel = new WebApiLogModel();
                //logModel.ExecuteStartTime = executeStartTime;
                //logModel.ExecuteEndTime = DateTime.Now;
                获取Action 参数
                //logModel.ActionParams = new Dictionary(actionContext.ActionArguments);
                //logModel.HttpRequestHeaders = actionContext.HttpContext.Request.Headers.ToString();
                //logModel.HttpRequestPath = actionContext.HttpContext.Request.Path;
                //logModel.HttpMethod = actionContext.HttpContext.Request.Method;
                //logModel.ActionName = ((ControllerActionDescriptor)actionContext.ActionDescriptor).ActionName;
                //logModel.ControllerName = ((ControllerActionDescriptor)actionContext.ActionDescriptor).ControllerName;
                //logModel.TotalSeconds = (logModel.ExecuteEndTime - logModel.ExecuteStartTime).TotalSeconds;
                //logModel.IP = CommonHttpContext.Current.Connection.RemoteIpAddress.ToString(); 
                
                #endregion
                IDictionary actionArguments = null;
                if (items.ContainsKey(_prefix + "params"))
                {
                    actionArguments = (IDictionary)items[_prefix + "params"];
                }
                var logModel = new ApiLogServiceModel()
                {
                    Search1 = string.Empty,
                    Search2 = string.Empty,
                    ClassName = items[_prefix + "ControllerName"].ToString(),
                    Method = items[_prefix + "ActionName"].ToString(),
                    Type = ApiLogType.Common.GetHashCode(),
                    RequestModel = actionArguments,
                    ResponseModel = null,
                    Code = 0,
                    InterfaceNameEnum = InterfaceName.All,
                    ProjectId = ProjectIdEnum.LDApi.GetHashCode(),
                    ResultCode = 200,
                    ExecuteStartTime = executeStartTime,//DateTime.Parse(items[_prefix + "executeStartTime"].ToString()),
                    ExecuteEndTime = DateTime.Now,
                    Remark = string.Empty,
                    RowStatus = TrueOrFalseEnum.True.GetHashCode(),
                    CreatedBy = CommonHelp.ServiceIp,//CommonHttpContext.Current.Connection.RemoteIpAddress.ToString(),
                    CreatedOn = DateTime.Now,
                    UpdatedBy = CommonHelp.ServiceIp
                };
                logModel.TimeDifference = (logModel.ExecuteEndTime - logModel.ExecuteStartTime).TotalMilliseconds;
                logModel.Remark = string.Format("ExecuteStartTime:{0},ExecuteEndTime:{1}", executeStartTime.ToString("yyyy-MM-dd HH:mm:ss:fffff"), logModel.ExecuteEndTime.ToString("yyyy-MM-dd HH:mm:ss:fffff"));
                sw.Stop();
                logModel.ElapsedTime = sw.ElapsedMilliseconds;
                ApiLogQueueService.Instance.Enqueue(logModel);
            }
            catch (Exception ex)
            {
                var logModel = new ApiLogServiceModel()
                {
                    Search1 = string.Empty,
                    Search2 = string.Empty,
                    ClassName = "ApiTrackerAttribute",
                    Method = "WriteLogAsync",
                    Type = ApiLogType.Error.GetHashCode(),
                    RequestModel = string.Empty,
                    ResponseModel = null,
                    ElapsedTime = 0,
                    Code = 0,
                    InterfaceNameEnum = InterfaceName.All,
                    ProjectId = ProjectIdEnum.LDApi.GetHashCode(),
                    ResultCode = 0,
                    TimeDifference = 0,
                    ExecuteStartTime = SystemConst.DateTimeDefault,
                    ExecuteEndTime = SystemConst.DateTimeDefault,
                    Remark = string.Format("errorMsg:{0},Exception:{1}", ex.Message, ex.ToString()),
                    RowStatus = TrueOrFalseEnum.True.GetHashCode(),
                    CreatedBy = CommonHelper.CommonHelp.ServiceIp,
                    CreatedOn = DateTime.Now
                };
                ApiLogQueueService.Instance.Enqueue(logModel);
            }
        }
    }

配置服务

public void ConfigureServices(IServiceCollection services)
{
//在startup的ConfigureServices中设置或禁用WebApiExceptionFilterAttribute或WebApiTrackerAttribute
            services.AddControllers(ops =>
            {
                ops.Filters.Add(new ApiExceptionFilterAttribute());
                ops.Filters.Add(new ApiTrackerAttribute());
            }).SetCompatibilityVersion(CompatibilityVersion.Version_3_0);
}

上位机双机协作通讯应用

/// 
/// 通讯 对列
/// 
public sealed class SoketMsgQueueService
{
    #region 属性成员
    private static string _className = "SoketMsgQueueService";
    private static string remark = "监控日志";
    /// 
    /// 单例
    /// 
    private static SoketMsgQueueService _Instance = new SoketMsgQueueService();
    /// 
    /// 是否写入运行线程日志
    /// 
    private bool IsWriteInitLog = false;
    /// 
    /// 是否刷新缓存
    /// 
    private bool IsRefreshCache = false;
    /// 
    /// 缓存区到料请求发送时间
    /// 
    private DateTime CacheRegionRequestSendTime = DateTime.Now;
    /// 
    /// 是否写入监控线程日志
    /// 
    private bool IsWriteMonitorLog = false;
    private SoketStatusEnum _status = SoketStatusEnum.Leisure;
    /// 
    /// 通讯状态
    /// 
    public SoketStatusEnum Status
    {
        get { return _status; }
        set
        {
            _status = value;
            Global._globalIns.IsLeisureCommunication = _status == SoketStatusEnum.Leisure;
        }
    }
    private SoketStatusEnum _statusCacheRegion = SoketStatusEnum.Leisure;
    /// 
    /// 通讯状态(缓存区)  = isOk;
    /// 
    public SoketStatusEnum StatusCacheRegion
    {
        get { return _statusCacheRegion; }
        set
        {
            _statusCacheRegion = value;
            Global._globalIns.IsLeisureCacheRegion = _statusCacheRegion == SoketStatusEnum.Leisure;
        }
    }
    /// 
    /// 服务端的是否开始焊接状态
    /// 
    public bool IsStartWeldServer { get; set; }
    /// 
    /// 联机设备是否生产
    /// 
    public Dictionary DeviceIsProduction = new Dictionary();
    /// 
    /// 添加设备生产状态
    /// 
    /// 设备ID
    /// 是否生产中
    /// 
    public void AddDeviceProductionStatus(int device, bool isProduction)
    {
        if (DeviceIsProduction.ContainsKey(device))
        {
            if (DeviceIsProduction[device] != isProduction)
            {
                DeviceIsProduction[device] = isProduction;
                setWorkType();
                //发送联机状态通知 客户端不要发送统一由服务端通知
                //sendProductionStatusNotice();
            }
        }
        else
        {
            DeviceIsProduction.Add(device, isProduction);
            setWorkType();
            //发送联机状态通知 客户端不要发送统一由服务端通知
            //sendProductionStatusNotice();
        }
    }
    /// 
    /// 发送联机状态通知(只发送本机生产状态)
    /// 
    public void SendProductionStatusNotice(bool isProduction)
    {
        if (Global.EthernetClient != null)
        {
            var type = SoketTypeEnum.ProductionStatusNotice;
            //发送联机状态通知
            var no = AppCommonMethods.GenerateMsgNo(type);
            var reqModel = new SoketMsgModel()
            {
                No = no,
                DeviceId = Global.DeviceId,//Global.WorkType.GetHashCode(),
                Type = type,
                Msg = Global.WorkType.GetEnumDesc(),
                Status = isProduction ? TrueOrFalseEnum.True.GetHashCode() : TrueOrFalseEnum.False.GetHashCode(),
                MsgTime = DateTime.Now
            };
            var modelJson = JsonConvert.SerializeObject(reqModel);
            Global.EthernetClient.SendMsg(modelJson);
        }
    }
    /// 
    /// 设置工作模式
    /// 
    private void setWorkType()
    {
        if (DeviceIsProduction.Count > 2)
        {
            Global.WorkType = DeviceIsProduction.Values.Count(t => t == true) >= 2 ? WorkTypeEnum.Auto : WorkTypeEnum.Single;
        }
        else
        {
            Global.WorkType = WorkTypeEnum.Single;
        }
    }
    /// 
    /// 接口日志服务类对象
    /// 
    //  private IApiLogService logsService = ServiceFactory.ApiLogs;
    /// 
    /// 公共日志服务
    /// 
    private static ICommonLogService logsService = ServiceIocFactory.GetRegisterServiceImp();
    private ICommunicationLogService _serverCommunicationLog = ServiceIocFactory.GetRegisterServiceImp();
    /// 
    /// 队列
    /// 
    private static readonly ConcurrentQueue Queue = new ConcurrentQueue();
    #endregion
    /// 
    /// 私有默认构造
    /// 
    private SoketMsgQueueService()
    {
        Init();
        InitMonitoring();
    }
    /// 
    /// 单例只读属性
    /// 
    public static SoketMsgQueueService Instance
    {
        get
        {
            return _Instance;
        }
    }
    #region 操作对列方法
    /// 
    /// 单条入队列
    /// 
    /// 入列模型
    public void Enqueue(SoketMsgModel model)
    {
        //判断是否使用日志对列
        //if (Queue.Count  Enqueue(t));
    }
    /// 
    /// 单条出队
    /// 
    /// 
    public SoketMsgModel Dequeue()
    {
        SoketMsgModel model;
        Queue.TryDequeue(out model);
        return model;
    }
    /// 
    /// 多条出队
    /// 
    /// 数量
    /// 
    public List Dequeue(int count)
    {
        var datas = new List();
        if (Queue.Count > 0)
        {
            for (int i = 0; i 
        {
            var method = _className + ".Init";
            logsService.AddLogContent(_className, "Init", "运行线程日志Init-->StartNew",
                "Init", remark);
            while (true)
            {
                //判断消费队列是否小于某个值
                try
                {
                    var queueCount = GetCount();
                    if (queueCount > 0)
                    {
                        //LastWriteTime = DateTime.Now;
                        if (Status == SoketStatusEnum.Leisure)
                        {
                            var model = Dequeue();
                            var msg = "1.开始处理【" + model.Type.GetEnumDesc() + "】请求No:" + model.No;
                            WorkFlow.ShowMsg(msg, method);
                            logsService.AddLog(_className, method, string.Empty, string.Empty, msg,
                                model.No, model.WorkpieceTrayCode, "Init", remark);
                            if (model.Type == SoketTypeEnum.InRequest || model.Type == SoketTypeEnum.OutRequest)
                            {
                                //发送给服端
                                Global.EthernetClient.SendModel(model);
                                WorkFlow.ShowMsgSocket("[发]:" + model.DeviceId + "机" + model.Type.GetEnumDesc() + ",No:" + model.No);
                                logsService.AddLog(_className, method, string.Empty, string.Empty, "2.发送[" + model.Type.GetEnumDesc() + "]给服端",
                                    model.No, model.WorkpieceTrayCode, "Init", remark);
                            }
                            else if (model.Type == SoketTypeEnum.InSendRequest)
                            {
                                Status = SoketStatusEnum.In;
                                //业务逻辑自己写
                                Status = SoketStatusEnum.Leisure;
                                SocketCommonClient.SendStatusMsg = null;//清除服务端进料处理消息
                                SocketCommonClient.ClearClientRspMsgList();
                            }
                            else if (model.Type == SoketTypeEnum.OutSendRequest)
                            {
                                Status = SoketStatusEnum.Out;
                                if (!Global._globalIns.IsStartWeld)
                                {
                                    //在处理上游出料请求时,如果本机不是生产状态时要打开【皮带电机】和下降【待料位阻挡】
                                    IoWork.SetoutPosCylinderMotion(DirectionUpDown.Down, "处理上游出料请求 待料位阻挡下降");
                                    IoWork.MotorOperation(true);
                                }
                                if (StatusCacheRegion == SoketStatusEnum.Leisure)
                                {
                                    
                                    if (model.DeviceId == Global.DeviceId)//2号机
                                    {
                                        WorkFlow.ShowMsgSocket("开:" + model.DeviceId + "机(本)" + model.Type.GetEnumDesc() + ",No:" + model.No);
                                        //这时生产线程是处于一个等待出料通知的,这里要马上给通知
                                        localOut(method, model);
                                        WorkFlow.ShowMsgSocket("完:" + model.DeviceId + "机(本)" + model.Type.GetEnumDesc() + ",No:" + model.No);
                                    }
                                    else
                                    {
                                        //对列线程去处理
                                        WorkFlow.ShowMsgSocket("开:" + model.DeviceId + "机(下)" + model.Type.GetEnumDesc() + ",No:" + model.No);
                                        noticeOut(method, model);
                                        WorkFlow.ShowMsgSocket("完:" + model.DeviceId + "机(下)" + model.Type.GetEnumDesc() + ",No:" + model.No);
                                    }
                                    //}
                                }
                                else
                                {
                                    WorkFlow.ShowMsgSocket("开:" + model.DeviceId + "机" + model.Type.GetEnumDesc()
                                        + ",缓)状态:" + StatusCacheRegion.GetEnumDesc() + "监控中,No:" + model.No);
                                    //等待缓存区状态为【空闲】时才能放(出)料
                                    Monitoring.CacheRegionStatusMonitoring(model);
                                    //if (!Global._globalIns.IsStartWeld && model.DeviceId == Global.DeviceId)
                                    //{
                                    //    break;
                                    //}
                                    if (model.DeviceId == Global.DeviceId)
                                    {
                                        localOut(method, model);
                                        WorkFlow.ShowMsgSocket("完:" + model.DeviceId + "机(本)" + model.Type.GetEnumDesc()
                                            + ",缓)状态:" + StatusCacheRegion.GetEnumDesc() + ",No:" + model.No);
                                    }
                                    else
                                    {
                                        //通知2号机
                                        noticeOut(method, model);
                                        WorkFlow.ShowMsgSocket("完成:" + model.DeviceId + "机(下)" + model.Type.GetEnumDesc()
                                            + ",缓)状态:" + StatusCacheRegion.GetEnumDesc() + ",No:" + model.No);
                                    }
                                }
                                Status = SoketStatusEnum.Leisure;
                                SocketCommonClient.ClearClientRspMsgList();
                            }
                        }
                        else
                        {
                            WorkFlow.ShowAlarmMsg("目前通讯状态非空闲请求检查状态是否有误", "Init");
                        }
                        //WorkFlow.GetAndShowWeldData(model);
                    }
                    #region 自己的运行日志
                    if (DateTime.Now.Minute == SystemConst.Queue.ApiWriteLogMinute && !IsWriteInitLog)
                    {
                        logsService.AddLogContent(_className, "Init",
                            string.Format("运行线程日志Init-->StartNew-->while,当前对列数量:[{0}],时间:{1},IsWriteInitLog:{2}",
                            queueCount, DateTime.Now.Minute, !IsWriteInitLog),
                            "Init", remark);
                        IsWriteInitLog = true;
                    }
                    else if (DateTime.Now.Minute > SystemConst.Queue.ApiWriteLogMinute && IsWriteInitLog)
                    {
                        IsWriteInitLog = false;
                    }
                    #endregion
                }
                catch (Exception ex)
                {
                    logsService.AddLogContent(_className, "Init",
                        "运行线程日志Init-->StartNew,异常:" + ex.Message, "Init", remark);
                }
                Thread.Sleep(10);
            }
        }, null);
    }
    /// 
    /// 通知出料
    /// 
    /// 
    /// 
    private void noticeOut(string method, SoketMsgModel model)
    {
        var head = "noticeOut>";
        IoWork.SetoutPosCylinderMotion(DirectionUpDown.Down);
        logsService.AddLog(_className, method, string.Empty, string.Empty, "2.待料位气缸下降",
            model.No, model.WorkpieceTrayCode, "Init", remark);
        //业务逻辑自己写
        CacheRegionRequestSendTime = DateTime.Now;
        Status = SoketStatusEnum.Leisure;
        StatusCacheRegion = SoketStatusEnum.In;
        msg = head + "[" + model.WorkpieceTrayCode + "] 出料完成,Status:" + Status.GetEnumDesc() + ",缓存区状态:" + StatusCacheRegion.GetEnumDesc();
        logsService.AddLog(_className, method, string.Empty, string.Empty, msg,
            model.No, model.WorkpieceTrayCode, "Init", remark);
    }
    /// 
    /// 本机出料
    /// 
    /// 
    /// 
    private void localOut(string method, SoketMsgModel model)
    {
        var head = "localOut>";
        //放料(出料)通知P成员出料
        
        CacheRegionRequestSendTime = DateTime.Now;
        Status = SoketStatusEnum.Leisure;
        StatusCacheRegion = SoketStatusEnum.In;
        msg = head + model.WorkpieceTrayCode + " 出料完成,Status:" + Status.GetEnumDesc() + ",缓存区状态:" + StatusCacheRegion.GetEnumDesc();
        logsService.AddLog(_className, method, string.Empty, string.Empty, msg,
            model.No, model.WorkpieceTrayCode, "Init", remark);
    }
    /// 
    /// 初始化 清扫机要料信号 监控方法
    /// 
    private void InitMonitoring()
    {
        logsService.AddLogContent(_className, "InitMonitoring", "清扫机要料信号 监控线程日志InitMonitoring",
            "InitMonitoring", remark);
        Task.Factory.StartNew(t =>
        {
            var method = _className + ".InitMonitoring";
            logsService.AddLogContent(_className, "InitMonitoring",
                "清扫机要料信号 监控线程日志InitMonitoring-->StartNew", "InitMonitoring", remark);
            while (true)
            {
                //清扫机要料信号 监控
                try
                {
                    var msg = string.Empty;
                    var queueCount = GetCount();
                    #region 上料位的空料盘放到出料位顶升
                    if (WorkFlow.WorkpieceTrayInToOut && Global._globalIns.IsStartWeld)
                    {
                        
                    }
                    #endregion
                    
                    #region (同步)缓存区状态
                    if (StatusCacheRegion == SoketStatusEnum.Leisure)
                    {
                        msg = "发送(同步)缓存区状态,时间:" + DateTime.Now + ",缓存区状态:" + StatusCacheRegion.GetEnumDesc();
                        //发送(同步)缓存区状态
                        if (Global.EthernetClient != null)
                        {
                            Global.EthernetClient.SendModel(string.Empty, SoketTypeEnum.InSweeperRequest, msg, string.Empty);
                            logsService.AddLog(_className, method, string.Empty, string.Empty, msg,
                                string.Empty, string.Empty, "InitMonitoring", remark);
                        }
                    }
                    #endregion
                    #region 每分钟同步一次生产状态
                    if (DateTime.Now.Minute == SystemConst.Queue.ApiWriteLogMinute)
                    {
                        SendProductionStatusNotice(Global._globalIns.IsStartWeld);
                    }
                    #endregion
                    #region 监控线程自己运行日志
                    if (DateTime.Now.Hour == SystemConst.Queue.ApiWriteLogHour && !IsWriteMonitorLog)
                    {
                        logsService.AddLogContent(_className, "InitMonitoring",
                            string.Format("监控线程日志InitMonitoring-->StartNew-->while,当前对列数量:[{0}]",
                                queueCount),
                            "InitMonitoring", remark);
                        IsWriteMonitorLog = true;
                    }
                    else if (DateTime.Now.Hour > SystemConst.Queue.ApiWriteLogHour && IsWriteInitLog)
                    {
                        IsWriteMonitorLog = false;
                    }
                    #endregion
                }
                catch (Exception ex)
                {
                    logsService.AddLogContent(_className, "InitMonitoring",
                        "监控线程日志InitMonitoring-->StartNew,异常:" + ex.Message,
                        "InitMonitoring", remark);
                }
                Thread.Sleep(1000);//1秒监控一次
                //Thread.Sleep(100);
            }
        }, null);
    }
    /// 
    /// 是否有请求
    /// 
    /// 
    /// 
    public bool IsHaveReq(SoketMsgModel model)
    {
        return Queue.Count(t => t.Type == model.Type) > 0;
    }
    /// 
    /// 保存数据
    /// 
    public void SaveData()
    {
        var queueCount = GetCount();
        if (queueCount > 0)
        {
            //var models = Dequeue(queueCount);
            //foreach (var item in models)
            //{
            //    WorkFlow.GetAndShowWeldData(item);
            //}
        }
    }
    /// 
    /// 清空对列
    /// 
    public void Clear()
    {
        Queue.Clear();
    }
    /// 
    /// 复位
    /// 
    public void Restoration()
    {
        Clear();
        Status = SoketStatusEnum.Leisure;
        StatusCacheRegion = SoketStatusEnum.Leisure;
    }
    public void InitObj()
    {
        WorkFlow.ShowMsg("通讯对列初始化成功", "InitObj");
    }
    #endregion
}
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]