fortrangoingonforty/afs-ld / ba308a5

Browse files

Parse object inputs in parallel

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
ba308a5c41e51374d36db5171da936c476189d4e
Parents
01ed08d
Tree
aac1dfb

5 changed files

Status File + -
M src/lib.rs 184 10
M src/resolve.rs 34 11
M tests/atom_integration.rs 3 9
M tests/determinism.rs 32 10
M tests/perf_baseline.rs 11 9
src/lib.rs modified
@@ -26,11 +26,15 @@ pub mod why_live;
2626
 
2727
 use std::os::unix::fs::PermissionsExt;
2828
 use std::path::PathBuf;
29
+use std::sync::{mpsc, Arc, Mutex};
30
+use std::thread;
2931
 use std::time::{Duration, Instant};
30
-use std::{fs, io};
32
+use std::{collections::VecDeque, fs, io};
3133
 
34
+use archive::Archive;
3235
 use atom::{atomize_object, backpatch_symbol_atoms, AtomTable};
3336
 use icf::IcfError;
37
+use input::ObjectFile;
3438
 use layout::{ExtraLayoutSections, Layout, LayoutInput};
3539
 use macho::dylib::{DylibDependency, DylibFile, DylibLoadKind};
3640
 use macho::reader::ReadError;
@@ -494,6 +498,7 @@ impl Linker {
494498
 
495499
         let mut inputs = Inputs::new();
496500
         let mut deferred_dylibs = Vec::new();
501
+        let mut initial_loads = Vec::new();
497502
         let phase_started = Instant::now();
498503
         for (load_order, path) in load_paths.iter().enumerate() {
499504
             if matches!(
@@ -506,7 +511,10 @@ impl Linker {
506511
             if opts.trace_inputs {
507512
                 eprintln!("afs-ld: loading {}", path.display());
508513
             }
509
-            let timings = register_input(&mut inputs, path, load_order, true)?;
514
+            initial_loads.push((load_order, path.clone()));
515
+        }
516
+        for loaded in load_initial_inputs(initial_loads)? {
517
+            let timings = register_loaded_initial_input(&mut inputs, loaded);
510518
             phases.add_input_load(timings);
511519
         }
512520
         let include_tbd_exports = inputs_may_need_dylib_exports(&inputs)?;
@@ -594,14 +602,8 @@ impl Linker {
594602
         for idx in 0..inputs.objects.len() {
595603
             let input_id = resolve::InputId(idx as u32);
596604
             let obj = inputs.object_file(input_id)?;
597
-            let atomization = atomize_object(input_id, &obj, &mut atom_table);
598
-            backpatch_symbol_atoms(
599
-                &atomization,
600
-                input_id,
601
-                &obj,
602
-                &mut sym_table,
603
-                &mut atom_table,
604
-            );
605
+            let atomization = atomize_object(input_id, obj, &mut atom_table);
606
+            backpatch_symbol_atoms(&atomization, input_id, obj, &mut sym_table, &mut atom_table);
605607
             objects.push((input_id, obj));
606608
         }
607609
         phases.atomization = phase_started.elapsed();
@@ -955,6 +957,178 @@ fn default_output_path(opts: &LinkOptions) -> PathBuf {
955957
         .unwrap_or_else(|| PathBuf::from("a.out"))
956958
 }
957959
 
960
+struct LoadedObjectInput {
961
+    path: PathBuf,
962
+    load_order: usize,
963
+    bytes: Vec<u8>,
964
+    parsed: ObjectFile,
965
+    timings: InputLoadTimings,
966
+}
967
+
968
+struct LoadedArchiveInput {
969
+    path: PathBuf,
970
+    load_order: usize,
971
+    bytes: Vec<u8>,
972
+    timings: InputLoadTimings,
973
+}
974
+
975
+enum LoadedInitialInput {
976
+    Object(Box<LoadedObjectInput>),
977
+    Archive(LoadedArchiveInput),
978
+}
979
+
980
+impl LoadedInitialInput {
981
+    fn load_order(&self) -> usize {
982
+        match self {
983
+            LoadedInitialInput::Object(input) => input.load_order,
984
+            LoadedInitialInput::Archive(input) => input.load_order,
985
+        }
986
+    }
987
+}
988
+
989
+struct InitialLoadError {
990
+    load_order: usize,
991
+    error: LinkError,
992
+}
993
+
994
+fn load_initial_inputs(loads: Vec<(usize, PathBuf)>) -> Result<Vec<LoadedInitialInput>, LinkError> {
995
+    let mut results = Vec::new();
996
+    let mut object_jobs = Vec::new();
997
+    for (load_order, path) in loads {
998
+        if matches!(path.extension().and_then(|ext| ext.to_str()), Some("a")) {
999
+            results.push(load_archive_input(path, load_order));
1000
+        } else {
1001
+            object_jobs.push((load_order, path));
1002
+        }
1003
+    }
1004
+    results.extend(load_objects_parallel(object_jobs));
1005
+    results.sort_by_key(|result| match result {
1006
+        Ok(input) => input.load_order(),
1007
+        Err(error) => error.load_order,
1008
+    });
1009
+
1010
+    let mut loaded = Vec::with_capacity(results.len());
1011
+    for result in results {
1012
+        match result {
1013
+            Ok(input) => loaded.push(input),
1014
+            Err(error) => return Err(error.error),
1015
+        }
1016
+    }
1017
+    Ok(loaded)
1018
+}
1019
+
1020
+fn load_objects_parallel(
1021
+    jobs: Vec<(usize, PathBuf)>,
1022
+) -> Vec<Result<LoadedInitialInput, InitialLoadError>> {
1023
+    if jobs.is_empty() {
1024
+        return Vec::new();
1025
+    }
1026
+    let job_count = thread::available_parallelism()
1027
+        .map(usize::from)
1028
+        .unwrap_or(1)
1029
+        .min(jobs.len())
1030
+        .max(1);
1031
+    if job_count == 1 {
1032
+        return jobs
1033
+            .into_iter()
1034
+            .map(|(load_order, path)| load_object_input(path, load_order))
1035
+            .collect();
1036
+    }
1037
+
1038
+    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));
1039
+    let (tx, rx) = mpsc::channel();
1040
+    thread::scope(|scope| {
1041
+        for _ in 0..job_count {
1042
+            let queue = Arc::clone(&queue);
1043
+            let tx = tx.clone();
1044
+            scope.spawn(move || loop {
1045
+                let Some((load_order, path)) = queue
1046
+                    .lock()
1047
+                    .expect("input load queue mutex poisoned")
1048
+                    .pop_front()
1049
+                else {
1050
+                    break;
1051
+                };
1052
+                tx.send(load_object_input(path, load_order))
1053
+                    .expect("input load receiver should stay live");
1054
+            });
1055
+        }
1056
+        drop(tx);
1057
+        rx.into_iter().collect()
1058
+    })
1059
+}
1060
+
1061
+fn load_object_input(
1062
+    path: PathBuf,
1063
+    load_order: usize,
1064
+) -> Result<LoadedInitialInput, InitialLoadError> {
1065
+    let mut timings = InputLoadTimings::default();
1066
+    let phase_started = Instant::now();
1067
+    let bytes = fs::read(&path).map_err(|error| InitialLoadError {
1068
+        load_order,
1069
+        error: LinkError::Io(error),
1070
+    })?;
1071
+    timings.read = phase_started.elapsed();
1072
+
1073
+    let phase_started = Instant::now();
1074
+    let parsed = ObjectFile::parse(&path, &bytes).map_err(|error| InitialLoadError {
1075
+        load_order,
1076
+        error: LinkError::from(error),
1077
+    })?;
1078
+    timings.object_parse = phase_started.elapsed();
1079
+
1080
+    Ok(LoadedInitialInput::Object(Box::new(LoadedObjectInput {
1081
+        path,
1082
+        load_order,
1083
+        bytes,
1084
+        parsed,
1085
+        timings,
1086
+    })))
1087
+}
1088
+
1089
+fn load_archive_input(
1090
+    path: PathBuf,
1091
+    load_order: usize,
1092
+) -> Result<LoadedInitialInput, InitialLoadError> {
1093
+    let mut timings = InputLoadTimings::default();
1094
+    let phase_started = Instant::now();
1095
+    let bytes = fs::read(&path).map_err(|error| InitialLoadError {
1096
+        load_order,
1097
+        error: LinkError::Io(error),
1098
+    })?;
1099
+    timings.read = phase_started.elapsed();
1100
+
1101
+    let phase_started = Instant::now();
1102
+    Archive::open(&path, &bytes).map_err(|error| InitialLoadError {
1103
+        load_order,
1104
+        error: LinkError::from(InputAddError::from(error)),
1105
+    })?;
1106
+    timings.archive_parse = phase_started.elapsed();
1107
+
1108
+    Ok(LoadedInitialInput::Archive(LoadedArchiveInput {
1109
+        path,
1110
+        load_order,
1111
+        bytes,
1112
+        timings,
1113
+    }))
1114
+}
1115
+
1116
+fn register_loaded_initial_input(
1117
+    inputs: &mut Inputs,
1118
+    loaded: LoadedInitialInput,
1119
+) -> InputLoadTimings {
1120
+    match loaded {
1121
+        LoadedInitialInput::Object(input) => {
1122
+            inputs.add_parsed_object(input.path, input.bytes, input.parsed, input.load_order);
1123
+            input.timings
1124
+        }
1125
+        LoadedInitialInput::Archive(input) => {
1126
+            inputs.add_validated_archive(input.path, input.bytes, input.load_order);
1127
+            input.timings
1128
+        }
1129
+    }
1130
+}
1131
+
9581132
 fn register_input(
9591133
     inputs: &mut Inputs,
9601134
     path: &std::path::Path,
src/resolve.rs modified
@@ -140,10 +140,11 @@ pub struct ObjectInput {
140140
     pub path: PathBuf,
141141
     pub load_order: usize,
142142
     pub archive_member_offset: Option<u32>,
143
-    /// Raw bytes; `ObjectFile::parse` re-runs cheaply against this on
144
-    /// demand. We don't cache a parsed view because `ObjectFile` copies
145
-    /// the fields it needs on construction, so re-parse is idempotent.
143
+    /// Raw bytes retained for diagnostics and future low-level readers.
146144
     pub bytes: Vec<u8>,
145
+    /// Parsed object view. It owns section/relocation/string-table buffers,
146
+    /// so it is safe to build off-thread and then borrow during the link.
147
+    pub parsed: ObjectFile,
147148
 }
148149
 
149150
 #[derive(Debug)]
@@ -239,15 +240,26 @@ impl Inputs {
239240
         load_order: usize,
240241
     ) -> Result<InputId, InputAddError> {
241242
         // Validate now — we'd rather catch a bad object at the add site.
242
-        ObjectFile::parse(&path, &bytes)?;
243
+        let parsed = ObjectFile::parse(&path, &bytes)?;
244
+        Ok(self.add_parsed_object(path, bytes, parsed, load_order))
245
+    }
246
+
247
+    pub fn add_parsed_object(
248
+        &mut self,
249
+        path: PathBuf,
250
+        bytes: Vec<u8>,
251
+        parsed: ObjectFile,
252
+        load_order: usize,
253
+    ) -> InputId {
243254
         let id = InputId(self.objects.len() as u32);
244255
         self.objects.push(ObjectInput {
245256
             path,
246257
             load_order,
247258
             archive_member_offset: None,
248259
             bytes,
260
+            parsed,
249261
         });
250
-        Ok(id)
262
+        id
251263
     }
252264
 
253265
     /// Register an `.a` file.
@@ -258,6 +270,15 @@ impl Inputs {
258270
         load_order: usize,
259271
     ) -> Result<ArchiveId, InputAddError> {
260272
         Archive::open(&path, &bytes)?; // validate
273
+        Ok(self.add_validated_archive(path, bytes, load_order))
274
+    }
275
+
276
+    pub fn add_validated_archive(
277
+        &mut self,
278
+        path: PathBuf,
279
+        bytes: Vec<u8>,
280
+        load_order: usize,
281
+    ) -> ArchiveId {
261282
         let id = ArchiveId(self.archives.len() as u32);
262283
         self.archives.push(ArchiveInput {
263284
             path,
@@ -265,7 +286,7 @@ impl Inputs {
265286
             bytes,
266287
             fetched: std::collections::HashSet::new(),
267288
         });
268
-        Ok(id)
289
+        id
269290
     }
270291
 
271292
     /// Register a `.dylib`. TBD-backed dylibs go through
@@ -344,11 +365,10 @@ impl Inputs {
344365
         &self.dylibs[id.0 as usize]
345366
     }
346367
 
347
-    /// Parse an `ObjectFile` view of a registered object. Fast — `ObjectFile`
348
-    /// owns its buffers, so this is just the Mach-O walk cost.
349
-    pub fn object_file(&self, id: InputId) -> Result<ObjectFile, ReadError> {
368
+    /// Borrow a parsed `ObjectFile` view of a registered object.
369
+    pub fn object_file(&self, id: InputId) -> Result<&ObjectFile, ReadError> {
350370
         let o = &self.objects[id.0 as usize];
351
-        ObjectFile::parse(&o.path, &o.bytes)
371
+        Ok(&o.parsed)
352372
     }
353373
 
354374
     /// Open an `Archive` view, borrowing from the registry's bytes for the
@@ -1176,16 +1196,19 @@ fn ingest_member_bytes(
11761196
         let logical = format!("{}({})", ai.path.display(), member.name);
11771197
         (logical, member.body.to_vec())
11781198
     };
1199
+    let logical_path = PathBuf::from(logical_path);
1200
+    let parsed = ObjectFile::parse(&logical_path, &member_bytes)?;
11791201
 
11801202
     inputs.archives[archive_id.0 as usize]
11811203
         .fetched
11821204
         .insert(member_id.0);
11831205
     let input_id = InputId(inputs.objects.len() as u32);
11841206
     inputs.objects.push(ObjectInput {
1185
-        path: PathBuf::from(logical_path),
1207
+        path: logical_path,
11861208
         load_order: archive_load_order,
11871209
         archive_member_offset: Some(member_id.0),
11881210
         bytes: member_bytes,
1211
+        parsed,
11891212
     });
11901213
     report.fetched_members += 1;
11911214
     report
tests/atom_integration.rs modified
@@ -107,14 +107,8 @@ fn atomize_splits_text_at_symbol_boundaries_and_backpatches_symbols() {
107107
     // Atomize + back-patch.
108108
     let obj = inputs.object_file(input_id).unwrap();
109109
     let mut atom_table = AtomTable::new();
110
-    let atomization = atomize_object(input_id, &obj, &mut atom_table);
111
-    backpatch_symbol_atoms(
112
-        &atomization,
113
-        input_id,
114
-        &obj,
115
-        &mut sym_table,
116
-        &mut atom_table,
117
-    );
110
+    let atomization = atomize_object(input_id, obj, &mut atom_table);
111
+    backpatch_symbol_atoms(&atomization, input_id, obj, &mut sym_table, &mut atom_table);
118112
 
119113
     // At least one atom per defined function plus one for data_global.
120114
     assert!(
@@ -209,7 +203,7 @@ fn atomize_cstring_splits_at_null_terminators() {
209203
     let _ = seed_all(&inputs, &mut sym_table).expect("seed_all");
210204
     let obj = inputs.object_file(input_id).unwrap();
211205
     let mut atom_table = AtomTable::new();
212
-    let _atomization = atomize_object(input_id, &obj, &mut atom_table);
206
+    let _atomization = atomize_object(input_id, obj, &mut atom_table);
213207
 
214208
     let cstring_atoms: Vec<_> = atom_table
215209
         .iter()
tests/determinism.rs modified
@@ -1,8 +1,8 @@
11
 //! Sprint 28 determinism guardrails.
22
 //!
33
 //! Parallel speedups are only safe if they never perturb the final image. This
4
-//! test repeatedly links one synthetic-section-heavy executable and requires
5
-//! byte-identical output across concurrent runs.
4
+//! test repeatedly links a multi-object executable and requires byte-identical
5
+//! output across concurrent runs.
66
 
77
 mod common;
88
 
@@ -26,28 +26,50 @@ fn repeated_parallel_links_are_byte_identical() {
2626
     }
2727
 
2828
     let root = unique_temp_dir("determinism").expect("create determinism temp dir");
29
-    let obj = root.join("input.o");
29
+    let main_obj = root.join("main.o");
3030
     assemble(
3131
         "\
3232
         .section __TEXT,__text,regular,pure_instructions\n\
3333
         .globl _main\n\
3434
         _main:\n\
35
+            bl _helper\n\
3536
             adrp x8, _value@GOTPAGE\n\
3637
             ldr x8, [x8, _value@GOTPAGEOFF]\n\
3738
             ldr w0, [x8]\n\
3839
             ret\n\
3940
 \n\
41
+        .subsections_via_symbols\n",
42
+        &main_obj,
43
+    )
44
+    .expect("assemble determinism main fixture");
45
+    let helper_obj = root.join("helper.o");
46
+    assemble(
47
+        "\
48
+        .section __TEXT,__text,regular,pure_instructions\n\
49
+        .globl _helper\n\
50
+        _helper:\n\
51
+            ret\n\
52
+\n\
53
+        .subsections_via_symbols\n",
54
+        &helper_obj,
55
+    )
56
+    .expect("assemble determinism helper fixture");
57
+    let data_obj = root.join("data.o");
58
+    assemble(
59
+        "\
4060
         .section __DATA,__data\n\
4161
         .globl _value\n\
4262
         .p2align 2\n\
4363
         _value:\n\
4464
             .long 7\n\
65
+\n\
4566
         .subsections_via_symbols\n",
46
-        &obj,
67
+        &data_obj,
4768
     )
48
-    .expect("assemble determinism fixture");
69
+    .expect("assemble determinism data fixture");
4970
 
50
-    let baseline = link_once(&obj, &root, "baseline").expect("baseline deterministic link");
71
+    let inputs = vec![main_obj, helper_obj, data_obj];
72
+    let baseline = link_once(&inputs, &root, "baseline").expect("baseline deterministic link");
5173
     let run_count = determinism_run_count();
5274
     let jobs = determinism_jobs(run_count);
5375
     let queue = Arc::new(Mutex::new((0..run_count).collect::<VecDeque<_>>()));
@@ -59,7 +81,7 @@ fn repeated_parallel_links_are_byte_identical() {
5981
             let errors = Arc::clone(&errors);
6082
             let baseline = baseline.clone();
6183
             let root = root.clone();
62
-            let obj = obj.clone();
84
+            let inputs = inputs.clone();
6385
             scope.spawn(move || loop {
6486
                 let Some(index) = queue
6587
                     .lock()
@@ -68,7 +90,7 @@ fn repeated_parallel_links_are_byte_identical() {
6890
                 else {
6991
                     break;
7092
                 };
71
-                match link_once(&obj, &root, &format!("run-{index:03}")) {
93
+                match link_once(&inputs, &root, &format!("run-{index:03}")) {
7294
                     Ok(bytes) if bytes == baseline => {}
7395
                     Ok(bytes) => errors
7496
                         .lock()
@@ -100,12 +122,12 @@ fn repeated_parallel_links_are_byte_identical() {
100122
     let _ = fs::remove_dir_all(root);
101123
 }
102124
 
103
-fn link_once(obj: &Path, root: &Path, run_name: &str) -> Result<Vec<u8>, String> {
125
+fn link_once(inputs: &[PathBuf], root: &Path, run_name: &str) -> Result<Vec<u8>, String> {
104126
     let dir = root.join(run_name);
105127
     fs::create_dir_all(&dir).map_err(|e| format!("create {}: {e}", dir.display()))?;
106128
     let out = dir.join("deterministic.out");
107129
     let opts = LinkOptions {
108
-        inputs: vec![obj.to_path_buf()],
130
+        inputs: inputs.to_vec(),
109131
         output: Some(out.clone()),
110132
         kind: OutputKind::Executable,
111133
         ..LinkOptions::default()
tests/perf_baseline.rs modified
@@ -86,16 +86,18 @@ fn assert_profile_basics(name: &str, profile: &LinkProfile) {
8686
         profile.phases.accounted_total() > Duration::ZERO,
8787
         "{name}: all phase timings were zero"
8888
     );
89
+    let input_subphase_total = profile.phases.input_read
90
+        + profile.phases.input_object_parse
91
+        + profile.phases.input_archive_parse
92
+        + profile.phases.input_dylib_parse
93
+        + profile.phases.input_tbd_decode
94
+        + profile.phases.input_tbd_materialize
95
+        + profile.phases.input_reloc_parse;
96
+    // Input subphases are summed worker-time once object parsing is parallel,
97
+    // so they can legitimately exceed the wall-clock input parsing bucket.
8998
     assert!(
90
-        profile.phases.input_parsing
91
-            >= profile.phases.input_read
92
-                + profile.phases.input_object_parse
93
-                + profile.phases.input_archive_parse
94
-                + profile.phases.input_dylib_parse
95
-                + profile.phases.input_tbd_decode
96
-                + profile.phases.input_tbd_materialize
97
-                + profile.phases.input_reloc_parse,
98
-        "{name}: input subphases exceeded input total"
99
+        input_subphase_total > Duration::ZERO,
100
+        "{name}: all input subphase timings were zero"
99101
     );
100102
     assert!(
101103
         profile.phases.layout