Event-Driven Archaeological Data Processing with Azure Durable Functions

How do you orchestrate complex, multi-stage data processing pipelines that involve LiDAR analysis, satellite imagery, and AI-powered archaeological discovery?

When building Archaios, I faced a challenging architectural question: how to coordinate a workflow that starts with a massive LiDAR point cloud file upload, triggers containerized Python processing, waits for completion, launches Google Earth Engine analysis, stores results in a graph database, and finally runs multi-agent AI reasoning — all while maintaining reliability, scalability, and observability.

The answer: Azure Durable Functions with event-driven patterns, sub-orchestrators, and external event handling.

In this blog, I’ll walk through the actual orchestration code powering Archaios, showing how Durable Functions handles the entire archaeological data processing pipeline.

Topics covered:

🔹 Why Durable Functions for complex multi-stage workflows
🔹 Event-driven architecture with Azure Storage Queues
🔹 External event patterns for waiting on Container App completions
🔹 Sub-orchestrator patterns for modular processing
🔹 Chunked blob upload with duplicate detection
🔹 Real-world orchestration code from Archaios

Archaios processes LiDAR datasets to discover archaeological sites. Here’s what the pipeline needs to do:

Stage 1: Data Ingestion

  1. Accept large LAS/LAZ file uploads (often 500MB - 5GB)
  2. Check for duplicate uploads by the same user
  3. Chunk upload to Azure Blob Storage

Stage 2: LiDAR Processing

  4. Extract metadata and configure processing parameters
  5. Publish message to Azure Storage Queue
  6. Wait for Container App Job to complete processing
  7. Receive processing results via external event

Stage 3: Satellite Imagery (Optional)

  8. If coordinates available, trigger Google Earth Engine sub-orchestrator
  9. Process NDVI, TrueColor, and FalseColor imagery in parallel
  10. Update site with satellite data

Stage 4: AI Analysis

  11. Trigger multi-agent AI workflow sub-orchestrator
  12. Analyze terrain features with image analysis agent
  13. Run archaeological team discussion
  14. Store results in Neo4j graph database

Traditional approaches would struggle with:

❌ Long-running processes - LiDAR processing can take 10-30 minutes
❌ Complex branching - Conditional GEE processing based on coordinates
❌ External dependencies - Waiting for Container Apps to complete
❌ Error handling - Retries, timeouts, and graceful failures
❌ State management - Tracking progress across multiple stages

Here’s the complete architecture diagram showing the orchestration flow:

https://www.googleapis.com/download/storage/v1/b/kaggle-user-content/o/inbox%2F16585359%2Fbea45ee11b4b0cceca5f2ab2c419af82%2FSlide2.JPG?generation=1750701540638043&alt=media

The Durable Functions orchestrator sits at the heart, coordinating:

  • Azure Container Apps - Python-based LiDAR and GEE processors
  • Azure Storage Queues - Asynchronous message passing
  • Neo4j Graph Database - Site and relationship storage
  • Azure Cosmos DB - Historical context and vector search
  • Semantic Kernel - Multi-agent AI orchestration

Before orchestration begins, we need to handle large file uploads. Here’s the actual chunked upload handler:

[Function("UploadChunkedBlob")]
[Authorize]
public async Task<HttpResponseData> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "put", Route = "upload")] HttpRequestData req)
{
    _logger.LogInformation("Processing upload request.");
    try
    {
        var query = System.Web.HttpUtility.ParseQueryString(req.Url.Query);
        var fileName = query["filename"];
        var blockId = query["blockid"];
        var isFinal = query["final"];

        if (string.IsNullOrEmpty(fileName) || (isFinal != "true" && string.IsNullOrEmpty(blockId)))
        {
            var badResponse = req.CreateResponse(System.Net.HttpStatusCode.BadRequest);
            await badResponse.WriteStringAsync("Missing 'filename' or 'blockid'.");
            return badResponse;
        }

        _logger.LogInformation($"Received request for file: {fileName}, blockId: {blockId}, isFinal: {isFinal}");

        // Check if this is the first chunk
        bool isFirstChunk = false;
        if (!string.IsNullOrEmpty(blockId))
        {
            try
            {
                // Decode base64 to get the original padded number
                var decodedBytes = Convert.FromBase64String(blockId);
                var decodedString = System.Text.Encoding.UTF8.GetString(decodedBytes);
                
                // Check if this represents "000000" (first chunk)
                isFirstChunk = decodedString == "000000";
                _logger.LogInformation($"Decoded blockId: '{decodedString}', isFirstChunk: {isFirstChunk}");
            }
            catch (Exception ex)
            {
                _logger.LogWarning($"Failed to decode blockId: {ex.Message}");
            }
        }

        if (isFirstChunk)
        {
            var currentUser = await _userContextProvider.GetCurrentUserAsync(req);
            if (currentUser == null)
            {
                var unauthorizedResponse = req.CreateResponse(System.Net.HttpStatusCode.Unauthorized);
                await unauthorizedResponse.WriteStringAsync("User authentication required");
                return unauthorizedResponse;
            }

            var fileNameWithoutExtension = System.IO.Path.GetFileNameWithoutExtension(fileName);
            
            _logger.LogInformation($"Checking if site '{fileNameWithoutExtension}' already exists for user {currentUser.Id}");

            var allSites = await _archaeologicalRepository.GetArchaiosSitesAsync();
            var existingSite = allSites.FirstOrDefault(site => 
                site.Name.Equals(fileNameWithoutExtension, StringComparison.OrdinalIgnoreCase) &&
                (site.ArchaiosUser?.Id == currentUser.Id || site.ArchaiosUser?.Oid == currentUser.Oid));

            if (existingSite != null)
            {
                _logger.LogInformation($"File '{fileName}' has already been processed as site '{existingSite.Name}' (ID: {existingSite.SiteId})");
                
                var alreadyProcessedResponse = req.CreateResponse(System.Net.HttpStatusCode.Conflict);
                await alreadyProcessedResponse.WriteStringAsync(JsonSerializer.Serialize(new { 
                    error = "FileAlreadyProcessed",
                    message = $"This file has already been processed",
                    siteId = existingSite.SiteId,
                    siteName = existingSite.Name,
                    processedDate = existingSite.LastUpdated
                }));
                return alreadyProcessedResponse;
            }
        }

        if (isFinal == "true")
        {
            _logger.LogInformation($"Finalizing upload for file: {fileName}");
            using var reader = new StreamReader(req.Body);
            var json = await reader.ReadToEndAsync();
            var blockIds = JsonSerializer.Deserialize<List<string>>(json);
            await _blobUploader.CommitBlocksAsync(fileName, blockIds);

            var okResponse = req.CreateResponse(System.Net.HttpStatusCode.OK);
            await okResponse.WriteStringAsync(JsonSerializer.Serialize(new { 
                message = "File upload completed",
                fileName
            }));
            return okResponse;
        }
        else
        {
            _logger.LogInformation($"Uploading chunk for file: {fileName}, blockId: {blockId}");
            using var ms = new MemoryStream();
            await req.Body.CopyToAsync(ms);
            ms.Position = 0;

            await _blobUploader.StageBlockAsync(fileName, blockId, ms);

            var okResponse = req.CreateResponse(System.Net.HttpStatusCode.OK);
            await okResponse.WriteStringAsync("Chunk uploaded.");
            return okResponse;
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing upload request.");
        var errorResponse = req.CreateResponse(System.Net.HttpStatusCode.InternalServerError);
        await errorResponse.WriteStringAsync("An error occurred while processing the request.");
        return errorResponse;
    }
}

Block-based uploads - Supports files up to 5GB by chunking
Duplicate detection - Prevents reprocessing same file by same user
User authentication - Entra ID integration with [Authorize] attribute
Base64 block IDs - Decodes to detect first chunk for duplicate check
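
The handler above treats a block ID that decodes to "000000" as the first chunk, which suggests the client zero-pads the chunk index before base64-encoding it. Here is a minimal client-side sketch in Python, assuming a six-digit pad. The helper names (make_block_id, is_first_chunk, iter_blocks) are hypothetical and not taken from the Archaios repository:

```python
import base64
from typing import Iterator, Tuple


def make_block_id(index: int, width: int = 6) -> str:
    """Zero-pad the chunk index and base64-encode it (index 0 -> base64 of "000000")."""
    padded = str(index).zfill(width)
    return base64.b64encode(padded.encode("utf-8")).decode("ascii")


def is_first_chunk(block_id: str) -> bool:
    """Mirror of the server-side check: decode the ID and compare with "000000"."""
    try:
        decoded = base64.b64decode(block_id, validate=True).decode("utf-8")
    except ValueError:  # covers malformed base64 and invalid UTF-8
        return False
    return decoded == "000000"


def iter_blocks(data: bytes, chunk_size: int) -> Iterator[Tuple[str, bytes]]:
    """Yield (block_id, chunk) pairs in upload order."""
    for index, offset in enumerate(range(0, len(data), chunk_size)):
        yield make_block_id(index), data[offset:offset + chunk_size]
```

Fixed-width padding also keeps every block ID the same length, which Azure Blob Storage requires for the blocks of a single blob.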

Once the upload completes, the client calls a second HTTP-triggered function (POST process-file) that starts the main orchestration:

[Function(nameof(ProcessFileOrchestrator))]
[Authorize]
public async Task<HttpResponseData> ProcessFileOrchestrator(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "process-file")] HttpRequestData req,
    [DurableClient] DurableTaskClient client)
{
    _logger.LogInformation("Received file processing request");

    try
    {
        var currentUser = await _userContextProvider.GetCurrentUserAsync(req);

        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        var fileProcessRequest = JsonSerializer.Deserialize<FileProcessRequest>(
            requestBody,
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

        if (fileProcessRequest == null || string.IsNullOrEmpty(fileProcessRequest.FileName))
        {
            var errorResponse = req.CreateResponse(HttpStatusCode.BadRequest);
            await errorResponse.WriteStringAsync("Invalid request. FileName is required.");
            return errorResponse;
        }

        fileProcessRequest.User = currentUser;

        var instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
            nameof(ProcessFileWorkflow),
            fileProcessRequest);

        _logger.LogInformation($"Started orchestration with ID = {instanceId} for file {fileProcessRequest.FileName}");

        var response = req.CreateResponse(HttpStatusCode.Accepted);
        await response.WriteAsJsonAsync(new
        {
            instanceId,
            fileName = fileProcessRequest.FileName,
            message = "File processing started"
        });

        return response;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing file request");
        var errorResponse = req.CreateResponse(HttpStatusCode.InternalServerError);
        await errorResponse.WriteStringAsync($"Error processing request: {ex.Message}");
        return errorResponse;
    }
}

The HTTP trigger returns immediately with an instanceId — the client can poll for status while the workflow continues asynchronously.

Here’s the core orchestrator that coordinates all stages:

[Function(nameof(ProcessFileWorkflow))]
public async Task ProcessFileWorkflow([OrchestrationTrigger] TaskOrchestrationContext context)
{
    try
    {
        var request = context.GetInput<FileProcessRequest>();

        if (request == null || string.IsNullOrEmpty(request.FileName))
        {
            _logger.LogError("Invalid file processing request received.");
            return;
        }

        request.FileNameWithoutExtension = Path.GetFileNameWithoutExtension(request.FileName);

        _logger.LogInformation($"Processing file: {request.FileName}");

        if (request.Coordinates != null)
        {
            _logger.LogInformation($"File coordinates: Lat {request.Coordinates.Latitude}, Long {request.Coordinates.Longitude}");
        }

        // STEP 1: Extract metadata and configure processing parameters
        var metadataResult = await context.CallActivityAsync<ProcessingConfiguration<ProcessingParameters>?>(
            nameof(ExtractLiDARMetaData), request);

        if (metadataResult == null || string.IsNullOrEmpty(metadataResult.FileName))
        {
            _logger.LogError("File processing activity returned null or invalid result.");
            return;
        }

        metadataResult.InstanceId = context.InstanceId;
        request.SiteId = metadataResult.SiteId;

        // STEP 2: Publish message to Azure Storage Queue for Container App processing
        await context.CallActivityAsync(nameof(InitiateProcessingPipeline), metadataResult);

        Task<SatelliteImageryResult?> geeProcessingSubOrchestratorTask = Task.FromResult<SatelliteImageryResult?>(null);

        _logger.LogInformation($"Processing pipeline initiated for {request.FileName} & site {request.SiteId} with instance ID {metadataResult.InstanceId}");

        // STEP 3: Wait for external event from Container App
        var lidarProcessingResult = await context.WaitForExternalEvent<ProcessingResult>(metadataResult.EventName);

        _logger.LogInformation($"Processing completed for {request.FileName} & site {request.SiteId} with instance ID {metadataResult.InstanceId} & status: {lidarProcessingResult?.Status}");

        if (lidarProcessingResult != null && lidarProcessingResult.Status == "success" && lidarProcessingResult.Lat != 0 && lidarProcessingResult.Lon != 0)
        {
            if (request.Coordinates is null || (request.Coordinates.Latitude == 0 && request.Coordinates.Longitude == 0))
            {
                _logger.LogInformation($"No coordinates provided or coordinates are (0,0) for site {request.SiteId}. Attempting to extract from LiDAR processing result.");

                request.Coordinates = new CoordinateOptions
                {
                    Latitude = lidarProcessingResult.Lat,
                    Longitude = lidarProcessingResult.Lon
                };
            }

            var instantiateLiDARDataTask = context.CallActivityAsync(nameof(InstantiateLiDARDataNode), request);

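            // Guard against a null parameter object before enabling GEE processing
            if (metadataResult.ProcessingParameters != null)
            {
                metadataResult.ProcessingParameters.ApplyGeeProcessing = true;
            }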

            // STEP 4: Conditional GEE processing based on coordinates
            if (metadataResult.ProcessingParameters != null && metadataResult.ProcessingParameters.ApplyGeeProcessing && request.Coordinates != null && request.Coordinates.Latitude != 0 && request.Coordinates.Longitude != 0)
            {
                _logger.LogInformation($"Initiating GEE processing for site {request.SiteId} with coordinates: Lat {request.Coordinates.Latitude}, Long {request.Coordinates.Longitude}");

                geeProcessingSubOrchestratorTask = context.CallSubOrchestratorAsync<SatelliteImageryResult?>(nameof(GeeProcessingSubOrchestration), new GeeCoordinateMessage
                {
                    SiteId = request.SiteId,
                    Coordinates = new GeoCoordinates
                    {
                        Latitude = request.Coordinates.Latitude,
                        Longitude = request.Coordinates.Longitude
                    },
                    TimeRangeYears = metadataResult.ProcessingParameters.TimeRangeYears,
                    BufferDistance = metadataResult.ProcessingParameters.BufferDistance,
                    AnalysisType = metadataResult.ProcessingParameters.AnalysisType,
                    Collection = metadataResult.ProcessingParameters.Collection,
                    ProjectId = metadataResult.InstanceId
                });

                await Task.WhenAll(instantiateLiDARDataTask, geeProcessingSubOrchestratorTask);
            }
            else
            {
                _logger.LogInformation($"Skipping GEE processing for site {request.SiteId} due to missing or invalid coordinates.");
                await instantiateLiDARDataTask;
            }

            var geeProcessingResult = geeProcessingSubOrchestratorTask.Result;

            // STEP 5: Store LiDAR results in Neo4j
            await context.CallActivityAsync(nameof(ProcessLiDARResults),
                new ProcessLiDARResultsRequest
                {
                    SiteId = request.SiteId,
                    DtmImage = lidarProcessingResult.DtmImage,
                    DsmImage = lidarProcessingResult.DsmImage,
                    Latitude = lidarProcessingResult.Lat,
                    Longitude = lidarProcessingResult.Lon,
                    HillshadeImage = lidarProcessingResult.HillshadeImage,
                    HillshadeMultiDirectionalImage = lidarProcessingResult.HillshadeMultiDirectionalImage,
                    SlopeImage = lidarProcessingResult.SlopeImage,
                    HistoricalContext = lidarProcessingResult.HistoricalContext,
                    SystemPrompt = lidarProcessingResult.SystemPrompt,
                    Statistics = lidarProcessingResult.Statistics
                });

            // STEP 6: Launch multi-agent AI analysis sub-orchestrator
            var agenticWorkflowRequest = new AgenticWorkflowRequest
            {
                UserId = request.User?.Id ?? string.Empty,
                SiteId = request.SiteId,
                FileName = request.FileName,
                DtmImageUrl = lidarProcessingResult.DtmImage,
                DsmImageUrl = lidarProcessingResult.DsmImage,
                HillshadeImageUrl = lidarProcessingResult.HillshadeImage,
                HillshadeMultiDirectionalImageUrl = lidarProcessingResult.HillshadeMultiDirectionalImage,
                SlopeImageUrl = lidarProcessingResult.SlopeImage,
                HistoricalContext = lidarProcessingResult.HistoricalContext,
                SystemPrompt = lidarProcessingResult.SystemPrompt,
                Latitude = request.Coordinates?.Latitude ?? 0.0,
                Longitude = request.Coordinates?.Longitude ?? 0.0,
                User = request.User
            };

            if (geeProcessingResult != null)
            {
                agenticWorkflowRequest.NdviImageUrl = geeProcessingResult.NdviImageUrl;
                agenticWorkflowRequest.TrueColorImageUrl = geeProcessingResult.TrueColorImageUrl;
                agenticWorkflowRequest.FalseColorImageUrl = geeProcessingResult.FalseColorImageUrl;
            }

            await context.CallSubOrchestratorAsync(nameof(AgenticWorkflowSubOrchestration), agenticWorkflowRequest);
        }
        else
        {
            _logger.LogWarning($"Processing result for {request.FileName} & site {request.SiteId} is invalid or missing coordinates. Skipping further processing.");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error in file processing workflow");
        throw;
    }
}

The most critical pattern is WaitForExternalEvent — this allows the orchestrator to pause until the Container App signals completion:

// Publish message to queue (Container App picks it up)
await context.CallActivityAsync(nameof(InitiateProcessingPipeline), metadataResult);

// Wait for Container App to raise event back to orchestration
var lidarProcessingResult = await context.WaitForExternalEvent<ProcessingResult>(metadataResult.EventName);

The Container App, after completing LiDAR processing, calls:

await event_service.raise_event(instance_id, event_name, processing_result)

This uses the Durable Functions HTTP API:

POST /runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/{eventName}

Decoupled - Container App doesn’t know about orchestration internals
Reliable - Events are persisted and survive host restarts
Timeout support - The wait can be raced against a durable timer with Task.WhenAny
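
To make the raiseEvent call above concrete, here is a minimal Python sketch of what a helper like event_service.raise_event might do under the hood. It is not the Archaios implementation. The function names, the base URL, and the payload shape are assumptions, and the code query parameter assumes the function app protects the Durable Functions HTTP API with a system key:

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def build_raise_event_url(base_url: str, instance_id: str, event_name: str,
                          system_key: Optional[str] = None) -> str:
    """Build the raiseEvent URL for one orchestration instance."""
    path = "/runtime/webhooks/durabletask/instances/{}/raiseEvent/{}".format(
        urllib.parse.quote(instance_id, safe=""),
        urllib.parse.quote(event_name, safe=""),
    )
    url = base_url.rstrip("/") + path
    if system_key:
        url += "?" + urllib.parse.urlencode({"code": system_key})
    return url


def build_raise_event_request(base_url: str, instance_id: str, event_name: str,
                              payload: Dict[str, Any],
                              system_key: Optional[str] = None) -> urllib.request.Request:
    """Create the POST request; the JSON body becomes the event data."""
    return urllib.request.Request(
        build_raise_event_url(base_url, instance_id, event_name, system_key),
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def raise_event(base_url: str, instance_id: str, event_name: str,
                payload: Dict[str, Any], system_key: Optional[str] = None) -> int:
    """Send the event and return the HTTP status (202 when the event is accepted)."""
    request = build_raise_event_request(base_url, instance_id, event_name, payload, system_key)
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.status
```

The instance ID and event name come straight from the queue message, so the processor needs no other knowledge of the orchestration.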

Instead of one massive orchestrator, I break complex workflows into sub-orchestrators:

// GEE processing is a separate sub-orchestrator
geeProcessingSubOrchestratorTask = context.CallSubOrchestratorAsync<SatelliteImageryResult?>(
    nameof(GeeProcessingSubOrchestration), 
    new GeeCoordinateMessage { ... });

// Multi-agent workflow is another sub-orchestrator
await context.CallSubOrchestratorAsync(
    nameof(AgenticWorkflowSubOrchestration), 
    agenticWorkflowRequest);

Benefits:

✅ Modularity - Each sub-orchestrator is independently testable
✅ Reusability - GEE orchestrator can be called from multiple places
✅ Parallel execution - Sub-orchestrators can run concurrently
✅ Clear separation of concerns - LiDAR, GEE, and AI workflows separated

All I/O operations (database calls, queue publishing, etc.) happen in activities:

// Extract metadata (reads from blob, generates config)
var metadataResult = await context.CallActivityAsync<ProcessingConfiguration<ProcessingParameters>?>(
    nameof(ExtractLiDARMetaData), request);

// Publish to queue (sends message to Azure Storage Queue)
await context.CallActivityAsync(nameof(InitiateProcessingPipeline), metadataResult);

// Store results in Neo4j
await context.CallActivityAsync(nameof(ProcessLiDARResults), lidarResultsRequest);

Why activities?

✅ Deterministic replay - Orchestrators must be deterministic
✅ Retry policies - Activities can retry on transient failures
✅ Timeout control - Each activity can have independent timeout
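
To see why determinism matters, it helps to model replay in miniature. The following Python sketch is a toy model, not how Durable Functions is implemented. The orchestrator is a generator that yields activity calls, and a runner feeds it recorded results from a history list before executing anything new. All names here (run_with_replay, workflow, ACTIVITIES) are hypothetical:

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

ActivityCall = Tuple[str, Any]      # (activity name, input)
History = List[Tuple[str, Any]]     # (activity name, recorded result)


def run_with_replay(
    orchestrator: Callable[[], Generator[ActivityCall, Any, Any]],
    activities: Dict[str, Callable[[Any], Any]],
    history: History,
) -> Tuple[Any, int]:
    """Run the orchestrator, replaying recorded results before executing new work.

    Returns (orchestrator result, number of activities actually executed).
    """
    generator = orchestrator()
    executed = 0
    step = 0
    try:
        call = next(generator)
        while True:
            name, arg = call
            if step < len(history):
                # Replay: reuse the recorded result instead of re-running the activity.
                recorded_name, result = history[step]
                if recorded_name != name:
                    raise RuntimeError(
                        f"Non-deterministic replay: expected {recorded_name!r}, got {name!r}")
            else:
                # New work: execute the activity and checkpoint its result.
                result = activities[name](arg)
                history.append((name, result))
                executed += 1
            step += 1
            call = generator.send(result)
    except StopIteration as finished:
        return finished.value, executed


def workflow() -> Generator[ActivityCall, Any, str]:
    metadata = yield ("extract_metadata", "site.laz")
    stored = yield ("store_results", metadata)
    return stored


ACTIVITIES = {
    "extract_metadata": lambda file_name: {"file": file_name},
    "store_results": lambda metadata: f"stored:{metadata['file']}",
}
```

Running workflow twice against the same history executes both activities the first time and none the second. If the orchestrator body did I/O directly or branched on something like the wall clock, the replayed run could diverge from the recorded history, which is why that work belongs in activities.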

Here’s how the orchestrator communicates with Container Apps via Azure Storage Queues:

public class QueuePublisher
{
    private readonly QueueClient _storageProcessingQueueClient;
    private readonly string _blobStorageUrl;
    private readonly ILogger<QueuePublisher> _logger;

    public QueuePublisher(IConfiguration configuration, ILogger<QueuePublisher> logger)
    {
        _logger = logger;
        // ArgumentNullException(string) treats its argument as a parameter name, so use a message-bearing exception instead
        _blobStorageUrl = configuration["BlobStorageUrl"] ?? throw new InvalidOperationException("BlobStorageUrl must be set");
        var storageConnection = configuration["BlobStorageConnString"] ?? throw new InvalidOperationException("BlobStorageConnString must be set");
        var processingQueueName = configuration["StorageQueueName"] ?? "lidar-processing";

        _storageProcessingQueueClient = new QueueClient(storageConnection, processingQueueName);
        _storageProcessingQueueClient.CreateIfNotExists();
    }

    public async Task SendProcessingMessageAsync<T>(ProcessingConfiguration<T> config)
    {
        var message = new
        {
            InstanceId = config.InstanceId,
            EventName = config.EventName,
            BlobUri = $"{GetBlobContainerUrl()}/{config.FileName}",
            SiteId = config.SiteId,
            Parameters = config.ProcessingParameters
        };

        var messageJson = JsonConvert.SerializeObject(message);
        await _storageProcessingQueueClient.SendMessageAsync(
            Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(messageJson)));

        _logger.LogInformation($"Sent {typeof(T).Name} processing message for file: {config.FileName}, instanceId: {config.InstanceId}");
    }

    private string GetBlobContainerUrl()
    {
        return $"{_blobStorageUrl}/uploads";
    }
}

The message contains:

  • InstanceId - Durable Functions instance to raise event on
  • EventName - Which event to raise (e.g. LiDARProcessingCompleted)
  • BlobUri - Location of LAS/LAZ file
  • SiteId - Identifier of the site being processed
  • Parameters - Processing configuration (resolution, filters, etc.)
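
On the consuming side, the Python processor has to reverse what QueuePublisher does: base64-decode the message text, then parse the JSON. Here is a minimal sketch, assuming the queue client hands over the raw base64 text (some SDK configurations decode it for you) and that the property names match the anonymous object above. The ProcessingMessage class and decode_processing_message function are hypothetical names:

```python
import base64
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ProcessingMessage:
    instance_id: str
    event_name: str
    blob_uri: str
    site_id: str
    parameters: Dict[str, Any] = field(default_factory=dict)


def decode_processing_message(raw: str) -> ProcessingMessage:
    """Decode a base64-encoded JSON queue message into a typed object."""
    payload = json.loads(base64.b64decode(raw).decode("utf-8"))
    return ProcessingMessage(
        instance_id=payload["InstanceId"],
        event_name=payload["EventName"],
        blob_uri=payload["BlobUri"],
        site_id=payload["SiteId"],
        parameters=payload.get("Parameters") or {},
    )
```

Keeping InstanceId and EventName in the message is what lets the processor call back into the right orchestration instance when it finishes.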

The ExtractLiDARMetaData activity builds the processing parameters from the request, picks the event name from the file extension, and generates the site ID:

[Function(nameof(ExtractLiDARMetaData))]
public async Task<ProcessingConfiguration<ProcessingParameters>?> Run([ActivityTrigger] FileProcessRequest request, FunctionContext context)
{
    if (string.IsNullOrEmpty(request.FileName))
    {
        _logger.LogError("File name is missing in the workflow request");
        return default;
    }

    var parameters = CreateProcessingParameters(request);
    string eventName = DetermineEventName(request.FileName);

    if (string.IsNullOrEmpty(eventName))
    {
        _logger.LogWarning($"Unsupported file type for file: {request.FileName}");
        return default;
    }

    _logger.LogInformation($"Processing {request.FileName} with workflow of {request.Workflow?.Count ?? 0} nodes");

    var processingConfig = new ProcessingConfiguration<ProcessingParameters>
    {
        FileName = request.FileName,
        EventName = eventName,
        ProcessingParameters = parameters
    };
   
    var siteId = request.FileNameWithoutExtension + "_" + DateTime.UtcNow.ToString("yyyyMMddHHmmss");
    processingConfig.SiteId = siteId;
   
    return processingConfig;
}

private string DetermineEventName(string fileName)
{
    var extension = Path.GetExtension(fileName).ToLowerInvariant();

    return extension switch
    {
        ".las" or ".laz" => "LiDARProcessingCompleted",
        ".tif" or ".tiff" => "RasterProcessingCompleted",
        ".shp" => "ShapefileProcessingCompleted",
        ".e57" => "E57ProcessingCompleted",
        _ => string.Empty
    };
}

Different file types get different event names, allowing specialized Container Apps to handle each format.

After running Archaios in production, here are the key benefits:

Resilience - Durable Functions automatically checkpoints progress
Scalability - Multiple orchestrations run in parallel without coordination
Observability - Built-in status queries and history tracking
Long-running support - Workflows can run for hours without timeout
Decoupled components - Container Apps, orchestrators, and activities evolve independently
Cost-effective - Only pay for execution time, no idle VMs

For production, I add timeouts to external event waits:

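// Use a cancellable timer so it can be cancelled once the event arrives;
// an uncancelled durable timer keeps the orchestration from completing until it fires.
using var timeoutCts = new CancellationTokenSource();
var timeoutTask = context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(30), timeoutCts.Token);
var processingTask = context.WaitForExternalEvent<ProcessingResult>(metadataResult.EventName);

var winner = await Task.WhenAny(processingTask, timeoutTask);

if (winner == timeoutTask)
{
    _logger.LogError($"Processing timed out for {request.FileName}");
    // Handle timeout - maybe send notification, mark as failed
    return;
}

timeoutCts.Cancel();
var lidarProcessingResult = await processingTask;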

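Activities are not retried by default: a retry policy with exponential backoff has to be passed on each activity call. What host.json controls is how many activities and orchestrators run concurrently on each host: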

{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 10
    }
  }
}

Check orchestration status:

GET /runtime/webhooks/durabletask/instances/{instanceId}
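
A client that received an instanceId can poll this endpoint until the orchestration reaches a terminal state. Here is a minimal Python sketch, assuming the status URL already includes any required key. The function names and the polling defaults are illustrative, while runtimeStatus is the field the status API returns:

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"Completed", "Failed", "Terminated", "Canceled"}


def fetch_status(status_url: str) -> Dict[str, Any]:
    """GET the instance status document and parse it as JSON."""
    with urllib.request.urlopen(status_url, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_completion(
    status_url: str,
    fetch: Callable[[str], Dict[str, Any]] = fetch_status,
    poll_seconds: float = 5.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until runtimeStatus is terminal; raise TimeoutError after max_polls."""
    for attempt in range(max_polls):
        status = fetch(status_url)
        if status.get("runtimeStatus") in TERMINAL_STATUSES:
            return status
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    raise TimeoutError(f"Orchestration still running after {max_polls} polls")
```

Passing fetch and sleep as parameters keeps the loop testable without a network connection or real delays.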

Terminate stuck orchestrations:

POST /runtime/webhooks/durabletask/instances/{instanceId}/terminate

Ready to build event-driven orchestrations with Durable Functions?

🔗 GitHub Repository: https://github.com/Cloud-Jas/Archaios

🚀 Quick Start:

git clone https://github.com/Cloud-Jas/Archaios.git
cd Archaios/src/backend/Archaios.AI.DurableHandler
dotnet build
func start

Event-driven orchestration with Azure Durable Functions enables complex, multi-stage workflows without the complexity of custom state machines. If you’re building data processing pipelines or workflow automation, let’s connect on LinkedIn!

#Azure #DurableFunctions #Serverless #DotNet #EventDriven #Archaeology #DataProcessing #MicrosoftMVP