SolrCloud Code Analysis

A few days ago I switched our company's Solr over to SolrCloud. The benefits need no introduction, but I noticed that the time field differed between the leader and the replica while every other field was identical. At first this seemed bizarre, and searching the web turned up nothing, so the only option left was the source code. Since SolrCloud is coordinated through ZooKeeper rather than running inside Eclipse, debugging was essentially out of the question, and I had to locate the problem by reading and educated guessing.

After an import request is issued it first passes through the filter and then reaches DataImportHandler's handleRequestBody(req, rsp) method, whose key code is:

if (DataImporter.SHOW_CONF_CMD.equals(command)) {
    //show-config: just returns Solr's data config file, nothing special here
    String dataConfigFile = params.get("config");
    String dataConfig = params.get("dataConfig");
    if(dataConfigFile != null) {
        dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));
    }
    if(dataConfig==null)  {
        rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);
    } else {
        // Modify incoming request params to add wt=raw
        ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
        rawParams.set(CommonParams.WT, "raw");
        req.setParams(rawParams);
        ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);
        rsp.add(RawResponseWriter.CONTENT, content);
    }
    return;
}
rsp.add("initArgs", initArgs);
String message = "";
if (command != null) {
    rsp.add("command", command);
}
// If importer is still null
if (importer == null) {
    rsp.add("status", DataImporter.MSG.NO_INIT);
    return;
}
if (command != null && DataImporter.ABORT_CMD.equals(command)) {
    //the abort command, nothing special either
    importer.runCmd(requestParams, null);
} else if (importer.isBusy()) {
    message = DataImporter.MSG.CMD_RUNNING;
} else if (command != null) {
    if (DataImporter.FULL_IMPORT_CMD.equals(command) || DataImporter.DELTA_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
        //the full-import, delta-import and import commands are handled here
        importer.maybeReloadConfiguration(requestParams, defaultParams);
        //fetch the update processor chain; it is initialized when SolrCore is constructed. By default Solr provides three processors, LogUpdateProcessorFactory, DistributedUpdateProcessorFactory and RunUpdateProcessorFactory, which can be changed in solrconfig.xml
        UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
        //create the processors from the factories above; see the analysis below
        UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
        SolrResourceLoader loader = req.getCore().getResourceLoader();
        SolrWriter sw = getSolrWriter(processor, loader, requestParams, req);
        if (requestParams.isDebug()) {
            if (debugEnabled) {
                // Synchronous request for the debug mode
                importer.runCmd(requestParams, sw);
                rsp.add("mode", "debug");
                rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
                if (requestParams.getDebugInfo().debugVerboseOutput != null) {
                    rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
                }
            } else {
                message = DataImporter.MSG.DEBUG_NOT_ENABLED;
            }
        } else {
            // Asynchronous request for normal mode
            if(requestParams.getContentStream() == null && !requestParams.isSyncMode()){
                importer.runAsync(requestParams, sw);
            } else {
                importer.runCmd(requestParams, sw);
            }
        }
    } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
        if(importer.maybeReloadConfiguration(requestParams, defaultParams)) {
            message = DataImporter.MSG.CONFIG_RELOADED;
        } else {
            message = DataImporter.MSG.CONFIG_NOT_RELOADED;
        }
    }
}

The chain's method for creating the processors:

public UpdateRequestProcessor createProcessor(SolrQueryRequest req, SolrQueryResponse rsp) {
    UpdateRequestProcessor processor = null;
    UpdateRequestProcessor last = null;
    final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
    final boolean skipToDistrib = distribPhase != null;
    boolean afterDistrib = true;  // we iterate backwards, so true to start
    for (int i = chain.length - 1; i >= 0; i--) {
        //build from back to front: RunUpdateProcessor is created first,
        //then DistributedUpdateProcessor, and LogUpdateProcessor last
        UpdateRequestProcessorFactory factory = chain[i];
        if (skipToDistrib) {
            if (afterDistrib) {
                if (factory instanceof DistributingUpdateProcessorFactory) {
                    afterDistrib = false;
                }
            } else if (!(factory instanceof LogUpdateProcessorFactory)) {
                // TODO: use a marker interface for this?
                // skip anything that is not the log factory
                continue;
            }
        }
        //the third argument of getInstance is this processor's "next" processor,
        //which is how the chain gets linked together
        processor = factory.getInstance(req, rsp, last);
        last = processor == null ? last : processor;
    }
    //the processor returned here is the last one created, i.e. LogUpdateProcessor;
    //its next is DistributedUpdateProcessor, whose next is RunUpdateProcessor
    return last;
}
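
To see why returning last gives the expected order, here is a minimal, self-contained sketch of the same backwards-building pattern; it is not Solr code, and the names Stage and build are invented for illustration only:

import java.util.Arrays;
import java.util.List;

public class ChainSketch {
    // stand-in for UpdateRequestProcessor: do this stage's work, then delegate to "next"
    interface Stage {
        void process(String doc);
    }

    // mirrors createProcessor(): iterate backwards and pass the previously built stage in as "next",
    // so the stage returned at the end is the head of a forward-linked chain
    static Stage build(List<String> names) {
        Stage last = null;
        for (int i = names.size() - 1; i >= 0; i--) {
            final String name = names.get(i);
            final Stage next = last;
            last = doc -> {
                System.out.println(name + " handling " + doc);
                if (next != null) next.process(doc); // delegate down the chain
            };
        }
        return last; // the first declared stage, e.g. LogUpdateProcessor in the default chain
    }

    public static void main(String[] args) {
        Stage head = build(Arrays.asList("Log", "Distributed", "Run"));
        head.process("doc-1"); // prints Log, Distributed, Run in that order
    }
}

Because each stage captures the previously built one as its next, the head of the chain is the first factory in declaration order, which is exactly why the method can simply return last.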

A SolrWriter is then obtained to build the index; processor is the chain created above, and the actual indexing is driven by the upload method:

private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,final SolrResourceLoader loader, final RequestInfo requestParams, SolrQueryRequest req) {
    return new SolrWriter(processor, req) {
        @Override
        public boolean upload(SolrInputDocument document) {
            try {
                return super.upload(document);
            } catch (RuntimeException e) {
                LOG.error( "Exception while adding: " + document, e);
                return false;
            }
        }
    };
}

Next, the runAsync and runCmd methods. As the name suggests, runAsync simply runs runCmd on a new thread. Inside runCmd the relevant code is:

if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
    doFullImport(sw, reqParams);
} else if (command.equals(DELTA_IMPORT_CMD)) {
    doDeltaImport(sw, reqParams);
}

Then into doFullImport:

//create the property writer for dataimport.properties (driven by db-data-config.xml); the file records the time of the previous import, which delta-import relies on
DIHProperties dihPropWriter = createPropertyWriter();
//record the index start time, which will be written to dataimport.properties
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
//create the DocBuilder that reads the data and builds Documents; how the data is read is covered below (it goes through DocBuilder's EntityProcessorWrapper currentEntityProcessorWrapper field)
docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
checkWritablePersistFile(writer, dihPropWriter);
docBuilder.execute();

Into DocBuilder.execute():

public void execute() {
    List<EntityProcessorWrapper> epwList = null;
    try {
        //the lines below just set up status bookkeeping, nothing special
        dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
        config = dataImporter.getConfig();
        final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
        statusMessages.put(TIME_ELAPSED, new Object() {
            @Override
            public String toString() {
                return getTimeElapsedSince(startTime.get());
            }
        });
        statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,importStatistics.queryCount);
        statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,importStatistics.rowsCount);
        statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,importStatistics.docCount);
        statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,importStatistics.skipDocCount);
        //the entities to be indexed in this run
        List<String> entities = reqParams.getEntitiesToRun();
        // Trigger onImportStart
     if (config.getOnImportStart() != null) {
            invokeEventListener(config.getOnImportStart());
        }
        AtomicBoolean fullCleanDone = new AtomicBoolean(false);
        //we must not do a delete of *:* multiple times if there are multiple root entities to be run
        Map<String,Object> lastIndexTimeProps = new HashMap<String,Object>();
        lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
        epwList = new ArrayList<EntityProcessorWrapper>(config.getEntities().size());
        for (Entity e : config.getEntities()) {
            //get the entity processor for this entity, SqlEntityProcessor by default
            epwList.add(getEntityProcessorWrapper(e));
        }
        for (EntityProcessorWrapper epw : epwList) {
            if (entities != null && !entities.contains(epw.getEntity().getName()))
                continue;
            lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
            //set currentEntityProcessorWrapper so the database can be driven through it
            currentEntityProcessorWrapper = epw;
            //read the preImportDeleteQuery configured for this entity in db-data-config.xml
            String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
            if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
                //delete the configured documents first
                cleanByQuery(delQuery, fullCleanDone);
                //then run the delta import
                doDelta();
                delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
                if (delQuery != null) {
                    fullCleanDone.set(false);
                    cleanByQuery(delQuery, fullCleanDone);
                }
            } else {
                cleanByQuery(delQuery, fullCleanDone);
                //run the full import
                doFullDump();
                delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
                if (delQuery != null) {
                    fullCleanDone.set(false);
                    cleanByQuery(delQuery, fullCleanDone);
                }
            }
            statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
        }
        if (stop.get()) {
            // Dont commit if aborted using command=abort
         statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
            rollback();
        } else {
            // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
         if (!reqParams.isClean()) {
                if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
                    finish(lastIndexTimeProps);
                }
            } else {
                // Finished operation normally, commit now
             finish(lastIndexTimeProps);
            }
            if (config.getOnImportEnd() != null) {
                invokeEventListener(config.getOnImportEnd());
            }
        }
        statusMessages.remove(TIME_ELAPSED);
        statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
        if(importStatistics.failedDocCount.get() > 0)
            statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
        statusMessages.put("Time taken", getTimeElapsedSince(startTime.get()));
        LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
    } catch(Exception e){
        throw new RuntimeException(e);
    } finally{
        if (writer != null) {
            writer.close();
        }
            if (epwList != null) {
                    closeEntityProcessorWrappers(epwList);
            }
            if(reqParams.isDebug()) {
                    reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
            }
        }
}

Into cleanByQuery:

private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
    delQuery = getVariableResolver().replaceTokens(delQuery);
    if (reqParams.isClean()) {//only if the (optional) clean operation was requested
     if (delQuery == null && !completeCleanDone.get()) {
            //if db-data-config does not configure a delete query, delete all documents
            writer.doDeleteAll();
            completeCleanDone.set(true);
        } else if (delQuery != null) {
            writer.deleteByQuery(delQuery);
        }
    }
}

public void doDeleteAll() {
    try {
        DeleteUpdateCommand deleteCommand = new DeleteUpdateCommand(req);
        deleteCommand.query = "*:*";
        //the delete goes through the processor chain introduced above
        processor.processDelete(deleteCommand);
    } catch (IOException e) {
        throw new DataImportHandlerException(DataImportHandlerException.SEVERE,"Exception in full dump while deleting all documents.", e);
    }
}

That is enough about deletes for now. Let's look at adding documents, since that is where the problem shows up; as you will see, adds and deletes follow essentially the same path.

Into doFullDump():

private void doFullDump() {
    addStatusMessage("Full Dump Started");
    buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
}

private void buildDocument(VariableResolver vr, DocWrapper doc,Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
    ContextImpl ctx = new ContextImpl(epw, vr, null,pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,session, parentCtx, this);
    epw.init(ctx);
    if (!epw.isInitalized()) {
        entitiesToDestroy.add(epw);
        epw.setInitalized(true);
    }
    if (reqParams.getStart() > 0) {
        getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
    }
    if (verboseDebug) {
        getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
    }
    int seenDocCount = 0;
    try {
        while (true) {
            if (stop.get())
                return;
            if(importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
            try {
                seenDocCount++;
                if (seenDocCount > reqParams.getStart()) {
                    getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
                }
                if (verboseDebug && epw.getEntity().isDocRoot()) {
                    getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
                }
                if (doc == null && epw.getEntity().isDocRoot()) {
                    doc = new DocWrapper();
                    ctx.setDoc(doc);
                    Entity e = epw.getEntity();
                    while (e.getParentEntity() != null) {
                                addFields(e.getParentEntity(), doc, (Map<String, Object>) vr.resolve(e.getParentEntity().getName()), vr);
                                e = e.getParentEntity();
                            }
                        }
                        //fetch the next row read from the database; epw is the EntityProcessorWrapper described earlier
                        Map<String, Object> arow = epw.nextRow();
                        if (arow == null) {
                            break;
                        }
                        // Support for start parameter in debug mode
                         if (epw.getEntity().isDocRoot()) {
                            if (seenDocCount <= reqParams.getStart())
                                continue;
                            if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
                                LOG.info("Indexing stopped at docCount = " + importStatistics.docCount);
                                break;
                            }
                        }
                        if (verboseDebug) {
                            getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
                        }
                        importStatistics.rowsCount.incrementAndGet();
                        if (doc != null) {
                            //special command handling, nothing remarkable, not covered here
                            handleSpecialCommands(arow, doc);
                            //assemble the row into entity fields and add them to the Document; simple, not covered here
                            addFields(epw.getEntity(), doc, arow, vr);
                        }
                        if (epw.getEntity().getChildren() != null) {
                            vr.addNamespace(epw.getEntity().getName(), arow);
                            for (EntityProcessorWrapper child : epw.getChildren()) {
                        buildDocument(vr, doc,child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
                            }
                            vr.removeNamespace(epw.getEntity().getName());
                        }
                        if (epw.getEntity().isDocRoot()) {
                            if (stop.get())
                                return;
                            if (!doc.isEmpty()) {
                                //here comes the important part: upload should look familiar, it is the SolrWriter method shown above; once the Document is built, SolrWriter writes it out
                                boolean result = writer.upload(doc);
                                if(reqParams.isDebug()) {
                                        reqParams.getDebugInfo().debugDocuments.add(doc);
                                }
                                doc = null;
                                if (result){
                                        importStatistics.docCount.incrementAndGet();
                                } else {
                                        importStatistics.failedDocCount.incrementAndGet();
                                }
                            }
                        }
                    } catch (DataImportHandlerException e) {
                        if (verboseDebug) {
                            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
                        }
                        if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
                            continue;
                        }
                        if (isRoot) {
                            if (e.getErrCode() == DataImportHandlerException.SKIP) {
                                importStatistics.skipDocCount.getAndIncrement();
                                doc = null;
                            } else {
                                SolrException.log(LOG, "Exception while processing: "+ epw.getEntity().getName() + " document : " + doc, e);
                            }
                            if (e.getErrCode() == DataImportHandlerException.SEVERE)
                                throw e;
                        } else
                            throw e;
                    } catch (Throwable t) {
                        if (verboseDebug) {
                            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
                        }
                        throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
                    } finally {
                        if (verboseDebug) {
                            getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
                            if (epw.getEntity().isDocRoot())
                                getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
                        }
                    }
            }
        } finally {
            if (verboseDebug) {
                    getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
            }
        }
}

Into nextRow():

public Map<String, Object> nextRow() {
    if (rowcache != null) {
        return getFromRowCache();
    }
    while (true) {
        Map<String, Object> arow = null;
        try {
            //the delegate is the entity processor mentioned above (SqlEntityProcessor by default);
            //it fetches the actual rows through a JdbcDataSource, whose code is not covered here
            arow = delegate.nextRow();
        } catch (Exception e) {
            if (ABORT.equals(onError)) {
                wrapAndThrow(SEVERE, e);
            } else {
                //SKIP is not really possible. If this calls the nextRow() again the EntityProcessor would be in an inconsistent state
                SolrException.log(log, "Exception in entity : " + entityName, e);
                return null;
            }
        }
        if (arow == null) {
            return null;
        } else {
            //once a row has been fetched it is run through the transformers, which can be extended and
            //configured per entity; they are mainly used to post-process and reset the values read from
            //the database (this will come up again below)
            arow = applyTransformer(arow);
            if (arow != null) {
                delegate.postTransform(arow);
                return arow;
            }
        }
    }
}

Into SolrWriter's upload() method:

public boolean upload(SolrInputDocument d) {
        try {
            AddUpdateCommand command = new AddUpdateCommand(req);
            command.solrDoc = d;
            command.commitWithin = commitWithin;
            //the processor chain again
            processor.processAdd(command);
        } catch (Exception e) {
            log.warn("Error creating document : " + d, e);
            return false;
        }
        return true;
}

Into LogUpdateProcessor.processAdd():

public void processAdd(AddUpdateCommand cmd) throws IOException {
        if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString() + " " + req); }
        // call delegate first so we can log things like the version that get set later
        //handled by LogUpdateProcessor's next (remember what that is?), i.e. DistributedUpdateProcessor
        if (next != null) next.processAdd(cmd);
        // Add a list of added id's to the response
     if (adds == null) {
        adds = new ArrayList<String>();
        toLog.add("add",adds);
        }
        if (adds.size() < maxNumToLog) {
        long version = cmd.getVersion();
            String msg = cmd.getPrintableId();
        if (version != 0) msg = msg + " (" + version + ')';
            adds.add(msg);
        }
        numAdds++;
}

Now for the heart of the matter, DistributedUpdateProcessor.processAdd(). The name says it was built for distributed setups, but the method is entered even when Solr is not distributed; the distinction is made inside, as the analysis shows:

public void processAdd(AddUpdateCommand cmd) throws IOException {
        updateCommand = cmd;
        //is this a ZooKeeper-managed cluster? Even without ZooKeeper, Solr uses the update.distrib
        //parameter to determine whether the current update is running on the leader or on a replica
        if (zkEnabled) {
            zkCheck();
            //with SolrCloud the target nodes are obtained here; setupRequest is fairly involved,
            //it mainly asks zkController for the cluster configuration
            nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
        } else {
            isLeader = getNonZkLeaderAssumption(req);
        }
        boolean dropCmd = false;
        if (!forwardToLeader) {//not the case of a replica forwarding to the leader
            //versionAdd() is also fairly involved; it mainly uses a VersionBucket to set the document's version
            dropCmd = versionAdd(cmd);
        }
        if (dropCmd) {
            // TODO: do we need to add anything to the response?
             return;
        }
        ModifiableSolrParams params = null;
        if (nodes != null) {
            params = new ModifiableSolrParams(filterParams(req.getParams()));
            params.set(DISTRIB_UPDATE_PARAM,(isLeader ?DistribPhase.FROMLEADER.toString() :
                    DistribPhase.TOLEADER.toString()));
            if (isLeader) {
                    params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
                    zkController.getBaseUrl(), req.getCore().getName()));
            }
            params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
            //this is where the add is distributed to the other nodes
            cmdDistrib.distribAdd(cmd, nodes, params);
        }
        // TODO: what to do when no idField?
     if (returnVersions && rsp != null && idField != null) {
            if (addsResponse == null) {
                    addsResponse = new NamedList<String>();
                    rsp.add("adds",addsResponse);
            }
            if (scratch == null) scratch = new CharsRef();
        idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
        addsResponse.add(scratch.toString(), cmd.getVersion());
        }
        // TODO: keep track of errors?  needs to be done at a higher level though since
     // an id may fail before it gets to this processor.
     // Given that, it may also make sense to move the version reporting out of this
     // processor too.
}

Into getHashableId(). This method is important: the shard a document is routed to is determined by hashing the value of its uniqueKey field. If no specific shard can be resolved from that value, SolrCloud falls back to the shard the document is already on:

public String getHashableId() {
        String id = null;
        IndexSchema schema = req.getSchema();
        SchemaField sf = schema.getUniqueKeyField();
        if (sf != null) {
            if (solrDoc != null) {
                    SolrInputField field = solrDoc.getField(sf.getName());
                    int count = field == null ? 0 : field.getValueCount();
                    if (count == 0) {
                        if (overwrite) {
                            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"Document is missing mandatory uniqueKey field: "+ sf.getName());
                        }
                    } else if (count > 1) {
                        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"Document contains multiple values for uniqueKey field: " + field);
                    } else {
                        return field.getFirstValue().toString();
                    }
            }
        }
        return id;
}

setupRequest contains the line Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll); which uses the id obtained above to locate the target slice. Into getTargetSlice():

public Slice getTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) {
        if (id == null) id = getId(sdoc, params);
        int hash = sliceHash(id, sdoc, params);
        return hashToSlice(hash, collection);
}

protected int sliceHash(String id, SolrInputDocument sdoc, SolrParams params) {
    return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
}

protected Slice hashToSlice(int hash, DocCollection collection) {
    //collection.getSlices() returns all slices (shards) of the collection
    for (Slice slice : collection.getSlices()) {
        //each slice has a hash range assigned to it; how the ranges are computed is shown below
        Range range = slice.getRange();
        if (range != null && range.includes(hash)) return slice;
    }
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
}
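
To make the range check concrete, here is a small self-contained sketch of routing a 32-bit hash to a shard by range; the class and ranges are invented for illustration, whereas real SolrCloud hashes the uniqueKey with murmurhash3 and uses the per-slice ranges computed by partitionRange(), shown further below:

public class RangeRoutingSketch {
    // simplified stand-in for Slice.Range: an inclusive [min, max] interval of 32-bit hashes
    static final class Range {
        final int min, max;
        Range(int min, int max) { this.min = min; this.max = max; }
        boolean includes(int hash) { return hash >= min && hash <= max; }
    }

    // same idea as hashToSlice(): return the first shard whose range contains the hash
    static String route(int hash, Range[] ranges) {
        for (int i = 0; i < ranges.length; i++) {
            if (ranges[i].includes(hash)) return "shard" + (i + 1);
        }
        throw new IllegalStateException("No slice servicing hash " + Integer.toHexString(hash));
    }

    public static void main(String[] args) {
        Range[] ranges = {
            new Range(Integer.MIN_VALUE, -1),  // shard1: the lower half of the hash space
            new Range(0, Integer.MAX_VALUE)    // shard2: the upper half
        };
        // any 32-bit value stands in for the murmurhash of the uniqueKey here
        System.out.println(route("doc-42".hashCode(), ranges));
    }
}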

Once the SolrCloud cluster has been configured, the shards are created through Overseer's createCollection method:

private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
            log.info("Create collection {} with numShards {}", collectionName, numShards);
            DocRouter router = DocRouter.DEFAULT;
            //the key line: compute the hash range each shard will serve
            List<DocRouter.Range> ranges = router.partitionRange(numShards, router.fullRange());
            Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
            Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
            newCollections.putAll(state.getCollectionStates());
            for (int i = 0; i < numShards; i++) {
                final String sliceName = "shard" + (i+1);
                Map<String,Object> sliceProps = new LinkedHashMap<String,Object>(1);
                sliceProps.put(Slice.RANGE, ranges.get(i));
                newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
            }
            // TODO: fill in with collection properties read from the /collections/collectionName node
         Map<String,Object> collectionProps = defaultCollectionProps();
            DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
            newCollections.put(collectionName, newCollection);
            ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
            return newClusterState;
}

In partitionRange(), partitions is the number of shards. The algorithm is simple and splits the full 32-bit hash range evenly; with two shards, for example, shard1 ends up with roughly [0x80000000, 0xffffffff] and shard2 with [0, 0x7fffffff]:

public List<Range> partitionRange(int partitions, Range range) {
        int min = range.min;  // Integer.MIN_VALUE
        int max = range.max;  // Integer.MAX_VALUE
        assert max >= min;
        if (partitions == 0) return Collections.EMPTY_LIST;
        long rangeSize = (long)max - (long)min;
        long rangeStep = Math.max(1, rangeSize / partitions);
        List<Range> ranges = new ArrayList<Range>(partitions);
        long start = min;
        long end = start;
        // keep track of the idealized target to avoid accumulating rounding errors
     long targetStart = min;
        long targetEnd = targetStart;
        // Round to avoid splitting hash domains across ranges if such rounding is not significant.
     // With default bits==16, one would need to create more than 4000 shards before this
     // becomes false by default.
     boolean round = rangeStep >= (1<<bits)*16;
        while (end < max) {
            targetEnd = targetStart + rangeStep;
            end = targetEnd;
            if (round && ((end & mask2) != mask2)) {
                    // round up or down?
                 int increment = 1 << bits;  // 0x00010000
                 long roundDown = (end | mask2) - increment ;
                    long roundUp = (end | mask2) + increment;
                    if (end - roundDown < roundUp - end && roundDown > start) {
                        end = roundDown;
                    } else {
                        end = roundUp;
                    }
            }
            // make last range always end exactly on MAX_VALUE
             if (ranges.size() == partitions - 1) {
                    end = max;
            }
            ranges.add(new Range((int)start, (int)end));
            start = end + 1L;
            targetStart = targetEnd + 1L;
        }
            return ranges;
}

cmdDistrib.distribAdd() eventually reaches the submit() method (note that it is called once for every target node):

public void submit(final Request sreq) {
        //the node's URL (fixed when the node was created) is used to issue the HTTP request
        final String url = sreq.node.getUrl();
        Callable<Request> task = new Callable<Request>() {
            @Override
            public Request call() throws Exception {
                    Request clonedRequest = null;
                    try {
                        clonedRequest = new Request();
                        clonedRequest.node = sreq.node;
                        clonedRequest.ureq = sreq.ureq;
                        clonedRequest.retries = sreq.retries;
                        String fullUrl;
                        if (!url.startsWith("http://") && !url.startsWith("https://")) {
                            fullUrl = "http://" + url;
                        } else {
                            fullUrl = url;
                        }
                        HttpSolrServer server = new HttpSolrServer(fullUrl,updateShardHandler.getHttpClient());
                        if (Thread.currentThread().isInterrupted()) {
                            clonedRequest.rspCode = 503;
                            clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
                            return clonedRequest;
                        }
                        clonedRequest.ursp = server.request(clonedRequest.ureq);
                        // currently no way to get the request body.
                 } catch (Exception e) {
                        clonedRequest.exception = e;
                        if (e instanceof SolrException) {
                            clonedRequest.rspCode = ((SolrException) e).code();
                        } else {
                            clonedRequest.rspCode = -1;
                        }
                    } finally {
                        semaphore.release();
                    }
                    return clonedRequest;
            }
    };
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
        }
        try {
            pending.add(completionService.submit(task));
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
        }
}
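
From a client's point of view, what submit() does for each node amounts to sending a normal update request to that node's URL. A rough SolrJ 4.x sketch of the same interaction (the URL and field names are made up for illustration):

import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class ForwardSketch {
    public static void main(String[] args) throws Exception {
        // the node URL would come from the cluster state, just like sreq.node.getUrl() above
        HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "doc-1");
        doc.addField("updateTime", "2013-05-01 12:00:00"); // already a plain string here
        server.add(doc);      // becomes an /update request against that node
        server.commit();
        server.shutdown();
    }
}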

The important line inside server.request() is Collection<ContentStream> streams = requestWriter.getContentStreams(request);

Into UpdateRequestExt's getContentStreams() method: the Documents are serialized into an XML payload and sent as a character stream in the HTTP request; on receiving it, the other node adds the documents from that XML (a simple path that eventually calls Lucene's IndexWriter):

public Collection<ContentStream> getContentStreams() throws IOException {
        return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
}

public String getXML() throws IOException {
        StringWriter writer = new StringWriter();
        writeXML(writer);
        writer.flush();
        String xml = writer.toString();
        return (xml.length() > 0) ? xml : null;
}

public void writeXML(Writer writer) throws IOException {
        List<List<SolrDoc>> getDocLists = getDocLists(documents);
        for (List<SolrDoc> docs : getDocLists) {
            if ((docs != null && docs.size() > 0)) {
                    SolrDoc firstDoc = docs.get(0);
                    int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin;
                    boolean overwrite = firstDoc.overwrite;
                    if (commitWithin > -1 || overwrite != true) {
                        writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">");
                    } else {
                        writer.write("<add>");
                    }
                    if (documents != null) {
                        for (SolrDoc doc : documents) {
                            if (doc != null) {
                                ClientUtils.writeXML(doc.document, writer);
                            }
                        }
                    }
                    writer.write("</add>");
            }
        }
        // Add the delete commands
     boolean deleteI = deleteById != null && deleteById.size() > 0;
        boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
        if (deleteI || deleteQ) {
            writer.append("<delete>");
            if (deleteI) {
                    for (Map.Entry<String,Long> entry : deleteById.entrySet()) {
                        writer.append("<id");
                        Long version = entry.getValue();
                        if (version != null) {
                            writer.append(" version=\"" + version + "\"");
                        }
                        writer.append(">");
                        XML.escapeCharData(entry.getKey(), writer);
                        writer.append("</id>");
                    }
            }
            if (deleteQ) {
                    for (String q : deleteQuery) {
                        writer.append("<query>");
                        XML.escapeCharData(q, writer);
                        writer.append("</query>");
                    }
            }
            writer.append("</delete>");
        }
}

Into ClientUtils.writeXML():

public static void writeXML( SolrInputDocument doc, Writer writer ) throws IOException{
        writer.write("<doc boost=\""+doc.getDocumentBoost()+"\">");
        for( SolrInputField field : doc ) {
            float boost = field.getBoost();
            String name = field.getName();
            for( Object v : field ) {
                    String update = null;
                    if (v instanceof Map) {
                        // currently only supports a single value
                         for (Entry<Object,Object> entry : ((Map<Object,Object>)v).entrySet()) {
                            update = entry.getKey().toString();
                            v = entry.getValue();
                            if (v instanceof Collection) {
                                Collection values = (Collection) v;
                                for (Object value : values) {
                                        writeVal(writer, boost, name, value, update);
                                        boost = 1.0f;
                                }
                            } else  {
                                writeVal(writer, boost, name, v, update);
                                boost = 1.0f;
                            }
                        }
                    } else  {
                        writeVal(writer, boost, name, v, update);
                        // only write the boost for the first multi-valued field
                         // otherwise, the used boost is the product of all the boost values
                         boost = 1.0f;
                    }
            }
        }
        writer.write("</doc>");
}

And on into writeVal():

private static void writeVal(Writer writer, float boost, String name, Object v, String update) throws IOException {
        if (v instanceof Date) {
            v = DateUtil.getThreadLocalDateFormat().format( (Date)v );
        } else if (v instanceof byte[]) {
            byte[] bytes = (byte[]) v;
            v = Base64.byteArrayToBase64(bytes, 0, bytes.length);
        } else if (v instanceof ByteBuffer) {
            ByteBuffer bytes = (ByteBuffer) v;
            v = Base64.byteArrayToBase64(bytes.array(), bytes.position(),bytes.limit() - bytes.position());
        }
        if (update == null) {
            if( boost != 1.0f ) {
                    XML.writeXML(writer, "field", v.toString(), "name", name, "boost", boost);
            } else if (v != null) {
                    XML.writeXML(writer, "field", v.toString(), "name", name );
            }
        } else {
            if( boost != 1.0f ) {
                    XML.writeXML(writer, "field", v.toString(), "name", name, "boost", boost, "update", update);
            } else {
                    if (v == null)  {
                        XML.writeXML(writer, "field", null, "name", name, "update", update, "null", true);
                    } else  {
                        XML.writeXML(writer, "field", v.toString(), "name", name, "update", update);
                    }
            }
        }
}

Now everything falls into place: the very first branch, if (v instanceof Date). Because the database time column comes back as a java.sql.Timestamp, which extends Date, the value is formatted again at this point before being forwarded, so the time field indexed on the replica no longer matches the value on the leader.
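
A tiny demonstration of why this branch is taken, assuming the JDBC driver returns java.sql.Timestamp for the time column (as it normally does):

import java.sql.Timestamp;
import java.util.Date;

public class InstanceofDemo {
    public static void main(String[] args) {
        Object v = new Timestamp(System.currentTimeMillis());
        // Timestamp extends java.util.Date, so the writeVal() branch above matches
        System.out.println(v instanceof Date);  // true
        // Timestamp.toString() prints something like 2013-05-01 12:00:00.123,
        // while writeVal() re-formats the value before it is sent on
        System.out.println(v);
    }
}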

There are two fixes. One is to attach a custom transformer to the entity that converts the time field to a String with toString(); the nextRow() call shown above will apply it, so by the time writeVal() runs, v is no longer a Date. The other is to change the SQL itself, e.g. CONVERT(varchar(19), updateTime, 120) AS updateTime. The latter is recommended, both for simplicity and for indexing speed.
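
For the transformer route, a minimal sketch of a custom DIH Transformer might look like the following; the package, class name and the updateTime column are placeholders, and the class would be registered on the entity through the transformer attribute in db-data-config.xml:

package com.example.dih; // hypothetical package

import java.sql.Timestamp;
import java.util.Map;
import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.Transformer;

// Converts the Timestamp coming out of JDBC into a plain String before the row reaches
// SolrWriter, so the writeVal() branch above no longer treats it as a Date.
public class DateToStringTransformer extends Transformer {
    @Override
    public Object transformRow(Map<String, Object> row, Context context) {
        Object v = row.get("updateTime");          // example column name
        if (v instanceof Timestamp) {
            row.put("updateTime", v.toString());   // e.g. "2013-05-01 12:00:00.0"
        }
        return row;
    }
}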


