Key Features
- Bidirectional Communication
Enables seamless two-way communication between parent and child processes, allowing for efficient data exchange in both directions.
- Non-blocking I/O
Sets the parent's pipe ends to non-blocking mode, so a read on an empty pipe or a write to a full pipe returns immediately instead of stalling. This avoids the deadlock where each process waits on the other.
- Process Isolation
Maintains proper process isolation while providing a convenient interface for inter-process communication, enhancing system stability.
- Error Handling
Catches exceptions raised by the target function in the child, reports them on stderr, and waits for the child to exit on close() so that no zombie process is left behind.
How DualPipe Works
A step-by-step outline of the DualPipe process flow and communication mechanism.
Initialization Process
- Create two pipes: parent-to-child and child-to-parent
- Fork the process to create a child process
- In child: redirect stdin/stdout to pipes
- In parent: set non-blocking I/O on pipes
- Execute target function in child process
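The initialization steps above can be sketched with nothing but the standard library. This is a minimal illustration, not the DualPipe implementation itself: the names `run_in_child` and `upper` are hypothetical, the sketch uses blocking I/O (it skips the non-blocking step), and it only suits small payloads that fit in the pipe buffer.

```python
import os


def run_in_child(target, payload: bytes) -> bytes:
    """Fork a child whose stdin/stdout are pipes, send payload, return its output."""
    p2c_r, p2c_w = os.pipe()  # parent-to-child pipe
    c2p_r, c2p_w = os.pipe()  # child-to-parent pipe
    pid = os.fork()
    if pid == 0:  # child process
        status = 1
        try:
            os.close(p2c_w)
            os.close(c2p_r)
            os.dup2(p2c_r, 0)  # stdin now reads from the parent
            os.dup2(c2p_w, 1)  # stdout now writes to the parent
            target()
            status = 0
        finally:
            os._exit(status)  # never fall through into the parent's code
    # parent process
    os.close(p2c_r)
    os.close(c2p_w)
    os.write(p2c_w, payload)
    os.close(p2c_w)  # EOF tells the child there is no more input
    chunks = []
    while chunk := os.read(c2p_r, 4096):
        chunks.append(chunk)
    os.close(c2p_r)
    os.waitpid(pid, 0)  # reap the child
    return b"".join(chunks)


def upper():
    # Raw fd I/O, so nothing is left in a Python-level buffer at exit
    while data := os.read(0, 4096):
        os.write(1, data.upper())


result = run_in_child(upper, b"hello\n")
```

Closing the write end in the parent is what lets the child see end-of-file. Forgetting to close unused pipe ends is a common reason such programs hang.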
Communication Flow
- Parent writes data to parent-to-child pipe
- Child reads data from stdin (redirected from pipe)
- Child processes data and writes to stdout
- Parent reads from child-to-parent pipe
- Non-blocking I/O prevents deadlocks
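To see what non-blocking mode changes on the read side, the following sketch reads from an empty pipe inside a single process. The helper name `try_read` is an assumption made for illustration. `os.set_blocking(fd, False)` sets the same `O_NONBLOCK` flag that can also be set through `fcntl`.

```python
import os

r, w = os.pipe()
os.set_blocking(r, False)  # set O_NONBLOCK on the read end


def try_read(fd: int, size: int = 1024):
    """Return available bytes, or None if the pipe is currently empty."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None  # a blocking fd would have waited here indefinitely


first = try_read(r)        # the pipe is empty, so this returns at once
os.write(w, b"ping\n")
second = try_read(r)       # the data written above is now available

os.close(r)
os.close(w)
```

Without the flag, the first read would block until some process wrote to the pipe, which is exactly the situation that can turn into a deadlock.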
DualPipe: Common Use Cases
Practical applications where DualPipe can solve complex inter-process communication challenges.
Parallel Data Processing
Offload CPU-intensive data processing tasks to child processes while maintaining interactive communication with the parent process.
Example:
Image processing pipelines where the parent sends image data and the child returns processed results.
Command Execution
Execute commands in a separate process and capture their output in real-time, with the ability to send additional commands based on previous results.
Example:
Interactive shell environments or command-line tools that need to maintain state between commands.
Service Communication
Create long-running service processes that communicate bidirectionally with a controller process.
Example:
Background workers that process tasks from a queue and report results back to the main application.
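A long-running worker needs a way to tell where one message ends and the next begins, because a pipe delivers a byte stream with no message boundaries. One common convention is newline-delimited JSON. The sketch below is an assumption about how such framing could look; `encode_message` and `MessageDecoder` are hypothetical names and are not part of DualPipe.

```python
import json


def encode_message(obj) -> bytes:
    """Serialize one message as a single JSON line."""
    # json.dumps escapes embedded newlines, so b"\n" is a safe delimiter
    return json.dumps(obj, separators=(",", ":")).encode("utf-8") + b"\n"


class MessageDecoder:
    """Reassemble complete JSON-line messages from arbitrary byte chunks."""

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk: bytes) -> list:
        """Add a chunk and return every message completed by it."""
        self._buffer += chunk
        # Everything before the last newline is complete; keep the remainder
        *lines, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line) for line in lines if line]


decoder = MessageDecoder()
wire = encode_message({"task": 1}) + encode_message({"task": 2})
partial = decoder.feed(wire[:5])    # no complete line yet
complete = decoder.feed(wire[5:])   # both messages arrive
```

The decoder tolerates chunks that split a message in the middle, which is what a non-blocking read typically returns.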
Sandboxed Execution
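Run untrusted code in a separate process with controlled communication channels. A forked child inherits the parent's privileges, so combine this with OS-level restrictions when the code is truly untrusted.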
Example:
Code evaluation systems that need to execute user-submitted code safely while capturing output.
DualPipe: Comparison with Other IPC Methods
How DualPipe compares to other Python inter-process communication methods.
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
Note: The comparison focuses on ease of use for bidirectional communication between processes.
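For reference, the "With PIPE" entry in the subprocess column corresponds roughly to the following cross-platform sketch. The function name `uppercase_via_subprocess` and the inline child program are illustrative assumptions. The exchange is strictly request/response with blocking reads, so it works only because the child answers every line with exactly one line.

```python
import subprocess
import sys

# Child program: echo each input line in uppercase, flushing after every line
CHILD_CODE = (
    "import sys\n"
    "for line in iter(sys.stdin.readline, ''):\n"
    "    print(line.upper(), end='', flush=True)\n"
)


def uppercase_via_subprocess(lines):
    """Send each line to a child interpreter and collect its replies."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    results = []
    try:
        for line in lines:
            proc.stdin.write(line + "\n")
            proc.stdin.flush()  # push the request through the pipe now
            results.append(proc.stdout.readline().rstrip("\n"))
    finally:
        proc.stdin.close()  # EOF ends the child's loop
        proc.stdout.close()
        proc.wait()
    return results


replies = uppercase_via_subprocess(["hello", "dualpipe"])
```

The explicit flushes on both sides are the "setup" that the table alludes to: without them, data can sit in a buffer while each process waits for the other.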
DualPipe Documentation
Comprehensive documentation for the DualPipe utility, including installation, usage examples, and API reference.
# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import time

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    while True:
        try:
            line = input()
        except EOFError:
            break  # The parent closed its end of the pipe
        if not line:
            break
        # Flush so the reply reaches the parent right away
        print(line.upper(), flush=True)

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses
time.sleep(0.1)  # Give the child process time to respond
print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT
DualPipe Implementation
The complete implementation of DualPipe with detailed comments explaining each component:
import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, Optional, Tuple, Union


class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.

    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """

    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.

        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()

        # Fork the process
        pid = os.fork()

        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)

            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)

            # Execute the target function
            exit_code = 0
            try:
                target_func(*args, **kwargs)
                # os._exit() skips normal cleanup, so flush buffered output first
                sys.stdout.flush()
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr, flush=True)
                exit_code = 1
            finally:
                # Exit immediately so the child never runs the parent's code
                os._exit(exit_code)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)

            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""  # Bytes received by readline() but not yet returned

    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.

        Args:
            data: The data to write (string or bytes)

        Returns:
            Number of bytes written. This may be less than len(data), and is 0
            when the pipe is full; the caller must retry the unwritten part.
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise

    def _read_fd(self, size: int, timeout: Optional[float]) -> bytes:
        """Read raw bytes from the pipe, bypassing the line buffer."""
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        try:
            return os.read(self.child_to_parent_r, size)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise

    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.

        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)

        Returns:
            Bytes read from the child process
        """
        # Serve bytes left over from readline() first, so that mixing read()
        # and readline() neither loses nor duplicates data
        if self.buffer:
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data
        return self._read_fd(size, timeout)

    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.

        Args:
            timeout: Maximum time to wait for a complete line

        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
            data = self._read_fd(1024, remaining)
            if not data:
                break
            self.buffer += data

        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'

        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result

    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.

        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)


# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()

    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)

    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")

    # Read the responses
    time.sleep(0.1)  # Give the child process time to respond
    print("Response 1:", pipe.readline().decode().strip())
    print("Response 2:", pipe.readline().decode().strip())

    # Clean up
    pipe.close()
This implementation uses os.fork() which is only available on Unix-like systems (Linux, macOS, etc.).
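Because write() in the listing above returns the number of bytes actually written, and 0 when the pipe is full, callers sending larger payloads need a retry loop. The helper below is one possible sketch of such a loop, not part of DualPipe; the name `write_all` and its `timeout` parameter are assumptions. It operates on a raw file descriptor such as `pipe.parent_to_child_w`.

```python
import os
import select
import time


def write_all(fd: int, data: bytes, timeout: float = 5.0) -> None:
    """Write every byte of data to a non-blocking fd, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    view = memoryview(data)
    while view:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"{len(view)} bytes still unwritten")
        # Sleep until the pipe has room again or the deadline passes
        _, writable, _ = select.select([], [fd], [], remaining)
        if not writable:
            continue
        try:
            sent = os.write(fd, view)
        except BlockingIOError:
            continue  # the pipe filled up again; wait once more
        view = view[sent:]  # drop the part that was accepted


# Demonstration on a plain pipe within one process
r, w = os.pipe()
os.set_blocking(w, False)
payload = b"abc" * 1000
write_all(w, payload)
received = os.read(r, len(payload))
os.close(r)
os.close(w)
```

Waiting with `select` rather than retrying in a tight loop keeps the parent from spinning while the child is busy draining the pipe.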
Frequently Asked Questions - DualPipe
- What is DualPipe used for?
- DualPipe is used for establishing bidirectional communication between a parent process and a child process in Python. It's particularly useful for parallel processing tasks where you need to send commands to a subprocess and receive results back, all while maintaining process isolation.
- How does DualPipe prevent deadlocks?
- DualPipe sets the O_NONBLOCK flag on the parent's ends of both pipes. A read on an empty pipe or a write to a full pipe then returns immediately instead of blocking, which avoids the deadlock where both processes wait for each other. The caller is responsible for retrying a write that returns 0.
- Can DualPipe be used on Windows?
- The implementation shown here uses os.fork() which is only available on Unix-like systems (Linux, macOS, etc.). For Windows compatibility, you would need to modify the implementation to use the multiprocessing module or subprocess module instead.
- What's the difference between DualPipe and Python's subprocess module?
- While Python's subprocess module also allows communication with child processes, DualPipe provides a more streamlined interface specifically designed for bidirectional communication with non-blocking I/O. It's particularly useful when you need to continuously exchange data between processes.