Rust · 20604 bytes Raw Blame History
1 use crate::command::find_in_path;
2 use crate::redirect::{apply_redirects, RedirectError};
3 use crate::{ExecutionError, ExecutionResult};
4 use rush_expand::Context;
5 use rush_interactive::ErrorHints;
6 use rush_parser::ast::{AndOrList, AndOrOp, Pipeline, SimpleCommand};
7 use std::process::{Command, Stdio};
8 use thiserror::Error;
9
10 #[cfg(unix)]
11 use std::os::unix::process::ExitStatusExt;
12
13 #[derive(Error, Debug)]
14 pub enum PipelineError {
15 #[error("Execution error: {0}")]
16 ExecutionError(#[from] ExecutionError),
17
18 #[error("Redirect error: {0}")]
19 RedirectError(#[from] RedirectError),
20
21 #[error("Expansion error: {0}")]
22 ExpansionError(String),
23
24 #[error("Pipeline is empty")]
25 EmptyPipeline,
26
27 #[error("I/O error: {0}")]
28 IoError(#[from] std::io::Error),
29
30 // Control flow signals (not really errors)
31 #[error("break")]
32 Break,
33
34 #[error("continue")]
35 Continue,
36
37 #[error("return")]
38 Return(i32),
39 }
40
41 /// Execute a pipeline of commands
42 ///
43 /// Each command's stdout is connected to the next command's stdin via a pipe.
44 /// Returns the exit status of the last command in the pipeline.
45 pub fn execute_pipeline(
46 pipeline: &Pipeline,
47 context: &mut Context,
48 ) -> Result<ExecutionResult, PipelineError> {
49 if pipeline.commands.is_empty() {
50 return Err(PipelineError::EmptyPipeline);
51 }
52
53 // Helper to negate exit status if pipeline is negated
54 let negate_if_needed = |result: ExecutionResult, negated: bool| -> ExecutionResult {
55 if negated {
56 let exit_code = result.exit_code();
57 crate::command::exit_code_to_result(if exit_code == 0 { 1 } else { 0 })
58 } else {
59 result
60 }
61 };
62
63 // Special case: single command (not really a pipeline)
64 if pipeline.commands.len() == 1 {
65 match &pipeline.commands[0] {
66 rush_parser::ast::PipelineElement::Simple(cmd) => {
67 let result = execute_simple_with_redirects(cmd, context, false)
68 .map_err(PipelineError::from)?;
69 return Ok(negate_if_needed(result, pipeline.negated));
70 }
71 rush_parser::ast::PipelineElement::Subshell(subshell) => {
72 let result = crate::execute_subshell(subshell, context)
73 .map_err(|e| PipelineError::ExecutionError(ExecutionError::CommandNotFound(e)))?;
74 return Ok(negate_if_needed(result, pipeline.negated));
75 }
76 rush_parser::ast::PipelineElement::ExtendedTest(cond) => {
77 let result = crate::control_flow::execute_extended_test(cond, context)?;
78 return Ok(negate_if_needed(result, pipeline.negated));
79 }
80 }
81 }
82
83 // Build and spawn all commands in the pipeline
84 // We track raw file descriptors for proper subshell stdin handling
85 let mut children = Vec::new();
86 let mut subshell_pids = Vec::new();
87 #[cfg(unix)]
88 let mut prev_stdout_fd: Option<i32> = None;
89 #[cfg(not(unix))]
90 let mut prev_stdout: Option<Stdio> = None;
91
92 for (i, element) in pipeline.commands.iter().enumerate() {
93 let is_first = i == 0;
94 let is_last = i == pipeline.commands.len() - 1;
95
96 match element {
97 rush_parser::ast::PipelineElement::Simple(simple_cmd) => {
98 // Expand words
99 let expanded = rush_expand::expand_words(&simple_cmd.words, context)
100 .map_err(|e| PipelineError::ExpansionError(e.to_string()))?;
101
102 if expanded.is_empty() {
103 continue; // Skip empty commands
104 }
105
106 let command_name = &expanded[0];
107 let args = &expanded[1..];
108
109 // Find the command in PATH
110 let program_path = find_in_path(command_name)
111 .ok_or_else(|| ExecutionError::CommandNotFound(ErrorHints::command_not_found(command_name)))?;
112
113 // Build the command
114 let mut cmd = Command::new(program_path);
115 cmd.args(args);
116
117 // Set up stdin
118 #[cfg(unix)]
119 {
120 use std::os::unix::io::FromRawFd;
121 if is_first {
122 cmd.stdin(Stdio::inherit());
123 } else if let Some(fd) = prev_stdout_fd.take() {
124 cmd.stdin(Stdio::from(unsafe { std::fs::File::from_raw_fd(fd) }));
125 }
126 }
127 #[cfg(not(unix))]
128 {
129 if is_first {
130 cmd.stdin(Stdio::inherit());
131 } else if let Some(prev_out) = prev_stdout.take() {
132 cmd.stdin(prev_out);
133 }
134 }
135
136 // Set up stdout
137 if is_last {
138 cmd.stdout(Stdio::inherit());
139 } else {
140 cmd.stdout(Stdio::piped());
141 }
142
143 cmd.stderr(Stdio::inherit());
144 apply_redirects(&mut cmd, &simple_cmd.redirects, context)?;
145
146 let mut child = cmd.spawn()?;
147 if !is_last {
148 #[cfg(unix)]
149 {
150 use std::os::unix::io::IntoRawFd;
151 prev_stdout_fd = child.stdout.take().map(|f| f.into_raw_fd());
152 }
153 #[cfg(not(unix))]
154 {
155 prev_stdout = child.stdout.take().map(Stdio::from);
156 }
157 }
158 children.push(child);
159 }
160 rush_parser::ast::PipelineElement::Subshell(subshell) => {
161 // Execute subshell in pipeline using fork
162 #[cfg(unix)]
163 {
164 use nix::unistd::{fork, ForkResult, pipe as nix_pipe, dup2};
165 use std::os::unix::io::IntoRawFd;
166
167 // Take stdin fd from previous command
168 let stdin_fd = prev_stdout_fd.take();
169
170 // Create pipe for stdout if not last
171 let pipe_fds = if !is_last {
172 let (r, w) = nix_pipe().map_err(|e| ExecutionError::IoError(
173 std::io::Error::new(std::io::ErrorKind::Other, format!("pipe failed: {}", e))
174 ))?;
175 Some((r.into_raw_fd(), w.into_raw_fd()))
176 } else {
177 None
178 };
179
180 match unsafe { fork() } {
181 Ok(ForkResult::Child) => {
182 // Child process: execute subshell with redirected I/O
183 use nix::libc;
184
185 // Set up stdin from previous command
186 if let Some(fd) = stdin_fd {
187 let _ = dup2(fd, 0); // Redirect stdin
188 unsafe { libc::close(fd); }
189 }
190
191 // Set up stdout to pipe if not last
192 if let Some((read_fd, write_fd)) = pipe_fds {
193 unsafe { libc::close(read_fd); } // Close read end in child
194 let _ = dup2(write_fd, 1);
195 unsafe { libc::close(write_fd); }
196 }
197
198 // Execute subshell commands
199 let exit_code = crate::subshell::execute_subshell_child(&subshell.commands, context);
200 std::process::exit(exit_code);
201 }
202 Ok(ForkResult::Parent { child }) => {
203 // Parent process: close stdin fd and save stdout pipe
204 use nix::libc;
205
206 // Close the stdin fd in parent (child has its own copy)
207 if let Some(fd) = stdin_fd {
208 unsafe { libc::close(fd); }
209 }
210
211 if let Some((read_fd, write_fd)) = pipe_fds {
212 unsafe { libc::close(write_fd); }
213 prev_stdout_fd = Some(read_fd);
214 }
215
216 // Track the subshell PID for later waiting
217 subshell_pids.push(child);
218 }
219 Err(e) => {
220 return Err(PipelineError::ExecutionError(ExecutionError::IoError(
221 std::io::Error::new(std::io::ErrorKind::Other, format!("fork failed: {}", e))
222 )));
223 }
224 }
225 }
226 #[cfg(not(unix))]
227 {
228 return Err(PipelineError::ExecutionError(ExecutionError::CommandNotFound(
229 "Subshells in pipelines not supported on this platform".to_string()
230 )));
231 }
232 }
233 rush_parser::ast::PipelineElement::ExtendedTest(_) => {
234 // Extended tests in multi-command pipelines are not supported
235 return Err(PipelineError::ExecutionError(ExecutionError::CommandNotFound(
236 "Extended tests in pipelines not yet supported".to_string()
237 )));
238 }
239 }
240 }
241
242 // Wait for all commands to complete
243 let mut last_exit_status = None;
244
245 // Wait for regular process children
246 for child in &mut children {
247 let status = child.wait()?;
248 last_exit_status = Some(status);
249 }
250
251 // Wait for subshell PIDs
252 #[cfg(unix)]
253 {
254 use nix::sys::wait::{waitpid, WaitStatus};
255 for pid in subshell_pids {
256 match waitpid(pid, None) {
257 Ok(WaitStatus::Exited(_, code)) => {
258 last_exit_status = Some(crate::command::exit_code_to_result(code).exit_status);
259 }
260 Ok(WaitStatus::Signaled(_, signal, _)) => {
261 last_exit_status = Some(crate::command::exit_code_to_result(128 + signal as i32).exit_status);
262 }
263 _ => {}
264 }
265 }
266 }
267
268 // Return the exit status of the last command (with negation if needed)
269 let result = ExecutionResult {
270 exit_status: last_exit_status.unwrap(),
271 #[cfg(unix)]
272 job_control: None,
273 };
274 Ok(negate_if_needed(result, pipeline.negated))
275 }
276
277 /// Execute an AndOrList (commands connected by && or ||)
278 ///
279 /// Commands are executed left-to-right with short-circuit evaluation:
280 /// - && executes the next command only if the previous succeeded (exit code 0)
281 /// - || executes the next command only if the previous failed (exit code != 0)
282 pub fn execute_and_or_list(
283 and_or_list: &AndOrList,
284 context: &mut Context,
285 ) -> Result<ExecutionResult, PipelineError> {
286 // Execute the first pipeline
287 let mut last_result = execute_pipeline(&and_or_list.first, context)?;
288 let mut last_exit_code = last_result.exit_code();
289
290 // Execute remaining pipelines with their operators
291 for (op, pipeline) in &and_or_list.rest {
292 let should_execute = match op {
293 AndOrOp::And => last_exit_code == 0, // && - execute if previous succeeded
294 AndOrOp::Or => last_exit_code != 0, // || - execute if previous failed
295 };
296
297 if should_execute {
298 last_result = execute_pipeline(pipeline, context)?;
299 last_exit_code = last_result.exit_code();
300 }
301 }
302
303 Ok(last_result)
304 }
305
306 /// Execute a simple command with redirections
307 ///
308 /// This is used for both standalone commands and commands within pipelines.
309 pub fn execute_simple_with_redirects(
310 cmd: &SimpleCommand,
311 context: &mut Context,
312 interactive: bool,
313 ) -> Result<ExecutionResult, PipelineError> {
314 // Process variable assignments
315 for assignment in &cmd.assignments {
316 // Check if this is an array literal assignment: arr=(one two three)
317 let is_array_literal = assignment.value.parts.iter().any(|part| {
318 matches!(part, rush_parser::ast::WordPart::ArrayLiteral(_))
319 });
320
321 if is_array_literal {
322 // Extract array elements from the ArrayLiteral
323 for part in &assignment.value.parts {
324 if let rush_parser::ast::WordPart::ArrayLiteral(elements) = part {
325 let mut array_values = Vec::new();
326 for elem in elements {
327 let expanded = rush_expand::expand_word(elem, context)
328 .map_err(|e| PipelineError::ExpansionError(e.to_string()))?;
329 array_values.push(expanded);
330 }
331 context.arrays.insert(assignment.name.clone(), rush_expand::context::ArrayType::Indexed(array_values));
332 }
333 }
334 } else if let Some(index) = &assignment.index {
335 // arr[index]=value - indexed array assignment
336 let value = rush_expand::expand_words(&[assignment.value.clone()], context)
337 .map_err(|e| PipelineError::ExpansionError(e.to_string()))?;
338
339 // Use Context helper method which handles both indexed and associative arrays
340 if let Err(err) = context.set_array_element(&assignment.name, index.clone(), value.join(" ")) {
341 return Err(PipelineError::IoError(std::io::Error::new(
342 std::io::ErrorKind::InvalidInput,
343 err,
344 )));
345 }
346 } else {
347 // Regular variable assignment
348 let value = rush_expand::expand_words(&[assignment.value.clone()], context)
349 .map_err(|e| PipelineError::ExpansionError(e.to_string()))?;
350
351 // Check if readonly
352 if let Err(name) = context.set_var(&assignment.name, value.join(" ")) {
353 return Err(PipelineError::IoError(std::io::Error::new(
354 std::io::ErrorKind::PermissionDenied,
355 format!("{}: readonly variable", name),
356 )));
357 }
358 }
359 }
360
361 // If there's no command to execute (just assignments), return success
362 if cmd.words.is_empty() {
363 return Ok(crate::command::success_result());
364 }
365
366 // Expand all words
367 let expanded = rush_expand::expand_words(&cmd.words, context)
368 .map_err(|e| PipelineError::ExpansionError(e.to_string()))?;
369
370 if expanded.is_empty() {
371 return Ok(crate::command::success_result());
372 }
373
374 let command_name = &expanded[0];
375 let args = &expanded[1..];
376
377 // Expand aliases (only for the command name, not args)
378 let (actual_command, actual_args): (String, Vec<String>) = if let Some(alias_value) = context.aliases.get(command_name).cloned() {
379 // Parse the alias value to get command and its args
380 let parts: Vec<String> = alias_value.split_whitespace().map(|s| s.to_string()).collect();
381 if parts.is_empty() {
382 (command_name.to_string(), args.to_vec())
383 } else {
384 let cmd = parts[0].clone();
385 let mut new_args: Vec<String> = parts[1..].to_vec();
386 new_args.extend_from_slice(args);
387 (cmd, new_args)
388 }
389 } else {
390 (command_name.to_string(), args.to_vec())
391 };
392
393 // Handle control flow commands (must propagate as errors, not regular results)
394 match actual_command.as_str() {
395 "break" => return Err(PipelineError::Break),
396 "continue" => return Err(PipelineError::Continue),
397 "return" => {
398 let code = actual_args.first()
399 .and_then(|s| s.parse::<i32>().ok())
400 .unwrap_or(context.last_exit_status);
401 return Err(PipelineError::Return(code));
402 }
403 _ => {}
404 }
405
406 // Check if it's a built-in command
407 if let Some(result) = {
408 // Apply redirects to current process for builtins
409 let _saved_fds = crate::redirect::apply_redirects_to_process(&cmd.redirects, context)
410 .map_err(|e| PipelineError::IoError(std::io::Error::new(
411 std::io::ErrorKind::Other,
412 e.to_string(),
413 )))?;
414 crate::command::execute_builtin(&actual_command, &actual_args, context)
415 // _saved_fds is dropped here, restoring the original file descriptors
416 } {
417 return Ok(result);
418 }
419
420 // Check if it's a function
421 if let Some(function_def) = context.functions.get(&actual_command).cloned() {
422 // Set up function parameters ($1, $2, etc.)
423 // Save current positional parameters
424 let saved_params = context.positional_params.clone();
425
426 // Set new positional parameters from function arguments
427 context.positional_params = actual_args.to_vec();
428
429 // Push a new local scope for this function
430 context.push_scope();
431
432 // Execute the function body
433 let mut last_result = crate::command::success_result();
434 for cmd in &function_def.body {
435 match crate::control_flow::execute_complete_command(cmd, context) {
436 Ok(result) => last_result = result,
437 Err(PipelineError::Return(code)) => {
438 // Clean up: pop scope and restore positional parameters
439 context.pop_scope();
440 context.positional_params = saved_params;
441
442 // Return from function with specified exit code
443 #[cfg(unix)]
444 {
445 return Ok(crate::command::ExecutionResult {
446 exit_status: std::process::ExitStatus::from_raw(code << 8),
447 job_control: None,
448 });
449 }
450 #[cfg(not(unix))]
451 {
452 // On non-Unix, approximate the exit code
453 if code == 0 {
454 return Ok(crate::command::success_result());
455 } else {
456 return Ok(crate::command::ExecutionResult {
457 exit_status: std::process::ExitStatus::default(),
458 });
459 }
460 }
461 }
462 Err(e) => {
463 // Clean up: pop scope and restore positional parameters
464 context.pop_scope();
465 context.positional_params = saved_params;
466 return Err(e);
467 }
468 }
469 }
470
471 // Clean up: pop scope and restore positional parameters
472 context.pop_scope();
473 context.positional_params = saved_params;
474
475 return Ok(last_result);
476 }
477
478 // Find the command in PATH
479 let program_path = find_in_path(&actual_command)
480 .ok_or_else(|| PipelineError::ExecutionError(
481 ExecutionError::CommandNotFound(ErrorHints::command_not_found(&actual_command))
482 ))?;
483
484 // Build the command
485 let mut command = Command::new(program_path);
486 command.args(&actual_args);
487
488 // Apply redirections and get optional stdin content
489 let stdin_content = apply_redirects(&mut command, &cmd.redirects, context)?;
490
491 // If we have stdin content (heredoc/herestring), handle it specially
492 if let Some(content) = stdin_content {
493 use std::io::Write;
494
495 // Spawn the command
496 let mut child = command.spawn()?;
497
498 // Write to stdin
499 if let Some(mut stdin) = child.stdin.take() {
500 stdin.write_all(content.as_bytes())?;
501 // Close stdin by dropping it
502 drop(stdin);
503 }
504
505 // Wait for the command to complete
506 let status = child.wait()?;
507
508 Ok(crate::command::ExecutionResult {
509 exit_status: status,
510 #[cfg(unix)]
511 job_control: None,
512 })
513 } else {
514 // No stdin content - execute normally with terminal handling
515 #[cfg(unix)]
516 {
517 crate::terminal::unix::execute_with_terminal_control(command, interactive)
518 .map_err(PipelineError::from)
519 }
520
521 #[cfg(not(unix))]
522 {
523 crate::terminal::non_unix::execute_with_terminal_control(command, interactive)
524 .map_err(PipelineError::from)
525 }
526 }
527 }
528
// NOTE: this module imports `crate::command::find_in_path`, so that function
// must stay at least `pub(crate)` in command.rs.
531