以前使用virtualenv来隔离Python环境。最近知道了virtualenvwrapper, 才知道之前使用virtualenv的方法不对,而virtualenv也有一些不便,而virtualenvwrapper就是用来解决这些不便。

  • 统一环境存储位置
  • 方便环境切换

具体查看virtualwapper官方文档。工欲善其事,必先利其器确实很有道理。

想想以前花费在Windowns系统安装和软件安装的时间,太不值了。

联系作者

这里使用Hadoop 2.7.2, 在Mac上安装, 如果按照官方文档一步步来,是可以安装成功的。但如果漏了一些步骤,就会出现问题。

Input path does not exist

最后查找日志,发现原因是
/bin/bash: /bin/java: No such file or directory

解决办法是将JAVA_HOME加入到etc/hadoop/hadoop-env.sh即可

org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible

原因是多次运行bin/hdfs namenode -format, 导致namenode的version和datanode的version不一致。

解决办法是修改datanode的version.
具体参考http://blog.csdn.net/wanghai__/article/details/5752199

联系作者

修改版本号

https://github.com/elastic/elasticsearch/releases里下载到v2.3.0的Elasticsearch, 编译后得到的是2.3.0-SNAPSHOT, 这在pom.xml文件里有体现,于是进行替换

1
find . -name "pom.xml" | xargs sed -i '' 's/2.3.0-SNAPSHOT/2.3.0/g'

这里sed的用法是在Mac电脑上。

但Elasticsearch无法启动,报错说还是2.3.0-SNAPSHOT, 打开Version.java, 将

1
public static final Version V_2_3_0 = new Version(V_2_3_0_ID, true, org.apache.lucene.util.Version.LUCENE_5_5_0);

修改为

1
public static final Version V_2_3_0 = new Version(V_2_3_0_ID, false, org.apache.lucene.util.Version.LUCENE_5_5_0);

编译后可以正常启动.

修改日志格式

目前Elasticsearch的日志输出无法知道是哪个类,哪个包,第几行打印的日志,所以需要修改logging.yml的配置。


1
conversionPattern: "[%d{ISO8601}][%-5p][%-25c]: %m%n"

改为

1
conversionPattern: "[%d{ISO8601}][%-5p][%l]: %m%n"

输出query日志

想知道用户的query日志,但Elasticsearch没有记录,在NettyHttpRequest.java里, String uri = request.getUri();后添加

1
2
3
4
5
try {
logger.info("### query uri {}", URLDecoder.decode(uri, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
logger.info("### query uri {}", uri);
}

联系作者

Elasticsearch源码分析-启动里简单了解Elasticsearch的启动过程,这里来看看查询过程。

接收请求

从启动篇里知道HttpRequestHandler,进入这个类查看,看到messageReceived, 进入NettyHttpRequest, 看到String uri = request.getUri(); 看到这里没有日志输出,一直纳闷为什么Elasticsearch没有请求url输出,于是加上日志

1
2
3
4
5
try {
logger.info("query uri {}", URLDecoder.decode(uri, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
logger.info("query uri {}", uri);
}

之后日志里就有请求的uri了。看到RestUtils.decodeQueryString(uri, pathEndPos + 1, params), 知道请求参数是在这里完成解析。

查看serverTransport.dispatchRequest,进入httpServerAdapter.dispatchRequest(request, channel),这里要知道httpServerAdapter的具体对象,查看

1
2
3
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
this.httpServerAdapter = httpServerAdapter;
}

被哪个函数调用,跳到HttpServer.java, 打开server.internalDispatchRequest(request, channel); 之后到了restController.dispatchRequest(request, channel);

最终请求的处理由restController.dispatchRequest(request, channel);完成

请求处理

进入RestController的dispatchRequest方法, 进入executeHandler方法, 在getHandler(request)里,根据不同的请求方法,返回不同的handler,然后调用handler里的handleRequest方法处理请求,这里以GET方法为例。

对于不同的动作,都可以使用GET方法,如curl -XGET /index/type/id, curl -XGET /index/type/_search, 这里以/index/type/_search这查询为例。

在RestSearchAction.java里,有语句controller.registerHandler(GET, "/{index}/{type}/_search", this);, 所以执行curl -XGET /index/type/_search时,得到的handler就是RestSearchAction, 并执行这个类里的handleRequest方法。

进入RestSearchAction.java里的handleRequest方法,先是执行RestSearchAction.parseSearchRequest(searchRequest, request, parseFieldMatcher, null),这个方法主要对查询参数进行设置,之后调用client.search(searchRequest, new RestStatusToXContentListener(channel))进行查询。

client类型

现在要弄清楚client的具体类型, 在Node初始化里,有modules.add(new NodeClientModule())这句,打开查看,有bind(Client.class).to(NodeClient.class).asEagerSingleton(),所以这里的client具体类型是NodeClient, 而NodeClent继承自AbstractClient,

然后看查询调用过程client.search ->client.execute->client.doExecute->transportAction.execute, 最终还是由transportAction来完成实际的查询

值得注意的一点是client. execute是execute(SearchAction.INSTANCE, request, listener);

transportAction类型

在Node初始化时,有modules.add(new ActionModule(false)),进入ActionModule.java查看,有registerAction(SearchAction.INSTANCE, TransportSearchAction.class);所以transportAction是TransportSearchAction类型。

具体执行

transportAction.execute最终会调用transportAction.doExecute, 这里是进入TransportSearchAction.java的doExecute,这里会对search_type进行判断

对于search_type, 是由RestSearchAction.java里的searchRequest.searchType(searchType)语句设定,默认是SearchType.DEFAULT, 也就是SearchType.QUERY_THEN_FETCH

query阶段

由此新建了一个SearchQueryThenFetchAsyncAction实例,之后searchAsyncAction.start();开始查询。在父类AbstractSearchAsyncAction的start()函数里,

1
2
3
4
5
6
7
8
9
10
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}

对每一个shard调用performFirstPhase,

查看performFirstPhase, 最终会调用sendExecuteFirstPhase,并添加了ActionListener, 如果成功则执行onResponse里的onFirstPhaseResult, 在onFirstPhaseResult里有个判断, if (xTotalOps == expectedTotalOps),当所有shard都执行完后,执行innerMoveToSecondPhase, 最终执行moveToSecondPhase

fetch阶段

在moveToSecondPhase里, sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults)对第一阶段的结果进行合并,之后对每个shard里入选到topN的doc进行fetch,即执行executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node),

在executeFetch里,

1
2
3
if (counter.decrementAndGet() == 0) {
finishHim();
}

当所有需要执行的shard都结束后,执行finishHim(),标志着查询结束。

在finishHim里,

1
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,fetchResults, request);

对fetch阶段Shard返回的结果进行合并.

1
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,successfulOps.get(), buildTookInMillis(), buildShardFailures()))

设置返回的SearchResponse对象.

请求结果返回

在TransportAction调用execute时,有添加Actionlistener,

1
2
3
4
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}

这里的Response就是上面返回的SearchResponse, 而listener可以在RestSearchAction中找到, 是RestStatusToXContentListener(channel).

RestStatusToXContentListener继承RestResponseListener, RestResponseListener继承RestActionListener, 最终onResponse方法会调用RestStatusToXContentListener中的buildResponse, 也就调用了SearchResponse中的toXContent方法。

到此,大致了解Elasticsearch的查询过程。目前,我修改JSON返回格式,就是修改SearchResponse的toXContent方法。

联系作者

前言

刚开始使用Elasticsearch时,我只需要修改Elasticsearch的_search这个查询的返回格式,使之与django-rest-framework的返回结果一致,凭着修改Solr的JSONResponseWriter返回结果的经验,在没有研究Elasticsearch源码的情况下,很快找到了org.elasticsearch.action.search.SearchResponse类,并进行修改,虽然遇到一些问题,但最终还是达到了目的。最近需要修改top hits aggregations的返回结果,于是开始看源码。

准备工作

修改日志

  • 修改config下的logging.yml, 将所有INFO替换为DEBUG,
  • conversionPattern: "[%d{ISO8601}][%-5p][%-25c]: %.10000m%n" 改为conversionPattern: "[%d{ISO8601}][%-5p][%l]: %.10000m%n"以便查看到更多的日志,这里建议生产环境中也这样设置,这个更容易查找错误

    查看程序入口

    查看bin目录下的启动脚本elasticsearch, 知道程序入口是org.elasticsearch.bootstrap.Elasticsearch

    深入代码

  • 进入Bootstrap.java的init方法, Environment environment = initialSettings(foreground);加载环境配置,
  • 进入INSTANCE.setup(true, settings, environment);JarHell.checkJarHell();完成jar hell检查, 跟踪node = nodeBuilder.build();,发现是这里新建Node,并完成初始化

    Node初始化

  • 在Node的构造函数里,nodeEnvironment = new NodeEnvironment(this.settings, this.environment);完成Node环境初始化,
  • final ThreadPool threadPool = new ThreadPool(settings);完成线程池初始化,进入ThreadPool可以看到对于不同任务会建立不同的线程池。
  • Elasticsearch使用Guice作为依赖注入容器,这在 ModulesBuilder modules = new ModulesBuilder();里有所体现,这里主要关注RestModule, TransportModule,HttpServerMoudle的配置。
  • 进入RestModule.java之后进入RestActionModule.java,可以看到配置了许多RestAction,
  • 进入TransportModule.java, 可以看到NettyTransport,
  • 进入HttpServerModule.java,可以看到使用NettyHttpServerTransport.

    Node启动

    进入INSTANCE.start(),之后进入node.start(), 可以看到得到很多实例,
  • 对于RestController, 进入之后可以看到在registerHandler函数里对不同的request method绑定了不同的handler
  • 对于TransportServer, 默认绑定到9300端口, 这个用来做集群节点间通信
  • 对于HttpServerTransport,在配置里使用NettyHttpServerTransport, 所以这里实际上是得到NettyHttpServerTransport实例, 默认绑定到9200端口, 这个用来处理http请求

    NettyHttpServerTransport

    进入NettyHttpServerTransport, 在doStart()函数里,看到serverBoostrap是Netty的ServerBootstrap实例,看到serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());, 查看configureServerChannelPipelineFactory, 知道requestHandler是HttpRequestHandler

这样,差不多就完成了Elasticsearch的启动。

联系作者

google “pycharm vim”, 第一条指向https://confluence.jetbrains.com/display/PYH/Configuring+PyCharm+to+work+as+a+Vim+editor这里,但没有找到我想要的,于是自己在PyCharm里找,终于找到了,记下来。

PyCharm->Preference->Plugins->Install JetBrains plugin, 之后搜索vim找到ideavim, 安装后重启,进入PyCharm已经可以和vim一样编辑代码。

但是连行号都没有,于是想到要给ideavim加个配置文件,可是要加到哪里?打开http://blog.csdn.net/u010211892/article/details/43274699看到

1
cp ~/.vimrc ~/.ideavimrc

对啊,vim是.vimrc, ideavim就是.ideavimrc, 没想到啊。

联系作者

在看Django-rest-framework2时,看到Tutorial 6: ViewSets & Routers,执行from rest_framework.decorators import detail_route时,报cannot import name detail_route错误

查看decorators.py源码,发现原因是从2.4.0才有这个方法,而公司用的是2.3.14,所以没有。

在view里添加detail_route的代码

1
2
3
4
5
6
7
8
9
10
def detail_route(methods=['get'], **kwargs):
"""
Used to mark a method on a ViewSet that should be routed for detail requests.
"""

def decorator(func):
func.bind_to_methods = methods
func.detail = True
func.kwargs = kwargs
return func
return decorator

联系作者

在Solr和Elasticsearch两个中权衡,最后还是选择了Elasticsearch。虽然之前有Solr开发经验,但是当看过Elasticsearch的配置后,还是投奔Elasticsearch,只能说Solr的配置太复杂了。

依然是要从MySQL中导数据,在http://www.jianshu.com/p/05cff717563c中看到一些解决方案。因为对Mysql的binlog并不了解,而搜索elasticsearch-river-jdbc时,只搜到了elasticsearch-jdbc,于是决定使用它。

增量导入数据

决定使用elasticsearch-jdbc后,使用增量导MySQL数据时发现官方文档,写的不好,而且竟然连向数据库提交的查询语句都不输出日志,出现问题时很难找错。

在使用增量导数据时,一直找不到它导入时间的存储位置,于是只好看代码,发现statefile的配置很重要,于是将它加上。但还是发现需要做一次全量导入后,这个增量导入才有效。

于是修改README

1
2
3
4
There is a problem here, the first time you run the script, it can't select any data from table, it have two solutions here:

1. in another script, do full-import, later you can use the incremental script to select incremental data
2. define a statefile.json file before the first time you run the incremental script, set the lastexecutionstart to 0, so that you can select all the data from table.

今天发现,为何不在,开始时间设置为0,这样就可以做全量导入了,于是提交了一个新的patch.

update:

后来又提交了一个patch, 现在只要加上statefile, 第一次导入时,开始时间为0,之后就是增量了,方便了不少。

定时导数据

原计划是在crontab里添加定时执行任务, 所以没看elasticsearch-jdbc提供的schedule功能,但看到issue中有人提到,于是开始解决。最后发现schedule时没有重新加载statefile文件,于是提交了一个patch。这次也把向数据库提交的查询语句打印出来,方便找错。

结束语

无法删除数据确实是一个很严重的缺陷,看来还是要想办法从binlog里读取数据才行,先这样做吧,以后再优化。

联系作者

参考http://elasticsearch.cn/?/question/29

配置synonym.txt

在config目录下analysis,在analysis目录里新建synonym.txt文件,内容如下

1
2
beijing,北京,帝都
上海,魔都

配置elasticsearch.yml

在elasticsearch.yml里添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
index:
analysis:
filter:
my_synonym:
type: synonym
synonyms_path: analysis/synonym.txt
analyzer:
ik_smart_syno:
type: custom
tokenizer: ik_smart
filter: [my_synonym]
ik_max_word_syno:
type: custom
tokenizer: ik_max_word
filter: [my_synonym]

测试

新建索引curl -XPUT 'localhost:9200/test?pretty',之后执行http://localhost:9200/test/_analyze?analyzer=ik_max_word_syno&text=上海外滩

联系作者

本来打算用Solr来搭建搜索服务,而公司的数据放在MySQL数据里,于是在文档里找到DataImportHandler,参考https://wiki.apache.org/solr/DataImportHandler, 这里以导入Wordpress数据为例

在conf目录下新建data-config.xml

data-config.xml的内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dataConfig>
<dataSource type="JdbcDataSource"
driver="com.mysql.jdbc.Driver"
url="jdbc:mysql://localhost/blog"
user="blog"
password="12345678"/>

<document>
<entity name="post" pk="ID"
query="select ID,post_title,post_content from wp_posts where post_status='publish'"
deltaImportQuery="select ID,post_title,post_content from wp_posts where ID='${dih.delta.ID}'"
deltaQuery="select ID from wp_posts where post_status='publish' and post_modified_gmt > '${dih.last_index_time}'">

<field column="ID" name="id"/>
<field column="post_title" name="title"/>
<field column="post_content" name="content"/>
</entity>
</document>
</dataConfig>

配置schema.xml

1
2
3
 <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
<field name="title" type="text_general" indexed="true" stored="true" required="true" multiValued="false" />
<field name="content" type="text_general" indexed="true" stored="true" required="true" multiValued="false" />

修改solrconfig.xml

在solrconfig.xml增加
<lib dir="${solr.install.dir:../../../..}/dist/" regex="solr-dataimporthandler-.*\.jar" />,这样就不会报solr.Dataimport Class not found error.

  • 添加jdbc连接mysql

在server/lib里添加mysql-connector-java-5.1.38.jar,我这里下载到的是5.1.38,其它版本的也可以。

  • 新建core.properties

在blog目录下新建core.properties文件,内容为

1
2
3
4
5
6
7
8
#Written by CorePropertiesLocator
#Wed Mar 23 10:55:00 UTC 2016
numShards=1
collection.configName=blog
#name=blog_shard1_replica1
shard=shard1
collection=blog
coreNodeName=core_node1

  • 启动Solr

bin/solr start -s server/solr/blog启动Solr

执行全量索引

命令为http://127.0.0.1:8983/solr/blog/dataimport?command=full-import

执行增量索引

命令为http://127.0.0.1:8983/solr/blog/dataimport?command=delta-import

遇到的问题

  • nohup: can’t detach from console: Inappropriate ioctl for device

这个问题时在搭建SolrCloud时遇到的,在这里不妨说说。在启动zookeeper时,遇到这个问题,网上说时因为在tmux里启动的缘故,于是新开一个终端,启动zookeeper,这次正常启动。

  • /Users/long/program/java/solr-5.5.0/solr/server/logs/solr.log: No such file or directory

执行命令bin/solr start -s server/solr/blog时出现这个错误,莫名奇妙的,我想依然是不能在tmux里执行shell, 于是新开一个终端再次执行,这次正常启动

联系作者