Code-Auflistung für mqtt.lua

Die nachstehende Code-Auflistung, mqtt.lua, enthält den Code zum Implementieren des MQTT-Protokolls auf Citrix ADC unter Verwendung von Protokollerweiterungen. Der Code hat nur die TCP-Clientdaten-Callback-Funktion definiert - client.on_data (). Für Serverdaten wird keine Callback-Funktion hinzugefügt, und der Server zum Client nimmt den schnellen nativen Pfad. Bei Clientdaten analysiert der Code die CONNECT MQTT Protokollmeldung und extrahiert die ClientID. Es verwendet dann den ClientID für user_token Wert, der verwendet wird, um den gesamten Clientdatenverkehr für die Verbindung basierend auf der ClientID zu laden, indem die LB-Methode für den LB-vserver als USER_TOKEN festgelegt wird. Es verwendet die ClientID auch für user_session Wert, der für die LB-Persistenz verwendet werden kann, indem Persistenztyp für den LB-vserver als USERSESSION festgelegt wird. Der Code verwendet die ns.send (), um LB zu tun und die Anfangsdaten zu senden. Es verwendet die ns.pipe () API, um den Rest des Clientdatenverkehrs direkt an die Serververbindung zu senden, wobei Aufrufe an den Extension-Callback-Handler umgangen werden.

--[[

  MQTT event handler for TCP client data

    ctxt - TCP client side App processing context.

    data - TCP Data stream received.

    - parse the client ID from the connect message - the first message should be connect

    - send the data to LB with ClientID as user token and session

    - pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will

      bypass the tcp client on_data handler

    - if a parse error is seen, throw an error so the connection is reset

--]]

function client.on_data(ctxt, payload)

    local data = payload.data

    local data_len = data:len()

    local offset = 1

    local byte = nil

    local utf8_str_len = 0

    local msg_type = 0

    local multiplier = 1

    local max_multiplier = 128 * 128 * 128

    local rem_length = 0

    local clientID = nil

    -- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)

    if (data_len < 2) then

        goto need_more_data

    end

    byte = data:byte(offset)

    offset = offset + 1

       -- check for connect packet - type value 1

    msg_type = bit32.rshift(byte, 4)

    if (msg_type ~= 1) then

        error("Missing MQTT Connect packet.")

    end

    -- parse the remaining length

    repeat

        if (multiplier > max_multiplier) then

           error("MQTT CONNECT packet parse error - invalid Remaining Length.")

       end

       if (data_len < offset) then

          goto need_more_data

       end

       byte = data:byte(offset)

       offset = offset + 1

       rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)

       multiplier = multiplier * 128

    until (bit32.band(byte, 0x80) == 0)

    -- protocol name

    -- check if protocol name length is present

    if (data_len < offset + 1) then

       goto need_more_data

    end

    -- protocol name length MSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = byte * 256

    -- length LSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = utf8_str_len + byte

       -- skip the variable header for connect message

    -- the four required fields (protocol name, protocol level, connect flags, keep alive)

    offset = offset + utf8_str_len + 4

    -- parse the client ID

    --

    -- check if client ID len is present

    if (data_len < offset + 1) then

       goto need_more_data

    end

       -- client ID length MSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = byte * 256

    -- length LSB

    byte = data:byte(offset)

    offset = offset + 1

    utf8_str_len = utf8_str_len + byte

    if (data_len < (offset + utf8_str_len - 1)) then

        goto need_more_data

    end

    clientID = data:sub(offset, offset + utf8_str_len - 1)

    -- send the data so far to lb, user_token is set to do LB based on clientID

    -- user_session is set to clientID as well (it will be used to persist session)

    ns.send(ctxt.output, "DATA", {data = data,

                               user_token = clientID,

                               user_session = clientID})

       -- pipe the subsequent traffic to the lb - to bypass the extension handler

    ns.pipe(ctxt.input, ctxt.output)

    goto parse_done

    ::need_more_data::

    ctxt:hold(data)

    ::parse_done::

    return

end
<!--NeedCopy-->
Code-Auflistung für mqtt.lua

In diesem Artikel