我们正在尝试运行分布在docker swarm集群上的简单管道. luigi worker被部署为复制的docker服务.他们成功启动,在向luigi-server请求工作几秒钟之后,由于没有为他们分配工作而他们开始死亡,所有任务最终都分配给一个工作人员.
我们不得不在我们工人的luigi.cfg中设置keep_alive = True来强迫他们不要死,但是在管道完成后让工人保持身边似乎是一个坏主意.
有没有办法控制工作分配?
我们的测试管道:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i,self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd,stderr=subprocess.STDOUT,shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
最佳答案
您的问题是一次产生一个要求的结果,而您希望立即产生所有这些要求,如下所示:
def requires(self):
reqs = []
for i in range(self.tasks):
reqs.append(RunExampleTask(i,self.sleep_time))
yield reqs