celery.py
- snippet.python
from __future__ import absolute_import, unicode_literals import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'system_trading_django.settings') app = Celery('system_trading_django') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. # app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.conf.update( BROKER_URL='django://', CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json', 'pickle'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Asia/Seoul', CELERY_ENABLE_UTC=False, CELERY_IMPORT=['torrent.tasks'], ) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
__init__.py
- snippet.python
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
tasks.py
- snippet.python
# http://stackoverflow.com/questions/20116573/in-celery-3-1-making-django-periodic-task # http://127.0.0.1:9000/admin/djcelery/workerstate/ # myapp/tasks.py from datetime import timedelta, datetime import celery from celery.task import periodic_task from celery.schedules import crontab from torrent import views # from system_trading_django.celery import app # @periodic_task(run_every=crontab(minute="0"), ignore_result=True) # def run_collect_background(): # views.collect_backgound() import datetime import celery from torrent import views # @celery.decorators.periodic_task(run_every=datetime.timedelta(minutes=1)) # def myfunc(): # print('periodic_task', 'myfunc') # views.collect_backgound() from celery.task import periodic_task from celery.schedules import crontab @periodic_task(run_every=crontab(hour="*", minute="0", day_of_week="*"), ignore_result=True) def run_collect_background(): views.collect_backgound() from celery import Celery from celery.schedules import crontab # app = Celery() # # app.conf.beat_schedule = { # # Executes every Monday morning at 7:30 a.m. # 'add-every-monday-morning': { # 'task': 'tasks.add', # 'schedule': crontab(hour=7, minute=30, day_of_week=1), # 'args': (16, 16), # }, # 'add-every-minute': { # 'task': 'tasks.add', # 'schedule': crontab(minute='*'), # 'args': (1, 2), # } # } # @app.task # def api-test(arg): # print(arg) # from celery import task # # @task # def add(x, y): # print(x, y, x+y) # from celery import task # # @task() # def add(x, y): # return x + y # from 프로젝트이름.celery설정파일 import app from system_trading_django.celery import app @app.task(name='task_small') def task_small(is_two=False): print('task_small') # if is_two: # do_job_as_two() # else: # do_job_as_one() # @periodic_task(ignore_result=True, run_every=10) # 10 seconds, or timedelta(seconds=10) # def run_collect_background22(): # t = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # print('printed by periodic_task :' + t) # @app.task(name='sum-of-two-numbers') # def update_number(): # t = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # print('printed by update_number : ' + t) # # # @app.task(name="task_small_one") # def task_small_one(): # t = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # print('printed by task_small_one :' + t)
관련 문서
Plugin Backlinks: 아무 것도 없습니다.