Skip to content

EntropyEngine::Core::Concurrency::WorkGraph

EntropyEngine::Core::Concurrency::WorkGraph

Section titled “EntropyEngine::Core::Concurrency::WorkGraph”

Orchestrates complex parallel workflows with automatic dependency management. More…

#include <WorkGraph.h>

Inherits from EntropyEngine::Core::Debug::Named, EntropyEngine::Core::Debug::INamed

Name
structWaitResult
What you get back from wait() - the final score of your graph execution.
Name
using Graph::AcyclicNodeHandle< WorkGraphNode >NodeHandle
Name
~WorkGraph()
Cleans up the graph and ensures all callbacks complete.
WaitResultwait()
Blocks until your entire workflow finishes - success or failure.
voidsuspend()
Suspends graph execution - no new nodes will be scheduled.
voidsetNodeCompleteCallback(std::function< void(NodeHandle)> callback)
Install a hook that fires whenever a node finishes.
size_tscheduleRoots()
Kicks off your workflow by scheduling all nodes that have no dependencies.
voidresume()
Resumes graph execution after suspension.
voidreset()
Resets execution state so the graph can be re-executed.
size_tprocessDeferredNodes()
Manually drain the deferred queue when capacity becomes available.
boolisSuspended() const
Checks if the graph is currently suspended.
boolisHandleValid(const NodeHandle & handle) const
Test if a node handle still points to a real node.
boolisComplete() const
Quick non-blocking check if your workflow is done.
WorkGraphStats::SnapshotgetStats() const
Snapshot of your workflow’s current state - how’s it doing?
uint32_tgetPendingCount() const
Quick check of how much work remains.
WorkGraphNode *getNodeData(const NodeHandle & node)
Access the underlying node payload for a given handle.
const WorkGraphNode *getNodeData(const NodeHandle & node) const
size_tgetNodeCount() const
Get the total number of nodes in the graph.
Core::EventBus *getEventBus()
Access the event system for monitoring graph execution.
const WorkGraphConfig &getConfig() const
Access the configuration this graph was created with.
std::vector< NodeHandle >getChildren(const NodeHandle & node) const
Find all nodes that depend on this one.
voidexecute()
Lights the fuse on your workflow - starts the cascade of execution.
voidclear()
Removes all nodes and dependencies from the graph.
size_tcheckTimedDeferrals()
Checks timed deferrals and schedules nodes whose wake time has arrived.
NodeHandleaddYieldableNode(YieldableWorkFunction work, const std::string & name ="", void * userData =nullptr, ExecutionType executionType =ExecutionType::AnyThread, std::optional< uint32_t > maxReschedules =std::nullopt)
Adds a yieldable task that can suspend and resume execution.
NodeHandleaddNode(std::function< void()> work, const std::string & name ="", void * userData =nullptr, ExecutionType executionType =ExecutionType::AnyThread)
Adds a task to your workflow - it won’t run until its time comes.
voidaddDependency(NodeHandle from, const NodeHandle & to)
Wire up your workflow - tell nodes who they’re waiting for.
NodeHandleaddContinuation(const std::vector< NodeHandle > & parents, std::function< void()> work, const std::string & name ="", ExecutionType executionType =ExecutionType::AnyThread)
Create a “join” node that waits for multiple parents - perfect for fan-in patterns.
WorkGraph(WorkContractGroup * workContractGroup)
Creates a work graph backed by your thread pool.
WorkGraph(WorkContractGroup * workContractGroup, const WorkGraphConfig & config)
Creates a work graph with custom behavior options.

Public Functions inherited from EntropyEngine::Core::Debug::Named

Name
virtual voidsetName(std::string_view name) override
Set the debug name for this object.
virtual std::string_viewgetName() const override
Get the debug name of this object.
Named() =default
Named(std::string_view name)

Public Functions inherited from EntropyEngine::Core::Debug::INamed

Name
virtual~INamed() =default
virtual voidsetName(std::string_view name) =0
Set the debug name for this object.
virtual boolhasName() const
Check if this object has a debug name set.
virtual std::string_viewgetName() const =0
Get the debug name of this object.
class EntropyEngine::Core::Concurrency::WorkGraph;

Orchestrates complex parallel workflows with automatic dependency management.

WorkGraph provides high-level workflow management for parallel task execution. It accepts task definitions with dependency relationships and automatically determines optimal execution order. As tasks complete, the system triggers dependent tasks in cascade, ensuring correct execution flow through complex dependency chains.

Bridges the gap between low-level work execution (WorkContractGroup) and high-level workflow requirements. While WorkContractGroup provides raw execution capabilities, WorkGraph adds intelligent scheduling based on dependency relationships.

Key features:

  • Automatic dependency resolution without manual scheduling
  • Dynamic graph construction during execution
  • Failure propagation to cancel dependent tasks
  • Thread-safe operations for concurrent modifications
  • Zero-copy integration with existing WorkContractGroup
  • Main thread execution support for UI and render operations

Common applications:

  • Build systems (compile → link → package)
  • Data pipelines (load → transform → analyze → save)
  • Game asset processing (texture → compress → pack)
  • Mixed UI/background workflows (process → update UI → save)

Complexity characteristics:

  • Dependency tracking: O(1) atomic operations
  • Completion cascade: O(children) per completed node
// Mixed execution pipeline with UI updates
WorkContractGroup group(1024);
WorkService service(2); // 2 worker threads
service.addWorkContractGroup(&group);
WorkGraph graph(&group);
// Background data processing
auto load = graph.addNode([]{
auto data = loadFromDisk();
processData(data);
}, "loader");
// Main thread UI update
auto updateUI = graph.addNode([]{
progressBar.setValue(50);
statusLabel.setText("Processing...");
}, "ui-update", nullptr, ExecutionType::MainThread);
// More background work
auto save = graph.addNode([]{
auto data = getProcessedData();
saveToDisk(data);
}, "saver");
// Wire up dependencies - UI update after load, save after UI
graph.addDependency(load, updateUI);
graph.addDependency(updateUI, save);
// Start execution
graph.execute();
service.start();
// Main thread pump (in your event loop)
while (!graph.isComplete()) {
// Process main thread work
group.executeMainThreadWork(5); // Max 5 per frame
// Handle UI events
processEvents();
renderFrame();
}
using EntropyEngine::Core::Concurrency::WorkGraph::NodeHandle = Graph::AcyclicNodeHandle<WorkGraphNode>;
~WorkGraph()

Cleans up the graph and ensures all callbacks complete.

Waits for active callbacks before destroying. Safe to destroy with pending work - it continues executing in the WorkContractGroup.

WaitResult wait()

Blocks until your entire workflow finishes - success or failure.

Return: Execution summary with success/failure counts

Synchronization point using condition variables. Returns execution summary. Thread-safe.

graph.execute();
// Do other work while graph runs...
auto result = graph.wait();
if (result.allCompleted) {
LOG_INFO("Workflow completed successfully!");
} else if (result.failedCount > 0) {
LOG_ERROR("Workflow had {} failures", result.failedCount);
// Check which nodes failed for debugging
} else if (result.droppedCount > 0) {
LOG_WARN("Dropped {} nodes due to capacity", result.droppedCount);
// Maybe increase WorkContractGroup size?
}
void suspend()

Suspends graph execution - no new nodes will be scheduled.

Currently executing nodes will complete, but no new nodes will be scheduled (including yielded nodes trying to reschedule). The graph remains suspended until resume() is called.

Thread-safe. Can be called while graph is executing.

graph.execute();
// ... some time later
graph.suspend(); // Pause execution
// ... do something else
graph.resume(); // Continue where we left off
inline void setNodeCompleteCallback(
std::function< void(NodeHandle)> callback
)

Install a hook that fires whenever a node finishes.

Parameters:

  • callback Function to call on each node completion

Simple completion tracking. Called synchronously - keep it fast!

// Simple progress tracker
std::atomic<int> completed{0};
graph.setNodeCompleteCallback([&completed](NodeHandle node) {
int count = ++completed;
if (count % 100 == 0) {
LOG_INFO("Completed {} nodes", count);
}
});
size_t scheduleRoots()

Kicks off your workflow by scheduling all nodes that have no dependencies.

Return: Number of root nodes that were scheduled

Finds root nodes and schedules them. Called automatically by execute().

// Manual execution control
graph.addNode([]{ step1(); }, "step1");
graph.addNode([]{ step2(); }, "step2");
// Both are roots since no dependencies were added
size_t roots = graph.scheduleRoots(); // Returns 2
LOG_INFO("Started {} independent tasks", roots);
void resume()

Resumes graph execution after suspension.

Allows scheduling to continue. Any nodes that became ready while suspended will be scheduled. Yielded nodes waiting to reschedule will also continue.

Thread-safe. Safe to call even if not suspended.

if (needToPause) {
graph.suspend();
handleHighPriorityWork();
graph.resume();
}
void reset()

Resets execution state so the graph can be re-executed.

Note: Not thread-safe with concurrent execute()/wait() calls

Keeps all nodes and dependencies intact, but resets:

  • Execution started flag
  • Pending/completed/failed/dropped counters
  • Node states back to Pending
  • Node completion processed flags
  • Pending dependency counts (restored from edge structure)

After reset(), you can call execute() again to re-run the same workflow. This is much faster than destroying and recreating the graph.

// Reusable graph pattern
WorkGraph graph(&threadPool);
auto nodeA = graph.addNode([&data]{ process(data); }, "processor");
for (int frame = 0; frame < 100; ++frame) {
updateData(data); // Update what the node operates on
graph.reset(); // Reset execution state
graph.execute(); // Re-run the workflow
graph.wait();
}
size_t processDeferredNodes()

Manually drain the deferred queue when capacity becomes available.

Return: How many deferred nodes were successfully scheduled

Schedules deferred nodes when capacity frees up. Usually automatic via callbacks.

// After manually cancelling some work
workGroup.cancelSomeContracts();
size_t scheduled = graph.processDeferredNodes();
LOG_INFO("Scheduled {} previously deferred nodes", scheduled);
inline bool isSuspended() const

Checks if the graph is currently suspended.

Return: true if suspend() was called and resume() hasn’t been called yet

inline bool isHandleValid(
const NodeHandle & handle
) const

Test if a node handle still points to a real node.

Parameters:

  • handle The handle to validate

Return: true if the handle points to a valid node

bool isComplete() const

Quick non-blocking check if your workflow is done.

Return: true if all nodes are done, false if work remains

Returns true when all nodes reached terminal state. Perfect for polling in game loops.

// In your game loop
if (!graph.isComplete()) {
renderLoadingScreen();
} else {
auto stats = graph.getStats();
if (stats.failedNodes == 0) {
proceedToNextLevel();
} else {
showErrorDialog();
}
}
WorkGraphStats::Snapshot getStats() const

Snapshot of your workflow’s current state - how’s it doing?

Return: Complete statistics snapshot at this moment

Returns consistent snapshot of all stats. Great for progress bars or monitoring.

// Progress monitoring
auto stats = graph.getStats();
float progress = (float)stats.completedNodes / stats.totalNodes * 100;
LOG_INFO("Progress: {:.1f}% ({}/{} nodes)",
progress, stats.completedNodes, stats.totalNodes);
if (stats.failedNodes > 0) {
LOG_WARN("Failures detected: {} nodes failed", stats.failedNodes);
}
inline uint32_t getPendingCount() const

Quick check of how much work remains.

Return: Nodes that haven’t reached terminal state yet

inline WorkGraphNode * getNodeData(
const NodeHandle & node
)

Access the underlying node payload for a given handle.

Parameters:

  • node Handle to the node whose data you want

Return: Pointer to node data (mutable/const), or nullptr if not valid

Useful for debugging or custom instrumentation. Returns nullptr if the handle is invalid or the node has been removed. Prefer using public APIs for scheduling and state changes rather than mutating node data directly.

// Read node name for logging
if (auto* n = graph.getNodeData(handle)) {
LOG_INFO("Node: {}", n->name);
}
inline const WorkGraphNode * getNodeData(
const NodeHandle & node
) const
inline size_t getNodeCount() const

Get the total number of nodes in the graph.

Return: Total node count (useful for checking if graph needs rebuilding)

Core::EventBus * getEventBus()

Access the event system for monitoring graph execution.

Return: Event bus for subscriptions, or nullptr if events disabled

Lazy-initialized event bus if events enabled. Returns nullptr otherwise.

// Subscribe to completion events
if (auto* bus = graph.getEventBus()) {
bus->subscribe<NodeCompletedEvent>([](const auto& event) {
LOG_INFO("Node completed: {} in {}ms",
event.node.getData()->name,
duration_cast<milliseconds>(event.executionTime).count());
});
bus->subscribe<NodeFailedEvent>([](const auto& event) {
LOG_ERROR("Node failed: {}", event.node.getData()->name);
// Could rethrow the exception for debugging
});
}
inline const WorkGraphConfig & getConfig() const

Access the configuration this graph was created with.

Return: The config struct passed to constructor (or defaults)

inline std::vector< NodeHandle > getChildren(
const NodeHandle & node
) const

Find all nodes that depend on this one.

Parameters:

  • node The parent whose children you want

Return: List of nodes that depend on the given node

Returns direct children. Useful for debugging or visualization.

// Find what depends on a critical node
auto children = graph.getChildren(criticalNode);
LOG_INFO("Node has {} dependents", children.size());
for (auto& child : children) {
LOG_INFO(" - {}", child.getData()->name);
}
void execute()

Lights the fuse on your workflow - starts the cascade of execution.

Finds and schedules root nodes. Safe to call multiple times. Thread-safe with dynamic modifications.

// Fire and forget
graph.execute();
// Graph is now running in the background
// You can even add more work while it runs!
auto newNode = graph.addNode([]{ lateWork(); });
graph.addDependency(existingNode, newNode);
// newNode will execute when existingNode completes
void clear()

Removes all nodes and dependencies from the graph.

Note: Not thread-safe with concurrent operations

Clears the entire graph structure. After clear(), the graph is empty and ready for new nodes to be added. Use when the workflow structure changes (different number of nodes, different dependencies).

// Dynamic workflow that changes structure
if (configurationChanged) {
graph.clear(); // Remove old structure
// Add new nodes based on new configuration
for (auto& task : newTasks) {
graph.addNode(task.work, task.name);
}
} else {
graph.reset(); // Just reset execution state
}
graph.execute();
size_t checkTimedDeferrals()

Checks timed deferrals and schedules nodes whose wake time has arrived.

Return: Number of timed nodes successfully scheduled

Examines nodes that yielded with a specific wake time (e.g., timers) and schedules any whose scheduled time has passed. Call this periodically from your main loop or worker threads to ensure timers fire promptly.

// In main loop
while (running) {
graph.checkTimedDeferrals(); // Wake up any ready timers
workService->executeMainThreadWork(10);
std::this_thread::sleep_for(10ms);
}
NodeHandle addYieldableNode(
YieldableWorkFunction work,
const std::string & name ="",
void * userData =nullptr,
ExecutionType executionType =ExecutionType::AnyThread,
std::optional< uint32_t > maxReschedules =std::nullopt
)

Adds a yieldable task that can suspend and resume execution.

Parameters:

  • work Yieldable function returning WorkResult
  • name Human-readable name for debugging
  • userData Your own context pointer
  • executionType Where to run: AnyThread or MainThread
  • maxReschedules Optional limit on reschedules (prevent infinite loops)

Return: Handle to reference this node

Creates a node that can yield control back to the scheduler and be rescheduled later. Perfect for polling operations, staged processing, or any task that needs to wait without blocking a thread.

// Polling task that yields until ready
auto poller = graph.addYieldableNode([]() -> WorkResultContext {
if (!dataReady()) {
return WorkResultContext::yield(); // Try again later
}
processData();
return WorkResultContext::complete();
}, "data-poller");
// Staged processing with yield between stages
int stage = 0;
auto staged = graph.addYieldableNode([&stage]() -> WorkResultContext {
switch (stage++) {
case 0: doStage1(); return WorkResultContext::yield();
case 1: doStage2(); return WorkResultContext::yield();
case 2: doStage3(); return WorkResultContext::complete();
default: return WorkResultContext::complete();
}
}, "staged-processor", nullptr, ExecutionType::AnyThread, 10);
NodeHandle addNode(
std::function< void()> work,
const std::string & name ="",
void * userData =nullptr,
ExecutionType executionType =ExecutionType::AnyThread
)

Adds a task to your workflow - it won’t run until its time comes.

Parameters:

  • work Your task - lambda, function, or any callable
  • name Human-readable name for debugging
  • userData Your own context pointer
  • executionType Where to run: AnyThread (worker pool) or MainThread

Return: Handle to reference this node

Creates a node that waits for dependencies before running. Thread-safe - can add nodes while graph executes. Use ExecutionType::MainThread for UI updates or other main-thread-only operations.

// Simple task
auto task = graph.addNode([]{
doSomeWork();
}, "worker-1");
// Main thread task
auto uiUpdate = graph.addNode([]{
updateUI();
}, "ui-updater", nullptr, ExecutionType::MainThread);
// Task with captures
std::string filename = "data.txt";
auto loader = graph.addNode([filename]{
loadFile(filename);
}, "file-loader");
// Task with user data
auto* context = new ProcessContext();
auto processor = graph.addNode(
[context]{ context->process(); },
"processor",
context // Attach as user data
);
void addDependency(
NodeHandle from,
const NodeHandle & to
)

Wire up your workflow - tell nodes who they’re waiting for.

Parameters:

  • from The prerequisite task
  • to The dependent task

Exceptions:

  • std::invalid_argument if this would create a cycle
  • std::runtime_error if nodes invalid or completed

Defines execution order: “to” waits for “from” to finish. If “from” fails, “to” is cancelled. Prevents cycles. Thread-safe.

// Linear pipeline: A → B → C
auto A = graph.addNode([]{ stepA(); }, "A");
auto B = graph.addNode([]{ stepB(); }, "B");
auto C = graph.addNode([]{ stepC(); }, "C");
graph.addDependency(A, B); // B waits for A
graph.addDependency(B, C); // C waits for B
// Fan-out: A → {B, C, D}
auto A = graph.addNode([]{ generateData(); }, "generator");
auto B = graph.addNode([]{ process1(); }, "proc1");
auto C = graph.addNode([]{ process2(); }, "proc2");
auto D = graph.addNode([]{ process3(); }, "proc3");
graph.addDependency(A, B); // All three process
graph.addDependency(A, C); // the same data
graph.addDependency(A, D); // in parallel
// Fan-in: {A, B, C} → D
auto D = graph.addNode([]{ mergeResults(); }, "merger");
graph.addDependency(A, D); // D waits for
graph.addDependency(B, D); // all three
graph.addDependency(C, D); // to complete
NodeHandle addContinuation(
const std::vector< NodeHandle > & parents,
std::function< void()> work,
const std::string & name ="",
ExecutionType executionType =ExecutionType::AnyThread
)

Create a “join” node that waits for multiple parents - perfect for fan-in patterns.

Parameters:

  • parents All nodes that must complete first
  • work What to do after all parents finish
  • name Debug label for the continuation node
  • executionType Where this node should execute (default: AnyThread)

Return: Handle to the newly created continuation node

Convenience for creating a node that waits for multiple parents. Runs only after ALL parents complete successfully.

// Parallel processing with merge
auto part1 = graph.addNode([]{ processPart1(); });
auto part2 = graph.addNode([]{ processPart2(); });
auto part3 = graph.addNode([]{ processPart3(); });
// Single merge point
auto merge = graph.addContinuation(
{part1, part2, part3},
[]{ mergeResults(); },
"merger"
);
// Main thread UI update after merge
auto uiUpdate = graph.addContinuation(
{merge},
[]{ updateUI(); },
"ui-updater",
ExecutionType::MainThread
);
explicit WorkGraph(
WorkContractGroup * workContractGroup
)

Creates a work graph backed by your thread pool.

Parameters:

  • workContractGroup Your thread pool for executing work (must outlive the graph)

The graph doesn’t own the WorkContractGroup - just uses it to schedule work. Multiple graphs can share the same thread pool.

// Typical setup
WorkContractGroup threadPool(1024); // Shared thread pool
WorkGraph pipeline1(&threadPool); // Asset processing pipeline
WorkGraph pipeline2(&threadPool); // Data analysis pipeline
// Both pipelines share the same threads!
WorkGraph(
WorkContractGroup * workContractGroup,
const WorkGraphConfig & config
)

Creates a work graph with custom behavior options.

Parameters:

  • workContractGroup Your thread pool for executing work
  • config Tuning knobs and feature flags

Use for advanced features like events, state management, or custom allocation.

// Example: Graph with event notifications for monitoring
WorkGraphConfig config;
config.enableEvents = true;
config.expectedNodeCount = 1000; // Pre-allocate for performance
config.maxDeferredNodes = 100; // Limit queue size
WorkGraph monitoredGraph(&threadPool, config);
// Now you can subscribe to events
monitoredGraph.getEventBus()->subscribe<NodeCompletedEvent>(
[](const NodeCompletedEvent& e) {
LOG_INFO("Node completed in {}ms",
chrono::duration_cast<chrono::milliseconds>(e.executionTime).count());
});

Updated on 2026-01-26 at 16:50:32 -0500