DeepSeek DualPipe

A Python utility for efficient bidirectional communication between processes, enabling parallel data processing with minimal overhead.

DualPipe Architecture Visualization

Key Features

Bidirectional Communication

Enables seamless two-way communication between parent and child processes, allowing for efficient data exchange in both directions.

Non-blocking I/O

Sets the parent's pipe ends to non-blocking mode, so reads and writes return immediately instead of stalling when the child is not ready. This avoids the deadlock where each process waits on the other.

Process Isolation

Maintains proper process isolation while providing a convenient interface for inter-process communication, enhancing system stability.

Error Handling

Catches exceptions raised by the target function in the child process and reports them on stderr, and waits for the child to terminate when the pipe is closed.

How DualPipe Works

A visual explanation of the DualPipe process flow and communication mechanism.

DualPipe Workflow Diagram

Initialization Process

  1. Create two pipes: parent-to-child and child-to-parent
  2. Fork the process to create a child process
  3. In child: redirect stdin/stdout to pipes
  4. In parent: set non-blocking I/O on pipes
  5. Execute target function in child process

Communication Flow

  1. Parent writes data to parent-to-child pipe
  2. Child reads data from stdin (redirected from pipe)
  3. Child processes data and writes to stdout
  4. Parent reads from child-to-parent pipe
  5. Non-blocking I/O prevents deadlocks
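
The effect of step 5 can be seen on a bare pipe, without any child process. The following is a minimal sketch, not part of DualPipe itself; the helper name `try_read` is hypothetical. It shows that a read on an empty non-blocking pipe returns at once, and that `select` can be used to wait for data with an upper bound.

```python
import os
import select

# Create a pipe and put the read end into non-blocking mode,
# the same O_NONBLOCK behaviour the parent relies on.
read_fd, write_fd = os.pipe()
os.set_blocking(read_fd, False)

def try_read(fd, size=1024):
    """Return the available bytes, or b"" if reading would block."""
    try:
        return os.read(fd, size)
    except BlockingIOError:  # raised for EAGAIN / EWOULDBLOCK
        return b""

# Nothing has been written yet: the call returns immediately.
empty = try_read(read_fd)

os.write(write_fd, b"ping\n")

# select() waits up to one second for the descriptor to become readable.
ready, _, _ = select.select([read_fd], [], [], 1.0)
data = try_read(read_fd) if ready else b""

os.close(read_fd)
os.close(write_fd)
```

A blocking descriptor would have hung on the first read, because no writer had produced any data yet.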

DualPipe: Common Use Cases

Practical applications where DualPipe can solve complex inter-process communication challenges.

Parallel Data Processing

Offload CPU-intensive data processing tasks to child processes while maintaining interactive communication with the parent process.

Example:

Image processing pipelines where the parent sends image data and the child returns processed results.

Command Execution

Execute commands in a separate process and capture their output in real-time, with the ability to send additional commands based on previous results.

Example:

Interactive shell environments or command-line tools that need to maintain state between commands.

Service Communication

Create long-running service processes that communicate bidirectionally with a controller process.

Example:

Background workers that process tasks from a queue and report results back to the main application.
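
A long-running worker and its controller need to agree on where one message ends and the next begins, because a pipe delivers an unstructured byte stream. The sketch below assumes newline-delimited JSON as the wire format; DualPipe does not prescribe a format, and the names `JsonLineFramer` and `encode_message` are hypothetical.

```python
import json

def encode_message(obj) -> bytes:
    """Serialize one message as a single JSON line."""
    # json.dumps escapes embedded newlines, so each message stays on one line.
    return json.dumps(obj).encode("utf-8") + b"\n"

class JsonLineFramer:
    """Reassemble newline-delimited JSON messages from arbitrary byte chunks."""

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk: bytes) -> list:
        """Add a chunk and return every message completed by it."""
        self._buffer += chunk
        # Everything before the last newline is complete; keep the rest.
        *lines, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line) for line in lines if line.strip()]

# Two tasks arrive split at an arbitrary byte boundary.
wire = encode_message({"task": 1, "op": "resize"}) + encode_message({"task": 2, "op": "blur"})
framer = JsonLineFramer()
first = framer.feed(wire[:30])   # first message plus a fragment of the second
rest = framer.feed(wire[30:])    # remainder completes the second message
```

The same framer can be fed whatever `read` returns on the parent side, so message boundaries no longer depend on how the operating system happens to chunk the data.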

Sandboxed Execution

Run untrusted code in a separate process with controlled communication channels.

Example:

Code evaluation systems that need to execute user-submitted code safely while capturing output.

DualPipe: Comparison with Other IPC Methods

How DualPipe compares to other Python inter-process communication methods.

| Feature | DualPipe | subprocess | multiprocessing | os.popen |
| --- | --- | --- | --- | --- |
| Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
| Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
| Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
| API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
| Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |

Note: The comparison focuses on ease of use for bidirectional communication between processes.
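
For reference, the "With PIPE" entry in the subprocess column corresponds roughly to the standard-library pattern below. This is a sketch for comparison only. The child program in `CHILD_CODE` is a made-up example, and reads here are blocking, which is the "requires setup" caveat in the table.

```python
import subprocess
import sys

# A tiny child program that echoes each input line in uppercase.
CHILD_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)

proc = subprocess.Popen(
    [sys.executable, "-c", CHILD_CODE],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,   # exchange str instead of bytes
    bufsize=1,   # line-buffered on the parent's side
)

proc.stdin.write("hello\n")
proc.stdin.flush()
reply = proc.stdout.readline()  # blocks until the child answers

proc.stdin.close()              # EOF ends the child's loop
exit_code = proc.wait()
proc.stdout.close()
```

Unlike the fork-based approach, this pattern also runs on Windows, at the cost of starting a fresh interpreter for the child.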

DualPipe Documentation

Comprehensive documentation for the DualPipe utility, including installation, usage examples, and API reference.

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe

def echo_uppercase():
    """Read lines from stdin and echo them back in uppercase."""
    while True:
        try:
            line = input()
        except EOFError:
            # The parent closed its end of the pipe
            break
        # Flush so the reply reaches the parent right away
        print(line.upper(), flush=True)

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses, waiting up to one second for each line
print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
print("Response 2:", pipe.readline(timeout=1.0).decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process
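
The five steps above can be condensed into a short standalone sketch that uses only the `os` and `select` modules. It is an illustration under simplifying assumptions rather than the packaged implementation: the function name `spawn_upper_child` is hypothetical, the child handles a single message, and descriptors 0 and 1 are used directly for stdin and stdout.

```python
import os
import select

def spawn_upper_child():
    """Fork a child wired to two pipes; return (pid, write_fd, read_fd)."""
    to_child_r, to_child_w = os.pipe()      # step 1: parent -> child
    from_child_r, from_child_w = os.pipe()  # step 1: child -> parent

    pid = os.fork()                         # step 2
    if pid == 0:
        # Child: drop the parent's pipe ends.
        os.close(to_child_w)
        os.close(from_child_r)
        os.dup2(to_child_r, 0)              # step 3: stdin now reads the pipe
        os.dup2(from_child_w, 1)            # step 3: stdout now writes the pipe
        try:
            data = os.read(0, 1024)
            os.write(1, data.upper())
        finally:
            os._exit(0)                     # never return into the parent's code

    # Parent: drop the child's pipe ends.
    os.close(to_child_r)
    os.close(from_child_w)
    os.set_blocking(from_child_r, False)    # step 4
    return pid, to_child_w, from_child_r

# Step 5: the parent writes a request and reads the reply.
pid, write_fd, read_fd = spawn_upper_child()
os.write(write_fd, b"hello\n")
os.close(write_fd)

# Wait up to five seconds for the reply instead of polling.
ready, _, _ = select.select([read_fd], [], [], 5.0)
reply = os.read(read_fd, 1024) if ready else b""
os.close(read_fd)

_, status = os.waitpid(pid, 0)              # reap the child
```

Closing the unused ends on each side matters: a reader only sees end-of-file once every copy of the matching write end has been closed.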

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

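- **write(data)**: Write a string or bytes to the child process; returns the number of bytes written, or 0 if the pipe is currently full
- **read(size=1024, timeout=None)**: Read up to `size` bytes from the child process; with `timeout=None` the call returns immediately, and an empty result means no data was available before the timeout
- **readline(timeout=None)**: Read one line from the child process, including the trailing newline; returns a partial line if no newline arrives in time
- **close()**: Close the pipes and wait for the child process to terminate; returns the `(pid, status)` tuple from `os.waitpid`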
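
Because the parent's write end is non-blocking, a single write may accept fewer bytes than requested, and the implementation returns 0 when the pipe is full. Callers sending large payloads therefore need a retry loop. The sketch below shows one way to write such a loop against a raw file descriptor; `write_all` is a hypothetical helper, not part of the DualPipe API.

```python
import os
import select
import threading

def write_all(fd: int, data: bytes, timeout: float = 5.0) -> None:
    """Write every byte to a non-blocking fd, waiting whenever the pipe is full.

    The timeout applies to each individual wait, not to the whole transfer.
    """
    view = memoryview(data)
    while view:
        try:
            written = os.write(fd, view)
        except BlockingIOError:
            written = 0  # pipe is full right now
        view = view[written:]
        if view:
            # Sleep until the reader has drained some space.
            _, writable, _ = select.select([], [fd], [], timeout)
            if not writable:
                raise TimeoutError("pipe stayed full; is the reader draining it?")

# Demonstration: a thread stands in for the child and drains the pipe.
read_fd, write_fd = os.pipe()
os.set_blocking(write_fd, False)
received = bytearray()

def drain():
    while chunk := os.read(read_fd, 65536):
        received.extend(chunk)

reader = threading.Thread(target=drain)
reader.start()

payload = b"x" * 200_000  # larger than a typical 64 KiB pipe buffer
write_all(write_fd, payload)
os.close(write_fd)        # EOF lets the reader thread finish
reader.join()
os.close(read_fd)
```

With a DualPipe instance, the same loop could be driven by the return value of `write`, slicing off the bytes that were accepted on each pass.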

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT

DualPipe Implementation

The complete implementation of DualPipe with detailed comments explaining each component:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
        # Fork the process
        pid = os.fork()
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
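            # Execute the target function
            exit_code = 0
            try:
                target_func(*args, **kwargs)
                # Flush buffered output; os._exit() skips normal cleanup
                sys.stdout.flush()
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
                exit_code = 1
            finally:
                # Exit immediately, without running the parent's cleanup handlers
                os._exit(exit_code)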
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.
        
        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)
            
        Returns:
            Bytes read from the child process
        """
        if self.buffer:
            # Serve bytes left over from an earlier readline() first
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data
        return self._read_fd(size, timeout)
    
    def _read_fd(self, size: int, timeout: Optional[float]) -> bytes:
        """Read directly from the pipe, bypassing the line buffer."""
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        
        try:
            return os.read(self.child_to_parent_r, size)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise
    
    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.
        
        Args:
            timeout: Maximum time to wait for a complete line
            
        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
                
            data = self._read_fd(1024, remaining)
            if not data:
                break
            # Only readline() accumulates into the line buffer
            self.buffer += data
        
        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'
        
        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses, waiting up to one second for each line
    print("Response 1:", pipe.readline(timeout=1.0).decode().strip())
    print("Response 2:", pipe.readline(timeout=1.0).decode().strip())
    
    # Clean up
    pipe.close()

This implementation uses os.fork() which is only available on Unix-like systems (Linux, macOS, etc.).

Frequently Asked Questions - DualPipe

What is DualPipe used for?

DualPipe is used for establishing bidirectional communication between a parent process and a child process in Python. It's particularly useful for parallel processing tasks where you need to send commands to a subprocess and receive results back, all while maintaining process isolation.

How does DualPipe prevent deadlocks?

DualPipe uses non-blocking I/O operations by setting the O_NONBLOCK flag on the pipe file descriptors. This ensures that read and write operations don't block indefinitely, preventing potential deadlocks where both processes are waiting for each other.

Can DualPipe be used on Windows?

The implementation shown here uses os.fork() which is only available on Unix-like systems (Linux, macOS, etc.). For Windows compatibility, you would need to modify the implementation to use the multiprocessing module or subprocess module instead.

What's the difference between DualPipe and Python's subprocess module?

While Python's subprocess module also allows communication with child processes, DualPipe provides a more streamlined interface specifically designed for bidirectional communication with non-blocking I/O. It's particularly useful when you need to continuously exchange data between processes.