写了一个从oracle导数据到hbase的多线程java程序,只要创建一个实体加上注解基本就可以导数据了

前端之家收集整理的这篇文章主要介绍了写了一个从oracle导数据到hbase的多线程java程序,只要创建一个实体加上注解基本就可以导数据了前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

三个注解类:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
	String family() ;
	String column() default "";
	String oracle_column() default "";
}

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RowKey {

}

@Target({ElementType.TYPE}) 
@Retention(RetentionPolicy.RUNTIME)
public @interface Table {
	String oracle_name()default "";
	String hbase_name() default "";
}
实体类根据你的需求而创建,有两个基本实体类
public class BaseEntity {
	public boolean isEmpty(){
		return false;
	}
}
public class RetryEntity {
	private String oracleTableName;
	private String hbaseTableName;
	private int startIndex;
	private int endIndex;

	public String getOracleTableName() {
		return oracleTableName;
	}

	public void setOracleTableName(String oracleTableName) {
		this.oracleTableName = oracleTableName;
	}

	public String getHbaseTableName() {
		return hbaseTableName;
	}

	public void setHbaseTableName(String hbaseTableName) {
		this.hbaseTableName = hbaseTableName;
	}

	public int getStartIndex() {
		return startIndex;
	}

	public void setStartIndex(int startIndex) {
		this.startIndex = startIndex;
	}

	public int getEndIndex() {
		return endIndex;
	}

	public void setEndIndex(int endIndex) {
		this.endIndex = endIndex;
	}

	

}
一个简单的例子:
@Table(hbase_name = "CreditReport",oracle_name = "TT_CREDIT_REPORT")
public class CreditReport extends BaseEntity {
	@RowKey
	@Column(family = "CreditReport",column = "",oracle_column = "BIZGUID")
	private String bizguid;
	@Column(family = "CreditReport",oracle_column = "PATH")
	private String path;
	@Column(family = "CreditReport",oracle_column = "STATUS")
	private int status;
	@Column(family = "CreditReport",oracle_column = "ERRORMSG")
	private String errormsg;
	@Column(family = "CreditReport",oracle_column = "DOCID")
	private String docid;
	@Column(family = "CreditReport",oracle_column = "FILEID")
	private String fileid;
	@Column(family = "CreditReport",oracle_column = "LASTCHGDTS")
	private Date lastchgdts;
	@Column(family = "CreditReport",oracle_column = "CATEGORY")
	private String category;
	@Column(family = "CreditReport",oracle_column = "FILENAME")
	private String filename;
	@Column(family = "CreditReport",oracle_column = "FILEINDEX")
	private int fileindex;

	public String getBizguid() {
		return bizguid;
	}

	public void setBizguid(String bizguid) {
		this.bizguid = bizguid;
	}

	public String getPath() {
		return path;
	}

	public void setPath(String path) {
		this.path = path;
	}

	public int getStatus() {
		return status;
	}

	public void setStatus(int status) {
		this.status = status;
	}

	public String getErrormsg() {
		return errormsg;
	}

	public void setErrormsg(String errormsg) {
		this.errormsg = errormsg;
	}

	public String getDocid() {
		return docid;
	}

	public void setDocid(String docid) {
		this.docid = docid;
	}

	public String getFileid() {
		return fileid;
	}

	public void setFileid(String fileid) {
		this.fileid = fileid;
	}

	public Date getLastchgdts() {
		return lastchgdts;
	}

	public void setLastchgdts(Date lastchgdts) {
		this.lastchgdts = lastchgdts;
	}

	public String getCategory() {
		return category;
	}

	public void setCategory(String category) {
		this.category = category;
	}

	public String getFilename() {
		return filename;
	}

	public void setFilename(String filename) {
		this.filename = filename;
	}

	public int getFileindex() {
		return fileindex;
	}

	public void setFileindex(int fileindex) {
		this.fileindex = fileindex;
	}

}

public class EmptyEntity extends BaseEntity {

	@Override
	public boolean isEmpty() {
		return true;
	}
	
}
队列:
public class RecordQueue {
	private static BlockingQueue<BaseEntity> queue=null;
	
	public RecordQueue(){
		queue=new LinkedBlockingQueue<BaseEntity>(1000000);
	}
	
	public RecordQueue(int capacity){
		queue=new LinkedBlockingQueue<BaseEntity>(capacity);
	}
	
	public  BaseEntity getValue() throws InterruptedException{
		return queue.take();
	}
	
	public boolean isEmpty(){
		return queue.isEmpty();
	}
	
	public  void putValue(BaseEntity value) throws InterruptedException{
		queue.put(value);
	}
	
	public void putValues(List<BaseEntity> values) throws InterruptedException{
		for(BaseEntity obj:values){
			this.putValue(obj);
		}
	}
}
失败队列:
public class PartErrorRetryQueue {
	private static BlockingQueue<RetryEntity> queue=null;
	private AtomicBoolean retryThreadStarted=new AtomicBoolean(false);
	
	public PartErrorRetryQueue(){
		queue=new LinkedBlockingQueue<RetryEntity>(1000000);
	}
	
	public PartErrorRetryQueue(int capacity){
		queue=new LinkedBlockingQueue<RetryEntity>(capacity);
	}
	
	public boolean isEmpty(){
		return queue.isEmpty();
	}
	
	public  void putValue(RetryEntity value) throws InterruptedException{
		queue.put(value);
	}
	
	public void putValues(List<RetryEntity> values) throws InterruptedException{
		for(RetryEntity obj:values){
			this.putValue(obj);
		}
	}
	
	public void startRetryThread(){
		if(!retryThreadStarted.getAndSet(true)){
			new RetryThread().start();
		}
	}
	
	
	private class RetryThread extends Thread{
		
		@Override
		public void run() {
			while(true){
				try {
					int getConnTimes=0;
					ConnectionEntity connEntity=null;
					try {
						getConnTimes++;
						connEntity=ConnectionPool.getInstance().getConnection();
					} catch (sqlException e) {
						e.printStackTrace();
						while(connEntity==null){
							try {
								if(getConnTimes>10){
									Thread.currentThread().interrupt();
									throw new RuntimeException("get connection times exceeded");
								}
								getConnTimes++;
								connEntity=ConnectionPool.getInstance().getConnection();
							} catch (sqlException e1) {
								e1.printStackTrace();
							}
						}
					}
					
					RetryEntity entity=queue.take();
					
					Connection conn=connEntity.getConnection();
					List<String> columns=OracleUtils.getTableColumns(conn,entity.getOracleTableName());
					
					ResultSet rs=null;
					PreparedStatement statement=null;
					try{
						statement=conn.prepareStatement("select * from (select rownum rn,t.* from "+entity.getOracleTableName()+" t where rownum < ? )t where t.rn >= ?");
						statement.setInt(1,entity.getEndIndex() );
						statement.setInt(2,entity.getStartIndex());
						rs=statement.executeQuery();
						List<BaseEntity> entityList=OracleUtils.convertResultSetToEntity(rs,columns,entity.getOracleTableName());
						HBaseUtils.getInstance().batchInsertRecord(entity.getHbaseTableName(),entityList);
					} catch (Exception e) {
						queue.put(entity);
						e.printStackTrace();
					}finally{
						OracleUtils.closeResultSet(rs);
						OracleUtils.closePrepareStatement(statement);
					}
				
				} catch (InterruptedException e) {
					e.printStackTrace();
					throw new RuntimeException("get connection times exceeded");
					
				}
			}
		}
		
	}
}

hbase单表单线程处理:
public class HBaSEOutgoingWorkThread extends Thread {
	private RecordQueue queue;
	private int batchSize;
	private String tableName;
	private int totalCount;
	private BackendThread backendThread;
	public HBaSEOutgoingWorkThread(RecordQueue queue,int batchSize,String tableName,int totalCount){
		this.queue=queue;
		this.batchSize=batchSize;
		this.tableName=tableName;
		this.totalCount=totalCount;
		backendThread=new BackendThread();
		backendThread.start();
	}
	
	@Override
	public void run() {
		try {
			int currCount=0;
			while(true){
				int count=0;
				List<BaseEntity> entityList=new ArrayList<BaseEntity>();
				while(true){
					BaseEntity obj=queue.getValue();
					count++;
					if(!obj.isEmpty()){
						entityList.add(obj);
						currCount++;
					}
					if(count==batchSize){
						break;
					}
				}
				HBaseUtils.getInstance().batchInsertRecord(tableName,entityList);
				if(totalCount==currCount){
					HBaseUtils.closeConnection();
					ConnectionPool.getInstance().shutdown();
					backendThread.interrupt();
					break;
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	class BackendThread extends Thread{
		@Override
		public void run() {
			try {
				while(true){
					boolean firstEmpty=queue.isEmpty();
					Thread.sleep(100000);
					boolean secondEmpty=queue.isEmpty();
					
					if(firstEmpty&&secondEmpty){
						for(int i=0;i<batchSize;i++){
							try {
								queue.putValue(new EmptyEntity());
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
				}
			} catch (InterruptedException e) {
				System.out.println("InterruptedException");
				e.printStackTrace();
			}
		}
	} 
}
hbase单表多现场,基于CountDownLatch:
public class HBaSEOutgoingWorkThreadUseCountDownLatch extends Thread {
	private RecordQueue queue;
	private int batchSize;
	private String tableName;
	private CountDownLatch latch;
	private int totalCount;
	private BackendThread backendThread;
	public HBaSEOutgoingWorkThreadUseCountDownLatch(RecordQueue queue,CountDownLatch latch,int totalCount){
		this.queue=queue;
		this.batchSize=batchSize;
		this.tableName=tableName;
		this.latch=latch;
		this.totalCount=totalCount;
		backendThread=new BackendThread();
		backendThread.start();
	}
	
	@Override
	public void run() {
		try {
			int currentCount=0;
			while(true){
				int count=0;
				List<BaseEntity> entityList=new ArrayList<BaseEntity>();
				while(true){
					BaseEntity obj=queue.getValue();
					count++;
					if(!obj.isEmpty()){
						entityList.add(obj);
						currentCount++;
					}
					if(count==batchSize){
						System.out.println(currentCount);
						break;
					}
				}
				HBaseUtils.getInstance().batchInsertRecord(tableName,entityList);
				if(totalCount==currentCount){
					System.out.println(currentCount);
					backendThread.interrupt();
					break;
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally{
			latch.countDown();
		}
	}
	
	class BackendThread extends Thread{
		@Override
		public void run() {
			try {
				while(true){
					boolean firstEmpty=queue.isEmpty();
					Thread.sleep(100000);	
					boolean secondEmpty=queue.isEmpty();
					
					if(firstEmpty&&secondEmpty){
						for(int i=0;i<batchSize;i++){
							try {
								queue.putValue(new EmptyEntity());
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	} 
}

Oracle拉取数据单线程:
public class OracleIncomeWorkThread extends Thread {
	private String tableName;
	private RecordQueue queue;
	private int batchSize;
	private int start;
	private int end;
	
	public OracleIncomeWorkThread(String tableName,RecordQueue queue,int start,int end){
		this.tableName=tableName;
		this.queue=queue;
		this.batchSize=batchSize;
		this.start=start;
		this.end=end;
	}
	
	@Override
	public void run() {
		if(start>end){
			return;
		}
		ConnectionPool pool=null;
		ConnectionEntity entity=null;
		try {
			pool=ConnectionPool.getInstance();
			entity=pool.getConnection();
			Connection conn=entity.getConnection();
			List<String> columns=OracleUtils.getTableColumns(conn,tableName);
			int count=0;
			do{
				ResultSet rs=null;
				PreparedStatement statement=null;
				try{
					if((start+batchSize*(count+1))>end){
						batchSize=end-(start+batchSize*count)+1;
					}
					statement=conn.prepareStatement("select * from (select rownum rn,t.* from "+tableName+" t where rownum < ? )t where t.rn >= ?");
					statement.setInt(1,start+batchSize);
					statement.setInt(2,start);
					rs=statement.executeQuery();
					List<BaseEntity> list=OracleUtils.convertResultSetToEntity(rs,tableName);
					queue.putValues(list);
					start+=batchSize;
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					OracleUtils.closeResultSet(rs);
					OracleUtils.closePrepareStatement(statement);
				}
			}while((start+batchSize*count)<end);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (sqlException e) {
			e.printStackTrace();
		}finally{
			try {
				pool.releaseConnection(entity.getId());
			} catch (sqlException e) {
				e.printStackTrace();
			}
		}
	}
}

oracle拉取数据多线程:
public class OracleIncomeWorkThreadUseCountDownLatch extends Thread {
	private String tableName;
	private RecordQueue queue;
	private int batchSize;
	private int start;
	private int end;
	private CountDownLatch latch;
	
	public OracleIncomeWorkThreadUseCountDownLatch(String tableName,int end,CountDownLatch latch){
		this.tableName=tableName;
		this.queue=queue;
		this.batchSize=batchSize;
		this.start=start;
		this.end=end;
		this.latch=latch;
	}
	
	@Override
	public void run() {
		if(start>end){
			return;
		}
		ConnectionPool pool=null;
		ConnectionEntity entity=null;
		
		try {
			pool=ConnectionPool.getInstance();
			entity=pool.getConnection();
			Connection conn=entity.getConnection();
			List<String> columns=OracleUtils.getTableColumns(conn,tableName);
					queue.putValues(list);
					start+=batchSize;
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					OracleUtils.closeResultSet(rs);
					OracleUtils.closePrepareStatement(statement);
				}
			}while((start+batchSize*count)<end);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (sqlException e) {
			e.printStackTrace();
		}finally{
			try {
				pool.releaseConnection(entity.getId());
			} catch (sqlException e) {
				e.printStackTrace();
			}
			latch.countDown();
		}
	}
}

一个入口程序例子:
public class CreditReportEnteranceUseCountDownLatch implements Enterance {
	private ExecutorService executeService;
	//每次从oracle查询数据的每页数量
	private int pageSize;
	//每一批的线程数
	private int threadCountBatch;
	//每线程梳理记录数量
	private int countPerThread;
	public CreditReportEnteranceUseCountDownLatch(ExecutorService executeService,int pageSize,int threadCountBatch,int countPerThread){
		this.executeService=executeService;
		this.pageSize=pageSize;
		this.threadCountBatch=threadCountBatch;
		this.countPerThread=countPerThread;
	}
	public void doWork() throws Exception {
		HBaseUtils.getInstance().deleteTable("CreditReport");
		HBaseUtils.getInstance().createTableFromEntity(CreditReport.class);
		long count=OracleUtils.getRecordCount("TT_CREDIT_REPORT");
		//总共需要线程数
		int threadCount=(int)Math.ceil(count*1.0/countPerThread);
		
		int start=1;
		//总共需要线程数小于设定的每批线程数
		if(threadCount<=threadCountBatch){
			CountDownLatch latch=new CountDownLatch(threadCount*2);
			for(int i=0;i<threadCount;i++){
				//每一对线程一个队列,这理也可以多个线程一个队列看具体效率
				RecordQueue queue=new RecordQueue(pageSize);
				int end=start+countPerThread;
				if(count<end){
					end=(int)count+1;
				}
				OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",queue,pageSize,start,end,latch);
				executeService.submit(oracleThread);
				HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,pageSize/2,"CreditReport",latch,end-start);
				executeService.submit(hbaseThread);
				start+=countPerThread;
			}
			latch.await();
		}else{
			//线程批次
			int threadBatch=(int)Math.ceil(threadCount*1.0/threadCountBatch);
			start=1;
			if(threadCount%threadCountBatch==0){
				for(int j=0;j<threadBatch;j++){
					CountDownLatch latch=new CountDownLatch(threadCountBatch*2);
					for(int i=0;i<threadCountBatch;i++){
						RecordQueue queue=new RecordQueue(pageSize);
						int end=start+countPerThread;
						if(count<end){
							end=(int)count+1;
						}
						OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",latch);
						executeService.submit(oracleThread);
						HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,end-start);
						executeService.submit(hbaseThread);
						start+=countPerThread;
					}
					latch.await();
				}
			}else{
				for(int j=0;j<threadBatch-1;j++){
					CountDownLatch latch=new CountDownLatch(threadCountBatch*2);
					for(int i=0;i<threadCountBatch;i++){
						RecordQueue queue=new RecordQueue(pageSize);
						int end=start+countPerThread;
						if(count<end){
							end=(int)count+1;
						}
						OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",end-start);
						executeService.submit(hbaseThread);
						start+=countPerThread;
					}
					latch.await();
				}
				
				int restCount=threadCount-(threadBatch-1)*threadCountBatch;
				CountDownLatch latch=new CountDownLatch(restCount*2);
				for(int i=0;i<restCount;i++){
					RecordQueue queue=new RecordQueue(pageSize);
					int end=start+countPerThread;
					if(count<end){
						end=(int)count+1;
					}
					OracleIncomeWorkThreadUseCountDownLatch oracleThread=new OracleIncomeWorkThreadUseCountDownLatch("TT_CREDIT_REPORT",latch);
					executeService.submit(oracleThread);
					HBaSEOutgoingWorkThreadUseCountDownLatch hbaseThread=new HBaSEOutgoingWorkThreadUseCountDownLatch(queue,end-start);
					executeService.submit(hbaseThread);
					start+=countPerThread;
				}
				latch.await();
			}
		}
		
		ConnectionPool.getInstance().shutdown();
		HBaseUtils.closeConnection();
	}

}

工具类:

oracle连接池:

public class ConnectionPool {
	private static  ConcurrentHashMap<String,ConnectionEntity> idelConnections=null;
	private static  ConcurrentHashMap<String,ConnectionEntity> activeConnections=null;
	private static int initSize;
	private static int maxSize;
	private static AtomicInteger idelSize=new AtomicInteger(0);
	private static AtomicInteger activeSize=new AtomicInteger(0);
	private static ConnectionPool instance=null;
	private static Lock lock= new ReentrantLock();
	private  Object object=new Object();
	private static boolean isShutdown=false;
	 
	private ConnectionPool(int initSize,int maxSize){
		this.initSize=initSize;
		this.maxSize=maxSize;
		idelConnections=new ConcurrentHashMap<String,ConnectionEntity>();
		activeConnections=new ConcurrentHashMap<String,ConnectionEntity>();
		initConnections();
		new DetectFailConnection().start();
	}
	
	public  ConnectionEntity getConnection() throws InterruptedException,sqlException{
		if(isShutdown){
			throw new RuntimeException("pool is shutdown.");
		}
	    lock.lock();
	    try{
			if(idelSize.get()>0){
				if(idelConnections.size()<=0){
					throw new RuntimeException("");
				}
				String key=idelConnections.entrySet().iterator().next().getKey();
				ConnectionEntity entity=idelConnections.entrySet().iterator().next().getValue();
				entity.setStatus(Connection_Status.active);
				idelConnections.remove(key);				
				idelSize.decrementAndGet();
				if(entity.getConnection().isClosed()){
					return getConnection();
				}
				activeConnections.put(key,entity);
				activeSize.incrementAndGet();
				return entity;
			}
	    }finally{
	    	lock.unlock();
	    }
	    
	    if(activeSize.get()>maxSize){
	    	throw new RuntimeException("活跃数量大于最大值");
	    }
	    if(activeSize.get()==maxSize){
	    	synchronized (object) {
	    		object.wait();
			}
	    	
	    }
	    
	    if(isShutdown){
	    	throw new RuntimeException("pool is shutdown.");
	    }
	    
	    Connection conn=OracleUtils.getConnection();
	    String id=UUID.randomUUID().toString();
	    ConnectionEntity entity=new ConnectionEntity();
	    entity.setId(id);
	    entity.setConnection(conn);
	    entity.setStatus(Connection_Status.active);
	    activeConnections.put(id,entity);
	    activeSize.incrementAndGet();
	    return entity;
	}
	
	
	
	public  void releaseConnection(String id) throws sqlException{
		if(isShutdown){
			throw new RuntimeException("pool is shutdown.");
		}
		if(idelSize.get()==maxSize){
			OracleUtils.closeConnection(activeConnections.remove(id).getConnection());
		}else{
			ConnectionEntity entity=activeConnections.remove(id);
			entity.setStatus(Connection_Status.idel);
			idelConnections.put(id,entity);
			idelSize.incrementAndGet();
			activeSize.decrementAndGet();
			synchronized (object) {
				object.notify();
			}
			
		}
	}
	
	private void initConnections(){
		for(int i=0;i<this.initSize;i++){
			ConnectionEntity entity=new ConnectionEntity();
			String id=UUID.randomUUID().toString();
			entity.setId(id);
			entity.setConnection(OracleUtils.getConnection());
			entity.setStatus(Connection_Status.idel);
			idelConnections.put(id,entity);
			idelSize.getAndAdd(1);
		}
	}
	
	public void shutdown(){
		isShutdown=true;
		synchronized (object) {
			object.notifyAll();
		}
		Iterator<String> idelIt=idelConnections.keySet().iterator();
		while(idelIt.hasNext()){
			String key=idelIt.next();
			ConnectionEntity entity=idelConnections.get(key);
			try {
				entity.getConnection().close();
			} catch (sqlException e) {
				e.printStackTrace();
			}
		}
		Iterator<String>activeIt=activeConnections.keySet().iterator();
		while(activeIt.hasNext()){
			String key=activeIt.next();
			ConnectionEntity entity=activeConnections.get(key);
			try {
				entity.getConnection().close();
			} catch (sqlException e) {
				e.printStackTrace();
			}
		}
		 initSize=0;
		 maxSize=0;
		 idelSize=new AtomicInteger(0);
		 activeSize=new AtomicInteger(0);
	}
	
	public int getIdelSize(){
		return this.idelSize.get();
	}
	
	public int getActiveSize(){
		return this.activeSize.get();
	}
	
	public static ConnectionPool getInstance(){
		if(isShutdown){
			throw new RuntimeException("pool is already shutdown.");
		}
		if(maxSize>0){
			return instance;
		}
		return getInstance(10,30);
	}
	public static ConnectionPool getInstance(int initSize,int maxSize){
		if(isShutdown){
			throw new RuntimeException("pool is already shutdown.");
		}
		if(initSize<0||maxSize<1){
			throw new RuntimeException("initSize必须不小于0,maxsize必须大于等于1");
		}
		if(initSize>maxSize){
			initSize=maxSize;
		}
		synchronized (ConnectionPool.class) {
			if(instance==null){
				instance=new ConnectionPool(initSize,maxSize);
			}
		}
		return instance;
	}
	
	public  static class ConnectionEntity{
		private String id;
		private Connection connection;
		private Connection_Status status;
		
		public Connection getConnection() {
			return connection;
		}
		public void setConnection(Connection connection) {
			this.connection = connection;
		}
		public Connection_Status getStatus() {
			return status;
		}
		public void setStatus(Connection_Status status) {
			this.status = status;
		}
		public String getId() {
			return id;
		}
		public void setId(String id) {
			this.id = id;
		}
		
		
		
	}
	
	private enum Connection_Status{
		idel,active,close
	}
	
	class DetectFailConnection extends Thread{
		@Override
		public void run() {
			Iterator<String> itIdel=idelConnections.keySet().iterator();
			while(itIdel.hasNext()){
				String key =itIdel.next();
				ConnectionEntity entity=idelConnections.get(key);
				try {
					if(entity.getConnection().isClosed()){
						idelConnections.remove(key);
						idelSize.decrementAndGet();
					}
				} catch (sqlException e) {
					e.printStackTrace();
				}
			}
			
			Iterator<String> itActive=activeConnections.keySet().iterator();
			while(itActive.hasNext()){
				String key=itActive.next();
				ConnectionEntity entity=activeConnections.get(key);
				try {
					if(entity.getConnection().isClosed()){
						activeConnections.remove(key);
						activeSize.decrementAndGet();
					}
				} catch (sqlException e) {
					e.printStackTrace();
				}
			}
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}


反射工具:
public class EntityUtils {
	private static final String basePackage="com.topcheer.transfer.entity";
	public static Class getEntityByOracleTableName(String tableName){
		List<Class> allClass = getClasses(basePackage);
		return filterOracleEntity(allClass,tableName);
	}
	
	/*public static void main(String[] args) {
		Class c=getEntityByOracleTableName("Test");
		System.out.println(c.getName().substring(c.getName().lastIndexOf(".")+1));
	}*/
	private static Class filterOracleEntity(List<Class> allClass,String tableName){
		for(Class c:allClass){
			if(c.isAnnotationPresent(Table.class)){
				Table ann=(Table) c.getAnnotation(Table.class);
				if(ann.oracle_name().equals(tableName)){
					return c;
				}else if(ann.oracle_name().equals("")&&c.getName().substring(c.getName().lastIndexOf(".")+1).equals(tableName)){
					return c;
				}
			}
		}
		throw new RuntimeException("table entity not found");
	}
	/** 
     * 从包package中获取所有的Class 
     * @param pack 
     * @return 
     */  
    private static List<Class> getClasses(String packageName){  
          
        //第一个class类的集合  
        List<Class> classes = new ArrayList<Class>();  
        //是否循环迭代  
        boolean recursive = true;  
        //获取包的名字 并进行替换  
        String packageDirName = packageName.replace('.','/');  
        //定义一个枚举的集合 并进行循环来处理这个目录下的things  
        Enumeration<URL> dirs;  
        try {  
            dirs = Thread.currentThread().getContextClassLoader().getResources(packageDirName);  
            //循环迭代下去  
            while (dirs.hasMoreElements()){  
                //获取下一个元素  
                URL url = dirs.nextElement();  
                //得到协议的名称  
                String protocol = url.getProtocol();  
                //如果是以文件的形式保存在服务器上  
                if ("file".equals(protocol)) {  
                    //获取包的物理路径  
                    String filePath = URLDecoder.decode(url.getFile(),"UTF-8");  
                    //以文件的方式扫描整个包下的文件添加到集合中  
                    findAndAddClassesInPackageByFile(packageName,filePath,recursive,classes);  
                }  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
         
        return classes;  
    }  
    
    /** 
     * 以文件的形式来获取包下的所有Class 
     * @param packageName 
     * @param packagePath 
     * @param recursive 
     * @param classes 
     */  
    private static void findAndAddClassesInPackageByFile(String packageName,String packagePath,final boolean recursive,List<Class> classes){  
        //获取此包的目录 建立一个File  
        File dir = new File(packagePath);  
        //如果不存在或者 也不是目录就直接返回  
        if (!dir.exists() || !dir.isDirectory()) {  
            return;  
        }  
        //如果存在 就获取包下的所有文件 包括目录  
        File[] dirfiles = dir.listFiles(new FileFilter() {  
        //自定义过滤规则 如果可以循环(包含子目录) 或则是以.class结尾的文件(编译好的java类文件)  
              public boolean accept(File file) {  
                return (recursive && file.isDirectory()) || (file.getName().endsWith(".class"));  
              }  
            });  
        //循环所有文件  
        for (File file : dirfiles) {  
            //如果是目录 则继续扫描  
            if (file.isDirectory()) {  
                findAndAddClassesInPackageByFile(packageName + "." + file.getName(),file.getAbsolutePath(),classes);  
            }  
            else {  
                //如果是java类文件 去掉后面的.class 只留下类名  
                String className = file.getName().substring(0,file.getName().length() - 6);  
                try {  
                    //添加到集合中去  
                    classes.add(Class.forName(packageName + '.' + className));  
                } catch (ClassNotFoundException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  

}

hbase工具类:
public class HBaseUtils {
	 private static Configuration conf = HBaseConfiguration.create();
	 private static ExecutorService poolx=Executors.newScheduledThreadPool(20);
	 private static Connection connection = null;
	 private static HBaseUtils instance=null;
	 
	 private HBaseUtils(){
		 if(connection==null){
			 try {
				connection=ConnectionFactory.createConnection(conf,poolx);
			} catch (IOException e) {
				e.printStackTrace();
			}
		 }
	 }
	 
	 public static void closeConnection(){
		 try {
			connection.close();
			poolx.shutdownNow();
		} catch (IOException e) {
			e.printStackTrace();
		}
	 }
	 
	 public static synchronized HBaseUtils getInstance(){
		 if(instance==null){
			 instance=new HBaseUtils();
		 }
		 return instance;
	 }
	  /**
     * 创建表
     * @throws IOException
     */
    public void createTable(String tableName,String[] columns) throws IOException {
        Admin admin = connection.getAdmin();
        TableName name=TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
        	admin.disableTable(name);
        	admin.deleteTable(name);
        } else {
            HTableDescriptor desc =new HTableDescriptor();
            desc.setName(TableName.valueOf(tableName));
            for (String column : columns) {
                desc.addFamily(new HColumnDescriptor(column));
            }
            admin.createTable(desc);
        }
    }
    
    public void createTableFromEntity(Class entityClass){
    	com.topcheer.transfer.annotation.Table ann=(com.topcheer.transfer.annotation.Table)entityClass.getAnnotation(com.topcheer.transfer.annotation.Table.class);
    	String hbase_name=ann.hbase_name();
    	if(hbase_name.equals("")){
    		hbase_name=entityClass.getName();
    	}
    	Field[]fields=entityClass.getDeclaredFields();
    	Set<String> fimalySet=new HashSet<String>();
    	for(Field field:fields){
    		Column column=field.getAnnotation(Column.class);
    		fimalySet.add(column.family());
    	}
    	String [] familys=fimalySet.toArray(new String[]{});
    	try {
			createTable(hbase_name,familys);
		} catch (IOException e) {
			e.printStackTrace();
		}
    }
    
    
    /**
     * 插入一行记录
     * @param tablename
     *             表名
     * @param row
     *             行名称
     * @param columnFamily
     *             列族名
     * @param columns
     *             (列族名:column)组合成列名
     * @param values
     *             行与列确定的值
     */
    public void insertRecord(String tableName,String row,String columnFamily,String[] columns,String[] values) {
        try {
        	TableName name=TableName.valueOf(tableName);
            Table table = connection.getTable(name);
            Put put = new Put(Bytes.toBytes(row));
            for (int i = 0; i < columns.length; i++) {
                put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(String.valueOf(columns[i])),Bytes.toBytes(values[i]));
                table.put(put);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 將对象转化为Put
     * @param obj
     * @return
     * @throws IllegalAccessException 
     * @throws IllegalArgumentException 
     */
    private Put convertEntityToPut(BaseEntity obj) throws IllegalArgumentException,IllegalAccessException{
    	Field[]fields=obj.getClass().getDeclaredFields();
    	Field rowKeyField=null;
    	for(Field field:fields){
    		if(field.isAnnotationPresent(RowKey.class)){
    			rowKeyField=field;
    			rowKeyField.setAccessible(true);
    		}
    	}
    	if(rowKeyField==null){
    		throw new RuntimeException("rowKey注解不存在");
    	}
    	Put p = new Put(Bytes.toBytes(String.valueOf(rowKeyField.get(obj))));
    	for(Field field:fields){
    		if(field!=rowKeyField){
    			Column column=field.getAnnotation(Column.class);
    			String family=column.family();
    			String col=column.column();
    			if(col.equals("")){
    				col=field.getName();
    			}
    			field.setAccessible(true);
    			Object value=field.get(obj);
    			p.addColumn(Bytes.toBytes(family),Bytes.toBytes(col),Bytes.toBytes(String.valueOf(value)));
    		}
    	}
        
        return p;
    }
    
    /**
     * 批量插入
     * @param tableName
     * @param entityList
     */
    public void batchInsertRecord(String tableName,List<BaseEntity> entityList){
        	BufferedMutator table=null;
			try {
				table = connection.getBufferedMutator(TableName.valueOf(tableName));
				List<Mutation> mutations = new ArrayList<Mutation>();
	            for(BaseEntity entity:entityList){
	            	mutations.add(convertEntityToPut(entity));
	            }
	            table.mutate(mutations);
	            table.flush();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (IllegalArgumentException e) {
				e.printStackTrace();
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			}
    }
    
    /**
     * 删除一行记录
     * @param tablename
     *             表名
     * @param rowkey
     *             行名
     * @throws IOException
     */
    public void deleteRow(String tablename,String rowkey) throws IOException {
    	TableName name=TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        List<Delete> list = new ArrayList<Delete>();
        Delete d1 = new Delete(rowkey.getBytes());
        list.add(d1);
        table.delete(list);
    }
    
    /**
     * 查找一行记录
     * @param tablename
     *             表名
     * @param rowkey
     *             行名
     */
    public static void selectRow(String tablename,String rowKey)
            throws IOException {
    	TableName name=TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Get g = new Get(rowKey.getBytes());
        Result rs = table.get(g);
        for (Cell cell : rs.rawCells()) {
            System.out.print(new String(cell.getRowArray()) + "  ");
            System.out.print(new String(cell.getFamilyArray()) + ":");
            System.out.print(new String(cell.getQualifierArray()) + "  ");
            System.out.print(cell.getTimestamp() + "  ");
            System.out.println(new String(cell.getValueArray()));
        }
    }
    
    /**
     * 查询表中所有行
     * @param tablename
     */
    public void scanAllRecord(String tablename) {
        try {
        	TableName name=TableName.valueOf(tablename);
            Table table = connection.getTable(name);
            Scan s = new Scan();
            ResultScanner rs = table.getScanner(s);
            for(Result result : rs){
	            for (Cell cell : result.rawCells()) {
	                System.out.print(new String(cell.getRowArray()) + "  ");
	                System.out.print(new String(cell.getFamilyArray()) + ":");
	                System.out.print(new String(cell.getQualifierArray()) + "  ");
	                System.out.print(cell.getTimestamp() + "  ");
	                System.out.println(new String(cell.getValueArray()));
	            }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 删除表操作
     * @param tablename
     * @throws IOException
     */
    public void deleteTable(String tablename) throws IOException {
        try {
        	TableName name=TableName.valueOf(tablename);
            Admin admin = connection.getAdmin();
            if(admin.tableExists(name)){
            	admin.disableTable(name);
            	admin.deleteTable(name);
            }
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        }
    }
}
oracle工具类:
public class OracleUtils {
	static{
		try {
			Class.forName("oracle.jdbc.driver.OracleDriver");
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}
	public static Connection getConnection()  {
		
		int i=0;
		Connection conn=null;
		do{
			try {
				conn=DriverManager.getConnection();
				if(conn!=null){
					break;
				}
				Thread.sleep(500);
				i++;
			} catch (sqlException e) {
				e.printStackTrace();
			}catch(InterruptedException e){
				e.printStackTrace();
			}
		}while(conn==null&&i<5);
		return conn;
	}
	
	public static void closeConnection(Connection conn){
		try {
			conn.close();
		} catch (sqlException e) {
			e.printStackTrace();
		}
	}
	
	public static void closeResultSet(ResultSet rs){
		try {
			rs.close();
		} catch (sqlException e) {
			e.printStackTrace();
		}
	}
	
	public static void closePrepareStatement(PreparedStatement statement){
		try {
			statement.close();
		} catch (sqlException e) {
			e.printStackTrace();
		}
	}
	
	
	public static List<String> getTableColumns(Connection conn,String tableName){
		ResultSet resultSet=null;
		try {
			resultSet=conn.getMetaData().getColumns(null,"%",tableName,"%");
			List<String> columnNames=new ArrayList<String>();
			while(resultSet.next()){
				String columnName=resultSet.getString("COLUMN_NAME");
				columnNames.add(columnName);
			}
			return columnNames;
		}catch (sqlException e) {
			e.printStackTrace();
		}finally{
			closeResultSet(resultSet);
		}
		return null;
	}
	
	public static  List<BaseEntity> convertResultSetToEntity(ResultSet rs,List<String> columnNames,String tableName){
		Class cla=EntityUtils.getEntityByOracleTableName(tableName);
		Field[] fields=cla.getDeclaredFields();
		List<BaseEntity> result=new ArrayList<BaseEntity>();
		Map<String,Field> fieldMap=new HashMap<String,Field>();
		
		for(Field field:fields){
			Column column=field.getAnnotation(Column.class);
			String oracleColumn=column.oracle_column();
			if(oracleColumn.equals("")){
				oracleColumn=field.getName();
			}
			fieldMap.put(oracleColumn,field);
		}
		
		try {
			while(rs.next()){
				BaseEntity obj=(BaseEntity)cla.newInstance();
				Iterator<String> it=fieldMap.keySet().iterator();
				while(it.hasNext()){
					String fieldName=it.next();
					Field field=fieldMap.get(fieldName);
					field.setAccessible(true);
					String type=field.getType().toString();
					type=type.substring(type.lastIndexOf(".")+1);
					if(!type.equals("Date")){
						if(rs.getObject(fieldName) instanceof java.math.BigDecimal){
							if(type.equals("int")){
								field.set(obj,((BigDecimal)rs.getObject(fieldName)).intValue());
							}else if(type.equals("long")){
								field.set(obj,((BigDecimal)rs.getObject(fieldName)).longValue());
							}else if(type.equals("float")){
								field.set(obj,((BigDecimal)rs.getObject(fieldName)).floatValue());
							}else  if(type.equals("double")){
								field.set(obj,((BigDecimal)rs.getObject(fieldName)).doubleValue());
							}
						}else{
							field.set(obj,rs.getObject(fieldName));
						}
					}else{
						String fName=field.getName();
						fName=fName.substring(0,1).toUpperCase()+fName.substring(1);
						fName="set"+fName;
						Method m = cla.getDeclaredMethod(fName,Date.class);
						m.setAccessible(true);//因为写成private 所以这里必须设置
						if(rs.getObject(fieldName) instanceof oracle.sql.TIMESTAMP){
							m.invoke(obj,new Date(((TIMESTAMP)rs.getObject(fieldName)).timestampValue().getTime()));
						}else{
							m.invoke(obj,rs.getObject(fieldName));
						}
						
					}
				}
				result.add(obj);
			}
		} catch (InstantiationException e) {
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			e.printStackTrace();
		} catch(sqlException e){
			e.printStackTrace();
		} catch (NoSuchMethodException e) {
			e.printStackTrace();
		} catch (SecurityException e) {
			e.printStackTrace();
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (InvocationTargetException e) {
			e.printStackTrace();
		}
		return result;
	}
	
	public static long getRecordCount(String table){
		ConnectionPool pool=ConnectionPool.getInstance();
		ConnectionEntity entity=null;
		boolean exception=false;
		int count=0;
		do{
			try {
				entity = pool.getConnection();
				Connection conn=entity.getConnection();
				Statement statement=conn.createStatement();
				ResultSet rs=statement.executeQuery("select count(*) from "+table);
				if(rs.next()){
					return rs.getLong(1);
				}
			} catch (InterruptedException e) {
				exception=true;
				e.printStackTrace();
			} catch (sqlException e) {
				exception=true;
				e.printStackTrace();
			}finally{
				try {
					pool.releaseConnection(entity.getId());
				} catch (sqlException e) {
					e.printStackTrace();
				}
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			count++;
			if(count>0)break;
		}while(exception=true);
		return 0L;
		
	}
}

main程序:
public class MainEnterance {
	private static ExecutorService executeService=Executors.newCachedThreadPool();
	public static void main(String[] args) throws Exception {
//		Enterance abc=new AbcEnterance(executeService);
//		abc.doWork();
		long startDate=new Date().getTime();
		Enterance creditReport=new CreditReportEnterance(executeService);
		creditReport.doWork();
		
//		Enterance creditReport=new CreditReportEnteranceUseCountDownLatch(executeService,1000,20,10000);
//		creditReport.doWork();
		
		///executeService.shutdown();
		
		long endDate=new Date().getTime();
		
		System.out.println("一共耗时:"+(endDate-startDate)/1000+"s");
	}

}



后续改进:

1数据库字段类型没有全覆盖

2支持其他关系型数据库

3从hbase导数据到关系型数据库

4错误恢复代码还没加上

5hbase rowkey设置策略

原文链接:https://www.f2er.com/oracle/210723.html

猜你在找的Oracle相关文章