Flink Sql Redis Connector

2024-06-20 21:12


Developers who work with Flink regularly know that connecting Flink to Redis is cumbersome, and parsing Redis data is even more so. It would be far more convenient if Redis could be connected with Flink SQL like an ordinary database, with its data displayed in table form.

After many days of work, I have finally finished the Flink SQL Redis connector and verified that it can parse Redis data with SQL. The code and execution results are shown below; the complete code is available on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

I would appreciate forks and stars. I will keep updating this connector, and you are welcome to try it out. If you run into any problems, you can report them to me or in the community, and feel free to contact me with any good ideas.

1. Usage examples and notes

1. Example: reading data

CREATE TABLE orders (
    `order_id` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hgetall',
    'key' = 'orders'
);

select * from orders;

Note: a Redis table must define a primary key, either a single primary key or a composite primary key.

The SQL read result parses the Redis data directly into the tabular form we need.

2. Example: writing data

-- 1. generate source data
CREATE TABLE order_source (
    `order_number` BIGINT,
    `price` DECIMAL(32,2),
    `order_time` TIMESTAMP(3),
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '5',
    'fields.order_number.min' = '1',
    'fields.order_number.max' = '20',
    'fields.price.min' = '1001',
    'fields.price.max' = '1100'
);

-- 2. define redis sink table
CREATE TABLE redis_sink (
    `order_number` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hmset',
    'key' = 'orders'
);

-- 3. insert data into the redis sink table (cast every field to STRING)
insert into redis_sink
select
    cast(order_number as STRING) order_number,
    cast(price as STRING) price,
    cast(order_time as STRING) order_time
from order_source;

A Redis table does not store data types, so every field must be cast to STRING before it is written to Redis. Below is the result of writing to Redis. The Redis key is built by concatenating key + primary-key column name + primary-key value, which guarantees that every row is unique; this is also why a Redis table must define a primary key.
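The key concatenation described above can be sketched in isolation. This is a minimal sketch, not the connector's actual code: `buildRedisKey` is a hypothetical helper, whereas the real sink inlines the same logic with a `StringBuffer` inside its `invoke` method.

```java
import java.util.Arrays;
import java.util.List;

public class RedisKeySketch {

    // Concatenates key + primary-key column name + primary-key value,
    // separated by ":", e.g. "orders:order_number:10".
    static String buildRedisKey(String key, List<String> pkNames, List<String> pkValues) {
        StringBuilder sb = new StringBuilder(key);
        for (int i = 0; i < pkNames.size(); i++) {
            sb.append(':').append(pkNames.get(i)).append(':').append(pkValues.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // one row of the orders table whose primary key order_number is 10
        System.out.println(buildRedisKey("orders",
                Arrays.asList("order_number"), Arrays.asList("10")));
    }
}
```

Because the primary-key values are embedded in the key itself, two rows with different primary keys can never overwrite each other's hash.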

3. Currently supported features

1. The connector currently supports multiple read and write commands:

        Read:   get    hget    hgetall    hscan    lrange    smembers    zrange

        Write:  set    hset    hmset    lpush    rpush    sadd

2. For hash data, the most common type, fuzzy matching is supported: supplying only the table name queries the entire table.
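The fuzzy match works by appending `*` to the configured key and asking Redis for every matching key. The following sketch reproduces that matching logic against an in-memory list instead of a live Redis server, so it can be run without one; `keysMatching` is a hypothetical stand-in for the `jedis.keys(fuzzyKey)` call in the source function.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FuzzyKeySketch {

    // Simulates KEYS "orders*": every stored key starting with the
    // table name matches, so the whole table can be read back.
    static List<String> keysMatching(List<String> allKeys, String tableKey) {
        String pattern = tableKey + "*";                       // what the source sends to KEYS
        String prefix = pattern.substring(0, pattern.length() - 1);
        return allKeys.stream()
                .filter(k -> k.startsWith(prefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> stored = Arrays.asList(
                "orders:order_number:10",
                "orders:order_number:11",
                "users:id:1");
        System.out.println(keysMatching(stored, "orders"));
    }
}
```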

4. Connector options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | none | String | connector name |
| mode | required | none | String | redis cluster mode (single or cluster) |
| single.host | optional | none | String | redis single mode machine host |
| single.port | optional | none | Int | redis single mode running port |
| password | optional | none | String | redis database password |
| command | required | none | String | redis write data or read data command |
| key | required | none | String | redis key |
| expire | optional | none | Int | set key ttl |
| field | optional | none | String | get a value with field when using hget command |
| cursor | optional | none | Int | cursor when using hscan command (e.g. 1, 2) |
| start | optional | 0 | Int | start index when reading data with lrange command |
| end | optional | 10 | Int | end index when reading data with lrange command |
| connection.max.wait-mills | optional | none | Int | redis connection parameter |
| connection.timeout-ms | optional | none | Int | redis connection parameter |
| connection.max-total | optional | none | Int | redis connection parameter |
| connection.max-idle | optional | none | Int | redis connection parameter |
| connection.test-on-borrow | optional | none | Boolean | redis connection parameter |
| connection.test-on-return | optional | none | Boolean | redis connection parameter |
| connection.test-while-idle | optional | none | Boolean | redis connection parameter |
| so.timeout-ms | optional | none | Int | redis connection parameter |
| max.attempts | optional | none | Int | redis connection parameter |

2. Factory class for dynamic reading and writing

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    private ReadableConfig options;

    public RedisSourceSinkFactory() {}

    public RedisSourceSinkFactory(ReadableConfig options) {
        this.options = options;
    }

    // DynamicTableSourceFactory method: must be implemented to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options, columnNames, primaryKey);
    }

    // DynamicTableSinkFactory method: must be implemented to write data to redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options, columnNames, primaryKey);
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // required options for the sql connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }

    // optional options for the sql connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
        return options;
    }
}

3. Redis source (read) classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        return SourceFunctionProvider.of(redisSourceFunction, false);
    }
}

The source function supports reading Redis string, set, zset, and hash data and parsing it into RowData for Flink:

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;

public class RedisSourceFunction extends RichSourceFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private static int position = 1;
    private GenericRowData rowData;

    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null, please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null, please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null, please set value for command");

        // if the command is a redis write command, this table is a sink: stop the source
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET,
                RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH,
                RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if (sourceCommand.contains(command.toUpperCase())) {
            return;
        }

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null, please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedis.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedis.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keyArr[2 * primaryKey.size()]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedis.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // fuzzy matching: get the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedis.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedis.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedis.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedis.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keyArr[2 * primaryKey.size()]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;
                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // fuzzy matching: get the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;
                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * primaryKey.size()]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;
                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }
        } else {
            LOG.error("Unsupported mode: {}", mode);
        }
    }

    @Override
    public void cancel() {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}
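The core of the hgetall path above is how a row is assembled from two places: the primary-key values parsed out of the Redis key, and the remaining columns looked up as hash fields. A simplified, dependency-free sketch of that assembly (a plain `String[]` stands in for Flink's `GenericRowData`, a `Map` stands in for the Redis hash, and `buildRow` is a hypothetical helper; unlike the original, the key indexing here is written to also handle composite keys):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashRowSketch {

    // Key layout: table:pk1Name:pk1Value[:pk2Name:pk2Value...], so the
    // value of the i-th primary-key column sits at split index 2*i + 2.
    static String[] buildRow(String redisKey, List<String> primaryKey,
                             List<String> columns, Map<String, String> hash) {
        String[] keySplit = redisKey.split(":");
        String[] row = new String[columns.size()];
        // primary-key columns come from the key itself
        for (int i = 0; i < primaryKey.size(); i++) {
            row[i] = keySplit[2 * i + 2];
        }
        // the remaining columns are hash fields
        for (int i = primaryKey.size(); i < columns.size(); i++) {
            row[i] = hash.get(columns.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> hash = new HashMap<>();
        hash.put("price", "1024.00");
        hash.put("order_time", "2024-06-20 21:12:00");
        String[] row = buildRow("orders:order_number:10",
                Arrays.asList("order_number"),
                Arrays.asList("order_number", "price", "order_time"), hash);
        System.out.println(Arrays.toString(row));
    }
}
```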

4. Redis sink (write) classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null, please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null, please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null, please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null, please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key), String.valueOf(value));
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    // construct redis key: table_name:primary_key_col:primary_key_value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    break;
                case RedisCommandOptions.HMSET:
                    // construct redis key: table_name:primary_key_col:primary_key_value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                        }
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key), String.valueOf(value));
                    break;
                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    // construct redis key: table_name:primary_key_col:primary_key_value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    break;
                case RedisCommandOptions.HMSET:
                    // construct redis key: table_name:primary_key_col:primary_key_value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        value = rowData.getString(i).toString();
                        jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                    }
                    break;
                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key, value);
                    break;
                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key, value);
                    break;
                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key, value);
                    break;
                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }
        } else {
            LOG.error("Unsupported mode: {}", mode);
        }
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

If it is not clear why the code above is written this way, see my previous post:

Flink Sql-用户自定义 Sources & Sinks_source表和sink表-CSDN博客

Finally, once again, I hope you can support the project on GitHub or in the community so that this connector can be officially open-sourced.




