0
votes

Problematic

I have a PIL Image and i want to convert it to a bytes array. I can't save the image on my hard disk so i can't use the default open(file_path, 'rb') function.

What i tried

To overturn this problem i'm trying to use the io library doing this :

buf = io.BytesIO()
image.save(buf, format='JPEG')
b_image = buf.getvalue()

Considering image as a functional PIL Image.

the "b_image" will be used as argument for the Microsoft Azure cognitives services function read_in_stream()

If we look in the documentation, we can see that this function image argument have to be :

image xref:Generator

Required

An image stream.

Documentation available here

The issue

When i execute it i got the error :

File "C:...\envs\trainer\lib\site-packages\msrest\service_client.py", line 137, in stream_upload chunk = data.read(self.config.connection.data_block_size)

AttributeError: 'bytes' object has no attribute 'read'

There is no error in the client authentification or at another point because when i give as parameter an image imported with this line :

image = open("./1.jpg", 'rb')

Everything is working correctly..

Sources

I also saw this post that explains exactly what i want to do but in my case it's not working. Any idea would be appreciated.

1

1 Answers

2
votes

When we use the method read_in_stream, we need to provide a stream. But the code BytesIO.getvalue will return the content of the stream as string or bytes. So please update code as below

buf = io.BytesIO()
image.save(buf, format='JPEG')
computervision_client.read_in_stream(buf)

For more details, please refer to here


Update

Regarding the issue, I suggest you use rest API to implement your need.

import io
import requests
from PIL import Image
import time
url = "{your endpoint}/vision/v3.1/read/analyze"
key = ''
headers = {
    'Ocp-Apim-Subscription-Key': key,
    'Content-Type': 'application/octet-stream'
}

// process image
...
with io.BytesIO() as buf:
    im.save(buf, 'jpeg')
    response = requests.request(
        "POST", url, headers=headers, data=buf.getvalue())
    
    # get result

    while True:
        res = requests.request(
            "GET", response.headers['Operation-Location'], headers=headers)
        status = res.json()['status']
        if status == 'succeeded':
            print(res.json()['analyzeResult'])
            break
        time.sleep(1)

enter image description here