class MCollective::Message
container for a message, its headers, agent, collective and other meta data
Constants
- VALIDTYPES
Attributes
Public Class Methods
payload - the message body without headers etc, just the text message - the original message received from the middleware options - if the body base64 encoded? options - the agent the message is for/from options - the collective its for/from options - the message headers options - an indicator about the type of message, :message, :request, :direct_request or :reply options - if this is a reply this should old the message we are replying to options - for requests, the filter to encode into the message options - the normal client options hash options - the maximum amount of seconds this message can be valid for options - in the case of replies this is the msgid it is expecting in the replies options - specific request id to use else one will be generated
# File lib/mcollective/message.rb 23 def initialize(payload, message, options = {}) 24 options = {:base64 => false, 25 :agent => nil, 26 :headers => {}, 27 :type => :message, 28 :request => nil, 29 :filter => Util.empty_filter, 30 :options => {}, 31 :ttl => 60, 32 :expected_msgid => nil, 33 :requestid => nil, 34 :collective => nil}.merge(options) 35 36 @payload = payload 37 @message = message 38 @requestid = options[:requestid] 39 @discovered_hosts = nil 40 @reply_to = nil 41 42 @type = options[:type] 43 @headers = options[:headers] 44 @base64 = options[:base64] 45 @filter = options[:filter] 46 @expected_msgid = options[:expected_msgid] 47 @options = options[:options] 48 49 @ttl = @options[:ttl] || Config.instance.ttl 50 @msgtime = 0 51 52 @validated = false 53 54 if options[:request] 55 @request = options[:request] 56 @agent = request.agent 57 @collective = request.collective 58 @type = :reply 59 else 60 @agent = options[:agent] 61 @collective = options[:collective] 62 end 63 64 base64_decode! 65 end
Public Instance Methods
# File lib/mcollective/message.rb 129 def base64? 130 @base64 131 end
# File lib/mcollective/message.rb 115 def base64_decode! 116 return unless @base64 117 118 @payload = SSL.base64_decode(@payload) 119 @base64 = false 120 end
# File lib/mcollective/message.rb 122 def base64_encode! 123 return if @base64 124 125 @payload = SSL.base64_encode(@payload) 126 @base64 = true 127 end
# File lib/mcollective/message.rb 241 def create_reqid 242 # we gsub out the -s so that the format of the id does not 243 # change from previous versions, these should just be more 244 # unique than previous ones 245 SSL.uuid.gsub("-", "") 246 end
# File lib/mcollective/message.rb 183 def decode! 184 raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type) 185 186 begin 187 @payload = PluginManager["security_plugin"].decodemsg(self) 188 rescue Exception => e 189 if type == :request 190 # If we're a server receiving a request, reraise 191 raise(e) 192 else 193 # We're in the client, log and carry on as best we can 194 195 # Note: mc_sender is unverified. The verified identity is in the 196 # payload we just failed to decode 197 Log.warn("Failed to decode a message from '#{headers["mc_sender"]}': #{e}") 198 return 199 end 200 end 201 202 if type == :request 203 raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid]) 204 end 205 206 [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop| 207 instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop) 208 end 209 end
# File lib/mcollective/message.rb 133 def description 134 cid = "" 135 cid += payload[:callerid] + "@" if payload.include?(:callerid) 136 cid += payload[:senderid] 137 138 "#{requestid} for agent '#{agent}' in collective '#{collective}' from #{cid}" 139 end
# File lib/mcollective/message.rb 141 def encode! 142 case type 143 when :reply 144 raise "Cannot encode a reply message if no request has been associated with it" unless request 145 raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid]) 146 147 @requestid = request.payload[:requestid] 148 @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid]) 149 when :request, :direct_request 150 validate_compound_filter(@filter["compound"]) unless @filter["compound"].empty? 151 152 @requestid = create_reqid unless @requestid 153 @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl) 154 else 155 raise "Cannot encode #{type} messages" 156 end 157 end
in the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id and can be used by other classes like the security plugins as a means of optimizing their behavior like by ignoring messages not directed at us.
# File lib/mcollective/message.rb 110 def expected_msgid=(msgid) 111 raise "Can only store the expected msgid for reply messages" unless @type == :reply 112 @expected_msgid = msgid 113 end
publish a reply message by creating a target name and sending it
# File lib/mcollective/message.rb 228 def publish 229 # If we've been specificaly told about hosts that were discovered 230 # use that information to do P2P calls if appropriate else just 231 # send it as is. 232 config = Config.instance 233 if @discovered_hosts && config.direct_addressing && (@discovered_hosts.size <= config.direct_addressing_threshold) 234 self.type = :direct_request 235 Log.debug("Handling #{requestid} as a direct request") 236 end 237 238 PluginManager['connector_plugin'].publish(self) 239 end
Sets a custom reply-to target for requests. The connector plugin should inspect this when constructing requests and set this header ensuring replies will go to the custom target otherwise the connector should just do what it usually does
# File lib/mcollective/message.rb 99 def reply_to=(target) 100 raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type) 101 102 @reply_to = target 103 end
Sets the message type to one of the known types. In the case of :direct_request the list of hosts to communicate with should have been set with discovered_hosts
else an exception will be raised. This is for extra security, we never accidentally want to send a direct request without a list of hosts or something weird like that as it might result in a filterless broadcast being sent.
Additionally you simply cannot set :direct_request if direct_addressing was not enabled this is to force a workflow that doesnt not yield in a mistake when someone might assume direct_addressing is enabled when its not.
# File lib/mcollective/message.rb 76 def type=(type) 77 raise "Unknown message type #{type}" unless VALIDTYPES.include?(type) 78 79 if type == :direct_request 80 raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing 81 82 unless @discovered_hosts && !@discovered_hosts.empty? 83 raise "Can only set type to :direct_request if discovered_hosts have been set" 84 end 85 86 # clear out the filter, custom discovery sources might interpret the filters 87 # different than the remote mcollectived and in directed mode really the only 88 # filter that matters is the agent filter 89 @filter = Util.empty_filter 90 @filter["agent"] << @agent 91 end 92 93 @type = type 94 end
Perform validation against the message by checking filters and ttl
# File lib/mcollective/message.rb 212 def validate 213 raise "Can only validate request messages" unless type == :request 214 215 msg_age = Time.now.utc.to_i - msgtime 216 217 if msg_age > ttl 218 PluginManager["global_stats"].ttlexpired 219 raise(MsgTTLExpired, "Message #{description} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}. Rejecting message.") 220 end 221 222 raise(NotTargettedAtUs, "Message #{description} does not pass filters. Ignoring message.") unless PluginManager["security_plugin"].validate_filter?(payload[:filter]) 223 224 @validated = true 225 end
# File lib/mcollective/message.rb 159 def validate_compound_filter(compound_filter) 160 compound_filter.each do |filter| 161 filter.each do |statement| 162 if statement["fstatement"] 163 functionname = statement["fstatement"]["name"] 164 pluginname = Data.pluginname(functionname) 165 value = statement["fstatement"]["value"] 166 167 ddl = DDL.new(pluginname, :data) 168 169 # parses numbers and booleans entered as strings into proper 170 # types of data so that DDL validation will pass 171 statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"]) 172 173 Data.ddl_validate(ddl, statement["fstatement"]["params"]) 174 175 unless value && Data.ddl_has_output?(ddl, value) 176 DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => functionname, :value => value}) 177 end 178 end 179 end 180 end 181 end