无人在线电视剧免费观看,seo网站的锚文本怎么写,网络设计专业工资,郑州仿站定制模板建站本文作者系360奇舞团前端开发工程师 文章标题#xff1a;谈谈node架构中的线程进程的应用场景、事件循环及任务队列 Node.js是一个基于Chrome V8引擎的JavaScript运行时环境#xff0c;nodejs是单线程执行的#xff0c;它基于事件驱动和非阻塞I/O模型进行多任务的执行。在理… 本文作者系360奇舞团前端开发工程师 文章标题谈谈node架构中的线程进程的应用场景、事件循环及任务队列 Node.js是一个基于Chrome V8引擎的JavaScript运行时环境nodejs是单线程执行的它基于事件驱动和非阻塞I/O模型进行多任务的执行。在理解Node.js的工作原理时我们需要了解进程、线程、事件循环[1]以及消息队列[2]的概念本篇文章就基于这几点去详细介绍帮你慢慢理解node的工作原理。 进程 进程是操作系统中正在运行的一个程序的实例。在Node.js中每个应用程序都运行在一个单独的进程中。 node app.js 就是开启一个服务进程多进程就是进程的复制child_process.forkchild_process.fork 出来的每个进程都拥有自己的独立空间地址、数据栈一个进程无法访问另外一个进程里定义的变量、数据结构只有建立了 IPC 通信进程之间才可数据共 多进程的好处是可以充分利用多核处理器的优势通过将工作负载分配到多个进程中来提高应用程序的性能。 process 模块 Node.js 中的进程 Process 是一个全局对象无需 require 直接使用给我们提供了当前进程中的相关信息。 process.env环境变量例如通过 process.env.NODE_ENV 获取不同环境项目配置信息process.nextTick这个在谈及 Event Loop 时经常为会提到process.pid获取当前进程idprocess.ppid当前进程对应的父进程process.cwd()获取当前进程工作目录process.platform获取当前进程运行的操作系统平台process.uptime()当前进程已运行时间例如pm2 守护进程的 uptime 值进程事件process.on(‘uncaughtException’, cb) 捕获异常信息、process.on(‘exit’, cb进程推出监听三个标准流process.stdout 标准输出、process.stdin 标准输入、process.stderr 标准错误输出process.title 指定进程名称有的时候需要给进程指定一个名称 创建多进程child_process、cluster的应用 Node.js提供了child_process模块用于创建和管理子进程。通过child_process模块我们可以在Node.js中创建新的进程与其进行通信并监视其状态。以下是一个简单的示例演示了如何在Node.js中创建一个子进程并与主进程通信 开启一个http服务并通过 require(child_process).fork创建一个子进程 // child_process.js const http require(http);
const fork require(child_process).fork;
const path require(path);const server http.createServer((req, res) {if(req.url /compute){const compute fork(path.resolve(__dirname, ./compute.js));compute.send(开启一个新的子进程);// 当一个子进程使用 process.send() 发送消息时会触发 message 事件compute.on(message, sum {res.end(Sum is ${sum});compute.kill();});// 子进程监听到一些错误消息退出compute.on(close, (code, signal) {console.log(收到close事件子进程收到信号 ${signal} 而终止退出码 ${code});compute.kill();})}else{res.end(ok);}
});
server.listen(3000, () {console.log(server started at http://127.0.0.1:3000);
}); 创建一个含有大量计算任务的处理逻辑 // compute.js
const computation () {let sum 0;console.info(计算开始);console.time(计算耗时);for (let i 0; i 1e10; i) {sum i};console.info(计算结束);console.timeEnd(计算耗时);return sum;
};process.on(message, msg {console.log(msg, process.pid, process.pid); // 子进程idconst sum computation();// 如果Node.js进程是通过进程间通信产生的那么process.send()方法可以用来给父进程发送消息process.send(sum);
}) 在上面的示例中我们创建了一个http服务并在接口http://127.0.0.1:3000/compute接口中使用require(child_process).fork()创建了一个子进程将大量的计算逻辑放在了子进程中这样一来当我们频繁请求http://127.0.0.1:3000/compute接口时我们的node服务就会并发处理这些计算逻辑密集型的逻辑从而让接口有更快的响应。 试想如果此时没有开启子进程而是将大量计算逻辑放到主进程当有大量请求时会发生什么 答案会变成每次请求都是同步的前一个请求处理完毕才会处理下一个时间就会拉长后面的请求响应就会变慢。 再比如我们上传图片的功能就可以利用开启多个进程 使用cluster创建多进程 const http require(http);
const numCPUs require(os).cpus().length;
const cluster require(cluster);
if(cluster.isMaster){console.log(Master proces id is,process.pid);// fork workersfor(let i 0;inumCPUs;i){cluster.fork();}cluster.on(exit,function(worker,code,signal){console.log(worker process died,id,worker.process.pid)})
}else{// 这里是一个http服务器http.createServer(function(req,res){res.writeHead(200);res.end(hello word);}).listen(8000);} cluster模块调用cluster.fork()来创建子进程该方法与child_process中的fork是同一个方法。 cluster模块采用的是经典的主从模型Cluster会创建一个master然后根据你指定的数量复制出多个子进程可以使用cluster.isMaster属性判断当前进程是master还是worker(工作进程)。由master进程来管理所有的子进程主进程不负责具体的任务处理主要工作是负责调度和管理。 cluster 模块同时实现了负载均衡调度算法在类 unix 系统中cluster 使用轮转调度round-robinnode 中维护一个可用 worker 节点的队列 free和一个任务队列 handles。当一个新的任务到来时节点队列队首节点出队处理该任务并返回确认处理标识依次调度执行。而在 win 系统中Node 通过 Shared Handle 来处理负载通过将文件描述符、端口等信息传递给子进程子进程通过信息创建相应的 SocketHandle / ServerHandle然后进行相应的端口绑定和监听处理请求。 开启多进程时候端口疑问讲解如果多个Node进程监听同一个端口时会出现 Error:listen EADDRIUNS的错误而cluster模块为什么可以让多个子进程监听同一个端口呢?原因是master进程内部启动了一个TCP服务器而真正监听端口的只有这个服务器当来自前端的请求触发服务器的connection事件后master会将对应的socket具柄发送给子进程。而child_process操作子进程时创建多个TCP服务器 无论是 child_process 模块还是 cluster 模块为了解决 Node.js 实例单线程运行无法利用多核 CPU 的问题而出现的。核心就是通过fork()或者其他API创建了子进程之后父进程即 master 进程负责监听端口接收到新的请求后将其分发给下面的 worker 进程父子进程之间才能通过message和send()进行IPC通信Inter-Process Communication。 Node中实现IPC通道是依赖于libuv 总结 当有大量请求时或者大量任务时可以开启多个进程同时并发处理这些请求以缓解处理完一个才能处理下一个请求的阻塞状态。 线程 const http require(http);const server http.createServer();
server.listen(3000,(){process.title测试进程线程数量;console.log(进程id,process.pid)
}) 创建了http服务开启了一个进程都说了Node.js是单线程所以大家可能认为 Node 启动后线程数应该为 1让我们使用Mac自带的活动监视器搜索process.title也就是测试进程线程数量来查看一下具体是几个线程可以看到线程数量是8但是为什么会开启8个线程呢难道Javascript不是单线程不知道小伙伴们有没有这个疑问 解释一下这个原因 Node 中最核心的是 v8 引擎v8是一个执行 JS 的引擎. 也就是翻译 JS. 包括我们熟悉的编译优化, 垃圾回收等等.在 Node 启动后会创建 v8 的实例这个实例是多线程的。 主线程编译、执行代码。编译/优化线程在主线程执行的时候可以优化代码。分析器线程记录分析代码运行时间为 Crankshaft 优化代码执行提供依据。垃圾回收的几个线程。 所以大家常说的 Node 是单线程的指的是 JavaScript 的执行是单线程的(开发者编写的代码运行在单线程环境中)但 Javascript 的宿主环境无论是 Node 还是浏览器都是多线程的 还是刚才的例子我们加入一个读取文件的IO操作 const http require(http);
const fs require(fs)const server http.createServer();
server.listen(3000,(){process.title测试进程线程数量;console.log(进程id,process.pid)
})fs.readFile(./read.js, () {}) 再来看看这个时候的线程数量为什么因为Nodejs是单线程的作为服务器他涉及到IO,而IO是会阻塞的从而影响性能。所以Nodejs把IO操作交给libuv保证主线程可以继续处理其他事情。如图libuv会负责一些 IO 操作DNS因为dns.lookup方法会涉及到读取本地文件例如nsswitch.conf,resolv.conf以及 /etc/hosts)FS读取本地文件和一些 CPU 密集计算ZlibCryptolibuv会启用线程池。当 js层传递给 libuv一个操作任务时libuv会把这个任务加到队列中。而线程池默认大小为 4可以通过UV_THREADPOOL_SIZE可以修改线程池的线程数线程数最大值为128最小值为1。 process.env.UV_THREADPOOL_SIZE 64 前面讲了node本身的一些IO操作和CPU密集计算是可以利用线程做事情的那么我们项目开发中该如何利用线程 Node.js的事件循环模型[3]是单线程的适用于I/O密集型任务。但对于计算密集型任务单线程的性能可能有限。通过创建多个子线程可以将计算密集型任务分配到这些线程中并发执行从而提高性能。 线程的应用场景 下面我们就利用多线程来计算一个CPU密集型任务生成斐波那契数列。 创建执行脚本worker.js Node.js 中的 worker_threads[4] 模块是用于创建多线程应用程序的官方模块。它允许在 Node.js 程序中创建和管理真正的操作系统线程以实现并行处理和利用多核 CPU 的能力。 // worker.js
const {parentPort, workerData} require(worker_threads);parentPort.postMessage(getFibonacciNumber(workerData.num))function getFibonacciNumber(num) {if (num 0) {return 0;}else if (num 1) {return 1;}else {return getFibonacciNumber(num - 1) getFibonacciNumber(num - 2);}
} 创建主线程脚本脚本index.js: // index.js
const {Worker} require(worker_threads);
const path require(path);let number 30;const worker new Worker(path.resolve(__dirname, ./worker.js), {workerData: {num: number}});worker.once(message, result {console.log(${number}th Fibonacci Result: ${result});
});worker.on(error, error {console.log(error);
});worker.on(exit, exitCode {console.log(It exited with code ${exitCode});
})console.log(Execution in main thread); 运行脚本node index.js 查看结果 看控制台打印结果是 Execution in main thread30th Fibonacci Result: 832040It exited with code 0 可以看到“Execution in main thread”是先执行的并没有被前面worker中大量的CPU密集型计算所阻塞到倘若没有新开线程去处理这个大量计算逻辑后面的所有任务都会被阻塞到所以在处理复杂的计算或耗时操作时使用线程可以显著提高CPU利用率和系统吞吐量。 总结 当一个请求或者任务内部有很多逻辑且有大量的CPU密集型计算逻辑时可以开启新线程将部分密集型计算逻辑放到新线程中计算从而不阻塞后面的其他同步逻辑。 node中的事件循环 前面已经讲到node是单线程模型是一个基于事件驱动、非阻塞式 I/O 的模型这离不开他的事件循环机制总体来说事件循环机制就是基于回调通知的机制原本同步模式等待的时间则可以用来处理其它任务。 事件循环的6个阶段 本阶段执行已经被 setTimeout() 和 setInterval() 的调度回调函数。┌───────────────────────────┐
┌─│ timers │
│ └─────────────┬─────────────┘
| 执行延迟到下一个循环迭代的 I/O 回调。
│ ┌─────────────┴─────────────┐
│ │ I/O callbacks
│ └─────────────┬─────────────┘
| 仅系统内部使用。
│ ┌─────────────┴─────────────┐
│ │ idle, prepare │
│ └─────────────┬─────────────┘
| 检索新的I/O事件;执行与 I/O相关的回调 ┌───────────────┐
│ ┌─────────────┴─────────────┐ │ incoming: │
│ │ poll │─────┤ connections, │
│ └─────────────┬─────────────┘ │ data, etc. │
│ setImmediate() 回调函数在这里执行。 └───────────────┘
│ ┌─────────────┴─────────────┐
│ │ check │
│ └─────────────┬─────────────┘
| 一些关闭的回调函数
│ ┌─────────────┴─────────────┐
└──┤ close callbacks │ └───────────────────────────┘ 每个循环阶段内容详解 timers阶段 一个timer指定一个下限时间而不是准确时间在达到这个下限时间后执行回调。在指定时间过后timers会尽可能早地执行回调但系统调度或者其它回调的执行可能会延迟它们。 注意技术上来说poll 阶段控制 timers 什么时候执行。注意这个下限时间有个范围[1, 2147483647]如果设定的时间不在这个范围将被设置为1。此外, libuv为了防止某个阶段任务太多, 导致后续的 阶段 发生饥饿的现象, 所以消息循环的每一个迭代(iterate) 中, 每个阶段执行回调都有个最大数量. 如果超过数量的话也会强行结束当前阶段而进入下一个阶段. 这一条规则适用于消息循环中的每一个阶段. I/O callbacks阶段 这个阶段执行一些系统操作的回调。比如TCP错误如一个TCP socket在想要连接时收到ECONNREFUSED, 类unix系统会等待以报告错误这就会放到 I/O callbacks 阶段的队列执行. 名字会让人误解为执行I/O回调处理程序, 实际上I/O回调会由poll阶段处理. idle, prepare 据说是内部使用, 所以我们也不在这里过多讨论. poll阶段 这是整个消息循环中最重要的一个阶段, 作用是等待异步请求和数据获取I/O事件回调, 例如操作读取文件等等适当的条件下node将阻塞在这里; 该阶段有两个情况 如果 poll 队列不空event loop会遍历队列并同步执行回调直到队列清空或执行的回调数到达系统上限如果 poll 队列为空则发生以下两件事之一 如果代码已经被setImmediate()设定了回调, event loop将结束 poll 阶段进入 check 阶段来执行 check 队列里面的回调 callback。如果代码没有被setImmediate()设定回调event loop将阻塞在该阶段等待回调被加入 poll 队列并立即执行。但是当event loop进入 poll 阶段并且 有设定的timers一旦 poll 队列为空poll 阶段空闲状态 event loop将检查timers,如果有1个或多个timers的下限时间已经到达event loop将绕回 timers 阶段并执行 timer 队列。 check阶段 这个阶段允许在 poll 阶段结束后立即执行回调。如果 poll 阶段空闲并且有被setImmediate()设定的回调event loop会转到 check 阶段而不是继续等待。 setImmediate() 实际上是一个特殊的timer跑在event loop中一个独立的阶段。它使用libuv的API 来设定在 poll 阶段结束后立即执行回调。通常上来讲随着代码执行event loop终将进入 poll 阶段在这个阶段等待 incoming connection, request 等等。但是只要有被setImmediate()设定了回调一旦 poll 阶段空闲那么程序将结束 poll 阶段并进入 check 阶段而不是继续等待 poll 事件们 poll events。 close callbacks 阶段 如果一个 socket 或 handle 被突然关掉比如 socket.destroy()close事件将在这个阶段被触发否则将通过process.nextTick()触发小测试 console.log(同步);process.nextTick((){console.log(nextTick);
});Promise.resolve().then((){console.log(微任务);
});// 到达可执行条件才会执行与
setTimeout(() {console.log(setTimeout);
}, 0);// poll之后会立即检查是否有setImmediate如果存在就立即执行
setImmediate((){console.log(setImmediate);
}) 打印结果为同步 - nextTick - 微任务 - setTimeout - setImmediate setImmediate() 对比 setTimeout() setImmediate() 和 setTimeout() 很类似但是基于被调用的时机他们也有不同表现。 setImmediate() 设计为一旦在当前 轮询 阶段完成 就执行脚本。setTimeout() 在最小阈值ms 单位过后运行脚本。 执行计时器的顺序将根据调用它们的上下文而异。如果二者都从主模块内调用则时序将受进程性能的约束这可能会受到计算机上其他正在运行应用程序的影响 bull.js任务队列 当一大批客户端同时产生大量的网络请求消息时候服务器的承受能力肯定是有一个限制的。对服务器的访问已经超过服务所能处理的最大峰值甚至导致服务器超时负载崩溃。 这时候要是有个容器先让这些消息排队就好了还好有个叫队列的数据结构通过有队列属性的容器排队(先进先出)把消息再传到我们的服务器压力减小了好多这个很棒的容器就是消息队列。 消息队列优势 应用解耦消息队列可以使消费者和生产者直接互不干涉互不影响只需要把消息发送到队列即可而且可独立的扩展或修改两边的处理过程只要能确保它们遵守同样的接口约定可以生产者用Node.js实现消费者用phython实现。灵活性和峰值处理能力当客户端访问量突然剧增对服务器的访问已经超过服务所能处理的最大峰值甚至导致服务器超时负载崩溃使用消息队列可以解决这个问题可以通过控制消费者的处理速度和生产者可进入消息队列的数量等来避免峰值问题排序保证消息队列可以控制数据处理的顺序因为消息队列本身使用的是队列这个数据结构FIFO(先进选出)在一些场景数据处理的顺序很重要比如商品下单顺序等。异步通信消息队列中的有些消息并不需要立即处理消息队列提供了异步处理机制可以报消息放在队列中并不立即处理需要的时候处理或者异步慢慢处理一些不重要的发送短信和邮箱功能可以使用。可扩展性前面提到了消息队列可以做到解耦如果我们想增强消息入队和出队的处理频率很简单并不需要改变代码中任何内容可以直接对消息队列修改一些配置即可比如我们想限制每次发送给消费者的消息条数等。 node消息队列框架 介绍几款目前市场上主流的消息队列(课外知识可忽略) Kafka是由 Apache 软件基金会开发的一个开源流处理平台由 Scala 和 Java 编写是一种高吞吐量的分布式发布订阅消息系统支持单机每秒百万并发。另外Kafka 的定位主要在日志等方面 因为Kafka 设计的初衷就是处理日志的可以看做是一个日志消息系统一个重要组件针对性很强。0.8 版本开始支持复制不支持事物因此对消息的重复、丢失、错误没有严格的要求。RocketMQ阿里开源的消息中间件是一款低延迟、高可靠、可伸缩、易于使用的消息中间件思路起源于 Kafka。最大的问题商业版收费有些功能不开放。RabbitMQ由 Erlang有着和原生 Socket 一样低的延迟语言开发基于 AMQP 协议的开源消息队列系统。能保证消息的可靠性、稳定性、安全性。高并发的特性毋庸置疑RabbitMQ 最高原因是它的实现语言是天生具备高并发高可用的erlang 语言天生的分布式优势。Bull Bull[5] 是基于 Redis 实现的一个快速且强大的消息系统队列。Bull 提供了可以很简单就能使用的消息队列、延时任务和定时任务。 bull.js使用 const Bull require(bull);const queueOptions {// limiter: { max: 2, duration: 10000 }, // 设置并发执行数为5redis: {port: 5816,host: xx.xxx.xx.xx, // 连接IPpassword: xxxxxxxxxxx, // 没有密码就填nulldb: 10, // 使用区间库},defaultJobOptions: {attempts: 1,removeOnComplete: true,backoff: false,delay: 0,},
};const myQueue new Bull(test-queue,queueOptions);// 假设我们有10000个秒杀请求过来要处理我们可以将任务放入队列挨个去处理
for (let i 0; i 10000; i) {myQueue.add({ data: i });
}myQueue.process(async (job) {console.log(,job.data);await asyncHandle(job);
});async function asyncHandle(job){await handleJSError(job)
}function handleJSError(job) {return new Promise((resolve,reject){setTimeout((){console.log(,job.data);resolve()},5000)})
} 参考资料 [1] nodejs事件循环: https://blog.csdn.net/i10630226/article/details/81369841 [2] nodejs 消息队列: https://www.imooc.com/article/296194 [3] NodeJS事件循环机制: https://juejin.cn/post/6844903999506923528 [4] nodejs worker_threads: https://juejin.cn/post/7062733724504293413 [5] Bull: https://blog.csdn.net/weixin_43698328/article/details/125193000 - END - 关于奇舞团 奇舞团是 360 集团最大的大前端团队代表集团参与 W3C 和 ECMA 会员TC39工作。奇舞团非常重视人才培养有工程师、讲师、翻译官、业务接口人、团队 Leader 等多种发展方向供员工选择并辅以提供相应的技术力、专业力、通用力、领导力等培训课程。奇舞团以开放和求贤的心态欢迎各种优秀人才关注和加入奇舞团。