Wikimedia To Opensearch

2024-05-14 01:28
Tags: opensearch wikimedia


Overview

  • Wikimedia ⇒ Kafka ⇒ OpenSearch
  • Java libraries: OkHttp3 and OkHttp EventSource;
  • Producer (Wikimedia side): WikimediaChangeHandler and WikimediaChangeProducer;
  • Consumer (OpenSearch side): OpenSearchConsumer, built on opensearch-java + httpclient5;
  • https://stream.wikimedia.org/v2/stream/recentchange
  • https://esjewett.github.io/wm-eventsource-demo
  • https://codepen.io/Krinkle/pen/BwEKgW?editors=1010
  • REST API calls can be run from OpenSearch Dashboards; Bonsai.io is a hosted alternative;
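
The recentchange endpoint listed above is a Server-Sent Events (SSE) stream, and OkHttp EventSource parses it before calling the handler's onMessage. The following is a minimal sketch of that parsing step using only the JDK. The class SseSketch and its SseEvent type are hypothetical names for illustration and are not part of okhttp-eventsource; the sketch also simplifies the SSE rules (it trims field values and ignores the id and retry fields).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified Server-Sent Events parser: a stand-in for what an EventSource
// client does before it invokes onMessage(event, messageEvent).
class SseSketch {

    // Hypothetical value type for this sketch (not a library class).
    static final class SseEvent {
        final String name;
        final String data;

        SseEvent(String name, String data) {
            this.name = name;
            this.data = data;
        }
    }

    static List<SseEvent> parse(String stream) {
        List<SseEvent> events = new ArrayList<>();
        String name = "message";                 // default event name in SSE
        StringBuilder data = new StringBuilder();
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {                // a blank line dispatches the pending event
                if (data.length() > 0) {
                    events.add(new SseEvent(name, data.toString()));
                }
                name = "message";
                data.setLength(0);
            } else if (line.startsWith(":")) {
                // comment line (often a keep-alive); ignored in this sketch
            } else if (line.startsWith("event:")) {
                name = line.substring("event:".length()).trim();
            } else if (line.startsWith("data:")) {
                if (data.length() > 0) {
                    data.append('\n');           // consecutive data lines are joined with \n
                }
                data.append(line.substring("data:".length()).trim());
            }
            // other fields such as "id:" and "retry:" are skipped here
        }
        return events;
    }

    public static void main(String[] args) {
        String sample = ": keep-alive\n"
                + "event: message\n"
                + "id: 1\n"
                + "data: {\"type\":\"edit\",\"wiki\":\"enwiki\"}\n"
                + "\n";
        for (SseEvent e : parse(sample)) {
            System.out.println(e.name + " -> " + e.data);
        }
    }
}
```

In the pipeline described here, the data payload of each event is the JSON string that WikimediaChangeHandler forwards to Kafka unchanged.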

Kafka Environment

version: '3.8'
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true

  • vi /opt/kafka/config/kraft/server.properties
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092
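
Clients outside Docker connect through the PLAINTEXT_HOST listener advertised at 192.168.0.123:9092, so it can help to confirm that this address accepts TCP connections before starting the producer. The following is a minimal sketch using only the JDK; ListenerCheck is a hypothetical helper name. It only shows that the port is open, not that the broker's advertised metadata is correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
class ListenerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are taken from the PLAINTEXT_HOST advertised listener above.
        System.out.println("broker reachable: " + reachable("192.168.0.123", 9092, 2000));
    }
}
```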

Opensearch

Open Search Prerequisite

# disable memory paging and swapping to improve performance
sudo swapoff -a
# edit the sysctl config
sudo vi /etc/sysctl.conf
# add this line, or change the existing value
vm.max_map_count=262144
# reload kernel parameters using sysctl
sudo sysctl -p
# verify the change
cat /proc/sys/vm/max_map_count

Open Search Compose

docker pull opensearchproject/opensearch:1.3.16 && \
docker pull opensearchproject/opensearch-dashboards:1.3.16

version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:1.3.16
    container_name: opensearch
    environment:
      discovery.type: single-node
      plugins.security.disabled: "true"
      compatibility.override_main_response_version: "true"
    ports:
      - "9200:9200"
      - "9600:9600"
  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:1.3.16
    container_name: opensearch-dashboard
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"

  • http://192.168.0.123:5601
  • http://192.168.0.123:9200

Producer

Producer Dependency

<properties>
    <okhttp.eventsource>2.7.1</okhttp.eventsource>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>
<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>${okhttp.eventsource}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

WikimediaChangeHandler

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import java.lang.invoke.MethodHandles;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    String topic;
    KafkaProducer<String, String> kafkaProducer;

    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {
        // nothing to do when the stream opens
    }

    @Override
    public void onClosed() {
        // the stream is closed, so release the producer
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // logged at ERROR level, which is why the sample output in the Testing section shows ERROR
        logger.error(messageEvent.getData());
        // forward the raw JSON payload to Kafka without a key
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {
        // SSE comments are ignored
    }

    @Override
    public void onError(Throwable t) {
        logger.error("Stream Reading Failure!", t);
    }
}

WikimediaChangeProducer

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.EventHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangeProducer {

    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "192.168.0.123:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "wikimedia.recentchange";

        EventHandler eventHandler = new WikimediaChangeHandler(kafkaProducer, topic);
        String url = "https://stream.wikimedia.org/v2/stream/recentchange";
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
        builder.connectTimeout(Duration.ofMinutes(10));
        // Note: a proxy is needed where stream.wikimedia.org is not directly reachable
        builder.proxy("127.0.0.1", 1080);
        EventSource eventSource = builder.build();

        // start the producer in another thread
        eventSource.start();

        // we produce for 10 minutes and block the program until then
        TimeUnit.MINUTES.sleep(10);
    }
}

Consumer

Consumer Dependency

<properties>
    <opensearch.java>2.10.1</opensearch.java>
</properties>

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>${opensearch.java}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

OpenSearchConsumer

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenSearchConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    public static OpenSearchClient connect(String scheme, String hostName, int port) {
        final HttpHost host = new HttpHost(scheme, hostName, port);
        final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host);
        builder.setHttpClientConfigCallback(hcb -> {
            final PoolingAsyncClientConnectionManager manager =
                    PoolingAsyncClientConnectionManagerBuilder.create().build();
            return hcb.setConnectionManager(manager);
        });
        final OpenSearchTransport transport = builder.build();
        return new OpenSearchClient(transport);
    }

    public static OpenSearchClient connect() {
        return connect("http", "192.168.0.123", 9200);
    }

    public static boolean exist(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var existRequest = ExistsRequest.of(fn -> fn.index(indexName));
        BooleanResponse exist = client.indices().exists(existRequest);
        return exist.value();
    }

    public static void createIndex(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var exist = exist(client, indexName);
        if (exist) {
            System.out.printf("index %s already exists!\n", indexName);
        } else {
            var createRequest = new CreateIndexRequest.Builder().index(indexName).build();
            client.indices().create(createRequest);
        }
    }

    // GET /_cat/indices?v
    public static void deleteIndex(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var exist = exist(client, indexName);
        if (!exist) {
            System.out.printf("index %s does not exist!\n", indexName);
        } else {
            var deleteRequest = new DeleteIndexRequest.Builder().index(indexName).build();
            client.indices().delete(deleteRequest);
        }
    }

    public static KafkaConsumer<String, String> createKafkaConsumer() {
        var bootstrapServer = "192.168.0.123:9092";
        var groupId = "group-wikimedia-opensearch";
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // earliest, latest etc.
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new KafkaConsumer<>(prop);
    }

    public static void consume(OpenSearchClient client, String indexName)
            throws OpenSearchException, IOException {
        var consumer = createKafkaConsumer();
        var topic = "wikimedia.recentchange";
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            var consumerRecord = consumer.poll(Duration.ofMillis(3000));
            int recordCount = consumerRecord.count();
            // SLF4J uses {} placeholders, not printf-style %d
            logger.info("received {} record(s)", recordCount);
            for (ConsumerRecord<String, String> cr : consumerRecord) {
                // send record into OpenSearch
                IndexData indexData = new IndexData(cr.value());
                var indexRequest = new IndexRequest.Builder<IndexData>()
                        .index(indexName)
                        .document(indexData)
                        .build();
                IndexResponse response = client.index(indexRequest);
                System.out.println(response.id());
            }
        }
    }

    public static void main(String[] sa) throws OpenSearchException, IOException {
        var client = connect();
        var indexName = "wikimedia-opensearch";
        //createIndex(client, indexName);
        System.out.println("consuming start...");
        consume(client, indexName);
        System.out.println("consuming end...");
    }

    // wrapper document: the raw event JSON is stored as a single string field
    static class IndexData {

        private String wikiMediaValue;

        public IndexData(String wikiMediaValue) {
            this.wikiMediaValue = wikiMediaValue;
        }

        @Override
        public String toString() {
            return String.format("IndexData{wikiMediaValue='%s'}", wikiMediaValue);
        }

        public String getWikiMediaValue() {
            return wikiMediaValue;
        }

        public void setWikiMediaValue(String wikiMediaValue) {
            this.wikiMediaValue = wikiMediaValue;
        }
    }
}
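
The consumer above lets OpenSearch generate a document id for every record, so a record that is consumed twice would be indexed twice. One possible refinement, which is an assumption and not part of the original code, is to reuse the event's own meta.id as the document id. The sketch below shows only the extraction step with JDK classes; EventIdSketch and extractMetaId are hypothetical names, and the regular expression is a stand-in for a real JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls meta.id out of a recentchange event.
class EventIdSketch {

    // Matches the string-valued "id" key inside the "meta" object.
    // Assumes no '}' occurs inside "meta" before that key, as in the sample events.
    private static final Pattern META_ID =
            Pattern.compile("\"meta\"\\s*:\\s*\\{[^}]*?\"id\"\\s*:\\s*\"([^\"]+)\"");

    static Optional<String> extractMetaId(String json) {
        Matcher m = META_ID.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened event in the same shape as the sample output in the Testing section.
        String event = "{\"meta\":{\"uri\":\"https://en.wikipedia.org/wiki/X\","
                + "\"request_id\":\"4c136271\",\"id\":\"14fcefb7-1f73-40a9-9acc-539d97aa06c3\"},"
                + "\"id\":1769512861,\"type\":\"edit\"}";
        System.out.println(extractMetaId(event).orElse("<no id>"));
    }
}
```

The extracted value could then be passed to the id(...) setter of IndexRequest.Builder, so that re-indexing the same event overwrites the existing document instead of creating a new one.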

Testing

Create Topic

  • Create the topic, then start WikimediaChangeProducer;
  • Note: a proxy is required if stream.wikimedia.org is not directly reachable from your network;
./kafka-topics.sh \
  --bootstrap-server 192.168.0.123:9092 \
  --topic wikimedia.recentchange --create \
  --partitions 3 --replication-factor 1
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"93bf8bf8-4c36-4a49-b226-3a9311d2c906","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257037},"id":484017699,"type":"log","namespace":2,"title":"Участник:Sherbek Qarshiyev","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062090,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Sherbek Qarshiyev (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q46898283","request_id":"161daa3c-9f11-44f1-b042-209a3acabcf8","id":"65fb4ae3-86fd-47ab-a85f-f0dd26b04b66","dt":"2024-04-25T12:52:32Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257038},"id":2201214347,"type":"edit","namespace":0,"title":"Q46898283","title_url":"https://www.wikidata.org/wiki/Q46898283","comment":"/* wbsetclaimvalue:1| */ [[Property:P1476]]: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults","timestamp":1714049552,"user":"KrBot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2136975570&oldid=2136975561&rcid=2201214347","minor":false,"patrolled":true,"length":{"old":60872,"new":60869},"revision":{"old":2136975561,"new":2136975570},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Определено значение для утверждения: </span></span> <a href=\"/wiki/Property:P1476\" title=\"название | название произведения (книги, фильма, газетной статьи, произведения исполнительского искусства, веб-сайта)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"ru\" dir=\"ltr\">название</span> <span class=\"wb-itemlink-id\">(P1476)</span></span></a>: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","request_id":"4c136271-a6cd-4ff6-a6b7-429d769ba5ba","id":"14fcefb7-1f73-40a9-9acc-539d97aa06c3","dt":"2024-04-25T12:52:32Z","domain":"en.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257039},"id":1769512861,"type":"edit","namespace":2,"title":"User:Ali Ahwazi/sandbox2","title_url":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","comment":"","timestamp":1714049552,"user":"Ali Ahwazi","bot":false,"notify_url":"https://en.wikipedia.org/w/index.php?diff=1220709553&oldid=1220709505","minor":false,"length":{"old":26671,"new":31548},"revision":{"old":1220709505,"new":1220709553},"server_url":"https://en.wikipedia.org","server_name":"en.wikipedia.org","server_script_path":"/w","wiki":"enwiki","parsedcomment":""}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","request_id":"be816a4f-be27-4c49-830a-31161665401f","id":"ec47eca3-b6a4-4b4f-ac71-b44369de940d","dt":"2024-04-25T12:52:33Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257040},"id":519472837,"type":"edit","namespace":100,"title":"Portail:Châteaux/Articles récents","title_url":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","comment":"+ [[Château de Mielmont]]","timestamp":1714049553,"user":"OrlodrimBot","bot":true,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561930&oldid=214559720&rcid=519472837","minor":false,"patrolled":true,"length":{"old":779,"new":779},"revision":{"old":214559720,"new":214561930},"server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"+ <a href=\"/wiki/Ch%C3%A2teau_de_Mielmont\" title=\"Château de Mielmont\">Château de Mielmont</a>"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"b8ab3286-f69f-466a-9a00-c5c12c176001","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257041},"id":484017700,"type":"log","namespace":2,"title":"Участник:Виктория Никитенко","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062091,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Виктория Никитенко (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.937 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","request_id":"739f7d34-fd23-4b37-ab77-87c95663aeda","id":"34ab12f5-7256-48e5-a4f8-b40bf95316ad","dt":"2024-04-25T12:52:33Z","domain":"oc.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257042},"id":10815379,"type":"new","namespace":0,"title":"Pairac lo Chasteu","title_url":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","comment":"Redireccion cap a [[Pairac (lo Chasteu)]]","timestamp":1714049553,"user":"PairacLoChasteu","bot":false,"notify_url":"https://oc.wikipedia.org/w/index.php?oldid=2436522&rcid=10815379","minor":false,"patrolled":false,"length":{"new":32},"revision":{"new":2436522},"server_url":"https://oc.wikipedia.org","server_name":"oc.wikipedia.org","server_script_path":"/w","wiki":"ocwiki","parsedcomment":"Redireccion cap a <a href=\"/wiki/Pairac_(lo_Chasteu)\" title=\"Pairac (lo Chasteu)\">Pairac (lo Chasteu)</a>"}
20:52:31.938 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","request_id":"7d6530d5-10a3-4628-a101-bc2e75b9a92f","id":"2eb4865e-b4ee-407c-9e19-b871967ed9e1","dt":"2024-04-25T12:52:30Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257043},"id":519472838,"type":"categorize","namespace":14,"title":"Catégorie:Article à référence nécessaire","title_url":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","comment":"[[:20e armée (Union soviétique)]] ajoutée à la catégorie","timestamp":1714049550,"user":"Le Petit Chat","bot":false,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561929&oldid=209346180&rcid=519472838","server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"<a href=\"/wiki/20e_arm%C3%A9e_(Union_sovi%C3%A9tique)\" title=\"20e armée (Union soviétique)\">20e armée (Union soviétique)</a> ajoutée à la catégorie"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","request_id":"3d4237d1-1994-4c65-b41a-84ee1f1a05c6","id":"d9e3f133-2e22-4022-9449-65a78ed83452","dt":"2024-04-25T12:52:31Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257044},"id":2478318778,"type":"categorize","namespace":14,"title":"Category:Milford, Derbyshire","title_url":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","comment":"[[:File:The King William pub - geograph.org.uk - 5560373.jpg]] added to category","timestamp":1714049551,"user":"WereSpielChequers","bot":false,"notify_url":"https://commons.wikimedia.org/w/index.php?diff=871189763&oldid=871189716&rcid=2478318778","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"<a href=\"/wiki/File:The_King_William_pub_-_geograph.org.uk_-_5560373.jpg\" title=\"File:The King William pub - geograph.org.uk - 5560373.jpg\">File:The King William pub - geograph.org.uk - 5560373.jpg</a> added to category"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","request_id":"c408c6e9-7e92-4f96-8a99-1d518f3af5ed","id":"e94ece30-3416-41e9-860d-d3547f2248d6","dt":"2024-04-25T12:52:33Z","domain":"ko.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257045},"type":"log","namespace":0,"title":"조상환","title_url":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","comment":"","timestamp":1714049553,"user":"Cho Sang Hwan","bot":false,"log_id":0,"log_type":"abusefilter","log_action":"hit","log_params":{"action":"edit","filter":"71","actions":"tag","log":1521905},"log_action_comment":"Cho Sang Hwan님이 [[조상환]]에서 \"edit\" 동작을 하여 [[특수:편집필터/71|필터 71]]이(가) 작동하였습니다. 조치: 태그 ([[특수:편집필터기록/1521905|자세한 사항]])","server_url":"https://ko.wikipedia.org","server_name":"ko.wikipedia.org","server_script_path":"/w","wiki":"kowiki","parsedcomment":""}
20:52:31.940 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","request_id":"d2dfebf5-df74-43d5-860e-be964ee93420","id":"27e197b2-cfb1-4c65-9717-09bb25438f08","dt":"2024-04-25T12:52:33Z","domain":"os.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257046},"id":1911685,"type":"new","namespace":14,"title":"Категори:Хуссар Голландийы чи амардис, уыдон","title_url":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","comment":"Ног фарс, йæ код райдайы афтæ: «[[Категори:Нидерландты чи амардис, уыдон]] [[Категори:Хуссар Голландийы зындгонд адæм|Амард]]»","timestamp":1714049553,"user":"Taamu","bot":false,"notify_url":"https://os.wikipedia.org/w/index.php?oldid=558822&rcid=1911685","minor":false,"patrolled":true,"length":{"new":167},"revision":{"new":558822},"server_url":"https://os.wikipedia.org","server_name":"os.wikipedia.org","server_script_path":"/w","wiki":"oswiki","parsedcomment":"Ног фарс, йæ код райдайы афтæ: «<a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%9D%D0%B8%D0%B4%D0%B5%D1%80%D0%BB%D0%B0%D0%BD%D0%B4%D1%82%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD\" title=\"Категори:Нидерландты чи амардис, уыдон\">Категори:Нидерландты чи амардис, уыдон</a> <a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D0%B7%D1%8B%D0%BD%D0%B4%D0%B3%D0%BE%D0%BD%D0%B4_%D0%B0%D0%B4%C3%A6%D0%BC\" title=\"Категори:Хуссар Голландийы зындгонд адæм\">Амард»</a>"}
……

Consume Message

  • Start OpenSearchConsumer
# this step is optional
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
  --topic wikimedia.recentchange --from-beginning

GET /_cat/indices?v

GET _search
{
  "query": {
    "match_all": {}
  }
}

GET /index_name/_search
{
  "query": {
    "match_all": {}
  }
}
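
The same match_all query can also be sent from Java without the Dashboards console. The following is a minimal sketch using the JDK's java.net.http client; SearchRequestSketch and matchAll are hypothetical names, the index name is the one used by OpenSearchConsumer, and POST is used because the _search endpoint accepts a request body with POST as well as GET.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: builds and sends a match_all search over plain HTTP.
class SearchRequestSketch {

    // Builds a POST <baseUrl>/<indexName>/_search request with a match_all query.
    static HttpRequest matchAll(String baseUrl, String indexName) {
        String body = "{\"query\":{\"match_all\":{}}}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + indexName + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes the single-node OpenSearch from the compose file, with security disabled.
        HttpRequest request = matchAll("http://192.168.0.123:9200", "wikimedia-opensearch");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```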


http://www.chinasem.cn/article/987367
