DeepSeek DualPipe

Una utilidad Python para la comunicación bidireccional eficiente entre procesos, permitiendo el procesamiento de datos en paralelo con una sobrecarga mínima.

DualPipe Architecture Visualization

Características Principales

Comunicación Bidireccional

Permite una comunicación bidireccional fluida entre procesos padre e hijo, permitiendo un intercambio eficiente de datos en ambas direcciones.

E/S No Bloqueante

Implementa operaciones de entrada/salida no bloqueantes para prevenir bloqueos mutuos y asegurar un flujo de datos fluido entre procesos.

Aislamiento de Procesos

Mantiene un aislamiento adecuado de procesos mientras proporciona una interfaz conveniente para la comunicación entre procesos, mejorando la estabilidad del sistema.

Manejo de Errores

Mecanismos robustos de manejo de errores para gestionar con elegancia las excepciones y la terminación de procesos, asegurando una operación confiable en entornos de producción.

Cómo Funciona DualPipe

Una explicación visual del flujo de procesos de DualPipe y el mecanismo de comunicación.

DualPipe Workflow Diagram

Proceso de Inicialización

  1. Crear dos tuberías: padre-a-hijo e hijo-a-padre
  2. Bifurcar el proceso para crear un proceso hijo
  3. En el hijo: redirigir stdin/stdout a las tuberías
  4. En el padre: configurar E/S no bloqueante en las tuberías
  5. Ejecutar la función objetivo en el proceso hijo

Flujo de Comunicación

  1. El padre escribe datos en la tubería padre-a-hijo
  2. El hijo lee datos desde stdin (redirigido desde la tubería)
  3. El hijo procesa datos y escribe en stdout
  4. El padre lee desde la tubería hijo-a-padre
  5. La E/S no bloqueante previene bloqueos mutuos

Dualpipe: Casos de Uso Comunes

Aplicaciones prácticas donde DualPipe puede resolver desafíos complejos de comunicación entre procesos.

Procesamiento de Datos en Paralelo

Descargar tareas de procesamiento de datos intensivas en CPU a procesos hijo mientras se mantiene la comunicación interactiva con el proceso padre.

Example:

Tuberías de procesamiento de imágenes donde el padre envía datos de imagen y el hijo devuelve resultados procesados.

Ejecución de Comandos

Ejecutar comandos en un proceso separado y capturar su salida en tiempo real, con la capacidad de enviar comandos adicionales basados en resultados previos.

Example:

Entornos de shell interactivos o herramientas de línea de comandos que necesitan mantener estado entre comandos.

Comunicación de Servicios

Crear procesos de servicio de larga duración que se comunican bidireccionalmente con un proceso controlador.

Example:

Trabajadores en segundo plano que procesan tareas desde una cola e informan resultados de vuelta a la aplicación principal.

Ejecución en Sandbox

Ejecutar código no confiable en un proceso separado con canales de comunicación controlados.

Example:

Sistemas de evaluación de código que necesitan ejecutar código enviado por usuarios de manera segura mientras capturan la salida.

Dualpipe: Comparación con Otros Métodos IPC

Cómo se compara DualPipe con otros métodos de comunicación entre procesos en Python.

FeatureDualPipesubprocessmultiprocessingos.popen
Bidirectional Communication✅ Built-in✅ With PIPE✅ With Queue❌ One-way only
Non-blocking I/O✅ Built-in⚠️ Requires setup✅ With Queue❌ Blocking
Process Isolation✅ Complete✅ Complete✅ Complete✅ Complete
API Simplicity✅ Simple⚠️ Complex⚠️ Moderate✅ Simple
Windows Support❌ Unix only✅ Cross-platform✅ Cross-platform✅ Cross-platform

Nota: La comparación se centra en la facilidad de uso para la comunicación bidireccional entre procesos.

Documentación de DualPipe

Documentación completa para la utilidad DualPipe, incluyendo instalación, ejemplos de uso y referencia de API.

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import time

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    while True:
        line = input()
        if not line:
            break
        print(line.upper())

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses
time.sleep(0.1)  # Give the child process time to respond

print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT

Implementación de DualPipe

La implementación completa de DualPipe con comentarios detallados explicando cada componente:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
        # Fork the process
        pid = os.fork()
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
            # Execute the target function
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
            finally:
                # Ensure clean exit
                os._exit(0)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.
        
        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)
            
        Returns:
            Bytes read from the child process
        """
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        
        try:
            data = os.read(self.child_to_parent_r, size)
            self.buffer += data
            return data
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise
    
    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.
        
        Args:
            timeout: Maximum time to wait for a complete line
            
        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
                
            data = self.read(1024, remaining)
            if not data:
                break
        
        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'
        
        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses
    time.sleep(0.1)  # Give the child process time to respond
    
    print("Response 1:", pipe.readline().decode().strip())
    print("Response 2:", pipe.readline().decode().strip())
    
    # Clean up
    pipe.close()

Esta implementación utiliza os.fork() que solo está disponible en sistemas tipo Unix (Linux, macOS, etc.).

Preguntas Frecuentes - Dualpipe

¿Para qué se utiliza DualPipe?
DualPipe se utiliza para establecer comunicación bidireccional entre un proceso padre y un proceso hijo en Python. Es particularmente útil para tareas de procesamiento paralelo donde necesita enviar comandos a un subproceso y recibir resultados de vuelta, todo mientras mantiene el aislamiento de procesos.
¿Cómo previene DualPipe los bloqueos mutuos?
DualPipe utiliza operaciones de E/S no bloqueantes estableciendo la bandera O_NONBLOCK en los descriptores de archivo de las tuberías. Esto asegura que las operaciones de lectura y escritura no se bloqueen indefinidamente, previniendo potenciales bloqueos mutuos donde ambos procesos están esperando el uno al otro.
¿Puede usarse DualPipe en Windows?
La implementación mostrada aquí utiliza os.fork() que solo está disponible en sistemas tipo Unix (Linux, macOS, etc.). Para compatibilidad con Windows, necesitaría modificar la implementación para usar el módulo multiprocessing o el módulo subprocess.
¿Cuál es la diferencia entre DualPipe y el módulo subprocess de Python?
Aunque el módulo subprocess de Python también permite la comunicación con procesos hijo, DualPipe proporciona una interfaz más simplificada específicamente diseñada para comunicación bidireccional con E/S no bloqueante. Es particularmente útil cuando necesita intercambiar datos continuamente entre procesos.