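Key Features
- Bidirectional communication
Seamless two-way communication between parent and child processes, so data can be exchanged efficiently in both directions.
- Non-blocking I/O
The parent's ends of the pipes use non-blocking input/output, so its reads and writes return immediately instead of waiting. This helps prevent deadlocks and keeps data flowing between the processes.
- Process isolation
Keeps the processes properly isolated while still providing a convenient inter-process communication interface, which improves system stability.
- Error handling
Exceptions raised by the target function are caught in the child and reported on stderr. The child always exits through os._exit(), so a failure never falls through into the parent's code path.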
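How DualPipe Works
A visual explanation of DualPipe's process flow and communication mechanism.
Initialization
- Create two pipes: parent-to-child and child-to-parent
- Fork the process to create a child process
- In the child process: redirect standard input/output to the pipes
- In the parent process: set up non-blocking I/O on the pipes
- Execute the target function in the child process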
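You can try the parent's non-blocking setup on its own. The snippet below is an illustrative sketch and is not part of DualPipe; the helper names make_nonblocking and demo_nonblocking_read were made up for this example. It sets O_NONBLOCK with the same fcntl calls the implementation uses. It then shows that reading an empty pipe raises BlockingIOError immediately, and that select.select() with a timeout bounds how long a reader waits.

```python
import fcntl
import os
import select


def make_nonblocking(fd: int) -> None:
    """Add O_NONBLOCK to an existing file descriptor's status flags."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def demo_nonblocking_read() -> tuple:
    r, w = os.pipe()
    try:
        make_nonblocking(r)

        # Nothing has been written yet: a non-blocking read fails fast.
        try:
            os.read(r, 1024)
            empty_read = "data"
        except BlockingIOError:
            empty_read = "would block"

        # select() with a timeout waits at most 0.05 s for readability.
        ready_before, _, _ = select.select([r], [], [], 0.05)

        os.write(w, b"ping\n")
        ready_after, _, _ = select.select([r], [], [], 0.05)
        payload = os.read(r, 1024)
        return empty_read, bool(ready_before), bool(ready_after), payload
    finally:
        os.close(r)
        os.close(w)


print(demo_nonblocking_read())  # → ('would block', False, True, b'ping\n')
```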
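Communication flow
- The parent writes data to the parent-to-child pipe
- The child reads the data from standard input (redirected from the pipe)
- The child processes the data and writes to standard output
- The parent reads the data from the child-to-parent pipe
- Non-blocking I/O on the parent side keeps its reads and writes from blocking indefinitely, which helps prevent deadlocks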
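A pipe has a finite kernel buffer, and this is why blocking writes can deadlock. Suppose the parent keeps writing while the child is blocked writing output that the parent is not reading: both sides then wait forever. With O_NONBLOCK, a write to a full pipe raises BlockingIOError instead of waiting, and DualPipe.write turns that into a return value of 0. The sketch below fills a pipe that nobody reads. The helper name fill_pipe_nonblocking is hypothetical. The capacity is platform-dependent (commonly 64 KiB on Linux), so the code measures it rather than assuming it.

```python
import os
from typing import Tuple


def fill_pipe_nonblocking(chunk_size: int = 4096) -> Tuple[int, int]:
    """Write into a pipe that nobody reads until the kernel buffer is full.

    Returns (bytes_accepted, short_writes). On a blocking descriptor the
    final write would wait forever; with O_NONBLOCK it fails fast instead.
    """
    r, w = os.pipe()
    os.set_blocking(w, False)  # same effect as adding O_NONBLOCK via fcntl
    chunk = b"x" * chunk_size
    total = 0
    short_writes = 0
    try:
        while True:
            try:
                n = os.write(w, chunk)
            except BlockingIOError:
                break  # buffer full: a blocking write would hang right here
            if n == 0:
                break  # defensive: no progress possible
            total += n
            if n < len(chunk):
                short_writes += 1  # only part of the chunk fit into the buffer
        return total, short_writes
    finally:
        os.close(r)
        os.close(w)


accepted, partial = fill_pipe_nonblocking()
print(f"pipe accepted {accepted} bytes ({partial} short writes) before it would have blocked")
```

Short writes are the reason a caller of a non-blocking write must check the returned byte count and resend the remainder.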
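DualPipe: Common Use Cases
Real-world applications where DualPipe can help with complex inter-process communication challenges.
Parallel data processing
Offload CPU-intensive data processing tasks to a child process while keeping up interactive communication with the parent process.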
Example:
An image processing pipeline, in which the parent sends image data and the child returns the processed results.
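Image data is binary and can contain newline bytes, so the line-oriented readline() protocol used in the examples cannot carry it safely. One common option is to prefix each message with its length. This option is assumed here and is not provided by DualPipe. The hypothetical helpers encode_frame and decode_frames below sketch that framing with struct. The parent would pass encoded frames to write() and accumulate read() chunks until whole frames are available. On the child side, binary data would be read from sys.stdin.buffer rather than the text-mode sys.stdin.

```python
import struct
from typing import List, Tuple

# 4-byte big-endian unsigned length header (an assumption of this sketch).
HEADER = struct.Struct(">I")


def encode_frame(payload: bytes) -> bytes:
    """Prefix a binary payload with its length so newlines inside it are harmless."""
    return HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes) -> Tuple[List[bytes], bytes]:
    """Split as many complete frames as possible off the front of `buffer`.

    Returns (frames, remainder); the remainder holds an incomplete frame
    that must be kept until more bytes arrive from the pipe.
    """
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # header received, body still incomplete
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


# Two "images" with embedded newlines, delivered in arbitrary chunks.
stream = encode_frame(b"\x89PNG\n\x00\x01") + encode_frame(b"row1\nrow2\n")
pending = b""
received = []
for i in range(0, len(stream), 5):  # simulate 5-byte reads from the pipe
    frames, pending = decode_frames(pending + stream[i:i + 5])
    received.extend(frames)
print(received)  # → [b'\x89PNG\n\x00\x01', b'row1\nrow2\n']
```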
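Command execution
Execute commands in a separate process and capture their output in real time, with the ability to send further commands based on earlier results.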
Example:
Interactive shell environments or command-line tools that need to maintain state between commands.
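A target function for this use case can keep its state in local variables for the lifetime of the child, so each command can depend on earlier ones. The sketch below uses a hypothetical three-command protocol (SET, GET, QUIT) invented for illustration. It reads one command per line and flushes after every reply, so the parent's readline() sees each reply immediately. The streams are parameters so that the function can be tested without forking. With DualPipe it would be started as DualPipe(command_session) and would fall back to the redirected sys.stdin and sys.stdout.

```python
import io
import sys
from typing import Dict, Optional, TextIO


def command_session(stdin: Optional[TextIO] = None,
                    stdout: Optional[TextIO] = None) -> None:
    """Hypothetical stateful command loop for a DualPipe child process.

    Protocol (illustrative only):
        SET <name> <value>  -> "OK"
        GET <name>          -> the stored value, or "ERR unknown <name>"
        QUIT                -> "BYE", then the loop ends
    """
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    state: Dict[str, str] = {}  # survives across commands for the whole session
    for raw in stdin:
        parts = raw.split()
        if not parts:
            continue  # ignore blank lines
        cmd, args = parts[0].upper(), parts[1:]
        if cmd == "SET" and len(args) == 2:
            state[args[0]] = args[1]
            reply = "OK"
        elif cmd == "GET" and len(args) == 1:
            reply = state.get(args[0], f"ERR unknown {args[0]}")
        elif cmd == "QUIT":
            reply = "BYE"
        else:
            reply = f"ERR bad command {raw.strip()!r}"
        stdout.write(reply + "\n")
        stdout.flush()  # pipes are block-buffered; flush so the parent sees each reply
        if cmd == "QUIT":
            break


out = io.StringIO()
command_session(io.StringIO("SET x 42\nGET x\nGET y\nQUIT\nGET x\n"), out)
print(out.getvalue())
```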
Service communication
Create long-running service processes that communicate bidirectionally with a controller process.
Example:
Background worker processes that take tasks from a queue and report the results back to the main application.
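In a long-running worker, one failing task should not bring down the whole child, because the parent would then lose its only channel to it. The sketch below assumes a hypothetical line format: `<task_id> <payload>` for input, and `OK <task_id> <result>` or `ERR <task_id> <message>` for output. The names worker_loop and square are illustrative, and square stands in for real work. With DualPipe, this would be started as DualPipe(worker_loop, square).

```python
import io
import sys
from typing import Callable, Optional, TextIO


def worker_loop(handler: Callable[[str], str],
                stdin: Optional[TextIO] = None,
                stdout: Optional[TextIO] = None) -> int:
    """Process tasks until stdin reaches EOF; return how many tasks failed."""
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    failures = 0
    for raw in stdin:
        line = raw.strip()
        if not line:
            continue
        task_id, _, payload = line.partition(" ")
        try:
            result = handler(payload)
            stdout.write(f"OK {task_id} {result}\n")
        except Exception as exc:  # report the failure and keep serving later tasks
            failures += 1
            stdout.write(f"ERR {task_id} {type(exc).__name__}: {exc}\n")
        stdout.flush()  # deliver one report per task immediately
    return failures


def square(payload: str) -> str:
    """Stand-in task: square an integer."""
    return str(int(payload) ** 2)


out = io.StringIO()
failed = worker_loop(square, io.StringIO("t1 7\nt2 oops\nt3 -3\n"), out)
print(out.getvalue(), failed)
```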
Sandboxed execution
Run untrusted code in a separate process with a controlled communication channel.
Example:
Code evaluation systems that need to run user-submitted code safely while capturing its output.
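A separate process is not a security sandbox on its own: the child still runs as the same user, with the same filesystem and network access. One partial hardening step is to lower resource limits inside the child before the untrusted code runs. This step is assumed here and is not built into DualPipe. The hypothetical wrapper with_limits below uses the standard, Unix-only resource module. DualPipe calls its target only in the forked child, so limits set this way would not affect the parent. run_in_child is a demo helper that mimics that fork. Real isolation would also need OS-level mechanisms such as separate users, containers, or seccomp.

```python
import os
import resource
from typing import Callable, Optional


def _cap(limit: int, hard: int) -> int:
    """Clamp a requested limit to the current hard limit (raising it needs privileges)."""
    return limit if hard == resource.RLIM_INFINITY else min(limit, hard)


def with_limits(target: Callable, cpu_seconds: int = 2,
                max_memory_bytes: Optional[int] = None) -> Callable:
    """Hypothetical wrapper: run `target` under CPU (and optional memory) limits.

    setrlimit() changes the calling process permanently, so the wrapper is
    meant to run only inside a child process (e.g. as a DualPipe target).
    """
    def wrapped(*args, **kwargs):
        _, hard = resource.getrlimit(resource.RLIMIT_CPU)
        cpu = _cap(cpu_seconds, hard)
        # Exceeding the limit delivers SIGXCPU, which terminates the process by default
        resource.setrlimit(resource.RLIMIT_CPU, (cpu, cpu))
        if max_memory_bytes is not None:
            _, hard = resource.getrlimit(resource.RLIMIT_AS)
            mem = _cap(max_memory_bytes, hard)
            # Allocations beyond this address-space size fail (typically MemoryError)
            resource.setrlimit(resource.RLIMIT_AS, (mem, mem))
        return target(*args, **kwargs)
    return wrapped


def run_in_child(func: Callable[[], bool]) -> int:
    """Demo helper: run func in a forked child; exit code 0 if it returned True."""
    pid = os.fork()
    if pid == 0:
        try:
            code = 0 if func() else 1
        except BaseException:
            code = 2
        os._exit(code)  # never return into the parent's code path
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)


def limits_applied() -> bool:
    """Check, from inside the child, that the CPU limit is now finite."""
    soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
    return soft == hard != resource.RLIM_INFINITY


print(run_in_child(with_limits(limits_applied, cpu_seconds=5)))  # 0 means the limit took effect
```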
DualPipe: Comparison with Other IPC Methods
How DualPipe compares with other inter-process communication methods in Python.
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
Note: the comparison focuses on how easy each option makes bidirectional process communication.
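For comparison, here is roughly what the subprocess column's "With PIPE" and "Requires setup" entries mean in practice. The sketch drives the same uppercase-echo exchange with subprocess.Popen and text-mode pipes, using only standard subprocess features; the function name uppercase_roundtrip is illustrative. It starts a separate Python interpreter (sys.executable) instead of forking the current process, which is why this approach also works on Windows. The child must still flush after every line. The parent reads with a blocking readline(), so a child that never answers would hang it; adding timeouts requires extra setup such as threads, or select on Unix.

```python
import subprocess
import sys

# Child program: echo each stdin line in uppercase, flushing after every line.
CHILD_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)


def uppercase_roundtrip(messages):
    """Send each message to a child interpreter and collect its replies."""
    with subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,   # exchange str instead of bytes
        bufsize=1,   # line-buffered on the parent side (text mode only)
    ) as proc:
        replies = []
        for msg in messages:
            proc.stdin.write(msg + "\n")
            proc.stdin.flush()
            replies.append(proc.stdout.readline().rstrip("\n"))
        proc.stdin.close()  # EOF lets the child's loop finish
        proc.wait()
    return replies


print(uppercase_roundtrip(["Hello, world!", "Testing DualPipe"]))  # → ['HELLO, WORLD!', 'TESTING DUALPIPE']
```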
DualPipe Documentation
Comprehensive documentation for the DualPipe tool, including installation, usage examples, and an API reference.
# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
import sys

from dualpipe import DualPipe


def echo_uppercase():
    """Read lines from stdin and echo them back in uppercase"""
    for line in sys.stdin:
        sys.stdout.write(line.upper())
        sys.stdout.flush()  # stdout is a pipe here, so flush after every line


# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses, waiting up to 1 second for each line
print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
print("Response 2:", pipe.readline(timeout=1.0).decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process; returns the number of bytes written, which may be less than `len(data)` because the pipe is non-blocking
- **read(size=1024, timeout=None)**: Read up to `size` bytes from the child process; with `timeout=None` it returns immediately (`b""` if nothing is available)
- **readline(timeout=None)**: Read a line from the child process; returns partial data if no complete line arrives in time
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT
DualPipe Implementation
The complete implementation of DualPipe, with detailed comments explaining each component:
```python
import fcntl
import os
import select
import sys
import time
from typing import Callable, Optional, Tuple, Union


class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.

    This class creates a child process and establishes two-way communication
    with it using pipes. The parent's pipe ends are non-blocking, so its
    reads and writes return immediately instead of waiting indefinitely.
    """

    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.

        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()

        # Flush Python-level buffers before forking; otherwise the child
        # inherits unflushed parent output and may write it into the pipe
        sys.stdout.flush()
        sys.stderr.flush()

        # Fork the process
        pid = os.fork()

        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)

            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)

            # Execute the target function
            exit_code = 0
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
                exit_code = 1
            finally:
                # os._exit() skips Python's normal cleanup (including buffer
                # flushing), so flush explicitly before leaving
                try:
                    sys.stdout.flush()
                    sys.stderr.flush()
                except Exception:
                    pass  # e.g. the parent already closed its read end
                # Never return into the parent's code path from the child
                os._exit(exit_code)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)

            # Set non-blocking mode on the parent's pipe ends
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            # Bytes received from the child but not yet returned to the caller
            self.buffer = b""

    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.

        The pipe is non-blocking, so only part of `data` may be written (or
        nothing, if the pipe is full). Compare the return value with
        len(data) and resend the remainder if needed.

        Args:
            data: The data to write (str is encoded as UTF-8)

        Returns:
            Number of bytes actually written (0 if the pipe is full)
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            return os.write(self.parent_to_child_w, data)
        except BlockingIOError:
            # Pipe buffer is full: would block, try again later
            return 0

    def _read_chunk(self, size: int, timeout: Optional[float]) -> bytes:
        """Read up to `size` raw bytes from the pipe (b"" if none arrive in time)."""
        if timeout is not None:
            # Wait up to `timeout` seconds for data to become available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        try:
            # Also returns b"" at EOF, after the child has closed its stdout
            return os.read(self.child_to_parent_r, size)
        except BlockingIOError:
            return b""  # Would block, no data available

    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.

        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)

        Returns:
            Up to `size` bytes from the child process (b"" if none available)
        """
        # Serve bytes already buffered by readline() first, so data is
        # neither returned twice nor skipped
        if self.buffer:
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data
        return self._read_chunk(size, timeout)

    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.

        Args:
            timeout: Maximum time to wait for a complete line
                (None = use only data that is already available)

        Returns:
            A line of data (including the newline character), or any partial
            data received if no complete line arrived in time
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while b'\n' not in self.buffer:
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            data = self._read_chunk(1024, remaining)
            if not data:
                break  # No data yet, timeout, or EOF
            self.buffer += data

        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'

        # Return partial line if no newline found
        result, self.buffer = self.buffer, b""
        return result

    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.

        Closing the write end delivers EOF to the child's stdin, which lets
        a typical read loop in the child finish.

        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)


# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break  # EOF: the parent closed its write end
            sys.stdout.write(line.upper())
            sys.stdout.flush()  # stdout is a pipe here, so flush each reply

    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)

    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")

    # Read the responses, waiting up to 1 second for each line
    print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
    print("Response 2:", pipe.readline(timeout=1.0).decode().strip())

    # Clean up
    pipe.close()
```
This implementation uses os.fork() and is therefore only available on Unix-like systems (Linux, macOS, etc.).
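Frequently Asked Questions - DualPipe
- What is DualPipe used for?
- DualPipe sets up bidirectional communication between a parent process and a child process in Python. It is particularly useful for parallel processing tasks that send commands to a child process and receive results back while keeping the processes isolated.
- How does DualPipe prevent deadlocks?
- DualPipe sets the O_NONBLOCK flag on the parent's pipe file descriptors, so the parent's reads and writes never block indefinitely. A read with no data available returns immediately, and a write to a full pipe returns a short count (or 0) instead of waiting. This keeps the parent from getting stuck in a situation where both processes wait on each other. However, the child still uses ordinary blocking I/O. For communication to keep making progress, the parent must keep draining the child's output and resend any data that was not written.
- Can DualPipe be used on Windows?
- The implementation shown here uses os.fork(), which is only available on Unix-like systems (Linux, macOS, etc.). For Windows compatibility, you would need to modify the implementation to use the multiprocessing module or the subprocess module.
- How does DualPipe differ from Python's subprocess module?
- Python's subprocess module also lets you communicate with child processes. DualPipe, however, offers a simpler interface designed specifically for bidirectional communication with non-blocking I/O, which is especially useful when processes need to exchange data continuously. Another difference is that subprocess launches a separate program, whereas DualPipe runs a Python function in a forked copy of the current process.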