
As shown below, I am processing WebSocket data with asyncio inside a Django custom command.
The original source code is extremely long, so I have abbreviated it.

Stepping through with a debugger, I could trace execution as far as method3, but it stopped at the return out_layer line of method2.
Can anyone give me some advice?

(When using asyncio, the error message cannot be displayed and it is difficult to determine why it is stopped.)

What I want is to run method2 and start receiving the next message concurrently once save_data has finished.

class Command(BaseCommand):
    def handle(self, *args, **kwargs):
        """Start loop"""
        loop = asyncio.get_event_loop()
        loop.create_task(self.stream())
        print('start receiving')
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            exit()

    async def stream(self):
        uri = 'ws://localhost:1234/'
        async with websockets.connect(uri, ping_interval=None) as ws:
            while not ws.closed:
                response = await ws.recv()
                content = json.loads(response)
                task = loop.create_task(self.save_pushdata(content, sid))
                task.add_done_callback(self.finish)

    async def save_pushdata(self, content, sid):
        loop = asyncio.get_event_loop()
        newData = list(
            symbol=content['****'],
        )
        newData.save()
        task = loop.create_task(self.method2(content, sid))
        task.add_done_callback(self.finish)
        save_pushdata_sid = str(sid) + 'save_pushdata'
        return save_pushdata_sid

    async def method2(self, content):
        args1 = self.method4(content)
        args2 = self.method5(content)
        self.method3(args1, args2)
        method2_sid = str(sid) + 'method2'
        return method2_sid

    def method3(self, content):
        print('123')
        return content

    def method4(self, content):
        newData = list(
            symbol=content['****'],
        )
        newData.save()
        return content['****']

    def method5(self, content):
        newData = list(
            symbol=content['****'],
        )
        newData.save()
        return content['****']

    def finish(self, text):
        logging.info('%s', text.result())

    def handle(self, *args, **kwargs):
        """Start loop"""
        loop = asyncio.get_event_loop()
        loop.create_task(self.stream())
        print('start receiving')
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            exit()

    async def stream(self):
        uri = 'ws://localhost:1234/'
        async with websockets.connect(uri, ping_interval=None) as ws:
            while not ws.closed:
                response = await ws.recv()
                content = json.loads(response)
                executor = concurrent.futures.ProcessPoolExecutor()
                queue = asyncio.Queue()
                for i in range(10):
                    queue.put_nowait(i)

                async def proc(q):
                    while not q.empty():
                        i = await q.get()
                        future = loop.run_in_executor(executor, save_pushdata(content, sid), i)
                        await future

                tasks = [proc(queue) for i in range(4)]  # 4 = number of CPU cores
                return await asyncio.wait(tasks)

def save_pushdata(self, content, sid):
    loop = asyncio.get_event_loop()
    newData = list(
        symbol=content['****'],
    )
    newData.save()
    self.method2(content, sid)

  • Answer #1

    "When using asyncio, the error message cannot be displayed and it is difficult to determine why it is stopped."

    asyncio itself does not suppress error messages.
    Could the problem be in the settings or the execution environment on the Django side?

    I am not familiar with Django's custom commands, but the code has a number of problems, so
    I think the first goal should be an environment in which you can see the error messages.

    Addendum: if you implement similar code as a regular script that does not depend on Django,
    rather than as a custom command, and run it, do you get an error message?

    (Leave out the model-related code and check only the asyncio/websockets behavior.)
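A minimal sketch of such a standalone check, using only asyncio from the standard library. Everything here is an assumption made for illustration: fake_stream stands in for ws.recv(), handle_message stands in for save_pushdata, and the bug on the second message is deliberate. It shows that an exception raised inside a task is stored on the task and can be reported from the done callback.

```python
import asyncio
import json
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("stream-check")

errors = []  # kept only so the outcome can be inspected afterwards


async def fake_stream():
    """Hypothetical stand-in for ws.recv(): yields JSON strings."""
    for i in range(3):
        await asyncio.sleep(0)
        yield json.dumps({"id": i})


async def handle_message(content):
    # Deliberate bug on the second message so there is an error to see.
    if content["id"] == 1:
        raise ValueError("boom in handle_message")
    return f"{content['id']} handled"


def finish(task):
    # task.exception() returns the stored exception instead of re-raising it.
    exc = task.exception()
    if exc is not None:
        errors.append(exc)
        log.error("task failed", exc_info=exc)
    else:
        log.info("%s", task.result())


async def main():
    tasks = []
    async for raw in fake_stream():
        task = asyncio.create_task(handle_message(json.loads(raw)))
        task.add_done_callback(finish)
        tasks.append(task)
    # Wait for every task; failures stay inside the tasks and are not raised here.
    await asyncio.wait(tasks)
    return [t.result() for t in tasks if t.exception() is None]


succeeded = asyncio.run(main())
```

If the traceback shows up here but not under the custom command, that points at the Django-side logging or environment rather than at asyncio.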


    Undefined method: save_data -> should this be save_pushdata?

      await self.save_pushdata(content)

    The built-in list accepts no keyword arguments and has no save() method.

    newData = list(
        symbol=content['****'],
    )
    newData.save()

    Did you mean to call save() on a model instance?
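To make the point concrete, here is a small sketch. The PushData class and the "symbol" key are hypothetical, since the real model and payload are not shown in the question. In an actual Django project the class would subclass django.db.models.Model and declare symbol as a field. The plain class below only imitates that shape so the snippet runs without Django.

```python
# The built-in list rejects keyword arguments, so the call fails
# before save() is ever reached.
try:
    list(symbol="ABC")
    list_error = None
except TypeError as exc:
    list_error = exc


class PushData:
    """Hypothetical stand-in for a Django model; not the asker's real model."""

    saved = []  # plays the role of the database table

    def __init__(self, symbol):
        self.symbol = symbol

    def save(self):
        PushData.saved.append(self)


content = {"symbol": "ABC"}  # hypothetical message payload
new_data = PushData(symbol=content["symbol"])
new_data.save()
```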


    async def method2 (self, content):
        ...
        return out_layer

    A coroutine that contains no await never yields control to the event loop, so it runs like an ordinary function.
    Depending on what it does, consider whether it needs to be a coroutine at all.
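The difference can be seen in a short sketch (the function names are made up for illustration). Two coroutines without any await run one after the other even under asyncio.gather, while two coroutines that await take turns.

```python
import asyncio

order = []


async def no_await(name):
    # No await inside: once started, this runs to the end without yielding.
    for step in range(2):
        order.append((name, step))


async def with_await(name):
    for step in range(2):
        order.append((name, step))
        await asyncio.sleep(0)  # hands control back to the event loop


async def main():
    order.clear()
    await asyncio.gather(no_await("a"), no_await("b"))
    blocking = list(order)
    order.clear()
    await asyncio.gather(with_await("a"), with_await("b"))
    cooperative = list(order)
    return blocking, cooperative


blocking, cooperative = asyncio.run(main())
print(blocking)     # "a" finishes completely before "b" starts
print(cooperative)  # "a" and "b" alternate
```

So if method2 only calls synchronous code, making it a coroutine does not by itself let the next message be received while it runs.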

    method4 and method5 are undefined.

    The arguments of method3 do not match its definition, and so on.
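As a rough illustration of what an argument mismatch does, the sketch below mirrors the way the question defines method2 with one parameter but calls it with two. Worker, the payload, and "sid-1" are made-up names. The TypeError is raised by the call itself, before any task exists.

```python
import asyncio


class Worker:
    async def method2(self, content):
        # Takes one argument besides self, like the definition in the question.
        return content


async def main():
    worker = Worker()
    try:
        # Two arguments are passed, as in save_pushdata: the call fails
        # immediately, so create_task() is never reached.
        task = asyncio.create_task(worker.method2({"id": 1}, "sid-1"))
        await task
    except TypeError as exc:
        return exc
    return None


mismatch = asyncio.run(main())
print(type(mismatch).__name__, mismatch)
```

In the question this call sits inside save_pushdata, which is itself running as a task, so the TypeError ends up stored on that task and only becomes visible when its result() or exception() is read.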