fortrangoingonforty/afs-ld / 4096883

Browse files

Parse archive members in parallel

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
4096883ff4fcf4de32b2e04ac99c2fe761d5b038
Parents
ba308a5
Tree
b2a1022

2 changed files

Status | File | + | -
M src/resolve.rs 223 35
M tests/determinism.rs 99 5
src/resolve.rs (modified)
@@ -15,9 +15,11 @@
15
 //! happened (inserted / replaced / kept / pending archive fetch), and they
15
 //! happened (inserted / replaced / kept / pending archive fetch), and they
16
 //! drive the outer loop.
16
 //! drive the outer loop.
17
 
17
 
18
-use std::collections::HashMap;
18
+use std::collections::{HashMap, HashSet, VecDeque};
19
-use std::path::PathBuf;
19
+use std::path::{Path, PathBuf};
20
 use std::rc::Rc;
20
 use std::rc::Rc;
21
+use std::sync::{mpsc, Arc, Mutex};
22
+use std::thread;
21
 
23
 
22
 use crate::archive::{Archive, ArchiveError};
24
 use crate::archive::{Archive, ArchiveError};
23
 use crate::input::ObjectFile;
25
 use crate::input::ObjectFile;
@@ -156,7 +158,7 @@ pub struct ArchiveInput {
156
     /// the fixed-point loop from re-ingesting the same object twice —
158
     /// the fixed-point loop from re-ingesting the same object twice —
157
     /// important both for correctness (no duplicate-strong errors from
159
     /// important both for correctness (no duplicate-strong errors from
158
     /// our own symbols) and for keeping transitions deterministic.
160
     /// our own symbols) and for keeping transitions deterministic.
159
-    pub fetched: std::collections::HashSet<u32>,
161
+    pub fetched: HashSet<u32>,
160
 }
162
 }
161
 
163
 
162
 #[derive(Debug)]
164
 #[derive(Debug)]
@@ -1168,47 +1170,164 @@ pub struct DrainReport {
1168
     pub referrers: ReferrerLog,
1170
     pub referrers: ReferrerLog,
1169
 }
1171
 }
1170
 
1172
 
1173
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1174
+struct ArchiveMemberKey {
1175
+    archive: ArchiveId,
1176
+    member: MemberId,
1177
+}
1178
+
1179
+#[derive(Clone, Copy)]
1180
+struct ArchiveMemberLoadJob<'a> {
1181
+    index: usize,
1182
+    key: ArchiveMemberKey,
1183
+    archive_path: &'a Path,
1184
+    archive_bytes: &'a [u8],
1185
+    archive_load_order: usize,
1186
+}
1187
+
1188
+struct LoadedArchiveMember {
1189
+    key: ArchiveMemberKey,
1190
+    archive_load_order: usize,
1191
+    logical_path: PathBuf,
1192
+    bytes: Vec<u8>,
1193
+    parsed: ObjectFile,
1194
+}
1195
+
1196
+fn make_archive_member_jobs<'a>(
1197
+    inputs: &'a Inputs,
1198
+    keys: Vec<ArchiveMemberKey>,
1199
+) -> Vec<ArchiveMemberLoadJob<'a>> {
1200
+    keys.into_iter()
1201
+        .enumerate()
1202
+        .map(|(index, key)| {
1203
+            let archive = &inputs.archives[key.archive.0 as usize];
1204
+            ArchiveMemberLoadJob {
1205
+                index,
1206
+                key,
1207
+                archive_path: &archive.path,
1208
+                archive_bytes: &archive.bytes,
1209
+                archive_load_order: archive.load_order,
1210
+            }
1211
+        })
1212
+        .collect()
1213
+}
1214
+
1215
+fn load_archive_members_parallel(
1216
+    inputs: &Inputs,
1217
+    keys: Vec<ArchiveMemberKey>,
1218
+) -> Vec<(ArchiveMemberKey, Result<LoadedArchiveMember, FetchError>)> {
1219
+    let jobs = make_archive_member_jobs(inputs, keys);
1220
+    if jobs.is_empty() {
1221
+        return Vec::new();
1222
+    }
1223
+    let job_count = thread::available_parallelism()
1224
+        .map(usize::from)
1225
+        .unwrap_or(1)
1226
+        .min(jobs.len())
1227
+        .max(1);
1228
+    if job_count == 1 {
1229
+        return jobs
1230
+            .into_iter()
1231
+            .map(load_archive_member_job)
1232
+            .map(|(_, key, result)| (key, result))
1233
+            .collect();
1234
+    }
1235
+
1236
+    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));
1237
+    let (tx, rx) = mpsc::channel();
1238
+    let mut results = thread::scope(|scope| {
1239
+        for _ in 0..job_count {
1240
+            let queue = Arc::clone(&queue);
1241
+            let tx = tx.clone();
1242
+            scope.spawn(move || loop {
1243
+                let Some(job) = queue
1244
+                    .lock()
1245
+                    .expect("archive member load queue mutex poisoned")
1246
+                    .pop_front()
1247
+                else {
1248
+                    break;
1249
+                };
1250
+                tx.send(load_archive_member_job(job))
1251
+                    .expect("archive member load receiver should stay live");
1252
+            });
1253
+        }
1254
+        drop(tx);
1255
+        rx.into_iter().collect::<Vec<_>>()
1256
+    });
1257
+    results.sort_by_key(|(index, _, _)| *index);
1258
+    results
1259
+        .into_iter()
1260
+        .map(|(_, key, result)| (key, result))
1261
+        .collect()
1262
+}
1263
+
1264
+fn load_archive_member_job(
1265
+    job: ArchiveMemberLoadJob<'_>,
1266
+) -> (
1267
+    usize,
1268
+    ArchiveMemberKey,
1269
+    Result<LoadedArchiveMember, FetchError>,
1270
+) {
1271
+    let result = (|| {
1272
+        let archive = Archive::open(job.archive_path, job.archive_bytes)?;
1273
+        let member =
1274
+            archive
1275
+                .member_at_offset(job.key.member.0)
1276
+                .ok_or(FetchError::MemberNotFound {
1277
+                    archive: job.key.archive,
1278
+                    member: job.key.member,
1279
+                })?;
1280
+        let logical_path =
1281
+            PathBuf::from(format!("{}({})", job.archive_path.display(), member.name));
1282
+        let bytes = member.body.to_vec();
1283
+        let parsed = ObjectFile::parse(&logical_path, &bytes)?;
1284
+        Ok(LoadedArchiveMember {
1285
+            key: job.key,
1286
+            archive_load_order: job.archive_load_order,
1287
+            logical_path,
1288
+            bytes,
1289
+            parsed,
1290
+        })
1291
+    })();
1292
+    (job.index, job.key, result)
1293
+}
1294
+
1295
+fn archive_member_key(pending: PendingFetch) -> ArchiveMemberKey {
1296
+    ArchiveMemberKey {
1297
+        archive: pending.archive,
1298
+        member: pending.member,
1299
+    }
1300
+}
1301
+
1302
+fn archive_member_is_fetched(inputs: &Inputs, key: ArchiveMemberKey) -> bool {
1303
+    inputs.archives[key.archive.0 as usize]
1304
+        .fetched
1305
+        .contains(&key.member.0)
1306
+}
1307
+
1171
 /// Shared ingest: copy one archive member's body into a fresh
1308
 /// Shared ingest: copy one archive member's body into a fresh
1172
 /// `ObjectInput`, mark it fetched, and seed its symbols. Callers either
1309
 /// `ObjectInput`, mark it fetched, and seed its symbols. Callers either
1173
 /// respond to a demand-driven `PendingFetch` or force-pull the member.
1310
 /// respond to a demand-driven `PendingFetch` or force-pull the member.
1174
-fn ingest_member_bytes(
1311
+fn ingest_loaded_member(
1175
     inputs: &mut Inputs,
1312
     inputs: &mut Inputs,
1176
     table: &mut SymbolTable,
1313
     table: &mut SymbolTable,
1177
-    archive_id: ArchiveId,
1314
+    loaded: LoadedArchiveMember,
1178
-    member_id: MemberId,
1179
     report: &mut DrainReport,
1315
     report: &mut DrainReport,
1180
 ) -> Result<Vec<PendingFetch>, FetchError> {
1316
 ) -> Result<Vec<PendingFetch>, FetchError> {
1181
-    let archive_load_order = inputs.archives[archive_id.0 as usize].load_order;
1317
+    if archive_member_is_fetched(inputs, loaded.key) {
1182
-    let ai = &inputs.archives[archive_id.0 as usize];
1183
-    if ai.fetched.contains(&member_id.0) {
1184
         return Ok(Vec::new());
1318
         return Ok(Vec::new());
1185
     }
1319
     }
1186
 
1320
 
1187
-    // Extract owned data before mutating the registry.
1321
+    inputs.archives[loaded.key.archive.0 as usize]
1188
-    let (logical_path, member_bytes) = {
1189
-        let archive = Archive::open(&ai.path, &ai.bytes)?;
1190
-        let member = archive
1191
-            .member_at_offset(member_id.0)
1192
-            .ok_or(FetchError::MemberNotFound {
1193
-                archive: archive_id,
1194
-                member: member_id,
1195
-            })?;
1196
-        let logical = format!("{}({})", ai.path.display(), member.name);
1197
-        (logical, member.body.to_vec())
1198
-    };
1199
-    let logical_path = PathBuf::from(logical_path);
1200
-    let parsed = ObjectFile::parse(&logical_path, &member_bytes)?;
1201
-
1202
-    inputs.archives[archive_id.0 as usize]
1203
         .fetched
1322
         .fetched
1204
-        .insert(member_id.0);
1323
+        .insert(loaded.key.member.0);
1205
     let input_id = InputId(inputs.objects.len() as u32);
1324
     let input_id = InputId(inputs.objects.len() as u32);
1206
     inputs.objects.push(ObjectInput {
1325
     inputs.objects.push(ObjectInput {
1207
-        path: logical_path,
1326
+        path: loaded.logical_path,
1208
-        load_order: archive_load_order,
1327
+        load_order: loaded.archive_load_order,
1209
-        archive_member_offset: Some(member_id.0),
1328
+        archive_member_offset: Some(loaded.key.member.0),
1210
-        bytes: member_bytes,
1329
+        bytes: loaded.bytes,
1211
-        parsed,
1330
+        parsed: loaded.parsed,
1212
     });
1331
     });
1213
     report.fetched_members += 1;
1332
     report.fetched_members += 1;
1214
     report
1333
     report
@@ -1222,6 +1341,23 @@ fn ingest_member_bytes(
1222
     Ok(sub_report.pending_fetches)
1341
     Ok(sub_report.pending_fetches)
1223
 }
1342
 }
1224
 
1343
 
1344
+fn load_and_ingest_member(
1345
+    inputs: &mut Inputs,
1346
+    table: &mut SymbolTable,
1347
+    key: ArchiveMemberKey,
1348
+    report: &mut DrainReport,
1349
+) -> Result<Vec<PendingFetch>, FetchError> {
1350
+    if archive_member_is_fetched(inputs, key) {
1351
+        return Ok(Vec::new());
1352
+    }
1353
+    let loaded = load_archive_members_parallel(inputs, vec![key])
1354
+        .into_iter()
1355
+        .next()
1356
+        .expect("single archive member load should produce one result")
1357
+        .1?;
1358
+    ingest_loaded_member(inputs, table, loaded, report)
1359
+}
1360
+
1225
 /// Pull `pending`'s member only if the symbol slot is still a
1361
 /// Pull `pending`'s member only if the symbol slot is still a
1226
 /// `LazyArchive` (i.e., a strong Defined hasn't superseded it). Returns
1362
 /// `LazyArchive` (i.e., a strong Defined hasn't superseded it). Returns
1227
 /// any new `PendingFetch` entries triggered by the inserted member.
1363
 /// any new `PendingFetch` entries triggered by the inserted member.
@@ -1235,7 +1371,7 @@ fn fetch_and_ingest_one(
1235
     if !slot_is_still_lazy {
1371
     if !slot_is_still_lazy {
1236
         return Ok(Vec::new());
1372
         return Ok(Vec::new());
1237
     }
1373
     }
1238
-    ingest_member_bytes(inputs, table, pending.archive, pending.member, report)
1374
+    load_and_ingest_member(inputs, table, archive_member_key(pending), report)
1239
 }
1375
 }
1240
 
1376
 
1241
 /// Pull every member of one archive (bypasses demand tracking). Respects
1377
 /// Pull every member of one archive (bypasses demand tracking). Respects
@@ -1255,9 +1391,16 @@ pub fn force_load_archive(
1255
             .map(|m| m.header_offset as u32)
1391
             .map(|m| m.header_offset as u32)
1256
             .collect()
1392
             .collect()
1257
     };
1393
     };
1394
+    let keys = member_offsets
1395
+        .into_iter()
1396
+        .map(|offset| ArchiveMemberKey {
1397
+            archive: archive_id,
1398
+            member: MemberId(offset),
1399
+        })
1400
+        .collect();
1258
     let mut queue: Vec<PendingFetch> = Vec::new();
1401
     let mut queue: Vec<PendingFetch> = Vec::new();
1259
-    for offset in member_offsets {
1402
+    for (_, loaded) in load_archive_members_parallel(inputs, keys) {
1260
-        let new = ingest_member_bytes(inputs, table, archive_id, MemberId(offset), report)?;
1403
+        let new = ingest_loaded_member(inputs, table, loaded?, report)?;
1261
         queue.extend(new);
1404
         queue.extend(new);
1262
     }
1405
     }
1263
     while let Some(p) = queue.pop() {
1406
     while let Some(p) = queue.pop() {
@@ -1556,14 +1699,59 @@ pub fn drain_fetches(
1556
     initial: Vec<PendingFetch>,
1699
     initial: Vec<PendingFetch>,
1557
 ) -> Result<DrainReport, FetchError> {
1700
 ) -> Result<DrainReport, FetchError> {
1558
     let mut queue = initial;
1701
     let mut queue = initial;
1702
+    let mut prepared = HashMap::new();
1559
     let mut report = DrainReport::default();
1703
     let mut report = DrainReport::default();
1560
     while let Some(p) = queue.pop() {
1704
     while let Some(p) = queue.pop() {
1561
-        let new_pending = fetch_and_ingest_one(inputs, table, p, &mut report)?;
1705
+        let key = archive_member_key(p);
1706
+        let slot_is_still_lazy = matches!(table.get(p.id), Symbol::LazyArchive { .. });
1707
+        if !slot_is_still_lazy || archive_member_is_fetched(inputs, key) {
1708
+            prepared.remove(&key);
1709
+            continue;
1710
+        }
1711
+        // Parse siblings ahead of time, but only ingest the current stack
1712
+        // entry after re-checking its lazy slot. This keeps member order stable.
1713
+        if !prepared.contains_key(&key) {
1714
+            preparse_pending_fetches(inputs, table, p, &queue, &mut prepared);
1715
+        }
1716
+        let Some(loaded) = prepared.remove(&key) else {
1717
+            continue;
1718
+        };
1719
+        let loaded = loaded?;
1720
+        let slot_is_still_lazy = matches!(table.get(p.id), Symbol::LazyArchive { .. });
1721
+        if !slot_is_still_lazy || archive_member_is_fetched(inputs, key) {
1722
+            continue;
1723
+        }
1724
+        let new_pending = ingest_loaded_member(inputs, table, loaded, &mut report)?;
1562
         queue.extend(new_pending);
1725
         queue.extend(new_pending);
1563
     }
1726
     }
1564
     Ok(report)
1727
     Ok(report)
1565
 }
1728
 }
1566
 
1729
 
1730
+fn preparse_pending_fetches(
1731
+    inputs: &Inputs,
1732
+    table: &SymbolTable,
1733
+    current: PendingFetch,
1734
+    queue: &[PendingFetch],
1735
+    prepared: &mut HashMap<ArchiveMemberKey, Result<LoadedArchiveMember, FetchError>>,
1736
+) {
1737
+    let mut seen = HashSet::new();
1738
+    let mut keys = Vec::new();
1739
+    for pending in std::iter::once(&current).chain(queue.iter().rev()) {
1740
+        let key = archive_member_key(*pending);
1741
+        if prepared.contains_key(&key)
1742
+            || archive_member_is_fetched(inputs, key)
1743
+            || !matches!(table.get(pending.id), Symbol::LazyArchive { .. })
1744
+            || !seen.insert(key)
1745
+        {
1746
+            continue;
1747
+        }
1748
+        keys.push(key);
1749
+    }
1750
+    for (key, result) in load_archive_members_parallel(inputs, keys) {
1751
+        prepared.insert(key, result);
1752
+    }
1753
+}
1754
+
1567
 /// Turn a wire-form `InputSymbol` into a resolver-side `Symbol`. Returns
1755
 /// Turn a wire-form `InputSymbol` into a resolver-side `Symbol`. Returns
1568
 /// `None` for kinds the resolver does not track (currently: aliases with
1756
 /// `None` for kinds the resolver does not track (currently: aliases with
1569
 /// unresolved target strx — Sprint 8's resolver defers those for now).
1757
 /// unresolved target strx — Sprint 8's resolver defers those for now).
tests/determinism.rs (modified)
@@ -9,6 +9,7 @@ mod common;
9
 use std::collections::VecDeque;
9
 use std::collections::VecDeque;
10
 use std::fs;
10
 use std::fs;
11
 use std::path::{Path, PathBuf};
11
 use std::path::{Path, PathBuf};
12
+use std::process::Command;
12
 use std::sync::{Arc, Mutex};
13
 use std::sync::{Arc, Mutex};
13
 use std::thread;
14
 use std::thread;
14
 use std::time::{SystemTime, UNIX_EPOCH};
15
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -69,7 +70,86 @@ fn repeated_parallel_links_are_byte_identical() {
69
     .expect("assemble determinism data fixture");
70
     .expect("assemble determinism data fixture");
70
 
71
 
71
     let inputs = vec![main_obj, helper_obj, data_obj];
72
     let inputs = vec![main_obj, helper_obj, data_obj];
72
-    let baseline = link_once(&inputs, &root, "baseline").expect("baseline deterministic link");
73
+    assert_repeated_links_identical(inputs, &root, "objects");
74
+
75
+    let _ = fs::remove_dir_all(root);
76
+}
77
+
78
+#[test]
79
+fn repeated_parallel_archive_fetches_are_byte_identical() {
80
+    if !have_xcrun() || !have_xcrun_tool("as") {
81
+        eprintln!("skipping: xcrun as unavailable");
82
+        return;
83
+    }
84
+
85
+    let root = unique_temp_dir("archive-determinism").expect("create archive determinism temp dir");
86
+    let main_obj = root.join("main.o");
87
+    assemble(
88
+        "\
89
+        .section __TEXT,__text,regular,pure_instructions\n\
90
+        .globl _main\n\
91
+        _main:\n\
92
+            bl _helper_a\n\
93
+            bl _helper_b\n\
94
+            mov w0, #0\n\
95
+            ret\n\
96
+\n\
97
+        .subsections_via_symbols\n",
98
+        &main_obj,
99
+    )
100
+    .expect("assemble archive determinism main fixture");
101
+    let helper_a_obj = root.join("helper_a.o");
102
+    assemble(
103
+        "\
104
+        .section __TEXT,__text,regular,pure_instructions\n\
105
+        .globl _helper_a\n\
106
+        _helper_a:\n\
107
+            ret\n\
108
+\n\
109
+        .subsections_via_symbols\n",
110
+        &helper_a_obj,
111
+    )
112
+    .expect("assemble archive determinism helper_a fixture");
113
+    let helper_b_obj = root.join("helper_b.o");
114
+    assemble(
115
+        "\
116
+        .section __TEXT,__text,regular,pure_instructions\n\
117
+        .globl _helper_b\n\
118
+        _helper_b:\n\
119
+            ret\n\
120
+\n\
121
+        .subsections_via_symbols\n",
122
+        &helper_b_obj,
123
+    )
124
+    .expect("assemble archive determinism helper_b fixture");
125
+    let unused_obj = root.join("unused.o");
126
+    assemble(
127
+        "\
128
+        .section __TEXT,__text,regular,pure_instructions\n\
129
+        .globl _unused\n\
130
+        _unused:\n\
131
+            ret\n\
132
+\n\
133
+        .subsections_via_symbols\n",
134
+        &unused_obj,
135
+    )
136
+    .expect("assemble archive determinism unused fixture");
137
+
138
+    let archive_path = root.join("libhelpers.a");
139
+    if let Err(error) = archive(&[helper_a_obj, helper_b_obj, unused_obj], &archive_path) {
140
+        eprintln!("skipping: archive failed: {error}");
141
+        let _ = fs::remove_dir_all(root);
142
+        return;
143
+    }
144
+
145
+    assert_repeated_links_identical(vec![main_obj, archive_path], &root, "archive");
146
+
147
+    let _ = fs::remove_dir_all(root);
148
+}
149
+
150
+fn assert_repeated_links_identical(inputs: Vec<PathBuf>, root: &Path, label: &str) {
151
+    let baseline = link_once(&inputs, root, &format!("{label}-baseline"))
152
+        .expect("baseline deterministic link");
73
     let run_count = determinism_run_count();
153
     let run_count = determinism_run_count();
74
     let jobs = determinism_jobs(run_count);
154
     let jobs = determinism_jobs(run_count);
75
     let queue = Arc::new(Mutex::new((0..run_count).collect::<VecDeque<_>>()));
155
     let queue = Arc::new(Mutex::new((0..run_count).collect::<VecDeque<_>>()));
@@ -80,7 +160,6 @@ fn repeated_parallel_links_are_byte_identical() {
80
             let queue = Arc::clone(&queue);
160
             let queue = Arc::clone(&queue);
81
             let errors = Arc::clone(&errors);
161
             let errors = Arc::clone(&errors);
82
             let baseline = baseline.clone();
162
             let baseline = baseline.clone();
83
-            let root = root.clone();
84
             let inputs = inputs.clone();
163
             let inputs = inputs.clone();
85
             scope.spawn(move || loop {
164
             scope.spawn(move || loop {
86
                 let Some(index) = queue
165
                 let Some(index) = queue
@@ -90,7 +169,7 @@ fn repeated_parallel_links_are_byte_identical() {
90
                 else {
169
                 else {
91
                     break;
170
                     break;
92
                 };
171
                 };
93
-                match link_once(&inputs, &root, &format!("run-{index:03}")) {
172
+                match link_once(&inputs, root, &format!("{label}-run-{index:03}")) {
94
                     Ok(bytes) if bytes == baseline => {}
173
                     Ok(bytes) if bytes == baseline => {}
95
                     Ok(bytes) => errors
174
                     Ok(bytes) => errors
96
                         .lock()
175
                         .lock()
@@ -118,8 +197,6 @@ fn repeated_parallel_links_are_byte_identical() {
118
         "parallel deterministic links diverged:\n{}",
197
         "parallel deterministic links diverged:\n{}",
119
         errors.join("\n")
198
         errors.join("\n")
120
     );
199
     );
121
-
122
-    let _ = fs::remove_dir_all(root);
123
 }
200
 }
124
 
201
 
125
 fn link_once(inputs: &[PathBuf], root: &Path, run_name: &str) -> Result<Vec<u8>, String> {
202
 fn link_once(inputs: &[PathBuf], root: &Path, run_name: &str) -> Result<Vec<u8>, String> {
@@ -136,6 +213,23 @@ fn link_once(inputs: &[PathBuf], root: &Path, run_name: &str) -> Result<Vec<u8>,
136
     fs::read(&out).map_err(|e| format!("read {}: {e}", out.display()))
213
     fs::read(&out).map_err(|e| format!("read {}: {e}", out.display()))
137
 }
214
 }
138
 
215
 
216
+fn archive(objects: &[PathBuf], out: &Path) -> Result<(), String> {
217
+    let output = Command::new("libtool")
218
+        .arg("-static")
219
+        .arg("-o")
220
+        .arg(out)
221
+        .args(objects)
222
+        .output()
223
+        .map_err(|e| format!("spawn libtool: {e}"))?;
224
+    if !output.status.success() {
225
+        return Err(format!(
226
+            "libtool failed: {}",
227
+            String::from_utf8_lossy(&output.stderr)
228
+        ));
229
+    }
230
+    Ok(())
231
+}
232
+
139
 fn determinism_run_count() -> usize {
233
 fn determinism_run_count() -> usize {
140
     std::env::var("AFS_LD_DETERMINISM_RUNS")
234
     std::env::var("AFS_LD_DETERMINISM_RUNS")
141
         .ok()
235
         .ok()