DeepSeek DualPipe

An efficient Python utility for bidirectional inter-process communication that supports parallel data processing with very low overhead.

DualPipe Architecture Visualization

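Key Features

Bidirectional Communication

Provides seamless two-way communication between parent and child processes, so data can be exchanged efficiently in both directions.

Non-blocking I/O

Uses non-blocking input/output operations to prevent deadlocks and keep data flowing smoothly between processes.

Process Isolation

Maintains proper process isolation while offering a convenient inter-process communication interface, which improves system stability.

Error Handling

Robust error handling that gracefully manages exceptions and process termination for reliable operation in production.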

How DualPipe Works

A visual explanation of DualPipe's process flow and communication mechanism.

DualPipe Workflow Diagram

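Initialization Process

  1. Create two pipes: parent-to-child and child-to-parent
  2. Fork the process to create the child
  3. In the child: redirect standard input/output to the pipes
  4. In the parent: set non-blocking I/O on the pipes
  5. In the child: run the target function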
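
The five initialization steps can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the DualPipe implementation itself: the names `spawn_with_pipes`, `shout`, and `demo` are hypothetical, and the code assumes a Unix-like system where `os.fork()` is available.

```python
import fcntl
import os
import select


def spawn_with_pipes(target):
    """Fork a child whose stdin/stdout are pipes; return (pid, write_fd, read_fd)."""
    # Step 1: one pipe per direction
    p2c_r, p2c_w = os.pipe()
    c2p_r, c2p_w = os.pipe()

    # Step 2: fork
    pid = os.fork()
    if pid == 0:
        status = 1
        try:
            # Step 3 (child): point fd 0 and fd 1 at the pipe ends
            os.close(p2c_w)
            os.close(c2p_r)
            os.dup2(p2c_r, 0)
            os.dup2(c2p_w, 1)
            os.close(p2c_r)
            os.close(c2p_w)
            # Step 5 (child): run the target function
            target()
            status = 0
        finally:
            os._exit(status)  # never fall through into the parent's code

    # Step 4 (parent): close the child's ends and make ours non-blocking
    os.close(p2c_r)
    os.close(c2p_w)
    for fd in (p2c_w, c2p_r):
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return pid, p2c_w, c2p_r


def shout():
    """Child body: read raw bytes from fd 0 and write them uppercased to fd 1."""
    while chunk := os.read(0, 1024):
        os.write(1, chunk.upper())


def demo():
    pid, to_child, from_child = spawn_with_pipes(shout)
    os.write(to_child, b"hello\n")
    select.select([from_child], [], [], 5.0)  # wait for the reply to arrive
    reply = os.read(from_child, 1024)
    os.close(to_child)    # EOF lets the child's loop finish
    os.close(from_child)
    os.waitpid(pid, 0)
    return reply


if __name__ == "__main__":
    print(demo())
```

The child works on raw file descriptors rather than `sys.stdin`/`sys.stdout`, which sidesteps Python-level buffering in this small sketch.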

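Communication Flow

  1. The parent writes data to the parent-to-child pipe
  2. The child reads the data from standard input (redirected from the pipe)
  3. The child processes the data and writes to standard output
  4. The parent reads the data from the child-to-parent pipe
  5. Non-blocking I/O prevents deadlocks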
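
Steps 4 and 5 rely on the parent reading from a non-blocking descriptor. One common way to do that without busy-waiting is to pair the read with `select`. The sketch below illustrates the idea on a plain pipe; the helper name `read_with_timeout` is an assumption made for this example.

```python
import os
import select


def read_with_timeout(fd, size=1024, timeout=1.0):
    """Return up to `size` bytes from a non-blocking fd, or b"" if none arrive in time."""
    ready, _, _ = select.select([fd], [], [], timeout)
    if not ready:
        return b""  # timed out: nothing to read yet
    try:
        return os.read(fd, size)  # b"" here means the writer closed the pipe
    except BlockingIOError:
        return b""  # readiness disappeared before the read


def demo():
    r, w = os.pipe()
    os.set_blocking(r, False)
    empty = read_with_timeout(r, timeout=0.05)  # nothing written yet
    os.write(w, b"ping\n")
    data = read_with_timeout(r)
    os.close(r)
    os.close(w)
    return empty, data


if __name__ == "__main__":
    print(demo())
```

Note that an empty result is ambiguous in this sketch: it can mean either a timeout or end-of-file, so a caller that needs to tell them apart would have to track that separately.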

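DualPipe: Common Use Cases

Practical applications where DualPipe can address complex inter-process communication challenges.

Parallel Data Processing

Offload CPU-intensive data processing to a child process while keeping interactive communication with the parent.

Example:

An image processing pipeline in which the parent sends image data and the child returns the processed results.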
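
A minimal sketch of this pattern, assuming a Unix-like system. SHA-256 hashing stands in for the real image processing, and the length-prefixed framing and all helper names (`send_frame`, `recv_frame`, `process_in_child`) are illustrative assumptions rather than part of DualPipe.

```python
import hashlib
import os
import struct


def read_exact(fd, n):
    """Read exactly n bytes from fd, or return b"" on EOF."""
    buf = b""
    while len(buf) < n:
        chunk = os.read(fd, n - len(buf))
        if not chunk:
            return b""
        buf += chunk
    return buf


def send_frame(fd, payload):
    """Write a 4-byte big-endian length followed by the payload."""
    data = struct.pack(">I", len(payload)) + payload
    while data:
        data = data[os.write(fd, data):]


def recv_frame(fd):
    """Read one length-prefixed frame; return None on EOF."""
    header = read_exact(fd, 4)
    if not header:
        return None
    (length,) = struct.unpack(">I", header)
    return read_exact(fd, length)


def worker(in_fd, out_fd):
    """Child loop: hash each incoming frame (placeholder for real image work)."""
    while (frame := recv_frame(in_fd)) is not None:
        send_frame(out_fd, hashlib.sha256(frame).hexdigest().encode())


def process_in_child(payloads):
    """Send each payload to a forked worker and collect one result per payload."""
    p2c_r, p2c_w = os.pipe()
    c2p_r, c2p_w = os.pipe()
    pid = os.fork()
    if pid == 0:
        try:
            os.close(p2c_w)
            os.close(c2p_r)
            worker(p2c_r, c2p_w)
        finally:
            os._exit(0)
    os.close(p2c_r)
    os.close(c2p_w)
    results = []
    for payload in payloads:
        send_frame(p2c_w, payload)          # one request ...
        results.append(recv_frame(c2p_r))   # ... then one small response
    os.close(p2c_w)
    os.close(c2p_r)
    os.waitpid(pid, 0)
    return results


if __name__ == "__main__":
    print(process_in_child([b"abc", b"x" * 200000]))
```

Framing matters here because pipes carry a byte stream with no message boundaries; binary payloads such as images cannot rely on newlines as separators.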

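Command Execution

Run commands in a separate process and capture their output in real time, with the ability to send further commands based on earlier results.

Example:

An interactive shell environment or command-line tool that needs to keep state between commands.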
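
The idea of keeping state between commands can be illustrated with the standard `subprocess` module. In this sketch the child is a tiny Python program that keeps a running total; the command names `add` and `reset`, and the function `run_session`, are invented for the example.

```python
import subprocess
import sys

# Child program: keeps a running total across commands (the "state")
CHILD_SOURCE = r"""
import sys
total = 0
for line in sys.stdin:
    cmd, _, arg = line.strip().partition(" ")
    if cmd == "add":
        total += int(arg)
    elif cmd == "reset":
        total = 0
    print(total, flush=True)
"""


def run_session():
    """Send commands one at a time, choosing the next from the previous reply."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )

    def ask(command):
        proc.stdin.write(command + "\n")
        proc.stdin.flush()
        return int(proc.stdout.readline())

    replies = [ask("add 5"), ask("add 7")]
    # Decide the next command from the previous result
    replies.append(ask("reset") if replies[-1] > 10 else ask("add 1"))
    proc.stdin.close()   # EOF ends the child's loop
    proc.stdout.close()
    proc.wait()
    return replies


if __name__ == "__main__":
    print(run_session())
```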

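Service Communication

Create long-running service processes that communicate in both directions with a controller process.

Example:

A background worker that processes tasks from a queue and reports results back to the main application.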
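
A long-running worker of this kind could look like the sketch below, which assumes a Unix-like system and a JSON-lines protocol. The message fields (`id`, `op`, `values`) and the function names are assumptions chosen for illustration, not a DualPipe protocol.

```python
import json
import os


def service_loop(requests, replies):
    """Child: handle JSON-line requests until a 'stop' message or EOF."""
    for line in requests:
        msg = json.loads(line)
        if msg["op"] == "stop":
            break
        result = sum(msg["values"]) if msg["op"] == "sum" else None
        replies.write(json.dumps({"id": msg["id"], "result": result}) + "\n")
        replies.flush()


def start_service():
    """Fork the service; return (pid, request_writer, reply_reader)."""
    p2c_r, p2c_w = os.pipe()
    c2p_r, c2p_w = os.pipe()
    pid = os.fork()
    if pid == 0:
        try:
            os.close(p2c_w)
            os.close(c2p_r)
            with os.fdopen(p2c_r, "r") as requests, os.fdopen(c2p_w, "w") as replies:
                service_loop(requests, replies)
        finally:
            os._exit(0)
    os.close(p2c_r)
    os.close(c2p_w)
    return pid, os.fdopen(p2c_w, "w"), os.fdopen(c2p_r, "r")


def call(writer, reader, request):
    """Send one request and wait for its reply."""
    writer.write(json.dumps(request) + "\n")
    writer.flush()
    return json.loads(reader.readline())


def demo():
    pid, writer, reader = start_service()
    first = call(writer, reader, {"id": 1, "op": "sum", "values": [1, 2, 3]})
    second = call(writer, reader, {"id": 2, "op": "sum", "values": [10, 20]})
    writer.write(json.dumps({"id": 3, "op": "stop"}) + "\n")
    writer.close()
    reader.close()
    os.waitpid(pid, 0)
    return first, second


if __name__ == "__main__":
    print(demo())
```

Carrying an `id` in every message lets the controller match replies to requests if it later sends several requests before reading.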

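Sandboxed Execution

Run untrusted code in a separate process with controlled communication channels.

Example:

A code evaluation system that needs to run user-submitted code safely while capturing its output.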
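
As a rough sketch of the capture-and-limit part of such a system, the snippet below runs a code string in a separate interpreter with a time limit. The function name `run_untrusted` is hypothetical. Process separation alone is not a security boundary: a real evaluator would also need operating-system-level restrictions such as resource limits, filesystem isolation, and network isolation.

```python
import subprocess
import sys


def run_untrusted(source, timeout=5.0):
    """Run `source` in a separate interpreter; return (exit_code, stdout, stderr)."""
    try:
        done = subprocess.run(
            # -I: isolated mode, ignores PYTHON* environment variables
            # and the user site directory
            [sys.executable, "-I", "-c", source],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the child before raising
        return None, "", "timed out"
    return done.returncode, done.stdout, done.stderr


if __name__ == "__main__":
    print(run_untrusted("print(6 * 7)"))
    print(run_untrusted("while True: pass", timeout=0.5))
```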

DualPipe: Comparison with Other IPC Methods

How DualPipe compares with other Python inter-process communication methods.

| Feature | DualPipe | subprocess | multiprocessing | os.popen |
| --- | --- | --- | --- | --- |
| Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
| Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
| Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
| API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
| Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |

Note: the comparison focuses on ease of use for bidirectional process communication.

DualPipe Documentation

Comprehensive documentation for the DualPipe utility, including installation, usage examples, and an API reference.

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import sys

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    # Iterating over stdin stops cleanly at EOF, when the parent closes the pipe
    for line in sys.stdin:
        # stdout is a pipe in the child, so flush each line explicitly
        print(line.rstrip("\n").upper(), flush=True)

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses, waiting up to one second for each line
print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
print("Response 2:", pipe.readline(timeout=1.0).decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules
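
One possible cross-platform approach, sketched under stated assumptions: `subprocess` supplies the pipes, and a background thread feeding a `queue.Queue` supplies reads that do not block, because `fcntl` is unavailable on Windows and `select` there does not work on pipes. The class name `LineChild` and its methods are hypothetical and only loosely mirror the API described above.

```python
import queue
import subprocess
import sys
import threading

CHILD_SOURCE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(line.rstrip('\\n').upper(), flush=True)\n"
)


class LineChild:
    """Hypothetical helper: line-based two-way channel to a child interpreter."""

    def __init__(self, source):
        self.proc = subprocess.Popen(
            [sys.executable, "-c", source],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
        )
        self.lines = queue.Queue()
        # A daemon thread drains stdout so readline() never blocks on the pipe
        self._thread = threading.Thread(target=self._pump, daemon=True)
        self._thread.start()

    def _pump(self):
        for line in self.proc.stdout:
            self.lines.put(line)

    def write(self, text):
        self.proc.stdin.write(text)
        self.proc.stdin.flush()

    def readline(self, timeout=None):
        """Return the next line, or "" if none is available (None = do not wait)."""
        try:
            if timeout is None:
                return self.lines.get_nowait()
            return self.lines.get(timeout=timeout)
        except queue.Empty:
            return ""

    def close(self):
        self.proc.stdin.close()   # EOF ends the child's loop
        code = self.proc.wait()
        self._thread.join()       # the pump exits once stdout reaches EOF
        self.proc.stdout.close()
        return code


def demo():
    child = LineChild(CHILD_SOURCE)
    child.write("Hello, world!\n")
    first = child.readline(timeout=5.0)
    child.write("Testing\n")
    second = child.readline(timeout=5.0)
    return first, second, child.close()


if __name__ == "__main__":
    print(demo())
```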

## License

MIT

DualPipe Implementation

The complete DualPipe implementation, with detailed comments explaining each component:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
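        # Flush buffered output first so the child does not inherit, and
        # later re-emit, data the parent has not written yet
        sys.stdout.flush()
        sys.stderr.flush()

        # Fork the process
        pid = os.fork()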
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
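            # Execute the target function
            exit_code = 0
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
                exit_code = 1
            finally:
                # os._exit() skips normal interpreter cleanup, so flush any
                # buffered output explicitly before exiting
                try:
                    sys.stdout.flush()
                    sys.stderr.flush()
                except Exception:
                    pass  # The parent may already have closed its end
                os._exit(exit_code)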
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written; this may be less than len(data), and is
            0 if the pipe is full and the write would block
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def _read_pipe(self, size: int, timeout: Optional[float]) -> bytes:
        """Read up to size bytes directly from the pipe, bypassing the buffer."""
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred

        try:
            return os.read(self.child_to_parent_r, size)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise

    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.

        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)

        Returns:
            Bytes read from the child process (b"" if none are available)
        """
        # Serve bytes left over from readline() first, so that no data is
        # returned twice or skipped
        if self.buffer:
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data
        return self._read_pipe(size, timeout)

    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.

        Args:
            timeout: Maximum time to wait for a complete line
                (None = non-blocking)

        Returns:
            A line of data (including newline character), or whatever
            partial data is available if no complete line arrived
        """
        start_time = time.monotonic()

        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.monotonic() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None

            data = self._read_pipe(1024, remaining)
            if not data:
                break
            self.buffer += data

        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'

        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses, waiting up to one second for each line
    print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
    print("Response 2:", pipe.readline(timeout=1.0).decode().strip())
    
    # Clean up
    pipe.close()

This implementation uses os.fork(), which is available only on Unix-like systems (Linux, macOS, etc.).

Frequently Asked Questions - DualPipe

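What is DualPipe used for?
DualPipe establishes bidirectional communication between a parent process and a child process in Python. It is particularly suited to parallel processing tasks that send commands to a child process and receive results back while keeping the processes isolated.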
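How does DualPipe prevent deadlocks?
DualPipe uses non-blocking I/O by setting the O_NONBLOCK flag on the parent's pipe file descriptors. Reads and writes in the parent therefore return immediately instead of blocking indefinitely, which avoids the deadlock in which each process waits on the other.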
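
The effect of O_NONBLOCK can be observed on a plain pipe. In the sketch below (the function name `fill_pipe_nonblocking` is made up for this example), nothing ever reads from the pipe, so a blocking writer would eventually hang; the non-blocking writer gets a `BlockingIOError` instead and can carry on.

```python
import os


def fill_pipe_nonblocking():
    """Write to a pipe nobody reads until the kernel buffer is full.

    Returns the number of bytes accepted before a write would have blocked.
    """
    r, w = os.pipe()
    os.set_blocking(w, False)  # sets O_NONBLOCK on the write end
    written = 0
    try:
        while True:
            written += os.write(w, b"x" * 4096)
    except BlockingIOError:
        # With a blocking descriptor the write would hang at this point,
        # because no reader ever drains the pipe
        pass
    finally:
        os.close(r)
        os.close(w)
    return written


if __name__ == "__main__":
    print("bytes accepted before the pipe filled:", fill_pipe_nonblocking())
```

The buffer capacity is platform-dependent, so the exact count printed varies between systems.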
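Can DualPipe be used on Windows?
The implementation shown here uses os.fork(), which is available only on Unix-like systems (Linux, macOS, etc.). For Windows compatibility, you would need to modify the implementation to use the multiprocessing or subprocess module.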
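How does DualPipe differ from Python's subprocess module?
Python's subprocess module also allows communication with child processes, but DualPipe offers a simpler interface designed specifically for bidirectional communication with non-blocking I/O. It is especially useful when processes need to exchange data continuously.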