EntropyEngine::Core::Concurrency::WorkGraph
Orchestrates complex parallel workflows with automatic dependency management.
#include <WorkGraph.h>
Inherits from EntropyEngine::Core::Debug::Named, EntropyEngine::Core::Debug::INamed
Public Classes
| | Name |
|---|---|
| struct | WaitResult What you get back from wait() - the final score of your graph execution. |
Public Types
| | Name |
|---|---|
| using Graph::AcyclicNodeHandle< WorkGraphNode > | NodeHandle |
Public Functions
| | Name |
|---|---|
| | ~WorkGraph() Cleans up the graph and ensures all callbacks complete. |
| WaitResult | wait() Blocks until your entire workflow finishes - success or failure. |
| void | suspend() Suspends graph execution - no new nodes will be scheduled. |
| void | setNodeCompleteCallback(std::function< void(NodeHandle)> callback) Install a hook that fires whenever a node finishes. |
| size_t | scheduleRoots() Kicks off your workflow by scheduling all nodes that have no dependencies. |
| void | resume() Resumes graph execution after suspension. |
| void | reset() Resets execution state so the graph can be re-executed. |
| size_t | processDeferredNodes() Manually drain the deferred queue when capacity becomes available. |
| bool | isSuspended() const Checks if the graph is currently suspended. |
| bool | isHandleValid(const NodeHandle & handle) const Test if a node handle still points to a real node. |
| bool | isComplete() const Quick non-blocking check if your workflow is done. |
| WorkGraphStats::Snapshot | getStats() const Snapshot of your workflow’s current state - how’s it doing? |
| uint32_t | getPendingCount() const Quick check of how much work remains. |
| WorkGraphNode * | getNodeData(const NodeHandle & node) Access the underlying node payload for a given handle. |
| const WorkGraphNode * | getNodeData(const NodeHandle & node) const |
| size_t | getNodeCount() const Get the total number of nodes in the graph. |
| Core::EventBus * | getEventBus() Access the event system for monitoring graph execution. |
| const WorkGraphConfig & | getConfig() const Access the configuration this graph was created with. |
| std::vector< NodeHandle > | getChildren(const NodeHandle & node) const Find all nodes that depend on this one. |
| void | execute() Lights the fuse on your workflow - starts the cascade of execution. |
| void | clear() Removes all nodes and dependencies from the graph. |
| size_t | checkTimedDeferrals() Checks timed deferrals and schedules nodes whose wake time has arrived. |
| NodeHandle | addYieldableNode(YieldableWorkFunction work, const std::string & name ="", void * userData =nullptr, ExecutionType executionType =ExecutionType::AnyThread, std::optional< uint32_t > maxReschedules =std::nullopt) Adds a yieldable task that can suspend and resume execution. |
| NodeHandle | addNode(std::function< void()> work, const std::string & name ="", void * userData =nullptr, ExecutionType executionType =ExecutionType::AnyThread) Adds a task to your workflow - it won’t run until its time comes. |
| void | addDependency(NodeHandle from, const NodeHandle & to) Wire up your workflow - tell nodes who they’re waiting for. |
| NodeHandle | addContinuation(const std::vector< NodeHandle > & parents, std::function< void()> work, const std::string & name ="", ExecutionType executionType =ExecutionType::AnyThread) Create a “join” node that waits for multiple parents - perfect for fan-in patterns. |
| | WorkGraph(WorkContractGroup * workContractGroup) Creates a work graph backed by your thread pool. |
| | WorkGraph(WorkContractGroup * workContractGroup, const WorkGraphConfig & config) Creates a work graph with custom behavior options. |
Additional inherited members
Public Functions inherited from EntropyEngine::Core::Debug::Named
| | Name |
|---|---|
| virtual void | setName(std::string_view name) override Set the debug name for this object. |
| virtual std::string_view | getName() const override Get the debug name of this object. |
| | Named() =default |
| | Named(std::string_view name) |
Public Functions inherited from EntropyEngine::Core::Debug::INamed
| | Name |
|---|---|
| virtual | ~INamed() =default |
| virtual void | setName(std::string_view name) =0 Set the debug name for this object. |
| virtual bool | hasName() const Check if this object has a debug name set. |
| virtual std::string_view | getName() const =0 Get the debug name of this object. |
Detailed Description
```cpp
class EntropyEngine::Core::Concurrency::WorkGraph;
```
Orchestrates complex parallel workflows with automatic dependency management.
WorkGraph provides high-level workflow management for parallel task execution. It accepts task definitions with dependency relationships and derives a valid execution order from them. As tasks complete, it schedules their dependents in cascade, so execution flows correctly through complex dependency chains.
WorkGraph bridges the gap between low-level work execution (WorkContractGroup) and high-level workflow requirements. While WorkContractGroup provides raw execution capabilities, WorkGraph adds scheduling driven by dependency relationships.
Key features:
- Automatic dependency resolution without manual scheduling
- Dynamic graph construction during execution
- Failure propagation to cancel dependent tasks
- Thread-safe operations for concurrent modifications
- Zero-copy integration with existing WorkContractGroup
- Main thread execution support for UI and render operations
Common applications:
- Build systems (compile → link → package)
- Data pipelines (load → transform → analyze → save)
- Game asset processing (texture → compress → pack)
- Mixed UI/background workflows (process → update UI → save)
Complexity characteristics:
- Dependency tracking: O(1) atomic operations
- Completion cascade: O(children) per completed node
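The complexity figures above can be illustrated with a small standalone sketch. It is not the WorkGraph implementation: `SketchNode`, `SketchGraph`, and `onNodeComplete` are hypothetical names, and the sketch assumes each node keeps an atomic count of unfinished parents. Completing a node then costs one atomic decrement per child, and a child becomes ready when its count reaches zero.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a graph node; not the real WorkGraphNode.
struct SketchNode {
    std::atomic<uint32_t> pendingDependencies{0};  // parents not yet finished
    std::vector<std::size_t> children;             // indices of dependent nodes
};

// Hypothetical toy graph that only tracks dependency counts.
class SketchGraph {
public:
    std::size_t addNode() {
        nodes_.push_back(std::make_unique<SketchNode>());
        return nodes_.size() - 1;
    }

    // "to" waits for "from": one more pending dependency on the child.
    void addDependency(std::size_t from, std::size_t to) {
        assert(from < nodes_.size() && to < nodes_.size());
        nodes_[from]->children.push_back(to);
        nodes_[to]->pendingDependencies.fetch_add(1, std::memory_order_relaxed);
    }

    // Called when a node finishes. One atomic decrement per child, so the
    // cost is O(children). Returns the children that just became ready.
    std::vector<std::size_t> onNodeComplete(std::size_t node) {
        assert(node < nodes_.size());
        std::vector<std::size_t> ready;
        for (std::size_t child : nodes_[node]->children) {
            // fetch_sub returns the previous value; 1 means this was the last parent.
            if (nodes_[child]->pendingDependencies.fetch_sub(
                    1, std::memory_order_acq_rel) == 1) {
                ready.push_back(child);
            }
        }
        return ready;
    }

private:
    std::vector<std::unique_ptr<SketchNode>> nodes_;
};
```

In a diamond A → {B, C} → D, completing A releases B and C, completing B releases nothing, and completing C releases D, because D's count only reaches zero once both of its parents have finished.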
```cpp
// Mixed execution pipeline with UI updates
WorkContractGroup group(1024);
WorkService service(2);  // 2 worker threads
service.addWorkContractGroup(&group);

WorkGraph graph(&group);

// Background data processing
auto load = graph.addNode([]{
    auto data = loadFromDisk();
    processData(data);
}, "loader");

// Main thread UI update
auto updateUI = graph.addNode([]{
    progressBar.setValue(50);
    statusLabel.setText("Processing...");
}, "ui-update", nullptr, ExecutionType::MainThread);

// More background work
auto save = graph.addNode([]{
    auto data = getProcessedData();
    saveToDisk(data);
}, "saver");

// Wire up dependencies - UI update after load, save after UI
graph.addDependency(load, updateUI);
graph.addDependency(updateUI, save);

// Start execution
graph.execute();
service.start();

// Main thread pump (in your event loop)
while (!graph.isComplete()) {
    // Process main thread work
    group.executeMainThreadWork(5);  // Max 5 per frame

    // Handle UI events
    processEvents();
    renderFrame();
}
```
Public Types Documentation
using NodeHandle
```cpp
using EntropyEngine::Core::Concurrency::WorkGraph::NodeHandle = Graph::AcyclicNodeHandle<WorkGraphNode>;
```
Public Functions Documentation
function ~WorkGraph
```cpp
~WorkGraph()
```
Cleans up the graph and ensures all callbacks complete.
Waits for active callbacks before destroying. Safe to destroy with pending work - it continues executing in the WorkContractGroup.
function wait
```cpp
WaitResult wait()
```
Blocks until your entire workflow finishes - success or failure.
Return: Execution summary with success/failure counts
Synchronization point using condition variables. Returns execution summary. Thread-safe.
```cpp
graph.execute();
// Do other work while graph runs...

auto result = graph.wait();
if (result.allCompleted) {
    LOG_INFO("Workflow completed successfully!");
} else if (result.failedCount > 0) {
    LOG_ERROR("Workflow had {} failures", result.failedCount);
    // Check which nodes failed for debugging
} else if (result.droppedCount > 0) {
    LOG_WARN("Dropped {} nodes due to capacity", result.droppedCount);
    // Maybe increase WorkContractGroup size?
}
```
function suspend
```cpp
void suspend()
```
Suspends graph execution - no new nodes will be scheduled.
Currently executing nodes will complete, but no new nodes will be scheduled (including yielded nodes trying to reschedule). The graph remains suspended until resume() is called.
Thread-safe. Can be called while graph is executing.
```cpp
graph.execute();
// ... some time later
graph.suspend();  // Pause execution
// ... do something else
graph.resume();   // Continue where we left off
```
function setNodeCompleteCallback
```cpp
inline void setNodeCompleteCallback(
    std::function<void(NodeHandle)> callback
)
```
Install a hook that fires whenever a node finishes.
Parameters:
- callback Function to call on each node completion
Simple completion tracking. Called synchronously - keep it fast!
```cpp
// Simple progress tracker
std::atomic<int> completed{0};
graph.setNodeCompleteCallback([&completed](NodeHandle node) {
    int count = ++completed;
    if (count % 100 == 0) {
        LOG_INFO("Completed {} nodes", count);
    }
});
```
function scheduleRoots
```cpp
size_t scheduleRoots()
```
Kicks off your workflow by scheduling all nodes that have no dependencies.
Return: Number of root nodes that were scheduled
Finds root nodes and schedules them. Called automatically by execute().
```cpp
// Manual execution control
graph.addNode([]{ step1(); }, "step1");
graph.addNode([]{ step2(); }, "step2");
// Both are roots since no dependencies were added

size_t roots = graph.scheduleRoots();  // Returns 2
LOG_INFO("Started {} independent tasks", roots);
```
function resume
```cpp
void resume()
```
Resumes graph execution after suspension.
Allows scheduling to continue. Any nodes that became ready while suspended will be scheduled. Yielded nodes waiting to reschedule will also continue.
Thread-safe. Safe to call even if not suspended.
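The suspend/resume contract described here (nodes that become ready while suspended are held, then scheduled on resume, and resume is harmless when not suspended) can be modelled with a small gate. This is only a sketch of the behaviour under those assumptions; `SketchSuspendGate`, `offer`, and the integer node ids are hypothetical and say nothing about how WorkGraph is built internally.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical gate placed in front of a scheduler; names are illustrative only.
class SketchSuspendGate {
public:
    void suspend() {
        std::lock_guard<std::mutex> lock(mutex_);
        suspended_ = true;
    }

    bool isSuspended() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return suspended_;
    }

    // Offers a ready node. Returns true if it may be scheduled now,
    // false if it was parked because the gate is suspended.
    bool offer(int nodeId) {
        assert(nodeId >= 0);  // ids are non-negative in this sketch
        std::lock_guard<std::mutex> lock(mutex_);
        if (!suspended_) {
            return true;
        }
        parked_.push_back(nodeId);
        return false;
    }

    // Clears the flag and hands back everything parked while suspended.
    // Calling it when not suspended simply returns an empty list.
    std::vector<int> resume() {
        std::lock_guard<std::mutex> lock(mutex_);
        suspended_ = false;
        std::vector<int> released;
        released.swap(parked_);
        return released;
    }

private:
    mutable std::mutex mutex_;
    bool suspended_ = false;
    std::vector<int> parked_;
};
```

A caller would schedule whatever `resume()` returns, in the order the nodes were parked.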
```cpp
if (needToPause) {
    graph.suspend();
    handleHighPriorityWork();
    graph.resume();
}
```
function reset
```cpp
void reset()
```
Resets execution state so the graph can be re-executed.
Note: Not thread-safe with concurrent execute()/wait() calls
Keeps all nodes and dependencies intact, but resets:
- Execution started flag
- Pending/completed/failed/dropped counters
- Node states back to Pending
- Node completion processed flags
- Pending dependency counts (restored from edge structure)
After reset(), you can call execute() again to re-run the same workflow. This is much faster than destroying and recreating the graph.
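To make the "restored from edge structure" step concrete, here is a minimal sketch of how per-node execution state could be rebuilt from an edge list alone. The types and the function `resetExecutionState` are hypothetical and simplified; the actual node layout and reset logic are not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical per-node execution state; the real WorkGraphNode may differ.
enum class SketchState { Pending, Completed, Failed };

struct SketchExecState {
    SketchState state = SketchState::Pending;
    bool completionProcessed = false;
    uint32_t pendingDependencies = 0;
};

// Rebuilds execution state from (from, to) edge pairs.
// The edges themselves are left untouched; only per-node state is rewritten.
inline void resetExecutionState(
    std::vector<SketchExecState>& nodes,
    const std::vector<std::pair<std::size_t, std::size_t>>& edges) {
    for (SketchExecState& node : nodes) {
        node = SketchExecState{};  // back to Pending, flag cleared, count zeroed
    }
    for (const auto& edge : edges) {
        assert(edge.second < nodes.size());
        ++nodes[edge.second].pendingDependencies;  // one per incoming edge
    }
}
```

Because nothing is allocated or freed, a reset of this kind costs one pass over the nodes and one over the edges, which is consistent with the claim that resetting is cheaper than rebuilding the graph.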
```cpp
// Reusable graph pattern
WorkGraph graph(&threadPool);
auto nodeA = graph.addNode([&data]{ process(data); }, "processor");

for (int frame = 0; frame < 100; ++frame) {
    updateData(data);  // Update what the node operates on
    graph.reset();     // Reset execution state
    graph.execute();   // Re-run the workflow
    graph.wait();
}
```
function processDeferredNodes
```cpp
size_t processDeferredNodes()
```
Manually drain the deferred queue when capacity becomes available.
Return: How many deferred nodes were successfully scheduled
Schedules deferred nodes when capacity frees up. Usually automatic via callbacks.
```cpp
// After manually cancelling some work
workGroup.cancelSomeContracts();
size_t scheduled = graph.processDeferredNodes();
LOG_INFO("Scheduled {} previously deferred nodes", scheduled);
```
function isSuspended
```cpp
inline bool isSuspended() const
```
Checks if the graph is currently suspended.
Return: true if suspend() was called and resume() hasn’t been called yet
function isHandleValid
```cpp
inline bool isHandleValid(
    const NodeHandle & handle
) const
```
Test if a node handle still points to a real node.
Parameters:
- handle The handle to validate
Return: true if the handle points to a valid node
function isComplete
```cpp
bool isComplete() const
```
Quick non-blocking check if your workflow is done.
Return: true if all nodes are done, false if work remains
Returns true when all nodes reached terminal state. Perfect for polling in game loops.
```cpp
// In your game loop
if (!graph.isComplete()) {
    renderLoadingScreen();
} else {
    auto stats = graph.getStats();
    if (stats.failedNodes == 0) {
        proceedToNextLevel();
    } else {
        showErrorDialog();
    }
}
```
function getStats
```cpp
WorkGraphStats::Snapshot getStats() const
```
Snapshot of your workflow’s current state - how’s it doing?
Return: Complete statistics snapshot at this moment
Returns consistent snapshot of all stats. Great for progress bars or monitoring.
```cpp
// Progress monitoring
auto stats = graph.getStats();
// Guard against an empty graph so the percentage is never NaN
float progress = stats.totalNodes > 0
    ? 100.0f * static_cast<float>(stats.completedNodes) / static_cast<float>(stats.totalNodes)
    : 0.0f;
LOG_INFO("Progress: {:.1f}% ({}/{} nodes)",
         progress, stats.completedNodes, stats.totalNodes);

if (stats.failedNodes > 0) {
    LOG_WARN("Failures detected: {} nodes failed", stats.failedNodes);
}
```
function getPendingCount
```cpp
inline uint32_t getPendingCount() const
```
Quick check of how much work remains.
Return: Nodes that haven’t reached terminal state yet
function getNodeData
```cpp
inline WorkGraphNode * getNodeData(
    const NodeHandle & node
)
```
Access the underlying node payload for a given handle.
Parameters:
- node Handle to the node whose data you want
Return: Pointer to node data (mutable/const), or nullptr if not valid
Useful for debugging or custom instrumentation. Returns nullptr if the handle is invalid or the node has been removed. Prefer using public APIs for scheduling and state changes rather than mutating node data directly.
```cpp
// Read node name for logging
if (auto* n = graph.getNodeData(handle)) {
    LOG_INFO("Node: {}", n->name);
}
```
function getNodeData
```cpp
inline const WorkGraphNode * getNodeData(
    const NodeHandle & node
) const
```
function getNodeCount
```cpp
inline size_t getNodeCount() const
```
Get the total number of nodes in the graph.
Return: Total node count (useful for checking if graph needs rebuilding)
function getEventBus
```cpp
Core::EventBus * getEventBus()
```
Access the event system for monitoring graph execution.
Return: Event bus for subscriptions, or nullptr if events disabled
When events are enabled, the event bus is created lazily on first access. When events are disabled, this returns nullptr.
```cpp
// Subscribe to completion events
if (auto* bus = graph.getEventBus()) {
    bus->subscribe<NodeCompletedEvent>([](const auto& event) {
        LOG_INFO("Node completed: {} in {}ms",
                 event.node.getData()->name,
                 std::chrono::duration_cast<std::chrono::milliseconds>(event.executionTime).count());
    });

    bus->subscribe<NodeFailedEvent>([](const auto& event) {
        LOG_ERROR("Node failed: {}", event.node.getData()->name);
        // Could rethrow the exception for debugging
    });
}
```
function getConfig
```cpp
inline const WorkGraphConfig & getConfig() const
```
Access the configuration this graph was created with.
Return: The config struct passed to constructor (or defaults)
function getChildren
```cpp
inline std::vector<NodeHandle> getChildren(
    const NodeHandle & node
) const
```
Find all nodes that depend on this one.
Parameters:
- node The parent whose children you want
Return: List of nodes that depend on the given node
Returns direct children. Useful for debugging or visualization.
```cpp
// Find what depends on a critical node
auto children = graph.getChildren(criticalNode);
LOG_INFO("Node has {} dependents", children.size());
for (auto& child : children) {
    LOG_INFO("  - {}", child.getData()->name);
}
```
function execute
```cpp
void execute()
```
Lights the fuse on your workflow - starts the cascade of execution.
Finds and schedules root nodes. Safe to call multiple times. Thread-safe with dynamic modifications.
```cpp
// Fire and forget
graph.execute();
// Graph is now running in the background

// You can even add more work while it runs!
auto newNode = graph.addNode([]{ lateWork(); });
graph.addDependency(existingNode, newNode);
// newNode will execute when existingNode completes
```
function clear
```cpp
void clear()
```
Removes all nodes and dependencies from the graph.
Note: Not thread-safe with concurrent operations
Clears the entire graph structure. After clear(), the graph is empty and ready for new nodes to be added. Use when the workflow structure changes (different number of nodes, different dependencies).
```cpp
// Dynamic workflow that changes structure
if (configurationChanged) {
    graph.clear();  // Remove old structure
    // Add new nodes based on new configuration
    for (auto& task : newTasks) {
        graph.addNode(task.work, task.name);
    }
} else {
    graph.reset();  // Just reset execution state
}
graph.execute();
```
function checkTimedDeferrals
```cpp
size_t checkTimedDeferrals()
```
Checks timed deferrals and schedules nodes whose wake time has arrived.
Return: Number of timed nodes successfully scheduled
Examines nodes that yielded with a specific wake time (e.g., timers) and schedules any whose scheduled time has passed. Call this periodically from your main loop or worker threads to ensure timers fire promptly.
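One plausible way to organise timed deferrals is a min-heap keyed by wake time, so each check only looks at the entries that are due. The sketch below assumes that design; `SketchTimedQueue`, `defer`, and the explicit `now` parameter are hypothetical and chosen to make the behaviour easy to test, and the real WorkGraph may store deferrals differently.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <queue>
#include <vector>

using SketchClock = std::chrono::steady_clock;

// Hypothetical record for a node that yielded until a given time.
struct SketchTimedDeferral {
    SketchClock::time_point wakeTime;
    int nodeId;
};

// Comparator that puts the earliest wake time on top of the heap.
struct SketchLaterFirst {
    bool operator()(const SketchTimedDeferral& a,
                    const SketchTimedDeferral& b) const {
        return a.wakeTime > b.wakeTime;
    }
};

class SketchTimedQueue {
public:
    void defer(int nodeId, SketchClock::time_point wakeTime) {
        heap_.push(SketchTimedDeferral{wakeTime, nodeId});
    }

    // Pops every entry whose wake time is at or before "now" and appends its
    // node id to "scheduled". Returns how many entries were released.
    std::size_t checkTimedDeferrals(SketchClock::time_point now,
                                    std::vector<int>& scheduled) {
        std::size_t released = 0;
        while (!heap_.empty() && heap_.top().wakeTime <= now) {
            scheduled.push_back(heap_.top().nodeId);
            heap_.pop();
            ++released;
        }
        return released;
    }

    std::size_t pending() const { return heap_.size(); }

private:
    std::priority_queue<SketchTimedDeferral,
                        std::vector<SketchTimedDeferral>,
                        SketchLaterFirst> heap_;
};
```

With this layout a check that finds nothing due costs a single comparison against the top of the heap, which is why calling it every loop iteration stays cheap.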
```cpp
// In main loop
while (running) {
    graph.checkTimedDeferrals();  // Wake up any ready timers
    workService->executeMainThreadWork(10);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
```
function addYieldableNode
```cpp
NodeHandle addYieldableNode(
    YieldableWorkFunction work,
    const std::string & name = "",
    void * userData = nullptr,
    ExecutionType executionType = ExecutionType::AnyThread,
    std::optional<uint32_t> maxReschedules = std::nullopt
)
```
Adds a yieldable task that can suspend and resume execution.
Parameters:
- work Yieldable function returning WorkResult
- name Human-readable name for debugging
- userData Your own context pointer
- executionType Where to run: AnyThread or MainThread
- maxReschedules Optional limit on reschedules (prevent infinite loops)
Return: Handle to reference this node
Creates a node that can yield control back to the scheduler and be rescheduled later. Perfect for polling operations, staged processing, or any task that needs to wait without blocking a thread.
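The role of `maxReschedules` can be shown with a toy driver loop. This is a sketch under stated assumptions: `SketchResult`, `SketchOutcome`, and `runYieldable` are hypothetical, the loop re-invokes the work immediately instead of requeuing it on a scheduler, and what the real WorkGraph does with a node that exceeds its limit is not specified on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>

// Hypothetical result type for this sketch; the real yield result type may differ.
enum class SketchResult { Yield, Complete };

// Hypothetical outcome reported by the toy driver below.
enum class SketchOutcome { Completed, RescheduleLimitHit };

// Runs "work" until it completes. Each Yield counts as one reschedule; with a
// limit set, the loop stops once the work asks for more reschedules than allowed.
inline SketchOutcome runYieldable(
    const std::function<SketchResult()>& work,
    std::optional<uint32_t> maxReschedules = std::nullopt) {
    assert(work);  // an empty std::function cannot be called
    uint32_t reschedules = 0;
    while (work() == SketchResult::Yield) {
        if (maxReschedules && reschedules >= *maxReschedules) {
            return SketchOutcome::RescheduleLimitHit;
        }
        ++reschedules;  // a real scheduler would requeue the node here
    }
    return SketchOutcome::Completed;
}
```

A task that yields forever with a limit of 2 is invoked three times (the initial run plus two reschedules) before the driver gives up, which is the infinite-loop protection the parameter description refers to.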
```cpp
// Polling task that yields until ready
auto poller = graph.addYieldableNode([]() -> WorkResultContext {
    if (!dataReady()) {
        return WorkResultContext::yield();  // Try again later
    }
    processData();
    return WorkResultContext::complete();
}, "data-poller");

// Staged processing with yield between stages
int stage = 0;
auto staged = graph.addYieldableNode([&stage]() -> WorkResultContext {
    switch (stage++) {
        case 0:
            doStage1();
            return WorkResultContext::yield();
        case 1:
            doStage2();
            return WorkResultContext::yield();
        case 2:
            doStage3();
            return WorkResultContext::complete();
        default:
            return WorkResultContext::complete();
    }
}, "staged-processor", nullptr, ExecutionType::AnyThread, 10);
```
function addNode
```cpp
NodeHandle addNode(
    std::function<void()> work,
    const std::string & name = "",
    void * userData = nullptr,
    ExecutionType executionType = ExecutionType::AnyThread
)
```
Adds a task to your workflow - it won’t run until its time comes.
Parameters:
- work Your task - lambda, function, or any callable
- name Human-readable name for debugging
- userData Your own context pointer
- executionType Where to run: AnyThread (worker pool) or MainThread
Return: Handle to reference this node
Creates a node that waits for dependencies before running. Thread-safe - can add nodes while graph executes. Use ExecutionType::MainThread for UI updates or other main-thread-only operations.
```cpp
// Simple task
auto task = graph.addNode([]{
    doSomeWork();
}, "worker-1");

// Main thread task
auto uiUpdate = graph.addNode([]{
    updateUI();
}, "ui-updater", nullptr, ExecutionType::MainThread);

// Task with captures
std::string filename = "data.txt";
auto loader = graph.addNode([filename]{
    loadFile(filename);
}, "file-loader");

// Task with user data
auto* context = new ProcessContext();
auto processor = graph.addNode(
    [context]{ context->process(); },
    "processor",
    context  // Attach as user data
);
```
function addDependency
```cpp
void addDependency(
    NodeHandle from,
    const NodeHandle & to
)
```
Wire up your workflow - tell nodes who they’re waiting for.
Parameters:
- from The prerequisite task
- to The dependent task
Exceptions:
- std::invalid_argument if this would create a cycle
- std::runtime_error if nodes invalid or completed
Defines execution order: “to” waits for “from” to finish. If “from” fails, “to” is cancelled. Prevents cycles. Thread-safe.
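A common way to reject cycle-creating edges is to check, before inserting `from -> to`, whether `from` is already reachable from `to`. The sketch below shows that technique on a toy graph. It is an assumption about approach, not WorkGraph's actual cycle detection, and `SketchDag` and `reaches` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical toy DAG that stores only child indices per node.
class SketchDag {
public:
    std::size_t addNode() {
        children_.emplace_back();
        return children_.size() - 1;
    }

    // Adds the edge from -> to, or throws std::invalid_argument if the edge
    // would close a cycle. Unknown node indices throw std::runtime_error.
    void addDependency(std::size_t from, std::size_t to) {
        if (from >= children_.size() || to >= children_.size()) {
            throw std::runtime_error("invalid node");
        }
        // The new edge closes a cycle exactly when "from" is reachable from "to".
        if (reaches(to, from)) {
            throw std::invalid_argument("dependency would create a cycle");
        }
        children_[from].push_back(to);
    }

private:
    // Iterative depth-first search over the existing edges.
    bool reaches(std::size_t start, std::size_t target) const {
        assert(start < children_.size());
        std::vector<bool> visited(children_.size(), false);
        std::vector<std::size_t> stack{start};
        while (!stack.empty()) {
            std::size_t current = stack.back();
            stack.pop_back();
            if (current == target) {
                return true;
            }
            if (visited[current]) {
                continue;
            }
            visited[current] = true;
            for (std::size_t next : children_[current]) {
                stack.push_back(next);
            }
        }
        return false;
    }

    std::vector<std::vector<std::size_t>> children_;
};
```

The check costs a traversal of the nodes reachable from `to`, so it is paid once per added edge rather than during execution. A self-dependency is rejected as well, since a node trivially reaches itself.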
```cpp
// Linear pipeline: A → B → C
auto A = graph.addNode([]{ stepA(); }, "A");
auto B = graph.addNode([]{ stepB(); }, "B");
auto C = graph.addNode([]{ stepC(); }, "C");
graph.addDependency(A, B);  // B waits for A
graph.addDependency(B, C);  // C waits for B
```
```cpp
// Fan-out: A → {B, C, D}
auto A = graph.addNode([]{ generateData(); }, "generator");
auto B = graph.addNode([]{ process1(); }, "proc1");
auto C = graph.addNode([]{ process2(); }, "proc2");
auto D = graph.addNode([]{ process3(); }, "proc3");
graph.addDependency(A, B);  // All three process
graph.addDependency(A, C);  // the same data
graph.addDependency(A, D);  // in parallel
```
```cpp
// Fan-in: {A, B, C} → D
auto D = graph.addNode([]{ mergeResults(); }, "merger");
graph.addDependency(A, D);  // D waits for
graph.addDependency(B, D);  // all three
graph.addDependency(C, D);  // to complete
```
function addContinuation
```cpp
NodeHandle addContinuation(
    const std::vector<NodeHandle> & parents,
    std::function<void()> work,
    const std::string & name = "",
    ExecutionType executionType = ExecutionType::AnyThread
)
```
Create a “join” node that waits for multiple parents - perfect for fan-in patterns.
Parameters:
- parents All nodes that must complete first
- work What to do after all parents finish
- name Debug label for the continuation node
- executionType Where this node should execute (default: AnyThread)
Return: Handle to the newly created continuation node
Convenience for creating a node that waits for multiple parents. Runs only after ALL parents complete successfully.
```cpp
// Parallel processing with merge
auto part1 = graph.addNode([]{ processPart1(); });
auto part2 = graph.addNode([]{ processPart2(); });
auto part3 = graph.addNode([]{ processPart3(); });

// Single merge point
auto merge = graph.addContinuation(
    {part1, part2, part3},
    []{ mergeResults(); },
    "merger"
);

// Main thread UI update after merge
auto uiUpdate = graph.addContinuation(
    {merge},
    []{ updateUI(); },
    "ui-updater",
    ExecutionType::MainThread
);
```
function WorkGraph
```cpp
explicit WorkGraph(
    WorkContractGroup * workContractGroup
)
```
Creates a work graph backed by your thread pool.
Parameters:
- workContractGroup Your thread pool for executing work (must outlive the graph)
The graph doesn’t own the WorkContractGroup - just uses it to schedule work. Multiple graphs can share the same thread pool.
```cpp
// Typical setup
WorkContractGroup threadPool(1024);  // Shared thread pool
WorkGraph pipeline1(&threadPool);    // Asset processing pipeline
WorkGraph pipeline2(&threadPool);    // Data analysis pipeline
// Both pipelines share the same threads!
```
function WorkGraph
```cpp
WorkGraph(
    WorkContractGroup * workContractGroup,
    const WorkGraphConfig & config
)
```
Creates a work graph with custom behavior options.
Parameters:
- workContractGroup Your thread pool for executing work
- config Tuning knobs and feature flags
Use for advanced features like events, state management, or custom allocation.
```cpp
// Example: Graph with event notifications for monitoring
WorkGraphConfig config;
config.enableEvents = true;
config.expectedNodeCount = 1000;  // Pre-allocate for performance
config.maxDeferredNodes = 100;    // Limit queue size

WorkGraph monitoredGraph(&threadPool, config);

// Now you can subscribe to events
monitoredGraph.getEventBus()->subscribe<NodeCompletedEvent>(
    [](const NodeCompletedEvent& e) {
        LOG_INFO("Node completed in {}ms",
                 std::chrono::duration_cast<std::chrono::milliseconds>(e.executionTime).count());
    });
```
Updated on 2026-01-26 at 16:50:32 -0500