python - Parallel execution using Canvas Workflow
I've been working on a project that requires tasks to be executed in as parallel a way as possible. The code below exemplifies the problem:
    # -*- coding: utf-8 -*-
    from __future__ import absolute_import

    from celery import Celery
    from celery.utils.log import get_task_logger

    logger = get_task_logger(__name__)

    app = Celery('celeryproject',
                 backend="redis://localhost:6379/0",
                 broker="amqp://guest:guest@localhost:5672//")


    @app.task
    def step1(x):
        # Get a list of str from a webservice using x
        return ["step1-%s-%s" % (x, i) for i in range(5)]


    @app.task
    def step2(x, y):
        # Get a list of str from a webservice using x
        # (y is the step1 element passed in by pipeline)
        return ["step2-%s-%s" % (x, i) for i in range(5, 10)]


    @app.task
    def step3(z):
        # Process z
        logger.info(z)


    @app.task
    def pipeline(xs):
        # Each .get() blocks until the subtask finishes, so the steps run serially
        for x in xs:
            r1_list = step1.delay(x).get()
            for r1 in r1_list:
                r2_list = step2.delay(x, r1).get()
                for r2 in r2_list:
                    step3.delay(r2)
I need to execute the pipeline task to start the process. I tried the Canvas workflow examples (http://docs.celeryproject.org/en/latest/userguide/canvas.html) but ran into problems along the way. Can you guys help me?
Edit: step1 returns a list, and I want to deal with each element of that list inside step2. Likewise, step2 returns a list, and I want to deal with each element of that list inside step3.
In other words, I want to apply step2 to each element returned by step1, and then apply step3 to each element returned by that previous map.
I tried, for example: chord(step1.s('xxx'))(step2.map())
But I got this error:
TypeError('map() takes 2 arguments (1 given)',)
I also tried to use groups, but that didn't work either.