Celery Integration
Automatic task tracing for Celery background jobs.
Quick Start
```python
# tasks.py
import tracium
from celery import Celery
from openai import OpenAI

# Initialize Tracium in your Celery worker
tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task
def process_message(message: str):
    # This task is automatically traced
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content
```
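Enqueueing the task works like any other Celery task; tracing happens in the worker, so the caller needs no Tracium setup. A minimal usage sketch (the message text is a placeholder):

```python
# client.py - enqueue the task; the worker traces it automatically
from tasks import process_message

async_result = process_message.delay("Summarize this support ticket")  # placeholder message
print(async_result.id)  # the task ID that gets recorded

# To read the return value with async_result.get(), configure a result backend
# on the app, e.g. Celery("tasks", broker=..., backend="redis://localhost:6379/1")
```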
What Gets Captured

- Task name - Celery task function name
- Task ID - Unique task identifier
- Arguments - Task args and kwargs
- Result - Task return value
- Duration - Task execution time
- Child spans - LLM calls and DB queries within the task
- Errors - Task failures and exceptions (each field is annotated in the sketch after this list)
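A hedged sketch mapping those fields onto an ordinary task in the Quick Start module; `score_lead` and `fetch_lead` are made-up names, and the child-span comments assume the automatic instrumentation described above:

```python
# Hypothetical task annotated with the captured fields
@app.task
def score_lead(lead_id: int, model: str = "gpt-4"):
    # Task name: "tasks.score_lead"; Task ID: assigned per invocation
    # Arguments: args=(lead_id,) plus any kwargs are recorded
    lead = fetch_lead(lead_id)  # made-up DB helper; the query appears as a child span
    response = openai_client.chat.completions.create(  # LLM call becomes a child span
        model=model,
        messages=[{"role": "user", "content": str(lead)}],
    )
    # Result: the return value is recorded; Duration: wall-clock time of the run;
    # Errors: any uncaught exception is captured as a task failure
    return response.choices[0].message.content
```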
Worker Configuration
```python
# celery_config.py
import tracium

# Initialize at module level so it runs when the worker starts
tracium.trace()

# Or in your celery.py, via the worker_process_init signal
from celery import Celery
from celery.signals import worker_process_init

@worker_process_init.connect
def init_tracium(**kwargs):
    tracium.trace()
```
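With the default prefork pool, `worker_process_init` fires once in every child process, which is usually what you want for per-process clients. If you also want the parent process covered, Celery's `worker_init` signal runs there. A sketch combining both, assuming `tracium.trace()` is safe to call in both places:

```python
# celery.py - initialize in both the parent and each prefork child
import tracium
from celery.signals import worker_init, worker_process_init

@worker_init.connect
def init_parent(**kwargs):
    # Runs once in the parent worker process, before child processes fork
    tracium.trace()

@worker_process_init.connect
def init_child(**kwargs):
    # Runs once in each prefork child, so every process gets its own tracer
    tracium.trace()
```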
Manual Traces in Tasks

```python
import tracium
from celery import Celery
from openai import OpenAI

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()
tracium_client = tracium.get_client()

@app.task
def complex_analysis(data: dict):
    with tracium_client.agent_trace(
        agent_name="celery-analyzer",
        tags=["celery", "analysis"]
    ) as trace:
        # Preprocessing span
        with trace.span(span_type="tool", name="preprocess") as span:
            span.record_input(data)
            processed = preprocess_data(data)  # your own preprocessing helper
            span.record_output({"processed": True})

        # LLM call span
        with trace.span(span_type="llm", name="analyze") as span:
            span.record_input({"data": processed})
            result = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": str(processed)}]
            )
            span.record_output({"result": result.choices[0].message.content})

        trace.set_summary({"status": "completed"})
        return result.choices[0].message.content
```
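Enqueueing is unchanged; each run of `complex_analysis` produces one trace containing the two spans above (the payload is a placeholder):

```python
complex_analysis.delay({"document": "quarterly report text"})  # placeholder payload
```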
Task Chains

```python
import tracium
from celery import Celery, chain
from openai import OpenAI

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task
def fetch_data(query: str):
    # First task in the chain
    return search_database(query)  # your own lookup helper

@app.task
def process_data(data: dict):
    # Second task - receives the result from fetch_data
    return transform_data(data)  # your own transform helper

@app.task
def generate_response(processed: dict):
    # Third task - makes the LLM call
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": str(processed)}]
    )
    return response.choices[0].message.content

# Chain execution - each task creates its own trace
workflow = chain(
    fetch_data.s("user query"),
    process_data.s(),
    generate_response.s()
)
result = workflow.apply_async()
```
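`apply_async` returns an `AsyncResult` for the last task in the chain. Collecting the final output looks like this, assuming the app is configured with a result backend (the examples above set only a broker):

```python
# Wait for the final task in the chain; a failure in any step propagates here
final_text = result.get(timeout=60)
print(final_text)
```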
Error Handling

```python
import tracium
from celery import Celery
from openai import OpenAI, RateLimitError

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task(
    bind=True,
    autoretry_for=(RateLimitError,),
    retry_backoff=True,
    max_retries=3
)
def process_with_retry(self, message: str):
    # An uncaught RateLimitError triggers an automatic retry with
    # exponential backoff; each retry creates a new trace
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content
```
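To reflect the failure in a manual trace as well, you can combine Celery's retry machinery with the span API from earlier. A sketch, reusing `tracium_client` from the manual-traces example; the summary fields are arbitrary labels, not a fixed schema:

```python
@app.task(bind=True, max_retries=3)
def traced_with_retry(self, message: str):
    with tracium_client.agent_trace(agent_name="celery-retry") as trace:
        try:
            response = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": message}]
            )
            trace.set_summary({"status": "completed"})
            return response.choices[0].message.content
        except RateLimitError as exc:
            # Record the failed attempt on the trace before handing off to Celery
            trace.set_summary({"status": "rate_limited", "attempt": self.request.retries})
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```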
Running Workers

```bash
# Start a Celery worker
celery -A tasks worker --loglevel=info

# With multiple worker processes
celery -A tasks worker --loglevel=info --concurrency=4
```