Home>

I am creating an asynchronous process to receive messages using websockets and asyncio.
I'm thinking of a program that connects to the web after receiving a single message and processing it to a certain extent.

The question I would like to discuss this time is whether it is possible to start processing the next message while receiving and processing the previous processed message.

In terms of client processing, I want to display the message "immediately after receiving" before the message of the end of wait of the event method is displayed.

12/27 postscript
I changed it to a code that uses gather.
Method 1 and method 2 start at the same time, but for the received message, if methods 1 and 2 do not finish, the next message will not start.
I would like to go to receive the next message after only method 1 is completed. Is this possible?

import asyncio
import json
import websockets
import urllib.request
import urllib.error
import datetime
# ---

    async with websockets.connect (uri, ping_timeout = None) as ws:
        while not ws.closed:
            response = await ws.recv ()
            board = json.loads (response)
            await asyncio.gather (
                event (board),
                get_url ()
            )
async def event (board):
    print ('Wait 5 seconds from now' + str (board))
    await asyncio.sleep (5)
    print ('wait end')
    print (datetime.datetime.now ())

async def get_url ():
    req = urllib.request.Request (url)
    with urllib.request.urlopen (req) as res:
        print (res.read ())
loop = asyncio.get_event_loop ()
loop.create_task (stream ())
try: try:
    loop.run_forever ()
except KeyboardInterrupt:
    exit ()
#!/usr/bin/env python
# WS server that sends messages at random intervals
import asyncio
import datetime
import random
import websockets
import json

async def time (websocket, path):
    while True:
        now = datetime.datetime.utcnow (). isoformat () + "Z"
        char ='longlongstr'
        content = {'message': char,'time': str (now)}
        j_content = json.dumps (content)
        await websocket.send (j_content)
        await asyncio.sleep (random.random () * 3)
start_server = websockets.serve (time, "127.0.0.1", 5678)
asyncio.get_event_loop (). run_until_complete (start_server)
asyncio.get_event_loop (). run_forever ()



Wait 5 seconds from now {'message':'longlongstr','time': '2020-12-26T23: 43: 58.316069Z'}
b' end of wait
2020-12-27 08: 44: 03.311604
Wait 5 seconds from now {'message':'longlongstr','time': '2020-12-26T23: 44: 00.082327Z'}
b' end of wait
2020-12-27 08: 44: 08.321386
Wait 5 seconds from now {'message':'longlongstr','time': '2020-12-26T23: 44: 02.372114Z'}
b'

  • Answer # 1

    The text and code of the question was not easy to understand, so I rewrote client.py and server.py so that the process flow can be understood.
    As stated in the question"After performing some processing (sleep for 5 seconds in this case), receive the next message and perform" some processing "in the same way. However, download sequentially behind the scenes."It seems that the unit operation is done in parallel.

    server.py

    #!/usr/bin/env python
    # WS server that sends messages at random intervals
    import asyncio
    import datetime
    import random
    import websockets
    import json
    async def time (websocket, path):
        count = 0
        while True:
            now = datetime.datetime.utcnow (). isoformat () + "Z"
            cid = f "{count:>6}"
            char ='longlongstr'
            content = {'id': cid,'message': char,'time': str (now)}
            j_content = json.dumps (content)
            count + = 1
            await websocket.send (j_content)
            await asyncio.sleep (random.random () * 3)
    start_server = websockets.serve (time, "127.0.0.1", 5678)
    asyncio.get_event_loop (). run_until_complete (start_server)
    asyncio.get_event_loop (). run_forever ()

    client.py

    # https://stackoverflow.com/questions/312558#reply-436434
    import asyncio
    import json
    import websockets
    import urllib.request
    import urllib.error
    import datetime
    import asyncio
    import datetime
    # ---
    def print_t (text):
        print (f "{datetime.datetime.now (). strftime ('% Y-% m-% d% H:% M:% S')} {text}")
    def finish (board):
        print_t (f "[TASKID: {board.result ()}] Task finished")
    async def stream ():
        uri ='ws: //127.0.0.1:5678/'
        async with websockets.connect (uri, ping_timeout = None) as ws:
            while not ws.closed:
                response = await ws.recv ()
                board = json.loads (response)
                await event (board)
    async def event (board):
        loop = asyncio.get_event_loop ()
        print_t (f "[TASKID: {board.get ('id')}] Start" some processing "of the received message (sleep for 5 seconds as a dummy) {str (board)}")
        await asyncio.sleep (5)
        print_t (f "[TASKID: {board.get ('id')}]" Something "has been completed. ("Wait end ") Then the download will start.")
        task = loop.create_task (get_url (board.get ('id')))
        task.add_done_callback (finish)
    async def get_url (cid):
        url ='http://www.example.com/'
        req = urllib.request.Request (url)
        with urllib.request.urlopen (req) as res:
            print_t (f "[TASKID: {cid}] Download complete: First 20 characters {res.read () [: 20]}")
        return cid
    loop = asyncio.get_event_loop ()
    loop.create_task (stream ())
    try: try:
        loop.run_forever ()
    except KeyboardInterrupt:
        exit ()

    Each task is processed in order (almost) "every 5 seconds", but the download process corresponding to the same TASKID is started immediately after "some process" for the message is completed, and the download process is done behind the scenes. You can see that there is.
    (It is difficult to understand because the download time is short)

  • Answer # 2

    You can do this with asyncio.gather.

    async def stream ():
        uri ='ws: //127.0.0.1:5678/'
        async with websockets.connect (uri, ping_timeout = None) as ws:
            while not ws.closed:
                response = await ws.recv ()
                board = json.loads (response)
                await asycio.gather (
                    event (board),
                    get_url ()
                )

    By the way, please note that parallel processing and parallel processing are different things.