一.Azkaban和Oozie的工作流程
Azkaban工作流程
Azkaban将需要操作的信息打包成zip文件发送给Server端,Server对用户的信息进行存储。用户在Web UI 或者通过HTTP Client发送操作请求后,Server会根据用户定义的*.job文件(KV 匹配),执行zip包中的Jar文件。
源码的执行过程:
- 1.从Web页面提交工作流程:
Method.GET
/executor?projectId=33&project=testSpark&ajax=executeFlow&flow=test1&disabled=%5B%5D&failureEmailsOverride=false&successEmailsOverride=false&failureAction=finishCurrent&failureEmails=&successEmails=¬ifyFailureFirst=false¬ifyFailureLast=false&concurrentOption=ignore
用户提交任务后,发送任务的详情到服务器中,Azkaban客户端会对任务以及用户的信息进行校验,封装后首先将执行的信息(任务,时间,用户等)存入数据库中(表active_executing_flows),之后执行dispatch方法,对需要执行的任务流进行调度。
在dispatch方法中,首先会更新executions_flows表,然后将操作的语句发送到指定的ip和端口进行执行。
- 2.服务器接收到了请求:如果是执行操作那么接收到的action的type为execute。接着服务器会从数据库中获取相应的工作流flow,服务器将flow封装成FlowRunner。
FlowRunner的属性
ExecutorService
线程池对象
ExecId
从数据库中获取相应的flow
numJobThreads
默认10个线程
JobTypeManager
定义Job的插件,有以下几种插件
Set<JobRunner>
将有向无环图中的node抽象成一个JobRunner进行运行
其中任务的执行是使用一个递归操作runReadyJob(),循环操作其中的node,也就是每个JobRunner。
JobRunner的主要属性:
Job
执行任务的父类接口。
JobtypeManager
根据输入的type类型返回此节点需要执行的任务类型
JobId
唯一标识符
配置文件,Job的路径,监控FlowWatch…..
其中会根据需要操作的Flow来定义Job的type。返回相应的类型。例如MR 返回的是JavaProcessJob。
也就是说:每一个节点,是通过新建一个进程去运行。在这个进程中会执行多条command,通过process.run(),运行用户定义的job。
PS.每条command都需要重新建立一个process。
Oozie工作流程
在Oozie中,用户需要准备以下文件:
Job.properties
Job文件存储HDFS,ResourceManager的配置
Workflow.xml
配置每个节点之间的依赖关系
Lib
存放着指定运行jar的关联包
.jar
运行的jar包
用户需要将这些文件放置在一个文件夹下,然后上传至HDFS中。在客户端或者终端中发送请求去执行。
源码执行流程:
使用控制行操作:
- 1.首先调用:org.apache.oozie.cli.OozieCLI。首先根据不同的command类型调用不同的发送请求,例如使用MRCommand
在这个方法中会生成一个Client去Submit指定的Properties(根据Client和Command生成)。提交的对象是HTTPJobSubmit。调用该对象中的call方法和Server进行通信。最终返回一个jobId。METHOD.POST
- 2.服务器端:首先调用相应的Servlet,调用提交作业方法,生成一个DAG图(DAGEngine,然后所有的操作都是基于DAG来实现的)。
A.如果我们在提交一个作业时生成了jobType那么,此时会选定不同的提交类型(类似于一个工厂模式),返回指定的信息。
B.首先它会调用SubmitXCommand.call()方法,将job的信息加入数据库中并且返回一个jobId。
C.之后执行start(jobId)的方法,调用Xcommand.call()方法,生意Instrument对任务进行监控,在这个方法中会调用一个SignalXCommand.execute()方法。
在Oozie的后端中会维护一个异步队列,在上述的execute中会根据job中的每一个action的类型,去生成相应的Command加入异步队列中。类型如下:
skipAction
SignXcommand
startAction
ActionStartXCommand
ForkAction
ActionStartXCommand 和上面的jobType不同
类似还有killActionXCommand,workflowNotifyActionXCommand等
PS如果是MR 或者 Spark 映射ActionStartXCommand 类型。
D.在后端异步队列CallableQueueService中。(在这个方法中使用Instrument对Java进行进行监控)。会调用这些XCommand的execute方法,不同的类型会实例化不同的executor,例如MR 和 Spark都会实例化JavaActionExecutor(同时还有SubWorkflowActionExecutor执行提交任务)。
E.在上述对象的execute方法中会根据配置生成JobClient,来获取正在运行的Running Job的信息以及提交Job SubmitJob,返回一个jobId。如果获取正在运行的runningJob在这个对象中还有job.trackerUrl也就是任务的日志。可以供以后展示。
测试用例提交流程:
看测试用例提交Hadoop作业中,首先对连接进行验证,然后每次提交会生成一个JobClient,该Oozie作为一个Client给Hadoop服务器发送操作job的请求。
其中操作Hive hadoop spark 作业均是JavaActionExecutor,该执行器中会调用submitLauncher提交Hadoop作业。
小结
Azkaban的工作流运行是依靠操作进程来提交不同的命令的,它操作任务成功和失败的信息在于进程的相应,但是这并不能有效的管理任务的成功与失败。
Oozie 执行MR 任务是依靠Hadoop的Jar包,以Server作为Client发送请求至集群进行操作。在此之前需要将任务所依赖执行的jar包上传至HDFS中才可执行。
通过了解Oozie和Azkaban的执行过程,Oozie可以作为底层的流程引擎比较合适,因为通过JobClient可以有效的监控正在执行的任务,获取任务的信息,使用Azkaban能获取进程执行的详情,但是可以通过web界面更直观的进行操作。