本文主要是介绍Elasticsearch分词插件配置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
- 1、 分词插件配置
- 1.1、[IK分词器地址](https://github.com/infinilabs/analysis-ik/tree/v7.11.1?tab=readme-ov-file)
- 1.2、分词器配置
- 2、分词插件词库配置
- 2.1、使用词库文件
- 2.2、使用远程扩展词(官方推荐)
- 2.3、自定义一个监控线程,从数据中加载
- 2.3.1、新建监控线程:
- 2.3.2、读取数据
- 2.3.3、新建jdbc.properties文件
- 2.3.4、新建表
- 2.4 几种方式对比
前言: Elasticsearch是一个分布式、高扩展、高实时的搜索与数据分析引擎。通常情况下,会使用ES来存储商品、订单、商品评论等数据。
本篇文章主要讲一下es分词插件的配置,以及词库的更新。
1、 分词插件配置
ES本身是支持分词的,但是默认的分词针对中文不太友好,所以通常情况下,我们需要使用中文分词器,常见的分词器可以参考这篇文章:Elasticsearch 中文分词器
本篇文章介绍的是:IK 中文分词器
1.1、IK分词器地址
1.2、分词器配置
解压下载的压缩包,将文件夹名称改为ik,然后将整个文件夹放到plugins目录下,重启es就可以了
2、分词插件词库配置
至于为什么要自定义词库,可以参考下这篇文章 Elasticsearch自定义分词
自定义分词的常见几种配置
2.1、使用词库文件
2.2、使用远程扩展词(官方推荐)
这种方式远离是通过开启一个定时任务,60秒执行一次,从远程仓库查询,根据请求头中返回的参数判断是否更新词库
部分源码截图 Dictionary类
public static synchronized void initial(Configuration cfg) {if (singleton == null) {synchronized (Dictionary.class) {if (singleton == null) {singleton = new Dictionary(cfg);singleton.loadMainDict();singleton.loadSurnameDict();singleton.loadQuantifierDict();singleton.loadSuffixDict();singleton.loadPrepDict();singleton.loadStopWordDict();if(cfg.isEnableRemoteDict()){// 建立监控线程for (String location : singleton.getRemoteExtDictionarys()) {// 10 秒是初始延迟可以修改的 60是间隔时间 单位秒pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);}for (String location : singleton.getRemoteExtStopWordDictionarys()) {pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);}}}
/*** 监控流程:* ①向词库服务器发送Head请求* ②从响应中获取Last-Modify、ETags字段值,判断是否变化* ③如果未变化,休眠1min,返回第①步* ④如果有变化,重新加载词典* ⑤休眠1min,返回第①步*/public void runUnprivileged() {//超时设置RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000).setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();HttpHead head = new HttpHead(location);head.setConfig(rc);//设置请求头if (last_modified != null) {head.setHeader("If-Modified-Since", last_modified);}if (eTags != null) {head.setHeader("If-None-Match", eTags);}CloseableHttpResponse response = null;try {response = httpclient.execute(head);//返回200 才做操作if(response.getStatusLine().getStatusCode()==200){if (((response.getLastHeader("Last-Modified")!=null) && !response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified))||((response.getLastHeader("ETag")!=null) && !response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags))) {// 远程词库有更新,需要重新加载词典,并修改last_modified,eTagsDictionary.getSingleton().reLoadMainDict();last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();}}else if (response.getStatusLine().getStatusCode()==304) {//没有修改,不做操作//noop}else{logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );}} catch (Exception e) {logger.error("remote_ext_dict {} error!",e , location);}finally{try {if (response != null) {response.close();}} catch (IOException e) {logger.error(e.getMessage(), e);}}}}
2.3、自定义一个监控线程,从数据中加载
部分代码:
2.3.1、新建监控线程:
/*** 词典初始化 由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化* 只有当Dictionary类被实际调用时,才会开始载入词典, 这将延长首次分词操作的时间 该方法提供了一个在应用加载阶段就初始化字典的手段* * @return Dictionary*/public static synchronized void initial(Configuration cfg) {if (singleton == null) {synchronized (Dictionary.class) {if (singleton == null) {singleton = new Dictionary(cfg);singleton.loadMainDict();singleton.loadSurnameDict();singleton.loadQuantifierDict();singleton.loadSuffixDict();singleton.loadPrepDict();singleton.loadStopWordDict();if(cfg.isEnableRemoteDict()){// 建立监控线程for (String location : singleton.getRemoteExtDictionarys()) {// 10 秒是初始延迟可以修改的 60是间隔时间 单位秒pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);}for (String location : singleton.getRemoteExtStopWordDictionarys()) {pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);}}// 建立数据库监控线程pool.scheduleAtFixedRate(new DatabaseMonitor(), 10, Long.parseLong(getSingleton().getProperty(DatabaseMonitor.JDBC_UPDATE_INTERVAL)), TimeUnit.SECONDS);}}}}/*** 加载新词条*/public static void addWord(String word) {singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());}/*** 移除(屏蔽)词条*/public static void disableWord(String word) {singleton._MainDict.disableSegment(word.trim().toLowerCase().toCharArray());}/*** 加载 jdbc.properties*/public void loadJdbcProperties() {Path file = PathUtils.get(getDictRoot(), DatabaseMonitor.PATH_JDBC_PROPERTIES);try {props.load(Files.newInputStream(file.toFile().toPath()));logger.info("====================================jdbc配置文件内容 start====================================");for (Map.Entry<Object, Object> entry : props.entrySet()) {logger.info("key:{} value:{}", entry.getKey(), entry.getValue());}logger.info("====================================jdbc配置文件内容 end====================================");} catch (IOException e) {logger.error("加载jdbc属性配置文件失败 文件名称:{} exMsg:{}",DatabaseMonitor.PATH_JDBC_PROPERTIES,e.getMessage(),e);}
2.3.2、读取数据
public class DatabaseMonitor implements Runnable {private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName());public static final String PATH_JDBC_PROPERTIES = "jdbc.properties";private static final String JDBC_URL = "jdbc.url";private static final String JDBC_USERNAME = "jdbc.username";private static final String JDBC_PASSWORD = "jdbc.password";private static final String JDBC_DRIVER = "jdbc.driver";private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql";/*** 更新间隔*/public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN));private static Timestamp lastUpdateTimeOfMainDic = null;public String getUrl() {return Dictionary.getSingleton().getProperty(JDBC_URL);}public String getUsername() {return Dictionary.getSingleton().getProperty(JDBC_USERNAME);}public String getPassword() {return Dictionary.getSingleton().getProperty(JDBC_PASSWORD);}public String getDriver() {return Dictionary.getSingleton().getProperty(JDBC_DRIVER);}public String getUpdateMainDicSql() {return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC);}/*** 加载MySQL驱动*/public DatabaseMonitor() {SpecialPermission.check();AccessController.doPrivileged((PrivilegedAction<Void>) () -> {try {Class.forName(getDriver());} catch (ClassNotFoundException e) {logger.error("mysql jdbc driver not found", e);}return null;});}@Overridepublic void run() {SpecialPermission.check();AccessController.doPrivileged((PrivilegedAction<Void>) () -> {// 更新主词典performMainDicUpdate();return null;});}public Connection getConnection() {Connection connection = null;try {connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword());} catch (SQLException e) {logger.error("failed to get connection", e);}return connection;}public void closeConnection(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {logger.error("failed to close Connection", e);}}}public void closeRsAndPs(ResultSet rs, PreparedStatement ps) {if (rs != null) {try {rs.close();} catch (SQLException e) {logger.error("failed to close ResultSet", e);}}if (ps != null) {try {ps.close();} catch (SQLException e) {logger.error("failed to close PreparedStatement", e);}}}/*** 执行主词典更新操作,根据返回结果决定是否继续查询*/public synchronized void performMainDicUpdate() {Connection conn = getConnection();boolean continueQuery = true;while (continueQuery) {continueQuery = updateMainDic(conn);if (!continueQuery) {logger.info("数据处理完毕!");}}closeConnection(conn);}/*** 更新主词典数据* @param conn 数据库连接* @return 是否有新数据返回*/public boolean updateMainDic(Connection conn) {boolean newDataReturned = false;logger.info("开始更新主词汇库");int numberOfAddWords = 0;int numberOfDisableWords = 0;PreparedStatement ps = null;ResultSet rs = null;try {String sql = getUpdateMainDicSql();Timestamp param = lastUpdateTimeOfMainDic == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic;logger.info("当前更新时间戳 param:{} ",param);ps = conn.prepareStatement(sql);ps.setTimestamp(1, param);rs = ps.executeQuery();while (rs.next()) {newDataReturned = true;String word = rs.getString("word");word = word.trim();if (word.isEmpty()) {continue;}lastUpdateTimeOfMainDic = rs.getTimestamp("updateTime");if (rs.getBoolean("isDeleted")) {// 删除Dictionary.disableWord(word);numberOfDisableWords++;} else {// 添加Dictionary.addWord(word);numberOfAddWords++;}}logger.info("更新主词汇完成 -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);} catch (SQLException e) {newDataReturned = false;logger.error("更新主词汇失败 exMsg:{}",e.getMessage(),e);// 如果捕获到异常,将 lastUpdateTimeOfMainDic 的值减少一些if (lastUpdateTimeOfMainDic != null) {Calendar cal = Calendar.getInstance();cal.setTime(lastUpdateTimeOfMainDic);cal.add(Calendar.MINUTE, -1); // 将时间减少1分钟,可以根据实际需求调整lastUpdateTimeOfMainDic = new Timestamp(cal.getTime().getTime());}}finally {// 关闭 ResultSet、PreparedStatementcloseRsAndPs(rs, ps);}return newDataReturned;}
}
2.3.3、新建jdbc.properties文件
jdbc.url=
jdbc.username=
jdbc.password=
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.update.main.dic.sql=SELECT * FROM `cs_es_extra_main` WHERE updateTime > ? order by updateTime asc limit 1000
### 定时任务间隔时间
jdbc.update.interval=60
2.3.4、新建表
CREATE TABLE `cs_es_extra_main` (`id` bigint(20) NOT NULL COMMENT '主键',`word` varchar(255) NOT NULL COMMENT '词',`isDeleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已删除',`updateTime` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '是否已删除 0:未删除 1:已删除',PRIMARY KEY (`id`),UNIQUE KEY `cs_es_extra_main_word_IDX` (`word`) USING BTREE,KEY `es_extra_main_updateTime_IDX` (`updateTime`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='es分词器主分词';
2.4 几种方式对比
-
使用词库文件
优点:这种配置方式比较简单
缺点:但是更新复杂,每次更新需要重启ES(在生产情况下,这个不太现实) -
使用远程词库
优点:更新比较方便,也不用重启ES
缺点:更新有延时,而且数据量比较大的时候,会有一定影响 -
自定义词库更新
优点:更新方便,更新的数据量可配,而且是增量更新。
缺点:需要对ik分词插件进行自定义修改
上面就是关于Elasticsearch分词插件配置,代码仅供参考学习,欢迎留言讨论。后续会将完整的项目代码更新到远程仓库,欢迎关注我的公众号,后续会在公众号提供获取地址。
这篇关于Elasticsearch分词插件配置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!