1、Web端管理工作流
Azkaban提供了易操作的Web管理界面,具体操作可参考:http://azkaban.github.io/azkaban/docs/2.5/#ajax-api
值得注意的是,可以使用Azkaban提供的Web界面,定义或覆盖工作流的具体执行参数,操作界面如下:
2.1、登录认证
命令:curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://localhost:8443
返回json格式:
<span style="white-space:pre"> </span>{ "status" : "success","session.id" : "c001aba5-a90f-4daf-8f11-62330d034c0a" }@H_301_14@2.2、创建一个Project
命令:curl -k -X POST --data "session.id=9089beb2-576d-47e3-b040-86dbdc7f523e&name=aaaa&description=11" https://localhost:8443/manager?action=create
返回json格式:<span style="white-space:pre"> </span>{ "status":"success","path":"manager?project=aaaa","action":"redirect" } @H_301_14@2.3、上传一个Project的zip文件
命令:curl -k -i -H "Content-Type: multipart/mixed" -X POST --form 'session.id=e7a29776-5783-49d7-afa0-b0e688096b5e' --form 'ajax=upload' --form 'file=@myproject.zip;type=application/zip' --form 'project=MyProject' https://localhost:8443/manager
注意:在zip文件的目录下执行该命令,否则无法找到zip文件
返回json格式:<span style="white-space:pre"> </span>{ "error" : "Installation Failed.\nError unzipping file.","projectId" : "192","version" : "1" }@H_301_14@2.4、执行工作流
命令:curl -k --get --data 'session.id=189b956b-f39f-421e-9a95-e3117e7543c9' --data 'ajax=executeFlow' --data 'project=azkaban-test-project' --data 'flow=test' https://localhost:8443/executor
返回json格式:<span style="white-space:pre"> </span>{ message: "Execution submitted successfully with exec id 295",project: "foo-demo",flow: "test",execid: 295 }@H_301_14@3、使用Ajax API操作工作流
在程序中执行工作流的话,则需要使用Azkaban提供的ajax api了,以下是使用Java模拟https请求的代码:
AzkabanHttpsPost类package hadoop.azkaban; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.util.Properties; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import net.sf.json.JSONObject; public class AzkabanHttpsPost { static String keystorePassword; static String keystore; static String truststore; static{ InputStream is=Thread.currentThread().getContextClassLoader(). getResourceAsStream("azkaban.properties"); Properties p=new Properties(); try{ p.load(is); keystorePassword = p.getProperty("PASSWORD"); keystore = p.getProperty("KEYSTORE"); truststore = p.getProperty("TRUSTSTORE"); }catch(Exception e){ e.printStackTrace(); } } /** * 获得KeyStore. * * @param storePath * 密钥库路径 * @param password * 密码 * @return 密钥库 * @throws Exception */ public static KeyStore getKeyStore(String password,String storePath) throws Exception { // 实例化密钥库 KeyStore ks = KeyStore.getInstance("JKS"); // 获得密钥库文件流 FileInputStream is = new FileInputStream(storePath); // 加载密钥库 ks.load(is,password.tocharArray()); // 关闭密钥库文件流 is.close(); return ks; } /** * 获得SSLSocketFactory. * * @param password * 密码 * @param keyStorePath * 密钥库路径 * @param trustStorePath * 信任库路径 * @return SSLSocketFactory * @throws Exception */ public static SSLContext getSSLContext() throws Exception { // 实例化密钥库 KeyManagerFactory keyManagerFactory = KeyManagerFactory .getInstance(KeyManagerFactory.getDefaultAlgorithm()); // 获得密钥库 KeyStore keyStore = getKeyStore(AzkabanHttpsPost.keystorePassword,AzkabanHttpsPost.keystore); // 初始化密钥工厂 keyManagerFactory.init(keyStore,AzkabanHttpsPost.keystorePassword.tocharArray()); // 实例化信任库 TrustManagerFactory trustManagerFactory = TrustManagerFactory .getInstance(TrustManagerFactory.getDefaultAlgorithm()); // 获得信任库 KeyStore trustStore = getKeyStore(AzkabanHttpsPost.keystorePassword,AzkabanHttpsPost.truststore); // 初始化信任库 trustManagerFactory.init(trustStore); // 实例化SSL上下文 SSLContext ctx = SSLContext.getInstance("TLS"); // 初始化SSL上下文 ctx.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(),null); // 获得SSLSocketFactory return ctx; } /** * 初始化HttpsURLConnection. * * @param password * 密码 * @param keyStorePath * 密钥库路径 * @param trustStorePath * 信任库路径 * @throws Exception */ public static void initHttpsURLConnection() throws Exception { // 声明SSL上下文 SSLContext sslContext = null; // 实例化主机名验证接口 HostnameVerifier hnv = new MyHostnameVerifier(); try { sslContext = getSSLContext(); } catch (GeneralSecurityException e) { e.printStackTrace(); } if (sslContext != null) { HttpsURLConnection.setDefaultSSLSocketFactory(sslContext .getSocketFactory()); } HttpsURLConnection.setDefaultHostnameVerifier(hnv); } /** * 发送请求. * * @param httpsUrl * 请求的地址,如https://localhost:8043 * @param xmlStr * 请求的数据,如action=login&username=azkaban&password=azkaban * @throws Exception */ public static JSONObject post(String url,String xmlStr) throws Exception { initHttpsURLConnection(); JSONObject jsonObj = null; HttpsURLConnection urlCon = null; try { urlCon = (HttpsURLConnection) (new URL(url)).openConnection(); urlCon.setDoInput(true); urlCon.setDoOutput(true); urlCon.setRequestMethod("POST"); // 如下设置后,azkaban才能识别出是以ajax的方式访问,从而返回json格式的操作信息 urlCon.setRequestProperty("Content-Type","application/x-www-form-urlencoded"); urlCon.setRequestProperty("X-Requested-With","XMLHttpRequest"); urlCon.setUseCaches(true); // 设置为gbk可以解决服务器接收时读取的数据中文乱码问题 urlCon.getOutputStream().write(xmlStr.getBytes("gbk")); urlCon.getOutputStream().flush(); urlCon.getOutputStream().close(); BufferedReader in = new BufferedReader(new InputStreamReader( urlCon.getInputStream())); String line=""; String temp; while ((temp = in.readLine()) != null) { line = line + temp; } jsonObj = JSONObject.fromObject(line); } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return jsonObj; }@H_301_14@MyHostnameVerifier类
package hadoop.azkaban; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLSession; /** * 实现用于主机名验证的基接口。 * 在握手期间,如果 URL 的主机名和服务器的标识主机名不匹配,则验证机制可以回调此接口的实现程序来确定是否应该允许此连接。 */ public class MyHostnameVerifier implements HostnameVerifier { @Override public boolean verify(String hostname,SSLSession session) { if("localhost".equals(hostname)){ return true; } else { return false; } } } @H_301_14@以下是调用https,操作Azkaban的示例:
package hadoop.azkaban; import java.io.InputStream; import java.util.Properties; import net.sf.json.JSONObject; /** * * @author hu * */ public class AzkabanOperator { public static String url; public static String azkabanUser; public static String azkabanPassword; public static String GDI_Project; public static String GDI_Workflow; static { InputStream is = Thread.currentThread().getContextClassLoader() .getResourceAsStream("azkaban.properties"); Properties p = new Properties(); try { p.load(is); url = p.getProperty("URL"); azkabanUser = p.getProperty("AZKABANUSER"); azkabanPassword = p.getProperty("AZKABANPASSWORD"); GDI_Project = p.getProperty("GDI_Project"); GDI_Workflow = p.getProperty("GDI_Workflow"); } catch (Exception e) { e.printStackTrace(); } } public JSONObject login() throws Exception { JSONObject result = null; String queryStr = "action=login&username=" + azkabanUser + "&password=" + azkabanPassword; result = AzkabanHttpsPost.post(url,queryStr); return result; } public JSONObject executeGDIFlow(String sessionID,String project,String flow,String cwParams,String smParams,String gdiParams) throws Exception { JSONObject result = null; String executeStr = "session.id=" + sessionID + "&ajax=executeFlow&project=" + project + "&flow=" + flow + "&flowOverride[cw_params]=" + cwParams + "&flowOverride[sm_params]=" + smParams + "&flowOverride[gdi_params]=" + gdiParams; String executeUrl = url + "/executor"; result = AzkabanHttpsPost.post(executeUrl,executeStr); return result; } public JSONObject fetchFlow(String sessionID,String execID) throws Exception { JSONObject result = null; String executeStr = "session.id=" + sessionID + "&ajax=fetchexecflow&execid=" + execID; String executeUrl = url + "/executor"; result = AzkabanHttpsPost.post(executeUrl,executeStr); return result; }@H_301_14@