国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Python 的異步 IO:Asyncio 之 TCP Client

anonymoussf / 2217人閱讀

摘要:當(dāng)被調(diào)用時(shí),表示已經(jīng)斷開連接。第三版去掉第三版的目的是去掉。協(xié)程保持不變,但是已被剔除不再需要請求發(fā)送之后,繼續(xù)異步等待數(shù)據(jù)的接收,即。的作用是結(jié)束那個導(dǎo)致等待的,這樣也就可以結(jié)束了結(jié)束,以便結(jié)束。

關(guān)于 Asyncio 的其他文章:

Python 的異步 IO:Asyncio 簡介

Python 的異步 IO:Aiohttp Client 代碼分析

如果不知道 Asyncio 是什么,先看「Asyncio 簡介」那一篇。

一個簡單的 HTTP Server

首先,為了便于測試,我們用 Python 內(nèi)建的 http 模塊,運(yùn)行一個簡單的 HTTP Server。

新建一個目錄,添加文件 index.html,內(nèi)容為 Hello, World!(不是合法的 HTML 格式也沒有關(guān)系),然后運(yùn)行如下命令(Ubuntu 請用 python3):

$ python -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

后面不同的 Client 實(shí)現(xiàn),都會連接這個 Server:Host 為 localhost,Port 為 8000

所有的示例代碼,import 語句一律從略。

import asyncio
第一版

第一版改寫自 Python 官方文檔里的 例子。
Python 的例子是 Echo Client,我們稍微復(fù)雜一點(diǎn),是 HTTP Client,都是 TCP。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop

    def connection_made(self, transport):
        request = "GET / HTTP/1.1
Host: localhost

"
        transport.write(request.encode())

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

async def main(loop):
    await loop.create_connection(
        lambda: ClientProtocol(loop), "localhost", 8000)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_forever()

TCP 連接由 loop.create_connection() 創(chuàng)建,后者需要一個 Protocol 工廠,即 lambda: ClientProtocol(loop)
Protocol 提供了 connection_made()data_received()connection_lost() 等接口,這些接口就像回調(diào)函數(shù)一樣,會在恰當(dāng)?shù)臅r(shí)候被調(diào)用。
我們在 connection_made() 中,通過參數(shù) transport 發(fā)送一個 HTTP GET 請求,隨后在 data_received() 里,將收到 HTTP 應(yīng)答。
當(dāng) connection_lost() 被調(diào)用時(shí),表示 Server 已經(jīng)斷開連接。

運(yùn)行結(jié)果:

HTTP/1.0 200 OK
Server: SimpleHTTP/0.6 Python/3.6.3
Date: Mon, 04 Dec 2017 06:11:52 GMT
Content-type: text/html
Content-Length: 13
Last-Modified: Thu, 30 Nov 2017 05:37:31 GMT


Hello, World!

這就是一個標(biāo)準(zhǔn)的 HTTP 應(yīng)答,包含 Status Line,Headers 和 Body。

值得注意的是,loop 其實(shí)運(yùn)行了兩遍:

loop.run_until_complete(main(loop))  # 第一遍
loop.run_forever()  # 第二遍

如果沒有 run_forever(),在收到數(shù)據(jù)之前,loop 可能就結(jié)束了。協(xié)程 main() 只是創(chuàng)建好連接,隨后 run_until_complete() 自然也就無事可做而終。

加了 run_forever() 后,data_received() 等便有了被調(diào)用的機(jī)會。但是也有問題,loop 一直在跑,程序沒辦法結(jié)束,所以才在 connection_lost() 里主動停止 loop:

    def connection_lost(self, exc):
        self.loop.stop()
第二版:ClientSession

第一版在 connection_made() 中 hard code 了一個 HTTP GET 請求,靈活性較差,以后必然還有 POST 等其他 HTTP 方法需要支持,所以有必要新增一個 ClientSession 類,來抽象客戶端的會話。于是,HTTP 請求的發(fā)送,便從 connection_made() 挪到了 ClientSession.get()

ClientSession 應(yīng)該為每一個 HTTP 方法提供一個相應(yīng)的方法,比如 postput 等等,雖然我們只考慮 HTTP GET。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def connection_lost(self, exc):
        self.loop.stop()

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        transport.write(request.encode())

首先,ClientProtocol 新增了一個屬性 transport,是在 connection_made() 時(shí)保存下來的,這樣在 ClientSession 里才能通過它來發(fā)送請求。

第三版:去掉 run_forever()

第三版的目的是:去掉 run_forever()

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None
        self._eof = False  # 有沒有收到 EOF
        self._waiter = None  # 用來等待接收數(shù)據(jù)的 future

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print(data.decode())

    def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

    def connection_lost(self, exc):
        pass  # 不再調(diào)用 self.loop.stop()

    async def wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

class ClientSession:
    def __init__(self, loop):
        self._loop = loop

    async def get(self, url, host, port):
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop), host, port)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        transport.write(request.encode())

        # 等待接收數(shù)據(jù)。
        await protocol.wait_for_data()

協(xié)程 main() 保持不變,但是 loop.run_forever() 已被剔除:

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
# 不再需要 loop.run_forever()

HTTP 請求發(fā)送之后,繼續(xù)異步等待(await)數(shù)據(jù)的接收,即 protocol.wait_for_data()
這個等待動作,是通過往 loop 里新增一個 future 來實(shí)現(xiàn)的:

    async def wait_for_data(self):
        # ...
        self._waiter = self.loop.create_future()
        await self._waiter
        self._waiter = None

self._waiter 就是這個導(dǎo)致等待的 future,它會保證 loop 一直運(yùn)行,直到數(shù)據(jù)接收完畢。
eof_received() 被調(diào)用時(shí),數(shù)據(jù)就接收完畢了(EOF 的意思不用多說了吧?)。

    def eof_received(self):
        self._eof = True
        self._wakeup_waiter()

_wakeup_waiter() 的作用是結(jié)束那個導(dǎo)致等待的 future,這樣 loop 也就可以結(jié)束了:

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            # 結(jié)束 waiter future,以便 loop 結(jié)束。
            waiter.set_result(None)
第四版:Reader

data_received() 里直接輸出 HTTP 的應(yīng)答結(jié)果,實(shí)在算不上什么完美的做法。

    def data_received(self, data):
        print(data.decode())

為了解決這一問題,我們引入一個 Reader 類,用來緩存收到的數(shù)據(jù),并提供「讀」的接口給用戶。

首先,Protocol 被簡化了,前一版引入的各種處理,都轉(zhuǎn)交給了 Reader。

class ClientProtocol(asyncio.Protocol):
    def __init__(self, loop, reader):
        self.loop = loop
        self.transport = None
        self._reader = reader

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self._reader.feed(data)  # 轉(zhuǎn)交給 Reader

    def eof_received(self):
        self._reader.feed_eof()  # 轉(zhuǎn)交給 Reader

    def connection_lost(self, exc):
        pass

下面是 ClientSession.get() 基于 Reader 的實(shí)現(xiàn):

class ClientSession:
    async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)
        # 發(fā)送請求,代碼從略...
        data = await reader.read()
        print(data.decode())

Reader 本身是從上一版的 Protocol 抽取出來的,唯一不同的是,接收的數(shù)據(jù)被臨時(shí)放在了一個 bytearray 緩存里。

class Reader:
    def __init__(self, loop):
        self._loop = loop
        self._buffer = bytearray()  # 緩存
        self._eof = False
        self._waiter = None

    def feed(self, data):
        self._buffer.extend(data)
        self._wakeup_waiter()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    async def read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

    async def _wait_for_data(self):
        assert not self._eof
        assert not self._waiter

        self._waiter = self._loop.create_future()
        await self._waiter
        self._waiter = None

    def _wakeup_waiter(self):
        waiter = self._waiter
        if waiter:
            self._waiter = None
            waiter.set_result(None)

稍微解釋一下 read(),比較重要的是開始的一句判斷:

        # 如果緩存為空,并且 EOF 還沒收到,那就(繼續(xù))等待接收數(shù)據(jù)。
        if not self._buffer and not self._eof:
            # read() 會停在這個地方,直到 feed() 或 feed_eof() 被調(diào)用,
            # 也就是說有數(shù)據(jù)可讀了。
            await self._wait_for_data()

接下來就是把緩存倒空:

        data = bytes(self._buffer)
        del self._buffer[:]

運(yùn)行一下,不難發(fā)現(xiàn),ClientSession.get() 里讀數(shù)據(jù)的那一句是有問題的。

        data = await reader.read()

收到的 data 并不是完整的 HTTP 應(yīng)答,可能只包含了 HTTP 的 Headers,而沒有 Body。

一個 HTTP 應(yīng)答,Server 端可能分多次發(fā)送過來。比如這個測試用的 Hello World Server,Headers 和 Body 就分了兩次發(fā)送,也就是說 data_received() 會被調(diào)用兩次。

之前我們在 eof_received() 里才喚醒 waiter(_wakeup_waiter()),現(xiàn)在在 data_received() 里就喚醒了,于是第一次數(shù)據(jù)收完, waiter 就結(jié)束了,loop 也便跟著結(jié)束。

為了讀到完整的 HTTP 應(yīng)答,方法也很簡單,把 read() 放在循環(huán)里:

        blocks = []
        while True:
            block = await reader.read()
            if not block:
                break
            blocks.append(block)
        data = b"".join(blocks)
        print(data.decode())

每一次 read(),如果緩存為空,并且 EOF 還沒收到的話,就會再次創(chuàng)建 waiter,放到 loop 里,繼續(xù)等待接收數(shù)據(jù)。

這個循環(huán)顯然應(yīng)該交給 Reader 處理,對 ClientSession 需保持透明。

class Reader:
    async def read(self):
        blocks = []
        while True:
            block = await self._read()
            if not block:
                break
            blocks.append(block)
        data = b"".join(blocks)
        return data

    async def _read(self):
        if not self._buffer and not self._eof:
            await self._wait_for_data()
            
        data = bytes(self._buffer)
        del self._buffer[:]
        return data

最后,原來的 read() 重命名為 _read(),新的 read() 在循環(huán)中反復(fù)調(diào)用 _read(),直到無數(shù)據(jù)可讀。ClientSession 這邊直接調(diào)用新的 read() 即可。

第五版:Writer

到目前為止,發(fā)送 HTTP 請求時(shí),都是直接調(diào)用較為底層的 transport.write()

    async def get(self, url, host, port):
        # ...
        transport.write(request.encode())

可以把它封裝在 Writer 中,與 Reader 的做法類似,但是 Writer 要簡單得多:

class Writer:
    def __init__(self, transport):
        self._transport = transport

    def write(self, data):
        self._transport.write(data)

然后在 ClientSession.get() 中創(chuàng)建 Writer

    async def get(self, url, host, port):
        reader = Reader(self._loop)
        transport, protocol = await self._loop.create_connection(
            lambda: ClientProtocol(loop, reader), host, port)

        writer = Writer(transport)
        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        writer.write(request.encode())
        # ...

ClientSession 來說,只需知道 ReaderWriter 就足夠了,所以不妨提供一個函數(shù) open_connection(),直接返回 ReaderWriter

async def open_connection(host, port, loop):
    reader = Reader(loop)
    protocol = ClientProtocol(loop, reader)
    transport, _ = await loop.create_connection(lambda: protocol, host, port)
    writer = Writer(transport)
    return reader, writer

然后 ClientSession 就可以簡化成這樣:

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await open_connection(host, port, self._loop)
        # ...
第六版:Asyncio Streams

其實(shí) Asyncio 已經(jīng)提供了 Reader 和 Writer,詳見 官方文檔。

下面以 Asyncio Streams 實(shí)現(xiàn) ClientSession.get()

class ClientSession:
    async def get(self, url, host, port):
        reader, writer = await asyncio.open_connection(
            host, port, loop=self._loop)

        request = "GET {} HTTP/1.1
Host: {}

".format(url, host)
        writer.write(request.encode())

        data = await reader.read(-1)
        print(data.decode())
        writer.close()

asyncio.open_connection() 就相當(dāng)于我們的 open_connection()ReaderWriter 也都類似,只是復(fù)雜了一些。

全文完

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/44528.html

相關(guān)文章

  • python基礎(chǔ)教程:異步IO API

    摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個版本,asyncio又做了比較大的調(diào)整,把這個庫的API分為了 高層級API和...

    vboy1010 評論0 收藏0
  • Python 異步 IO:Aiohttp Client 代碼分析

    摘要:的異步代碼分析是的一個框架,基于,所以叫。不可避免的,可讀性會比較差。想找教程的話,請移步官方教程,寫得還是挺不錯的。建議不要直接使用,而只把它當(dāng)成的一個樣例。 Python 的異步 IO:Aiohttp Client 代碼分析 Aiohttp 是 Python 的一個 HTTP 框架,基于 asyncio,所以叫 Aiohttp。 我主要是看源碼,想理解它的設(shè)計(jì),所以附上了類圖與時(shí)序...

    fai1017 評論0 收藏0
  • python基礎(chǔ)教程:異步IO 概念和歷史

    摘要:并發(fā)的方式有多種,多線程,多進(jìn)程,異步等。多線程和多進(jìn)程之間的場景切換和通訊代價(jià)很高,不適合密集型的場景關(guān)于多線程和多進(jìn)程的特點(diǎn)已經(jīng)超出本文討論的范疇,有興趣的同學(xué)可以自行搜索深入理解。 編程中,我們經(jīng)常會遇到并發(fā)這個概念,目的是讓軟件能充分利用硬件資源,提高性能。并發(fā)的方式有多種,多線程,多進(jìn)程,異步IO等。多線程和多進(jìn)程更多應(yīng)用于CPU密集型的場景,比如科學(xué)計(jì)算的時(shí)間都耗費(fèi)在CPU...

    BicycleWarrior 評論0 收藏0
  • python基礎(chǔ)教程:異步IO 編程例子

    摘要:創(chuàng)建第一個協(xié)程推薦使用語法來聲明協(xié)程,來編寫異步應(yīng)用程序。協(xié)程兩個緊密相關(guān)的概念是協(xié)程函數(shù)通過定義的函數(shù)協(xié)程對象調(diào)用協(xié)程函數(shù)返回的對象。它是一個低層級的可等待對象,表示一個異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...

    wangxinarhat 評論0 收藏0
  • sanic異步框架中文文檔

    摘要:實(shí)例實(shí)例測試結(jié)果增加路由實(shí)例測試結(jié)果提供了一個方法,根據(jù)處理程序方法名生成。異常拋出異常要拋出異常,只需從異常模塊中提出相應(yīng)的異常。 typora-copy-images-to: ipic [TOC] 快速開始 在安裝Sanic之前,讓我們一起來看看Python在支持異步的過程中,都經(jīng)歷了哪些比較重大的更新。 首先是Python3.4版本引入了asyncio,這讓Python有了支...

    elliott_hu 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<