I'd like advice on storing PUSH messages in a DB (SQLite3) as soon as they are received from a websocket.

0 Current situation
As soon as I receive a PUSH, I try to fire the DB storage process (*1) and move on to the next PUSH without waiting for the result, but when messages are concentrated during peak hours, the processing inevitably piles up.
*1 A single storage operation takes about 0.01 to 0.03 sec, but during peak hours there are cases where it waits for 90 sec.

1 What I want to achieve
I want to finish the processing as quickly as possible with multithreading (?), because there are plenty of free CPU resources (*2) even while the processing is piling up, the processing order does not matter, and nothing waits for the result.
*2 Ryzen 3900X environment, 12 cores and 24 threads

I don't think this is an unusual requirement, but I can't find a solution.
I would appreciate any guidance. If you could give me sample code, I would cry tears of joy.

2 Remarks
- Currently, fire-and-forget is implemented with twisted x crochet, by imitating examples I found.
- I am not attached to twisted. If there is something like "you can do it quickly with asyncio x multiprocessing!", that would be very helpful.

  • Answer # 1

    Twisted + crochet is a fairly niche choice. If you are dealing with WebSocket, the websockets library seems like a good fit. Python's multiprocessing is tough to work with, so it's a good idea to start with threading (although that is still a pain). By the way, please note that asynchronous processing and multithreading are not directly related.

    The idea is to run one thread for the WebSocket server and one thread that writes to the DB, with a queue in between: the WebSocket thread keeps throwing data into the queue, and the DB thread just writes. Written out roughly, it looks like this.

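    import asyncio
    import functools
    import queue
    import sqlite3  # the question uses SQLite3; swap in whatever DB library fits
    import threading
    import time

    import websockets

    async def echo(websocket, path=None, *, q):
        # Hand every received message to the DB thread via the queue.
        # queue.Queue.put is a plain (non-async) call; it does not block on an unbounded queue.
        async for message in websocket:
            q.put(message)

    async def serve(q):
        handler = functools.partial(echo, q=q)  # inject the queue into the handler
        async with websockets.serve(handler, "localhost", 8765):
            await asyncio.Future()  # run forever

    def serve_websocket(q):
        # asyncio.run creates a fresh event loop for this thread.
        asyncio.run(serve(q))

    def push(q, db_path):
        # sqlite3 connections belong to the thread that created them, so connect here.
        conn = sqlite3.connect(db_path)
        # Placeholder table and schema; adapt them to the real data.
        conn.execute("CREATE TABLE IF NOT EXISTS messages (body TEXT)")
        # I'm buffering by hand, but a modern library may be smart enough to do this for you.
        MAX_BUF = 10
        TIMEOUT_SEC = 0.5
        buffer = []
        deadline = time.monotonic() + TIMEOUT_SEC
        while True:
            try:
                # Wait for the next message, but no longer than the flush deadline.
                buffer.append(q.get(timeout=max(0.0, deadline - time.monotonic())))
            except queue.Empty:
                pass
            if len(buffer) >= MAX_BUF or time.monotonic() >= deadline:
                if buffer:
                    with conn:  # one transaction per batch
                        conn.executemany("INSERT INTO messages (body) VALUES (?)",
                                         [(m,) for m in buffer])
                    buffer = []
                deadline = time.monotonic() + TIMEOUT_SEC

    if __name__ == '__main__':
        q = queue.Queue()
        # With daemon=True, the child threads die when the parent thread (= main thread) dies, which is convenient.
        t_server = threading.Thread(target=serve_websocket, args=(q,), daemon=True)
        t_push = threading.Thread(target=push, args=(q, '/tmp/sqlite3.db'), daemon=True)
        t_server.start()
        t_push.start()
        while True:
            time.sleep(1)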
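
The buffering step in push can be tried in isolation. SQLite allows only one writer at a time, so adding more writer threads does not speed up inserts; grouping rows into one transaction per batch is usually what reduces the per-row cost. Below is a minimal sketch of that idea, assuming a hypothetical `messages` table and a helper I named `drain_in_batches` (neither is from the question), run against an in-memory database:

```python
import queue
import sqlite3

def drain_in_batches(q, conn, max_buf=10):
    """Move everything currently in q into the DB, one transaction per batch.

    Returns the number of batches (transactions) that were committed.
    """
    batches = 0
    while True:
        batch = []
        while len(batch) < max_buf:
            try:
                batch.append(q.get_nowait())
            except queue.Empty:
                break
        if not batch:
            return batches
        with conn:  # commits once for the whole batch
            conn.executemany("INSERT INTO messages (body) VALUES (?)",
                             [(m,) for m in batch])
        batches += 1

# Hypothetical schema and data, only for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (body TEXT)")
q = queue.Queue()
for i in range(25):
    q.put(f"msg-{i}")

batches = drain_in_batches(q, conn)
rows = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
print(batches, rows)  # 25 rows end up in 3 transactions (10 + 10 + 5)
```

If the backlog during peak hours comes from committing every row separately, raising the batch size is probably the first thing to try before reaching for more threads or processes.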

    I wrote this mostly off the top of my head, but it should be roughly like this. If you use one of today's smarter DBs or ORMs, you don't need to worry about the buffering part. In that case it's fine to write to the DB directly from the echo function, so you may want to reconsider that part.
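
If you do write directly from the echo handler, one way to keep the event loop from blocking on disk I/O is to push each insert onto a single worker thread with `run_in_executor`. This is only a sketch under assumptions: the `DbWriter` class, the `messages` table, and the `fake_websocket` generator standing in for a real connection are all hypothetical names made up for illustration.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor

class DbWriter:
    """Runs all SQLite work on one worker thread (hypothetical helper)."""

    def __init__(self, db_path):
        self._db_path = db_path
        self._conn = None
        # One worker only, so the connection is always used from the same thread.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _connect(self):
        if self._conn is None:
            self._conn = sqlite3.connect(self._db_path)
            self._conn.execute("CREATE TABLE IF NOT EXISTS messages (body TEXT)")
        return self._conn

    def _insert(self, message):
        conn = self._connect()
        with conn:
            conn.execute("INSERT INTO messages (body) VALUES (?)", (message,))

    def _count(self):
        return self._connect().execute("SELECT COUNT(*) FROM messages").fetchone()[0]

    async def store(self, message):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._executor, self._insert, message)

    async def count(self):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self._count)

    def close(self):
        self._executor.shutdown()

async def echo(websocket, writer):
    # Same shape as a websockets handler: iterate over incoming messages.
    async for message in websocket:
        await writer.store(message)

async def fake_websocket():
    # Stand-in for a real connection so the sketch runs without a server.
    for i in range(5):
        yield f"msg-{i}"

async def main():
    writer = DbWriter(":memory:")
    await echo(fake_websocket(), writer)
    stored = await writer.count()
    writer.close()
    return stored

stored = asyncio.run(main())
print(stored)
```

Note that each handler still waits for its own insert before reading the next message; what this buys is that other connections on the same event loop keep running while the write is in progress.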