SQLite: batch inserting a large volume of data. The example below reads a SQL file and inserts its statements into a SQLite database in batches, using JDBC and a small thread pool; it is shared here for reference.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BatchTool {
// DDL: create the target table if it does not already exist
private static String ddl = "CREATE TABLE IF NOT EXISTS pbeijing_point (OBJECTID INTEGER,NAME TEXT,ADDRESS TEXT,PHONE TEXT,FAX TEXT,TYPE TEXT,CITYCODE TEXT,URL TEXT,EMAIL TEXT,NAME2 TEXT,X INTEGER,Y INTEGER)";
Connection jCon = null;
// get connection
public synchronized Connection getConnection() {
if (jCon == null) {
Statement state = null;
try {
Class.forName("org.sqlite.JDBC");
jCon = DriverManager.getConnection("jdbc:sqlite:c:\\newD.db");
state = jCon.createStatement();
state.executeUpdate(ddl);
state.close();
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return jCon;
}
// Thread pool with 20 worker threads
ExecutorService service = Executors.newFixedThreadPool(20);
// Read the SQL file; every 500 INSERT statements are submitted to the pool as one batch
public void readBatchsql(InputStream is) throws IOException {
BufferedReader bufferReader = new BufferedReader(new InputStreamReader(is,"UTF-8"));
String line;
String one = "";
int tag = 0;
String batchsql = "";
while ((line = bufferReader.readLine()) != null) {
one += line;
if (one.indexOf(";") != -1) {
batchsql += one;
one = "";// reset
tag++;
}
// Once 500 statements have accumulated, hand the batch off to a worker thread
if (tag >= 500) {
service.execute(new sqliteBatchHandler(batchsql));
batchsql = "";// reset
tag = 0;// reset
}
}
// Finally, execute whatever SQL statements remain
if (batchsql.length() > 0) {
System.out.println("finalsql:" + batchsql);
Runnable r = new sqliteBatchHandler(batchsql);
service.execute(r);
}
try {
// Shut down the pool, wait for all batches to finish, then close the connection
this.service.shutdown();
this.service.awaitTermination(1,TimeUnit.HOURS);
getConnection().close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
* @note Split the accumulated batch into individual SQL statements
* */
private static String[] splitsql(String batchsql) {
if (batchsql != null) {
return batchsql.split(";");
}
return null;
}
/**
* @note Execute the batch update. connection.commit() fails if a Statement
* is still open, so this method is synchronized.
* */
private synchronized void executeUpdate(String batch) {
Statement ste = null;
Connection con = null;
try {
con = getConnection();
con.setAutoCommit(false);
ste = con.createStatement();
String[] sqls = splitsql(batch);
for (String sql : sqls) {
if (sql != null) {
ste.addBatch(sql);
}
}
ste.executeBatch();
ste.close();
con.commit();// commit the whole batch
} catch (Exception e) {
e.printStackTrace();
System.out.println("执行失败:" + batch);
try {
con.rollback();// roll back the failed batch
} catch (SQLException e1) {
e1.printStackTrace();
}
} finally {
if (ste != null) {
try {
ste.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
/**
* @note Worker thread that runs one batch insert
* */
private class sqliteBatchHandler implements Runnable {
private String batch;
public sqliteBatchHandler(String sql) {
this.batch = sql;
}
@Override
public void run() {
try {
// pause briefly before executing the batch
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (this.batch.length() > 0) {
executeUpdate(batch);
}
}
}
public static void main(String[] args) throws FileNotFoundException,IOException {
BatchTool s = new BatchTool();
s.readBatchsql(new FileInputStream(new File("c:\\poi.sql")));
}
}
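BatchTool assumes the input file contains plain INSERT statements, each terminated by a semicolon, targeting the pbeijing_point table created by the DDL above. As a rough sketch of how such a poi.sql could be produced for a test run, the hypothetical helper below (the class name SampleSqlGenerator, the row count and all values are made-up assumptions, not part of the original code) writes statements in that format:

import java.io.FileWriter;
import java.io.IOException;

public class SampleSqlGenerator {
    public static void main(String[] args) throws IOException {
        // Hypothetical helper: row count and values are invented for illustration only.
        try (FileWriter w = new FileWriter("c:\\poi.sql")) {
            for (int i = 1; i <= 2000; i++) {
                // Each statement ends with ';' because readBatchsql splits batches on it.
                w.write("INSERT INTO pbeijing_point "
                        + "(OBJECTID,NAME,ADDRESS,PHONE,FAX,TYPE,CITYCODE,URL,EMAIL,NAME2,X,Y) "
                        + "VALUES (" + i + ",'poi" + i + "','addr" + i + "','010-0000','010-0000',"
                        + "'poi','110000','http://example.com','poi@example.com','alias" + i + "',"
                        + (116 + i % 10) + "," + (39 + i % 10) + ");\n");
            }
        }
    }
}

Running BatchTool.main afterwards reads c:\poi.sql and writes the rows into c:\newD.db in transactions of roughly 500 statements each.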