记一次搜索代码重构

达芬奇密码2018-08-09 11:05
写在最前面

拖了好久,还是要写一下的,也是折腾了好久的东西.实际效果还不错,搜索接口性能较去年大促有明显提示,平均接口rt大概是原来的30%左右

考拉的搜索,是一个单入口多功能返回的复杂接口.随着后续业务的发展,接口的性能以及后续的开发效率会随之下降,针对现有情况,所以有了此次的代码重构和性能优化.代码也没什么具体参考,摸着石头过河,觉着应该这样写就写完了,欢迎大家提供宝贵建议.

面临的一些问题
已有的代码主要存在以及面临如下一些情况
1.接口存在重复调用
2.不同端搜索存在部分逻辑处理差异
3.不同端搜索返回结果结构不同
4.线性处理流程,接口性能差
5.对新增功能不友好,可能涉及修改多个地方

主要的目标和一些思路
本次重构主要完成如下一些功能
1.提供统一获取数据的方式,对于相同数据,做到对外只获取一次.
2.按模块拆分处理逻辑,根据模块不同,拆分不同的处理逻辑,接口按照需要添加处理模块,类似插件
3.并发地从第三方获取数据,降低接口整体rt
4.基于处理后的基础数据,提供不同的打包模块,针对不同的端可以返回不同的数据,尽量只返回需要的数据

针对上面的一些功能,抽象出简单的SearchResultBuilder类,根据搜索条件和搜索结果,处理生成最终返回结果
1.accessor 数据获取器,主要负责数据的获取
public interface Accessor {

void accessData();//主要实现获取数据的接口

}
默认的获取dubbo数据的一个实现
public abstract class DubboAccessor<Key, Value> implements Accessor {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

private Set<Key> set = new HashSet<>();

private Map<Key, Value> map = new HashMap<>();

//提供给处理模块在预处理时添加需要查询的key
public void addKey(Key key) {
set.add(key);
}

//提供给处理模块在预处理时添加需要查询的key
public void addKey(Collection<Key> c) {
if (CollectionUtils.isEmpty(c)) {
return;
}
for (Key key : c) {
addKey(key);
}
}

public Set<Key> getSet() {
return set;
}

//提供给处理模块在处理时根据key获取数据
public Value getValue(Key key) {
return map.get(key);
}

//提供给处理模块在处理时根据key获取数据
public void setValue(Key key, Value value) {
map.put(key, value);
}

public Map<Key, Value> getMap() {
return map;
}

public void setMap(Map<Key, Value> map) {
if (MapUtils.isNotEmpty(map)) {
this.map.putAll(map);
}
}
}
其他用到的还有类似
CacheAccessor(从缓存查询并获取map结构的数据返回)
SingleAccessor(根据特殊条件查询特殊的值,非map结构)
ParallelAccessor(特殊的处理器,用于分页太大,需要分页查询时,可以转化成多个小accessor)

2.processor处理器模块,主要负责数据处理,根据不同功能实现不同模块,按需配置,分为两个流程
preProcess:预处理,判定是否需要获取数据等一些逻辑,以及把需要查询的条件放置到accessor中
process:处理,根据先前的判断判定是都执行处理逻辑,以及处理数据,生成基础数据,用于最后打包
public interface Processor {

/**
* 预处理,收集缓存key值等
*
* @param builder
*/
void preProcess(SearchResultBuilder builder);

/**
* 用于构建,从缓存中获取值,并填充
*
* @param builder
*/
void process(SearchResultBuilder builder);

}

3.packager打包模块,根据不同的需求,基于基础数据包装出不同的返回结果
public interface Packager<T> {

//根据基础数据填充返回结果
T pack(Map<String, Object> params);

}

4.BuilderContext
存在两个buildeContext,后面可以考虑统一
1.基于threadLocal实现,主要用于模块间的数据交互
2.基于普通map实现,主要用于存放 处理器处理后的数据

5.SearchResultbuilder核心类,
整个流程基于该类实现(简易实现版),负责串联整个流程,以及数据的并发获取
package com.netease.haitao.search.builder;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
* 搜索结果构造器 所有数据获取器和数据处理器,执行顺序存在先后关系
*
* @author hzfanshilong
*
*/
@SuppressWarnings("rawtypes")
public class SearchResultBuilder {
private static final Logger logger = LoggerFactory.getLogger(SearchResultBuilder.class);

/**
* TODO 线程数需要根据dubbo线程池大小以及获取数据的并行度设置
*/
private static ExecutorService executorService = Executors
.newFixedThreadPool(1024);

// 底层搜索结果(es等)
private SearchResult srSearchResult;

// 底层搜索条件(es等)
private SearchCondition srSearchCondition;

private Map<Class<? extends Accessor>, Accessor> accessorMap = Maps.newHashMap();

// 混合处理器(有强列顺序时可以使用该处理器)
private List<Processor> processorList = Lists.newLinkedList();

// 构建最终参数
private Map<String, Object> builderContext = Maps.newConcurrentMap();

private Long EXECUTION_TIME_OUT = 2000l;

private int parallel = 6;

public SearchResultBuilder(SearchResult srSearchResult) {
this.srSearchResult = srSearchResult;
}

public SearchResultBuilder(SearchCondition srSearchCondition,
SearchResult srSearchResult) {
this.srSearchCondition = srSearchCondition;
this.srSearchResult = srSearchResult;
}

/**
* 设置数据处理器
*
* @param processor
* @return
*/
public SearchResultBuilder processor(Processor processor) {
this.processorList.add(processor);
return this;
}

/**
* 设置数据,用于最后打包用
*
* @param key
* @param value
* @return
*/
public SearchResultBuilder addParam(String key, Object value) {
if (value != null) {
this.builderContext.put(key, value);
}
return this;
}

/**
* 获取数据
*
* @param key
* @return
*/
public Object getParam(String key) {
return this.builderContext.get(key);
}

/**
* 异步获取数据,可以自己设置并行度
*
* @param parallel
* @return
*/
public SearchResultBuilder accessDataAsync(int parallel) {
int size = accessorMap.values().size();
if (size == 0) {
return this;
}
PriorityBlockingQueue<Accessor> accessorList = new PriorityBlockingQueue<>(size,
new ExecutionComparator<Accessor>());

accessorList.addAll(accessorMap.values());

CountDownLatch cdl = new CountDownLatch(parallel);

List<AccessAsyncTask> taskList = new ArrayList<>(parallel);

for (int i = 0; i < parallel; i++) {
AccessAsyncTask task = new AccessAsyncTask(accessorList, cdl);
executorService.submit(task);
taskList.add(task);
}

try {
cdl.await(EXECUTION_TIME_OUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("accessDataAsync time out! more than " + EXECUTION_TIME_OUT + " ms, cancel all task", e);
for (AccessAsyncTask task : taskList) {
task.cancel();
}
}

return this;
}

public SearchResultBuilder accessorParallel(int parallel) {
this.parallel = parallel;
return this;
}

/**
* 数据处理器数据预处理
*
* @return
*/
public SearchResultBuilder processorPreProcess() {
for (Processor processor : processorList) {
processor.preProcess(this);
}

return this;
}

/**
* 所有数据处理器数据预处理
*
* @return
*/
public SearchResultBuilder preProcess() {
processorPreProcess();
return this;
}

/**
* 数据处理器处理数据
*
* @return
*/
public SearchResultBuilder processorProcess() {
for (Processor processor : processorList) {
processor.process(this);
}

return this;
}

/**
* 所有数据处理器处理数据
*
* @return
*/
public SearchResultBuilder process() {
processorProcess();
return this;
}

/**
* 所有操作一块进行
* 数据预处理
* 数据获取
* 数据处理
*
* @return
*/
public SearchResultBuilder buildTogetherAsync() {
preProcess();
accessDataAsync(parallel);
process();
BuilderContext.removeAll();
return this;
}

/**
* 数据打包成结果对象
*
* @param packager
* @return
*/
public <T> T pack(Packager<T> packager) {
return packager.pack(builderContext);
}

/**
* 异步执行任务,引用同一个线程变量
*/
private class AccessAsyncTask implements Runnable {
private PriorityBlockingQueue<Accessor> queue;
private CountDownLatch cdl;
private volatile boolean isCanceled = false;

private Map<String, Object> buildContextCopy;

AccessAsyncTask(PriorityBlockingQueue<Accessor> queue, CountDownLatch cdl) {
this.queue = queue;
this.cdl = cdl;
buildContextCopy = BuilderContext.getAll();
}

public void cancel() {
isCanceled = true;
}

@Override
public void run() {
BuilderContext.setAll(buildContextCopy);
Accessor accessor = null;
do {
try {
accessor = queue.poll(1, TimeUnit.MILLISECONDS);
if(accessor != null) {
try{
accessor.accessData();
} catch (Throwable t) {
logger.error("access data async failed", t);
}
}
} catch (InterruptedException e) {
logger.warn("poll accessor failed", e);
}
} while (accessor != null && !isCanceled);
cdl.countDown();
BuilderContext.removeAll();
}
}

}

简单的应用
1.定义一个数据获取模块
public class NewBrandCacheAccessor extends CacheAccessor<Long, Brand> {

private CacheService searchCache;

private Future<Map<Long, Brand>> future;

public NewBrandCacheAccessor(CacheService searchCache) {
this.searchCache = searchCache;
}

@Override
public void accessData() {
if (CollectionUtils.isEmpty(getSet())) {
return;
}

Map<Long, Brand> map = searchCache.getAllPresent(getSet(), new KeyGenerator<Long, Brand>() {
@Override
public NkvKey<Brand> generatKey(Long brandId) {
return NkvKeyManager.BRAND_NKV_KEY_PREFIX.createKey(brandId);
}
});

setMap(map);
}

}
2.定义一个数据处理模块
public class NewBrandFilterProcessor implements FilterProcessor {

@Override
public void preProcess(SearchResultBuilder builder) {
NewBrandCacheAccessor brandCacheAccessor = builder.getCacheAccessor(NewBrandCacheAccessor.class);

SearchResult srSearchResult = builder.getSrSearchResult();
List<BrandInfo> brandInfos = srSearchResult.getBrandInfoList();
if (CollectionUtils.isNotEmpty(brandInfos)) {
for (BrandInfo brandInfo : brandInfos) {
brandCacheAccessor.addKey(brandInfo.getBrandId());
}
}
}

@Override
public void process(SearchResultBuilder builder) {
NewBrandCacheAccessor brandCacheAccessor = builder.getCacheAccessor(NewBrandCacheAccessor.class);
List<BrandSearchResultInfo> searchBrandInfos = new ArrayList<BrandSearchResultInfo>();

SearchResult srSearchResult = builder.getSrSearchResult();
List<BrandInfo> brandInfos = srSearchResult.getBrandInfoList();
if(CollectionUtils.isNotEmpty(brandInfos)) {
for (BrandInfo brandInfo : brandInfos) {
Brand brand = brandCacheAccessor.getValue(brandInfo.getBrandId());
if (brand != null) {
BrandSearchResultInfo searchBrandInfo = new BrandSearchResultInfo();
searchBrandInfo.setBrandId(brandInfo.getBrandId());
//组装品牌信息
searchBrandInfos.add(searchBrandInfo);
}
}

builder.addParam("brandNameList", searchBrandInfos);
}
}

}

3.定义一个搜索接口,通用流程
格式化查询条件==》查询数据==》处理查询结果==》组装返回结果
public AppSearchResult getAppSearchResult(OutSearchCondition condition, SearchCheckCondition checkCondition) {
SearchCondition srSearchCondition = queryParserService.getSrSearchCondition(condition, checkCondition);//处理查询条件
SearchResult srSearchResult = searchPhraseFacadeService.getSearchGoodsForShopFromSr(srSearchCondition);//从sr获取查询结果
AppSearchResult result = new AppSearchResult();
if (srSearchResult != null) {
SearchResultBuilder builder = appBuilderService.getMarketBuilder(srSearchCondition, srSearchResult);//获取接口处理逻辑
builder.buildTogetherAsync();//获取并处理数据
result = builder.pack(new AppSearchResultPackager());//根据接口组装返回数据
return result;
}
return result;
}
4.定制接口处理逻辑,主要定义接口需要获取哪些数据,以及数据的处理逻辑
通过accessor数据获取器来统一获取数据,对相同接口只存在单次调用
通过processor处理器处理数据,根据业务逻辑,设置相应的处理器,使相似流程只需变更部分处理器即可 
public SearchResultBuilder getBuilder(SearchCondition searchCondition, SearchResult searchResult) {
SearchResultBuilder builder = new SearchResultBuilder(searchCondition, searchResult);
builder.accessorParallel(9);
// 设置需要用到的缓存数据获取器
builder.accessor(new NewBrandCacheAccessor(searchNkvService));
builder.processor(new NewBrandFilterProcessor()));//设置数据处理器
return builder;
}
5.并发处理数据
具体的处理流程
处理器预处理结果==》并发从缓存第三方获取数据==》根据结果和第三方数据处理结果
public SearchResultBuilder buildTogetherAsync() {
preProcess();//处理器预处理数据,主要是收集获取数据需要的条件以及一些获取数据的先决判定条件,例如是否展示直达品牌或者店铺
accessDataAsync(parallel);//并发获取数据,parallel为线程并发参数,避免单机线程过多
process();//获取数据后的处理数据,产出可用于最终或间接展示的数据
BuilderContext.removeAll();清理线程变量
return this;
}
6.组装数据,根据packager和SearchResultBuilder组装出返回对象

网易云新用户大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者范世龙授权发布。