mirror of
https://github.com/minio/minio.git
synced 2024-12-26 23:25:54 -05:00
8293f546af
This implementation is similar to AMQP notifications: * Notifications are published on a single topic as a JSON feed * Topic is configurable, as is the QoS. Uses the paho.mqtt.golang library for the mqtt connection, and supports connections over tcp and websockets, with optional secure tls support. * Additionally the minio server configuration has been bumped up so mqtt configuration can be added. * Configuration migration code is added with tests. MQTT is an ISO standard M2M/IoT messaging protocol and was originally designed for applications for limited bandwidth networks. Today it's use is growing in the IoT space.
123 lines
3.5 KiB
Go
123 lines
3.5 KiB
Go
package packets
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
)
|
|
|
|
//ConnectPacket is an internal representation of the fields of the
|
|
//Connect MQTT packet
|
|
type ConnectPacket struct {
|
|
FixedHeader
|
|
ProtocolName string
|
|
ProtocolVersion byte
|
|
CleanSession bool
|
|
WillFlag bool
|
|
WillQos byte
|
|
WillRetain bool
|
|
UsernameFlag bool
|
|
PasswordFlag bool
|
|
ReservedBit byte
|
|
Keepalive uint16
|
|
|
|
ClientIdentifier string
|
|
WillTopic string
|
|
WillMessage []byte
|
|
Username string
|
|
Password []byte
|
|
}
|
|
|
|
func (c *ConnectPacket) String() string {
|
|
str := fmt.Sprintf("%s", c.FixedHeader)
|
|
str += " "
|
|
str += fmt.Sprintf("protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, c.Password)
|
|
return str
|
|
}
|
|
|
|
func (c *ConnectPacket) Write(w io.Writer) error {
|
|
var body bytes.Buffer
|
|
var err error
|
|
|
|
body.Write(encodeString(c.ProtocolName))
|
|
body.WriteByte(c.ProtocolVersion)
|
|
body.WriteByte(boolToByte(c.CleanSession)<<1 | boolToByte(c.WillFlag)<<2 | c.WillQos<<3 | boolToByte(c.WillRetain)<<5 | boolToByte(c.PasswordFlag)<<6 | boolToByte(c.UsernameFlag)<<7)
|
|
body.Write(encodeUint16(c.Keepalive))
|
|
body.Write(encodeString(c.ClientIdentifier))
|
|
if c.WillFlag {
|
|
body.Write(encodeString(c.WillTopic))
|
|
body.Write(encodeBytes(c.WillMessage))
|
|
}
|
|
if c.UsernameFlag {
|
|
body.Write(encodeString(c.Username))
|
|
}
|
|
if c.PasswordFlag {
|
|
body.Write(encodeBytes(c.Password))
|
|
}
|
|
c.FixedHeader.RemainingLength = body.Len()
|
|
packet := c.FixedHeader.pack()
|
|
packet.Write(body.Bytes())
|
|
_, err = packet.WriteTo(w)
|
|
|
|
return err
|
|
}
|
|
|
|
//Unpack decodes the details of a ControlPacket after the fixed
|
|
//header has been read
|
|
func (c *ConnectPacket) Unpack(b io.Reader) error {
|
|
c.ProtocolName = decodeString(b)
|
|
c.ProtocolVersion = decodeByte(b)
|
|
options := decodeByte(b)
|
|
c.ReservedBit = 1 & options
|
|
c.CleanSession = 1&(options>>1) > 0
|
|
c.WillFlag = 1&(options>>2) > 0
|
|
c.WillQos = 3 & (options >> 3)
|
|
c.WillRetain = 1&(options>>5) > 0
|
|
c.PasswordFlag = 1&(options>>6) > 0
|
|
c.UsernameFlag = 1&(options>>7) > 0
|
|
c.Keepalive = decodeUint16(b)
|
|
c.ClientIdentifier = decodeString(b)
|
|
if c.WillFlag {
|
|
c.WillTopic = decodeString(b)
|
|
c.WillMessage = decodeBytes(b)
|
|
}
|
|
if c.UsernameFlag {
|
|
c.Username = decodeString(b)
|
|
}
|
|
if c.PasswordFlag {
|
|
c.Password = decodeBytes(b)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//Validate performs validation of the fields of a Connect packet
|
|
func (c *ConnectPacket) Validate() byte {
|
|
if c.PasswordFlag && !c.UsernameFlag {
|
|
return ErrRefusedBadUsernameOrPassword
|
|
}
|
|
if c.ReservedBit != 0 {
|
|
//Bad reserved bit
|
|
return ErrProtocolViolation
|
|
}
|
|
if (c.ProtocolName == "MQIsdp" && c.ProtocolVersion != 3) || (c.ProtocolName == "MQTT" && c.ProtocolVersion != 4) {
|
|
//Mismatched or unsupported protocol version
|
|
return ErrRefusedBadProtocolVersion
|
|
}
|
|
if c.ProtocolName != "MQIsdp" && c.ProtocolName != "MQTT" {
|
|
//Bad protocol name
|
|
return ErrProtocolViolation
|
|
}
|
|
if len(c.ClientIdentifier) > 65535 || len(c.Username) > 65535 || len(c.Password) > 65535 {
|
|
//Bad size field
|
|
return ErrProtocolViolation
|
|
}
|
|
return Accepted
|
|
}
|
|
|
|
//Details returns a Details struct containing the Qos and
|
|
//MessageID of this ControlPacket
|
|
func (c *ConnectPacket) Details() Details {
|
|
return Details{Qos: 0, MessageID: 0}
|
|
}
|