C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库

前端之家收集整理的这篇文章主要介绍了C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

项目需要,几十万张照片需要计算出每个照片的特征值(调用C++编写的DLL)。

业务流程:选择照片文件夹,分别访问照片-->调用DLL接口传递照片路径-->接收处理返回值-->写入数据库

前期使用的for循环来处理,几十万张照片处理起来差不多10个小时。速度太慢,后面改进使用Parallel来进行平行计算(调用DLL处理照片),统一写入Datatable,然后使用BulkInsert批量把Datatable写入数据库,目前测试8万张照片并行计算速度30分钟,速度提高约30%-40%左右。

代码示例如下:

@H_502_9@private static sqlConnection sqlconn; private static ConcurrentDictionary<string,int> currInts = new ConcurrentDictionary<string,int>(); private void Button1_Click(object sender,EventArgs e) { var dirPath = ""; using (var folderBrowser = new FolderBrowserDialog()) { if (folderBrowser.ShowDialog() != DialogResult.OK) return; dirPath = folderBrowser.SelectedPath; if (!Directory.Exists(dirPath)) { MessageBox.Show(@"所选路径不存在或无权访问",@"错误",MessageBoxButtons.OK,MessageBoxIcon.Error); return; } } BeginInvoke(new Action(async () => { button1.Enabled = false; var sw = new Stopwatch(); sw.Start(); //检测服务器链接 Log.WriteLine(@"尝试连接数据库服务器"); sqlconn = new sqlConnection( $"Data Source={txt_serverIP.Text},{txt_ServerPort.Text};User ID={txt_User.Text};Password={txt_Pwd.Text};Initial Catalog={txt_DB.Text};Persist Security Info=False;Pooling=true;Min Pool Size=30;Max Pool Size=200;"); if (sqlconn.State == ConnectionState.Closed) { try { sqlconn.Open(); } catch (Exception exception) { Log.WriteLine($@"连接数据库服务器【失败】-->{exception.Message}"); button1.Enabled = true; return; } } Log.WriteLine($@"连接数据库服务器【成功】{Environment.NewLine}获取未转换图片数据。。。"); var ds = new DataSet(); int.TryParse(txt_start.Text,out var start); int.TryParse(txt_end.Text,out var end); var sqlstrALL = ""; if (start == 0 || end == 0) { sqlstrALL = "SELECT * FROM ViewWeiZhuanHuan"; } else { sqlstrALL = $"SELECT * FROM ViewWeiZhuanHuan WHERE {txt_mastKey.Text} BETWEEN {start} AND {end}"; } var sqlcmd = new sqlCommand(sqlstrALL,sqlconn); DataAdapter da = new sqlDataAdapter(sqlcmd); da.Fill(ds); if (ds.Tables.Count == 0 || ds.Tables[0].Rows.Count == 0) { Log.WriteLine("所有图片都已经转换完毕。"); sqlconn.Close(); return; } Log.WriteLine($"{ds.Tables[0].Rows.Count}个图片需要转换。"); var total = ds.Tables[0].Rows.Count; var rowkey = comboBox1.SelectedValue.ToString(); var splitkey = txt_split.Text.Trim(); #region 定义数据保存 var dt = new DataTable(); dt.Columns.Add("zd1",typeof(int)); dt.Columns.Add("zd2",typeof(int)); dt.Columns.Add("zd3",typeof(string)); dt.Columns.Add("zd4",typeof(string)); dt.Columns.Add("zd5",typeof(string)); dt.Columns.Add("zd6",typeof(string)); #endregion #region 并行执行 currInts.TryAdd("currInts",1);//初始化进度数字为1 await Task.Run(() => { //使用8个cpu核心来运行 var result = Parallel.For(0,ds.Tables[0].Rows.Count,new ParallelOptions { MaxDegreeOfParallelism = 8},(rotIndex,state) => { BeginInvoke(new Action(() => { currInts.TryGetValue("currInts",out var currValue); lb_process.Text = $@"{currValue}/{total}";//显示进度 var nextValue = currValue + 1; currInts.TryUpdate("currInts",nextValue,currValue);//加1 })); var fileDirPath = "";//根据选择的文件名格式,用填写的规则生成路径 var file = new List<string>{ $"{dirPath}\\{fileDirPath}\\{ksno}_fp1.jpg",$"{dirPath}\\{fileDirPath}\\{ksno}_fp2.jpg",$"{dirPath}\\{fileDirPath}\\{ksno}_fp3.jpg"}; foreach (var zwzp in file) { try { var model = ZwHelper.zwzhAsync($"{zwzp}").Result;//调用C++转换 if (model != null) { //并行计算下写入Datatable需要锁定才可以,否则会提示datatable索引损坏 lock (dt.Rows.SyncRoot) { var dr = dt.NewRow(); dr["zd1"] = Convert.ToInt32(filexe); dr["zd2"] = Convert.ToInt32(ds.Tables[0].Rows[rotIndex]["zd1"]); dr["zd3"] = model.zhtz; dr["zd4"] = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); dr["zd5"] = ""; dr["zd6"] = ""; dt.Rows.Add(dr); } } else { Log.WriteLine($@"{ksno}转换失败"); Log.Log.Error($"{ksno}转换失败。"); } } catch (Exception exception) { Log.Log.Error($"学号{ksno},图片路径{zwzp}转换失败。{exception}"); } } }); sw.Stop(); Log.WriteLine($"转换耗时:{sw.ElapsedMilliseconds}毫秒"); Log.WriteLine($@"开始写入数据库,数量{dt.Rows.Count}"); #region 批量写入 if (dt.Rows.Count ==0) { Log.WriteLine(@"没有要写入的数据。"); return; } sw.Restart(); var sucess = false; if (sqlHelper.BulkInsert(sqlconn,txt_TableName.Text.Trim(),dt,out var err)) { sucess = true; } else { Log.Log.Error($"写入数据库失败==》{err}"); } sw.Stop(); Log.WriteLine($"写入数据库是否成功=>{sucess},耗时{sw.ElapsedMilliseconds}毫秒"); #endregion }); #endregion if (sqlconn.State == ConnectionState.Open) { sqlconn.Close(); } button1.Enabled = true; })); }

  sql批量写入函数

@H_502_9@ /// <summary> /// 批量插入 /// </summary> /// <param name="conn">连接对象</param> /// <param name="tableName">将泛型集合插入到本地数据库表的表名</param> /// <param name="dataTable">要批量写入的Datatable</param> /// <param name="err">错误时返回的信息</param> public static bool BulkInsert(sqlConnection conn,string tableName,DataTable dataTable,out string err) { err = ""; if (dataTable == null || dataTable.Rows.Count == 0) { err = "要写入的数据为空"; return false; } var tran = conn.BeginTransaction();//开启事务 var bulkCopy = new sqlBulkCopy(conn,sqlBulkCopyOptions.KeepNulls,tran); try { if (conn.State == ConnectionState.Closed) { conn.Open(); } bulkCopy.BatchSize = 1000; bulkCopy.DestinationTableName = tableName; bulkCopy.WriteToServer(dataTable); tran.Commit(); return true; } catch (Exception e) { err = e.ToString(); tran.Rollback(); return false; } finally { bulkCopy.Close(); if (conn.State == ConnectionState.Open) { conn.Close(); } } }

  

原文链接:https://www.cnblogs.com/wdw984/p/10968808.html

猜你在找的C#相关文章