2
votes

I have a task: using a zmq socket to send and receive base64 strings (generated from 800x600 images). Currently I use a pub/sub connection for this. But it looks like the messages are so large that the socket can't transfer them immediately, and later messages get stuck in the network buffer. Although I don't want to lose many messages, I must restrict the HWM value so that the socket works properly. So I have some questions:

  1. Is there a more effective library or way to perform my task? Or should I use another connection type that zmq provides (router/dealer or request/reply)?
  2. To transfer an image (processed by OpenCV), is there an approach I can use to minimize the size of the image being sent, other than converting it into base64 format?
  3. If I must continue using the zmq pub/sub connection, how can I limit how long old messages are stored rather than how many of them, for example to 3 minutes?

Here is my Python code for the sockets:

Publisher

import base64

import numpy as np
import zmq

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
# Set the HWM before connect() so that it applies to this connection
footage_socket.setsockopt(zmq.SNDHWM, 1)
footage_socket.connect(<tcp address>)

def send_func(frame, camera_link):
    height, width, _ = frame.shape
    frame = np.ascontiguousarray(frame)
    # b64encode() returns bytes; decode to str so that send_json() can serialise it
    base64_img = base64.b64encode(frame).decode("ascii")
    data = {"camera_link": camera_link, "base64_img": base64_img, "img_width": width, "img_height": height}
    footage_socket.send_json(data)

Subscriber

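import base64
import json

import numpy as np
import zmq

context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
# Set the HWM before bind() so that it applies to this endpoint
footage_socket.setsockopt(zmq.RCVHWM, 1)
# A SUB socket receives nothing until it subscribes; b"" means "everything"
footage_socket.setsockopt(zmq.SUBSCRIBE, b"")
footage_socket.bind(<tcp address>)

def rcv_func():
    while True:
        print("run socket")
        try:
            framex = footage_socket.recv_string()
            data = json.loads(framex)
            frame = data['base64_img']
            img = np.frombuffer(base64.b64decode(frame), np.uint8)
            # Use the dimensions sent with the frame (frame_height / frame_width were undefined here)
            img = img.reshape(int(data['img_height']), int(data['img_width']), 3)
        except Exception as e:
            print(e)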
1
Why do you need base64? Why not just bytes? docs.scipy.org/doc/numpy/reference/generated/… - Joe
Sorry, so far we have tried only this method. I will try your suggestion. - nOpe
Ok, let me know how it goes. - Joe
I am not sure how important the metadata is for you (height, width, etc). You could also try to pickle the numpy array. - Joe
I saw that if I use the Python function sys.getsizeof() on the numpy array, it returns a smaller value than for the base64 string. But I'm not sure how the size of the message affects the speed of both sending and receiving. - nOpe

1 Answer

1
votes

Before we start, let me make a few notes:

- avoid re-packing data into JSON if it is only for ease of coding. JSON-re-serialised data grow in size without adding any value for fast, resource-efficient stream-processing. Professional systems resort to JSON only where they have plenty of time and spare CPU power to spend on wrapping valuable data in yet another box-of-data-inside-a-box-of-data, and can afford the costs and inefficiencies. Here you get nothing in exchange for the CPU clocks spent, you more than double the RAM needed for the re-packing, and you have to transport even larger data

- review whether the camera really provides image data that deserve to be 8 bytes / 64 bits deep; if not, you have a first remarkable image-data reduction free of charge
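
A minimal sketch of the first note, assuming a two-part message is acceptable: a small JSON header carrying only the metadata, followed by the raw pixel buffer with no base64 step. The helper names `pack_frame` and `unpack_frame` and the camera id `"cam-0"` are hypothetical, not taken from the question, and the frame is assumed to be an 8-bit NumPy array as OpenCV usually delivers it. The resulting list of parts could be handed to pyzmq's `send_multipart()` and read back with `recv_multipart()`.

```python
import json

import numpy as np


def pack_frame(frame, camera_link):
    """Return [header, payload]: a tiny JSON header plus the raw pixel bytes."""
    frame = np.ascontiguousarray(frame)
    header = {
        "camera_link": camera_link,
        "shape": list(frame.shape),
        "dtype": str(frame.dtype),
    }
    return [json.dumps(header).encode("utf-8"), frame.tobytes()]


def unpack_frame(parts):
    """Rebuild (header, array) from the two message parts."""
    header = json.loads(parts[0].decode("utf-8"))
    img = np.frombuffer(parts[1], dtype=header["dtype"]).reshape(header["shape"])
    return header, img


# Stand-in for an 800x600 frame from OpenCV (rows x cols x channels)
frame = np.zeros((600, 800, 3), dtype=np.uint8)
parts = pack_frame(frame, "cam-0")   # "cam-0" is a made-up camera id
# footage_socket.send_multipart(parts)      # on the sending side
# parts = footage_socket.recv_multipart()   # on the receiving side
header, restored = unpack_frame(parts)
```

Only the few dozen bytes of header go through JSON here; the pixel data travel as-is, so the payload stays at `frame.nbytes`.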

Using sys.getsizeof() may surprise you:

>>> import sys
>>> import numpy as np
>>> aa = np.ones( 1000 )
>>> sys.getsizeof(  aa )
8096 <---------------------------- 8096 [B] object here "contains"
>>> (lambda array2TEST: array2TEST.itemsize * array2TEST.size )( aa )
8000 <---------------------------- 8000 [B] of data
>>> bb = aa.view()     # a similar effect happen in smart VECTORISED computing
>>> sys.getsizeof( bb )
96 <------------------------------   96 [B] object here "contains"
>>> (lambda array2TEST: array2TEST.itemsize * array2TEST.size )( bb )
8000 <---------------------------- 8000 [B] of data
>>> bb.flags
  C_CONTIGUOUS    : True
  F_CONTIGUOUS    : True
  OWNDATA         : False <-------------------------------||||||||||
  WRITEABLE       : True
  ALIGNED         : True
  WRITEBACKIFCOPY : False
  UPDATEIFCOPY    : False
>>> bb.dtype
dtype('float64') <--------------    8 [B] per image-pixel even for {1|0} B/W
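
For sizing messages, `ndarray.nbytes` reports the size of the data regardless of whether the array is a view, which `sys.getsizeof()` does not. A small sketch, assuming an 8-bit, 3-channel 800x600 frame (the question does not state the depth, so this is an assumption), comparing the raw buffer with its base64 form and with a float64 copy:

```python
import base64

import numpy as np

frame_u8 = np.zeros((600, 800, 3), dtype=np.uint8)   # assumed 8-bit frame
frame_f64 = frame_u8.astype(np.float64)               # same pixels, 8 B each

raw_bytes = frame_u8.nbytes                            # raw pixel data
b64_bytes = len(base64.b64encode(frame_u8.tobytes()))  # base64 adds one third
f64_bytes = frame_f64.nbytes                           # 8x the uint8 size
view_bytes = frame_u8.view().nbytes                    # a view reports the same data size

print(raw_bytes)   # 1440000
print(b64_bytes)   # 1920000
print(f64_bytes)   # 11520000
```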

Q : is there an approach I can use to minimize the size of the image being sent...?

Yes, millions of [man * years] of R&D have already been spent on solving this problem, and the best-in-class methods for doing it are still evolving.

The best results, as one might expect, are needed for extreme corner cases, such as transporting satellite imagery from deep space back home, as when JAXA was on its second asteroid rendezvous mission, this time visiting the asteroid Ryugu.

Your as-is code produces 800x600 image frames at a so far unspecified fps rate and colour depth. A brief calculation shows how much data that can easily generate within the said 3 minutes if the process is not handled with due care:

>>> (lambda a2T: a2T.itemsize * a2T.size )( np.ones( ( 800, 600, 3 ) ) ) / 1E6
11.52 <---- each 800x600-RGB-FRAME handled/processed this way takes ~ 11.5 [MB]
                    @~30 fps                                        ~345.6 [MB/s]
                                                                    ~ 62.2 [GB/3min]
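
The same arithmetic as a small sketch, so that other depths and frame rates can be tried. The function name `stream_volume` is hypothetical; 30 fps is the rate assumed above, and the 1-byte-per-sample case is an assumption about what the camera may actually deliver.

```python
def stream_volume(width, height, channels, bytes_per_sample, fps, seconds):
    """Return (bytes per frame, bytes per second, bytes over the whole period)."""
    frame_bytes = width * height * channels * bytes_per_sample
    rate = frame_bytes * fps
    return frame_bytes, rate, rate * seconds


for label, depth in (("float64", 8), ("uint8", 1)):
    frame_b, rate_b, total_b = stream_volume(800, 600, 3, depth, 30, 180)
    print(f"{label}: {frame_b / 1e6:.2f} MB/frame, "
          f"{rate_b / 1e6:.1f} MB/s, {total_b / 1e9:.2f} GB per 3 min")
# float64: 11.52 MB/frame, 345.6 MB/s, 62.21 GB per 3 min
# uint8: 1.44 MB/frame, 43.2 MB/s, 7.78 GB per 3 min
```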

Solution? Take inspiration from the best-in-class know-how:

There you have limited power ( both energy-wise and processing-wise: the CPUs inside such a satellite were manufactured years before the launch, because no serious team will dare to fly brand-new but unproven COTS chips ), limited RAM ( again the power and weight limits, as the fuel needed to lift off and fly "there" grows with every single gram of "The Useful Payload" ) and, last but not least, the most limiting factor: very limited means of R/F-COMMs over a very "loooooooong" wire ( a single bit needs many minutes to travel from "there" back "here", and as long again for any ACK/NACK sent from "here" to answer a remote request or to ask for a re-send after an error was detected ). The current DSN effective-telemetry data transport-speeds are about 6.4 ~ 9.6 kbps ( yes, under 10,000 bits/sec )

Here, the brightest minds have put all the art of the human intellect into making this happen:

  • ultimate means of image compression - never send a bit unless it is indeed vital & necessary
  • ultimate means of error self-correction added to the transcoded image data - if anything is worth adding, plain error-detection is not ( you would have to wait almost a day to get the data re-transmitted, hopefully without another error ). What is needed is a means of self-correction ( limited and very economic, given the cost of sending a single bit noted above ) that can repair a limited scope of the signal/data-transport errors that do appear while the R/F-COMMs signal travels from deep space back home. On larger errors, which the built-in self-correction cannot recover from the "damaged" data, you have to wait a few days for a re-scheduled attempt to send the larger pack again.
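
To make the idea of self-correction concrete, here is a minimal sketch using the textbook Hamming(7,4) code. This is an illustration swapped in for the purpose, not the scheme any space mission is claimed to use: 3 parity bits are added to every 4 data bits, and any single flipped bit per 7-bit block can be repaired by the receiver without a re-send.

```python
def hamming74_encode(data_bits):
    """Encode 4 data bits as 7 bits laid out as p1 p2 d1 p3 d2 d3 d4."""
    d1, d2, d3, d4 = data_bits
    p1 = d1 ^ d2 ^ d4
    p2 = d1 ^ d3 ^ d4
    p3 = d2 ^ d3 ^ d4
    return [p1, p2, d1, p3, d2, d3, d4]


def hamming74_decode(code_bits):
    """Correct up to one flipped bit and return the 4 data bits."""
    c = list(code_bits)
    s1 = c[0] ^ c[2] ^ c[4] ^ c[6]
    s2 = c[1] ^ c[2] ^ c[5] ^ c[6]
    s3 = c[3] ^ c[4] ^ c[5] ^ c[6]
    error_pos = s1 + 2 * s2 + 4 * s3   # 1-based position of the error, 0 if none
    if error_pos:
        c[error_pos - 1] ^= 1
    return [c[2], c[4], c[5], c[6]]


word = [1, 0, 1, 1]
sent = hamming74_encode(word)
damaged = list(sent)
damaged[4] ^= 1                        # one bit flipped "in transit"
recovered = hamming74_decode(damaged)  # equals word again
```

The price is 7 transmitted bits for every 4 useful ones, and two flipped bits in the same block are beyond what this code can repair, which mirrors the "limited scope" trade-off described above.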

Where to start from?

If your use-case does not have a remarkable amount of "spare" CPU-power available, there is no magic trick to get the ultimate image-data compression, and your story ends here. Such advanced image-data trans-coding & error-recovery re-processing needs plenty of "free" CPU+RAM resources, both in scale ( the additional data for the trans-coding and re-processing are orders of magnitude larger than a single image-frame ) and in time ( the speed of the additional CPU-processing ).

If your use-case can spin up more CPU-power, your next enemy is time: both the time to design a clever-enough image-processing and the time to process each image-frame with your engineered image-data trans-coding, within a reasonably short time, before sending it to the recipient end. The former is manageable by your Project-resources ( by finance, to get the right skilled engineers on board, and by the people who do the actual design & engineering ). The latter is not manageable; it depends on your Project's needs: how fast ( fps ) and with what latency ( how late, in accumulated [ms] of delays ) your Project can still perform the intended function.

  • Python is an easy prototyping eco-system. Once you need to boost the throughput ( ref. above ), it will most probably be the first blocker of the show going on ( 30+ years of experience make me quite confident in saying this ). Add-on steroids such as moving to Cython + C-extensions make the whole circus a bit faster, but only a bit, at the immense add-on cost of re-engineering and re-factoring your so far well-prototyped code-base ( and of acquiring the new skills if not already on board, with an expensive learning curve and higher salaries for those well skilled )

  • OpenCV can and will provide you some elementary image-manipulation tools to start from

  • image-data trans-coding and ordinary or ultimate data-compression have to follow, to reduce the data-size

  • ZeroMQ is the least problematic part: it is scalable performance-wise and has unique low-latency throughput capabilities. Without knowing more details, one may forget about PUB/SUB unless any subscription-list processing is avoided altogether ( the costs of doing it would cause immense side-effects in overloading the { central-node | network-dataflows + all remote-nodes }, with no practical benefit for the intended fast and right-sized image-data processing pipeline ).
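
A minimal sketch of the "reduce first, then compress" order from the list above, assuming 8-bit frames. Lossless `zlib` from the standard library stands in here for a real image codec; with OpenCV, `cv2.imencode()` would be the more usual way to get JPEG or PNG bytes. The helper names are hypothetical and the gradient frame is synthetic, so the compression ratio says nothing about real camera footage.

```python
import zlib

import numpy as np


def shrink_and_compress(frame, step=2, level=1):
    """Keep every `step`-th pixel in both directions, then deflate the raw bytes."""
    small = np.ascontiguousarray(frame[::step, ::step])
    return small.shape, zlib.compress(small.tobytes(), level)


def decompress(shape, blob):
    """Inverse of the compression step (assumes uint8 pixels)."""
    return np.frombuffer(zlib.decompress(blob), dtype=np.uint8).reshape(shape)


# Synthetic 800x600 frame with a horizontal gradient in the first channel
frame = np.zeros((600, 800, 3), dtype=np.uint8)
frame[:, :, 0] = (np.arange(800) % 256).astype(np.uint8)

shape, blob = shrink_and_compress(frame)
restored = decompress(shape, blob)
```

The subsampling is lossy (three quarters of the pixels are gone), while the `zlib` step is lossless; how much of either a given application tolerates is a design decision, and the CPU time of both steps counts against the per-frame latency budget.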


Q : If I must continue using the zmq pub/sub connection, how can I limit how long old messages are stored rather than how many of them, for example to 3 minutes?

ZeroMQ is a smart tool, yet one has to understand its powers. ZeroCopy will help you keep a low RAM profile in production, yet if you plan to store 3 minutes of image-data streaming, you will need immense amounts of both RAM and CPU-power, and it all also depends heavily on the actual number of .recv()-ing peers.

ZeroMQ is a broker-less system, so you do not actually "store" messages. The .send()-method just tells the ZeroMQ infrastructure that the provided data are free to be sent whenever it sees a chance to dispatch them to the designated peer-recipient ( be it locally, over the Atlantic or over a satellite-connection ). This means a proper ZeroMQ configuration is a must if you plan to have the sending and receiving sides ready to enqueue / transmit / receive / dequeue ~3 minutes of even the most compressed image-data stream(s), and potentially multiples of that in case 1:many-party communication appears in production.
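
The HWM socket options count messages, not their age, so an age limit would have to live in application code. A minimal sketch of one way to do that, assuming frames are kept in a buffer owned by the application on either side of the socket; the class name `TimeBoundedBuffer` is hypothetical, and the injectable `clock` exists only to make the behaviour easy to check.

```python
import time
from collections import deque


class TimeBoundedBuffer:
    """Keep only the items that are younger than `max_age` seconds."""

    def __init__(self, max_age, clock=time.monotonic):
        self.max_age = max_age
        self.clock = clock
        self._items = deque()   # (timestamp, payload) pairs, oldest first

    def _evict(self):
        cutoff = self.clock() - self.max_age
        while self._items and self._items[0][0] < cutoff:
            self._items.popleft()

    def put(self, payload):
        self._evict()
        self._items.append((self.clock(), payload))

    def get_all(self):
        self._evict()
        return [payload for _, payload in self._items]


buf = TimeBoundedBuffer(max_age=180.0)   # the "3 minutes" from the question
buf.put(b"frame-1")
fresh = buf.get_all()                    # frames received within the last 3 minutes
```

Note that this bounds the age, not the memory: at the data rates estimated earlier, three minutes of uncompressed frames would still occupy gigabytes of RAM.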

Proper analysis and sound design decisions are the only chance for your Project to survive all these requirements, given the CPU, RAM and transport-means are a-priori known to be limited.