Skip to content

Commit

Permalink
Eliminate extra message queuing
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Krey committed Feb 10, 2015
1 parent 2b324f4 commit 34f816d
Show file tree
Hide file tree
Showing 24 changed files with 487 additions and 275 deletions.
6 changes: 3 additions & 3 deletions MQTTClient.podspec
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
Pod::Spec.new do |s|
s.name = "MQTTClient"
s.version = "0.0.3"
s.version = "0.0.4"
s.summary = "IOS native ObjectiveC MQTT Framework"
s.homepage = "https://github.com/ckrey/MQTT-Client-Framework"
s.license = { :type => "MIT", :file => "LICENSE" }
s.author = { "Christoph Krey" => "[email protected]" }
s.source = { :git => "https://github.com/ckrey/MQTT-Client-Framework.git", :tag => "0.0.3" }
s.source = { :git => "https://github.com/ckrey/MQTT-Client-Framework.git", :tag => "0.0.4" }

s.source_files = "MQTTClient/MQTTClient", "MQTTClient/MQTTClient/**/*.{h,m}"
s.requires_arc = true

s.ios.deployment_target = "7.0"
s.ios.deployment_target = "6.1"
end
12 changes: 11 additions & 1 deletion MQTTClient/MQTTClient/MQTTSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ typedef NS_ENUM(NSInteger, MQTTSessionEvent) {
/** gets called when the content of MQTTClients internal buffers change
use for monitoring the completion of transmitted and received messages
@param session the MQTTSession reporting the change
@param queued the number of queued messages waiting to be send when the connection becomes established and ready
@param for backward compatibility only: MQTTClient does not queue messages anymore except during QoS protocol
@param flowingIn the number of incoming messages not acknowledged by the MQTTClient yet
@param flowingOut the number of outgoing messages not yet acknowledged by the MQTT broker
*/
Expand All @@ -189,6 +189,16 @@ typedef NS_ENUM(NSInteger, MQTTSessionEvent) {
flowingIn:(NSUInteger)flowingIn
flowingOut:(NSUInteger)flowingOut;

/** gets called when the content of MQTTClients internal buffers change
use for monitoring the completion of transmitted and received messages
@param session the MQTTSession reporting the change
@param flowingIn the number of incoming messages not acknowledged by the MQTTClient yet
@param flowingOut the number of outgoing messages not yet acknowledged by the MQTT broker
*/
- (void)buffered:(MQTTSession *)session
flowingIn:(NSUInteger)flowingIn
flowingOut:(NSUInteger)flowingOut;

@end

/** Session implements the MQTT protocol for your application
Expand Down
104 changes: 23 additions & 81 deletions MQTTClient/MQTTClient/MQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ @interface MQTTSession() <MQTTDecoderDelegate, MQTTEncoderDelegate>
@property (nonatomic) UInt16 txMsgId;
@property (strong, nonatomic) NSMutableDictionary *txFlows;
@property (strong, nonatomic) NSMutableDictionary *rxFlows;
@property (strong, nonatomic) NSMutableArray *queue;

@property (nonatomic) BOOL synchronPub;
@property (nonatomic) UInt16 synchronPubMid;
Expand Down Expand Up @@ -133,17 +132,10 @@ - (MQTTSession *)initWithClientId:(NSString *)clientId
self.runLoop = runLoop;
self.runLoopMode = runLoopMode;

self.queue = [NSMutableArray array];
self.txMsgId = 1;
self.txFlows = [[NSMutableDictionary alloc] init];
self.rxFlows = [[NSMutableDictionary alloc] init];

if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];

return self;
}
Expand Down Expand Up @@ -636,12 +628,7 @@ - (UInt16)publishData:(NSData*)data
flow.msg = msg;
flow.deadline = [NSDate dateWithTimeIntervalSinceNow:DUPTIMEOUT];
self.txFlows[[NSNumber numberWithUnsignedInt:(uint)msgId]] = flow;
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
}
[self send:msg];

Expand Down Expand Up @@ -766,12 +753,7 @@ - (void)closeInternal
[self.delegate connectionClosed:self];
}

if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
self.synchronPub = FALSE;
self.synchronPubMid = 0;
self.synchronSub = FALSE;
Expand Down Expand Up @@ -849,17 +831,7 @@ - (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent)eventCode err
case MQTTSessionStatusConnecting:
break;
case MQTTSessionStatusConnected:
if ([self.queue count] > 0) {
MQTTMessage *msg = (self.queue)[0];
[self.queue removeObjectAtIndex:0];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self.encoder encodeMessage:msg];
}
[self tell];
[self checkTxFlows];
break;
case MQTTSessionStatusDisconnecting:
Expand Down Expand Up @@ -974,22 +946,7 @@ - (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg
}

self.synchronConnect = FALSE;

if ([self.queue count] > 0) {
if (self.encoder.status == MQTTEncoderStatusReady) {
MQTTMessage *msg = (self.queue)[0];
[self.queue removeObjectAtIndex:0];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self.encoder encodeMessage:msg];
}
}
}
else {
} else {
NSString *errorDescription;
switch (bytes[1]) {
case 1:
Expand Down Expand Up @@ -1108,12 +1065,7 @@ - (void)handlePublish:(MQTTMessage*)msg
@"retained": @(msg.retainFlag),
@"mid": @(msgId)};
(self.rxFlows)[[NSNumber numberWithUnsignedInt:msgId]] = dict;
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
[self send:[MQTTMessage pubrecMessageWithMessageId:msgId]];
}
}
Expand All @@ -1133,12 +1085,7 @@ - (void)handlePuback:(MQTTMessage*)msg
if ([[flow msg] type] == MQTTPublish && [[flow msg] qos] == 1) {

[self.txFlows removeObjectForKey:msgId];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:)]) {
[self.delegate messageDelivered:self msgID:[msgId unsignedIntValue]];
}
Expand Down Expand Up @@ -1229,12 +1176,7 @@ - (void)handlePubrel:(MQTTMessage*)msg
}

[self.rxFlows removeObjectForKey:msgId];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
}
[self send:[MQTTMessage pubcompMessageWithMessageId:[msgId unsignedIntegerValue]]];
}
Expand All @@ -1250,12 +1192,7 @@ - (void)handlePubcomp:(MQTTMessage*)msg {
MQttTxFlow *flow = (self.txFlows)[msgId];
if (flow != nil && [[flow msg] type] == MQTTPubrel) {
[self.txFlows removeObjectForKey:msgId];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
[self tell];
if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:)]) {
[self.delegate messageDelivered:self msgID:[msgId unsignedIntValue]];
}
Expand Down Expand Up @@ -1307,15 +1244,6 @@ - (void)send:(MQTTMessage*)msg {
if ([self.encoder status] == MQTTEncoderStatusReady) {
[self.encoder encodeMessage:msg];
}
else {
[self.queue addObject:msg];
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:[self.queue count]
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
}
}

- (UInt16)nextMsgId {
Expand All @@ -1326,4 +1254,18 @@ - (UInt16)nextMsgId {
return self.txMsgId;
}

- (void)tell {
if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:0
flowingIn:[self.rxFlows count]
flowingOut:[self.txFlows count]];
}
}

@end
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,23 @@ typedef NS_ENUM(NSInteger, MQTTSessionEvent) {
*/
- (void)received:(MQTTSession *)session type:(int)type qos:(MQTTQosLevel)qos retained:(BOOL)retained duped:(BOOL)duped mid:(UInt16)mid data:(NSData *)data;

/** gets called when a command is received from the MQTT broker
use this for low level control of the MQTT connection
@param session the MQTTSession reporting the received command
@param type the MQTT command type
@param qos the Quality of Service of the command
@param retained the retained status of the command
@param duped the duplication status of the command
@param mid the Message Identifier of the command
@param data the payload data of the command if any, might be zero length
@return true if the sessionmanager should ignore the received message
*/
- (BOOL)ignoreReceived:(MQTTSession *)session type:(int)type qos:(MQTTQosLevel)qos retained:(BOOL)retained duped:(BOOL)duped mid:(UInt16)mid data:(NSData *)data;

/** gets called when the content of MQTTClients internal buffers change
use for monitoring the completion of transmitted and received messages
@param session the MQTTSession reporting the change
@param queued the number of queued messages waiting to be send when the connection becomes established and ready
@param for backward compatibility only: MQTTClient does not queue messages anymore except during QoS protocol
@param flowingIn the number of incoming messages not acknowledged by the MQTTClient yet
@param flowingOut the number of outgoing messages not yet acknowledged by the MQTT broker
*/
Expand All @@ -176,6 +189,16 @@ typedef NS_ENUM(NSInteger, MQTTSessionEvent) {
flowingIn:(NSUInteger)flowingIn
flowingOut:(NSUInteger)flowingOut;

/** gets called when the content of MQTTClients internal buffers change
use for monitoring the completion of transmitted and received messages
@param session the MQTTSession reporting the change
@param flowingIn the number of incoming messages not acknowledged by the MQTTClient yet
@param flowingOut the number of outgoing messages not yet acknowledged by the MQTT broker
*/
- (void)buffered:(MQTTSession *)session
flowingIn:(NSUInteger)flowingIn
flowingOut:(NSUInteger)flowingOut;

@end

/** Session implements the MQTT protocol for your application
Expand Down
Binary file modified MQTTClient/dist/MQTTClient.framework/Versions/A/MQTTClient
Binary file not shown.
10 changes: 10 additions & 0 deletions MQTTClient/dist/documentation/html/Nodes.xml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@
<Name>&lt;MQTTSessionDelegate&gt;</Name>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Subnodes>
<Node>
<Name>buffered:flowingIn:flowingOut:</Name>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Anchor>a66d6b40de38077ee28d05048eec4e3f9</Anchor>
</Node>
<Node>
<Name>buffered:queued:flowingIn:flowingOut:</Name>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
Expand Down Expand Up @@ -333,6 +338,11 @@
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Anchor>a168b8ff4bd95ffbfe61a8b10236860f8</Anchor>
</Node>
<Node>
<Name>ignoreReceived:type:qos:retained:duped:mid:data:</Name>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Anchor>a83b477a650b00526b0a39599ddd6fd12</Anchor>
</Node>
<Node>
<Name>messageDelivered:msgID:</Name>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
Expand Down
22 changes: 22 additions & 0 deletions MQTTClient/dist/documentation/html/Tokens.xml
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,17 @@
<Anchor>aefaae4e610a97984e2135bbd902081ca</Anchor>
<DeclaredIn>MQTTSession.h</DeclaredIn>
</Token>
<Token>
<TokenIdentifier>
<Name>ignoreReceived:type:qos:retained:duped:mid:data:</Name>
<APILanguage>occ</APILanguage>
<Type>intfm</Type>
<Scope>MQTTSessionDelegate-p</Scope>
</TokenIdentifier>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Anchor>a83b477a650b00526b0a39599ddd6fd12</Anchor>
<DeclaredIn>MQTTSession.h</DeclaredIn>
</Token>
<Token>
<TokenIdentifier>
<Name>buffered:queued:flowingIn:flowingOut:</Name>
Expand All @@ -500,4 +511,15 @@
<Anchor>a8d9602d0b89fa2c3a0009150e57edc87</Anchor>
<DeclaredIn>MQTTSession.h</DeclaredIn>
</Token>
<Token>
<TokenIdentifier>
<Name>buffered:flowingIn:flowingOut:</Name>
<APILanguage>occ</APILanguage>
<Type>intfm</Type>
<Scope>MQTTSessionDelegate-p</Scope>
</TokenIdentifier>
<Path>protocol_m_q_t_t_session_delegate-p.html</Path>
<Anchor>a66d6b40de38077ee28d05048eec4e3f9</Anchor>
<DeclaredIn>MQTTSession.h</DeclaredIn>
</Token>
</Tokens>
Loading

0 comments on commit 34f816d

Please sign in to comment.