Celery Integration

Automatic task tracing for Celery background jobs.

Quick Start

# tasks.py
import tracium
from celery import Celery
from openai import OpenAI

# Initialize Tracium in your Celery worker
tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task
def process_message(message: str):
    # This task is automatically traced
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content
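
Calling the task from application code is unchanged; a minimal usage sketch (the message text is illustrative):

# Enqueue the task; the trace is created in the worker that executes it
async_result = process_message.delay("Summarize this support ticket")
print(async_result.id)  # the task ID, which is also captured on the trace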

What Gets Captured

  • Task name - Celery task function name
  • Task ID - Unique task identifier
  • Arguments - Task args and kwargs
  • Result - Task return value
  • Duration - Task execution time
  • Child spans - LLM calls, DB queries within task
  • Errors - Task failures and exceptions (see the sketch below)
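
A failing task is also captured; a minimal sketch, assuming the automatic integration records the raised exception and marks the task as failed (flaky_task is a hypothetical task):

@app.task
def flaky_task(message: str):
    if not message:
        # The raised exception is assumed to appear on the task's trace
        raise ValueError("message must not be empty")
    return message.upper()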

Worker Configuration

# celery_config.py
import tracium

# Initialize at module level so it runs when the worker starts
tracium.trace()

# Or initialize in each worker process via a signal in your celery.py
from celery import Celery
from celery.signals import worker_process_init

@worker_process_init.connect
def init_tracium(**kwargs):
    tracium.trace()
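
Under the default prefork pool, each worker child is a separate process, so the worker_process_init hook above runs once per child. For confirmation at startup you can log from Celery's standard worker_ready signal; a minimal sketch (the log message is illustrative):

import logging

from celery.signals import worker_ready

@worker_ready.connect
def announce_tracing(**kwargs):
    # Fires in the main worker process once it is ready to accept tasks
    logging.getLogger(__name__).info("Worker ready; Tracium tracing enabled")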

Manual Traces in Tasks

import tracium
from celery import Celery
from openai import OpenAI

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()
tracium_client = tracium.get_client()

@app.task
def complex_analysis(data: dict):
    with tracium_client.agent_trace(
        agent_name="celery-analyzer",
        tags=["celery", "analysis"]
    ) as trace:
        # Preprocessing span (preprocess_data is your own helper)
        with trace.span(span_type="tool", name="preprocess") as span:
            span.record_input(data)
            processed = preprocess_data(data)
            span.record_output({"processed": True})

        # LLM call span
        with trace.span(span_type="llm", name="analyze") as span:
            span.record_input({"data": processed})
            result = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": str(processed)}]
            )
            span.record_output({"result": result.choices[0].message.content})

        trace.set_summary({"status": "completed"})
        return result.choices[0].message.content
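
Enqueueing the task is unchanged; a short usage sketch, assuming a result backend is configured on the app if you want to read the return value (the payload is illustrative):

result = complex_analysis.delay({"user_id": 42, "text": "quarterly report"})
print(result.get(timeout=120))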

Task Chains

import tracium
from celery import Celery, chain
from openai import OpenAI

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task
def fetch_data(query: str):
    # First task in chain (search_database is your own helper)
    return search_database(query)

@app.task
def process_data(data: dict):
    # Second task - receives the result from fetch_data
    return transform_data(data)

@app.task
def generate_response(processed: dict):
    # Third task - LLM call
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": str(processed)}]
    )
    return response.choices[0].message.content

# Chain execution - each task creates its own trace
workflow = chain(
    fetch_data.s("user query"),
    process_data.s(),
    generate_response.s()
)
result = workflow.apply_async()
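
The returned AsyncResult points at the final task in the chain; a short sketch for collecting its value, assuming the app has a result backend configured:

# Blocks until generate_response finishes; raises if any task in the chain failed
final_answer = result.get(timeout=120)
print(final_answer)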

Error Handling

import tracium
from celery import Celery
from openai import OpenAI, RateLimitError

tracium.trace()

app = Celery("tasks", broker="redis://localhost:6379/0")
openai_client = OpenAI()

@app.task(
    bind=True,
    autoretry_for=(RateLimitError,),
    retry_backoff=True,
    max_retries=3
)
def process_with_retry(self, message: str):
    # RateLimitError is retried automatically via autoretry_for;
    # each retry runs as a fresh task execution and creates a new trace
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content
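
Celery's built-in backoff can be tuned further with standard task options; a sketch with illustrative values (resilient_completion is a hypothetical task name):

@app.task(
    bind=True,
    autoretry_for=(RateLimitError,),
    retry_backoff=2,        # first retry after ~2s, doubling each attempt
    retry_backoff_max=60,   # cap the delay between retries at 60 seconds
    retry_jitter=True,      # randomize delays to avoid synchronized retries
    max_retries=5
)
def resilient_completion(self, message: str):
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content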

Running Workers

# Start Celery worker
celery -A tasks worker --loglevel=info

# With multiple worker processes
celery -A tasks worker --loglevel=info --concurrency=4