ra_salsa/runtime/
dependency_graph.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
use triomphe::Arc;

use crate::{DatabaseKeyIndex, RuntimeId};
use parking_lot::{Condvar, MutexGuard};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;

use super::{ActiveQuery, WaitResult};

/// A runtime's stack of queries, stored as a vector of `ActiveQuery` entries
/// (e.g. the `from_stack` handed to `DependencyGraph::block_on`).
type QueryStack = Vec<ActiveQuery>;

/// Tracks which runtimes are blocked on queries that are being executed
/// by other runtimes, together with the data needed to wake them up again.
#[derive(Debug, Default)]
pub(super) struct DependencyGraph {
    /// A `(K -> E)` entry in this map indicates that the runtime
    /// `K` is blocked on the query `E.blocked_on_key`, which is executing
    /// in the runtime `E.blocked_on_id`.
    /// This encodes a graph that must be acyclic (or else deadlock
    /// will result).
    edges: FxHashMap<RuntimeId, Edge>,

    /// Encodes the `RuntimeId` that are blocked waiting for the result
    /// of a given query.
    query_dependents: FxHashMap<DatabaseKeyIndex, SmallVec<[RuntimeId; 4]>>,

    /// When a blocked runtime is unblocked (see `unblock_runtime`), the query
    /// stack taken from its `Edge` and the `WaitResult` it was unblocked with
    /// are stored here, keyed by the id of the unblocked runtime. As it wakes
    /// up, that runtime comes here to fetch (and remove) its entry; see
    /// `block_on`.
    wait_results: FxHashMap<RuntimeId, (QueryStack, WaitResult)>,
}

/// The value stored in `DependencyGraph::edges` for a blocked runtime:
/// what it is waiting on, plus the state needed to resume it.
#[derive(Debug)]
struct Edge {
    /// The runtime that is executing the query we are blocked on.
    blocked_on_id: RuntimeId,
    /// The query whose result the blocked runtime is waiting for.
    blocked_on_key: DatabaseKeyIndex,
    /// The blocked runtime's query stack, parked here while it waits and
    /// handed back through `wait_results` when it is unblocked.
    stack: QueryStack,

    /// Signalled (in `unblock_runtime`) once a wait result has been stored
    /// for the blocked runtime, e.g. because the query it depends on completed.
    /// The blocked runtime waits on this in `block_on` and re-checks
    /// `wait_results` each time it wakes.
    condvar: Arc<parking_lot::Condvar>,
}

impl DependencyGraph {
    /// True if `from_id` depends on `to_id`.
    ///
    /// (i.e., there is a path from `from_id` to `to_id` in the graph.)
    ///
    /// The walk follows the `blocked_on_id` links starting at `from_id` and
    /// stops at the first runtime that has no outgoing edge. It only reads
    /// the graph, so shared access is sufficient. It relies on the graph
    /// being acyclic to terminate.
    ///
    /// Note: when `from_id == to_id`, this returns `true` only if `from_id`
    /// has no outgoing edge (the empty path); the callers visible in this
    /// file pass two distinct runtimes.
    pub(super) fn depends_on(&self, from_id: RuntimeId, to_id: RuntimeId) -> bool {
        let mut current = from_id;
        while let Some(edge) = self.edges.get(&current) {
            let next = edge.blocked_on_id;
            if next == to_id {
                return true;
            }
            current = next;
        }
        // `current` is now the end of the chain (a runtime that is not blocked).
        current == to_id
    }

    /// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle.
    /// The cycle runs as follows:
    ///
    /// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`...
    /// 2. ...but `database_key` is already being executed by `to_id`...
    /// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`.
    pub(super) fn for_each_cycle_participant(
        &mut self,
        from_id: RuntimeId,
        from_stack: &mut QueryStack,
        database_key: DatabaseKeyIndex,
        to_id: RuntimeId,
        mut closure: impl FnMut(&mut [ActiveQuery]),
    ) {
        debug_assert!(self.depends_on(to_id, from_id));

        // To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v):
        //
        //    database_key = QB2
        //    from_id = A
        //    to_id = B
        //    from_stack = [QA1, QA2, QA3]
        //
        //    self.edges[B] = { C, QC2, [QB1..QB3] }
        //    self.edges[C] = { A, QA2, [QC1..QC3] }
        //
        //         The cyclic
        //         edge we have
        //         failed to add.
        //           :
        //    A      :    B         C
        //           :
        //    QA1    v    QB1       QC1
        // ┌► QA2    ┌──► QB2   ┌─► QC2
        // │  QA3 ───┘    QB3 ──┘   QC3 ───┐
        // │                               │
        // └───────────────────────────────┘
        //
        // Final output: [QB2, QB3, QC2, QC3, QA2, QA3]

        let mut id = to_id;
        let mut key = database_key;
        while id != from_id {
            // Looking at the diagram above, the idea is to
            // take the edge from `to_id` starting at `key`
            // (inclusive) and down to the end. We can then
            // load up the next thread (i.e., we start at B/QB2,
            // and then load up the dependency on C/QC2).
            let edge = self.edges.get_mut(&id).unwrap();
            let prefix = edge.stack.iter_mut().take_while(|p| p.database_key_index != key).count();
            closure(&mut edge.stack[prefix..]);
            id = edge.blocked_on_id;
            key = edge.blocked_on_key;
        }

        // Finally, we copy in the results from `from_stack`.
        let prefix = from_stack.iter_mut().take_while(|p| p.database_key_index != key).count();
        closure(&mut from_stack[prefix..]);
    }

    /// Unblock each blocked runtime (excluding the current one) if some
    /// query executing in that runtime is participating in cycle fallback,
    /// i.e. one of its cycle-participating queries has `cycle` set.
    ///
    /// A runtime unblocked this way is removed from the dependents of the
    /// query it was blocked on and receives `WaitResult::Cycle` carrying the
    /// `cycle` value of the last query in the participating part of its
    /// stack that has one.
    ///
    /// Returns a pair of booleans `(current, others)` where:
    /// * `current` is true if the current runtime (`from_id`) has cycle
    ///   participants with fallback;
    /// * `others` is true if at least one other runtime was unblocked.
    pub(super) fn maybe_unblock_runtimes_in_cycle(
        &mut self,
        from_id: RuntimeId,
        from_stack: &QueryStack,
        database_key: DatabaseKeyIndex,
        to_id: RuntimeId,
    ) -> (bool, bool) {
        // The traversal mirrors the one in `for_each_cycle_participant`
        // (which also has a diagram of the cycle).
        let mut others_unblocked = false;
        let (mut runtime, mut entry_key) = (to_id, database_key);
        while runtime != from_id {
            let edge = self.edges.get(&runtime).unwrap();
            let start = edge
                .stack
                .iter()
                .position(|query| query.database_key_index == entry_key)
                .unwrap_or(edge.stack.len());
            // Copy out the next hop now: unblocking below removes this edge.
            let (next_runtime, next_key) = (edge.blocked_on_id, edge.blocked_on_key);
            let fallback = edge.stack[start..].iter().rev().find_map(|query| query.cycle.clone());

            if let Some(cycle) = fallback {
                // `runtime` no longer waits for `next_key`, so drop it from
                // that query's list of dependents:
                self.query_dependents.get_mut(&next_key).unwrap().retain(|r| *r != runtime);

                // Wake the runtime; it resumes once the lock is released:
                self.unblock_runtime(runtime, WaitResult::Cycle(cycle));

                others_unblocked = true;
            }

            runtime = next_runtime;
            entry_key = next_key;
        }

        let start = from_stack
            .iter()
            .position(|query| query.database_key_index == entry_key)
            .unwrap_or(from_stack.len());
        let this_unblocked = from_stack[start..].iter().any(|query| query.cycle.is_some());

        (this_unblocked, others_unblocked)
    }

    /// Records that `from_id` is blocked on `database_key`, which is being
    /// computed by `to_id`, and then waits until `from_id` is unblocked.
    ///
    /// `query_mutex_guard` is dropped only after the edge has been added.
    /// For this to be reasonable, that guard must be what keeps
    /// `database_key` from completing, so that the computation cannot
    /// finish before the edge exists.
    ///
    /// Returns the query stack that was handed in as `from_stack` together
    /// with the `WaitResult` the runtime was unblocked with.
    ///
    /// Preconditions:
    /// * No path from `to_id` to `from_id`
    ///   (i.e., `me.depends_on(to_id, from_id)` is false)
    /// * `from_id` is different from `to_id` and not already blocked
    /// * `query_mutex_guard` is a read lock (or stronger) on `database_key`
    pub(super) fn block_on<QueryMutexGuard>(
        mut me: MutexGuard<'_, Self>,
        from_id: RuntimeId,
        database_key: DatabaseKeyIndex,
        to_id: RuntimeId,
        from_stack: QueryStack,
        query_mutex_guard: QueryMutexGuard,
    ) -> (QueryStack, WaitResult) {
        let condvar = me.add_edge(from_id, database_key, to_id, from_stack);

        // The edge is in place, so `database_key` may now be allowed
        // to complete: let go of the guard that was holding it back.
        drop(query_mutex_guard);

        // Check for a stored result before every wait, so that a wake-up
        // without a result simply goes back to waiting.
        loop {
            match me.wait_results.remove(&from_id) {
                Some(stack_and_result) => {
                    debug_assert!(!me.edges.contains_key(&from_id));
                    return stack_and_result;
                }
                None => condvar.wait(&mut me),
            }
        }
    }

    /// Helper for `block_on`: does the actual graph update, adding a
    /// dependency edge from `from_id` to `to_id`, which is computing
    /// `database_key`, and registering `from_id` as a dependent of
    /// `database_key`.
    ///
    /// Returns the condition variable stored in the new edge; it is
    /// notified when `from_id` gets unblocked.
    ///
    /// Panics if `from_id == to_id`. In debug builds it also checks that
    /// `from_id` is not already blocked and that the new edge does not
    /// close a cycle.
    fn add_edge(
        &mut self,
        from_id: RuntimeId,
        database_key: DatabaseKeyIndex,
        to_id: RuntimeId,
        from_stack: QueryStack,
    ) -> Arc<parking_lot::Condvar> {
        assert_ne!(from_id, to_id);
        debug_assert!(!self.edges.contains_key(&from_id));
        debug_assert!(!self.depends_on(to_id, from_id));

        let condvar = Arc::new(Condvar::new());
        let edge = Edge {
            blocked_on_id: to_id,
            blocked_on_key: database_key,
            stack: from_stack,
            // The edge and the caller share the same condvar.
            condvar: Arc::clone(&condvar),
        };
        self.edges.insert(from_id, edge);
        self.query_dependents.entry(database_key).or_default().push(from_id);
        condvar
    }

    /// Unblocks every runtime recorded as waiting for `database_key`,
    /// giving each of them its own clone of `wait_result`.
    ///
    /// Meant to be invoked when the execution of `database_key` completes.
    /// The list of dependents for `database_key` is removed from the graph;
    /// if there is none, nothing happens.
    pub(super) fn unblock_runtimes_blocked_on(
        &mut self,
        database_key: DatabaseKeyIndex,
        wait_result: WaitResult,
    ) {
        if let Some(blocked) = self.query_dependents.remove(&database_key) {
            for runtime in blocked {
                self.unblock_runtime(runtime, wait_result.clone());
            }
        }
    }

    /// Unblock the runtime with the given id with the given wait-result.
    /// This will cause it resume execution (though it will have to grab
    /// the lock on this data structure first, to recover the wait result).
    fn unblock_runtime(&mut self, id: RuntimeId, wait_result: WaitResult) {
        let edge = self.edges.remove(&id).expect("not blocked");
        self.wait_results.insert(id, (edge.stack, wait_result));

        // Now that we have inserted the `wait_results`,
        // notify the thread.
        edge.condvar.notify_one();
    }
}