Skip to content

Commit 900a548

Browse files
somdoronbluca
authored andcommitted
problem: no API to disconnect a specific peer by routing id
Background: JeroMQ added disconnectPeer(routingId) to allow dropping a single peer connection. In libzmq there was no C API to disconnect a single connection by its routing id; applications had to tear down entire endpoints or sockets. This also created divergence between JeroMQ and libzmq for the PEER/SERVER patterns. solution: introduce a virtual xdisconnect_peer on socket_base_t and a public socket_base_t::disconnect_peer that delegates to it. Implement xdisconnect_peer in server_t to look up the outbound pipe by routing id and terminate it, which removes it from bookkeeping via xpipe_terminated. Expose a new DRAFT C API zmq_disconnect_peer(void*, uint32_t) that always calls socket_base_t::disconnect_peer (matching JeroMQ pattern of calling the socket regardless of type); unsupported types return ENOTSUP. This brings libzmq to feature parity with JeroMQ’s commit 57de9b8 (Peer support disconnect). API: add draft function zmq_disconnect_peer(void *socket, uint32_t routing_id). For ZMQ_SERVER and ZMQ_PEER this disconnects that specific connection; subsequent sends to that routing id fail with EHOSTUNREACH until a new connection is formed. Other socket types return ENOTSUP. The method is thread-safe when using thread-safe sockets. Tests: add tests/test_peer_disconnect.cpp which creates two ZMQ_PEER sockets, exchanges a message to learn the remote routing id, calls zmq_disconnect_peer on the bound side, and verifies sending back fails with EHOSTUNREACH. Hook the test into Makefile.am under ENABLE_DRAFTS. Docs: add doc/zmq_disconnect_peer.adoc manpage; link it from doc/zmq_socket.adoc; include it in doc/Makefile.am MAN3 so it is built and installed. Note the draft status (requires --enable-drafts). Implementation notes: the server_t override simply terminates the matched pipe; removal from maps happens in xpipe_terminated. The C shim intentionally does not gate by socket type and relies on the underlying implementation (returning ENOTSUP where appropriate), mirroring JeroMQ’s design. Reference JeroMQ change: 57de9b84 (kaplan-shaked: Peer support disconnect).
1 parent 7a7bfa1 commit 900a548

File tree

12 files changed

+193
-1
lines changed

12 files changed

+193
-1
lines changed

Makefile.am

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,7 @@ test_apps += tests/test_poller \
10591059
tests/test_xpub_manual_last_value \
10601060
tests/test_router_notify \
10611061
tests/test_peer \
1062+
tests/test_peer_disconnect \
10621063
tests/test_reconnect_options \
10631064
tests/test_msg_init \
10641065
tests/test_hello_msg \
@@ -1113,6 +1114,10 @@ tests_test_peer_SOURCES = tests/test_peer.cpp
11131114
tests_test_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
11141115
tests_test_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
11151116

1117+
tests_test_peer_disconnect_SOURCES = tests/test_peer_disconnect.cpp
1118+
tests_test_peer_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
1119+
tests_test_peer_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
1120+
11161121
tests_test_reconnect_options_SOURCES = tests/test_reconnect_options.cpp
11171122
tests_test_reconnect_options_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
11181123
tests_test_reconnect_options_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

doc/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# documentation
33
#
44
MAN3 = \
5-
zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_close.3 \
5+
zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect_peer.3 zmq_disconnect.3 zmq_close.3 \
66
zmq_ctx_new.3 zmq_ctx_term.3 zmq_ctx_get.3 zmq_ctx_set.3 zmq_ctx_shutdown.3 \
77
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_init_buffer.3 \
88
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \

doc/zmq_disconnect_peer.adoc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
= zmq_disconnect_peer(3)
2+
3+
4+
== NAME
5+
zmq_disconnect_peer - disconnect a specific peer connection by routing id.
6+
7+
8+
== SYNOPSIS
9+
*int zmq_disconnect_peer (void '*socket', uint32_t 'routing_id');*
10+
11+
12+
== DESCRIPTION
13+
The _zmq_disconnect_peer()_ function disconnects a specific connection from a
14+
socket given the peer 'routing_id'. After a successful disconnection, attempts
15+
to send messages addressed with that 'routing_id' will fail with 'EHOSTUNREACH'
16+
until a new connection with a different 'routing_id' is established.
17+
18+
This function is supported on socket types that manage per-peer routing ids:
19+
'ZMQ_SERVER' and 'ZMQ_PEER'. Calling it on other socket types will fail with
20+
'ENOTSUP'.
21+
22+
23+
== RETURN VALUE
24+
The _zmq_disconnect_peer()_ function returns `0` if successful. Otherwise it
25+
returns `-1` and sets 'errno' to one of the values defined below.
26+
27+
28+
== ERRORS
29+
*EHOSTUNREACH*::
30+
No connection exists for the given 'routing_id'.
31+
*ENOTSUP*::
32+
The socket type does not support disconnecting by 'routing_id'.
33+
*ENOTSOCK*::
34+
The provided 'socket' was invalid.
35+
*ETERM*::
36+
The 0MQ 'context' associated with the specified 'socket' was terminated.
37+
38+
39+
== EXAMPLE
40+
.Disconnect a peer by routing id
41+
----
42+
void *server = zmq_socket (context, ZMQ_SERVER);
43+
assert (server);
44+
assert (zmq_bind (server, "tcp://*:5555") == 0);
45+
/* ... receive a message and read its routing id ... */
46+
uint32_t routing_id = /* obtained from zmq_msg_routing_id(...) */;
47+
/* Disconnect that specific peer */
48+
int rc = zmq_disconnect_peer (server, routing_id);
49+
assert (rc == 0);
50+
/* Any attempt to send to that routing id now will fail with EHOSTUNREACH */
51+
----
52+
53+
54+
== SEE ALSO
55+
* xref:zmq_connect_peer.adoc[zmq_connect_peer]
56+
* xref:zmq_disconnect.adoc[zmq_disconnect]
57+
* xref:zmq_msg_routing_id.adoc[zmq_msg_routing_id]
58+
* xref:zmq_msg_set_routing_id.adoc[zmq_msg_set_routing_id]
59+
* xref:zmq_socket.adoc[zmq_socket]
60+
* xref:zmq.adoc[zmq]
61+
62+
63+
== AUTHORS
64+
This page was written by the 0MQ community. To make a change please
65+
read the 0MQ Contribution Policy at <https://zeromq.org/how-to-contribute/>.
66+
67+

doc/zmq_socket.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ ZMQ_PEER
447447
A 'ZMQ_PEER' socket talks to a set of 'ZMQ_PEER' sockets.
448448

449449
To connect and fetch the 'routing_id' of the peer use xref:zmq_connect_peer.adoc[zmq_connect_peer]
450+
and to disconnect a specific peer by its 'routing_id' use xref:zmq_disconnect_peer.adoc[zmq_disconnect_peer].
450451

451452
Each received message has a 'routing_id' that is a 32-bit unsigned integer.
452453
The application can fetch this with xref:zmq_msg_routing_id.adoc[zmq_msg_routing_id]

include/zmq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
692692
ZMQ_EXPORT int zmq_join (void *s, const char *group);
693693
ZMQ_EXPORT int zmq_leave (void *s, const char *group);
694694
ZMQ_EXPORT uint32_t zmq_connect_peer (void *s_, const char *addr_);
695+
ZMQ_EXPORT int zmq_disconnect_peer (void *s_, uint32_t routing_id_);
695696

696697
/* DRAFT Msg methods. */
697698
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);

src/server.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,16 @@ bool zmq::server_t::xhas_out ()
155155
// to be routed to.
156156
return true;
157157
}
158+
159+
int zmq::server_t::xdisconnect_peer (uint32_t routing_id_)
160+
{
161+
out_pipes_t::iterator it = _out_pipes.find (routing_id_);
162+
if (it == _out_pipes.end ()) {
163+
errno = EHOSTUNREACH;
164+
return -1;
165+
}
166+
167+
// Terminate the pipe; xpipe_terminated will erase it from maps.
168+
it->second.pipe->terminate (false);
169+
return 0;
170+
}

src/server.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class server_t : public socket_base_t
3535
void xread_activated (zmq::pipe_t *pipe_);
3636
void xwrite_activated (zmq::pipe_t *pipe_);
3737
void xpipe_terminated (zmq::pipe_t *pipe_);
38+
int xdisconnect_peer (uint32_t routing_id_);
3839

3940
private:
4041
// Fair queueing object for inbound pipes.

src/socket_base.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,13 @@ int zmq::socket_base_t::leave (const char *group_)
493493
return xleave (group_);
494494
}
495495

496+
int zmq::socket_base_t::disconnect_peer (uint32_t routing_id_)
497+
{
498+
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
499+
500+
return xdisconnect_peer (routing_id_);
501+
}
502+
496503
void zmq::socket_base_t::add_signaler (signaler_t *s_)
497504
{
498505
zmq_assert (_thread_safe);
@@ -1637,6 +1644,12 @@ int zmq::socket_base_t::xleave (const char *group_)
16371644
return -1;
16381645
}
16391646

1647+
int zmq::socket_base_t::xdisconnect_peer (uint32_t)
1648+
{
1649+
errno = ENOTSUP;
1650+
return -1;
1651+
}
1652+
16401653
int zmq::socket_base_t::xrecv (msg_t *)
16411654
{
16421655
errno = ENOTSUP;

src/socket_base.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ class socket_base_t : public own_t,
140140

141141
bool is_disconnected () const;
142142

143+
// Disconnect a specific peer given its routing id. Default ENOTSUP.
144+
int disconnect_peer (uint32_t routing_id_);
145+
143146
protected:
144147
socket_base_t (zmq::ctx_t *parent_,
145148
uint32_t tid_,
@@ -182,6 +185,9 @@ class socket_base_t : public own_t,
182185
virtual int xjoin (const char *group_);
183186
virtual int xleave (const char *group_);
184187

188+
// Default implementation returns ENOTSUP. Specific sockets may override.
189+
virtual int xdisconnect_peer (uint32_t routing_id_);
190+
185191
// Delay actual destruction of the socket.
186192
void process_destroy () ZMQ_FINAL;
187193

src/zmq.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,14 @@ uint32_t zmq_connect_peer (void *s_, const char *addr_)
333333
return s->connect_peer (addr_);
334334
}
335335

336+
int zmq_disconnect_peer (void *s_, uint32_t routing_id_)
337+
{
338+
zmq::socket_base_t *s = as_socket_base_t (s_);
339+
if (!s)
340+
return -1;
341+
return s->disconnect_peer (routing_id_);
342+
}
343+
336344

337345
int zmq_unbind (void *s_, const char *addr_)
338346
{

0 commit comments

Comments
 (0)