Deprecated: Function get_magic_quotes_gpc() is deprecated in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 99

Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 619

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1169

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176

Warning: Cannot modify header information - headers already sent by (output started at /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php:99) in /hermes/walnacweb04/walnacweb04ab/b2791/pow.jasaeld/htdocs/De1337/nothing/index.php on line 1176
8000 GitHub - g41797/nats: Zig client for NATS Core and JetStream
Nothing Special   »   [go: up one dir, main page]

Skip to content

g41797/nats

Repository files navigation

License: MIT CI CI

Straight to the point

Let's try to program the prototype of the system shown in the picture above.

Key points:

  • Many related subjects are stored in a Stream
  • Consumers can have different modes of operation and receive just subsets of the messages
  • Multiple Acknowledgement modes are supported

Flow:

  • A new order
    • arrives on ORDERS.received,
    • gets sent to the NEW Consumer who, on success,
    • will create a new message on ORDERS.processed
  • The ORDERS.processed message
    • again enters the Stream
    • where a DISPATCH Consumer receives it
    • and once processed it will create an ORDERS.completed message
    • which will again enter the Stream
  • All messages are delivered to a MONITOR Consumer
    • who pushes them to monitor

Acknowledgements:

  • NEW and DISPATCH Consumers acknowledged all received messages to ensure no order is missed.
  • MONITOR Consumer receives all messages without acknowledge.

You can find more details on NATS Example page

Stream creation

const DefaultConnectOpts: protocol.ConnectOpts = .{};
const STREAM: []const u8 = "ORDERS";

var js: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
defer js.DISCONNECT();

var CONF: protocol.StreamConfig = .{ .name = STREAM, .subjects = &.{ "ORDERS.*" } };

try js.CREATE(&CONF);

Submit(publish) orders to the stream

var submitter: JetStream = try JetStream.CONNECT(std.testing.allocator, DefaultConnectOpts);
defer submitter.DISCONNECT();

try submitter.PUBLISH("ORDERS.received", null, "order-1 ...");

Consume received orders

const DeleteConsumer: bool = true;

const conf: ConsumerConfig = .{
    .durable_name = "NEW",
    .filter_subject = "ORDERS.received",
};

var NEW: Consumer = try Consumer.START(std.testing.allocator, DefaultConnectOpts, STREAM, &conf);
defer NEW.STOP(DeleteConsumer);

var order = try NEW.CONSUME(protocol.SECNS * 2);
.... process order ....
try NEW.ACK(order.?, true);
try NEW.PUBLISH("ORDERS.processed", null, "...");

Consume processed orders

const DeleteConsumer: bool = true;

const conf = .{
    .durable_name = "DISPATCH",
    .filter_subject = "ORDERS.processed",
};

var DISPATCH: Consumer = try Consumer.START(std.testing.allocator, DefaultConnectOpts, STREAM, &conf);
defer DISPATCH.STOP(DeleteConsumer);

order = try DISPATCH.CONSUME(protocol.SECNS * 2);
.... process order ....
try DISPATCH.ACK(order.?, false);
try DISPATCH.PUBLISH("ORDERS.completed", null, "...");

Track all orders(messages)

var subscriber: Subscriber = try Subscriber.SUBSCRIBE(std.testing.allocator, DefaultConnectOpts, STREAM, "ORDERS.*");
defer subscriber.UNSUBSCRIBE();

while (true) {
    const pmsg = try subscriber.NEXT(protocol.SECNS * 5);
    ... pushes info to monitor ...
    subscriber.REUSE(pmsg);
}

Installation

Add nats to build.zig.zon:

zig fetch --save=nats git+https://github.com/g41797/nats

Add nats to build.zig:

const nats = b.dependency("nats", .{
    .target = target,
    .optimize = optimize,
});

const lib = b.addStaticLibrary(..);
lib.root_module.addImport("nats", nats.module("nats"));

const lib_unit_tests = b.addTest(...);
lib_unit_tests.root_module.addImport("nats", nats.module("nats"));

Import nats:

const nats = @import("nats");

Credits

The project is largely inspired by the very existence of Nats client for php.

About

Zig client for NATS Core and JetStream

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

0