三个注解类:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface Column { String family() ; String column() default ""; String oracle_column() default ""; }
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) public @interface RowKey { }
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface Table { String oracle_name()default ""; String hbase_name() default ""; }实体类根据你的需求而创建,有两个基本实体类
public class BaseEntity { public boolean isEmpty(){ return false; } }
public class RetryEntity { private String oracleTableName; private String hbaseTableName; private int startIndex; private int endIndex; public String getOracleTableName() { return oracleTableName; } public void setOracleTableName(String oracleTableName) { this.oracleTableName = oracleTableName; } public String getHbaseTableName() { return hbaseTableName; } public void setHbaseTableName(String hbaseTableName) { this.hbaseTableName = hbaseTableName; } public int getStartIndex() { return startIndex; } public void setStartIndex(int startIndex) { this.startIndex = startIndex; } public int getEndIndex() { return endIndex; } public void setEndIndex(int endIndex) { this.endIndex = endIndex; } }一个简单的例子:
@Table(hbase_name = "CreditReport",oracle_name = "TT_CREDIT_REPORT") public class CreditReport extends BaseEntity { @RowKey @Column(family = "CreditReport",column = "",oracle_column = "BIZGUID") private String bizguid; @Column(family = "CreditReport",oracle_column = "PATH") private String path; @Column(family = "CreditReport",oracle_column = "STATUS") private int status; @Column(family = "CreditReport",oracle_column = "ERRORMSG") private String errormsg; @Column(family = "CreditReport",oracle_column = "DOCID") private String docid; @Column(family = "CreditReport",oracle_column = "FILEID") private String fileid; @Column(family = "CreditReport",oracle_column = "LASTCHGDTS") private Date lastchgdts; @Column(family = "CreditReport",oracle_column = "CATEGORY") private String category; @Column(family = "CreditReport",oracle_column = "FILENAME") private String filename; @Column(family = "CreditReport",oracle_column = "FILEINDEX") private int fileindex; public String getBizguid() { return bizguid; } public void setBizguid(String bizguid) { this.bizguid = bizguid; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public String getErrormsg() { return errormsg; } public void setErrormsg(String errormsg) { this.errormsg = errormsg; } public String getDocid() { return docid; } public void setDocid(String docid) { this.docid = docid; } public String getFileid() { return fileid; } public void setFileid(String fileid) { this.fileid = fileid; } public Date getLastchgdts() { return lastchgdts; } public void setLastchgdts(Date lastchgdts) { this.lastchgdts = lastchgdts; } public String getCategory() { return category; } public void setCategory(String category) { this.category = category; } public String getFilename() { return filename; } public void setFilename(String filename) { this.filename = filename; } public int getFileindex() { return fileindex; } public void setFileindex(int fileindex) { this.fileindex = fileindex; } }
public class EmptyEntity extends BaseEntity { @Override public boolean isEmpty() { return true; } }队列:
public class RecordQueue { private static BlockingQueue<BaseEntity> queue=null; public RecordQueue(){ queue=new LinkedBlockingQueue<BaseEntity>(1000000); } public RecordQueue(int capacity){ queue=new LinkedBlockingQueue<BaseEntity>(capacity); } public BaseEntity getValue() throws InterruptedException{ return queue.take(); } public boolean isEmpty(){ return queue.isEmpty(); } public void putValue(BaseEntity value) throws InterruptedException{ queue.put(value); } public void putValues(List<BaseEntity> values) throws InterruptedException{ for(BaseEntity obj:values){ this.putValue(obj); } } }失败队列:
public class PartErrorRetryQueue { private static BlockingQueue<RetryEntity> queue=null; private AtomicBoolean retryThreadStarted=new AtomicBoolean(false); public PartErrorRetryQueue(){ queue=new LinkedBlockingQueue<RetryEntity>(1000000); } public PartErrorRetryQueue(int capacity){ queue=new LinkedBlockingQueue<RetryEntity>(capacity); } public boolean isEmpty(){ return queue.isEmpty(); } public void putValue(RetryEntity value) throws InterruptedException{ queue.put(value); } public void putValues(List<RetryEntity> values) throws InterruptedException{ for(RetryEntity obj:values){ this.putValue(obj); } } public void startRetryThread(){ if(!retryThreadStarted.getAndSet(true)){ new RetryThread().start(); } } private class RetryThread extends Thread{ @Override public void run() { while(true){ try { int getConnTimes=0; ConnectionEntity connEntity=null; try { getConnTimes++; connEntity=ConnectionPool.getInstance().getConnection(); } catch (sqlException e) { e.printStackTrace(); while(connEntity==null){ try { if(getConnTimes>10){ Thread.currentThread().interrupt(); throw new RuntimeException("get connection times exceeded"); } getConnTimes++; connEntity=ConnectionPool.getInstance().getConnection(); } catch (sqlException e1) { e1.printStackTrace(); } } } RetryEntity entity=queue.take(); Connection conn=connEntity.getConnection(); List<String> columns=OracleUtils.getTableColumns(conn,entity.getOracleTableName()); ResultSet rs=null; PreparedStatement statement=null; try{ statement=conn.prepareStatement("select * from (select rownum rn,t.* from "+entity.getOracleTableName()+" t where rownum < ? )t where t.rn >= ?"); statement.setInt(1,entity.getEndIndex() ); statement.setInt(2,entity.getStartIndex()); rs=statement.executeQuery(); List<BaseEntity> entityList=OracleUtils.convertResultSetToEntity(rs,columns,entity.getOracleTableName()); HBaseUtils.getInstance().batchInsertRecord(entity.getHbaseTableName(),entityList); } catch (Exception e) { queue.put(entity); e.printStackTrace(); }finally{ OracleUtils.closeResultSet(rs); OracleUtils.closePrepareStatement(statement); } } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException("get connection times exceeded"); } } } } }
hbase单表单线程处理:
public class HBaSEOutgoingWorkThread extends Thread { private RecordQueue queue; private int batchSize; private String tableName; private int totalCount; private BackendThread backendThread; public HBaSEOutgoingWorkThread(RecordQueue queue,int batchSize,String tableName,int totalCount){ this.queue=queue; this.batchSize=batchSize; this.tableName=tableName; this.totalCount=totalCount; backendThread=new BackendThread(); backendThread.start(); } @Override public void run() { try { int currCount=0; while(true){ int count=0; List<BaseEntity> entityList=new ArrayList<BaseEntity>(); while(true){ BaseEntity obj=queue.getValue(); count++; if(!obj.isEmpty()){ entityList.add(obj); currCount++; } if(count==batchSize){ break; } } HBaseUtils.getInstance().batchInsertRecord(tableName,entityList); if(totalCount==currCount){ HBaseUtils.closeConnection(); ConnectionPool.getInstance().shutdown(); backendThread.interrupt(); break; } } } catch (InterruptedException e) { e.printStackTrace(); } } class BackendThread extends Thread{ @Override public void run() { try { while(true){ boolean firstEmpty=queue.isEmpty(); Thread.sleep(100000); boolean secondEmpty=queue.isEmpty(); if(firstEmpty&&secondEmpty){ for(int i=0;i<batchSize;i++){ try { queue.putValue(new EmptyEntity()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } catch (InterruptedException e) { System.out.println("InterruptedException"); e.printStackTrace(); } } } }hbase单表多现场,基于CountDownLatch:
public class HBaSEOutgoingWorkThreadUseCountDownLatch extends Thread { private RecordQueue queue; private int batchSize; private String tableName; private CountDownLatch latch; private int totalCount; private BackendThread backendThread; public HBaSEOutgoingWorkThreadUseCountDownLatch(RecordQueue queue,CountDownLatch latch,int totalCount){ this.queue=queue; this.batchSize=batchSize; this.tableName=tableName; this.latch=latch; this.totalCount=totalCount; backendThread=new BackendThread(); backendThread.start(); } @Override public void run() { try { int currentCount=0; while(true){ int count=0; List<BaseEntity> entityList=new ArrayList<BaseEntity>(); while(true){ BaseEntity obj=queue.getValue(); count++; if(!obj.isEmpty()){ entityList.add(obj); currentCount++; } if(count==batchSize){ System.out.println(currentCount); break; } } HBaseUtils.getInstance().batchInsertRecord(tableName,entityList); if(totalCount==currentCount){ System.out.println(currentCount); backendThread.interrupt(); break; } } } catch (InterruptedException e) { e.printStackTrace(); }finally{ latch.countDown(); } } class BackendThread extends Thread{ @Override public void run() { try { while(true){ boolean firstEmpty=queue.isEmpty(); Thread.sleep(100000); boolean secondEmpty=queue.isEmpty(); if(firstEmpty&&secondEmpty){ for(int i=0;i<batchSize;i++){ try { queue.putValue(new EmptyEntity()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } catch (InterruptedException e) { e.printStackTrace(); } } } }
Oracle拉取数据单线程:
public class OracleIncomeWorkThread extends Thread { private String tableName; private RecordQueue queue; private int batchSize; private int start; private int end; public OracleIncomeWorkThread(String tableName,RecordQueue queue,int start,int end){ this.tableName=tableName; this.queue=queue; this.batchSize=batchSize; this.start=start; this.end=end; } @Override public void run() { if(start>end){ return; } ConnectionPool pool=null; ConnectionEntity entity=null; try { pool=ConnectionPool.getInstance(); entity=pool.getConnection(); Connection conn=entity.getConnection(); List<String> columns=OracleUtils.getTableColumns(conn,tableName); int count=0; do{ ResultSet rs=null; PreparedStatement statement=null; try{ if((start+batchSize*(count+1))>end){ batchSize=end-(start+batchSize*count)+1; } statement=conn.prepareStatement("select * from (select rownum rn,t.* from "+tableName+" t where rownum < ? )t where t.rn >= ?"); statement.setInt(1,start+batchSize); statement.setInt(2,start); rs=statement.executeQuery(); List<BaseEntity> list=OracleUtils.convertResultSetToEntity(rs,tableName); queue.putValues(list); start+=batchSize; }catch(Exception e){ e.printStackTrace(); }finally{ OracleUtils.closeResultSet(rs); OracleUtils.closePrepareStatement(statement); } }while((start+batchSize*count)<end); } catch (InterruptedException e) { e.printStackTrace(); } catch (sqlException e) { e.printStackTrace(); }finally{ try { pool.releaseConnection(entity.getId()); } catch (sqlException e) { e.printStackTrace(); } } } }
oracle拉取数据多线程:
public class OracleIncomeWorkThreadUseCountDownLatch extends Thread { private String tableName; private RecordQueue queue; private int batchSize; private int start; private int end; private CountDownLatch latch; public OracleIncomeWorkThreadUseCountDownLatch(String tableName,int end,CountDownLatch latch){ this.tableName=tableName; this.queue=queue; this.batchSize=batchSize; this.start=start; this.end=end; this.latch=latch; } @Override public void run() { if(start>end){ return; } ConnectionPool pool=null; ConnectionEntity entity=null; try { pool=ConnectionPool.getInstance(); entity=pool.getConnection(); Connection conn=entity.getConnection(); List<String> columns=OracleUtils.getTableColumns(conn,tableName); queue.putValues(list); start+=batchSize; }catch(Exception e){ e.printStackTrace(); }finally{ OracleUtils.closeResultSet(rs); OracleUtils.closePrepareStatement(statement); } }while((start+batchSize*count)<end); } catch (InterruptedException e) { e.printStackTrace(); } catch (sqlException e) { e.printStackTrace(); }finally{ try { pool.releaseConnection(entity.getId()); } catch (sqlException e) { e.printStackTrace(); } latch.countDown(); } } }
一个入口程序例子:
public class CreditReportEnteranceUseCountDownLatch implements Enterance { private ExecutorService executeService; //每次从oracle查询数据的每页数量 private int pageSize; //每一批的线程数 private int threadCountBatch; //每线程梳理记录数量 private int countPerThread; public CreditReportEnteranceUseCountDownLatch(ExecutorService executeService,int pageSize,int threadCountBatch,int countPerThread){ this.executeService=executeService; this.pageSize=pageSize; this.threadCountBatch=threadCountBatch; this.countPerThread=countPerThread; } public void doWork() throws Exception { HBaseUtils.getInstance().deleteTable("CreditReport"); HBaseUtils.getInstance().createTableFromEntity(CreditReport.class); long count=OracleUtils.getRecordCount("TT_CREDIT_REPORT"); //总共需要线程数 int threadCount=(int)Math.ceil(count*1.0/countPerThread); int start=1; //总共需要线程数小于设定的每批线程数 if(threadCount<=threadCountBatch){ CountDownLatch latch=new CountDownLatch(threadCount*2); for(int i=0;i<threadCount;i++){ //每一对线程一个队列,这理也可以多个线程一个队列看具体效率 RecordQueue queue=new RecordQueue(pageSize); int end=start+countPerThread; if(count<end){ end=(int)count+1; } OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",queue,pageSize,start,end,latch); executeService.submit(oracleThread); HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,pageSize/2,"CreditReport",latch,end-start); executeService.submit(hbaseThread); start+=countPerThread; } latch.await(); }else{ //线程批次 int threadBatch=(int)Math.ceil(threadCount*1.0/threadCountBatch); start=1; if(threadCount%threadCountBatch==0){ for(int j=0;j<threadBatch;j++){ CountDownLatch latch=new CountDownLatch(threadCountBatch*2); for(int i=0;i<threadCountBatch;i++){ RecordQueue queue=new RecordQueue(pageSize); int end=start+countPerThread; if(count<end){ end=(int)count+1; } OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",latch); executeService.submit(oracleThread); HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,end-start); executeService.submit(hbaseThread); start+=countPerThread; } latch.await(); } }else{ for(int j=0;j<threadBatch-1;j++){ CountDownLatch latch=new CountDownLatch(threadCountBatch*2); for(int i=0;i<threadCountBatch;i++){ RecordQueue queue=new RecordQueue(pageSize); int end=start+countPerThread; if(count<end){ end=(int)count+1; } OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",end-start); executeService.submit(hbaseThread); start+=countPerThread; } latch.await(); } int restCount=threadCount-(threadBatch-1)*threadCountBatch; CountDownLatch latch=new CountDownLatch(restCount*2); for(int i=0;i<restCount;i++){ RecordQueue queue=new RecordQueue(pageSize); int end=start+countPerThread; if(count<end){ end=(int)count+1; } OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",latch); executeService.submit(oracleThread); HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,end-start); executeService.submit(hbaseThread); start+=countPerThread; } latch.await(); } } ConnectionPool.getInstance().shutdown(); HBaseUtils.closeConnection(); } }
工具类:
oracle连接池:
public class ConnectionPool { private static ConcurrentHashMap<String,ConnectionEntity> idelConnections=null; private static ConcurrentHashMap<String,ConnectionEntity> activeConnections=null; private static int initSize; private static int maxSize; private static AtomicInteger idelSize=new AtomicInteger(0); private static AtomicInteger activeSize=new AtomicInteger(0); private static ConnectionPool instance=null; private static Lock lock= new ReentrantLock(); private Object object=new Object(); private static boolean isShutdown=false; private ConnectionPool(int initSize,int maxSize){ this.initSize=initSize; this.maxSize=maxSize; idelConnections=new ConcurrentHashMap<String,ConnectionEntity>(); activeConnections=new ConcurrentHashMap<String,ConnectionEntity>(); initConnections(); new DetectFailConnection().start(); } public ConnectionEntity getConnection() throws InterruptedException,sqlException{ if(isShutdown){ throw new RuntimeException("pool is shutdown."); } lock.lock(); try{ if(idelSize.get()>0){ if(idelConnections.size()<=0){ throw new RuntimeException(""); } String key=idelConnections.entrySet().iterator().next().getKey(); ConnectionEntity entity=idelConnections.entrySet().iterator().next().getValue(); entity.setStatus(Connection_Status.active); idelConnections.remove(key); idelSize.decrementAndGet(); if(entity.getConnection().isClosed()){ return getConnection(); } activeConnections.put(key,entity); activeSize.incrementAndGet(); return entity; } }finally{ lock.unlock(); } if(activeSize.get()>maxSize){ throw new RuntimeException("活跃数量大于最大值"); } if(activeSize.get()==maxSize){ synchronized (object) { object.wait(); } } if(isShutdown){ throw new RuntimeException("pool is shutdown."); } Connection conn=OracleUtils.getConnection(); String id=UUID.randomUUID().toString(); ConnectionEntity entity=new ConnectionEntity(); entity.setId(id); entity.setConnection(conn); entity.setStatus(Connection_Status.active); activeConnections.put(id,entity); activeSize.incrementAndGet(); return entity; } public void releaseConnection(String id) throws sqlException{ if(isShutdown){ throw new RuntimeException("pool is shutdown."); } if(idelSize.get()==maxSize){ OracleUtils.closeConnection(activeConnections.remove(id).getConnection()); }else{ ConnectionEntity entity=activeConnections.remove(id); entity.setStatus(Connection_Status.idel); idelConnections.put(id,entity); idelSize.incrementAndGet(); activeSize.decrementAndGet(); synchronized (object) { object.notify(); } } } private void initConnections(){ for(int i=0;i<this.initSize;i++){ ConnectionEntity entity=new ConnectionEntity(); String id=UUID.randomUUID().toString(); entity.setId(id); entity.setConnection(OracleUtils.getConnection()); entity.setStatus(Connection_Status.idel); idelConnections.put(id,entity); idelSize.getAndAdd(1); } } public void shutdown(){ isShutdown=true; synchronized (object) { object.notifyAll(); } Iterator<String> idelIt=idelConnections.keySet().iterator(); while(idelIt.hasNext()){ String key=idelIt.next(); ConnectionEntity entity=idelConnections.get(key); try { entity.getConnection().close(); } catch (sqlException e) { e.printStackTrace(); } } Iterator<String>activeIt=activeConnections.keySet().iterator(); while(activeIt.hasNext()){ String key=activeIt.next(); ConnectionEntity entity=activeConnections.get(key); try { entity.getConnection().close(); } catch (sqlException e) { e.printStackTrace(); } } initSize=0; maxSize=0; idelSize=new AtomicInteger(0); activeSize=new AtomicInteger(0); } public int getIdelSize(){ return this.idelSize.get(); } public int getActiveSize(){ return this.activeSize.get(); } public static ConnectionPool getInstance(){ if(isShutdown){ throw new RuntimeException("pool is already shutdown."); } if(maxSize>0){ return instance; } return getInstance(10,30); } public static ConnectionPool getInstance(int initSize,int maxSize){ if(isShutdown){ throw new RuntimeException("pool is already shutdown."); } if(initSize<0||maxSize<1){ throw new RuntimeException("initSize必须不小于0,maxsize必须大于等于1"); } if(initSize>maxSize){ initSize=maxSize; } synchronized (ConnectionPool.class) { if(instance==null){ instance=new ConnectionPool(initSize,maxSize); } } return instance; } public static class ConnectionEntity{ private String id; private Connection connection; private Connection_Status status; public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } public Connection_Status getStatus() { return status; } public void setStatus(Connection_Status status) { this.status = status; } public String getId() { return id; } public void setId(String id) { this.id = id; } } private enum Connection_Status{ idel,active,close } class DetectFailConnection extends Thread{ @Override public void run() { Iterator<String> itIdel=idelConnections.keySet().iterator(); while(itIdel.hasNext()){ String key =itIdel.next(); ConnectionEntity entity=idelConnections.get(key); try { if(entity.getConnection().isClosed()){ idelConnections.remove(key); idelSize.decrementAndGet(); } } catch (sqlException e) { e.printStackTrace(); } } Iterator<String> itActive=activeConnections.keySet().iterator(); while(itActive.hasNext()){ String key=itActive.next(); ConnectionEntity entity=activeConnections.get(key); try { if(entity.getConnection().isClosed()){ activeConnections.remove(key); activeSize.decrementAndGet(); } } catch (sqlException e) { e.printStackTrace(); } } try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
反射工具:
public class EntityUtils { private static final String basePackage="com.topcheer.transfer.entity"; public static Class getEntityByOracleTableName(String tableName){ List<Class> allClass = getClasses(basePackage); return filterOracleEntity(allClass,tableName); } /*public static void main(String[] args) { Class c=getEntityByOracleTableName("Test"); System.out.println(c.getName().substring(c.getName().lastIndexOf(".")+1)); }*/ private static Class filterOracleEntity(List<Class> allClass,String tableName){ for(Class c:allClass){ if(c.isAnnotationPresent(Table.class)){ Table ann=(Table) c.getAnnotation(Table.class); if(ann.oracle_name().equals(tableName)){ return c; }else if(ann.oracle_name().equals("")&&c.getName().substring(c.getName().lastIndexOf(".")+1).equals(tableName)){ return c; } } } throw new RuntimeException("table entity not found"); } /** * 从包package中获取所有的Class * @param pack * @return */ private static List<Class> getClasses(String packageName){ //第一个class类的集合 List<Class> classes = new ArrayList<Class>(); //是否循环迭代 boolean recursive = true; //获取包的名字 并进行替换 String packageDirName = packageName.replace('.','/'); //定义一个枚举的集合 并进行循环来处理这个目录下的things Enumeration<URL> dirs; try { dirs = Thread.currentThread().getContextClassLoader().getResources(packageDirName); //循环迭代下去 while (dirs.hasMoreElements()){ //获取下一个元素 URL url = dirs.nextElement(); //得到协议的名称 String protocol = url.getProtocol(); //如果是以文件的形式保存在服务器上 if ("file".equals(protocol)) { //获取包的物理路径 String filePath = URLDecoder.decode(url.getFile(),"UTF-8"); //以文件的方式扫描整个包下的文件 并添加到集合中 findAndAddClassesInPackageByFile(packageName,filePath,recursive,classes); } } } catch (IOException e) { e.printStackTrace(); } return classes; } /** * 以文件的形式来获取包下的所有Class * @param packageName * @param packagePath * @param recursive * @param classes */ private static void findAndAddClassesInPackageByFile(String packageName,String packagePath,final boolean recursive,List<Class> classes){ //获取此包的目录 建立一个File File dir = new File(packagePath); //如果不存在或者 也不是目录就直接返回 if (!dir.exists() || !dir.isDirectory()) { return; } //如果存在 就获取包下的所有文件 包括目录 File[] dirfiles = dir.listFiles(new FileFilter() { //自定义过滤规则 如果可以循环(包含子目录) 或则是以.class结尾的文件(编译好的java类文件) public boolean accept(File file) { return (recursive && file.isDirectory()) || (file.getName().endsWith(".class")); } }); //循环所有文件 for (File file : dirfiles) { //如果是目录 则继续扫描 if (file.isDirectory()) { findAndAddClassesInPackageByFile(packageName + "." + file.getName(),file.getAbsolutePath(),classes); } else { //如果是java类文件 去掉后面的.class 只留下类名 String className = file.getName().substring(0,file.getName().length() - 6); try { //添加到集合中去 classes.add(Class.forName(packageName + '.' + className)); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } } }
hbase工具类:
public class HBaseUtils { private static Configuration conf = HBaseConfiguration.create(); private static ExecutorService poolx=Executors.newScheduledThreadPool(20); private static Connection connection = null; private static HBaseUtils instance=null; private HBaseUtils(){ if(connection==null){ try { connection=ConnectionFactory.createConnection(conf,poolx); } catch (IOException e) { e.printStackTrace(); } } } public static void closeConnection(){ try { connection.close(); poolx.shutdownNow(); } catch (IOException e) { e.printStackTrace(); } } public static synchronized HBaseUtils getInstance(){ if(instance==null){ instance=new HBaseUtils(); } return instance; } /** * 创建表 * @throws IOException */ public void createTable(String tableName,String[] columns) throws IOException { Admin admin = connection.getAdmin(); TableName name=TableName.valueOf(tableName); if (admin.tableExists(name)) { admin.disableTable(name); admin.deleteTable(name); } else { HTableDescriptor desc =new HTableDescriptor(); desc.setName(TableName.valueOf(tableName)); for (String column : columns) { desc.addFamily(new HColumnDescriptor(column)); } admin.createTable(desc); } } public void createTableFromEntity(Class entityClass){ com.topcheer.transfer.annotation.Table ann=(com.topcheer.transfer.annotation.Table)entityClass.getAnnotation(com.topcheer.transfer.annotation.Table.class); String hbase_name=ann.hbase_name(); if(hbase_name.equals("")){ hbase_name=entityClass.getName(); } Field[]fields=entityClass.getDeclaredFields(); Set<String> fimalySet=new HashSet<String>(); for(Field field:fields){ Column column=field.getAnnotation(Column.class); fimalySet.add(column.family()); } String [] familys=fimalySet.toArray(new String[]{}); try { createTable(hbase_name,familys); } catch (IOException e) { e.printStackTrace(); } } /** * 插入一行记录 * @param tablename * 表名 * @param row * 行名称 * @param columnFamily * 列族名 * @param columns * (列族名:column)组合成列名 * @param values * 行与列确定的值 */ public void insertRecord(String tableName,String row,String columnFamily,String[] columns,String[] values) { try { TableName name=TableName.valueOf(tableName); Table table = connection.getTable(name); Put put = new Put(Bytes.toBytes(row)); for (int i = 0; i < columns.length; i++) { put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(String.valueOf(columns[i])),Bytes.toBytes(values[i])); table.put(put); } } catch (IOException e) { e.printStackTrace(); } } /** * 將对象转化为Put * @param obj * @return * @throws IllegalAccessException * @throws IllegalArgumentException */ private Put convertEntityToPut(BaseEntity obj) throws IllegalArgumentException,IllegalAccessException{ Field[]fields=obj.getClass().getDeclaredFields(); Field rowKeyField=null; for(Field field:fields){ if(field.isAnnotationPresent(RowKey.class)){ rowKeyField=field; rowKeyField.setAccessible(true); } } if(rowKeyField==null){ throw new RuntimeException("rowKey注解不存在"); } Put p = new Put(Bytes.toBytes(String.valueOf(rowKeyField.get(obj)))); for(Field field:fields){ if(field!=rowKeyField){ Column column=field.getAnnotation(Column.class); String family=column.family(); String col=column.column(); if(col.equals("")){ col=field.getName(); } field.setAccessible(true); Object value=field.get(obj); p.addColumn(Bytes.toBytes(family),Bytes.toBytes(col),Bytes.toBytes(String.valueOf(value))); } } return p; } /** * 批量插入 * @param tableName * @param entityList */ public void batchInsertRecord(String tableName,List<BaseEntity> entityList){ BufferedMutator table=null; try { table = connection.getBufferedMutator(TableName.valueOf(tableName)); List<Mutation> mutations = new ArrayList<Mutation>(); for(BaseEntity entity:entityList){ mutations.add(convertEntityToPut(entity)); } table.mutate(mutations); table.flush(); } catch (IOException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } /** * 删除一行记录 * @param tablename * 表名 * @param rowkey * 行名 * @throws IOException */ public void deleteRow(String tablename,String rowkey) throws IOException { TableName name=TableName.valueOf(tablename); Table table = connection.getTable(name); List<Delete> list = new ArrayList<Delete>(); Delete d1 = new Delete(rowkey.getBytes()); list.add(d1); table.delete(list); } /** * 查找一行记录 * @param tablename * 表名 * @param rowkey * 行名 */ public static void selectRow(String tablename,String rowKey) throws IOException { TableName name=TableName.valueOf(tablename); Table table = connection.getTable(name); Get g = new Get(rowKey.getBytes()); Result rs = table.get(g); for (Cell cell : rs.rawCells()) { System.out.print(new String(cell.getRowArray()) + " "); System.out.print(new String(cell.getFamilyArray()) + ":"); System.out.print(new String(cell.getQualifierArray()) + " "); System.out.print(cell.getTimestamp() + " "); System.out.println(new String(cell.getValueArray())); } } /** * 查询表中所有行 * @param tablename */ public void scanAllRecord(String tablename) { try { TableName name=TableName.valueOf(tablename); Table table = connection.getTable(name); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for(Result result : rs){ for (Cell cell : result.rawCells()) { System.out.print(new String(cell.getRowArray()) + " "); System.out.print(new String(cell.getFamilyArray()) + ":"); System.out.print(new String(cell.getQualifierArray()) + " "); System.out.print(cell.getTimestamp() + " "); System.out.println(new String(cell.getValueArray())); } } } catch (IOException e) { e.printStackTrace(); } } /** * 删除表操作 * @param tablename * @throws IOException */ public void deleteTable(String tablename) throws IOException { try { TableName name=TableName.valueOf(tablename); Admin admin = connection.getAdmin(); if(admin.tableExists(name)){ admin.disableTable(name); admin.deleteTable(name); } } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } } }oracle工具类:
public class OracleUtils { static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public static Connection getConnection() { int i=0; Connection conn=null; do{ try { conn=DriverManager.getConnection(); if(conn!=null){ break; } Thread.sleep(500); i++; } catch (sqlException e) { e.printStackTrace(); }catch(InterruptedException e){ e.printStackTrace(); } }while(conn==null&&i<5); return conn; } public static void closeConnection(Connection conn){ try { conn.close(); } catch (sqlException e) { e.printStackTrace(); } } public static void closeResultSet(ResultSet rs){ try { rs.close(); } catch (sqlException e) { e.printStackTrace(); } } public static void closePrepareStatement(PreparedStatement statement){ try { statement.close(); } catch (sqlException e) { e.printStackTrace(); } } public static List<String> getTableColumns(Connection conn,String tableName){ ResultSet resultSet=null; try { resultSet=conn.getMetaData().getColumns(null,"%",tableName,"%"); List<String> columnNames=new ArrayList<String>(); while(resultSet.next()){ String columnName=resultSet.getString("COLUMN_NAME"); columnNames.add(columnName); } return columnNames; }catch (sqlException e) { e.printStackTrace(); }finally{ closeResultSet(resultSet); } return null; } public static List<BaseEntity> convertResultSetToEntity(ResultSet rs,List<String> columnNames,String tableName){ Class cla=EntityUtils.getEntityByOracleTableName(tableName); Field[] fields=cla.getDeclaredFields(); List<BaseEntity> result=new ArrayList<BaseEntity>(); Map<String,Field> fieldMap=new HashMap<String,Field>(); for(Field field:fields){ Column column=field.getAnnotation(Column.class); String oracleColumn=column.oracle_column(); if(oracleColumn.equals("")){ oracleColumn=field.getName(); } fieldMap.put(oracleColumn,field); } try { while(rs.next()){ BaseEntity obj=(BaseEntity)cla.newInstance(); Iterator<String> it=fieldMap.keySet().iterator(); while(it.hasNext()){ String fieldName=it.next(); Field field=fieldMap.get(fieldName); field.setAccessible(true); String type=field.getType().toString(); type=type.substring(type.lastIndexOf(".")+1); if(!type.equals("Date")){ if(rs.getObject(fieldName) instanceof java.math.BigDecimal){ if(type.equals("int")){ field.set(obj,((BigDecimal)rs.getObject(fieldName)).intValue()); }else if(type.equals("long")){ field.set(obj,((BigDecimal)rs.getObject(fieldName)).longValue()); }else if(type.equals("float")){ field.set(obj,((BigDecimal)rs.getObject(fieldName)).floatValue()); }else if(type.equals("double")){ field.set(obj,((BigDecimal)rs.getObject(fieldName)).doubleValue()); } }else{ field.set(obj,rs.getObject(fieldName)); } }else{ String fName=field.getName(); fName=fName.substring(0,1).toUpperCase()+fName.substring(1); fName="set"+fName; Method m = cla.getDeclaredMethod(fName,Date.class); m.setAccessible(true);//因为写成private 所以这里必须设置 if(rs.getObject(fieldName) instanceof oracle.sql.TIMESTAMP){ m.invoke(obj,new Date(((TIMESTAMP)rs.getObject(fieldName)).timestampValue().getTime())); }else{ m.invoke(obj,rs.getObject(fieldName)); } } } result.add(obj); } } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch(sqlException e){ e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } return result; } public static long getRecordCount(String table){ ConnectionPool pool=ConnectionPool.getInstance(); ConnectionEntity entity=null; boolean exception=false; int count=0; do{ try { entity = pool.getConnection(); Connection conn=entity.getConnection(); Statement statement=conn.createStatement(); ResultSet rs=statement.executeQuery("select count(*) from "+table); if(rs.next()){ return rs.getLong(1); } } catch (InterruptedException e) { exception=true; e.printStackTrace(); } catch (sqlException e) { exception=true; e.printStackTrace(); }finally{ try { pool.releaseConnection(entity.getId()); } catch (sqlException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } count++; if(count>0)break; }while(exception=true); return 0L; } }
main程序:
public class MainEnterance { private static ExecutorService executeService=Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { // Enterance abc=new AbcEnterance(executeService); // abc.doWork(); long startDate=new Date().getTime(); Enterance creditReport=new CreditReportEnterance(executeService); creditReport.doWork(); // Enterance creditReport=new CreditReportEnteranceUseCountDownLatch(executeService,1000,20,10000); // creditReport.doWork(); ///executeService.shutdown(); long endDate=new Date().getTime(); System.out.println("一共耗时:"+(endDate-startDate)/1000+"s"); } }
后续改进:
1数据库字段类型没有全覆盖
3从hbase导数据到关系型数据库
5hbase rowkey设置策略