NATS in Depth: When Microservices Learn "Lightning Communication". A Production-Grade Guide from Pub/Sub to JetStream Persistence (2026)

2026-06-14 21:20:26 +0800 CST


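Kafka, RabbitMQ, and Pulsar are the usual candidates for messaging between microservices. This guide looks at NATS instead, from Core NATS Pub/Sub through JetStream persistence, with examples in Go, Python, and Java.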


  1. NATS
  2. NATS architecture: Core NATS and JetStream
  3. Setting up NATS in 10 minutes
  4. Core NATS
  5. JetStream
  6. Go
  7. Python
  8. Java Spring Boot
  9. Security: TLS, Token, JWT
  10. NATS
  11. NATS vs Kafka vs RabbitMQ vs Pulsar
  12. NATS

1. NATS

1.1

  • Kafka
  • RabbitMQ (Erlang)
  • Pulsar
  • Redis Streams

**NATS**

1.2 NATS

  • Core NATS
  • 10M+ msg/s
  • Docker image around 20MB
  • JetStream: At-Least-Once / Exactly-Once
  • Accounts (multi-tenancy)
  • TLS/mTLS/JWT
  • 60+ clients: Go/Java/Python/Rust/C#/JS

1.3 NATS

GitHub
Netflix
Walmart
Siemens (IoT)
Alibaba

1.4 NATS

Core NATS

  • Pub/Sub, Request/Reply

JetStream

  • At-Least-Once, Exactly-Once

2. NATS architecture: Core NATS and JetStream

2.1 Core NATS


                   NATS Server                      
    
             Subject                        
    orders.*        → [Client1, Client2]        
    users.>         → [Client3]                 
    system.health   → [Monitor1, Monitor2]      
    
                                                     
  Client A  publish("orders.new") →  
  Client B ← "orders.new" (Sub)  
  Client C ← "orders.new" (Queue Sub) 


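  • Subject: the address a message is published to in NATS, comparable to a Kafka Topic

    • orders.new: a literal subject
    • orders.* matches exactly one token: orders.new, orders.paid
    • users.> matches one or more trailing tokens: users.profile.update
  • Pub/Sub: publish/subscribe; every subscriber receives a copy of each message

  • Queue Groups: load balancing; each message goes to one member of the group

  • Request/Reply: request/response, usable as lightweight RPC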
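
The wildcard rules above can be sketched in a few lines of Python. This is an illustrative model only, not the server's matching code; the function name `subject_matches` is made up for this example, and it assumes subjects are plain dot-separated tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a concrete subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token to consume
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # the subject ran out of tokens
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch ('*' accepts any single token)
    # Without '>', the pattern and the subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)


print(subject_matches("orders.*", "orders.new"))
print(subject_matches("users.>", "users.profile.update"))
```

Note that `orders.*` does not match `orders.new.eu`, because `*` stands for exactly one token, while `users.>` does not match the bare subject `users`.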

2.2 JetStream


                    JetStream                         
                                                          
           
    Stream           Consumer         Storage    
    ()       →  ()         ()     
                                                 
   orders           push-sub         File/Memory 
   users            pull-sub                     
           
                                                          
  Stream                                            
  - Max Msgs: 1000000                                   
  - Max Bytes: 1GB                                      
  - Max Age: 30d                                        
  - Replicas: 3 ()                               

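**JetStream core concepts**

  • Stream: the persistent message store

    • Messages published to the subjects bound to a Stream are captured by it
  • Consumer: a cursor over a Stream that tracks delivery and acknowledgement

    • Push Consumer: the server delivers messages to the subscriber
    • Pull Consumer: the client fetches messages in batches

    • Redelivery is governed by Ack, AckWait, and Max Deliveries
  • **Exactly-Once**

    Publish with MsgId → NATS deduplicates → Stream stores one copy → Consumer Ack → done or redelivered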
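
To make the deduplication step concrete, here is a small in-memory model of a duplicate window. It is a sketch under assumptions: the class `DedupWindow` and its methods are hypothetical and only mimic the idea that a repeated MsgId inside the window is acknowledged without being stored again.

```python
class DedupWindow:
    """Toy model of MsgId-based deduplication inside a time window."""

    def __init__(self, window_seconds: float):
        self.window = window_seconds
        self.seen = {}      # msg_id -> (publish time, sequence)
        self.last_seq = 0

    def publish(self, msg_id: str, now: float):
        """Return (sequence, duplicate) for a publish at time `now` (seconds)."""
        # Forget message IDs that have aged out of the window
        self.seen = {k: v for k, v in self.seen.items() if now - v[0] < self.window}
        if msg_id in self.seen:
            # Same ID inside the window: report the original sequence, store nothing
            return self.seen[msg_id][1], True
        self.last_seq += 1
        self.seen[msg_id] = (now, self.last_seq)
        return self.last_seq, False


w = DedupWindow(window_seconds=120)
print(w.publish("order-12345", now=0))
print(w.publish("order-12345", now=10))
```

The second call reports a duplicate because it arrives 10 seconds after the first, well inside the 120-second window. Once the window has passed, the same ID would be stored again, which is why the window should be longer than the longest expected publisher retry.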
    

2.3 Subject naming

# Bad: flat names with no hierarchy for wildcards to match
order_created
user_updated

# Good: {domain}.{entity}.{action}
shop.orders.created
shop.orders.paid
shop.orders.shipped
shop.users.registered
shop.users.updated
shop.inventory.adjusted

# System events
system.services.started
system.services.stopped
system.health.check
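
One way to keep subjects consistent is to build them through a helper rather than by hand. The sketch below assumes the `{domain}.{entity}.{action}` convention shown above; the helper name `build_subject` and the allowed character set are choices made for this example, not NATS requirements.

```python
import re

# Lowercase letters, digits, '_' and '-' only: no dots, spaces, or wildcards inside a token
_TOKEN = re.compile(r"[a-z0-9_-]+")


def build_subject(domain: str, entity: str, action: str) -> str:
    """Join three validated tokens into a subject such as shop.orders.created."""
    tokens = (domain, entity, action)
    for token in tokens:
        if not _TOKEN.fullmatch(token):
            raise ValueError(f"invalid subject token: {token!r}")
    return ".".join(tokens)


print(build_subject("shop", "orders", "created"))
```

Rejecting `.`, `*`, and `>` inside a token prevents a caller from accidentally publishing to a deeper hierarchy or to a wildcard-looking subject.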

3. Setting up NATS in 10 minutes

3.1 Docker

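# Start NATS Server with JetStream enabled
docker run -d \
  --name nats \
  -p 4222:4222 \
  -p 8222:8222 \
  nats:latest \
  -js -m 8222

# Flags:
# -js         : enable JetStream
# -m 8222     : HTTP monitoring port
# MQTT is not enabled by these flags; it needs an mqtt {} block in the
# server config (conventionally on port 1883), so no MQTT port is mapped here.

# Check the logs
docker logs nats
# Expected lines include:
# [INF] Starting nats-server
# [INF] JetStream is enabled
# [INF] Listening for client connections on 0.0.0.0:4222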

3.2

# Linux/macOS 
curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.20/nats-server-v2.10.20-linux-amd64.tar.gz | tar xz
cd nats-server-v2.10.20-linux-amd64
sudo cp nats-server /usr/local/bin/

# 
nats-server --version
# nats-server: v2.10.20

3.3

/etc/nats/nats.conf

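# NATS Server configuration

# Client port
port: 4222

# Enable JetStream
jetstream {
    store_dir: "/var/lib/nats/jetstream"
    max_memory_store: 1GB
    max_file_store: 10GB
}

# HTTP monitoring port
http_port: 8222

# Cluster
cluster {
    name: "prod-cluster"
    listen: "0.0.0.0:6222"
    routes: [
        "nats://192.168.1.10:6222"
        "nats://192.168.1.11:6222"
        "nats://192.168.1.12:6222"
    ]
}

# Authorization
authorization {
    # Option A: a single shared token (used by the client examples later on)
    token: "s3cr3t-t0k3n"

    # Option B: users with per-subject permissions.
    # The server rejects a config that sets both token and users,
    # so enable only one of the two options.
    # users: [
    #     {user: "app", password: "app-pass", permissions: {
    #         publish: ["app.>"],
    #         subscribe: ["app.>", "system.>"]
    #     }},
    #     {user: "monitor", password: "mon-pass", permissions: {
    #         publish: [],
    #         subscribe: [">"]   # ">" matches every subject; "*" matches single-token subjects only
    #     }}
    # ]
}

# TLS
tls {
    cert_file: "/etc/nats/tls/server.crt"
    key_file: "/etc/nats/tls/server.key"
    ca_file: "/etc/nats/tls/ca.crt"
    verify: true   # require and verify client certificates (mTLS)
}

# Logging
log_file: "/var/log/nats/nats-server.log"
logtime: true
debug: false
trace: false

Start the server with this configuration:

nats-server -c /etc/nats/nats.conf

# Or run it under systemd (see section 9)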

3.4 nats-cli

# Install the CLI
curl -L https://github.com/nats-io/natscli/releases/download/v0.1.0/nats-v0.1.0-linux-amd64.tar.gz | tar xz
sudo cp nats /usr/local/bin/

# Show server information (requires system account credentials)
nats server info

# Show account details, including JetStream usage and limits
nats account info

# List Streams
nats stream ls

# Show details for one Stream
nats stream info ORDERS

4. Core NATS

4.1 Pub/Sub (publish/subscribe)

**Go example**

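package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // Connect to NATS
    nc, err := nats.Connect(
        "nats://localhost:4222",
        nats.Name("Publisher-1"),
        nats.Timeout(10*time.Second),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // ========== Subscribe ==========
    // Core NATS does not store messages, so subscribe before publishing.
    // "orders.*" matches any single token after "orders."
    sub, err := nc.Subscribe("orders.*", func(msg *nats.Msg) {
        fmt.Printf("Received: subject=%s, data=%s\n",
            msg.Subject, string(msg.Data))
    })
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    // Subscribe to every subject (useful for debugging only)
    subAll, err := nc.Subscribe(">", func(msg *nats.Msg) {
        fmt.Printf("All Subjects: %s\n", msg.Subject)
    })
    if err != nil {
        log.Fatal(err)
    }
    defer subAll.Unsubscribe()

    // ========== Publish ==========
    // Fire-and-forget publish
    err = nc.Publish("orders.new", []byte("Order #12345 created"))
    if err != nil {
        log.Fatal(err)
    }

    // Persistent publish through JetStream. This needs a Stream bound to
    // "orders.>" (created in section 5.1); without one the publish fails.
    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }
    _, err = js.Publish("orders.new", []byte(`{
        "order_id": "12345",
        "user_id": "user-001",
        "amount": 99.99,
        "items": [{"sku": "ABC", "qty": 2}]
    }`), nats.MsgId("order-12345")) // MsgId enables deduplication
    if err != nil {
        log.Printf("JetStream publish failed (does the ORDERS stream exist?): %v", err)
    }

    fmt.Println("Message published")

    // Block until Ctrl+C so the deferred cleanup runs on shutdown
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt)
    <-sigCh
}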

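4.2 Queue Groups (load balancing)

// ========== Queue Group ==========
// Each message is delivered to exactly one member of the group

func startWorker(workerID string) {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // QueueSubscribe: subscribers sharing the queue name "order-workers" split the messages
    _, err = nc.QueueSubscribe("orders.process", "order-workers", func(msg *nats.Msg) {
        fmt.Printf("Worker %s processing: %s\n", workerID, string(msg.Data))
        time.Sleep(100 * time.Millisecond) // simulated work
        fmt.Printf("Worker %s done\n", workerID)
    })
    if err != nil {
        log.Fatal(err)
    }

    // Block forever
    <-make(chan struct{})
}

func main() {
    // Start 3 workers, each in its own goroutine
    go startWorker("Worker-1")
    go startWorker("Worker-2")
    go startWorker("Worker-3")
    time.Sleep(200 * time.Millisecond) // crude wait for the workers to subscribe

    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Publish 10 tasks
    for i := 0; i < 10; i++ {
        nc.Publish("orders.process", []byte(fmt.Sprintf("Task-%d", i)))
    }
    nc.Flush()

    // The 3 workers share the 10 tasks between them
    time.Sleep(2 * time.Second) // let the workers finish before exiting
}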
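**Queue Group distribution (one possible outcome; the server picks a group member at random for each message)**

Publisher → NATS Server → Queue Group "order-workers"
                                Worker-1 (gets Task-1, Task-4, Task-7)
                                Worker-2 (gets Task-2, Task-5, Task-8)
                                Worker-3 (gets Task-3, Task-6, Task-9, Task-10)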
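
The distribution in the diagram can be imitated with a short simulation. This sketch assumes a uniformly random pick per message, which is a simplification of what the server does; the function `distribute` and its `seed` parameter are invented for the example.

```python
import random


def distribute(tasks, workers, seed=None):
    """Assign each task to exactly one randomly chosen worker, like a queue group."""
    rng = random.Random(seed)
    assignment = {worker: [] for worker in workers}
    for task in tasks:
        # Every message is handed to a single member of the group
        assignment[rng.choice(workers)].append(task)
    return assignment


tasks = [f"Task-{i}" for i in range(1, 11)]
result = distribute(tasks, ["Worker-1", "Worker-2", "Worker-3"])
for worker, assigned in result.items():
    print(worker, assigned)
```

Each run can produce a different split, and the split is rarely perfectly even. What stays constant is that every task is handled once and only once across the group.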

4.3 Request/Reply RPC

// ========== Request/Reply ==========

// Responder
func startService() {
    nc, _ := nats.Connect("nats://localhost:4222")
    
    _, err := nc.Subscribe("user.get", func(msg *nats.Msg) {
        userID := string(msg.Data)
        
        // 
        userData := fmt.Sprintf(`{"id":"%s","name":"John Doe","email":"john@example.com"}`, userID)
        
        // msg.Respond
        err := msg.Respond([]byte(userData))
        if err != nil {
            log.Printf("Reply failed: %v", err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(" User Service started")
    <-make(chan struct{})
}

// Requester
func callService() {
    nc, _ := nats.Connect("nats://localhost:4222")
    defer nc.Close()

    // 
    msg, err := nc.Request("user.get", []byte("user-001"), 2*time.Second)
    if err != nil {
        log.Fatalf("Request failed: %v", err)
    }

    fmt.Printf(" Response: %s\n", string(msg.Data))
    //  Response: {"id":"user-001","name":"John Doe","email":"john@example.com"}
}

// Run the demo
func main() {
    go startService()
    time.Sleep(100 * time.Millisecond) // crude wait for the responder to subscribe
    callService()
}

**Request/Reply with retries**

// requestWithRetry repeats a request, waiting a little longer after each failure
func requestWithRetry(nc *nats.Conn, subject string, data []byte, maxRetries int) (*nats.Msg, error) {
    var lastErr error
    
    for i := 0; i < maxRetries; i++ {
        msg, err := nc.Request(subject, data, 2*time.Second)
        if err == nil {
            return msg, nil
        }
        
        lastErr = err
        fmt.Printf("  Attempt %d failed: %v\n", i+1, err)
        time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) // linear backoff: 100ms, 200ms, 300ms, ...
    }
    
    return nil, fmt.Errorf("all %d attempts failed: %w", maxRetries, lastErr)
}
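
The retry loop above waits (i+1) × 100 ms between attempts, which is a linear backoff. For comparison, the sketch below prints that schedule next to a capped exponential one. The function names, the 100 ms base, and the 1000 ms cap are assumptions chosen for illustration.

```python
def linear_delays_ms(max_retries, step_ms=100):
    """Delay after each failed attempt when the wait grows by a fixed step."""
    return [(i + 1) * step_ms for i in range(max_retries)]


def exponential_delays_ms(max_retries, base_ms=100, cap_ms=1000):
    """Delay after each failed attempt when the wait doubles, up to a cap."""
    return [min(cap_ms, base_ms * 2 ** i) for i in range(max_retries)]


print(linear_delays_ms(3))
print(exponential_delays_ms(5))
```

Linear backoff is fine for a handful of retries against a local service. Exponential backoff with a cap backs off faster when a responder is overloaded, at the price of a longer worst-case wait.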

5. JetStream

5.1 Stream

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func setupJetStream() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

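    // ========== Create the Stream ==========
    // A Stream persists every message published to its subjects

    streamName := "ORDERS"
    subjects := []string{"orders.>"}

    // Check whether the Stream already exists
    _, err = js.StreamInfo(streamName)
    if err != nil {
        // Treated as "Stream not found". Production code should distinguish
        // nats.ErrStreamNotFound from connection or permission errors.
        _, err = js.AddStream(&nats.StreamConfig{
            Name:     streamName,
            Subjects: subjects,

            // Retention policy: LimitsPolicy, InterestPolicy, or WorkQueuePolicy
            Retention: nats.LimitsPolicy,

            // Limits
            MaxMsgs:  1000000,                 // at most 1,000,000 messages
            MaxBytes: 10 * 1024 * 1024 * 1024, // at most 10GB
            MaxAge:   30 * 24 * time.Hour,     // keep messages for 30 days

            // Storage backend
            Storage: nats.FileStorage, // FileStorage (disk) or MemoryStorage

            // Replicas
            Replicas: 1, // development = 1, production cluster = 3

            // Deduplication window for MsgId
            Duplicates: 2 * time.Hour, // a repeated MsgId within 2 hours is dropped

            // StreamConfig has no partition setting: unlike a Kafka topic,
            // a Stream is not split into partitions.
        })

        if err != nil {
            log.Fatalf("Failed to add stream: %v", err)
        }
        fmt.Println("Stream created:", streamName)
    } else {
        fmt.Println("Stream already exists:", streamName)
    }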

    // ==========  Stream ==========
    //  JetStream Publish
    
    ack, err := js.Publish("orders.new", []byte(`{
        "order_id": "ORD-2024-001",
        "user_id": "user-123",
        "items": [
            {"sku": "IPHONE15", "qty": 1, "price": 999.99}
        ],
        "total": 999.99,
        "created_at": "2024-01-15T10:30:00Z"
    }`), 
        nats.MsgId("ORD-2024-001"), //  MsgId 
    )
    
    if err != nil {
        log.Fatalf("Publish failed: %v", err)
    }
    
    // The PubAck field is named Sequence (there is no Seq field)
    fmt.Printf("Message published: stream=%s, seq=%d\n",
        ack.Stream, ack.Sequence)
}
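
How the MaxMsgs and MaxAge limits interact under the Limits retention policy can be modeled in memory. The class below is a rough sketch, not JetStream's storage engine: `LimitsStream` is a hypothetical name, time is passed in explicitly as seconds, and MaxBytes is left out.

```python
from collections import deque


class LimitsStream:
    """Toy stream that drops the oldest messages once a limit is exceeded."""

    def __init__(self, max_msgs: int, max_age: float):
        self.max_msgs = max_msgs
        self.max_age = max_age
        self.msgs = deque()  # (sequence, publish time, data), oldest first
        self.last_seq = 0

    def publish(self, data: bytes, now: float) -> int:
        self.last_seq += 1
        self.msgs.append((self.last_seq, now, data))
        self._enforce(now)
        return self.last_seq

    def _enforce(self, now: float) -> None:
        # Age limit: remove messages older than max_age
        while self.msgs and now - self.msgs[0][1] > self.max_age:
            self.msgs.popleft()
        # Count limit: remove the oldest messages beyond max_msgs
        while len(self.msgs) > self.max_msgs:
            self.msgs.popleft()

    def sequences(self):
        return [seq for seq, _, _ in self.msgs]


stream = LimitsStream(max_msgs=3, max_age=60)
for second in range(4):
    stream.publish(b"order", now=second)
print(stream.sequences())
```

Sequence numbers keep increasing even when old messages are discarded, so a gap at the start of the stream is normal once a limit has been hit.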

5.2 Consumer

func setupConsumer() {
    nc, _ := nats.Connect("nats://localhost:4222")
    defer nc.Close()
    
    js, _ := nc.JetStream()

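    // ========== Create the Consumer (Pull Consumer) ==========
    // A durable Consumer keeps its position across client restarts

    _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:     "order-processor", // durable name
        Description: "Process orders from ORDERS stream",

        // Delivery
        DeliverPolicy: nats.DeliverAllPolicy,  // start from the first message
        AckPolicy:     nats.AckExplicitPolicy, // every message must be acked

        // Ack handling
        MaxDeliver: 5,                // deliver each message at most 5 times
        AckWait:    30 * time.Second, // redeliver if no Ack arrives within 30s

        // Flow control
        MaxAckPending: 100, // at most 100 unacked messages in flight

        // Only receive messages on this subject
        FilterSubject: "orders.new",
    })

    if err != nil {
        // Do not assume "already exists": the error may be a config conflict
        // or a missing Stream, so report it
        log.Printf("AddConsumer failed: %v", err)
        return
    }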

    // ==========  ==========
    sub, err := js.PullSubscribe("orders.new", "order-processor")
    if err != nil {
        log.Fatal(err)
    }

    //  10 
    msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
    if err != nil {
        log.Printf("Fetch failed: %v", err)
        return
    }

    fmt.Printf(" Fetched %d messages\n", len(msgs))
    
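    for _, msg := range msgs {
        // Sequence numbers live in the JetStream metadata, not on nats.Msg itself
        meta, err := msg.Metadata()
        if err != nil {
            log.Printf("Metadata failed: %v", err)
            continue
        }
        fmt.Printf("  - Seq=%d, Data=%s\n", meta.Sequence.Stream, string(msg.Data))

        // Acknowledge after successful processing
        if err := msg.Ack(); err != nil {
            log.Printf("Ack failed: %v", err)
        }
    }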
}

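// ========== Push Consumer ==========
// The server pushes messages to the subscriber's callback
func setupPushConsumer() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // js.Subscribe creates the durable push consumer if it does not exist.
    // Creating it beforehand with AddConsumer and no DeliverSubject would
    // produce a pull consumer, which a push subscription cannot bind to.
    _, err = js.Subscribe("orders.>", func(msg *nats.Msg) {
        meta, err := msg.Metadata()
        if err != nil {
            log.Printf("Metadata failed: %v", err)
            return
        }
        fmt.Printf("Push received: subject=%s, seq=%d\n",
            msg.Subject, meta.Sequence.Stream)

        // Business logic goes here
        // ...

        // Ack
        msg.Ack()
    },
        nats.Durable("order-notifier"),
        nats.DeliverAll(),
        nats.AckExplicit(),
        nats.ManualAck(), // ack manually instead of automatically after the callback
    )

    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Push consumer started")
    <-make(chan struct{}) // block forever
}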
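
The effect of MaxDeliver on a message that keeps failing can be shown with a tiny model. It is only a sketch: `deliver_until_acked` is a hypothetical helper, the handler returns True to stand for an Ack, and the AckWait timer between redeliveries is not modeled.

```python
def deliver_until_acked(handler, max_deliver: int):
    """Deliver one message up to max_deliver times.

    Returns the attempt number on which the handler acked,
    or None if every delivery failed and the message was given up on.
    """
    for attempt in range(1, max_deliver + 1):
        if handler(attempt):
            return attempt
    return None


# A handler that only succeeds on its third attempt
print(deliver_until_acked(lambda attempt: attempt >= 3, max_deliver=5))
# A handler that never succeeds
print(deliver_until_acked(lambda attempt: False, max_deliver=5))
```

A message that exhausts its deliveries stays in the Stream but is no longer sent to this Consumer, so a production setup needs some way to notice and inspect such messages.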

5.3

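// JetStream load balancing: queue subscribers sharing one durable consumer

func startJetStreamWorker(workerID string) {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // QueueSubscribe + Durable:
    // same durable name + same queue name = each message goes to one worker
    _, err = js.QueueSubscribe("orders.>", "order-workers", func(msg *nats.Msg) {
        meta, err := msg.Metadata()
        if err != nil {
            log.Printf("Worker %s: metadata failed: %v", workerID, err)
            return
        }
        fmt.Printf("Worker %s processing: seq=%d\n", workerID, meta.Sequence.Stream)

        // Simulated work
        time.Sleep(50 * time.Millisecond)

        // Ack
        msg.Ack()

        fmt.Printf("Worker %s done\n", workerID)
    },
        nats.Durable("order-workers"), // durable name shared by all workers
        nats.ManualAck(),
    )

    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Worker %s started\n", workerID)
    <-make(chan struct{})
}

func main() {
    // Start 3 workers (separate processes or pods in production)
    go startJetStreamWorker("Worker-A")
    go startJetStreamWorker("Worker-B")
    go startJetStreamWorker("Worker-C")
    select {} // keep the process alive
}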

5.4 Exactly-Once

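// Exactly-once publishing relies on MsgId deduplication:
// set nats.MsgId() and the server drops repeats inside the Stream's Duplicates window

func publishWithDeduplication(js nats.JetStreamContext, orderID string, orderData []byte) error {
    // Derive the MsgId from a business key
    msgID := fmt.Sprintf("order-%s", orderID)

    ack, err := js.Publish("orders.new", orderData,
        nats.MsgId(msgID),           // deduplication key
        nats.ExpectStream("ORDERS"), // fail if the message would land in another Stream
    )
    if err != nil {
        return err
    }

    // A duplicate is not an error: the server acks it with Duplicate set to true
    if ack.Duplicate {
        fmt.Printf("Duplicate message detected: %s (ignored)\n", msgID)
    }

    return nil
}

// Publish the same order 5 times
func publishFiveTimes(js nats.JetStreamContext) {
    for i := 0; i < 5; i++ {
        err := publishWithDeduplication(js, "ORD-001", []byte("order data"))
        if err != nil {
            log.Printf("Publish failed: %v", err)
        }
    }
}
// The Stream ends up with 1 message; the other 4 are dropped as duplicates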

6. Go

6.1

nats-producer-consumer/
 cmd/
    producer/
       main.go
    consumer/
        main.go
 internal/
    nats/
       client.go
       publisher.go
       subscriber.go
    config/
        config.go
 pkg/
    models/
        order.go
 docker-compose.yml
 Dockerfile
 go.mod

6.2 config.go

package config

import (
    "time"
)

type Config struct {
    NATS    NATSConfig    `yaml:"nats"`
    App     AppConfig     `yaml:"app"`
    Logging LoggingConfig `yaml:"logging"`
}

type NATSConfig struct {
    URL            string        `yaml:"url"`
    Token          string        `yaml:"token"`
    TLS            bool          `yaml:"tls"`
    TLSCert        string        `yaml:"tls_cert"`
    TLSKey         string        `yaml:"tls_key"`
    TLSRootCA      string        `yaml:"tls_root_ca"`
    
    // JetStream 
    StreamName     string        `yaml:"stream_name"`
    StreamSubjects []string      `yaml:"stream_subjects"`
    
    // 
    MaxReconnect   int           `yaml:"max_reconnect"`
    ReconnectWait  time.Duration `yaml:"reconnect_wait"`
    Timeout        time.Duration `yaml:"timeout"`
}

type AppConfig struct {
    ServiceName    string        `yaml:"service_name"`
    Environment    string        `yaml:"environment"`
    ShutdownTimeout time.Duration `yaml:"shutdown_timeout"`
}

type LoggingConfig struct {
    Level          string        `yaml:"level"`
    Format         string        `yaml:"format"`
    Output         string        `yaml:"output"`
}

func LoadConfig() *Config {
    return &Config{
        NATS: NATSConfig{
            URL:           "nats://localhost:4222",
            Token:         "s3cr3t-t0k3n",
            StreamName:    "ORDERS",
            StreamSubjects: []string{"orders.>"},
            MaxReconnect:  10,
            ReconnectWait: 2 * time.Second,
            Timeout:       5 * time.Second,
        },
        App: AppConfig{
            ServiceName:     "order-service",
            Environment:     "production",
            ShutdownTimeout: 30 * time.Second,
        },
        Logging: LoggingConfig{
            Level:  "info",
            Format: "json",
            Output: "stdout",
        },
    }
}

6.3 NATS client.go

package nats

import (
    "fmt"
    "sync"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

type Client struct {
    conn *nats.Conn
    js   jetstream.JetStream
    mu   sync.RWMutex
    
    config *Config
}

type Config struct {
    URL          string
    Token        string
    TLSEnabled   bool
    TLSCert      string
    TLSKey       string
    TLSRootCA    string
    MaxReconnect int
    ReconnectWait time.Duration
    Timeout      time.Duration
}

func NewClient(cfg *Config) (*Client, error) {
    opts := []nats.Option{
        nats.Name("OrderService-Client"),
        nats.MaxReconnects(cfg.MaxReconnect),
        nats.ReconnectWait(cfg.ReconnectWait),
        nats.Timeout(cfg.Timeout),
        nats.Token(cfg.Token),
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            fmt.Printf("  NATS disconnected: %v\n", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            fmt.Printf(" NATS reconnected: %s\n", nc.ConnectedUrl())
        }),
        nats.ClosedHandler(func(nc *nats.Conn) {
            fmt.Printf(" NATS connection closed\n")
        }),
    }

    // TLS: client certificate plus the CA that signed the server certificate
    if cfg.TLSEnabled {
        opts = append(opts,
            nats.ClientCert(cfg.TLSCert, cfg.TLSKey),
            nats.RootCAs(cfg.TLSRootCA),
        )
    }

    conn, err := nats.Connect(cfg.URL, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    //  JetStream 
    js, err := jetstream.New(conn)
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to get JetStream context: %w", err)
    }

    return &Client{
        conn:   conn,
        js:     js,
        config: cfg,
    }, nil
}

func (c *Client) Connection() *nats.Conn {
    return c.conn
}

func (c *Client) JetStream() jetstream.JetStream {
    return c.js
}

func (c *Client) Close() {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if c.conn != nil && !c.conn.IsClosed() {
        fmt.Println(" Closing NATS connection...")
        c.conn.Close()
    }
}

// 
func (c *Client) HealthCheck() error {
    if c.conn == nil || c.conn.IsClosed() {
        return fmt.Errorf("NATS connection is closed")
    }
    
    if !c.conn.IsConnected() {
        return fmt.Errorf("NATS is in disconnected state")
    }
    
    return nil
}

6.4 publisher.go

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go/jetstream"
    "github.com/yourorg/nats-demo/pkg/models"
)

type Publisher struct {
    client *Client
    stream string
}

func NewPublisher(client *Client, stream string) *Publisher {
    return &Publisher{
        client: client,
        stream: stream,
    }
}

// PublishOrder 
func (p *Publisher) PublishOrder(ctx context.Context, order *models.Order) error {
    // 
    data, err := json.Marshal(order)
    if err != nil {
        return fmt.Errorf("failed to marshal order: %w", err)
    }

    //  MsgId
    msgID := fmt.Sprintf("order-%s", order.ID)

    //  JetStream
    ack, err := p.client.JetStream().Publish(
        ctx,
        "orders.new",
        data,
        jetstream.WithMsgID(msgID),
        jetstream.WithExpectStream(p.stream),
        jetstream.WithRetryAttempts(3),
    )
    
    if err != nil {
        return fmt.Errorf("failed to publish order: %w", err)
    }

    // The PubAck field is named Sequence (there is no Seq field)
    fmt.Printf("Order published: stream=%s, seq=%d, order_id=%s\n",
        ack.Stream, ack.Sequence, order.ID)
    
    return nil
}

// PublishBatch 
func (p *Publisher) PublishBatch(ctx context.Context, orders []*models.Order) error {
    for _, order := range orders {
        if err := p.PublishOrder(ctx, order); err != nil {
            return err
        }
    }
    return nil
}

// AsyncPublish 
func (p *Publisher) AsyncPublish(order *models.Order) <-chan error {
    errCh := make(chan error, 1)
    
    go func() {
        defer close(errCh)
        
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        
        errCh <- p.PublishOrder(ctx, order)
    }()
    
    return errCh
}

6.5 subscriber.go

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go/jetstream"
    "github.com/yourorg/nats-demo/pkg/models"
)

type Subscriber struct {
    client      *Client
    stream      string
    consumer    string
    concurrency int
}

func NewSubscriber(client *Client, stream, consumer string, concurrency int) *Subscriber {
    return &Subscriber{
        client:      client,
        stream:      stream,
        consumer:    consumer,
        concurrency: concurrency,
    }
}

// Start creates (or updates) the durable consumer and runs the workers.
// The jetstream package has no AddConsumer or Subscribe; consumers are
// created with CreateOrUpdateConsumer and read with Consume.
func (s *Subscriber) Start(ctx context.Context) error {
    cons, err := s.client.JetStream().CreateOrUpdateConsumer(ctx, s.stream, jetstream.ConsumerConfig{
        Durable:       s.consumer,
        DeliverPolicy: jetstream.DeliverAllPolicy,
        AckPolicy:     jetstream.AckExplicitPolicy,
        FilterSubject: "orders.new",
        MaxDeliver:    5,
        AckWait:       30 * time.Second,
        MaxAckPending: s.concurrency * 10, // scale in-flight messages with the worker count
    })
    if err != nil {
        return fmt.Errorf("failed to create consumer: %w", err)
    }

    // Start the workers
    for i := 0; i < s.concurrency; i++ {
        workerID := i
        go s.worker(ctx, workerID, cons)
    }

    fmt.Printf("Subscriber started: stream=%s, consumer=%s, concurrency=%d\n",
        s.stream, s.consumer, s.concurrency)

    // Wait until the Context is cancelled
    <-ctx.Done()
    fmt.Println("Subscriber stopped")

    return nil
}

func (s *Subscriber) worker(ctx context.Context, workerID int, cons jetstream.Consumer) {
    // Consume invokes the callback for each message; acks are always explicit here
    cc, err := cons.Consume(func(msg jetstream.Msg) {
        s.handleMessage(ctx, workerID, msg)
    })
    if err != nil {
        fmt.Printf("Worker %d failed to consume: %v\n", workerID, err)
        return
    }
    defer cc.Stop()

    fmt.Printf("Worker %d started\n", workerID)

    // Wait for shutdown
    <-ctx.Done()
}

func (s *Subscriber) handleMessage(ctx context.Context, workerID int, msg jetstream.Msg) {
    start := time.Now()
    
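    // Decode the payload
    var order models.Order
    if err := json.Unmarshal(msg.Data(), &order); err != nil {
        fmt.Printf("Worker %d failed to unmarshal: %v\n", workerID, err)
        // A malformed payload will never decode, so terminate it instead of
        // Nak-ing it, which would only trigger pointless redeliveries
        msg.Term()
        return
    }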

    fmt.Printf(" Worker %d processing: order_id=%s, subject=%s\n",
        workerID, order.ID, msg.Subject())

    // 
    if err := s.processOrder(ctx, &order); err != nil {
        fmt.Printf(" Worker %d processing failed: %v\n", workerID, err)
        
        // 
        if isRetryableError(err) {
            msg.Nak() // 
        } else {
            msg.Ack() // Ack 
            // TODO: 
        }
        return
    }

    // Ack
    if err := msg.Ack(); err != nil {
        fmt.Printf("  Worker %d failed to ack: %v\n", workerID, err)
    }

    elapsed := time.Since(start)
    fmt.Printf(" Worker %d done: order_id=%s, elapsed=%s\n",
        workerID, order.ID, elapsed)
}

func (s *Subscriber) processOrder(ctx context.Context, order *models.Order) error {
    // 
    time.Sleep(50 * time.Millisecond)
    
    // 
    // 1. 
    // 2. 
    // 3. 
    // 4. 
    
    return nil
}

func isRetryableError(err error) bool {
    // 
    // 
    // 
    return false // 
}

6.6 Docker Compose

# docker-compose.yml
version: '3.8'

services:
  nats:
    image: nats:alpine  # the default scratch-based image has no wget for the healthcheck below
    container_name: nats-server
    command: >
      -js
      -m 8222
      --store_dir /data/jetstream
      --max_memory_store 1GB
      --max_file_store 10GB
    ports:
      - "4222:4222"   # Client
      - "8222:8222"   # Monitoring
      - "6222:6222"   # Cluster
    volumes:
      - nats-data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8222/healthz"]
      interval: 10s
      timeout: 5s
      retries: 5

  nats-exporter:
    image: natsio/prometheus-nats-exporter:latest
    container_name: nats-exporter
    command: >
      --connz
      --varz
      --subz
      --jsz=all
      http://nats:8222
    ports:
      - "7777:7777"
    depends_on:
      - nats
    restart: unless-stopped

  producer:
    build: .
    container_name: nats-producer
    command: ["./producer"]
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats
    restart: unless-stopped

  consumer:
    build: .
    # No container_name here: a fixed name cannot be shared by 3 replicas
    command: ["./consumer"]
    deploy:
      replicas: 3  # 3 consumer instances
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats
    restart: unless-stopped

volumes:
  nats-data:

7. Python

7.1

pip install nats-py

7.2 Python

import asyncio
import json
import nats
from nats.js import JetStreamContext

async def produce_orders():
    #  NATS
    nc = await nats.connect("nats://localhost:4222")
    
    #  JetStream 
    js = nc.jetstream()
    
    # Create the Stream (a no-op when it already exists with the same config)
    try:
        await js.add_stream(
            name="ORDERS",
            subjects=["orders.>"],
            retention="limits",
            max_msgs=1000000,
            max_bytes=10 * 1024 * 1024 * 1024,  # 10GB
            max_age=30 * 24 * 3600,  # 30 days; nats-py takes seconds and converts to nanoseconds
            storage="file",
            duplicate_window=2 * 3600,  # 2 hours, in seconds
        )
        print("Stream created")
    except nats.js.errors.BadRequestError as e:
        print(f"Stream already exists with a different config: {e}")
    
    # 
    orders = [
        {"order_id": "ORD-001", "user_id": "user-001", "total": 99.99},
        {"order_id": "ORD-002", "user_id": "user-002", "total": 199.99},
        {"order_id": "ORD-003", "user_id": "user-003", "total": 299.99},
    ]
    
    for order in orders:
        ack = await js.publish(
            "orders.new",
            json.dumps(order).encode(),
            stream="ORDERS",
            # js.publish has no msg_id argument; deduplication uses the Nats-Msg-Id header
            headers={"Nats-Msg-Id": f"order-{order['order_id']}"},
        )
        print(f"Published: stream={ack.stream}, seq={ack.seq}, order_id={order['order_id']}")
    
    await nc.close()

if __name__ == "__main__":
    asyncio.run(produce_orders())

7.3 Python

import asyncio
import json
import nats
from nats.js import JetStreamContext

async def consume_orders():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    
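    # Create the push Consumer (a no-op when it already exists with the same config)
    try:
        await js.add_consumer(
            "ORDERS",
            durable_name="python-order-processor",
            deliver_subject="deliver.python-order-processor",  # a deliver subject makes this a push consumer
            filter_subject="orders.new",
            deliver_policy="all",
            ack_policy="explicit",
            max_deliver=5,
            ack_wait=30,  # seconds
            max_ack_pending=100,
        )
        print("Consumer created")
    except Exception as e:
        print(f"Consumer config: {e}")

    # Push handler: called for every delivered message
    async def message_handler(msg):
        subject = msg.subject
        data = json.loads(msg.data.decode())

        print(f"Received: subject={subject}, order_id={data.get('order_id')}")

        # Business logic
        await asyncio.sleep(0.05)  # simulated work

        # Ack
        await msg.ack()
        print(f"Acked: order_id={data.get('order_id')}")

    # Bind to the durable consumer. The second positional argument of
    # js.subscribe is the queue name, so durable is passed by keyword.
    sub = await js.subscribe(
        "orders.new",
        durable="python-order-processor",
        stream="ORDERS",
        cb=message_handler,
        manual_ack=True,
    )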
    
    print(" Python consumer started. Waiting for messages...")
    
    # 
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\n Shutting down...")
        await sub.unsubscribe()
        await nc.close()

if __name__ == "__main__":
    asyncio.run(consume_orders())

7.4 Python Pull Consumer

async def pull_consumer():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    
    #  Pull Consumer
    await js.add_consumer(
        "ORDERS",
        durable_name="python-batch-processor",
        deliver_policy="all",
        ack_policy="explicit",
    )
    
    # Pull Subscribe
    sub = await js.pull_subscribe(
        "orders.new",
        "python-batch-processor",
    )
    
    print(" Pull consumer started")
    
    while True:
        #  10 
        try:
            msgs = await sub.fetch(batch=10, timeout=5)
            print(f" Fetched {len(msgs)} messages")
            
            for msg in msgs:
                data = json.loads(msg.data.decode())
                print(f"  - Processing: order_id={data.get('order_id')}")
                await msg.ack()
            
            print(f" Batch completed")
        except nats.errors.TimeoutError:
            print("⏳ No messages available, waiting...")
            await asyncio.sleep(2)
        except Exception as e:
            print(f" Error: {e}")
            break
    
    await nc.close()

8. Java Spring Boot

8.1 Maven

<dependencies>
    <!-- NATS Java Client -->
    <dependency>
        <groupId>io.nats</groupId>
        <artifactId>jnats</artifactId>
        <version>2.19.0</version>
    </dependency>
    
    <!-- JetStream support ships inside jnats; no separate artifact is needed -->
    
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

8.2 NATS

@Configuration
@ConfigurationProperties(prefix = "nats")
@Data
@Slf4j
public class NatsConfig {
    private String url;
    private String token;
    private boolean tlsEnabled;
    private String streamName;
    private List<String> streamSubjects;

    @Bean
    public Connection natsConnection() throws Exception {
        Options options = new Options.Builder()
            .server(url)
            .token(token)
            .maxReconnects(10)
            .reconnectWait(Duration.ofSeconds(2))
            .connectionTimeout(Duration.ofSeconds(5))
            .errorListener(new ErrorListener() {
                @Override
                public void exceptionOccurred(Connection conn, Exception ex) {
                    log.error("NATS exception", ex);
                }
            })
            // Disconnect and reconnect notifications arrive through
            // ConnectionListener, not ErrorListener
            .connectionListener((conn, type) -> {
                switch (type) {
                    case DISCONNECTED:
                        log.warn("NATS disconnected");
                        break;
                    case RECONNECTED:
                        log.info("NATS reconnected: {}", conn.getConnectedUrl());
                        break;
                    default:
                        log.debug("NATS connection event: {}", type);
                }
            })
            .build();

        return Nats.connect(options);
    }

    @Bean
    public JetStream jetStream(Connection nc) throws Exception {
        return nc.jetStream();
    }
}

8.3 Spring Boot Publisher

import io.nats.client.*;      // Connection, JetStream, JetStreamManagement, PublishOptions, JetStreamApiException
import io.nats.client.api.*;  // StreamConfiguration, RetentionPolicy, StorageType, PublishAck
// Spring, Lombok, Jackson and java.* imports omitted for brevity

@Service
@Slf4j
public class OrderPublisher {
    
    @Autowired
    private Connection nc;
    
    @Autowired
    private JetStream jetStream;
    
    @Value("${nats.stream-name}")
    private String streamName;
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @PostConstruct
    public void initStream() {
        try {
            // Stream definition: limits-based retention, file storage, 2-hour duplicate window
            StreamConfiguration streamConfig = StreamConfiguration.builder()
                .name(streamName)
                .subjects("orders.>")
                .retentionPolicy(RetentionPolicy.Limits)
                .maxMessages(1_000_000)
                .maxBytes(10L * 1024 * 1024 * 1024)
                .maxAge(Duration.ofDays(30))
                .storageType(StorageType.File)
                .duplicateWindow(Duration.ofHours(2))
                .build();
            
            // Stream management lives on JetStreamManagement, not on the JetStream publish context
            JetStreamManagement jsm = nc.jetStreamManagement();
            jsm.addStream(streamConfig);
            log.info("Stream created: {}", streamName);
        } catch (JetStreamApiException e) {
            // Typically the stream already exists with a different configuration
            log.warn("Stream {} not created: {}", streamName, e.getMessage());
        } catch (Exception e) {
            log.error("Failed to create stream", e);
        }
    }
    
    public CompletableFuture<PublishAck> publishOrder(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // The message ID lets JetStream drop duplicates inside the duplicate window
                String msgId = "order-" + order.getId();
                byte[] data = objectMapper.writeValueAsBytes(order);
                
                PublishOptions opts = PublishOptions.builder()
                    .expectedStream(streamName)
                    .messageId(msgId)
                    .build();
                
                PublishAck ack = jetStream.publish(
                    "orders.new",
                    data,
                    opts
                );
                
                log.info("Order published: stream={}, seq={}, orderId={}",
                    ack.getStream(), ack.getSeqno(), order.getId());
                
                return ack;
            } catch (Exception e) {
                log.error("Failed to publish order: {}", order.getId(), e);
                throw new RuntimeException(e);
            }
        });
    }
}
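
The messageId set in publishOrder only helps because the server remembers recently seen IDs: a publish whose ID already appeared inside the stream's duplicate window (2 hours in the configuration above) is acknowledged as a duplicate and not stored again. The following is a simplified Python model of that behavior, not the server's actual implementation; the class name DedupWindow and the in-memory dictionary are illustrative assumptions.

```python
class DedupWindow:
    """Toy model of a JetStream duplicate window keyed by message ID."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.seen = {}      # msg_id -> (publish time, stream sequence)
        self.next_seq = 1

    def publish(self, msg_id, now):
        """Return (sequence, is_duplicate) for a publish at time `now` (seconds)."""
        # Forget IDs that have aged out of the window
        self.seen = {
            key: value for key, value in self.seen.items()
            if now - value[0] < self.window
        }
        if msg_id in self.seen:
            # Duplicate: report the sequence of the original message
            return self.seen[msg_id][1], True
        seq = self.next_seq
        self.next_seq += 1
        self.seen[msg_id] = (now, seq)
        return seq, False


window = DedupWindow(window_seconds=2 * 60 * 60)
first = window.publish("order-1", now=0)
retry = window.publish("order-1", now=10)      # retried inside the window
late = window.publish("order-1", now=7300)     # same ID after the window expired
print(first, retry, late)
```

A retry inside the window is harmless, while the same ID published after the window has expired is stored as a new message, so the window should be longer than the longest retry interval a producer may use.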

8.4 Spring Boot Consumer

import io.nats.client.*;      // Connection, Dispatcher, JetStream, Message, PushSubscribeOptions
import io.nats.client.api.*;  // ConsumerConfiguration, DeliverPolicy, AckPolicy
import java.time.Duration;
// Spring, Lombok and Jackson imports omitted for brevity

@Service
@Slf4j
public class OrderConsumer {
    
    private static final String DURABLE = "spring-order-processor";
    
    @Autowired
    private Connection nc;
    
    @Autowired
    private JetStream jetStream;
    
    @Value("${nats.stream-name}")
    private String streamName;
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    private Dispatcher dispatcher;
    
    @PostConstruct
    public void initConsumer() {
        try {
            startSubscription();
        } catch (Exception e) {
            log.error("Failed to init consumer", e);
        }
    }
    
    private void startSubscription() throws Exception {
        // Durable consumer: explicit Ack, at most 5 deliveries, 30 s Ack wait
        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
            .durable(DURABLE)
            .deliverPolicy(DeliverPolicy.All)
            .ackPolicy(AckPolicy.Explicit)
            .maxDeliver(5)
            .ackWait(Duration.ofSeconds(30))
            .build();
        
        // subscribe() creates the durable push consumer on first use and binds to it afterwards
        PushSubscribeOptions options = PushSubscribeOptions.builder()
            .stream(streamName)
            .configuration(consumerConfig)
            .build();
        
        // The Dispatcher delivers messages to the handler on its own thread
        dispatcher = nc.createDispatcher();
        
        // Push subscription
        jetStream.subscribe(
            "orders.new",
            dispatcher,
            this::handleMessage,
            false,  // autoAck = false: the handler acknowledges explicitly
            options
        );
        
        log.info("Consumer started: {}", DURABLE);
    }
    
    private void handleMessage(Message msg) {
        try {
            // Deserialize the payload
            Order order = objectMapper.readValue(msg.getData(), Order.class);
            
            log.info("Received: orderId={}, subject={}",
                order.getId(), msg.getSubject());
            
            // Business logic
            processOrder(order);
            
            // Ack: processing succeeded
            msg.ack();
            log.info("Order processed: orderId={}", order.getId());
        } catch (Exception e) {
            log.error("Failed to process message", e);
            
            // Nak: ask the server to redeliver (up to maxDeliver attempts)
            msg.nak();
        }
    }
    
    private void processOrder(Order order) {
        // Simulated processing time
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    @PreDestroy
    public void cleanup() {
        if (dispatcher != null) {
            nc.closeDispatcher(dispatcher);
        }
    }
}

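9. Cluster Deployment

9.1 NATS Cluster Architecture

                      NATS Cluster

    Node-1 (Seed) <----> Node-2 (Peer) <----> Node-3 (Peer)
      JetStream            JetStream            JetStream
       Replica              Replica              Replica
          |                    |                    |
          +--------------------+--------------------+
                               |
                        Subject routing
                               |
                      Client A, Client B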
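9.2 Cluster Configuration

**Node-1 (seed node)**

# nats-node1.conf
port: 4222
server_name: "nats-node-1"

# JetStream storage
jetstream {
    store_dir: "/var/lib/nats/jetstream"
    max_memory_store: 2GB
    max_file_store: 50GB
}

# HTTP monitoring port
http_port: 8222

# Cluster settings
cluster {
    name: "production-cluster"
    listen: "0.0.0.0:6222"
    
    # The seed node needs no routes: the other nodes dial in,
    # and the full mesh forms through route gossip.
    # routes: []
}

# No separate Raft section is needed. JetStream forms its Raft groups
# automatically once the servers share a cluster name and each has a
# unique server_name.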

**Node-2**

# nats-node2.conf
port: 4222
server_name: "nats-node-2"

jetstream {
    store_dir: "/var/lib/nats/jetstream"
    max_memory_store: 2GB
    max_file_store: 50GB
}

http_port: 8222

cluster {
    name: "production-cluster"
    listen: "0.0.0.0:6222"
    
    # Route to the seed node
    routes: [
        "nats://192.168.1.10:6222"  # Node-1
    ]
}

**Node-3**

# nats-node3.conf
# ... same as Node-2 (with server_name: "nats-node-3"); routes lists both Node-1 and Node-2
cluster {
    name: "production-cluster"
    listen: "0.0.0.0:6222"
    routes: [
        "nats://192.168.1.10:6222",
        "nats://192.168.1.11:6222"
    ]
}

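9.3 Starting the Cluster

# Node-1
nats-server -c /etc/nats/nats-node1.conf

# Node-2 (joins through its route to Node-1)
nats-server -c /etc/nats/nats-node2.conf

# Node-3
nats-server -c /etc/nats/nats-node3.conf

# Verify the cluster (nats server commands need system-account credentials)
nats server ls --server nats://192.168.1.10:4222
# The output should list 3 servers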

9.4 Stream Replication

// Create a Stream with Replicas=3
_, err := js.AddStream(&nats.StreamConfig{
    Name:        "ORDERS",
    Subjects:    []string{"orders.>"},
    Retention:   nats.LimitsPolicy,
    MaxMsgs:     1000000,
    MaxBytes:    10 * 1024 * 1024 * 1024,
    MaxAge:      30 * 24 * time.Hour,
    Storage:     nats.FileStorage,
    Replicas:    3,  // 3 replicas, one per node
    Duplicates:  2 * time.Hour,
})
if err != nil {
    log.Fatal(err)
}

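Stream ORDERS (Replicas=3)
 Leader Replica (Node-1)   ← handles writes
 Follower Replica (Node-2) ← replicates from the Leader
 Follower Replica (Node-3) ← replicates from the Leader

Write path:
1. Client → Leader (Node-1)
2. Leader appends the message to its local log
3. Leader replicates to Follower-1 (Node-2)
4. Leader replicates to Follower-2 (Node-3)
5. Once a majority has persisted the message → Ack to Client

Failover:
- Leader fails → Raft elects a new Leader among the remaining replicas
- Traffic moves to the new Leader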
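
The majority rule in step 5 also determines how many node failures a stream survives. A minimal sketch of the arithmetic, with hypothetical helper names:

```python
def quorum(replicas: int) -> int:
    """Smallest number of replicas that forms a majority."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """Replicas that can be lost while a majority is still reachable."""
    return replicas - quorum(replicas)


def can_ack(replicas: int, persisted: int) -> bool:
    """True once enough replicas have persisted the message."""
    return persisted >= quorum(replicas)


for r in (1, 3, 5):
    print(f"Replicas={r}: quorum={quorum(r)}, tolerated failures={tolerated_failures(r)}")
```

With Replicas=3 the Leader can acknowledge after one Follower has confirmed, and the stream stays writable with one node down. An even replica count adds cost without adding fault tolerance, which is why odd counts are the usual choice.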

9.5 Running NATS under systemd

# /etc/systemd/system/nats.service
[Unit]
Description=NATS Server
After=network.target

[Service]
Type=simple
User=nats
Group=nats
ExecStart=/usr/local/bin/nats-server -c /etc/nats/nats.conf
Restart=on-failure
RestartSec=5s
LimitNOFILE=1000000

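# Log verbosity is set in nats.conf (debug / trace) or with the -D / -V flags

# On stop, signal only the main process and allow 30 s for a clean exit
KillMode=process
TimeoutStopSec=30

[Install]
WantedBy=multi-user.target

sudo systemctl daemon-reload
sudo systemctl enable nats
sudo systemctl start nats
sudo systemctl status nats

# Follow the logs
sudo journalctl -u nats -f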

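10. Security: TLS, Token, and JWT

10.1 Token Authentication

# nats.conf
authorization {
    # Shared token that every client must present
    token: "s3cr3t-t0k3n-2024"
    
    # For production, avoid a plaintext token: a bcrypt hash generated
    # with `nats server passwd` can be used as the value instead.
}

nc, err := nats.Connect("nats://localhost:4222",
    nats.Token("s3cr3t-t0k3n-2024"),
)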

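10.2 User Permissions (RBAC)

# nats.conf
authorization {
    users: [
        {
            user: "producer"
            password: "prod-pass-2024"
            permissions: {
                publish: ["orders.>", "users.>"]
                # Reply inboxes only: JetStream publish Acks arrive on _INBOX subjects
                subscribe: ["_INBOX.>"]
            }
        },
        {
            user: "consumer"
            password: "cons-pass-2024"
            permissions: {
                # JetStream consumers send Acks and API requests on $JS subjects
                publish: ["$JS.ACK.>", "$JS.API.>"]
                subscribe: ["orders.>", "system.health", "_INBOX.>"]
            }
        },
        {
            user: "admin"
            password: "admin-pass-2024"
            permissions: {
                publish: [">"]
                subscribe: [">"]
            }
        }
    ]
}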
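
The permission lists rely on the same Subject wildcards as subscriptions: `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below models how an allow list could be evaluated; it is a simplified illustration in Python, not the server's implementation, and the function names are hypothetical.

```python
from typing import Iterable


def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style pattern matches a concrete subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)


def is_allowed(allow: Iterable[str], subject: str) -> bool:
    """Return True if any pattern in the allow list matches the subject."""
    return any(subject_matches(p, subject) for p in allow)


producer_publish = ["orders.>", "users.>"]
print(is_allowed(producer_publish, "orders.new"))
print(is_allowed(producer_publish, "system.health"))
```

Checking a planned Subject layout against the allow lists this way before deploying helps catch patterns that are broader or narrower than intended.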

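10.3 TLS/mTLS

Generate the certificates:

# 1. Create the CA
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=nats-ca"

# 2. Server certificate (Go-based clients require a subjectAltName; adjust the names to your hosts)
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr -subj "/CN=nats-server"
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
    -extfile <(printf "subjectAltName=DNS:nats-server,DNS:localhost,IP:127.0.0.1") -out server.crt

# 3. Client certificate (for mTLS)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr -subj "/CN=nats-client"
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt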

NATS server TLS configuration:

# nats.conf
tls {
    cert_file: "/etc/nats/tls/server.crt"
    key_file: "/etc/nats/tls/server.key"
    ca_file: "/etc/nats/tls/ca.crt"
    
    # Require and verify client certificates (mTLS)
    verify: true
    
    # Optionally restrict the cipher suites
    # cipher_suites: ["TLS_AES_256_GCM_SHA384"]
    
    # Minimum TLS version
    # min_version: "1.2"
}

# Client port; TLS is served on this same port
port: 4222

Go client with TLS:

nc, err := nats.Connect("tls://localhost:4222",
    nats.ClientCert("/etc/nats/tls/client.crt", "/etc/nats/tls/client.key"),
    nats.RootCAs("/etc/nats/tls/ca.crt"),
)

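10.4 JWT Authentication

NATS supports decentralized, JWT-based authentication, which suits multi-tenant SaaS deployments.

# Generate JWTs with nsc (the NATS Security CLI)
# go install github.com/nats-io/nsc/v2@latest

# Initialize the nsc environment
nsc init

# Create an Operator
nsc add operator --name MyOperator

# Create Accounts (one per tenant)
nsc add account --name TenantA
nsc add account --name TenantB

# Create a User
nsc add user --account TenantA --name app-a1

# Export the User JWT and Seed as a credentials file
nsc generate creds --account TenantA --name app-a1 > app-a1.creds

// Connect with the JWT credentials
nc, err := nats.Connect("nats://localhost:4222",
    nats.UserCredentials("app-a1.creds"),
)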
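
A credentials file bundles the User JWT and the NKey seed in PEM-like sections. As a rough debugging aid, assuming the usual `BEGIN NATS USER JWT` / `END NATS USER JWT` markers, the claims can be inspected with the Python standard library. The function names are hypothetical, and the signature is deliberately not verified here, so this must not be used for authentication decisions.

```python
import base64
import json
import re


def extract_jwt(creds_text: str) -> str:
    """Pull the JWT out of the text of a .creds file."""
    match = re.search(
        r"-+BEGIN NATS USER JWT-+\s*(\S+)\s*-+END NATS USER JWT-+",
        creds_text,
    )
    if match is None:
        raise ValueError("no NATS USER JWT section found")
    return match.group(1)


def decode_claims(jwt: str) -> dict:
    """Decode the claims (middle segment) of a JWT without verifying it."""
    payload = jwt.split(".")[1]
    # base64url segments are stored without padding; restore it before decoding
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

For example, `decode_claims(extract_jwt(open("app-a1.creds").read()))` returns the claims as a dictionary, which makes it easy to confirm which Account issued a User before debugging an authorization failure.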

11. NATS Performance Tuning

11.1 Operating System Tuning

# Linux: /etc/sysctl.conf

# 1. File descriptor limit
fs.file-max = 1000000

# 2. Connection backlog queues
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65535
net.ipv4.tcp_max_syn_backlog = 65535

# 3. TCP buffer sizes
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216

# 4. TCP connection reuse and teardown
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30

# 5. Avoid swapping
vm.swappiness = 0

# Apply the settings (run as root)
sysctl -p

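11.2 NATS Server Tuning

# nats.conf

# Connection limits
max_connections: 65536
max_control_line: 4096
max_payload: 1048576  # maximum message size: 1MB

# JetStream storage
jetstream {
    store_dir: "/var/lib/nats/jetstream"
    
    # Memory store: roughly 50-70% of available memory
    max_memory_store: 8GB
    
    # File store limit
    max_file_store: 100GB
    
    # Prefer fast SSD storage
    # store_dir: "/mnt/nvme/nats/jetstream"
    
    # fsync policy: durability versus throughput
    # sync_interval: always  # fsync on every write (safest, slowest)
    # sync_interval: "1s"    # fsync once per second
}

# Monitoring
http_port: 8222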

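11.3 Client Tuning

// Go client tuning

nc, err := nats.Connect("nats://localhost:4222",
    // Reconnect behavior
    nats.MaxReconnects(10),
    nats.ReconnectWait(100 * time.Millisecond),
    nats.Timeout(2 * time.Second),
    
    // Try servers in the listed order instead of shuffling them
    nats.DontRandomize(),
)
if err != nil {
    log.Fatal(err)
}

// Pending limits are set per subscription, not on the connection
sub, err := nc.Subscribe("orders.new", handler) // handler: your own nats.MsgHandler
if err != nil {
    log.Fatal(err)
}
// Queue at most 65536 messages or 64MB before the client counts as a slow consumer
if err := sub.SetPendingLimits(65536, 64*1024*1024); err != nil {
    log.Fatal(err)
}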

11.4 JetStream Batch Publishing

// Publish a batch asynchronously, then wait for every Ack
func batchPublish(js nats.JetStreamContext, orders []*Order) error {
    // Collect the Ack futures so the publishes are pipelined
    ackFutures := make([]nats.PubAckFuture, 0, len(orders))
    
    for _, order := range orders {
        data, err := json.Marshal(order)
        if err != nil {
            return fmt.Errorf("marshal order %v: %w", order.ID, err)
        }
        future, err := js.PublishAsync("orders.new", data, nats.MsgId(fmt.Sprintf("order-%v", order.ID)))
        if err != nil {
            return err
        }
        ackFutures = append(ackFutures, future)
    }
    
    // Wait for each Ack, giving up after 5 seconds per message
    for _, future := range ackFutures {
        select {
        case ack := <-future.Ok():
            fmt.Printf("Ack: stream=%s, seq=%d\n", ack.Stream, ack.Sequence)
        case err := <-future.Err():
            return fmt.Errorf("publish failed: %w", err)
        case <-time.After(5 * time.Second):
            return fmt.Errorf("publish timeout")
        }
    }
    
    return nil
}

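11.5 Benchmarking

# Benchmark with nats-bench

# Install
go install github.com/nats-io/nats.go/examples/nats-bench@latest

# Pub/Sub benchmark
nats-bench -n 1000000 -np 10 -ns 10 "orders.new"
# -n:  total number of messages
# -np: number of publishers
# -ns: number of subscribers

# JetStream benchmark
nats-bench -js -n 100000 -np 5 -ns 5 "orders.new"
# -js: use JetStream (confirm with nats-bench -h; the nats CLI also ships a `nats bench` command)

# Sample output (from a 10,000,000-message run)
# Pub stats: 10,000,000 msgs in 2.31 sec, 4,329,004 msgs/sec, 418.46 MB/sec
# Sub stats: 10,000,000 msgs in 2.45 sec, 4,081,632 msgs/sec, 394.50 MB/sec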
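
The msgs/sec figures in the sample output are simply the message count divided by the elapsed time, which is worth recomputing when comparing runs. A small sketch, with a hypothetical helper name:

```python
def msgs_per_sec(total_msgs: int, elapsed_sec: float) -> float:
    """Average throughput over a benchmark run."""
    return total_msgs / elapsed_sec


pub_rate = msgs_per_sec(10_000_000, 2.31)
sub_rate = msgs_per_sec(10_000_000, 2.45)
print(f"pub: {pub_rate:,.0f} msgs/sec, sub: {sub_rate:,.0f} msgs/sec")
```

Note that the sample output reports 10,000,000 messages while the Pub/Sub command above passes -n 1000000, so the output presumably comes from a larger run than the command shown.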

12. NATS vs Kafka vs RabbitMQ vs Pulsar

12.1 Feature Comparison

NATS | Kafka | RabbitMQ | Pulsar
Erlang
JetStream
Pub/SubQueueExchange
PartitionQueueTopic
JetStream

12.2 Performance Comparison

Throughput (msg/s)
NATS (Core)   10M+
NATS (JS)     5M
Kafka         3M
RabbitMQ      500K
Pulsar        2M

P99 latency
NATS (Core)   1ms
NATS (JS)     5ms
Kafka         50ms
RabbitMQ      20ms
Pulsar        15ms

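12.3 Selection Guide

**Choose NATS for:**

  • Microservices / Kubernetes
  • IoT
  • Pub/Sub and Request/Reply

**Choose Kafka for:**

  • Event Sourcing
  • The Kafka ecosystem (Kafka Connect, KSQL)

**Choose RabbitMQ for:**

  • Headers and Topic Exchange routing
  • RabbitMQ
  • Erlang/Elixir

**Choose Pulsar for:**

  • SaaS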

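13. Monitoring and Operations

13.1 Monitoring Endpoints

NATS exposes HTTP monitoring endpoints when http_port: 8222 is set.

# Server statistics
curl http://localhost:8222/varz

# Connection details
curl http://localhost:8222/connz

# Subject subscription statistics
curl http://localhost:8222/subsz

# JetStream statistics
curl http://localhost:8222/jsz

# Health check
curl http://localhost:8222/healthz

Key /varz fields:
- connections: current client connections
- in_msgs: messages received (cumulative)
- out_msgs: messages sent (cumulative)
- in_bytes: bytes received (cumulative)
- out_bytes: bytes sent (cumulative)
- cpu: CPU usage
- mem: memory usage

Key /jsz fields:
- streams: number of Streams
- consumers: number of Consumers
- messages: stored messages
- bytes: stored bytes
- reserved_memory: reserved memory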
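
The message and byte counters reported by /varz (in_msgs, out_msgs, in_bytes, out_bytes) are cumulative, so a rate has to be derived from two snapshots. A minimal sketch, assuming the snapshots were already fetched and parsed as JSON; the sample values and the function name are made up for illustration.

```python
def message_rates(prev: dict, curr: dict, interval_sec: float) -> dict:
    """Per-second rates between two /varz snapshots taken interval_sec apart."""
    counters = ("in_msgs", "out_msgs", "in_bytes", "out_bytes")
    return {
        f"{name}_per_sec": (curr[name] - prev[name]) / interval_sec
        for name in counters
    }


# Hypothetical snapshots taken 10 seconds apart
earlier = {"in_msgs": 1_000, "out_msgs": 4_000, "in_bytes": 100_000, "out_bytes": 400_000}
later = {"in_msgs": 6_000, "out_msgs": 24_000, "in_bytes": 600_000, "out_bytes": 2_400_000}
print(message_rates(earlier, later, interval_sec=10))
```

A server restart resets the counters, so a negative rate is a sign that the two snapshots straddle a restart and should be discarded.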

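13.2 Prometheus Alert Rules

# prometheus-alerts.yml
# Metric names depend on the exporter and its configured prefix;
# check them against the exporter's /metrics output before use.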
groups:
  - name: nats
    rules:
      - alert: NATSConnectionHigh
        expr: nats_connections > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "NATS connections too high: {{ $value }}"
      
      - alert: NATSJetStreamStorageHigh
        expr: nats_jetstream_used_bytes / nats_jetstream_reserved_bytes > 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "JetStream storage usage > 80%"
      
      - alert: NATSConsumerLagHigh
        expr: nats_jetstream_consumer_pending_messages > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag too high: {{ $value }} pending messages"

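13.3 Troubleshooting

**Problem 1: messages are not being received**

Checklist:

1. Check the Subject spelling
   - orders.new
   - order.new  ← typo
   
2. Check that the JetStream Stream exists
   nats stream ls
   
3. Check that the Consumer exists
   nats consumer ls ORDERS
   
4. Check the Stream limits (MaxMsgs, MaxBytes, MaxAge)
   nats stream info ORDERS
   
5. Check the Ack policy; it should not be AckNone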

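**Problem 2: consumer lag keeps growing**

Symptom: pending_messages keeps increasing.

Checklist:
1. Inspect the Consumer state
   nats consumer info ORDERS order-processor
   
2. Check the consumer's processing capacity
   - Add consumer instances
   - Speed up message handling
   
3. Check for Ack timeouts
    See the NATS log: /var/log/nats/nats-server.log
   
4. Check whether MaxAckPending is too small
    Raise MaxAckPending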
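
Whether a backlog ever clears depends on the gap between the consume rate and the publish rate. A back-of-the-envelope sketch, using a hypothetical helper and made-up rates:

```python
from typing import Optional


def drain_time_seconds(pending: int, consume_rate: float, publish_rate: float) -> Optional[float]:
    """Seconds until the backlog is gone, or None if it never drains.

    Rates are in messages per second and assumed to stay constant.
    """
    net_rate = consume_rate - publish_rate
    if net_rate <= 0:
        return None  # consumers do not outpace publishers
    return pending / net_rate


# 10,000 pending messages, consuming 1,500 msg/s while 1,000 msg/s keep arriving
print(drain_time_seconds(10_000, consume_rate=1_500, publish_rate=1_000))
```

If the result is None, tuning MaxAckPending alone will not help; the consumers need more capacity than the incoming rate.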

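**Problem 3: cluster nodes cannot connect to each other**

Checklist:

1. Check that port 6222 is reachable
   telnet 192.168.1.11 6222
   
2. Check that the routes configuration is correct
   
3. Check the Raft state of the JetStream cluster
   nats server report jetstream
   
4. Force a new leader election for a Stream
   nats stream cluster step-down ORDERS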

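13.4 Backup and Restore

# Back up JetStream data
# Option 1: stop NATS and archive the store directory
systemctl stop nats
tar -czf nats-backup-$(date +%Y%m%d).tar.gz /var/lib/nats/jetstream
systemctl start nats

# Option 2: back up a single Stream with the nats CLI
nats stream backup ORDERS /backup/ORDERS

# Restore (older CLI versions also expect the Stream name before the path)
nats stream restore /backup/ORDERS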

14. NATS

14.1

  1. **NATS Kafka **

    • Kafka
    • NATS
  2. JetStream NATS +


    • 20MB
    • Kubernetes

    • Go, Java, Python, Rust, C#, JavaScript, 60+

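14.2 Best Practices

 DO:
- Use hierarchical Subject names, e.g. shop.orders.created
- Run JetStream with Replicas=3 in production
- Enable TLS/mTLS
- Set a MsgId so that duplicates are dropped
- Monitor pending_messages

 DON'T:
- Treat NATS as long-term storage (>30 days)
- Forget to Ack messages
- Send large messages (>1MB)
- Use Core NATS where JetStream persistence is required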

14.3 NATS

  • NATS 3.0
  • **MQTT ** IoT
  • **WebSocket ** NATS
  • **WebAssembly ** NATS Wasm

GitHub: https://github.com/yourorg/nats-deep-dive-2026
 go-producer/
 go-consumer/
 python-examples/
 java-springboot/
 docker-compose.yml
 k8s-manifests/

****NATS NATS NATS



  • Written in June 2026, based on NATS v2.10.20