Reposted from: http://blog.csdn.net/post_yuan/article/details/77856320
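Embulk offers many plugins, including input plugins, output plugins, and file parser plugins; see http://www.embulk.org/plugins/ for the full list. Embulk can load data from one database into another, and it can also load CSV files or gzip-compressed CSV (csv.gz) files into a database. (For the corresponding YAML configuration, see the official site.)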
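As a rough illustration of the file-based case, the sketch below writes a config that reads gzip-compressed CSV files using Embulk's built-in file input, gzip decoder, and csv parser. The helper name, path prefix, column names, host, and credentials are placeholders invented for this example, and the output section assumes the Trafodion output plugin with the same options that appear later in this article. Treat it as a starting point and check the official documentation for the exact options.

```shell
# Sketch only: writes a hypothetical CSV.gz-to-Trafodion config to the path given as $1.
write_csv_config() {
  cat > "$1" <<'EOF'
in:
  type: file
  path_prefix: /data/csv/test_tbl_   # hypothetical; matches e.g. test_tbl_01.csv.gz
  decoders:
  - {type: gzip}                     # built-in decoder for .gz files
  parser:
    type: csv
    delimiter: ','
    skip_header_lines: 1
    columns:
    - {name: a, type: string}
    - {name: b, type: string}
out:
  type: trafodion
  url: "jdbc:t4jdbc://db.example.com:23400/:schema=seabase"   # placeholder host
  user: trafodion
  password: changeme                 # placeholder
  table: TEST_TBL
  mode: insert_direct
EOF
}

# Possible usage (requires Embulk and the output plugin to be installed):
#   write_csv_config ~/csv_to_trafodion.yml
#   embulk preview ~/csv_to_trafodion.yml
```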
Users can also develop and publish their own plugins. EsgynDB, for example, built a bulk-load output plugin for Trafodion that is implemented with UPSERT USING LOAD, as shown in the figure below.
[Figure: image not preserved in this copy]
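This article uses the Trafodion output plugin mentioned above, together with the Oracle input plugin, to walk through a worked example of loading data from Oracle into Trafodion with Embulk.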
1 Download and install Embulk

    wget https://dl.embulk.org/embulk-latest.jar
    mkdir -p ~/.embulk/bin
    mv embulk-latest.jar ~/.embulk/bin/embulk
    chmod +x ~/.embulk/bin/embulk
    echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
    source ~/.bashrc
2 Verify that Embulk was installed successfully

    [root@n12 ~]# embulk gem list
    2017-09-05 20:19:51.184 +0800: Embulk v0.8.31

    *** LOCAL GEMS ***

    did_you_mean (default: 1.0.1)
    jar-dependencies (default: 0.3.5)
    jruby-openssl (0.9.17 java)
    json (1.8.3 java)
    minitest (default: 5.4.1)
    net-telnet (default: 0.1.1)
    power_assert (default: 0.2.3)
    psych (2.0.17 java)
    racc (1.4.14 java)
    rake (default: 10.4.2)
    rdoc (default: 4.2.0)
    test-unit (default: 3.1.1)
3 Install the Oracle input plugin and the Trafodion output plugin

    embulk gem install embulk-input-oracle
    embulk gem install embulk-output-trafodion
4 Verify that the plugins were installed successfully

    [root@n12 ~]# embulk gem list
    2017-09-05 20:22:57.146 +0800: Embulk v0.8.31

    *** LOCAL GEMS ***

    did_you_mean (default: 1.0.1)
    embulk-input-oracle (0.8.5)
    embulk-output-trafodion (0.1.1)
    jar-dependencies (default: 0.3.5)
    jruby-openssl (0.9.17 java)
    json (1.8.3 java)
    minitest (default: 5.4.1)
    net-telnet (default: 0.1.1)
    power_assert (default: 0.2.3)
    psych (2.0.17 java)
    racc (1.4.14 java)
    rake (default: 10.4.2)
    rdoc (default: 4.2.0)
    test-unit (default: 3.1.1)
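Instead of scanning the gem list by eye, the check can be scripted. The sketch below assumes that each gem is printed at the start of a line as "name (version)", as in the listing above; the function name missing_plugins is made up for this example.

```shell
# Reads `embulk gem list` output on stdin and prints every requested
# plugin name that does not appear in it (prints nothing if all are present).
missing_plugins() {
  mp_gem_list=$(cat)
  for mp_plugin in "$@"; do
    # A gem line looks like: embulk-input-oracle (0.8.5)
    printf '%s\n' "$mp_gem_list" | grep -q "^${mp_plugin} (" || echo "$mp_plugin"
  done
}

# Possible usage (requires Embulk to be installed):
#   embulk gem list | missing_plugins embulk-input-oracle embulk-output-trafodion
```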
5 Prepare the Oracle JDBC driver JAR, which is used to read from the Oracle database

    [root@n12 ~]# ll /opt/drivers/
    total 2676
    -rw-rw-r-- 1 trafodion trafodion 2739670 Sep  5 20:38 ojdbc6.jar
6 Edit the YAML file oracle_to_trafodion.yml

Note: Embulk uses a YAML file to define how a bulk load is performed. For the file format, see http://www.embulk.org/docs/built-in.html#embulk-configuration-file-format

    exec:
      max_threads: 8
      min_output_tasks: 4
    in:
      type: oracle
      driver_path: /opt/drivers/ojdbc6.jar
      url: "jdbc:oracle:thin:@10.10.11.16:1521:esgyn"
      user: system
      password: system12345
      query: |
        select * from test_tbl
    out:
      type: trafodion
      url: "jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase"
      user: trafodion
      password: traf123
      default_timezone: 'Asia/Shanghai'
      table: TEST_TBL
      mode: insert_direct
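The config above stores both passwords in plain text. One possible alternative, sketched below, relies on Embulk's Liquid template support: when the config file name ends in .yml.liquid, placeholders such as {{ env.ORACLE_PASSWORD }} are filled in from environment variables at run time. The variable names ORACLE_PASSWORD and TRAFODION_PASSWORD and the helper function are invented for this example, and the behaviour should be confirmed against the Embulk version in use.

```shell
# Sketch only: writes a Liquid-templated variant of the config to the path given as $1.
# The quoted 'EOF' keeps the shell from touching the {{ ... }} placeholders.
write_liquid_config() {
  cat > "$1" <<'EOF'
exec:
  max_threads: 8
  min_output_tasks: 4
in:
  type: oracle
  driver_path: /opt/drivers/ojdbc6.jar
  url: "jdbc:oracle:thin:@10.10.11.16:1521:esgyn"
  user: system
  password: "{{ env.ORACLE_PASSWORD }}"      # hypothetical variable name
  query: |
    select * from test_tbl
out:
  type: trafodion
  url: "jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase"
  user: trafodion
  password: "{{ env.TRAFODION_PASSWORD }}"   # hypothetical variable name
  default_timezone: 'Asia/Shanghai'
  table: TEST_TBL
  mode: insert_direct
EOF
}

# Possible usage (requires Embulk and both plugins to be installed):
#   export ORACLE_PASSWORD=... TRAFODION_PASSWORD=...
#   write_liquid_config ~/oracle_to_trafodion.yml.liquid
#   embulk run ~/oracle_to_trafodion.yml.liquid
```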
7 Create the Oracle test table and insert test data

    SQL> create table test_tbl(a char(10), b char(10));
    SQL> insert into test_tbl values('a','b');
    SQL> insert into test_tbl values('c','d');
    SQL> insert into test_tbl values('e','f');
    SQL> select * from test_tbl;

    A          B
    ---------- ----------
    a          b
    c          d
    e          f
8 Create the Trafodion target table

    >> create table test_tbl(a char(10), b char(10));

    --- SQL operation complete.
    >> select * from test_tbl;

    --- 0 row(s) selected.
9 Run the Embulk job

Preview the job first:

    [root@n12 ~]# embulk preview ~/oracle_to_trafodion.yml
    2017-09-05 20:41:24.093 +0800: Embulk v0.8.31
    2017-09-05 20:41:27.303 +0800 [INFO] (0001:preview): Loaded plugin embulk-input-oracle (0.8.5)
    2017-09-05 20:41:27.662 +0800 [INFO] (0001:preview): Using JDBC Driver 11.2.0.4.0
    2017-09-05 20:41:27.977 +0800 [INFO] (0001:preview): SQL: select * from test_tbl
    2017-09-05 20:41:27.999 +0800 [INFO] (0001:preview): > 0.02 seconds
    +------------+------------+
    | A:string   | B:string   |
    +------------+------------+
    | a          | b          |
    | c          | d          |
    | e          | f          |
    +------------+------------+

Then run it:

    [root@n12 ~]# embulk run ~/oracle_to_trafodion.yml
    2017-09-05 21:01:26.419 +0800: Embulk v0.8.31
    2017-09-05 21:01:30.260 +0800 [INFO] (0001:transaction): Loaded plugin embulk-input-oracle (0.8.5)
    2017-09-05 21:01:30.298 +0800 [INFO] (0001:transaction): Loaded plugin embulk-output-trafodion (0.1.1)
    2017-09-05 21:01:30.807 +0800 [INFO] (0001:transaction): Using JDBC Driver 11.2.0.4.0
    2017-09-05 21:01:30.995 +0800 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
    -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase
    2017-09-05 21:01:31.019 +0800 [INFO] (0001:transaction): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
    connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@1cc41b77
    2017-09-05 21:01:36.271 +0800 [INFO] (0001:transaction): Using insert_direct mode
    [WARN] Plugin uses deprecated constructor of org.embulk.spi.time.TimestampFormatter.
    [WARN] Report plugins in your config at:
    2017-09-05 21:01:40.345 +0800 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
    -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase
    2017-09-05 21:01:40.389 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, socketTimeout=1800000}
    connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@4ee9df38
    2017-09-05 21:01:40.970 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?)
    -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase
    2017-09-05 21:01:47.001 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, socketTimeout=1800000}
    connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@785aae0e
    2017-09-05 21:01:52.935 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?)
    -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase
    2017-09-05 21:02:01.036 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, socketTimeout=1800000}
    connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@38f4c238
    2017-09-05 21:02:03.872 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?)
    -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase
    2017-09-05 21:02:10.364 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, socketTimeout=1800000}
    connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@6c47ecb4
    2017-09-05 21:02:17.565 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?)
    2017-09-05 21:02:36.756 +0800 [INFO] (0026:task-0000): SQL: select * from test_tbl
    2017-09-05 21:02:36.777 +0800 [INFO] (0026:task-0000): > 0.02 seconds
    2017-09-05 21:02:36.783 +0800 [INFO] (0026:task-0000): Loading 3 rows
    2017-09-05 21:02:36.821 +0800 [INFO] (0026:task-0000): > 0.04 seconds (loaded 3 rows in total)
    2017-09-05 21:02:37.653 +0800 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
    2017-09-05 21:02:37.665 +0800 [INFO] (main): Committed.
    2017-09-05 21:02:37.665 +0800 [INFO] (main): Next config diff: {"in":{},"out":{}}
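The two log lines that matter most in the run output are "loaded 3 rows in total" and "Committed.". For scripted runs, a small helper can pull them out of the log. The sketch below assumes the log wording shown above for a single-task run; the function name summarize_run is made up for this example, and other Embulk or plugin versions may word these lines differently.

```shell
# Reads an `embulk run` log on stdin. Prints "committed rows=N" when the log
# contains both a "(loaded N rows in total)" line and a "Committed." line;
# otherwise prints "not committed" and returns a non-zero status.
summarize_run() {
  sr_log=$(cat)
  # Take the last reported total in case the line appears more than once.
  sr_rows=$(printf '%s\n' "$sr_log" \
    | sed -n 's/.*(loaded \([0-9][0-9]*\) rows in total).*/\1/p' \
    | tail -n 1)
  if [ -n "$sr_rows" ] && printf '%s\n' "$sr_log" | grep -q 'Committed\.'; then
    echo "committed rows=$sr_rows"
  else
    echo "not committed"
    return 1
  fi
}

# Possible usage (requires Embulk and both plugins to be installed):
#   embulk run ~/oracle_to_trafodion.yml 2>&1 | summarize_run
```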
10 Check that the Embulk job succeeded

    SQL> select * from test_tbl;

    A          B
    ---------- ----------
    a          b
    c          d
    e          f

    --- 3 row(s) selected.
This completes the demonstration of using Embulk to extract data from Oracle into Trafodion.