1. Message Storage Analysis

1.1 DefaultMessageStore overview

Its core attributes are:

- MessageStoreConfig messageStoreConfig: storage-related configuration, such as the storage path, the commitlog file size, and the flush frequency.
- CommitLog commitLog: the core class for handling the commitlog; messages are stored in commitlog files.
- ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: the queue information for each topic.
- FlushConsumeQueueService flushConsumeQueueService: the ConsumeQueue flush service thread.
- CleanCommitLogService cleanCommitLogService: the thread that deletes expired commitlog files.
- CleanConsumeQueueService cleanConsumeQueueService: the thread that deletes expired consumeQueue files.
- IndexService indexService: the index service.
- AllocateMappedFileService allocateMappedFileService: the MappedFile allocation thread. RocketMQ uses memory mapping to handle commitlog and consumeQueue files.
- ReputMessageService reputMessageService: the reput (dispatch) thread, responsible for forwarding commitlog entries to the consumeQueue and index files.
- HAService haService: the master-slave synchronization service.
- ScheduleMessageService scheduleMessageService: the scheduler that executes scheduled (delayed) tasks.
- StoreStatsService storeStatsService: the storage statistics service.
- TransientStorePool transientStorePool: a ByteBuffer pool, used in detail later.
- RunningFlags runningFlags: the state of the storage service.
- BrokerStatsManager brokerStatsManager: the broker statistics service.
- MessageArrivingListener messageArrivingListener: the message-arrival listener.
- StoreCheckpoint storeCheckpoint: the flush checkpoint.
- LinkedList<CommitLogDispatcher> dispatcherList: dispatchers for commitlog entries, mainly forwarding from the commitlog to the consumeQueue and index files.

These attributes are the core of message storage and are what we need to focus on and understand; this series covers them one by one.

Next, taking putMessage as the entry point, let's walk through the whole message storage process.

1.2 Message storage flow

1.2.1 DefaultMessageStore.putMessage

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

        if (this.isOSPageCacheBusy()) { // 1
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg); // 2

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); // 3

        if (null == result || !result.isOk()) { // 4
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

Code 1: checks whether operating system page cache writes are busy.

    @Override
    public boolean isOSPageCacheBusy() {
        long begin = this.getCommitLog().getBeginTimeInLock();
        long diff = this.systemClock.now() - begin;

        // Busy if the current append has held the lock longer than the configured timeout
        if (diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
            return true;
        }

        return false;
    }

Code 2: writes the message to the commitlog file; the implementing class is CommitLog.

Code 3: records the related statistics.

Code 4: records the number of failed commitlog writes.

1.2.2 CommitLog.putMessage

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 1
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // 2
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 3

        putMessageLock.lock(); // spin or ReentrantLock, depending on store config // 4
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            } // 5

            result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 6
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
                eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg); // 7
        handleHA(result, putMessageResult, msg); // 8

        return putMessageResult;
    }

Here is a brief description of how CommitLog writes a message; each step is then examined in detail.

Code 1: gets the message type (transactional message, non-transactional message, or commit message).

Code 3: gets a MappedFile object, the concrete memory-mapping implementation.

Code 4: appending a message requires taking the lock, so writes are serialized.

Code 5: validates the MappedFile obtained in code 3 and gets a usable MappedFile (creating one if none exists).

Code 6: writes to the file through the MappedFile object.

Code 7: flushes to disk according to the flush policy.

Code 8: master-slave synchronization.

1.3 Analysis of the core storage classes

1.3.1 Source code analysis: MappedFile

1.3.1.1 MappedFile basic attributes

    public static final int OS_PAGE_SIZE = 1024 * 4; // 4K
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;

- OS_PAGE_SIZE: the OS page size, 4K.
- TOTAL_MAPPED_VIRTUAL_MEMORY: a class variable holding the total number of bytes mapped by all MappedFile instances.
- TOTAL_MAPPED_FILES: the number of MappedFile instances.
- wrotePosition: the current write pointer of this MappedFile.
- committedPosition: the current commit pointer.
- flushedPosition: the pointer up to which data has been flushed to disk.
- fileSize: the total file size.
- fileChannel: the file channel.
- writeBuffer: if transientStorePoolEnable is on, messages are first written to off-heap memory, then committed to the PageCache, and finally flushed to disk.
- TransientStorePool transientStorePool: a pool of off-heap ByteBuffers; takes effect when transientStorePoolEnable is true.
- fileName: the file name.
- fileFromOffset: the starting offset that this file represents, parsed from the file name.
- File file: the file object.
- MappedByteBuffer mappedByteBuffer: corresponds to the operating system's PageCache.
- storeTimestamp: the timestamp of the last store.

1.3.1.2 Initialization

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            // Close the channel if mapping did not complete
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

This initializes the FileChannel, the mappedByteBuffer, and so on.

1.3.1.3 appendMessagesInner: writing a message

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get(); // 1

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) { // 2
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes()); // 4
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }

        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

Code 1: gets the current write position.

Code 2: dispatches to the matching handler depending on whether the message is a batch message or a single message.

Code 3: the message write implementation.

Next, look at the concrete message write logic. The code comes from CommitLog$DefaultAppendMessageCallback.

    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) { // 1
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

        // PHY OFFSET
        long wroteOffset = fileFromOffset + byteBuffer.position();

        this.resetByteBuffer(hostHolder, 8);
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); // 2

        // Record ConsumeQueue information // 3 start
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        } // 3 end

        // Transaction messages that require special handling // 4 start
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        } // 4 end

        /**
         * Serialize message
         */
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        } // 5

        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;

        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); // 6

        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) { // 7
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }

        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // 8
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            //

            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // Initialization of storage space // 9
        this.resetByteBuffer(msgStoreItemMemory, msgLen);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        this.msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        this.msgStoreItemMemory.putLong(queueOffset);
        // 7 PHYSICALOFFSET
        this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
        // 8 SYSFLAG
        this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
        // 11 STORETIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
        //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
        // 13 RECONSUMETIMES
        this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0)
            this.msgStoreItemMemory.put(msgInner.getBody());
        // 16 TOPIC
        this.msgStoreItemMemory.put((byte) topicLength);
        this.msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES
        this.msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0)
            this.msgStoreItemMemory.put(propertiesData);

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); // 10

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }
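
The sequence of fields written by doAppend fixes the size of every stored record, which is what calMsgLength has to return. As a rough sketch only: the class MessageLengthSketch and its method are hypothetical names for illustration, not the RocketMQ implementation, and the 8-byte host fields assume the sizes implied by resetByteBuffer(hostHolder, 8) in the listing. Adding up the 17 fields gives 91 fixed bytes plus the variable-length body, topic, and properties:

```java
public class MessageLengthSketch {

    // Hypothetical helper: sums the fields in the order doAppend writes them.
    public static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                      // 1  TOTALSIZE (int)
            + 4                       // 2  MAGICCODE (int)
            + 4                       // 3  BODYCRC (int)
            + 4                       // 4  QUEUEID (int)
            + 4                       // 5  FLAG (int)
            + 8                       // 6  QUEUEOFFSET (long)
            + 8                       // 7  PHYSICALOFFSET (long)
            + 4                       // 8  SYSFLAG (int)
            + 8                       // 9  BORNTIMESTAMP (long)
            + 8                       // 10 BORNHOST (assumed 8 bytes, as in the listing)
            + 8                       // 11 STORETIMESTAMP (long)
            + 8                       // 12 STOREHOSTADDRESS (assumed 8 bytes)
            + 4                       // 13 RECONSUMETIMES (int)
            + 8                       // 14 Prepared Transaction Offset (long)
            + 4 + bodyLength          // 15 BODY: int length prefix + bytes
            + 1 + topicLength         // 16 TOPIC: byte length prefix + bytes
            + 2 + propertiesLength;   // 17 PROPERTIES: short length prefix + bytes
    }

    public static void main(String[] args) {
        System.out.println(calMsgLength(0, 0, 0));     // fixed overhead: 91
        System.out.println(calMsgLength(100, 9, 20));  // 91 + 100 + 9 + 20 = 220
    }
}
```

The single-byte topic length prefix and the two-byte properties length prefix are why putMessage rejects topics longer than Byte.MAX_VALUE and properties longer than Short.MAX_VALUE before anything is written.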