الميزات الرئيسية
- الاتصال ثنائي الاتجاه
يمكّن الاتصال السلس ثنائي الاتجاه بين العمليات الأم والفرعية، مما يسمح بتبادل البيانات بكفاءة في كلا الاتجاهين.
- الإدخال/الإخراج غير المحظور
ينفذ عمليات الإدخال/الإخراج غير المحظورة لمنع الانسدادات وضمان تدفق سلس للبيانات بين العمليات.
- عزل العمليات
يحافظ على العزل المناسب للعمليات مع توفير واجهة مناسبة للاتصال بين العمليات، مما يعزز استقرار النظام.
- معالجة الأخطاء
آليات قوية لمعالجة الأخطاء لإدارة الاستثناءات وإنهاء العمليات بأناقة، مما يضمن التشغيل الموثوق في بيئات الإنتاج.
كيف يعمل DualPipe
شرح مرئي لتدفق عمليات DualPipe وآلية الاتصال.
عملية التهيئة
- إنشاء أنبوبين: من الأم إلى الابن ومن الابن إلى الأم
- تفرع العملية لإنشاء عملية فرعية
- في الابن: إعادة توجيه المدخلات/المخرجات القياسية إلى الأنابيب
- في الأم: إعداد الإدخال/الإخراج غير المحظور للأنابيب
- تنفيذ الوظيفة المستهدفة في العملية الفرعية
تدفق الاتصال
- تكتب الأم البيانات في الأنبوب من الأم إلى الابن
- يقرأ الابن البيانات من المدخلات القياسية (المعاد توجيهها من الأنبوب)
- يعالج الابن البيانات ويكتب في المخرجات القياسية
- تقرأ الأم من الأنبوب من الابن إلى الأم
- الإدخال/الإخراج غير المحظور يمنع الانسدادات
Dualpipe: حالات الاستخدام الشائعة
تطبيقات عملية حيث يمكن لـ DualPipe حل تحديات الاتصال المعقدة بين العمليات.
معالجة البيانات المتوازية
تفريغ مهام معالجة البيانات كثيفة وحدة المعالجة المركزية إلى العمليات الفرعية مع الحفاظ على الاتصال التفاعلي مع العملية الأم.
Example:
خطوط أنابيب معالجة الصور حيث ترسل الأم بيانات الصورة ويعيد الابن النتائج المعالجة.
تنفيذ الأوامر
تنفيذ الأوامر في عملية منفصلة والتقاط مخرجاتها في الوقت الفعلي، مع القدرة على إرسال أوامر إضافية بناءً على النتائج السابقة.
Example:
بيئات الصدفة التفاعلية أو أدوات سطر الأوامر التي تحتاج إلى الحفاظ على الحالة بين الأوامر.
اتصال الخدمات
إنشاء عمليات خدمة طويلة الأمد تتواصل بشكل ثنائي الاتجاه مع عملية التحكم.
Example:
عمال الخلفية الذين يعالجون المهام من قائمة انتظار ويبلغون النتائج مرة أخرى إلى التطبيق الرئيسي.
التنفيذ في بيئة محمية
تشغيل التعليمات البرمجية غير الموثوقة في عملية منفصلة مع قنوات اتصال محكومة.
Example:
أنظمة تقييم الكود التي تحتاج إلى تنفيذ الكود المقدم من المستخدم بأمان مع التقاط المخرجات.
Dualpipe: مقارنة مع طرق IPC الأخرى
كيف يقارن DualPipe مع طرق الاتصال الأخرى بين العمليات في بايثون.
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
ملاحظة: تركز المقارنة على سهولة الاستخدام للاتصال ثنائي الاتجاه بين العمليات.
وثائق DualPipe
وثائق شاملة لأداة DualPipe، بما في ذلك التثبيت وأمثلة الاستخدام ومرجع API.
# DualPipe A Python utility for efficient bidirectional communication between processes. ## DualPipe Overview DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks. ## DualPipe Key Features - **Bidirectional Communication**: Seamless two-way communication between parent and child processes - **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow - **Process Isolation**: Maintains proper isolation while providing convenient IPC - **Error Handling**: Gracefully manages exceptions and process termination ## DualPipe Installation ```bash # Clone the repository git clone https://github.com/deepseek-ai/dualpipe.git cd dualpipe # Install the package pip install -e . ``` ## DualPipe Usage Example ```python from dualpipe import DualPipe import time def echo_uppercase(): """Simple function that reads lines and echoes them in uppercase""" while True: line = input() if not line: break print(line.upper()) # Create a DualPipe with the echo_uppercase function pipe = DualPipe(echo_uppercase) # Send some data pipe.write("Hello, world!\n") pipe.write("Testing DualPipe\n") # Read the responses time.sleep(0.1) # Give the child process time to respond print("Response 1:", pipe.readline().decode().strip()) print("Response 2:", pipe.readline().decode().strip()) # Clean up pipe.close() ``` ## How It Works DualPipe uses Unix pipes and process forking to create a child process and establish communication channels: 1. Two pipes are created: one for parent-to-child communication and one for child-to-parent 2. The process is forked, creating a child process 3. In the child process, stdin/stdout are redirected to the appropriate pipe ends 4. In the parent process, non-blocking I/O is set up for the pipes 5. The parent can then write to and read from the child process ## API Reference ### DualPipe(target_func, *args, **kwargs) Creates a new DualPipe instance with a target function to run in the child process. #### Methods - **write(data)**: Write data to the child process - **read(size=1024, timeout=None)**: Read data from the child process - **readline(timeout=None)**: Read a line from the child process - **close()**: Close the pipes and wait for the child process to terminate ## Limitations - Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork() - For Windows compatibility, consider using the multiprocessing or subprocess modules ## License MIT
تنفيذ DualPipe
التنفيذ الكامل لـ DualPipe مع تعليقات مفصلة تشرح كل مكون:
import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union
class DualPipe:
"""
DualPipe: A utility for bidirectional communication with a child process.
This class creates a child process and establishes two-way communication
with it using pipes. It handles non-blocking I/O to prevent deadlocks.
"""
def __init__(self, target_func: Callable, *args, **kwargs):
"""
Initialize a DualPipe with a target function to run in the child process.
Args:
target_func: The function to execute in the child process
*args, **kwargs: Arguments to pass to the target function
"""
# Create pipes for parent-to-child and child-to-parent communication
parent_to_child_r, parent_to_child_w = os.pipe()
child_to_parent_r, child_to_parent_w = os.pipe()
# Fork the process
pid = os.fork()
if pid == 0: # Child process
# Close unused pipe ends
os.close(parent_to_child_w)
os.close(child_to_parent_r)
# Redirect stdin/stdout to the pipes
os.dup2(parent_to_child_r, sys.stdin.fileno())
os.close(parent_to_child_r)
os.dup2(child_to_parent_w, sys.stdout.fileno())
os.close(child_to_parent_w)
# Execute the target function
try:
target_func(*args, **kwargs)
except Exception as e:
print(f"Error in child process: {e}", file=sys.stderr)
finally:
# Ensure clean exit
os._exit(0)
else: # Parent process
# Close unused pipe ends
os.close(parent_to_child_r)
os.close(child_to_parent_w)
# Set non-blocking mode for the pipes
for fd in (parent_to_child_w, child_to_parent_r):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Store the pipe file descriptors and child process ID
self.parent_to_child_w = parent_to_child_w
self.child_to_parent_r = child_to_parent_r
self.child_pid = pid
self.buffer = b""
def write(self, data: Union[str, bytes]) -> int:
"""
Write data to the child process.
Args:
data: The data to write (string or bytes)
Returns:
Number of bytes written
"""
if isinstance(data, str):
data = data.encode('utf-8')
try:
return os.write(self.parent_to_child_w, data)
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, try again later
return 0
raise
def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
"""
Read data from the child process.
Args:
size: Maximum number of bytes to read
timeout: Maximum time to wait for data (None = non-blocking)
Returns:
Bytes read from the child process
"""
if timeout is not None:
# Wait for data to be available
r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
if not r:
return b"" # Timeout occurred
try:
data = os.read(self.child_to_parent_r, size)
self.buffer += data
return data
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, no data available
return b""
raise
def readline(self, timeout: Optional[float] = None) -> bytes:
"""
Read a line from the child process.
Args:
timeout: Maximum time to wait for a complete line
Returns:
A line of data (including newline character)
"""
start_time = time.time()
while b'\n' not in self.buffer:
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
break
remaining = timeout - elapsed
else:
remaining = None
data = self.read(1024, remaining)
if not data:
break
# Extract a line from the buffer if available
if b'\n' in self.buffer:
line, self.buffer = self.buffer.split(b'\n', 1)
return line + b'\n'
# Return partial line if no newline found
result = self.buffer
self.buffer = b""
return result
def close(self) -> Tuple[int, int]:
"""
Close the pipes and wait for the child process to terminate.
Returns:
Tuple of (pid, status) from os.waitpid
"""
os.close(self.parent_to_child_w)
os.close(self.child_to_parent_r)
return os.waitpid(self.child_pid, 0)
# Example usage
if __name__ == "__main__":
def echo_uppercase():
"""Simple function that reads lines and echoes them in uppercase"""
while True:
line = sys.stdin.readline()
if not line:
break
sys.stdout.write(line.upper())
sys.stdout.flush()
# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)
# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")
# Read the responses
time.sleep(0.1) # Give the child process time to respond
print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())
# Clean up
pipe.close()
يستخدم هذا التنفيذ os.fork() المتاح فقط في أنظمة شبيهة بـ Unix (Linux، macOS، إلخ).
الأسئلة الشائعة - Dualpipe
- ما هو استخدام DualPipe؟
- يستخدم DualPipe لإنشاء اتصال ثنائي الاتجاه بين عملية أم وعملية فرعية في بايثون. وهو مفيد بشكل خاص لمهام المعالجة المتوازية حيث تحتاج إلى إرسال أوامر إلى عملية فرعية وتلقي النتائج مرة أخرى، كل ذلك مع الحفاظ على عزل العمليات.
- كيف يمنع DualPipe الانسدادات؟
- يستخدم DualPipe عمليات الإدخال/الإخراج غير المحظورة عن طريق تعيين علامة O_NONBLOCK على واصفات ملفات الأنابيب. هذا يضمن أن عمليات القراءة والكتابة لا تحظر إلى أجل غير مسمى، مما يمنع الانسدادات المحتملة حيث تنتظر كلتا العمليتين بعضهما البعض.
- هل يمكن استخدام DualPipe على Windows؟
- التنفيذ الموضح هنا يستخدم os.fork() المتاح فقط في أنظمة شبيهة بـ Unix (Linux، macOS، إلخ). للتوافق مع Windows، ستحتاج إلى تعديل التنفيذ لاستخدام وحدة multiprocessing أو وحدة subprocess بدلاً من ذلك.
- ما هو الفرق بين DualPipe ووحدة subprocess في بايثون؟
- في حين أن وحدة subprocess في بايثون تسمح أيضًا بالاتصال مع العمليات الفرعية، يوفر DualPipe واجهة أكثر تبسيطًا مصممة خصيصًا للاتصال ثنائي الاتجاه مع الإدخال/الإخراج غير المحظور. وهو مفيد بشكل خاص عندما تحتاج إلى تبادل البيانات باستمرار بين العمليات.