中企动力网站建设合同,潍坊网站建设电话,建设手机网站公司,雅安市住房和城乡建设局网站1、消息存储分析
1.1 DefaultMessageStore 概要
其核心属性如下#xff1a;
messageStoreConfig 存储相关的配置#xff0c;例如存储路径、commitLog文件大小#xff0c;刷盘频次等等。CommitLog commitLog comitLog 的核心处理类#xff0c;消息存储在 commitlog 文件中…1、消息存储分析
1.1 DefaultMessageStore 概要
其核心属性如下
messageStoreConfig 存储相关的配置例如存储路径、commitLog文件大小刷盘频次等等。CommitLog commitLog comitLog 的核心处理类消息存储在 commitlog 文件中。ConcurrentMapString/* topic */, ConcurrentMapInteger/* queueId */, ConsumeQueue consumeQueueTable topic 的队列信息。FlushConsumeQueueService flushConsumeQueueService ConsumeQueue 刷盘服务线程。CleanCommitLogService cleanCommitLogService commitLog 过期文件删除线程。CleanConsumeQueueService cleanConsumeQueueService consumeQueue 过期文件删除线程。、IndexService indexService 索引服务。AllocateMappedFileService allocateMappedFileService MappedFile 分配线程RocketMQ 使用内存映射处理 commitlog、consumeQueue文件。ReputMessageService reputMessageService reput 转发线程负责 Commitlog 转发到 Consumequeue、Index文件。HAService haService 主从同步实现服务。ScheduleMessageService scheduleMessageService 定时任务调度器执行定时任务。StoreStatsService storeStatsService 存储统计服务。TransientStorePool transientStorePool ByteBuffer 池后文会详细使用。RunningFlags runningFlags 存储服务状态。BrokerStatsManager brokerStatsManager Broker 统计服务。MessageArrivingListener messageArrivingListener 消息达到监听器。StoreCheckpoint storeCheckpoint 刷盘检测点。LinkedList dispatcherList 转发 comitlog 日志主要是从 commitlog 转发到 consumeQueue、index 文件。
上面这些属性是整个消息存储的核心也是我们需要重点关注与理解的将会在本系列一一介绍到。
接下来先从 putMessage 为入口一起探究整个消息存储全过程。
1.2 消息存储流程
1.2.1 DefaultMessageStore.putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {if (this.shutdown) {log.warn(message store has shutdown, so putMessage is forbidden);return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is slave mode, so putMessage is forbidden );}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}if (!this.runningFlags.isWriteable()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is not writeable, so putMessage is forbidden this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);} else {this.printTimes.set(0);}if (msg.getTopic().length() Byte.MAX_VALUE) {log.warn(putMessage message topic length too long msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}if (msg.getPropertiesString() ! null msg.getPropertiesString().length() Short.MAX_VALUE) {log.warn(putMessage message properties length too long msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}if (this.isOSPageCacheBusy()) { //1return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}long beginTime this.getSystemClock().now();PutMessageResult result this.commitLog.putMessage(msg); // 2long eclipseTime this.getSystemClock().now() - beginTime;if (eclipseTime 500) {log.warn(putMessage not in lock eclipse time(ms){}, bodyLength{}, eclipseTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); //3if (null result || !result.isOk()) { //4 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}return result;
代码1检测操作系统页写入是否繁忙。
Overridepublic boolean isOSPageCacheBusy() {long begin this.getCommitLog().getBeginTimeInLock();long diff this.systemClock.now() - begin;if (diff 10000000 // diff this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {return true;}return false;
代码2将日志写入CommitLog 文件具体实现类 CommitLog。
代码3记录相关统计信息。
代码4记录写commitlog 失败次数。
1.2.2 CommitLog.putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();int queueId msg.getQueueId();final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 1if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE//|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // 2// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic ScheduleMessageService.SCHEDULE_TOPIC;queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock 0;MappedFile unlockMappedFile null;MappedFile mappedFile this.mappedFileQueue.getLastMappedFile(); // 3putMessageLock.lock(); //spin or ReentrantLock ,depending on store config //4try {long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null mappedFile) {log.error(create maped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);} // 5result mappedFile.appendMessage(msg, this.appendMessageCallback); // 6switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:unlockMappedFile mappedFile;// Create a new file, re-write the messagemappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create maped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock 0;} finally {putMessageLock.unlock();}if (eclipseTimeInLock 500) {log.warn([NOTIFYME]putMessage in lock cost time(ms){}, bodyLength{} AppendMessageResult{}, eclipseTimeInLock, msg.getBody().length, result);}if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());handleDiskFlush(result, putMessageResult, msg); // 7handleHA(result, putMessageResult, msg); //8return putMessageResult;
先对 ComitLog 写入消息做一个简单描述然后需要详细探究每个步骤的实现。
代码1获取消息类型事务消息非事务消息Commit消息。
代码3获取一个 MappedFile 对象内存映射的具体实现。
代码4追加消息需要加锁串行化处理。
代码5验证代码3的 MappedFile 对象获取一个可用的 MappedFile (如果没有则创建一个)。
代码6通过MappedFile对象写入文件。
代码7根据刷盘策略刷盘。
代码8主从同步。
1.3 存储核心类分析
1.3.1 源码分析MappedFile
1、3.1.1 MappedFile 基础属性
public static final int OS_PAGE_SIZE 1024 * 4; // 4K
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPPED_FILES new AtomicInteger(0);
protected final AtomicInteger wrotePosition new AtomicInteger(0);
protected final AtomicInteger committedPosition new AtomicInteger(0);
private final AtomicInteger flushedPosition new AtomicInteger(0);
protected int fileSize;
protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/
protected ByteBuffer writeBuffer null;
protected TransientStorePool transientStorePool null;
private String fileName;
private long fileFromOffset;
private File file;
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp 0;
OS_PAGE_SIZE OSpage大小4K。 TOTAL_MAPPED_VIRTUAL_MEMORY 类变量所有 MappedFile 实例已使用字节总数。TOTAL_MAPPED_FILES MappedFile 个数。wrotePosition 当前MappedFile对象当前写指针。committedPosition 当前提交的指针。flushedPosition 当前刷写到磁盘的指针。fileSize 文件总大小。fileChannel 文件通道。writeBuffer 如果开启了transientStorePoolEnable消息会写入堆外内存然后提交到 PageCache 并最终刷写到磁盘。TransientStorePool transientStorePool ByteBuffer的缓冲池堆外内存transientStorePoolEnable 为 true 时生效。fileName 文件名称。fileFromOffset 文件序号,代表该文件代表的文件偏移量。File file 文件对象。MappedByteBuffer mappedByteBuffer 对应操作系统的 PageCache。storeTimestamp 最后一次存储时间戳。
1、3.1.2 初始化
private void init(final String fileName, final int fileSize) throws IOException {this.fileName fileName;this.fileSize fileSize;this.file new File(fileName);this.fileFromOffset Long.parseLong(this.file.getName());boolean ok false;ensureDirOK(this.file.getParent());try {this.fileChannel new RandomAccessFile(this.file, rw).getChannel();this.mappedByteBuffer this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);TOTAL_MAPPED_FILES.incrementAndGet();ok true;} catch (FileNotFoundException e) {log.error(create file channel this.fileName Failed. , e);throw e;} catch (IOException e) {log.error(map file this.fileName Failed. , e);throw e;} finally {if (!ok this.fileChannel ! null) {this.fileChannel.close();}}
初始化 FileChannel、mappedByteBuffer 等。
1、3.1.3 appendMessagesInner 消息写入
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {assert messageExt ! null;assert cb ! null;int currentPos this.wrotePosition.get(); // 1if (currentPos this.fileSize) {ByteBuffer byteBuffer writeBuffer ! null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos);AppendMessageResult result null;if (messageExt instanceof MessageExtBrokerInner) { // 2result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes()); // 4this.storeTimestamp result.getStoreTimestamp();return result;}log.error(MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}, currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
代码1获取当前写入位置。
代码2根据消息类型是批量消息还是单个消息进入相应的处理。
代码3消息写入实现。
接下看具体的消息写入逻辑代码来源于 CommitLog$DefaultAppendMessageCallback。
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner) { //1// STORETIMESTAMP STOREHOSTADDRESS OFFSET br// PHY OFFSETlong wroteOffset fileFromOffset byteBuffer.position();this.resetByteBuffer(hostHolder, 8);String msgId MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); //2// Record ConsumeQueue information //3startkeyBuilder.setLength(0);keyBuilder.append(msgInner.getTopic());keyBuilder.append(-);keyBuilder.append(msgInner.getQueueId());String key keyBuilder.toString();Long queueOffset CommitLog.this.topicQueueTable.get(key);if (null queueOffset) {queueOffset 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);} //3 end// Transaction messages that require special handling //4 startfinal int tranType MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the// consumer queueccase MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset 0L;break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;} // 4 end/*** Serialize message*/final byte[] propertiesData msgInner.getPropertiesString() null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);final int propertiesLength propertiesData null ? 0 : propertiesData.length;if (propertiesLength Short.MAX_VALUE) {log.warn(putMessage message properties length too long. length{}, propertiesData.length);return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);} //5final byte[] topicData msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);final int topicLength topicData.length;final int bodyLength msgInner.getBody() null ? 0 : msgInner.getBody().length;final int msgLen calMsgLength(bodyLength, topicLength, propertiesLength); //6// Exceeds the maximum messageif (msgLen this.maxMessageSize) { // 7CommitLog.log.warn(message size exceeded, msg total size: msgLen , msg body size: bodyLength , maxMessageSize: this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);}// Determines whether there is sufficient free spaceif ((msgLen END_FILE_MIN_BLANK_LENGTH) maxBlank) { // 8this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value//// Here the length of the specially set maxBlankfinal long beginTimeMills CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}// Initialization of storage space 9this.resetByteBuffer(msgStoreItemMemory, msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.resetByteBuffer(hostHolder, 8);this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.resetByteBuffer(hostHolder, 8);this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));//this.msgBatchMemory.put(msgInner.getStoreHostBytes());// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength 0)this.msgStoreItemMemory.put(propertiesData);final long beginTimeMills CommitLog.this.defaultMessageStore.now();// Write messages to the queue bufferbyteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); //10switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, queueOffset);break;default:break;}return result;