Python 中的多程序佇列

Aditya Raj 2023年1月30日 2022年5月17日
  1. Python 多程序佇列
  2. Python 多程序佇列方法
  3. 使用具有多個程序的多程序佇列
  4. まとめ
Python 中的多程序佇列

程式設計時,你可以並行執行兩個或多個程式。但是,如果你需要在程式之間進行通訊,這將成為一項繁瑣的任務。

本文討論了我們如何在 Python 中使用多程序佇列在兩個 Python 程式之間進行通訊。

Python 多程序佇列

Python 為我們提供了多處理模組來並行建立、執行和管理兩個或多個 Python 程式。你可以使用以下匯入語句將多處理模組匯入你的程式。

import multiprocessing

匯入模組後,使用 Queue() 方法建立一個多程序佇列。multiprocessing.Queue() 方法返回一個多程序佇列。

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
print("The multiprocessing Queue is:")
print(myQueue)

輸出:

The multiprocessing Queue is:
<multiprocessing.queues.Queue object at 0x7fa48f038070>

你可以看到已在給定位置的記憶體中建立了一個 Python 多程序佇列。建立 Python 多程序佇列後,你可以使用它在兩個或多個程序之間傳遞資料。

Python 多程序佇列方法

有各種多程序佇列方法,藉助它們我們可以執行各種操作。

將元素插入 Python 多程序佇列

我們可以使用 put() 方法將元素插入到多程序佇列中。當在多程序佇列上呼叫時,該方法將一個元素作為其輸入引數並將該元素新增到佇列中,並在執行後返回 None

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
return_value= myQueue.put(1)
print(return_value)

輸出:

None

如果沒有為 put() 方法提供輸入引數,程式將執行到 TypeError 異常,如下所示。

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
return_value= myQueue.put()
print(return_value)

輸出:

Traceback (most recent call last):
  File "/home/aditya1117/PycharmProjects/pythonProject/string12.py", line 4, in <module>
    return_value= myQueue.put()
TypeError: put() missing 1 required positional argument: 'obj'

在這裡,我們沒有為 put() 方法提供任何輸入引數。因此,程式引發了 TypeError 異常,指出缺少所需的位置引數。

從 Python 多程序佇列中提取元素

你可以使用 get() 方法從多程序佇列中提取元素。get() 方法,當在多程序佇列上呼叫時,它會在從佇列中刪除佇列的前端元素後返回佇列的前端元素。

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.get()
print(return_value)

輸出:

1

我們首先將五個元素排入多程序佇列。之後,我們使用 get() 方法獲得了一個元素。

觀察 get() 方法返回值 1 插入到多程序佇列中。這是因為佇列遵循先進先出 (FIFO) 順序來訪問元素。

獲取 Python 多程序佇列的大小

我們可以使用 qsize() 方法獲取多程序佇列的大小。qsize() 方法返回 python 多程序佇列的近似大小。

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.qsize()
print("The size of multiprocessing queue is:")
print(return_value)

輸出:

The size of multiprocessing queue is:
5

在上面的示例中,我們使用了術語近似大小而不是佇列的大小。這是因為佇列在多個程序之間共享。

因此,另一個程序可能會在我們獲得它的大小之後將一個元素新增到佇列中或從佇列中刪除一個元素。因此,qsize() 方法返回的大小是不可靠的。

檢查多程序佇列是否為空

empty() 方法檢查多程序佇列是否為空,如果佇列為空,則該方法返回 True。否則,它返回 False

程式碼:

import multiprocessing as mp

myQueue = mp.Queue()
myQueue.put(1)
myQueue.put(2)
myQueue.put(3)
myQueue.put(4)
myQueue.put(5)
return_value = myQueue.empty()
print("The multiprocessing queue is empty:")
print(return_value)

輸出:

The multiprocessing queue is empty:
False

多程序佇列中有五個元素。因此,empty() 方法返回 False

關閉 Python 多程序佇列

如果你不希望任何程序寫入多程序佇列,你可以使用 close() 方法關閉佇列。close() 方法,當在任何程序中的多程序佇列上呼叫時,會關閉佇列。

在此之後,任何程序都不能將元素插入佇列。現在讓我們解決如何在 Python 程式中使用多程序佇列。

使用具有多個程序的多程序佇列

定義函式以建立流程

要在 Python 中的不同程序之間使用多程序佇列,我們​​首先需要建立多個程序。我們將首先定義兩個函式。

第一個函式將多程序佇列作為輸入引數。在執行時,它將從 11000 的正數新增到 Python 多程序佇列。

def addPositive(queue):
    print("I am in addPositive.")
    for i in range(1,1001):
        queue.put(i)

第二個函式還將多程序佇列作為輸入引數。但是,它將從 -1000-1 的負數新增到多程序佇列中。

def addNegative(queue):
    print("I am in addNegative.")
    for i in range(-1000, 0):
        queue.put(i)

建立程序以將資料寫入多程序佇列

建立函式後,我們將使用這兩個函式建立兩個單獨的程序。我們可以使用 Process() 方法來建立一個程序。

Process() 方法將一個函式作為分配給 target 引數的第一個輸入引數。它還接受一個元組,該元組包含目標中提供的函式輸入引數。

元組被分配給 Process() 方法的 args 引數。執行後,Process() 方法返回一個 Process 物件。

我們將建立一個將正數和負數新增到多程序佇列的過程。

myQueue = mp.Queue()
process1 = mp.Process(target=addPositive, args=(myQueue,))
process2 = mp.Process(target=addNegative, args=(myQueue,))

啟動將資料寫入多程序佇列的程序

建立流程後,我們可以使用 start() 方法開始執行流程。一旦程序被執行,這些數字將被寫入多程序佇列。

process1.start()
process2.start()

如果使用 terminate() 命令或由於異常突然終止任何程序,則多程序佇列可能會損壞。之後,你將無法在任何程序中從佇列中讀取或寫入佇列。

因此,所有過程都必須順利執行。

在主程序中等待子程序完成

我們在其中建立了其他程序的父程序可能會在子程序之前完成其執行。在這種情況下,殭屍程序被建立並始終存在於計算機的記憶體中。

為了避免這種情況,我們可以暫停父程序的執行,直到子程序完成它們的執行。我們可以使用 join() 方法讓父程序等待子程序完成執行。

process1.join()
process2.join()

列印多程序佇列的內容

我們可以使用 get() 方法、empty() 方法和 print() 函式列印多處理的內容。我們將使用 empty() 方法檢查多程序佇列是否為空。

如果佇列不為空,我們將使用 get() 方法從佇列中提取一個元素並列印結果。否則,我們將使用 close() 方法關閉多程序佇列以完成程式的執行。

程式碼:

import multiprocessing as mp

def addPositive(queue):
    print("I am in addPositive.")
    for i in range(1, 100):
        queue.put(i)

def addNegative(queue):
    print("I am in addNegative.")
    for i in range(-100, 0):
        queue.put(i)

myQueue = mp.Queue()
process1 = mp.Process(target=addPositive, args=(myQueue,))
process2 = mp.Process(target=addNegative, args=(myQueue,))
process1.start()
process2.start()
process1.join()
process2.join()
while myQueue:
    print(myQueue.get(), end=",")
myQueue.close()

輸出:

1,2,3,4,5,6,7,8,9,10,-1001,11,12,13,-1000,-999,-998,-997,-996,-995,-994,-993,-992,-991,-990,-989,-988,-987,-986,-985,-984,-983,-982,-981,14,-980,15,-979,16,17,18,19,20,21,22,23,24,25,26,-978,-977,-976,-975,-974,-973,-972,-971,-970...

程式碼將一直執行,直到佇列為空。

觀察佇列隨機包含正數和負數。這證明資料正在使用兩個不同的程序以並行方式寫入多程序佇列。

まとめ

在本文中,我們討論了 python 多程序佇列。multiprocessing 模組提供高階函式來建立子程序。

我們建議使用多處理模組而不是 fork() 方法來建立子程序。你可以使用 Pipe 和 SimpleQueue 物件在程序之間共享資料。

你可以在本文件中閱讀有關它們的更多資訊。

相關文章 - Python Queue