DeepSeek DualPipe

أداة بايثون للاتصال ثنائي الاتجاه الفعال بين العمليات، مما يتيح معالجة البيانات المتوازية مع الحد الأدنى من النفقات العامة.

DualPipe Architecture Visualization

الميزات الرئيسية

الاتصال ثنائي الاتجاه

يمكّن الاتصال السلس ثنائي الاتجاه بين العمليات الأم والفرعية، مما يسمح بتبادل البيانات بكفاءة في كلا الاتجاهين.

الإدخال/الإخراج غير المحظور

ينفذ عمليات الإدخال/الإخراج غير المحظورة لمنع الانسدادات وضمان تدفق سلس للبيانات بين العمليات.

عزل العمليات

يحافظ على العزل المناسب للعمليات مع توفير واجهة مناسبة للاتصال بين العمليات، مما يعزز استقرار النظام.

معالجة الأخطاء

آليات قوية لمعالجة الأخطاء لإدارة الاستثناءات وإنهاء العمليات بأناقة، مما يضمن التشغيل الموثوق في بيئات الإنتاج.

كيف يعمل DualPipe

شرح مرئي لتدفق عمليات DualPipe وآلية الاتصال.

DualPipe Workflow Diagram

عملية التهيئة

  1. إنشاء أنبوبين: من الأم إلى الابن ومن الابن إلى الأم
  2. تفرع العملية لإنشاء عملية فرعية
  3. في الابن: إعادة توجيه المدخلات/المخرجات القياسية إلى الأنابيب
  4. في الأم: إعداد الإدخال/الإخراج غير المحظور للأنابيب
  5. تنفيذ الوظيفة المستهدفة في العملية الفرعية

تدفق الاتصال

  1. تكتب الأم البيانات في الأنبوب من الأم إلى الابن
  2. يقرأ الابن البيانات من المدخلات القياسية (المعاد توجيهها من الأنبوب)
  3. يعالج الابن البيانات ويكتب في المخرجات القياسية
  4. تقرأ الأم من الأنبوب من الابن إلى الأم
  5. الإدخال/الإخراج غير المحظور يمنع الانسدادات

Dualpipe: حالات الاستخدام الشائعة

تطبيقات عملية حيث يمكن لـ DualPipe حل تحديات الاتصال المعقدة بين العمليات.

معالجة البيانات المتوازية

تفريغ مهام معالجة البيانات كثيفة وحدة المعالجة المركزية إلى العمليات الفرعية مع الحفاظ على الاتصال التفاعلي مع العملية الأم.

Example:

خطوط أنابيب معالجة الصور حيث ترسل الأم بيانات الصورة ويعيد الابن النتائج المعالجة.

تنفيذ الأوامر

تنفيذ الأوامر في عملية منفصلة والتقاط مخرجاتها في الوقت الفعلي، مع القدرة على إرسال أوامر إضافية بناءً على النتائج السابقة.

Example:

بيئات الصدفة التفاعلية أو أدوات سطر الأوامر التي تحتاج إلى الحفاظ على الحالة بين الأوامر.

اتصال الخدمات

إنشاء عمليات خدمة طويلة الأمد تتواصل بشكل ثنائي الاتجاه مع عملية التحكم.

Example:

عمال الخلفية الذين يعالجون المهام من قائمة انتظار ويبلغون النتائج مرة أخرى إلى التطبيق الرئيسي.

التنفيذ في بيئة محمية

تشغيل التعليمات البرمجية غير الموثوقة في عملية منفصلة مع قنوات اتصال محكومة.

Example:

أنظمة تقييم الكود التي تحتاج إلى تنفيذ الكود المقدم من المستخدم بأمان مع التقاط المخرجات.

Dualpipe: مقارنة مع طرق IPC الأخرى

كيف يقارن DualPipe مع طرق الاتصال الأخرى بين العمليات في بايثون.

FeatureDualPipesubprocessmultiprocessingos.popen
Bidirectional Communication✅ Built-in✅ With PIPE✅ With Queue❌ One-way only
Non-blocking I/O✅ Built-in⚠️ Requires setup✅ With Queue❌ Blocking
Process Isolation✅ Complete✅ Complete✅ Complete✅ Complete
API Simplicity✅ Simple⚠️ Complex⚠️ Moderate✅ Simple
Windows Support❌ Unix only✅ Cross-platform✅ Cross-platform✅ Cross-platform

ملاحظة: تركز المقارنة على سهولة الاستخدام للاتصال ثنائي الاتجاه بين العمليات.

وثائق DualPipe

وثائق شاملة لأداة DualPipe، بما في ذلك التثبيت وأمثلة الاستخدام ومرجع API.

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import time

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    while True:
        line = input()
        if not line:
            break
        print(line.upper())

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses
time.sleep(0.1)  # Give the child process time to respond

print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT

تنفيذ DualPipe

التنفيذ الكامل لـ DualPipe مع تعليقات مفصلة تشرح كل مكون:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
        # Fork the process
        pid = os.fork()
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
            # Execute the target function
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
            finally:
                # Ensure clean exit
                os._exit(0)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.
        
        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)
            
        Returns:
            Bytes read from the child process
        """
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        
        try:
            data = os.read(self.child_to_parent_r, size)
            self.buffer += data
            return data
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise
    
    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.
        
        Args:
            timeout: Maximum time to wait for a complete line
            
        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
                
            data = self.read(1024, remaining)
            if not data:
                break
        
        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'
        
        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses
    time.sleep(0.1)  # Give the child process time to respond
    
    print("Response 1:", pipe.readline().decode().strip())
    print("Response 2:", pipe.readline().decode().strip())
    
    # Clean up
    pipe.close()

يستخدم هذا التنفيذ os.fork() المتاح فقط في أنظمة شبيهة بـ Unix (Linux، macOS، إلخ).

الأسئلة الشائعة - Dualpipe

ما هو استخدام DualPipe؟
يستخدم DualPipe لإنشاء اتصال ثنائي الاتجاه بين عملية أم وعملية فرعية في بايثون. وهو مفيد بشكل خاص لمهام المعالجة المتوازية حيث تحتاج إلى إرسال أوامر إلى عملية فرعية وتلقي النتائج مرة أخرى، كل ذلك مع الحفاظ على عزل العمليات.
كيف يمنع DualPipe الانسدادات؟
يستخدم DualPipe عمليات الإدخال/الإخراج غير المحظورة عن طريق تعيين علامة O_NONBLOCK على واصفات ملفات الأنابيب. هذا يضمن أن عمليات القراءة والكتابة لا تحظر إلى أجل غير مسمى، مما يمنع الانسدادات المحتملة حيث تنتظر كلتا العمليتين بعضهما البعض.
هل يمكن استخدام DualPipe على Windows؟
التنفيذ الموضح هنا يستخدم os.fork() المتاح فقط في أنظمة شبيهة بـ Unix (Linux، macOS، إلخ). للتوافق مع Windows، ستحتاج إلى تعديل التنفيذ لاستخدام وحدة multiprocessing أو وحدة subprocess بدلاً من ذلك.
ما هو الفرق بين DualPipe ووحدة subprocess في بايثون؟
في حين أن وحدة subprocess في بايثون تسمح أيضًا بالاتصال مع العمليات الفرعية، يوفر DualPipe واجهة أكثر تبسيطًا مصممة خصيصًا للاتصال ثنائي الاتجاه مع الإدخال/الإخراج غير المحظور. وهو مفيد بشكل خاص عندما تحتاج إلى تبادل البيانات باستمرار بين العمليات.