最近公司说要做智能推荐,于是想起了协同过滤,想到了Slope One算法,虽然以前看过这个算法,但没有记笔记,这次只好从头看起,好在Slope One比较容易。

Wiki上看了介绍,印象中有人用Python写了一个非常简洁的版本,于是在网上找。在这里找到详细说明,在这里找到代码。

u\v i i
u1 3 2
u2 4 2
u3 5 ?

对于上表中,使用 Slope One算法来预测用户u3对j 的评分具体过程是这样的:首先计算项目i和j的偏差,即((3 – 2) + (4 – 2)) / 2 = 1.5,之后预测用户u3对j的评分就可以这样计算5 – 1.5 = 3.5。

之后自己写了一个版本

联系作者

因为Oryx推荐引擎需要用到Spark, 所以开始了解Spark,

按照使用Spark MLlib给豆瓣用户推荐电影写了一个Python版本, 算是有了一个初步了解。只是不知道推荐效果怎样,关键是不好测试效果。

使用的过程中遇到一个问题

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/long/README.md

这是在执行官方文档例子quickstart例子时遇到,

1
2
>>> textFile = sc.textFile("README.md")
>>> textFile.count()

一直想不通,后来想到在测试Oryx的例子时,在conf/spark-env.sh里配置了HADOOP_CONF_DIR,把它注释掉即可。

而之所以之前配置了HADOOP_CONF_DIR, 是因为在执行Oryx的例子时,会使用bin/spark-submit –master yarn-client提交,此时如果没有配置HADOOP_CONF_DIR, 会报Exception in thread “main” java.lang.Exception: When running with master ‘yarn-client’ either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.错误。

参考文章

联系作者

之前是用Django自带的Paginator进行分页。在每一个需要分页的view都要添加分页处理,而使用django-pagination, 则只需要在模版里添加即可。于是开始使用django-pagination。使用的过程中发现以下问题

‘WSGIRequest’ object has no attribute ‘REQUEST’

这是因为REQUEST对象已经在Django1.9中丢弃. 进入python的lib目录,进入lib/python2.7/site-packages/pagination, 将middleware.py里的return int(self.REQUEST[‘page’])改为return int(self.GET[‘page’])

sequence index must be integer, not ‘slice’

这是因为xrange对象不能进行slice操作,进入templatetags,将pagination_tags.py,paginate函数里的page_range = paginator.page_range改为 page_range = list(paginator.page_range)

很郁闷的是,django-pagination的github仓库里的程序没有更新,而且报TOKEN_BLOCK错误,估计是这个commit中引入的。

于是只好fork出一份,自己修改。参见product分支

联系作者

以前使用virtualenv来隔离Python环境。最近知道了virtualenvwrapper, 才知道之前使用virtualenv的方法不对,而virtualenv也有一些不便,而virtualenvwrapper就是用来解决这些不便。

  • 统一环境存储位置
  • 方便环境切换

具体查看virtualwapper官方文档。工欲善其事,必先利其器确实很有道理。

想想以前花费在Windowns系统安装和软件安装的时间,太不值了。

联系作者

这里使用Hadoop 2.7.2, 在Mac上安装, 如果按照官方文档一步步来,是可以安装成功的。但如果漏了一些步骤,就会出现问题。

Input path does not exist

最后查找日志,发现原因是
/bin/bash: /bin/java: No such file or directory

解决办法是将JAVA_HOME加入到etc/hadoop/hadoop-env.sh即可

org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible

原因是多次运行bin/hdfs namenode -format, 导致namenode的version和datanode的version不一致。

解决办法是修改datanode的version.
具体参考http://blog.csdn.net/wanghai__/article/details/5752199

联系作者

修改版本号

https://github.com/elastic/elasticsearch/releases里下载到v2.3.0的Elasticsearch, 编译后得到的是2.3.0-SNAPSHOT, 这在pom.xml文件里有体现,于是进行替换

1
find . -name "pom.xml" | xargs sed -i '' 's/2.3.0-SNAPSHOT/2.3.0/g'

这里sed的用法是在Mac电脑上。

但Elasticsearch无法启动,报错说还是2.3.0-SNAPSHOT, 打开Version.java, 将

1
public static final Version V_2_3_0 = new Version(V_2_3_0_ID, true, org.apache.lucene.util.Version.LUCENE_5_5_0);

修改为

1
public static final Version V_2_3_0 = new Version(V_2_3_0_ID, false, org.apache.lucene.util.Version.LUCENE_5_5_0);

编译后可以正常启动.

修改日志格式

目前Elasticsearch的日志输出无法知道是哪个类,哪个包,第几行打印的日志,所以需要修改logging.yml的配置。


1
conversionPattern: "[%d{ISO8601}][%-5p][%-25c]: %m%n"

改为

1
conversionPattern: "[%d{ISO8601}][%-5p][%l]: %m%n"

输出query日志

想知道用户的query日志,但Elasticsearch没有记录,在NettyHttpRequest.java里, String uri = request.getUri();后添加

1
2
3
4
5
try {
logger.info("### query uri {}", URLDecoder.decode(uri, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
logger.info("### query uri {}", uri);
}

联系作者

Elasticsearch源码分析-启动里简单了解Elasticsearch的启动过程,这里来看看查询过程。

接收请求

从启动篇里知道HttpRequestHandler,进入这个类查看,看到messageReceived, 进入NettyHttpRequest, 看到String uri = request.getUri(); 看到这里没有日志输出,一直纳闷为什么Elasticsearch没有请求url输出,于是加上日志

1
2
3
4
5
try {
logger.info("query uri {}", URLDecoder.decode(uri, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
logger.info("query uri {}", uri);
}

之后日志里就有请求的uri了。看到RestUtils.decodeQueryString(uri, pathEndPos + 1, params), 知道请求参数是在这里完成解析。

查看serverTransport.dispatchRequest,进入httpServerAdapter.dispatchRequest(request, channel),这里要知道httpServerAdapter的具体对象,查看

1
2
3
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
this.httpServerAdapter = httpServerAdapter;
}

被哪个函数调用,跳到HttpServer.java, 打开server.internalDispatchRequest(request, channel); 之后到了restController.dispatchRequest(request, channel);

最终请求的处理由restController.dispatchRequest(request, channel);完成

请求处理

进入RestController的dispatchRequest方法, 进入executeHandler方法, 在getHandler(request)里,根据不同的请求方法,返回不同的handler,然后调用handler里的handleRequest方法处理请求,这里以GET方法为例。

对于不同的动作,都可以使用GET方法,如curl -XGET /index/type/id, curl -XGET /index/type/_search, 这里以/index/type/_search这查询为例。

在RestSearchAction.java里,有语句controller.registerHandler(GET, "/{index}/{type}/_search", this);, 所以执行curl -XGET /index/type/_search时,得到的handler就是RestSearchAction, 并执行这个类里的handleRequest方法。

进入RestSearchAction.java里的handleRequest方法,先是执行RestSearchAction.parseSearchRequest(searchRequest, request, parseFieldMatcher, null),这个方法主要对查询参数进行设置,之后调用client.search(searchRequest, new RestStatusToXContentListener(channel))进行查询。

client类型

现在要弄清楚client的具体类型, 在Node初始化里,有modules.add(new NodeClientModule())这句,打开查看,有bind(Client.class).to(NodeClient.class).asEagerSingleton(),所以这里的client具体类型是NodeClient, 而NodeClent继承自AbstractClient,

然后看查询调用过程client.search ->client.execute->client.doExecute->transportAction.execute, 最终还是由transportAction来完成实际的查询

值得注意的一点是client. execute是execute(SearchAction.INSTANCE, request, listener);

transportAction类型

在Node初始化时,有modules.add(new ActionModule(false)),进入ActionModule.java查看,有registerAction(SearchAction.INSTANCE, TransportSearchAction.class);所以transportAction是TransportSearchAction类型。

具体执行

transportAction.execute最终会调用transportAction.doExecute, 这里是进入TransportSearchAction.java的doExecute,这里会对search_type进行判断

对于search_type, 是由RestSearchAction.java里的searchRequest.searchType(searchType)语句设定,默认是SearchType.DEFAULT, 也就是SearchType.QUERY_THEN_FETCH

query阶段

由此新建了一个SearchQueryThenFetchAsyncAction实例,之后searchAsyncAction.start();开始查询。在父类AbstractSearchAsyncAction的start()函数里,

1
2
3
4
5
6
7
8
9
10
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}

对每一个shard调用performFirstPhase,

查看performFirstPhase, 最终会调用sendExecuteFirstPhase,并添加了ActionListener, 如果成功则执行onResponse里的onFirstPhaseResult, 在onFirstPhaseResult里有个判断, if (xTotalOps == expectedTotalOps),当所有shard都执行完后,执行innerMoveToSecondPhase, 最终执行moveToSecondPhase

fetch阶段

在moveToSecondPhase里, sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults)对第一阶段的结果进行合并,之后对每个shard里入选到topN的doc进行fetch,即执行executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node),

在executeFetch里,

1
2
3
if (counter.decrementAndGet() == 0) {
finishHim();
}

当所有需要执行的shard都结束后,执行finishHim(),标志着查询结束。

在finishHim里,

1
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,fetchResults, request);

对fetch阶段Shard返回的结果进行合并.

1
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,successfulOps.get(), buildTookInMillis(), buildShardFailures()))

设置返回的SearchResponse对象.

请求结果返回

在TransportAction调用execute时,有添加Actionlistener,

1
2
3
4
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}

这里的Response就是上面返回的SearchResponse, 而listener可以在RestSearchAction中找到, 是RestStatusToXContentListener(channel).

RestStatusToXContentListener继承RestResponseListener, RestResponseListener继承RestActionListener, 最终onResponse方法会调用RestStatusToXContentListener中的buildResponse, 也就调用了SearchResponse中的toXContent方法。

到此,大致了解Elasticsearch的查询过程。目前,我修改JSON返回格式,就是修改SearchResponse的toXContent方法。

联系作者

前言

刚开始使用Elasticsearch时,我只需要修改Elasticsearch的_search这个查询的返回格式,使之与django-rest-framework的返回结果一致,凭着修改Solr的JSONResponseWriter返回结果的经验,在没有研究Elasticsearch源码的情况下,很快找到了org.elasticsearch.action.search.SearchResponse类,并进行修改,虽然遇到一些问题,但最终还是达到了目的。最近需要修改top hits aggregations的返回结果,于是开始看源码。

准备工作

修改日志

  • 修改config下的logging.yml, 将所有INFO替换为DEBUG,
  • conversionPattern: "[%d{ISO8601}][%-5p][%-25c]: %.10000m%n" 改为conversionPattern: "[%d{ISO8601}][%-5p][%l]: %.10000m%n"以便查看到更多的日志,这里建议生产环境中也这样设置,这个更容易查找错误

    查看程序入口

    查看bin目录下的启动脚本elasticsearch, 知道程序入口是org.elasticsearch.bootstrap.Elasticsearch

    深入代码

  • 进入Bootstrap.java的init方法, Environment environment = initialSettings(foreground);加载环境配置,
  • 进入INSTANCE.setup(true, settings, environment);JarHell.checkJarHell();完成jar hell检查, 跟踪node = nodeBuilder.build();,发现是这里新建Node,并完成初始化

    Node初始化

  • 在Node的构造函数里,nodeEnvironment = new NodeEnvironment(this.settings, this.environment);完成Node环境初始化,
  • final ThreadPool threadPool = new ThreadPool(settings);完成线程池初始化,进入ThreadPool可以看到对于不同任务会建立不同的线程池。
  • Elasticsearch使用Guice作为依赖注入容器,这在 ModulesBuilder modules = new ModulesBuilder();里有所体现,这里主要关注RestModule, TransportModule,HttpServerMoudle的配置。
  • 进入RestModule.java之后进入RestActionModule.java,可以看到配置了许多RestAction,
  • 进入TransportModule.java, 可以看到NettyTransport,
  • 进入HttpServerModule.java,可以看到使用NettyHttpServerTransport.

    Node启动

    进入INSTANCE.start(),之后进入node.start(), 可以看到得到很多实例,
  • 对于RestController, 进入之后可以看到在registerHandler函数里对不同的request method绑定了不同的handler
  • 对于TransportServer, 默认绑定到9300端口, 这个用来做集群节点间通信
  • 对于HttpServerTransport,在配置里使用NettyHttpServerTransport, 所以这里实际上是得到NettyHttpServerTransport实例, 默认绑定到9200端口, 这个用来处理http请求

    NettyHttpServerTransport

    进入NettyHttpServerTransport, 在doStart()函数里,看到serverBoostrap是Netty的ServerBootstrap实例,看到serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());, 查看configureServerChannelPipelineFactory, 知道requestHandler是HttpRequestHandler

这样,差不多就完成了Elasticsearch的启动。

联系作者

google “pycharm vim”, 第一条指向https://confluence.jetbrains.com/display/PYH/Configuring+PyCharm+to+work+as+a+Vim+editor这里,但没有找到我想要的,于是自己在PyCharm里找,终于找到了,记下来。

PyCharm->Preference->Plugins->Install JetBrains plugin, 之后搜索vim找到ideavim, 安装后重启,进入PyCharm已经可以和vim一样编辑代码。

但是连行号都没有,于是想到要给ideavim加个配置文件,可是要加到哪里?打开http://blog.csdn.net/u010211892/article/details/43274699看到

1
cp ~/.vimrc ~/.ideavimrc

对啊,vim是.vimrc, ideavim就是.ideavimrc, 没想到啊。

联系作者

在看Django-rest-framework2时,看到Tutorial 6: ViewSets & Routers,执行from rest_framework.decorators import detail_route时,报cannot import name detail_route错误

查看decorators.py源码,发现原因是从2.4.0才有这个方法,而公司用的是2.3.14,所以没有。

在view里添加detail_route的代码

1
2
3
4
5
6
7
8
9
10
def detail_route(methods=['get'], **kwargs):
"""
Used to mark a method on a ViewSet that should be routed for detail requests.
"""

def decorator(func):
func.bind_to_methods = methods
func.detail = True
func.kwargs = kwargs
return func
return decorator

联系作者