Características Principales
- Comunicación Bidireccional
Permite una comunicación bidireccional fluida entre procesos padre e hijo, permitiendo un intercambio eficiente de datos en ambas direcciones.
- E/S No Bloqueante
Implementa operaciones de entrada/salida no bloqueantes para prevenir bloqueos mutuos y asegurar un flujo de datos fluido entre procesos.
- Aislamiento de Procesos
Mantiene un aislamiento adecuado de procesos mientras proporciona una interfaz conveniente para la comunicación entre procesos, mejorando la estabilidad del sistema.
- Manejo de Errores
Mecanismos robustos de manejo de errores para gestionar con elegancia las excepciones y la terminación de procesos, asegurando una operación confiable en entornos de producción.
Cómo Funciona DualPipe
Una explicación visual del flujo de procesos de DualPipe y el mecanismo de comunicación.
Proceso de Inicialización
- Crear dos tuberías: padre-a-hijo e hijo-a-padre
- Bifurcar el proceso para crear un proceso hijo
- En el hijo: redirigir stdin/stdout a las tuberías
- En el padre: configurar E/S no bloqueante en las tuberías
- Ejecutar la función objetivo en el proceso hijo
Flujo de Comunicación
- El padre escribe datos en la tubería padre-a-hijo
- El hijo lee datos desde stdin (redirigido desde la tubería)
- El hijo procesa datos y escribe en stdout
- El padre lee desde la tubería hijo-a-padre
- La E/S no bloqueante previene bloqueos mutuos
Dualpipe: Casos de Uso Comunes
Aplicaciones prácticas donde DualPipe puede resolver desafíos complejos de comunicación entre procesos.
Procesamiento de Datos en Paralelo
Descargar tareas de procesamiento de datos intensivas en CPU a procesos hijo mientras se mantiene la comunicación interactiva con el proceso padre.
Example:
Tuberías de procesamiento de imágenes donde el padre envía datos de imagen y el hijo devuelve resultados procesados.
Ejecución de Comandos
Ejecutar comandos en un proceso separado y capturar su salida en tiempo real, con la capacidad de enviar comandos adicionales basados en resultados previos.
Example:
Entornos de shell interactivos o herramientas de línea de comandos que necesitan mantener estado entre comandos.
Comunicación de Servicios
Crear procesos de servicio de larga duración que se comunican bidireccionalmente con un proceso controlador.
Example:
Trabajadores en segundo plano que procesan tareas desde una cola e informan resultados de vuelta a la aplicación principal.
Ejecución en Sandbox
Ejecutar código no confiable en un proceso separado con canales de comunicación controlados.
Example:
Sistemas de evaluación de código que necesitan ejecutar código enviado por usuarios de manera segura mientras capturan la salida.
Dualpipe: Comparación con Otros Métodos IPC
Cómo se compara DualPipe con otros métodos de comunicación entre procesos en Python.
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
Nota: La comparación se centra en la facilidad de uso para la comunicación bidireccional entre procesos.
Documentación de DualPipe
Documentación completa para la utilidad DualPipe, incluyendo instalación, ejemplos de uso y referencia de API.
# DualPipe A Python utility for efficient bidirectional communication between processes. ## DualPipe Overview DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks. ## DualPipe Key Features - **Bidirectional Communication**: Seamless two-way communication between parent and child processes - **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow - **Process Isolation**: Maintains proper isolation while providing convenient IPC - **Error Handling**: Gracefully manages exceptions and process termination ## DualPipe Installation ```bash # Clone the repository git clone https://github.com/deepseek-ai/dualpipe.git cd dualpipe # Install the package pip install -e . ``` ## DualPipe Usage Example ```python from dualpipe import DualPipe import time def echo_uppercase(): """Simple function that reads lines and echoes them in uppercase""" while True: line = input() if not line: break print(line.upper()) # Create a DualPipe with the echo_uppercase function pipe = DualPipe(echo_uppercase) # Send some data pipe.write("Hello, world!\n") pipe.write("Testing DualPipe\n") # Read the responses time.sleep(0.1) # Give the child process time to respond print("Response 1:", pipe.readline().decode().strip()) print("Response 2:", pipe.readline().decode().strip()) # Clean up pipe.close() ``` ## How It Works DualPipe uses Unix pipes and process forking to create a child process and establish communication channels: 1. Two pipes are created: one for parent-to-child communication and one for child-to-parent 2. The process is forked, creating a child process 3. In the child process, stdin/stdout are redirected to the appropriate pipe ends 4. In the parent process, non-blocking I/O is set up for the pipes 5. The parent can then write to and read from the child process ## API Reference ### DualPipe(target_func, *args, **kwargs) Creates a new DualPipe instance with a target function to run in the child process. #### Methods - **write(data)**: Write data to the child process - **read(size=1024, timeout=None)**: Read data from the child process - **readline(timeout=None)**: Read a line from the child process - **close()**: Close the pipes and wait for the child process to terminate ## Limitations - Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork() - For Windows compatibility, consider using the multiprocessing or subprocess modules ## License MIT
Implementación de DualPipe
La implementación completa de DualPipe con comentarios detallados explicando cada componente:
import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union
class DualPipe:
"""
DualPipe: A utility for bidirectional communication with a child process.
This class creates a child process and establishes two-way communication
with it using pipes. It handles non-blocking I/O to prevent deadlocks.
"""
def __init__(self, target_func: Callable, *args, **kwargs):
"""
Initialize a DualPipe with a target function to run in the child process.
Args:
target_func: The function to execute in the child process
*args, **kwargs: Arguments to pass to the target function
"""
# Create pipes for parent-to-child and child-to-parent communication
parent_to_child_r, parent_to_child_w = os.pipe()
child_to_parent_r, child_to_parent_w = os.pipe()
# Fork the process
pid = os.fork()
if pid == 0: # Child process
# Close unused pipe ends
os.close(parent_to_child_w)
os.close(child_to_parent_r)
# Redirect stdin/stdout to the pipes
os.dup2(parent_to_child_r, sys.stdin.fileno())
os.close(parent_to_child_r)
os.dup2(child_to_parent_w, sys.stdout.fileno())
os.close(child_to_parent_w)
# Execute the target function
try:
target_func(*args, **kwargs)
except Exception as e:
print(f"Error in child process: {e}", file=sys.stderr)
finally:
# Ensure clean exit
os._exit(0)
else: # Parent process
# Close unused pipe ends
os.close(parent_to_child_r)
os.close(child_to_parent_w)
# Set non-blocking mode for the pipes
for fd in (parent_to_child_w, child_to_parent_r):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Store the pipe file descriptors and child process ID
self.parent_to_child_w = parent_to_child_w
self.child_to_parent_r = child_to_parent_r
self.child_pid = pid
self.buffer = b""
def write(self, data: Union[str, bytes]) -> int:
"""
Write data to the child process.
Args:
data: The data to write (string or bytes)
Returns:
Number of bytes written
"""
if isinstance(data, str):
data = data.encode('utf-8')
try:
return os.write(self.parent_to_child_w, data)
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, try again later
return 0
raise
def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
"""
Read data from the child process.
Args:
size: Maximum number of bytes to read
timeout: Maximum time to wait for data (None = non-blocking)
Returns:
Bytes read from the child process
"""
if timeout is not None:
# Wait for data to be available
r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
if not r:
return b"" # Timeout occurred
try:
data = os.read(self.child_to_parent_r, size)
self.buffer += data
return data
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, no data available
return b""
raise
def readline(self, timeout: Optional[float] = None) -> bytes:
"""
Read a line from the child process.
Args:
timeout: Maximum time to wait for a complete line
Returns:
A line of data (including newline character)
"""
start_time = time.time()
while b'\n' not in self.buffer:
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
break
remaining = timeout - elapsed
else:
remaining = None
data = self.read(1024, remaining)
if not data:
break
# Extract a line from the buffer if available
if b'\n' in self.buffer:
line, self.buffer = self.buffer.split(b'\n', 1)
return line + b'\n'
# Return partial line if no newline found
result = self.buffer
self.buffer = b""
return result
def close(self) -> Tuple[int, int]:
"""
Close the pipes and wait for the child process to terminate.
Returns:
Tuple of (pid, status) from os.waitpid
"""
os.close(self.parent_to_child_w)
os.close(self.child_to_parent_r)
return os.waitpid(self.child_pid, 0)
# Example usage
if __name__ == "__main__":
def echo_uppercase():
"""Simple function that reads lines and echoes them in uppercase"""
while True:
line = sys.stdin.readline()
if not line:
break
sys.stdout.write(line.upper())
sys.stdout.flush()
# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)
# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")
# Read the responses
time.sleep(0.1) # Give the child process time to respond
print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())
# Clean up
pipe.close()
Esta implementación utiliza os.fork() que solo está disponible en sistemas tipo Unix (Linux, macOS, etc.).
Preguntas Frecuentes - Dualpipe
- ¿Para qué se utiliza DualPipe?
- DualPipe se utiliza para establecer comunicación bidireccional entre un proceso padre y un proceso hijo en Python. Es particularmente útil para tareas de procesamiento paralelo donde necesita enviar comandos a un subproceso y recibir resultados de vuelta, todo mientras mantiene el aislamiento de procesos.
- ¿Cómo previene DualPipe los bloqueos mutuos?
- DualPipe utiliza operaciones de E/S no bloqueantes estableciendo la bandera O_NONBLOCK en los descriptores de archivo de las tuberías. Esto asegura que las operaciones de lectura y escritura no se bloqueen indefinidamente, previniendo potenciales bloqueos mutuos donde ambos procesos están esperando el uno al otro.
- ¿Puede usarse DualPipe en Windows?
- La implementación mostrada aquí utiliza os.fork() que solo está disponible en sistemas tipo Unix (Linux, macOS, etc.). Para compatibilidad con Windows, necesitaría modificar la implementación para usar el módulo multiprocessing o el módulo subprocess.
- ¿Cuál es la diferencia entre DualPipe y el módulo subprocess de Python?
- Aunque el módulo subprocess de Python también permite la comunicación con procesos hijo, DualPipe proporciona una interfaz más simplificada específicamente diseñada para comunicación bidireccional con E/S no bloqueante. Es particularmente útil cuando necesita intercambiar datos continuamente entre procesos.