本文主要是介绍Python迁移MySQL数据到MongoDB和Elasticsearch,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在工作中经常遇到一些需要批量导入数据到NoSql中。
比如我们上线一个新的功能,需要把mysql的一些数据批量导入到Elaticsearch或者mongodb中。这里使用Python代码简洁,方便高效。
根据需求变更参数,sql。执行以下python脚本即可。
import pymysql, json
import pymongo
from elasticsearch import Elasticsearch, helpersclass DB:def __init__(self, host='localhost', port=3306, db='', user='', passwd='', charset='utf8'):# 建立连接self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)# 创建游标,操作设置为字典类型self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)def __enter__(self):# 返回游标return self.curdef __exit__(self, exc_type, exc_val, exc_tb):# 提交数据库并执行self.conn.commit()# 关闭游标self.cur.close()# 关闭数据库连接self.conn.close()es = Elasticsearch(hosts=["127.0.0.1:9200"],# 在做任何操作之前,先进行嗅探sniff_on_start=True,# 节点没有响应时,进行刷新,重新连接sniff_on_connection_fail=True,# 每 60 秒刷新一次sniffer_timeout=60)client = pymongo.MongoClient("mongodb://127.0.0.1:27017/")
db_list = client.list_database_names()
db = client['giftConfig']
collection = db['giftInfo']if __name__ == '__main__':with DB(host='127.0.0.1', user='root', passwd='123456', db='test') as db:sql = 'select * from GiftConfig where id < {} order by id desc limit 10'maxId = 201db.execute(sql.format(maxId))while db.rowcount > 0:for row in db:# 字典格式print(row)maxId = row['id']"""mysql 数据转存到es"""# es相同id会覆盖es.index(index="gift_config_index", doc_type="gift_info", id=maxId, body=row)"""mysql数据转存到mongodb """# mongodb 无法保存decimal 需要转换成float 数据库设置id唯一索引 或者row['_id'] = maxId 设置唯一索引 db.giftInfo.createIndex({"id":1},{"unique":true})row['unitPrice'] = float(row['unitPrice'])try:collection.insert_one(row)except:print("mongodb insert error id:%d" %(maxId))db.execute(sql.format(maxId))
这篇关于Python迁移MySQL数据到MongoDB和Elasticsearch的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!