There is a websocket channel, audio recording chunks arrive in it. You need to glue them together, convert them to a file, do various operations, etc. The result must be sent to Google. The result from Google is to send back to the front.

I think it's all divided into two services and done in parallel. Didn't work with threads /processes before. Therefore, I ask you to suggest.

I see something like the following.

  1. create the first service in which various operations with chunks and a file will be performed. Put the result in the queue.

  2. create a second service in which data from the queue will be retrieved, sent to Google and the result will be sent to the front.

Example of a consumer websocket:

class ConnectionManager:
    def __init __ (self):
        self.active_connections: List [WebSocket]= []
        self.words_list: Dict [str, dict]= {}
    async def connect (self, websocket: WebSocket):
        await websocket.accept ()
        if websocket not in self.active_connections:
            self.active_connections.append (websocket)
        if str (websocket.url) not in self.words_list:
            self.words_list [str (websocket.url)]= {"chunks": "", "text": ""}
    async def disconnect (self, websocket: WebSocket):
        await websocket.send_text (self.words_list [str (websocket.url)] ["text"])
        self.active_connections.remove (websocket)
    async def broadcast (self, message: str, websocket: WebSocket):
        new_chunks= merge_base64 (
            self.words_list [str (websocket.url)] ["chunks"], message
        text= recognition_voice_data (new_chunks)
        if not text or self.words_list [str (websocket.url)] ["text"]. endswith (
            for connection in self.active_connections:
                if connection.url== websocket.url:
                    await connection.send_text (text)
            self.words_list [str (websocket.url)] ["text"]= ""
            self.words_list [str (websocket.url)] ["chunks"]= ""
            self.words_list [str (websocket.url)] ["text"] += text
            self.words_list [str (websocket.url)] ["chunks"]= new_chunks


  1. How to organize all this? What do you need, processes, or threads? Googling, I came to the conclusion that I need threads. Is it so?

  2. How will the second service understand that new data has appeared in the queue?

I think to use Python as a queuequeue

Possible duplicate question: Difference between threading and multiprocessing modules

icYFTL2021-02-23 14:52:32

@icYFTL yes, apparently, streams are needed, as I originally thought. But I don’t understand how to organize all this. How to make the second thread understand that new data has appeared in the queue?

Dima2021-02-23 14:57:26

"But I don’t understand how I can organize all this. How can I make the second thread understand that new data has appeared in the queue?" -check through if if there is a job in the queue (qsize). And in order not to overload threads with ifs, you can use the wait method.

Volkodaff2021-02-23 16:31:30

suggesting in the second thread if queue.size () in a while True loop? Isn't it a crutch? What do you mean by wait?

Dima2021-02-23 16:42:49

they say event= Event (); thread= Thread (); thread (event); event.wait ()?

Dima2021-02-23 16:45:27