博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
几种数据库的大数据批量插入
阅读量:6324 次
发布时间:2019-06-22

本文共 12055 字,大约阅读时间需要 40 分钟。

在之前只知道SqlServer支持数据批量插入,殊不知道Oracle、SQLite和MySql也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法。

    首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了。

 

    
///
 
<summary>
    
///
 提供数据批量处理的方法。
    
///
 
</summary>
    
public 
interface IBatcherProvider : IProviderService
    {
        
///
 
<summary>
        
///
 将 
<see cref="DataTable"/>
 的数据批量插入到数据库中。
        
///
 
</summary>
        
///
 
<param name="dataTable">
要批量插入的 
<see cref="DataTable"/>
</param>
        
///
 
<param name="batchSize">
每批次写入的数据量。
</param>
        
void Insert(DataTable dataTable, 
int batchSize = 
10000);
    }

 

    一、SqlServer数据批量插入

    SqlServer的批量插入很简单,使用SqlBulkCopy就可以,以下是该类的实现:

 

    
///
 
<summary>
    
///
 为 System.Data.SqlClient 提供的用于批量操作的方法。
    
///
 
</summary>
    
public 
sealed 
class MsSqlBatcher : IBatcherProvider
    {
        
///
 
<summary>
        
///
 获取或设置提供者服务的上下文。
        
///
 
</summary>
        
public ServiceContext ServiceContext { 
get
set; }
        
///
 
<summary>
        
///
 将 
<see cref="DataTable"/>
 的数据批量插入到数据库中。
        
///
 
</summary>
        
///
 
<param name="dataTable">
要批量插入的 
<see cref="DataTable"/>
</param>
        
///
 
<param name="batchSize">
每批次写入的数据量。
</param>
        
public 
void Insert(DataTable dataTable, 
int batchSize = 
10000)
        {
            Checker.ArgumentNull(dataTable, 
"
dataTable
");
            
if (dataTable.Rows.Count == 
0)
            {
                
return;
            }
            
using (
var connection = (SqlConnection)ServiceContext.Database.CreateConnection())
            {
                
try
                {
                    connection.TryOpen();
                    
//
给表名加上前后导符
                    
var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);
                    
using (
var bulk = 
new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, 
null)
                        {
                            DestinationTableName = tableName, 
                            BatchSize = batchSize
                        })
                    {
                        
//
循环所有列,为bulk添加映射
                        dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);
                        bulk.WriteToServer(dataTable);
                        bulk.Close();
                    }
                }
                
catch (Exception exp)
                {
                    
throw 
new BatcherException(exp);
                }
                
finally
                {
                    connection.TryClose();
                }
            }
        }
    }

 

     SqlBulkCopy的ColumnMappings中列的名称受大小写敏感限制,因此在构造DataTable的时候应请注意列名要与表一致。

     以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。

 

     二、Oracle数据批量插入

     System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。

 

    
///
 
<summary>
    
///
 Oracle.Data.Access 组件提供的用于批量操作的方法。
    
///
 
</summary>
    
public 
sealed 
class OracleAccessBatcher : IBatcherProvider
    {
        
///
 
<summary>
        
///
 获取或设置提供者服务的上下文。
        
///
 
</summary>
        
public ServiceContext ServiceContext { 
get
set; }
        
///
 
<summary>
        
///
 将 
<see cref="DataTable"/>
 的数据批量插入到数据库中。
        
///
 
</summary>
        
///
 
<param name="dataTable">
要批量插入的 
<see cref="DataTable"/>
</param>
        
///
 
<param name="batchSize">
每批次写入的数据量。
</param>
        
public 
void Insert(DataTable dataTable, 
int batchSize = 
10000)
        {
            Checker.ArgumentNull(dataTable, 
"
dataTable
");
            
if (dataTable.Rows.Count == 
0)
            {
                
return;
            }
            
using (
var connection = ServiceContext.Database.CreateConnection())
            {
                
try
                {
                    connection.TryOpen();
                    
using (
var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                    {
                        
if (command == 
null)
                        {
                            
throw 
new BatcherException(
new ArgumentException(
"
command
"));
                        }
                        command.Connection = connection;
                        command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
                        command.ExecuteNonQuery();
                    }
                }
                
catch (Exception exp)
                {
                    
throw 
new BatcherException(exp);
                }
                
finally
                {
                    connection.TryClose();
                }
            }
        }
        
///
 
<summary>
        
///
 生成插入数据的sql语句。
        
///
 
</summary>
        
///
 
<param name="database"></param>
        
///
 
<param name="command"></param>
        
///
 
<param name="table"></param>
        
///
 
<returns></returns>
        
private 
string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
        {
            
var names = 
new StringBuilder();
            
var values = 
new StringBuilder();
            
//
将一个DataTable的数据转换为数组的数组
            
var data = table.ToArray();
            
//
设置ArrayBindCount属性
            command.GetType().GetProperty(
"
ArrayBindCount
").SetValue(command, table.Rows.Count, 
null);
            
var syntax = database.Provider.GetService<ISyntaxProvider>();
            
for (
var i = 
0; i < table.Columns.Count; i++)
            {
                
var column = table.Columns[i];
                
var parameter = database.Provider.DbProviderFactory.CreateParameter();
                
if (parameter == 
null)
                {
                    
continue;
                }
                parameter.ParameterName = column.ColumnName;
                parameter.Direction = ParameterDirection.Input;
                parameter.DbType = column.DataType.GetDbType();
                parameter.Value = data[i];
                
if (names.Length > 
0)
                {
                    names.Append(
"
,
");
                    values.Append(
"
,
");
                }
                names.AppendFormat(
"
{0}
", DbUtility.FormatByQuote(syntax, column.ColumnName));
                values.AppendFormat(
"
{0}{1}
", syntax.ParameterPrefix, column.ColumnName);
                command.Parameters.Add(parameter);
            }
            
return 
string.Format(
"
INSERT INTO {0}({1}) VALUES ({2})
", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
        }
    }

     以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。

 

     三、SQLite数据批量插入

     SQLite的批量插入只需开启事务就可以了,这个具体的原理不得而知。

 

    
public 
sealed 
class SQLiteBatcher : IBatcherProvider
    {
        
///
 
<summary>
        
///
 获取或设置提供者服务的上下文。
        
///
 
</summary>
        
public ServiceContext ServiceContext { 
get
set; }
        
///
 
<summary>
        
///
 将 
<see cref="DataTable"/>
 的数据批量插入到数据库中。
        
///
 
</summary>
        
///
 
<param name="dataTable">
要批量插入的 
<see cref="DataTable"/>
</param>
        
///
 
<param name="batchSize">
每批次写入的数据量。
</param>
        
public 
void Insert(DataTable dataTable, 
int batchSize = 
10000)
        {
            Checker.ArgumentNull(dataTable, 
"
dataTable
");
            
if (dataTable.Rows.Count == 
0)
            {
                
return;
            }
            
using (
var connection = ServiceContext.Database.CreateConnection())
            {
                DbTransaction transcation = 
null;
                
try
                {
                    connection.TryOpen();
                    transcation = connection.BeginTransaction();
                    
using (
var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                    {
                        
if (command == 
null)
                        {
                            
throw 
new BatcherException(
new ArgumentException(
"
command
"));
                        }
                        command.Connection = connection;
                        command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);
                        
if (command.CommandText == 
string.Empty)
                        {
                            
return;
                        }
                        
var flag = 
new AssertFlag();
                        dataTable.EachRow(row =>
                            {
                                
var first = flag.AssertTrue();
                                ProcessCommandParameters(dataTable, command, row, first);
                                command.ExecuteNonQuery();
                            });
                    }
                    transcation.Commit();
                }
                
catch (Exception exp)
                {
                    
if (transcation != 
null)
                    {
                        transcation.Rollback();
                    }
                    
throw 
new BatcherException(exp);
                }
                
finally
                {
                    connection.TryClose();
                }
            }
        }
        
private 
void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, 
bool first)
        {
            
for (
var c = 
0; c < dataTable.Columns.Count; c++)
            {
                DbParameter parameter;
                
//
首次创建参数,是为了使用缓存
                
if (first)
                {
                    parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
                    parameter.ParameterName = dataTable.Columns[c].ColumnName;
                    command.Parameters.Add(parameter);
                }
                
else
                {
                    parameter = command.Parameters[c];
                }
                parameter.Value = row[c];
            }
        }
        
///
 
<summary>
        
///
 生成插入数据的sql语句。
        
///
 
</summary>
        
///
 
<param name="database"></param>
        
///
 
<param name="table"></param>
        
///
 
<returns></returns>
        
private 
string GenerateInserSql(IDatabase database, DataTable table)
        {
            
var syntax = database.Provider.GetService<ISyntaxProvider>();
            
var names = 
new StringBuilder();
            
var values = 
new StringBuilder();
            
var flag = 
new AssertFlag();
            table.EachColumn(column =>
                {
                    
if (!flag.AssertTrue())
                    {
                        names.Append(
"
,
");
                        values.Append(
"
,
");
                    }
                    names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
                    values.AppendFormat(
"
{0}{1}
", syntax.ParameterPrefix, column.ColumnName);
                });
            
return 
string.Format(
"
INSERT INTO {0}({1}) VALUES ({2})
", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
        }
    }

 

     四、MySql数据批量插入

 

    
///
 
<summary>
    
///
 为 MySql.Data 组件提供的用于批量操作的方法。
    
///
 
</summary>
    
public 
sealed 
class MySqlBatcher : IBatcherProvider
    {
        
///
 
<summary>
        
///
 获取或设置提供者服务的上下文。
        
///
 
</summary>
        
public ServiceContext ServiceContext { 
get
set; }
        
///
 
<summary>
        
///
 将 
<see cref="DataTable"/>
 的数据批量插入到数据库中。
        
///
 
</summary>
        
///
 
<param name="dataTable">
要批量插入的 
<see cref="DataTable"/>
</param>
        
///
 
<param name="batchSize">
每批次写入的数据量。
</param>
        
public 
void Insert(DataTable dataTable, 
int batchSize = 
10000)
        {
            Checker.ArgumentNull(dataTable, 
"
dataTable
");
            
if (dataTable.Rows.Count == 
0)
            {
                
return;
            }
            
using (
var connection = ServiceContext.Database.CreateConnection())
            {
                
try
                {
                    connection.TryOpen();
                    
using (
var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                    {
                        
if (command == 
null)
                        {
                            
throw 
new BatcherException(
new ArgumentException(
"
command
"));
                        }
                        command.Connection = connection;
                        command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
                        
if (command.CommandText == 
string.Empty)
                        {
                            
return;
                        }
                        command.ExecuteNonQuery();
                    }
                }
                
catch (Exception exp)
                {
                    
throw 
new BatcherException(exp);
                }
                
finally
                {
                    connection.TryClose();
                }
            }
        }
        
///
 
<summary>
        
///
 生成插入数据的sql语句。
        
///
 
</summary>
        
///
 
<param name="database"></param>
        
///
 
<param name="command"></param>
        
///
 
<param name="table"></param>
        
///
 
<returns></returns>
        
private 
string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
        {
            
var names = 
new StringBuilder();
            
var values = 
new StringBuilder();
            
var types = 
new List<DbType>();
            
var count = table.Columns.Count;
            
var syntax = database.Provider.GetService<ISyntaxProvider>();
            table.EachColumn(c =>
                {
                    
if (names.Length > 
0)
                    {
                        names.Append(
"
,
");
                    }
                    names.AppendFormat(
"
{0}
", DbUtility.FormatByQuote(syntax, c.ColumnName));
                    types.Add(c.DataType.GetDbType());
                });
            
var i = 
0;
            
foreach (DataRow row 
in table.Rows)
            {
                
if (i > 
0)
                {
                    values.Append(
"
,
");
                }
                values.Append(
"
(
");
                
for (
var j = 
0; j < count; j++)
                {
                    
if (j > 
0)
                    {
                        values.Append(
"
");
                    }
                    
var isStrType = IsStringType(types[j]);
                    
var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);
                    
if (parameter != 
null)
                    {
                        values.Append(parameter.ParameterName);
                        command.Parameters.Add(parameter);
                    }
                    
else 
if (isStrType)
                    {
                        values.AppendFormat(
"
'{0}'
", row[j]);
                    }
                    
else
                    {
                        values.Append(row[j]);
                    }
                }
                values.Append(
"
)
");
                i++;
            }
            
return 
string.Format(
"
INSERT INTO {0}({1}) VALUES {2}
", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
        }
        
///
 
<summary>
        
///
 判断是否为字符串类别。
        
///
 
</summary>
        
///
 
<param name="dbType"></param>
        
///
 
<returns></returns>
        
private 
bool IsStringType(DbType dbType)
        {
            
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;
        }
        
///
 
<summary>
        
///
 创建参数。
        
///
 
</summary>
        
///
 
<param name="provider"></param>
        
///
 
<param name="isStrType"></param>
        
///
 
<param name="dbType"></param>
        
///
 
<param name="value"></param>
        
///
 
<param name="parPrefix"></param>
        
///
 
<param name="row"></param>
        
///
 
<param name="col"></param>
        
///
 
<returns></returns>
        
private DbParameter CreateParameter(IProvider provider, 
bool isStrType, DbType dbType, 
object value, 
char parPrefix, 
int row, 
int col)
        {
            
//
如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数
            
if ((isStrType && value.ToString().IndexOf(
'
\'
') != -
1) || dbType == DbType.DateTime)
            {
                
var name = 
string.Format(
"
{0}p_{1}_{2}
", parPrefix, row, col);
                
var parameter = provider.DbProviderFactory.CreateParameter();
                parameter.ParameterName = name;
                parameter.Direction = ParameterDirection.Input;
                parameter.DbType = dbType;
                parameter.Value = value;
                
return parameter;
            }
            
return 
null;
        }
    }

 

     MySql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。

 

     五、测试

     接下来写一个测试用例来看一下使用批量插入的效果。

 

        [Test]
        
public 
void TestBatchInsert()
        {
            Console.WriteLine(TimeWatcher.Watch(() =>
                InvokeTest(database =>
                    {
                        
var table = 
new DataTable(
"
Batcher
");
                        table.Columns.Add(
"
Id
"
typeof(
int));
                        table.Columns.Add(
"
Name1
"
typeof(
string));
                        table.Columns.Add(
"
Name2
"
typeof(
string));
                        table.Columns.Add(
"
Name3
"
typeof(
string));
                        table.Columns.Add(
"
Name4
"
typeof(
string));
                        
//
构造100000条数据
                        
for (
var i = 
0; i < 
100000; i++)
                        {
                            table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());
                        }
                        
//
获取 IBatcherProvider
                        
var batcher = database.Provider.GetService<IBatcherProvider>();
                        
if (batcher == 
null)
                        {
                            Console.WriteLine(
"
不支持批量插入。
");
                        }
                        
else
                        {
                            batcher.Insert(table);
                        }
                        
//
输出batcher表的数据量
                        
var sql = 
new SqlCommand(
"
SELECT COUNT(1) FROM Batcher
");
                        Console.WriteLine(
"
当前共有 {0} 条数据
", database.ExecuteScalar(sql));
                    })));
        }

 

      以下表中列出了四种数据库生成10万条数据各耗用的时间

数据库

耗用时间

MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891
 
分类: , ,

转载地址:http://oclaa.baihongyu.com/

你可能感兴趣的文章
Setting Up KeePass For Centos 6
查看>>
HDU 4579 Random Walk (解方程组)
查看>>
nodejs
查看>>
ambari删除脚本
查看>>
丹麦蓝罐曲奇_百度百科
查看>>
[转]使用mysql profiles 来查看sql 语句执行计划
查看>>
压缩十进制数据的一次实践
查看>>
大话人工智能
查看>>
TF-IDF与余弦相似性的应用
查看>>
Android之动态改变控件大小
查看>>
从程序员到项目经理(五):程序员加油站 -- 不是人人都懂的学习要点
查看>>
共用体
查看>>
hadoop+海量数据面试题汇总(二)
查看>>
Leetcode: Binary Tree Level Order Traversal I,II
查看>>
shell shift 使用一例
查看>>
Android中的Handler的具体用法
查看>>
让索引包含null值的两种方法
查看>>
nodejs获取客户端IP Address
查看>>
《菊与刀》--[美]鲁思·本尼迪克特(Ruth Benedict)
查看>>
mapreduce (五) MapReduce实现倒排索引 修改版 combiner是把同一个机器上的多个map的结果先聚合一次...
查看>>