I'd like advice on storing PUSH messages in a DB (SQLite3) as soon as they arrive over a WebSocket.
0 Current situation
As soon as I receive a PUSH, I fire off the DB storage process (*) and move on to the next PUSH without waiting for the result. However, when messages are concentrated during peak hours, the processing inevitably piles up.
* Each storage operation takes about 0.01 to 0.03 sec, but during peak hours it can end up waiting as long as 90 sec.
1 What I want to achieve
There are plenty of free CPU resources (*) even while processing is backed up, the processing order does not matter, and I do not wait for the result. So I want to finish the processing as quickly as possible with multithreading (?).
* Ryzen 3900X environment, 12 cores and 24 threads
I don't think this is an unusual requirement, but I can't find a solution.
I would appreciate it if you could teach me. If you could give me sample code, I would cry with joy.
- Currently, I have fire-and-forget working with Twisted x crochet, by imitating examples I found.
- I am not attached to Twisted. Something like "you can do this easily with asyncio x multiprocessing!" would be very helpful.
Answer # 1
Twisted + crochet is a pretty austere choice. If you are dealing with WebSocket, the websockets library seems good. multiprocessing in Python is tough, so it's a good idea to start with threading (although that is still a pain). By the way, asynchronous processing and multithreading are not directly related, so please be careful. The picture is this: run one thread for the WebSocket server and one thread that writes to the DB, with a queue between them. The WebSocket thread keeps throwing data into the queue, and the DB thread just writes it out. Written out roughly, it looks like this.
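import asyncio
import functools
import queue
import sqlite3
import threading
import time

import websockets  # third-party package

DB_PATH = "/tmp/sqlite3.db"
MAX_BUF = 10       # flush once this many messages are buffered
TIMEOUT_SEC = 0.5  # ... or once this long has passed since the last flush


async def echo(websocket, path=None, *, q):
    # Hand each message to the DB thread without waiting for the write.
    # `path` is only passed by older versions of websockets.
    async for message in websocket:
        q.put_nowait(message)  # the queue is unbounded, so this never blocks


def serve_websocket(q):
    # Create a handler with the queue injected (partial application).
    handler = functools.partial(echo, q=q)

    async def main():
        async with websockets.serve(handler, "localhost", 8765):
            await asyncio.Future()  # run forever

    # A non-main thread has no event loop of its own; asyncio.run creates one.
    asyncio.run(main())


def push(q, db_path):
    # Buffering by hand here; a modern library may already do this well.
    # The connection is opened in this thread because sqlite3 connections
    # cannot be shared across threads by default.
    conn = sqlite3.connect(db_path)
    # The table name and schema are placeholders.
    conn.execute("CREATE TABLE IF NOT EXISTS messages (body TEXT)")
    buffer = []
    prev = time.time()
    while True:
        try:
            buffer.append(q.get(timeout=TIMEOUT_SEC))
        except queue.Empty:
            pass
        now = time.time()
        if buffer and (len(buffer) >= MAX_BUF or now - prev >= TIMEOUT_SEC):
            with conn:  # one transaction per batch
                conn.executemany(
                    "INSERT INTO messages (body) VALUES (?)",
                    [(m,) for m in buffer],
                )
            buffer = []
            prev = now


if __name__ == "__main__":
    q = queue.Queue()
    # With daemon=True, the child threads die when the parent thread
    # (= main thread) dies, so cleanup is easy.
    t_server = threading.Thread(target=serve_websocket, args=(q,), daemon=True)
    t_push = threading.Thread(target=push, args=(q, DB_PATH), daemon=True)
    t_server.start()
    t_push.start()
    while True:
        time.sleep(1)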
This is mostly off the top of my head, but it should look roughly like this. Also, you don't need to worry about the buffering part if you use a modern DB library or ORM that handles it well.
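If you do keep the buffering yourself, the point of it is to commit many rows at once instead of once per message. The following is a minimal sketch of that idea using the standard sqlite3 module; the `messages` table, the `insert_batch` helper and the in-memory database are assumptions made for illustration, not part of the question.

```python
import sqlite3


def insert_batch(conn, messages):
    """Insert all messages in a single transaction (one commit per batch)."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO messages (body) VALUES (?)",
            [(m,) for m in messages],
        )


def demo():
    conn = sqlite3.connect(":memory:")  # in-memory DB, for illustration only
    conn.execute("CREATE TABLE messages (body TEXT)")  # hypothetical schema
    insert_batch(conn, [f"msg-{i}" for i in range(10)])
    (count,) = conn.execute("SELECT COUNT(*) FROM messages").fetchone()
    conn.close()
    return count


if __name__ == "__main__":
    print(demo())
```

Since SQLite allows only one writer at a time, grouping inserts like this is likely to help more than adding writer threads.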
You could also write to the DB directly from the echo function, so that may be worth considering.
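One way that direct approach might look is sketched below, assuming a single worker thread that owns the SQLite connection. The handler schedules each write with `loop.run_in_executor` and moves on without awaiting it. The names `init_db`, `store`, `close_db` and `demo`, and the `messages` table, are hypothetical, and a plain list stands in for the WebSocket so the sketch runs on its own.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor

_conn = None  # owned by the single worker thread


def init_db(path):
    """Runs once in the worker thread, so the connection stays on that thread."""
    global _conn
    _conn = sqlite3.connect(path)
    with _conn:
        _conn.execute("CREATE TABLE IF NOT EXISTS messages (body TEXT)")


def store(message):
    with _conn:  # one transaction per message
        _conn.execute("INSERT INTO messages (body) VALUES (?)", (message,))


def close_db():
    _conn.close()


async def echo(messages, executor):
    """Stand-in for the handler; `messages` replaces `async for message in websocket`."""
    loop = asyncio.get_running_loop()
    # Schedule each write on the worker thread and move on without awaiting it.
    return [loop.run_in_executor(executor, store, m) for m in messages]


def demo(path, messages):
    executor = ThreadPoolExecutor(max_workers=1, initializer=init_db, initargs=(path,))

    async def main():
        pending = await echo(messages, executor)
        await asyncio.gather(*pending)  # only so the demo can check the result

    asyncio.run(main())
    executor.submit(close_db).result()  # close on the thread that opened it
    executor.shutdown()
    check = sqlite3.connect(path)
    (count,) = check.execute("SELECT COUNT(*) FROM messages").fetchone()
    check.close()
    return count
```

With `max_workers=1` the writes are serialized on one thread, which matches SQLite's single-writer model. This variant commits once per message, so it gives up the batching of the queue version.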