Copying a video stream into a ring buffer of fixed-duration files with OpenCV, Python, and ZeroMQ

The problem described in this article often arises in intelligent video analytics solutions. Typically, a user wants access to a fragment that contains an identified event. A fragment is a set of one or more short video files that capture both the event itself and the history and development of the situation around it.

A convenient way to solve this task is a ring buffer of video fragments stored as short files, for example, 30 seconds each. When the application detects that some of them contain important signals, it copies the files that include the signal from the ring buffer to permanent storage. The buffer is a ring because old files are deleted from disk (for example, once they are 10 minutes old), so the buffer always occupies a bounded amount of storage space.
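The fixed footprint of such a buffer can be estimated up front. The sketch below is a hypothetical helper, not part of the original code; it assumes a constant encoded bitrate, which MJPEG only approximates, so the result is a rough steady-state estimate.

```python
import math


def ring_buffer_footprint(split_size_s, retention_s, bitrate_bps):
    """Estimate the number of files and bytes a ring buffer holds at steady state.

    Hypothetical helper: assumes the encoded stream has a constant bitrate.
    """
    # Number of fixed-duration files needed to cover the retention window.
    files = math.ceil(retention_s / split_size_s)
    # Bits per second times seconds, converted to bytes.
    total_bytes = bitrate_bps * retention_s // 8
    return files, total_bytes
```

For example, 30-second files kept for 10 minutes at an assumed 10 Mbit/s give `ring_buffer_footprint(30, 600, 10_000_000)`, that is, 20 files and about 750 MB.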

You will learn how to build such a ring buffer, which connects to the main video processing pipeline and manages the creation and deletion of the video files that form the buffer. The solution uses OpenCV, Python, LZ4, and ZeroMQ. For simplicity, we assume that the video files have the same frame rate as the stream, that is, every frame of the stream is written to a file. Real implementations may drop redundant frames, change the resolution, and so on.
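Dropping redundant frames, mentioned above, can be as simple as keeping a subset of frames so that the output rate matches the target file frame rate. The generator below is a hypothetical sketch, not part of the original solution; it assumes constant input and output frame rates and does no timestamp-based resampling.

```python
def decimate(frames, src_fps, dst_fps):
    """Yield a subset of frames whose rate approximates dst_fps (hypothetical helper)."""
    if dst_fps >= src_fps:
        # Nothing to drop: pass every frame through unchanged.
        yield from frames
        return
    # Keep one frame out of every `step` input frames (step may be fractional).
    step = src_fps / dst_fps
    next_keep = 0.0
    for i, frame in enumerate(frames):
        if i >= next_keep:
            yield frame
            next_keep += step
```

With a 30 FPS stream and 10 FPS files, every third frame is kept; the `VideoWriter` would then have to be opened with the reduced frame rate so that playback speed stays correct.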

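The architecture of the solution:

[Figure: main pipeline Decode → Pub → Process → DB; ring buffer pipeline Sub → Encode → Files, fed from Pub over ZeroMQ]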

In this architecture, the main processing pipeline is the chain Decode → Pub → Process → DB. It performs the analytical operations and may itself be a distributed processing graph. Video fragments are saved by an additional pipeline, the chain Sub → Encode → Files. Frames are passed from the main pipeline to the additional one through ZeroMQ PUB/SUB sockets. This mechanism suits the problem well because:

  • the main pipeline does not block even if the additional (ring buffer) pipeline is not running or has failed, which is a property of PUB/SUB;
  • several additional pipelines can run in parallel, e.g. one saves the video as is, while another changes the frame rate and resolution;
  • ZeroMQ supports several transports, including unicast TCP, multicast (PGM/EPGM), and Unix domain sockets (IPC), which makes distributed data processing easy to implement without additional software or complex network programming.
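The first property can be checked with a small pyzmq sketch: a PUB socket with no subscribers accepts every send immediately and silently drops the messages. The function name and the `inproc://frames` endpoint are hypothetical choices for this demonstration. Switching transports only changes the endpoint string, e.g. `tcp://127.0.0.1:5557`, `ipc:///tmp/frames` for a Unix domain socket, or an `epgm://` address, which requires a libzmq build with PGM support.

```python
import zmq


def publish_without_subscribers(endpoint="inproc://frames", count=1000):
    """Send `count` messages from a PUB socket that has no subscribers.

    Returns the number of send calls that completed.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind(endpoint)
    sent = 0
    try:
        for i in range(count):
            # PUB never blocks: with no matching subscriber the message is
            # dropped, so NOBLOCK never raises zmq.Again here.
            pub.send(b"frame %d" % i, flags=zmq.NOBLOCK)
            sent += 1
    finally:
        pub.close(linger=0)
        ctx.term()
    return sent
```

Calling `publish_without_subscribers()` returns as soon as all 1000 messages have been handed to ZeroMQ, even though nobody is listening.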

Because a raw frame takes up a lot of memory, the code compresses frames with LZ4. This reduces the bandwidth required on the network link and, depending on the content, can make it possible to transfer 2K video over a 1 Gbit/s channel. Let's look at the implementation in Python.
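A quick back-of-the-envelope calculation shows why compression is needed at all. The sketch assumes 8-bit BGR frames (3 bytes per pixel, as OpenCV delivers them), a 2048×1080 "2K" resolution, and 30 FPS; the helper names are hypothetical.

```python
def raw_bitrate_bps(width, height, fps, bytes_per_pixel=3):
    """Bit rate of an uncompressed stream of 8-bit frames."""
    return width * height * bytes_per_pixel * 8 * fps


def min_compression_ratio(width, height, fps, link_bps):
    """Compression ratio needed for the raw stream to fit into link_bps."""
    return raw_bitrate_bps(width, height, fps) / link_bps


# 2048 x 1080 x 3 bytes x 8 bits x 30 FPS = 1,592,524,800 bit/s (about 1.6 Gbit/s)
raw = raw_bitrate_bps(2048, 1080, 30)
ratio = min_compression_ratio(2048, 1080, 30, 1_000_000_000)
```

So the stream has to shrink by at least about 1.6× to fit a nominal 1 Gbit/s link. Real throughput is somewhat below the nominal rate, and the ratio LZ4 achieves depends on the image content and sensor noise.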

The Main Pipeline

A basic implementation of the main pipeline is as follows:

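import cv2
import zmq
import lz4.frame
import pickle
from time import time

cap = cv2.VideoCapture(0)

context = zmq.Context()
dst = context.socket(zmq.PUB)
dst.bind("tcp://127.0.0.1:5557")

frameno = 0
COMPRESSED = True

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # The camera was disconnected or the stream has ended.
            break
        ts = time()
        frameno += 1
        payload = pickle.dumps(frame)
        if COMPRESSED:
            # LZ4 shrinks the raw BGR frame before it goes over the wire.
            payload = lz4.frame.compress(payload)
        meta = pickle.dumps(dict(ts=ts, frameno=frameno))
        # Frame and metadata are sent as one multipart message, which ZeroMQ
        # delivers (or drops) atomically, so a subscriber never gets one part
        # without the other.
        dst.send_multipart([payload, meta])
finally:
    cap.release()
    dst.close(linger=0)
    context.term()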

This pipeline does not block even when no subscribers are connected: a PUB socket simply drops messages that have no recipients, which is convenient.
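The message layout used above (a pickled, optionally compressed frame followed by pickled metadata) can be factored into a pair of helpers shared by both pipelines. This is a hypothetical refactoring sketch, not the original code. The compression functions are parameters, so `lz4.frame.compress` and `lz4.frame.decompress` can be passed in the real pipelines. Since `pickle.loads` can execute arbitrary code, this format should only be used between trusted processes.

```python
import pickle


def pack_frame(frame, frameno, ts, compress=None):
    """Serialize a frame and its metadata into two message parts (hypothetical helper)."""
    payload = pickle.dumps(frame)
    if compress is not None:
        payload = compress(payload)
    meta = pickle.dumps({"ts": ts, "frameno": frameno})
    return [payload, meta]


def unpack_frame(parts, decompress=None):
    """Inverse of pack_frame: return (frame, meta) from the two message parts."""
    payload, meta = parts
    if decompress is not None:
        payload = decompress(payload)
    return pickle.loads(payload), pickle.loads(meta)
```

On the publishing side the parts can be sent with `dst.send_multipart(pack_frame(frame, frameno, time(), compress=lz4.frame.compress))`, and a subscriber can restore them with `unpack_frame(src.recv_multipart(), decompress=lz4.frame.decompress)`.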

The Additional Pipeline

This pipeline subscribes to the PUB socket of the main pipeline, writes the received frames into fixed-duration files, and deletes files that have fallen out of the history window:

import zmq
import cv2
from time import time
import os
import glob
import lz4.frame
import pickle


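class SplitWriter:
    """Writes frames received over a ZeroMQ SUB socket into fixed-duration
    files and deletes files older than split_size * split_history seconds."""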
    def __init__(self, split_size=30,
                 pub_address="tcp://127.0.0.1:5557",
                 directory='/tmp',
                 split_history=2,
                 split_prefix='split',
                 compressed=True,
                 fps=30):
        self.pub_address = pub_address
        self.split_size = split_size
        self.directory = directory
        self.split_history = split_history
        self.fps = fps
        self.compressed = compressed
        self.split_prefix = split_prefix

        self.zmq_context = zmq.Context()
        self.src = self.zmq_context.socket(zmq.SUB)
        self.src.connect(self.pub_address)
        self.src.setsockopt_string(zmq.SUBSCRIBE, "")

        self.current_split = 0
        self.new_split = 0
        self.writer = None
        self.last_frame_delay = 0
        self.remote_frameno = 0
        self.frameno = 0

    def _gen_split_name(self):
        # PREFIX.ID.SPLITSIZE.avi, where ID is the split start time (UNIX seconds)
        return os.path.join(self.directory,
                            '%s.%d.%d.avi' % (self.split_prefix, self.current_split, self.split_size))

    def _start_new_split(self, frame):
        # Close the current file and open the next one, sized to the incoming frame.
        self.current_split = int(time())
        self.new_split = self.current_split + self.split_size
        if self.writer:
            self.writer.release()
        self.writer = cv2.VideoWriter(self._gen_split_name(),
                                      cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
                                      self.fps, (frame.shape[1], frame.shape[0]))
        print("++", self._gen_split_name(),
              "Last_Frame_Delay", self.last_frame_delay,
              "Frame_Delta", self.remote_frameno - self.frameno)
        self._clear_old_splits()

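    def write(self):
        # Receive one frame and its metadata (sent by the publisher in this order).
        if self.compressed:
            frame = pickle.loads(lz4.frame.decompress(self.src.recv()))
        else:
            frame = self.src.recv_pyobj()
        meta = self.src.recv_pyobj()
        now = time()
        self.frameno += 1
        self.remote_frameno = meta['frameno']
        # Delay between capture and reception of this frame, in milliseconds.
        self.last_frame_delay = int((now - meta['ts']) * 1000)
        # Rotate to a new file once the current split has lasted split_size seconds.
        if now > self.new_split:
            self._start_new_split(frame)
        self.writer.write(frame)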

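    def _clear_old_splits(self):
        # Delete splits that started more than split_size * split_history seconds ago.
        cutoff = time() - self.split_size * self.split_history
        for f in glob.glob(os.path.join(self.directory, self.split_prefix + '.*.*.avi')):
            # PREFIX.ID.SPLITSIZE.avi: the ID (start timestamp) is the third field from the end
            ts = int(f.split('.')[-3])
            if ts < cutoff:
                print("--", f)
                os.unlink(f)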

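    def release(self):
        # The writer does not exist until the first frame has been received.
        if self.writer is not None:
            self.writer.release()
        self.src.close()
        self.zmq_context.destroy()


if __name__ == "__main__":
    w = SplitWriter(split_history=3, split_size=5)
    try:
        while True:
            w.write()
    except KeyboardInterrupt:
        pass
    finally:
        # Finalize the current file so that it stays playable.
        w.release()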

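ZeroMQ manages PUB/SUB connections and reconnections on its own, so either pipeline can be started or restarted independently of the other. For easier monitoring, each time a new segment is started the writer prints the delay of the last received frame in milliseconds (Last_Frame_Delay, meaningful only when both processes share a synchronized clock) and the difference between the publisher's frame counter and the number of frames received (Frame_Delta), which also counts frames published before the ring buffer pipeline connected.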
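Because Frame_Delta includes frames sent before the subscriber connected, it does not show whether frames are being lost now. One option is to count gaps between consecutive `frameno` values received from the publisher. The class below is a hypothetical monitoring helper, not part of the original code. It assumes that the publisher numbers frames consecutively and restarts its counter from 1 after a restart.

```python
class DropCounter:
    """Count frames lost in transit by detecting gaps in the publisher's frameno."""

    def __init__(self):
        self.last = None
        self.dropped = 0

    def update(self, remote_frameno):
        # A jump forward means frames were dropped (e.g. by the high-water mark).
        if self.last is not None and remote_frameno > self.last + 1:
            self.dropped += remote_frameno - self.last - 1
        # A jump backward means the publisher restarted: just resynchronize.
        self.last = remote_frameno
        return self.dropped
```

In `SplitWriter.write()`, such a counter could be updated with `meta['frameno']` and its value printed next to the existing Last_Frame_Delay and Frame_Delta figures.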

File names follow the pattern PREFIX.ID.SPLITSIZE.avi, where PREFIX is an arbitrary string, ID is the UNIX timestamp (in seconds) at which the segment started, and SPLITSIZE is the segment length in seconds.
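This naming scheme is enough to implement the step described at the beginning: copying the files that cover an event from the ring buffer to permanent storage. The function below is a hypothetical sketch, not part of the original code. It copies every split whose interval [ID, ID + SPLITSIZE] overlaps the event interval and skips files that the ring buffer deletes while the copy is in progress. Note that the split currently being written may still be incomplete, so a real implementation might wait until it is closed.

```python
import glob
import os
import shutil


def copy_event_splits(buffer_dir, archive_dir, event_start, event_end, prefix="split"):
    """Copy all splits overlapping [event_start, event_end] into archive_dir."""
    os.makedirs(archive_dir, exist_ok=True)
    copied = []
    pattern = os.path.join(buffer_dir, prefix + ".*.*.avi")
    for path in sorted(glob.glob(pattern)):
        name = os.path.basename(path)
        # PREFIX.ID.SPLITSIZE.avi: ID is the start time, SPLITSIZE the length in seconds.
        fields = name.split(".")
        start, size = int(fields[-3]), int(fields[-2])
        if start <= event_end and start + size >= event_start:
            target = os.path.join(archive_dir, name)
            try:
                shutil.copy2(path, target)
            except FileNotFoundError:
                # The ring buffer removed this split in the meantime.
                continue
            copied.append(target)
    return copied
```

For an event between timestamps 1040 and 1065 with 30-second splits, only split.1030.30.avi and split.1060.30.avi are copied.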

The code depends on the following packages (requirements.txt):

opencv-python
pyzmq
lz4
numpy
