Running Python Processing Services in Azure Container Apps

Archaios uses Python services running in Azure Container Apps to process LiDAR point clouds and fetch satellite imagery. These services are event-driven — they spin up when needed, process data, and scale back to zero when idle.

Here’s the flow:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
User Upload (LiDAR file) 
    
Durable Functions Orchestrator
    
📮 Azure Storage Queue: "lidar-processing"
    
🐳 Container App (Python service) - Auto-scales based on queue depth
    
Processing (PDAL  DSM/DTM/Hillshade)
    
🔔 Raise External Event back to orchestrator
    
Continue workflow (Multi-Agent AI analysis)

Two Python services run in Container Apps:

🔹 LiDARProcessor - Queue-driven job that processes point clouds (PDAL/GDAL)
🔹 GeeProcessor - HTTP microservice that fetches satellite imagery (Google Earth Engine)

I chose Container Apps because:

  • Python services need heavy dependencies (PDAL, GDAL) that don’t fit in Azure Functions
  • LiDAR processing takes 8-15 minutes per file (longer than Functions timeout)
  • Upload patterns are unpredictable — need auto-scaling that costs $0 when idle

The LiDARProcessor runs as a Container App Job that polls an Azure Storage Queue. When the Durable Functions orchestrator needs to process a LiDAR file, it publishes a message to the queue. The Container App picks it up, processes the file, and raises an event back to the orchestrator.

Python Service Entry Point

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# main.py - Container App starts here
import asyncio
import logging
import tempfile
from config import AppConfig
from infrastructure.blob_storage import AzureBlobStorage
from infrastructure.queue_storage import AzureQueueStorage
from services.event_service import DurableEventService
from services.lidar_service import LiDARService

async def main():    
    config = AppConfig.from_env()
    temp_dir = tempfile.mkdtemp()

    local_mode = config.local_mode
    
    logging.info(f"Starting LiDAR Processing Service in {'local' if local_mode else 'container'} mode")
    logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING)

    # Initialize Azure services
    blob_storage = AzureBlobStorage(config.storage_connection)
    queue_storage = AzureQueueStorage(config.storage_connection, config.queue_name)
    event_service = DurableEventService(config)
    
    # Start LiDAR processing service
    lidar_service = LiDARService(
        blob_storage,
        queue_storage,
        event_service,
        local_mode=local_mode
    )
    
    await lidar_service.run()

if __name__ == "__main__":
    asyncio.run(main())

The service:

  1. Reads configuration from environment variables (Container App sets these)
  2. Initializes connections to Azure Storage (blobs and queues)
  3. Starts the queue polling loop in LiDARService.run()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# services/lidar_service.py (simplified)
class LiDARService:
    async def run(self):
        """Main processing loop - polls queue and processes messages"""
        logger.info(f"Event-driven job started in {'local' if self.local_mode else 'container'} mode.")
        
        while True:
            try:
                # Get message from queue (visibility timeout: 300s)
                message = await self.queue_storage.get_message(visibility_timeout=300)
                
                if message:
                    await self.process_message(message)
                else:
                    logger.info("No messages in queue. Waiting...")
                    await asyncio.sleep(10)  # Poll every 10 seconds
                    
            except Exception as e:
                logger.error(f"Error in processing loop: {e}")
                await asyncio.sleep(30)

Key design:

  • Visibility timeout (300s): Message is hidden from other workers for 5 minutes during processing
  • Long polling: Sleep 10s if no messages (reduces API calls)
  • Continuous loop: Container keeps running until manually stopped
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def process_message(self, message):
    """Process a single queue message"""
    try:
        data = json.loads(message.content)
        lidar_blob_url = data["lidarBlobUrl"]
        instance_id = data["instanceId"]
        
        logger.info(f"Processing LiDAR file: {lidar_blob_url}")
        
        # Download LiDAR file from blob storage
        local_file_path = await self.blob_storage.download_file(lidar_blob_url)
        
        # Generate archaeological outputs using PDAL
        results = await self.generate_archaeological_outputs(local_file_path)
        
        # Upload results back to blob storage
        output_urls = await self.upload_results(results)
        
        # Notify orchestrator that processing is complete
        await self.event_service.raise_event(
            instance_id=instance_id,
            event_name="LiDARProcessingComplete",
            event_data=output_urls
        )
        
        # Delete message from queue (success)
        await self.queue_storage.delete_message(message)
        
        logger.info(f"✅ LiDAR processing complete for {instance_id}")
        
    except Exception as e:
        logger.error(f"❌ Failed to process message: {e}")
        # Message will reappear in queue after visibility timeout

The processing flow:

  1. Parse queue message (contains blob URL and orchestration instance ID)
  2. Download LiDAR file from blob storage
  3. Run PDAL pipelines to generate DSM, DTM, hillshade, slope
  4. Upload results back to blob storage
  5. Call Durable Functions webhook to raise “LiDARProcessingComplete” event
  6. Delete message from queue

The critical part: how does the Container App tell the orchestrator it’s done?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# services/event_service.py
import aiohttp
import json
import logging

class DurableEventService:
    def __init__(self, config):
        self.durable_endpoint = config.durable_functions_endpoint
        self.durable_code = config.durable_functions_code
    
    async def raise_event(self, instance_id: str, event_name: str, event_data: dict):
        """
        Raises an external event to Durable Functions orchestrator
        
        Args:
            instance_id: Orchestration instance ID (from queue message)
            event_name: Event name (e.g., "LiDARProcessingComplete")
            event_data: JSON-serializable data to send
        """
        url = f"{self.durable_endpoint}/runtime/webhooks/durabletask/instances/{instance_id}/raiseEvent/{event_name}"
        
        params = {"code": self.durable_code}
        headers = {"Content-Type": "application/json"}
        
        async with aiohttp.ClientSession() as session:
            async with session.post(url, params=params, headers=headers, json=event_data) as response:
                if response.status == 202:
                    logging.info(f"✅ Event '{event_name}' raised for instance {instance_id}")
                else:
                    logging.error(f"❌ Failed to raise event: {response.status}")

The Container App calls the Durable Functions webhook to unblock the orchestrator.

Back in the orchestrator (from Blog 1):

1
2
3
4
// Orchestrator waits for Container App to finish
var lidarResult = await context.WaitForExternalEvent<LiDARProcessingResult>("LiDARProcessingComplete");

_logger.LogInformation($"Received LiDAR processing results: {lidarResult.DsmUrl}");

This is decoupled communication: Container App doesn’t know about orchestration logic — it just raises an event with results.


LiDAR processing requires PDAL and GDAL — native C++ libraries. Docker makes this easy:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Install system dependencies for PDAL/GDAL
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgdal-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt

# Copy application code
COPY . .

# Run the service
CMD ["python", "main.py"]

What’s included:

  • python:3.11-slim - Minimal base image
  • gcc and g++ - Required to compile PDAL Python bindings
  • libgdal-dev - GDAL library for geospatial operations
  • No EXPOSE directive - Container App Jobs don’t listen on ports

The GeeProcessor is different — it runs as a long-running HTTP microservice instead of a queue job.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# main.py for GEE Processor
from fastapi import FastAPI, BackgroundTasks
from processors.gee_processor import GeeProcessor
import uvicorn

app = FastAPI(title="Archaios GEE Processor")

@app.post("/process/ndvi")
async def process_ndvi(request: ProcessRequest, background_tasks: BackgroundTasks):
    """Generate NDVI satellite imagery"""
    processor = GeeProcessor()
    result = await processor.process_ndvi(
        center_lat=request.latitude,
        center_lon=request.longitude,
        instance_id=request.instance_id
    )
    return result

@app.post("/process/truecolor")
async def process_truecolor(request: ProcessRequest):
    """Generate TrueColor satellite imagery"""
    # Similar pattern...

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Why HTTP instead of queue?

The Durable Functions orchestrator calls three GEE endpoints in parallel:

  • /process/ndvi - Vegetation stress analysis
  • /process/truecolor - Visual context imagery
  • /process/falsecolor - Infrared composite

Using HTTP means:

  • Immediate response (no queue delay)
  • Parallel processing (all three requests at once)
  • Stateless design (each request is independent)

The Dockerfile for GEE Processor:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    libgdal-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt

# Copy application code
COPY . .

# Copy Google Earth Engine service account credentials
COPY gee-service-account.json .

# Expose HTTP port
EXPOSE 8000

# Run FastAPI server
CMD ["python", "main.py"]

Key differences:

  • EXPOSE 8000 - Container accepts HTTP traffic
  • gee-service-account.json - GEE authentication file
  • FastAPI + Uvicorn - Production ASGI server

Container Apps auto-scale based on queue depth for the LiDAR processor:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# LiDAR Processor scaling config
scale:
  minReplicas: 0   # Scale to zero when queue is empty
  maxReplicas: 10  # Max 10 concurrent processors
  rules:
    - name: queue-scaling
      azureQueue:
        queueName: lidar-processing
        queueLength: 5  # Trigger new replica every 5 messages
        auth:
          - secretRef: storage-connection-string
            triggerParameter: connection

How this works:

  • 0 messages in queue → 0 replicas running (costs $0)
  • 1-5 messages → 1 replica spins up
  • 6-10 messages → 2 replicas
  • 50+ messages → 10 replicas (max)

Each replica processes 1 message at a time and takes ~8 minutes per LiDAR file.

For the GEE Processor (HTTP microservice):

1
2
3
4
5
6
7
8
# GEE Processor scaling config
scale:
  minReplicas: 1    # Always 1 instance running
  maxReplicas: 5    # Max 5 concurrent instances
  rules:
    - name: http-scaling
      http:
        concurrentRequests: 10  # Scale when >10 requests/instance

I keep minReplicas: 1 for the GEE service because:

  • The orchestrator calls it in parallel (NDVI + TrueColor + FalseColor)
  • Cold start delay would slow down the entire workflow
  • Cost is minimal (~$15/month for 1 replica)

Container Apps send logs to Azure Monitor automatically:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# services/lidar_service.py
import logging

logger = logging.getLogger(__name__)

async def process_message(self, message):
    logger.info(f"📥 Received message: {message.id}")
    
    try:
        # Processing logic...
        logger.info(f"✅ Processing complete")
    except Exception as e:
        logger.error(f"❌ Processing failed: {e}", exc_info=True)

In the Azure Portal, you can:

  • Stream live logs during development
  • Query logs with KQL (Kusto Query Language)
  • Set up alerts (e.g., email when failures > 5 in 10 minutes)
  • See CPU/memory metrics for each replica

Blog 8: [Full-Stack Integration - Upload to Discovery] - How all Archaios components work together in an end-to-end archaeological discovery workflow.


  • LiDAR Processor: Archaios.AI.LiDARProcessor/ (Python service + Dockerfile)
  • GEE Processor: Archaios.AI.GeeProcessor/ (FastAPI microservice + Dockerfile)
  • Queue Integration: Archaios.AI.DurableHandler/FxDurableOrchestrator.cs (publishes queue messages)
  • Event Service: Archaios.AI.LiDARProcessor/services/event_service.py (raises Durable Functions events)

All code examples in this blog are from the actual Archaios production codebase.


Container Apps bring serverless benefits without the limitations — no time limits, full Python ecosystem, and queue-driven auto-scaling. Perfect for batch processing workloads like LiDAR and satellite data.

#AzureContainerApps #Python #Docker #EventDriven #Microservices #PDAL #GoogleEarthEngine #MicrosoftMVP