分享MSSQL、MySql、Oracle的大数据批量导入方法及编程手法细节

本文主要是介绍分享MSSQL、MySql、Oracle的大数据批量导入方法及编程手法细节,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1:MSSQL

SQL语法篇:

BULK INSERT  

   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]  

      FROM 'data_file'  

     [ WITH  

    (  

   [ [ , ] BATCHSIZE = batch_size ]  

   [ [ , ] CHECK_CONSTRAINTS ]  

   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]  

   [ [ , ] DATAFILETYPE =  

      { 'char' | 'native'| 'widechar' | 'widenative' } ]  

   [ [ , ] FIELDTERMINATOR = 'field_terminator' ]  

   [ [ , ] FIRSTROW = first_row ]  

   [ [ , ] FIRE_TRIGGERS ]  

   [ [ , ] FORMATFILE = 'format_file_path' ]  

   [ [ , ] KEEPIDENTITY ]  

   [ [ , ] KEEPNULLS ]  

   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]  

   [ [ , ] LASTROW = last_row ]  

   [ [ , ] MAXERRORS = max_errors ]  

   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]  

   [ [ , ] ROWS_PER_BATCH = rows_per_batch ]  

   [ [ , ] ROWTERMINATOR = 'row_terminator' ]  

   [ [ , ] TABLOCK ]  

   [ [ , ] ERRORFILE = 'file_name' ]  

    )]  

SQL示例:

1

2

3

4

5

6

bulk insert 表名  from 'D:\mydata.txt'

with

 (fieldterminator=',',

 rowterminator='\n',

 check_constraints)

select from 表名

由于C#提供了SqlBulkCopy,所以非DBA的我们,更多会通过程序来调用:

C#代码篇:

C#代码调用示例及细节,以下代码摘录自CYQ.Data:

复制代码

using (SqlBulkCopy sbc = new SqlBulkCopy(con, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, sqlTran)){sbc.BatchSize = 100000;sbc.DestinationTableName = SqlFormat.Keyword(mdt.TableName, DalType.MsSql);sbc.BulkCopyTimeout = AppConfig.DB.CommandTimeout;foreach (MCellStruct column in mdt.Columns){sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);}sbc.WriteToServer(mdt);}

复制代码

有5个细节:

1:事务:

如果只是单个事务,构造函数可以是链接字符串。

如果需要和外部合成一个事务(比如先删除,再插入,这在同一个事务中)

就需要自己构造Connection对象和Transaction,在上下文中传递来处理。

2:插入是否引发触发器

通过SqlBulkCopyOptions.FireTriggers 引入

3:其它:批量数、超时时间、是否写入主键ID。

可能引发的数据库Down机的情况:

在历史的过程中,我遇到过的一个大坑是:

当数据的长度过长,数据的字段过短,产生数据二进制截断时,数据库服务竟然停掉了(也许是特例,也许不是)。

所以小心使用,尽力做好对外部数据做好数据长度验证。

2:MySql

关于MySql的批量,这是一段悲催的往事,有几个坑,直到今天,才发现并解决了。

SQL语法篇:

LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'data.txt'

    [REPLACE | IGNORE]

    INTO TABLE tbl_name

    [FIELDS

        [TERMINATED BY 'string']

        [[OPTIONALLY] ENCLOSED BY 'char']

        [ESCAPED BY 'char' ]

    ]

    [LINES

        [STARTING BY 'string']

        [TERMINATED BY 'string']

    ]

    [IGNORE number LINES]

    [(col_name_or_user_var,...)]

    [SET col_name = expr,...)]

示例篇:

1

2

LOAD DATA LOCAL INFILE 'C:\\Users\\cyq\\AppData\\Local\\Temp\\BulkCopy.csv' INTO TABLE `BulkCopy` CHARACTER SET utf8 FIELDS TERMINATED BY '$,$' LINES TERMINATED BY '

' (`ID`,`Name`,`CreateTime`,`Sex`)

虽然MySql.Data.dll 提供了MySqlBulkLoader,但是看源码只是生成了个Load Data 并用ADO.NET执行,

核心大坑的生成*.csv数据文件的竟然没提供,所以自己生成语句并执行就好了,不需要用它。

C#代码篇:

以下代码摘自CYQ.Data,是一段今天才修正好的代码:

复制代码

 private static string MDataTableToFile(MDataTable dt, bool keepID, DalType dalType){string path = Path.GetTempPath() + dt.TableName + ".csv";using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false))){MCellStruct ms;string value;foreach (MDataRow row in dt.Rows){for (int i = 0; i < dt.Columns.Count; i++){#region 设置值ms = dt.Columns[i];if (!keepID && ms.IsAutoIncrement){continue;}else if (dalType == DalType.MySql && row[i].IsNull){sw.Write("\\N");//Mysql用\N表示null值。}else{value = row[i].ToString();if (ms.SqlType == SqlDbType.Bit){int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;if (dalType == DalType.MySql){byte[] b = new byte[1];b[0] = (byte)v;value = System.Text.Encoding.UTF8.GetString(b);//mysql必须用字节存档。}else{value = v.ToString();}}else{value = value.Replace("\\", "\\\\");//处理转义符号}sw.Write(value);}if (i != dt.Columns.Count - 1)//不是最后一个就输出{sw.Write(AppConst.SplitChar);}#endregion}sw.WriteLine();}}if (Path.DirectorySeparatorChar == '\\'){path = path.Replace(@"\", @"\\");}return path;}

复制代码

以上代码是产生一个csv文件,用于被调用,有两个核心的坑,费了我不少时间:

1:Bit类型数据导不进去?

2:第1行数据自增ID被重置为1?

这两个问题,网上搜不到答案,放纵到今天,觉的应该解决了,然后就把它解决了。

解决的思路是这样的:

A:先用Load Data OutFile导出一个文件,再用Load Data InFile导入文件。

一开始我用记事本打开看了一下,又顺手Ctrl+S了一下,结果发现问题和我的一样,让我怀疑竟然

直到今天,重新导出,中间不看了,直接导入,发现它竟然又正常的,于是,思维一转:

B:把自己生成的文件和命令产生的文件,进行了十六进制比对,结果发现:

Bit类型自己生成的的数据:是0,1,在十六进制下显示是30、31。

命令产生的数据在十六进制是00、01,查了下资料,发现MySql的Bit存档的Bit是二进制。

于是,把0,1用字节表示,再转字符串,再存档,就好了。

于是这么一段代码产生了(网上的DataTable转CSV代码都是没处理的,都不知道他们是怎么跑的,难道都没有定义Bit类型?):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

if (ms.SqlType == SqlDbType.Bit)

{

     int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;

     if (dalType == DalType.MySql)

     {

           byte[] b = new byte[1];

           b[0] = (byte)v;

           value = System.Text.Encoding.UTF8.GetString(b);//mysql必须用字节存档。

       }

      else

       {

            value = v.ToString();

       }

}

另外关于Null值,用\N表示。

解决完第一个问题,剩下就是第二个问题了,为什么第一个行代码的主键会被置为1?

还是比对十六进制,结果惊人的发现:

是BOM头,让它错识别了第一个主键值,所以被忽略主键,用了第1个自增值1替代了。

这也解释了为什么只要重新保存的数据都有Bug的原因。

于是,解决的方法就是StreaWrite的时候,不生成BOM头,怎么处理呢?

于是就有了以下的代码:

1

2

3

4

using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))

{

       ...................

}

通过New一个Encoding,并指定参数为false,替代我们常规的System.Text.Encoding.UTF8Encoding。

这些细节很隐秘,不说你都猜不道。。。

3:Oracle

SQL语法篇

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

LOAD[DATA]

[ { INFILE | INDDN } {file | * }

[STREAM | RECORD | FIXED length [BLOCKSIZE size]|

VARIABLE [length] ]

[ { BADFILE | BADDN } file ]

{DISCARDS | DISCARDMAX} integr ]

[ {INDDN | INFILE} . . . ]

[ APPEND | REPLACE INSERT ]

[RECLENT integer]

[ { CONCATENATE integer |

CONTINUEIF { [THIS | NEXT] (start[: end])LAST }

Operator { 'string' | X 'hex' } } ]

INTO TABLE [user.]table

[APPEND | REPLACE|INSERT]

[WHEN condition [AND condition]...]

[FIELDS [delimiter] ]

(

column {

RECNUM | CONSTANT value |

SEQUENCE ( { integer MAX |COUNT} [, increment] ) |

[POSITION ( { start [end] | * [ + integer] }

) ]

datatype

[TERMINATED [ BY ] {WHITESPACE| [X] 'character' } ]

[ [OPTIONALLY] ENCLOSE[BY] [X]'charcter']

[NULLIF condition ]

[DEFAULTIF condotion]

}

[ ,...]

)

以上配置存档成一个CTL文件,再由以下的命令调用:

1

Sqlldr userid=用户名/密码@数据库 control=文件名.ctl

C#语法篇:

.NET里大概有三种操作Oracle的手法:

1:System.Data.OracleClient (需要安装客户端)没有带批量方法(还区分x86和x64)。

2:Oracle.DataAccess  (需要安装客户端)带批量方法(也区分x86和x64)。

3:Oracle.ManagedDataAccess (不需要安装客户端)没带批量方法(不区分x86和x64,但仅支持.NET 4.0或以上)

Oracle.DataAccess 带的批量方法叫:OracleBulkCopy,由于使用方式和SqlBulkCopy几乎一致,就不介绍了。

如果调用程序所在的服务器安装了Oracle客户端,可以进行以下方法的调用:

流程如下:

1:产生*.cvs数据文件,见MySql中的代码,一样用的。

2:产生*.ctl控制文件,把生成的Load Data 语句存档成一个*.ctl文件即可。

3:用sqlidr.exe执行CTL文件,这里悲催的一点是,不能用ADO.NET调用,只能用进程调用,所以,这个批量只能单独使用。

调用进程的相关代码:

复制代码

 bool hasSqlLoader = false;private bool HasSqlLoader() //检测是否安装了客户端。{hasSqlLoader = false;Process proc = new Process();proc.StartInfo.FileName = "sqlldr";proc.StartInfo.CreateNoWindow = true;proc.StartInfo.UseShellExecute = false;proc.StartInfo.RedirectStandardOutput = true;proc.OutputDataReceived += new DataReceivedEventHandler(proc_OutputDataReceived);proc.Start();proc.BeginOutputReadLine();proc.WaitForExit();return hasSqlLoader;}void proc_OutputDataReceived(object sender, DataReceivedEventArgs e){if (!hasSqlLoader){hasSqlLoader = e.Data.StartsWith("SQL*Loader:");}}//已经实现,但没有事务,所以暂时先不引入。private bool ExeSqlLoader(string arg){try{Process proc = new Process();proc.StartInfo.FileName = "sqlldr";proc.StartInfo.Arguments = arg;proc.Start();proc.WaitForExit();return true;}catch{}return false;}

复制代码

总结:

随着大数据的普及,数据间的批量移动必然越来频繁的被涉及,所以不管是用SQL脚本,还是自己写代码,或是用DBImport工具,都将成必备技能之一了!

鉴于此,分享一下我在这一块费过的力和填过的坑,供大伙参考!

一条sql 在MySQL中是如何执行的_一条sql语句在mysql中如何执行的-CSDN博客  https://blog.csdn.net/m0_66572126/article/details/141310260?spm=1001.2100.3001.7377&utm_medium=distribute.pc_feed_blog_category.none-task-blog-classify_tag-4-141310260-null-null.nonecase&depth_1-utm_source=distribute.pc_feed_blog_category.none-task-blog-classify_tag-4-141310260-null-null.nonecase

不支持?

这篇关于分享MSSQL、MySql、Oracle的大数据批量导入方法及编程手法细节的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1147567

相关文章

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd