Hot questions for Using ZeroMQ in Ruby

Question:

I stumbled upon this while reading the ZeroMQ FAQ entry on thread safety:

My multi-threaded program keeps crashing in weird places inside the ZeroMQ library. What am I doing wrong?

ZeroMQ sockets are not thread-safe. This is covered in some detail in the Guide.

The short version is that sockets should not be shared between threads. We recommend creating a dedicated socket for each thread.

For those situations where a dedicated socket per thread is infeasible, a socket may be shared if and only if each thread executes a full memory barrier before accessing the socket. Most languages support a Mutex or Spinlock which will execute the full memory barrier on your behalf.
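The FAQ's fallback can be sketched in Ruby as a thin wrapper that routes every use of the shared socket through one Mutex. This is a minimal sketch. It assumes a duck-typed socket object with a send_strings(parts) method. ffi-rzmq's ZMQ::Socket has a method of that name, but treat its exact signature and return value as an assumption. GuardedSocket is a hypothetical name. The whole multipart send sits inside one critical section, so a thread switch in the middle of it cannot let another thread's frames interleave:

```ruby
# Sketch of the FAQ fallback: share one non-thread-safe socket only behind a lock.
# GuardedSocket is hypothetical; +socket+ is any object responding to
# #send_strings(parts) (e.g. an ffi-rzmq socket -- an assumption, not shown here).
class GuardedSocket
  def initialize(socket)
    @socket = socket
    @lock   = Mutex.new
  end

  # The lock covers the *whole* multipart message, not each frame, so
  # frames of two messages sent from different threads cannot interleave.
  def send_strings(parts)
    @lock.synchronize { @socket.send_strings(parts) }
  end
end
```

A lock taken per frame instead of per message would still allow interleaved multipart messages. The lock has to cover the whole message.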

Here is my code:

Celluloid::ZMQ.init

module Scp
  module DataStore
    class DataSocket
      include Celluloid::ZMQ

      # read_socket / write_socket are referenced below, so expose the ivars
      attr_reader :read_socket, :write_socket

      def pull_socket(socket)
        @read_socket = Socket::Pull.new.tap do |read_socket|
          ## IPC socket
          read_socket.connect(socket)
        end
      end

      def push_socket(socket)
        @write_socket = Socket::Push.new.tap do |write_socket|
          ## IPC socket
          write_socket.connect(socket)
        end
      end

      def run
        pull_socket and push_socket and loopify!
      end

      def loopify!
        loop {
          async.evaluate_response(read_socket.read_multipart)
        }
      end

      def evaluate_response(data)
        return_response(message_id,routing,Parser.parser(data))
      end

      def return_response(message_id,routing,object)
        data = object.to_response
        write_socket.send([message_id,routing,data])
      end
    end
  end
end

Scp::DataStore::DataSocket.new.run

Now, there are a couple of things I'm unclear about:

1) Assume that async spawns a new thread (every time), that the write_socket is shared between all those threads, and that ZeroMQ says its sockets are not thread-safe. In that case I would expect the write_socket to run into thread-safety issues. (By the way, I have not hit this issue in any end-to-end testing so far.)

Question 1: Is my understanding correct?

To solve this, ZeroMQ asks us to guard the socket with a Mutex or a Semaphore.

This leads to Question 2:

2) Context switching.

A threaded application can context switch at any time. Looking at the ffi-rzmq code, Celluloid::ZMQ .send() internally calls send_strings(), which in turn calls send_multiple().

Question 2: A context switch can happen anywhere inside send_multiple(), even within the critical section here: https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L510

This can also lead to a data-ordering issue.

Is this observation correct?

Note:

Operating systems: MacOS, Linux and CentOS
Ruby - MRI 2.2.2/2.3.0

Answer:

No one ought to risk application robustness by putting it on thin ice

Forgive this rather long read. The author's lifelong experience shows that the reasons why matter far more than a few SLOCs of (potentially doubtful, mystical-looking or root-cause-ignorant) attempts to find out how by experiment.

Initial note

ZeroMQ has for more than a decade been promoted under a Zero-Sharing philosophy, together with Zero-Blocking, (almost) Zero-Latency and a few more design maxims. The best place to read about their pros and cons is Pieter HINTJENS' books: not just the fabulous "Code Connected, Volume 1", but also the ones on advanced design and engineering in the real social domain. Yet the most recent API documentation has introduced, and advertises, some features that IMHO have a relaxed relation to these cornerstone principles of distributed computing and no longer blow the Zero-Sharing whistle so loudly. That said, I remain a Zero-Sharing guy, so kindly read the rest of this post in that light.

Answer 1: No, sir. Or better: Yes and No, sir.

ZeroMQ does not ask one to use Mutex/Semaphore barriers. That would contradict the ZeroMQ design maxims.

Yes, recent API changes started to mention that (under some additional conditions) one may start using shared sockets ... with (many) additional measures ... so the implication was reversed. If one "wants" to share, one also takes on all the additional steps and measures. One also pays all the initially hidden design and implementation costs of "allowing" shared toys to (hopefully) survive the principal (unnecessary) battle with the rest of the uncontrollable distributed-system environment. One thus suddenly also bears a risk of failure, which, for many wise reasons, was not the case under the initial ZeroMQ Zero-Sharing evangelisation. So the user decides which path to take. That is fair.

IMHO, sound and robust designs are still better developed as per the initial ZeroMQ API and evangelism, where Zero-Sharing was a principle.

Answer 2: By design, there is always a principal uncertainty about ZeroMQ data-flow ordering. One of the ZeroMQ design maxims keeps designers from relying on unsupported assumptions about message ordering and many other things (exceptions apply). The only certainty is that any message dispatched into the ZeroMQ infrastructure is either delivered as a complete message or not delivered at all. So one can be sure that no fragmented wrecks ever appear on delivery. For further details, read below.
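The only guarantee here is whole-message delivery. A design that cares about order or loss can therefore carry that information inside the messages themselves. This is a minimal sketch. It assumes each message already carries a sender id and a per-sender counter; both are hypothetical application-level fields, not something ZeroMQ adds. SequenceChecker is likewise a hypothetical helper:

```ruby
# Sketch: instead of assuming ordering, check it at the application level.
# Each sender stamps messages with [sender_id, seq]; the receiver tracks the
# next expected seq per sender. SequenceChecker is hypothetical.
class SequenceChecker
  def initialize
    @expected = Hash.new(0) # sender_id => next expected sequence number
  end

  # Returns :in_order, :gap (some earlier messages never arrived), or
  # :stale (a duplicate, or older than something already seen).
  def check(sender_id, seq)
    expected = @expected[sender_id]
    if seq == expected
      @expected[sender_id] = seq + 1
      :in_order
    elsif seq > expected
      @expected[sender_id] = seq + 1
      :gap
    else
      :stale
    end
  end
end
```

How to react to :gap or :stale (log, re-request, drop) is an application decision the sketch deliberately leaves open.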


ThreadId does not prove anything (unless the inproc transport-class is used)

Given the internal design of the ZeroMQ data-pumping engines, instantiating zmq.Context( number_of_IO_threads ) decides how many threads get spawned to handle the future data-flows. This can be anywhere in { 0, 1 (default), 2, .. }, up to almost depleting the kernel-fixed maximum number of threads. A value of 0 is a reasonable choice to avoid wasting resources when the inproc:// transport-class is used. There the data-flow is handled through a direct memory-mapped region: the data never actually flows, it gets nailed down directly onto the landing-pad of the receiving socket-abstraction :o). So no thread is ever needed for such a job. Beyond this, <aSocket>.setsockopt( zmq.AFFINITY, <anIoThreadEnumID#> ) permits fine-tuning the data-related IO "hydraulics". One can prioritise, load-balance and performance-tweak the thread loads across the enumerated pool of the zmq.Context() instance's IO threads, and so get the best settings for the design and data-flow aspects listed above.


The cornerstone element is the Context() instance, not a Socket() one

Once a Context() instance has been instantiated and configured (see above for why and how), it is (almost) free to be shared. That applies if the design cannot resist sharing, or needs to avoid setting up a fully fledged distributed-computing infrastructure.

In other words, the brain is always inside the zmq.Context() instance. All the socket-related dFSA-engines are set up, configured and operated there. Yes, the syntax is <aSocket>.setsockopt(...), but its effect is implemented inside The Brain, in the respective zmq.Context, not in some wire-from-A-to-B.
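The "share the brain, not the wires" rule can be sketched in Ruby as one shared factory, standing in for the shared zmq.Context(), plus one socket per thread. Each socket is created, used and closed by its own thread only. make_socket is a hypothetical callable. With ffi-rzmq it could plausibly be -> { ctx.socket(ZMQ::PUSH) } with a shared ZMQ::Context in ctx, but treat that as an assumption:

```ruby
# Sketch: one shared "context" (here a socket factory), one socket per thread.
# make_socket is a hypothetical callable returning an object that responds to
# #send_string and #close; with a real binding it would wrap the shared context.
def run_workers(make_socket, workers:, jobs_per_worker:)
  threads = Array.new(workers) do |w|
    Thread.new do
      socket = make_socket.call # created inside, and owned by, this thread
      begin
        jobs_per_worker.times do |j|
          socket.send_string("worker#{w}-job#{j}")
        end
      ensure
        socket.close # released by the same thread that created it
      end
    end
  end
  threads.each(&:join) # re-raises here any exception raised in a worker
end
```

No socket ever crosses a thread boundary, so no lock is needed. Only the factory (the context) is shared.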

Better never share <aSocket> (even if API 4.2.2+ promises you could)

So far, one might have seen a lot of code snippets where a ZeroMQ Context and its sockets get instantiated and disposed of in a snap, serving just a few SLOCs in a row. That does not mean such practice is wise. Nor is it justified by any need other than that of a very academic example, written to be printed in as few SLOCs as possible because of the book publisher's policies.

Even in such cases, a fair warning about the indeed immense costs of zmq.Context infrastructure setup / tear-down ought to be present. It would prevent any generalisation, let alone copy/paste replicas of code that was written short-handedly just for illustrative purposes.

Just imagine the realistic setup that has to take place for any single Context instance. It has to get ready a pool of the respective dFSA-engines and maintain all their configuration setups. On top come all the socket-endpoint pools, the transport-class specific hardware and external O/S-service handlers, round-robin event-scanners, and buffer-memory-pool allocations with their dynamic allocators. This all takes both time and O/S resources. Handle these (natural) costs wisely, with care for the overheads, if performance is not to suffer.

If still in doubt why this is worth mentioning, just imagine anybody insisting on tearing down all the LAN cables right after a packet was sent. Then one would have to wait for new cabling to be installed before the next packet could be sent. Hopefully this "reasonable instantiation" view is now better perceived. It is an argument to share (if at all) zmq.Context() instance(s), without any further fights to share ZeroMQ socket-instances, even if they are newly becoming (almost) thread-safe per se.

The ZeroMQ philosophy is robust if taken as an advanced design evangelism for high-performance distributed-computing infrastructures. Tweaking just one (minor) aspect typically does not change the overall efforts and costs of designing safe and performant systems. In the global view, the result would not move a single bit better if just this one detail got changed. Even absolutely shareable, risk-free socket-instances, if that were ever possible, would not change this. Meanwhile all the benefits of sound design, clean code and reasonably achievable testability and debugging would get lost. So rather pull another wire from an existing brain to such a new thread. Or equip the new thread with its own brain, which handles its resources locally and connects its own wires back to all the other brains it needs to communicate with in the distributed system.

If still in doubt, imagine what would happen to your national Olympic hockey team if it had to share one single hockey stick during the tournament. Or how would you like it if all the neighbours in your home town shared the same phone number to answer all the many incoming calls? Yes, every phone and mobile sharing that number would ring at the same time. How well would that work?


Language bindings need not reflect all the API-features available

Here one can object, and in some cases be correct, that not all ZeroMQ language bindings or popular framework wrappers expose all API details to the user for application-level programming. The author of this post struggled for a long time with such legacy conflicts, which remained unresolvable for exactly this reason. He had to scratch his head a lot to find a feasible way around this fact, but it is (almost) always doable.


Epilogue:

It is fair to note that recent versions of the ZeroMQ API (4.2.2+) have started to creep away from the initially evangelised principles.

Nevertheless, it is worth remembering the ancient memento mori:

( emphases added, capitalisation not )

Thread safety

ØMQ has both thread safe socket type and not thread safe socket types. Applications MUST NOT use a not thread safe socket from multiple threads except after migrating a socket from one thread to another with a "full fence" memory barrier.

Following are the thread safe sockets:
* ZMQ_CLIENT
* ZMQ_SERVER
* ZMQ_DISH
* ZMQ_RADIO
* ZMQ_SCATTER
* ZMQ_GATHER

While this text might sound promising to some ears, calling barriers into service is the worst thing one can do when designing advanced distributed-computing systems, where performance is a must.

The last thing one would like to see is one's own code blocking. Such an agent gets into a principally uncontrollable blocking state from which no one can heal it, neither the agent itself nor anyone from outside. That happens whenever a remote agent never delivers a just-expected event, which in distributed systems can occur for many reasons, or under many circumstances, outside of one's control.

Building a system that is prone to hang itself, with the broad smile of a supported but naively employed syntax possibility, is nothing to be happy about, let alone a serious design job.
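The alternative to an indefinite block is a bounded wait that hands control back to the caller. This is a minimal sketch using only Ruby's standard Mutex and ConditionVariable. Mailbox is a hypothetical class, used only to show the pattern. With ffi-rzmq the analogous tools would presumably be a poll with a timeout (ZMQ::Poller#poll) or the RCVTIMEO socket option; check both against the binding version in use:

```ruby
# Sketch: a bounded wait instead of an indefinite block.
# Mailbox is a hypothetical helper built only on Ruby's standard library.
class Mailbox
  def initialize
    @items = []
    @lock  = Mutex.new
    @ready = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      @items << item
      @ready.signal # wake one waiting consumer, if any
    end
  end

  # Waits at most +timeout+ seconds. Returns the next item, or nil on timeout,
  # so the caller stays in control even if the expected event never arrives.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @items.empty? # the loop also guards against spurious wake-ups
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @ready.wait(@lock, remaining)
      end
      @items.shift
    end
  end
end
```

On a nil result the caller can retry, report, or give up. None of those options exist once a thread is stuck in an unbounded wait.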

Nor would one be surprised that many additional (initially not visible) restrictions apply down the line of the new moves towards a shared-{ hockey-stick | telephones } API:

ZMQ_CLIENT sockets are threadsafe. They do not accept the ZMQ_SNDMORE option on sends not ZMQ_RCVMORE on receives. This limits them to single part data. The intention is to extend the API to allow scatter/gather of multi-part data.

c/a

Celluloid::ZMQ does not list any of these new-API socket types (the ones that almost forgive the sin of sharing) in its section on supported socket types. So no good news is to be expected a priori. Celluloid::ZMQ master activity also seems to have faded out somewhere in 2015, so expectations from this corner ought to be realistic.

That said, one interesting point may be found in this notice:

before you go building your own distributed Celluloid systems with Celluloid::ZMQ, be sure to give DCell a look and decide if it fits your purposes.


Last but not least, running one event-loop system inside another event loop is a painful job. Integrating an embedded hard-real-time system into another hard-real-time system could even be proven mathematically impossible.

Similarly, building a multi-agent system on top of another agent-based component brings additional kinds of collisions and race conditions. These arise when both (or more) agent-based frameworks meet on the same resources they harness, be it knowingly or "just" through some functional side effect.

Unsalvageable mutual deadlocks are just one kind of these collisions, which introduce initially unseen troubles down the line of unaware design attempts. The very first step outside a single-agent system design makes one lose many guarantees that were silently in place before going multi-agent (distributed). So the prerequisites are an open mind, readiness to learn many "new" concepts, and concentration on the many new concerns to be carefully watched for and fought against. Without them one may (unknowingly) introduce patterns that are anti-patterns in the distributed-systems (multi-agent) domain.

At least you have been warned :o)

Question:

I want to make a simple connection between a Python program and a Ruby program using ZeroMQ. I am trying to use a PAIR connection, but I have not been able to get it working.

This is my code in python (the server):

import zmq 
import time 

port = "5553" 
context = zmq.Context() 
socket = context.socket(zmq.PAIR) 
socket.bind("tcp://*:%s" % port) 

while True: 
    socket.send(b"Server message to client3") 
    print("Enviado mensaje") 
    time.sleep(1)

It does not display anything until I connect a client.

This is the code in Ruby (the client)

require 'ffi-rzmq'
context = ZMQ::Context.new
subscriber = context.socket ZMQ::PAIR
subscriber.connect "tcp://localhost:5553"
loop do
    address = ''
    subscriber.recv_string address
    puts "[#{address}]"
end

The Ruby script just freezes and does not print anything, while the Python script starts printing "Enviado mensaje" ("message sent").

BTW: I am using Python 3.6.9 and Ruby 2.6.5.

What is the correct way to connect a zmq PAIR between Ruby and Python?


Answer:

Welcome to the Zen of Zero! If you have never worked with ZeroMQ, you may enjoy a first look at "ZeroMQ Principles in less than Five Seconds" before diving into further details.

Q : It does not display anything until I connect a client.

Sure it does not: your code imperatively asked to block until a PAIR/PAIR delivery channel becomes able to deliver a message. As the v4.2+ API defines, the .send() method will block for the whole duration of a "mute state":

When a ZMQ_PAIR socket enters the mute state due to having reached the high water mark for the connected peer, or if no peer is connected, then any zmq_send(3) operations on the socket shall block until the peer becomes available for sending; messages are not discarded.

One may try the non-blocking mode of sending. Avoiding blocking is always a sign of good engineering practice, all the more so in distributed computing. In pyzmq, a send with zmq.NOBLOCK raises zmq.Again instead of blocking while no peer is connected, so catch that exception in the loop. As a rule of thumb, also include <aSocket>.close() and <aContext>.term(), best with an explicit .setsockopt( zmq.LINGER, 0 ). This avoids hang-ups, and explicitly closing resources and releasing them back to the system is good engineering practice:

socket.send( "Server message #[_{0:_>10d}_] to client3".format( i ).encode(), zmq.NOBLOCK )  # i: a message counter

Last but not least:

Q : What is the correct way to connect a zmq PAIR between Ruby and Python?

As the API documentation explains:

ZMQ_PAIR sockets are designed for inter-thread communication across the zmq_inproc(7) transport and do not implement functionality such as auto-reconnection.

There is no best way to do this, as a Python / Ruby pair is not a case of inter-thread communication. Since v2.1+, ZeroMQ has explicitly warned that the PAIR/PAIR archetype is experimental and ought to be used only with that in mind.

One may always substitute each such use case with a tandem of PUSH/PULL simplex channels. A pair of .send()-only and .recv()-only channels provides the same comfort.

Question:

I've created a tap0 device (IP 10.0.0.101), and am using zeromq's pgm pub/sub (e.g. pgm://192.168.100.2;234.5.6.7:5555) to transport Ethernet frames from the tap to the zmq subscribers and vice versa. The idea is to create a virtual network using pgm. I have 2 tap hosts on the network: 10.0.0.101, 10.0.0.11. They also have physical Ethernet adapters at 192.168.106.126, 192.168.106.55.

The problem is that ping works, but http and ssh protocols do not.

Wireshark shows a successful TCP startup sequence, but then I start to see duplicate ACKs, retransmissions, and curl and ssh hang for a while and eventually error out.

A snippet from Wireshark is below, followed by most of the (hopefully) relevant ruby source code. This is using rb_tuntap and ffi-rzmq gems.

No.     Time               Source                Destination           Protocol Length Info
      7 11:41:45.464867000 10.0.0.11             10.0.0.101            TCP      74     51659 > 3000 [SYN] Seq=0 Win=14600 Len=0 MSS=1460 SACK_PERM=1 TSval=1953042 TSecr=0 WS=64

Frame 7: 74 bytes on wire (592 bits), 74 bytes captured (592 bits) on interface 0
Ethernet II, Src: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f), Dst: 56:c8:52:17:31:67 (56:c8:52:17:31:67)
Internet Protocol Version 4, Src: 10.0.0.11 (10.0.0.11), Dst: 10.0.0.101 (10.0.0.101)
Transmission Control Protocol, Src Port: 51659 (51659), Dst Port: 3000 (3000), Seq: 0, Len: 0

No.     Time               Source                Destination           Protocol Length Info
      8 11:41:45.464956000 10.0.0.101            10.0.0.11             TCP      74     3000 > 51659 [SYN, ACK] Seq=0 Ack=1 Win=28960 Len=0 MSS=1460 SACK_PERM=1 TSval=10191992 TSecr=1953042 WS=128

Frame 8: 74 bytes on wire (592 bits), 74 bytes captured (592 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 0, Ack: 1, Len: 0

No.     Time               Source                Destination           Protocol Length Info
     11 11:41:45.473101000 10.0.0.11             10.0.0.101            TCP      66     51659 > 3000 [ACK] Seq=1 Ack=1 Win=14656 Len=0 TSval=1953044 TSecr=10191992

Frame 11: 66 bytes on wire (528 bits), 66 bytes captured (528 bits) on interface 0
Ethernet II, Src: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f), Dst: 56:c8:52:17:31:67 (56:c8:52:17:31:67)
Internet Protocol Version 4, Src: 10.0.0.11 (10.0.0.11), Dst: 10.0.0.101 (10.0.0.101)
Transmission Control Protocol, Src Port: 51659 (51659), Dst Port: 3000 (3000), Seq: 1, Ack: 1, Len: 0

No.     Time               Source                Destination           Protocol Length Info
     12 11:41:45.473429000 10.0.0.11             10.0.0.101            HTTP     145    GET / HTTP/1.1 

Frame 12: 145 bytes on wire (1160 bits), 145 bytes captured (1160 bits) on interface 0
Ethernet II, Src: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f), Dst: 56:c8:52:17:31:67 (56:c8:52:17:31:67)
Internet Protocol Version 4, Src: 10.0.0.11 (10.0.0.11), Dst: 10.0.0.101 (10.0.0.101)
Transmission Control Protocol, Src Port: 51659 (51659), Dst Port: 3000 (3000), Seq: 1, Ack: 1, Len: 79
Hypertext Transfer Protocol

No.     Time               Source                Destination           Protocol Length Info
     13 11:41:45.473460000 10.0.0.101            10.0.0.11             TCP      66     3000 > 51659 [ACK] Seq=1 Ack=80 Win=29056 Len=0 TSval=10192001 TSecr=1953046

Frame 13: 66 bytes on wire (528 bits), 66 bytes captured (528 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 1, Ack: 80, Len: 0

No.     Time               Source                Destination           Protocol Length Info
     15 11:41:45.491555000 10.0.0.101            10.0.0.11             TCP      717    [TCP segment of a reassembled PDU]

Frame 15: 717 bytes on wire (5736 bits), 717 bytes captured (5736 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 1, Ack: 80, Len: 651

No.     Time               Source                Destination           Protocol Length Info
     16 11:41:45.491599000 10.0.0.101            10.0.0.11             TCP      1514   [TCP segment of a reassembled PDU]

Frame 16: 1514 bytes on wire (12112 bits), 1514 bytes captured (12112 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 652, Ack: 80, Len: 1448

No.     Time               Source                Destination           Protocol Length Info
     21 11:41:45.496998000 10.0.0.11             10.0.0.101            TCP      66     51659 > 3000 [ACK] Seq=80 Ack=652 Win=17536 Len=0 TSval=1953058 TSecr=10192019

Frame 21: 66 bytes on wire (528 bits), 66 bytes captured (528 bits) on interface 0
Ethernet II, Src: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f), Dst: 56:c8:52:17:31:67 (56:c8:52:17:31:67)
Internet Protocol Version 4, Src: 10.0.0.11 (10.0.0.11), Dst: 10.0.0.101 (10.0.0.101)
Transmission Control Protocol, Src Port: 51659 (51659), Dst Port: 3000 (3000), Seq: 80, Ack: 652, Len: 0

No.     Time               Source                Destination           Protocol Length Info
     22 11:41:45.497026000 10.0.0.101            10.0.0.11             HTTP     231    HTTP/1.1 200 OK   (text/html)

Frame 22: 231 bytes on wire (1848 bits), 231 bytes captured (1848 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 2100, Ack: 80, Len: 165
[3 Reassembled TCP Segments (2264 bytes): #15(651), #16(1448), #22(165)]
Hypertext Transfer Protocol
Line-based text data: text/html

No.     Time               Source                Destination           Protocol Length Info
     25 11:41:45.502736000 10.0.0.11             10.0.0.101            TCP      78     [TCP Dup ACK 21#1] 51659 > 3000 [ACK] Seq=80 Ack=652 Win=17536 Len=0 TSval=1953061 TSecr=10192019 SLE=2100 SRE=2265

Frame 25: 78 bytes on wire (624 bits), 78 bytes captured (624 bits) on interface 0
Ethernet II, Src: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f), Dst: 56:c8:52:17:31:67 (56:c8:52:17:31:67)
Internet Protocol Version 4, Src: 10.0.0.11 (10.0.0.11), Dst: 10.0.0.101 (10.0.0.101)
Transmission Control Protocol, Src Port: 51659 (51659), Dst Port: 3000 (3000), Seq: 80, Ack: 652, Len: 0

No.     Time               Source                Destination           Protocol Length Info
     26 11:41:45.504245000 10.0.0.101            10.0.0.11             TCP      1514   [TCP Retransmission] 3000 > 51659 [ACK] Seq=652 Ack=80 Win=29056 Len=1448 TSval=10192032 TSecr=1953061[Reassembly error, protocol TCP: New fragment overlaps old data (retransmission?)]

Frame 26: 1514 bytes on wire (12112 bits), 1514 bytes captured (12112 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 652, Ack: 80, Len: 1448
[Reassembly error, protocol TCP: New fragment overlaps old data (retransmission?)]

No.     Time               Source                Destination           Protocol Length Info
     33 11:41:45.711324000 10.0.0.101            10.0.0.11             TCP      1514   [TCP Retransmission] 3000 > 51659 [ACK] Seq=652 Ack=80 Win=29056 Len=1448 TSval=10192239 TSecr=1953061[Reassembly error, protocol TCP: New fragment overlaps old data (retransmission?)]

Frame 33: 1514 bytes on wire (12112 bits), 1514 bytes captured (12112 bits) on interface 0
Ethernet II, Src: 56:c8:52:17:31:67 (56:c8:52:17:31:67), Dst: 3a:e2:d5:f3:8e:6f (3a:e2:d5:f3:8e:6f)
Internet Protocol Version 4, Src: 10.0.0.101 (10.0.0.101), Dst: 10.0.0.11 (10.0.0.11)
Transmission Control Protocol, Src Port: 3000 (3000), Dst Port: 51659 (51659), Seq: 652, Ack: 80, Len: 1448
[Reassembly error, protocol TCP: New fragment overlaps old data (retransmission?)]

Ruby code:

def run

    stop = false

    Signal.trap("SIGINT")  {
        stop=true
    }

    tap.up

    #binding.pry
    pids = []
    begin

        # tap => zmq_pub 
        pids << Thread.new do
            while !stop do
                read_and_pub
            end
        end

        # zmq_sub => tap
        pids << Thread.new do
            while !stop do
                sub_and_write
            end
        end

    rescue
        stop=true
    end

    pids.each { |pid| pid.join }

    tap.down
    tap.close
    self.tap = nil

    zmq_pub.close
    zmq_sub.close
    zmq_ctx.terminate

end # run

def read_and_pub

    selected = IO.select([tap.to_io],nil,nil,1)
    if !selected.nil? and !selected[0].nil? and selected[0].length>0 then
        msg = tap.to_io.sysread(tap.mtu)
        if !msg.nil? && msg.length > 0 then
            sent = zmq_pub.send_string(msg) 
            if sent != msg.length
                puts "**** published #{sent}/#{msg.length} ****"
            end
            print_packet msg, tap
        else
            puts "IO.select returned #{selected} but msg is #{msg}"
        end
    end
end

def sub_and_write
    msg = ''
    zmq_sub.recv_string msg
    if !msg.nil? && msg.length > 0 then
        sent = tap.to_io.syswrite(msg)
        if sent != msg.length
            puts "**** wrote #{sent}/#{msg.length} ****"
        end
        print_packet msg, zmq_sub
    end
end

Answer:

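The problem was here: tap.to_io.sysread(tap.mtu). It seems that tap.mtu is not necessarily the maximum number of bytes that sysread will receive, and the unread bytes of a frame seem to be discarded by a tap interface. I changed the code to tap.to_io.sysread(10000), and http and ssh work as expected. The largest frame I received was 1514 bytes. That size is consistent with a 1500-byte MTU (the MSS=1460 in the capture implies one) plus the 14-byte Ethernet header that a tap device delivers with every frame. A read capped at the MTU therefore cuts off the tail of every full-sized frame, while small packets such as pings still fit.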
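The fix can be made explicit by sizing the read buffer for a whole Ethernet frame rather than the IP MTU. This is a minimal sketch. It assumes the device runs in TAP (layer-2) mode with no packet-information prefix, so that each read returns exactly one frame. frame_buffer_size and read_frame are hypothetical helpers, and the 4-byte VLAN allowance is extra headroom not mentioned in the answer:

```ruby
# Sketch: size each tap read for a whole Ethernet frame, not just the IP MTU.
# Assumptions (not from the original code): TAP (layer-2) mode, no
# packet-information prefix, one frame returned per read.
ETH_HEADER_BYTES = 14 # destination MAC (6) + source MAC (6) + EtherType (2)
VLAN_TAG_BYTES   = 4  # headroom for an optional 802.1Q tag

# Hypothetical helper: the largest frame one read may have to hold.
def frame_buffer_size(mtu)
  mtu + ETH_HEADER_BYTES + VLAN_TAG_BYTES
end

# Hypothetical helper: read one frame from +io+, or nil at end of stream.
def read_frame(io, mtu)
  io.sysread(frame_buffer_size(mtu))
rescue EOFError
  nil
end
```

In read_and_pub this would replace the sysread call, e.g. msg = read_frame(tap.to_io, tap.mtu). The answer's sysread(10000) is simply a more generous bound; either works as long as it covers the largest frame.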

Question:

I had the zmq gem installed properly on Mac OS X Mavericks. After upgrading to OS X Yosemite, installing the gem fails with the following error log:

Gem::Installer::ExtensionBuildError: ERROR: Failed to build gem native extension.

/Users/apple/.rbenv/versions/1.9.3-p484/bin/ruby extconf.rb
checking for zmq.h... yes
checking for zmq_init() in -lzmq... yes
Cool, I found your zmq install...
creating Makefile

make
compiling rbzmq.c
rbzmq.c:968:7: error: use of undeclared identifier 'ZMQ_RECOVERY_IVL_MSEC'
        case ZMQ_RECOVERY_IVL_MSEC:
             ^
rbzmq.c:990:10: error: use of undeclared identifier 'ZMQ_HWM'
    case ZMQ_HWM:
         ^
rbzmq.c:991:10: error: use of undeclared identifier 'ZMQ_SWAP'
    case ZMQ_SWAP:
         ^
rbzmq.c:995:10: error: use of undeclared identifier 'ZMQ_MCAST_LOOP'
    case ZMQ_MCAST_LOOP:
         ^
rbzmq.c:1292:10: error: use of undeclared identifier 'ZMQ_HWM'
    case ZMQ_HWM:
         ^
rbzmq.c:1293:10: error: use of undeclared identifier 'ZMQ_SWAP'
    case ZMQ_SWAP:
         ^
rbzmq.c:1297:10: error: use of undeclared identifier 'ZMQ_MCAST_LOOP'
    case ZMQ_MCAST_LOOP:
         ^
rbzmq.c:1315:10: error: use of undeclared identifier 'ZMQ_RECOVERY_IVL_MSEC'
    case ZMQ_RECOVERY_IVL_MSEC:
         ^
rbzmq.c:1443:81: error: too few arguments to function call, expected 4, have 3
    send_args->rc = zmq_send(send_args->socket, send_args->msg, send_args->flags);
                    ~~~~~~~~                                                    ^
/usr/local/include/zmq.h:354:1: note: 'zmq_send' declared here
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
^
/usr/local/include/zmq.h:67:27: note: expanded from macro 'ZMQ_EXPORT'
#       define ZMQ_EXPORT __attribute__ ((visibility("default")))
                          ^
rbzmq.c:1517:38: error: too few arguments to function call, expected 4, have 3
        rc = zmq_send (s, &msg, flags);
             ~~~~~~~~                ^
/usr/local/include/zmq.h:354:1: note: 'zmq_send' declared here
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
^
/usr/local/include/zmq.h:67:27: note: expanded from macro 'ZMQ_EXPORT'
#       define ZMQ_EXPORT __attribute__ ((visibility("default")))
                          ^
rbzmq.c:1541:81: error: too few arguments to function call, expected 4, have 3
    recv_args->rc = zmq_recv(recv_args->socket, recv_args->msg, recv_args->flags);
                    ~~~~~~~~                                                    ^
/usr/local/include/zmq.h:356:1: note: 'zmq_recv' declared here
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
^
/usr/local/include/zmq.h:67:27: note: expanded from macro 'ZMQ_EXPORT'
#       define ZMQ_EXPORT __attribute__ ((visibility("default")))
                          ^
rbzmq.c:1602:38: error: too few arguments to function call, expected 4, have 3
        rc = zmq_recv (s, &msg, flags);
             ~~~~~~~~                ^
/usr/local/include/zmq.h:356:1: note: 'zmq_recv' declared here
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
^
/usr/local/include/zmq.h:67:27: note: expanded from macro 'ZMQ_EXPORT'
#       define ZMQ_EXPORT __attribute__ ((visibility("default")))
                          ^
rbzmq.c:1675:50: error: use of undeclared identifier 'ZMQ_HWM'
    rb_define_const (zmq_module, "HWM", INT2NUM (ZMQ_HWM));
                                                 ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:973:35: note: expanded from macro 'INT2NUM'
# define INT2NUM(v) INT2FIX((int)(v))
                                  ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:225:45: note: expanded from macro 'INT2FIX'
#define INT2FIX(i) ((VALUE)(((SIGNED_VALUE)(i))<<1 | FIXNUM_FLAG))
                                            ^
rbzmq.c:1676:51: error: use of undeclared identifier 'ZMQ_SWAP'
    rb_define_const (zmq_module, "SWAP", INT2NUM (ZMQ_SWAP));
                                                  ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:973:35: note: expanded from macro 'INT2NUM'
# define INT2NUM(v) INT2FIX((int)(v))
                                  ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:225:45: note: expanded from macro 'INT2FIX'
#define INT2FIX(i) ((VALUE)(((SIGNED_VALUE)(i))<<1 | FIXNUM_FLAG))
                                            ^
rbzmq.c:1683:57: error: use of undeclared identifier 'ZMQ_MCAST_LOOP'
    rb_define_const (zmq_module, "MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP));
                                                        ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:973:35: note: expanded from macro 'INT2NUM'
# define INT2NUM(v) INT2FIX((int)(v))
                                  ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:225:45: note: expanded from macro 'INT2FIX'
#define INT2FIX(i) ((VALUE)(((SIGNED_VALUE)(i))<<1 | FIXNUM_FLAG))
                                            ^
rbzmq.c:1698:64: error: use of undeclared identifier 'ZMQ_RECOVERY_IVL_MSEC'
    rb_define_const (zmq_module, "RECOVERY_IVL_MSEC", INT2NUM (ZMQ_RECOVERY_IVL_MSEC));
                                                               ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:973:35: note: expanded from macro 'INT2NUM'
# define INT2NUM(v) INT2FIX((int)(v))
                                  ^
/Users/apple/.rbenv/versions/1.9.3-p484/include/ruby-1.9.1/ruby/ruby.h:225:45: note: expanded from macro 'INT2FIX'
#define INT2FIX(i) ((VALUE)(((SIGNED_VALUE)(i))<<1 | FIXNUM_FLAG))
                                            ^
16 errors generated.
make: *** [rbzmq.o] Error 1


Gem files will remain installed in /Users/apple/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/zmq-2.1.4 for inspection.
Results logged to /Users/apple/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/zmq-2.1.4/./gem_make.out.

I ran brew install zmq successfully, and the error log shows that the build could find zmq as well.

Are there any dependencies I missed here? Please help.


Answer:

The idea for this answer came from this blog post: http://blog.cuberoot.in/installing-zmq-gem-on-mountain-lion-mac-os-x/. I resolved it in a slightly different way, though.

This is what the blog says:

It turned out that the zmq gem has not been updated for the latest stable version of ZeroMQ (3.2.2); it works (basically, installs) with version 2.2.0.

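So the zmq gem works with ZeroMQ 2.2.0, not 3.x. This matches the compiler errors in the question: rbzmq.c calls zmq_send and zmq_recv with three arguments while the installed zmq.h declares them with four, and constants such as ZMQ_HWM, ZMQ_SWAP, ZMQ_MCAST_LOOP and ZMQ_RECOVERY_IVL_MSEC no longer exist in the newer headers.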
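To check which ZeroMQ headers the gem will compile against before running gem install, you can read the version macros out of zmq.h. This is a minimal sketch, assuming the header defines ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR and ZMQ_VERSION_PATCH as plain integer #defines; it defaults to /usr/local/include/zmq.h (the path in the compiler errors), and the helper names zmq_header_version and compatible_with_zmq_gem? are hypothetical.

```ruby
# Sketch: read the ZeroMQ version macros from a zmq.h header to see which
# libzmq the zmq gem's native extension would be compiled against.
# Assumption: the header defines ZMQ_VERSION_MAJOR/MINOR/PATCH as plain
# integer #defines.

# Hypothetical helper: returns [major, minor, patch], or nil if not found.
def zmq_header_version(header_text)
  parts = %w[MAJOR MINOR PATCH].map do |part|
    match = header_text.match(/^\s*#\s*define\s+ZMQ_VERSION_#{part}\s+(\d+)/)
    match && match[1].to_i
  end
  parts.all? ? parts : nil
end

# Hypothetical helper: the zmq 2.1.4 gem's C code uses the 2.x API
# (3-argument zmq_send/zmq_recv, ZMQ_HWM, ZMQ_SWAP, ...), so only a 2.x
# header is expected to compile.
def compatible_with_zmq_gem?(version)
  !version.nil? && version.first == 2
end

if __FILE__ == $PROGRAM_NAME
  path = ARGV.fetch(0, "/usr/local/include/zmq.h")
  if !File.file?(path)
    puts "#{path} not found; pass the path to zmq.h as an argument"
  else
    version = zmq_header_version(File.read(path))
    if version.nil?
      puts "Could not find ZMQ_VERSION_* macros in #{path}"
    else
      status = compatible_with_zmq_gem?(version) ? "should build" : "will likely fail to build"
      puts "#{path}: ZeroMQ #{version.join('.')} (zmq gem 2.1.4 #{status})"
    end
  end
end
```

With ZeroMQ 3.2.2 installed from brew, this would be expected to report a 3.x version, which is consistent with the build failure in the question.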

First, I removed the existing zeromq installation from brew:

brew uninstall zeromq

Then I searched for other versions of zeromq:

brew search zeromq

brew search returned:

homebrew/versions/zeromq22  homebrew/versions/zeromq32                       
zeromq homebrew/versions/zeromq3  homebrew/versions/zeromq405

Luckily, brew still provides zeromq22, so let's install it:

brew install homebrew/versions/zeromq22

Then I tried to install the gem again:

gem install zmq -v 2.1.4

but I still got the following errors:

gem install zmq -v 2.1.4
Building native extensions.  This could take a while...
ERROR:  Error installing zmq:
ERROR: Failed to build gem native extension.

    /Users/ilab/.rbenv/versions/1.9.3-p484/bin/ruby extconf.rb
checking for zmq.h... no
checking for zmq.h in /opt/local/include,/usr/local/include,/usr/include... no
extconf.rb:36:in `<main>': Couldn't find zmq library. try setting --with-zmq-dir=<path> to tell me where it is. (RuntimeError)      

It seems the zmq gem could not find ZeroMQ at all. So I tried again with the latest zeromq:

brew install zeromq

With the latest zeromq installed, the zmq gem could find ZeroMQ again, so I concluded that something was wrong with how brew installs and links zeromq22. I forced brew to link zeromq22 instead:

brew link --overwrite zeromq22 --force 

Then I tried to install the gem again, and it worked!
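The extconf.rb error above also hints at an alternative to force-linking: pointing the build at the zeromq22 installation with --with-zmq-dir, passed through gem install after a bare --. The sketch below repeats the header search from the log and prints such a command. It assumes zeromq22 lives under /usr/local/opt/zeromq22 (check with brew --prefix zeromq22) and that --with-zmq-dir takes an install prefix, as mkmf's dir_config options do; find_zmq_include and with_zmq_dir_flag are hypothetical helpers, and this route is untested, whereas the brew link approach above is what actually worked.

```ruby
# Sketch: reproduce the "checking for zmq.h in ..." step from the log to
# see where a zmq.h header is visible, and derive the --with-zmq-dir value
# suggested by the extconf.rb error message.
# Assumption: brew's zeromq22 is installed under /usr/local/opt/zeromq22.

# The directories extconf.rb searched, as listed in the error log.
DEFAULT_INCLUDE_DIRS = %w[/opt/local/include /usr/local/include /usr/include].freeze

# Hypothetical helper: returns the first directory that contains zmq.h, or nil.
def find_zmq_include(dirs)
  dirs.find { |dir| File.file?(File.join(dir, "zmq.h")) }
end

# Hypothetical helper: --with-zmq-dir expects an install prefix (the parent
# of include/), so drop the trailing include directory.
def with_zmq_dir_flag(include_dir)
  "--with-zmq-dir=#{File.dirname(include_dir)}"
end

if __FILE__ == $PROGRAM_NAME
  # Check the zeromq22 prefix first so a 3.x header in /usr/local/include
  # does not win.
  candidates = ["/usr/local/opt/zeromq22/include"] + DEFAULT_INCLUDE_DIRS
  include_dir = find_zmq_include(candidates)
  if include_dir.nil?
    puts "zmq.h not found in: #{candidates.join(', ')}"
  else
    puts "zmq.h found in #{include_dir}"
    puts "gem install zmq -v 2.1.4 -- #{with_zmq_dir_flag(include_dir)}"
  end
end
```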