Various interprocess communication (IPC) mechanisms are well supported by standard Python libraries such as threading and multiprocessing. However, these libraries are designed for IPC between related processes, that is, those spawned from a common ancestor and thus inheriting its IPC objects. It is often necessary to use IPC facilities in unrelated processes that start independently. In this case, named IPC objects (POSIX or SysV) should be used, which allow unrelated processes to obtain an IPC object by a unique name. This kind of interaction is not supported by standard Python tools.
Python 3.8 introduced the multiprocessing.shared_memory library, which is the first step toward IPC tools for communication between unrelated processes. This article was originally conceived as a demonstration of that library. However, everything went wrong. As of November 29, 2019, the implementation of shared memory in this library is incorrect: the shared memory object is deleted even if a process just wants to stop using the object without intending to delete it. Despite the presence of two separate calls, close() and unlink(), the object is deleted when any of the processes using it terminates, regardless of whether either call was made.
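For reference, the intended division of labor between close() and unlink() can be sketched as follows. This is a minimal illustration, not a reproduction of the problem described above: both handles live in a single process to keep the example self-contained, and the helper name roundtrip is hypothetical.

```python
from multiprocessing import shared_memory


def roundtrip(payload: bytes) -> bytes:
    """Write payload into a new segment and read it back through a second handle."""
    # The creating side allocates the segment; its name is generated automatically.
    owner = shared_memory.SharedMemory(create=True, size=len(payload))
    try:
        owner.buf[:len(payload)] = payload
        # A second handle attaches to the same segment by name.
        peer = shared_memory.SharedMemory(name=owner.name)
        try:
            return bytes(peer.buf[:len(payload)])
        finally:
            # close() only detaches this handle from the segment.
            peer.close()
    finally:
        owner.close()
        # unlink() asks the operating system to destroy the segment itself.
        owner.unlink()


if __name__ == "__main__":
    print(roundtrip(b"hello"))
```

In the intended design, a process that only consumes the segment would call close() and leave unlink() to the owner.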
We were able to solve the problem using a third-party implementation of POSIX IPC, which, although low-level, works great. Next, we implement two programs:
- write.py, which reads OpenCV frames (NumPy ndarray) from a webcam and transfers them to read.py through a shared memory segment;
- read.py, which reads the frame from the shared memory segment and displays it on the screen.
Why is this needed? Transferring and sharing objects between processes through shared memory is much more efficient than serialization and deserialization because it is practically free, which makes it a good fit for low-latency, high-bandwidth data exchange between processes within a single node. Unfortunately, if data must be exchanged between compute nodes, the traditional approaches based on message passing must be used.
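The difference between sharing and serializing can be illustrated with the standard library alone. In the sketch below, an anonymous mmap region stands in for a named shared memory segment (an assumption made to keep the example self-contained), and the function name shared_versus_copied is hypothetical. Two views over the same region see each other's writes immediately, while a pickled object is an independent copy.

```python
import mmap
import pickle


def shared_versus_copied(size: int = 1024):
    """Return (value seen through a shared view, value seen in a pickled copy)."""
    # Anonymous mapping used here as a stand-in for a shared memory segment.
    region = mmap.mmap(-1, size)
    view_a = memoryview(region)
    view_b = memoryview(region)
    view_a[0] = 42              # write through one view...
    shared_seen = view_b[0]     # ...and read it through the other, no copy involved

    original = bytearray(size)
    clone = pickle.loads(pickle.dumps(original))  # serialization makes a full copy
    original[0] = 42
    copy_seen = clone[0]        # the copy does not see the later write

    view_a.release()
    view_b.release()
    region.close()
    return shared_seen, copy_seen


if __name__ == "__main__":
    print(shared_versus_copied())
```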
The following implementation demonstrates:
- a POSIX semaphore, which is used to prevent race conditions when accessing the shared memory;
- sharing of a ctypes structure;
- sharing of a NumPy ndarray.
The source code can be found on GitHub: https://github.com/bwsw/shared-ctypes-numpy-posix-ipc-python.
Metadata Structure
from ctypes import Structure, c_int32, c_int64


class MD(Structure):
    _fields_ = [
        ('shape_0', c_int32),
        ('shape_1', c_int32),
        ('shape_2', c_int32),
        ('size', c_int64),
        ('count', c_int64)
    ]
In this structure, we define the metadata used to share the stream specification between the processes. Through it, the writer (write.py) tells the reader (read.py) what buffer size is used and what shape the NumPy ndarray should have when it is restored from the shared memory buffer. This code should be placed in the structures.py file.
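To see how such a structure travels through a raw byte buffer, consider the following minimal sketch. The MD class is repeated so that the example is self-contained; the helpers pack_metadata and unpack_metadata are hypothetical and are not part of the article's code, which uses memmove for the same purpose.

```python
from ctypes import Structure, c_int32, c_int64, sizeof


class MD(Structure):
    _fields_ = [
        ('shape_0', c_int32),
        ('shape_1', c_int32),
        ('shape_2', c_int32),
        ('size', c_int64),
        ('count', c_int64)
    ]


def pack_metadata(frame_shape, byte_size, count) -> bytes:
    """Serialize the frame specification into sizeof(MD) raw bytes."""
    md = MD(frame_shape[0], frame_shape[1], frame_shape[2], byte_size, count)
    return bytes(md)


def unpack_metadata(raw: bytes) -> MD:
    """Rebuild an MD instance from raw bytes (the data is copied)."""
    return MD.from_buffer_copy(raw)


if __name__ == "__main__":
    raw = pack_metadata((480, 640, 3), 480 * 640 * 3, 1)
    md = unpack_metadata(raw)
    print(len(raw), md.shape_0, md.shape_1, md.shape_2, md.size, md.count)
```

Because both processes import the same class from structures.py, they agree on the field layout and on the size of the metadata segment.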
Writer
The writer receives the frame from the OpenCV video camera and transfers its specification through the metadata structure to the reader process. Based on this data, the reader is able to properly connect to the shared memory buffer, which is used to transmit the frame itself.
To protect against a race condition when accessing the shared memory, a binary semaphore is used, which prevents the case where the reader reads incomplete data from the metadata buffer or frame buffer.
import cv2
import numpy as np
import mmap
from posix_ipc import Semaphore, O_CREX, ExistentialError, O_CREAT, SharedMemory, unlink_shared_memory
from ctypes import sizeof, memmove, addressof, create_string_buffer
from structures import MD

# Scratch buffer used to turn the MD structure into raw bytes.
md_buf = create_string_buffer(sizeof(MD))


class ShmWrite:
    def __init__(self, name):
        self.shm_region = None
        # The metadata segment has a fixed size, so it is created right away.
        self.md_region = SharedMemory(name + '-meta', O_CREAT, size=sizeof(MD))
        self.md_buf = mmap.mmap(self.md_region.fd, self.md_region.size)
        self.md_region.close_fd()
        self.shm_buf = None
        self.shm_name = name
        self.count = 0
        try:
            self.sem = Semaphore(name, O_CREX)
        except ExistentialError:
            # A semaphore with this name already exists: remove it and create a fresh one.
            sem = Semaphore(name, O_CREAT)
            sem.unlink()
            self.sem = Semaphore(name, O_CREX)
        # Raise the semaphore value so that the first acquire() succeeds.
        self.sem.release()

    def add(self, frame: np.ndarray):
        byte_size = frame.nbytes
        if not self.shm_region:
            # The frame segment is created lazily, once the frame size is known.
            self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=byte_size)
            self.shm_buf = mmap.mmap(self.shm_region.fd, byte_size)
            self.shm_region.close_fd()
        self.count += 1
        md = MD(frame.shape[0], frame.shape[1], frame.shape[2], byte_size, self.count)
        # Metadata and frame are written under the semaphore.
        self.sem.acquire()
        memmove(md_buf, addressof(md), sizeof(md))
        self.md_buf[:] = bytes(md_buf)
        self.shm_buf[:] = frame.tobytes()
        self.sem.release()

    def release(self):
        self.sem.acquire()
        self.md_buf.close()
        unlink_shared_memory(self.shm_name + '-meta')
        self.shm_buf.close()
        unlink_shared_memory(self.shm_name)
        self.sem.release()
        self.sem.close()


if __name__ == '__main__':
    cap = cv2.VideoCapture(0)
    shm_w = ShmWrite('abc')
    try:
        while True:
            ret, frame = cap.read()
            shm_w.add(frame)
    except KeyboardInterrupt:
        pass
    shm_w.release()
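The locking discipline used by the writer can be tried out without a camera or the posix_ipc package. The sketch below rests on stated assumptions: a threading.Lock stands in for the named POSIX semaphore, an anonymous mmap stands in for the shared memory segment, and the function name count_torn_reads is hypothetical. The writer fills the whole buffer with a single byte value per "frame", so any snapshot containing mixed values would indicate a torn read.

```python
import mmap
import threading


def count_torn_reads(frames: int = 200, size: int = 4096) -> int:
    """Run a writer thread and a reader loop; return the number of inconsistent snapshots."""
    buf = mmap.mmap(-1, size)   # stand-in for the shared frame buffer
    lock = threading.Lock()     # stand-in for the binary semaphore

    def writer():
        for i in range(1, frames + 1):
            with lock:
                # Each "frame" consists of one repeated byte value.
                buf[:] = bytes([i % 256]) * size

    thread = threading.Thread(target=writer)
    thread.start()

    torn = 0
    for _ in range(frames):
        with lock:
            snapshot = buf[:]   # copy the frame while holding the lock
        # A consistent frame contains exactly one distinct byte value.
        if snapshot.count(snapshot[:1]) != size:
            torn += 1

    thread.join()
    buf.close()
    return torn


if __name__ == "__main__":
    print(count_torn_reads())
```

Since every read and every write happens while the lock is held, the reader only ever observes complete frames.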
Reader
The reader receives the frame specification from the metadata buffer and instantiates a NumPy ndarray of the required shape from the frame buffer. A binary semaphore is used to avoid reading partially written data.
import cv2
import numpy as np
import mmap
from posix_ipc import Semaphore, SharedMemory, ExistentialError
from ctypes import sizeof, memmove, addressof, create_string_buffer
from time import sleep
from structures import MD

# Scratch buffer used to rebuild the MD structure from raw bytes.
md_buf = create_string_buffer(sizeof(MD))


class ShmRead:
    def __init__(self, name):
        self.shm_buf = None
        self.md_buf = None
        # Wait until the writer has created the metadata segment.
        while not self.md_buf:
            try:
                print("Waiting for MetaData shared memory is available.")
                md_region = SharedMemory(name + '-meta')
                self.md_buf = mmap.mmap(md_region.fd, sizeof(MD))
                md_region.close_fd()
                sleep(1)
            except ExistentialError:
                sleep(1)
        self.shm_name = name
        # Open the existing semaphore created by the writer.
        self.sem = Semaphore(name, 0)

    def get(self):
        md = MD()
        self.sem.acquire()
        md_buf[:] = self.md_buf
        memmove(addressof(md), md_buf, sizeof(md))
        self.sem.release()
        # Wait until the writer has created the frame segment.
        while not self.shm_buf:
            try:
                print("Waiting for Data shared memory is available.")
                shm_region = SharedMemory(name=self.shm_name)
                self.shm_buf = mmap.mmap(shm_region.fd, md.size)
                shm_region.close_fd()
                sleep(1)
            except ExistentialError:
                sleep(1)
        self.sem.acquire()
        # Copy the frame while holding the semaphore so that the writer
        # cannot overwrite it while it is being displayed.
        f = np.ndarray(shape=(md.shape_0, md.shape_1, md.shape_2), dtype='uint8', buffer=self.shm_buf).copy()
        self.sem.release()
        return f

    def release(self):
        self.md_buf.close()
        self.shm_buf.close()


if __name__ == '__main__':
    shm_r = ShmRead('abc')
    while True:
        sleep(0.03)
        f = shm_r.get()
        cv2.imshow('frame', f)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    shm_r.release()
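An ndarray constructed with the buffer argument is a view over the underlying memory rather than a copy, which is what makes the transfer cheap. The sketch below illustrates this under stated assumptions: an anonymous mmap stands in for the shared frame segment, and the function name view_and_copy is hypothetical. A write into the buffer is visible through the view, while a snapshot taken with copy() stays unchanged.

```python
import mmap

import numpy as np


def view_and_copy(shape=(2, 3, 3)):
    """Return (value seen through the view, value seen in the earlier copy)."""
    size = int(np.prod(shape))
    buf = mmap.mmap(-1, size)   # zero-filled stand-in for the frame segment
    # The array reuses the memory of buf; no pixel data is copied here.
    view = np.ndarray(shape=shape, dtype=np.uint8, buffer=buf)
    snapshot = view.copy()      # an independent copy of the current contents
    buf[0:1] = b'\x07'          # simulate the writer updating the first byte
    result = (int(view[0, 0, 0]), int(snapshot[0, 0, 0]))
    del view                    # drop the view before closing the mapping
    buf.close()
    return result


if __name__ == "__main__":
    print(view_and_copy())
```

A view is the cheaper option when the frame is consumed immediately; a copy is the safer one when the frame must remain stable after the semaphore has been released.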
Dependencies
The code uses the following third-party packages:
opencv-python
numpy
posix_ipc
IPC namespace sharing for applications in Docker containers
Let’s explore how to use shared memory in Docker since this environment is the most popular one for modern applications and allows one to fully reveal the possibilities of POSIX shared memory described in this article.
By default, Docker containers do not share IPC objects, which prevents applications running in different containers from accessing common IPC objects. This is the correct behavior because it reflects the idea of controlling and restricting access to container resources and isolating resources between containers. However, there is a standard Docker mechanism that enables sharing an IPC namespace between containers. Let’s see how to use it.
Create a container image for testing the code using the following Dockerfile:
FROM python:3.8
MAINTAINER Bitworks Software info@bitworks.software
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
RUN DEBIAN_FRONTEND=noninteractive apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install -y -q python3-pip python-dev
RUN pip3 install --upgrade pip
RUN pip3 install opencv-python numpy posix_ipc
COPY ./*.py /opt/
WORKDIR /opt
ENTRYPOINT ["/usr/local/bin/python3.8"]
CMD ["/opt/write.py"]
Build it:
docker build -t opencv .
Now we can execute write.py, passing the webcam into the container:
docker run -it --rm --name write --device /dev/video0:/dev/video0 opencv
The container is running in interactive mode. Open one more terminal to launch read.py.
Make sure the read.py process does not have access to the IPC objects of the write container by default:
xhost +local:
docker run -it --name read --rm -e DISPLAY=unix$DISPLAY -v /tmp/.X11-unix:/tmp/.X11-unix opencv /opt/read.py
Waiting for MetaData shared memory is available.
Waiting for MetaData shared memory is available.
Waiting for MetaData shared memory is available.
...
The messages indicate that the IPC objects of the write container are unavailable.
X11 forwarding is used at startup; otherwise, the read.py application would not be able to display the image on the screen.
Now run it in the IPC namespace shared with the write container:
docker run -it --name read --ipc container:write --rm -e DISPLAY=unix$DISPLAY -v /tmp/.X11-unix:/tmp/.X11-unix opencv /opt/read.py
Now you should see a window with the webcam image, which is produced by transferring the frame through shared memory between the write and read containers.
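One way to check whether two containers really share an IPC namespace is to compare the namespace identifier that Linux exposes under /proc. The following sketch is an optional diagnostic and not part of the article's code; the helper name ipc_namespace_id is hypothetical. Run in both containers, it is expected to print the same identifier when the namespace is shared and different identifiers otherwise.

```python
import os


def ipc_namespace_id(pid="self"):
    """Return the IPC namespace identifier of a process, or None if unavailable."""
    path = f"/proc/{pid}/ns/ipc"
    try:
        # On Linux this is a symbolic link whose target looks like 'ipc:[4026531839]'.
        return os.readlink(path)
    except OSError:
        # The path does not exist (for example, on a non-Linux system) or is not readable.
        return None


if __name__ == "__main__":
    print(ipc_namespace_id())
```

The script could be saved under any name next to write.py and read.py and started in each container in the same way as read.py.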
Summary
The source code can be found on GitHub: https://github.com/bwsw/shared-ctypes-numpy-posix-ipc-python.
Although standard Python tools do not yet support the interaction of unrelated processes through POSIX IPC or System V IPC, such communication is possible with third-party libraries. Python developers are working to include tools for this kind of interaction in the standard toolkit, which lets us hope for full support of these IPC mechanisms in the future.