最详细的nats讲解-部署加应用

2025-09-19 09:58:46 3176

什么是Nats

NATS:NATS是一个开源的、分布式的、高性能的消息传递系统,用于构建分布式和微服务应用程序。它提供了一种简单、可靠、高效的方式来传递信息,具有良好的可扩展性和容错性。

使用Go语言重写,能够达到每秒8-11百万个消息,整个程序很小只有3M。

NATS适合云基础设施的消息通信系统、IoT设备消息通信和微服务架构。

类似于 kafka等中间件,发布-订阅模型的设计。

常应用于:物联网消息传递,云原生和微服务。

支持的协议:

TCPMQTTWebsocket

Nats默认端口

监听客户端连接端口:4222提供监控服务端http端口:8222集群连接端口:6222

NATS官方学习文档

NATS.io – Cloud Native, Open Source, High-performance Messaging

nats下载地址

官网:https://nats.io/download/

github:

备注 不管是core-nats 或者nats-jetstream都是用这个nats-server执行程序,通过更改配置文件或者启动命令来开启nats-jetstream

部署

core-nats单节点部署和集群部署

单机部署

启动配置文件nats.conf

#nats.conf

# 端口Client port of 4222 on all interfaces

port: 4222

# http 可以用客户端工具连接nats 获取数据HTTP monitoring port

http_port: 8222

authorization {

user: nats

password:

timeout: 10

}

启动命令

运行 ./nats-server -c nats.conf

core-nats集群部署(三节点)

节点1

# node1 - nats.conf

listen: 0.0.0.0:4222

# HTTP监控端口

http_port: 8222

# 客户端连接的认证信息

authorization {

user: node1_user

password: node1_pass

timeout: 1

}

# 集群定义

cluster {

name: "nats-cluster"

listen: 0.0.0.0:6222

routes = [

nats-route://route2_user:route2_pass@node2:6222,

nats-route://route3_user:route3_pass@node3:6222

]

# 路由连接的认证信息

authorization {

user: route1_user

password: route1_pass

timeout: 1

}

}

# 日志选项

log_file: "./nats.log"

# PID进程文件

pid_file: "./nats.pid"

节点2

# node2 - nats.conf

listen: 0.0.0.0:4222

# HTTP监控端口

http_port: 8222

# 客户端连接的认证信息

authorization {

user: node2_user

password: node2_pass

timeout: 1

}

# 集群定义

cluster {

name: "nats-cluster"

listen: 0.0.0.0:6222

routes = [

nats-route://route1_user:route1_pass@node1:6222,

nats-route://route3_user:route3_pass@node3:6222

]

# 路由连接的认证信息

authorization {

user: route2_user

password: route2_pass

timeout: 1

}

}

# 日志选项

log_file: "./nats.log"

# PID进程文件

pid_file: "./nats.pid"

节点3

# node3 - nats.conf

listen: 0.0.0.0:4222

# HTTP监控端口

http_port: 8222

# 客户端连接的认证信息

authorization {

user: node3_user

password: node3_pass

timeout: 1

}

# 集群定义

cluster {

name: "nats-cluster"

listen: 0.0.0.0:6222

routes = [

nats-route://route1_user:route1_pass@node1:6222,

nats-route://route2_user:route2_pass@node2:6222

]

# 路由连接的认证信息

authorization {

user: route3_user

password: route3_pass

timeout: 1

}

}

# 日志选项

log_file: "./nats.log"

# PID进程文件

pid_file: "./nats.pid"

启动命令

每个节点 运行 nats-server -c nats.conf

nats-jetstream单机部署和集群部署

单机部署

#server_name=n1-c1

# Client port of 4222 on all interfaces

port: 4222

# HTTP monitoring port

http_port: 8222

jetstream {

store_dir=/usr/local/nats/storage

enabled: true

max_mem_store: 100MB

max_file_store: 500MB

}

# This is for clustering multiple servers together.

#cluster {

# It is recommended to set a cluster name

# name: "my_cluster"

# Route connections to be received on any interface on port 6222

# port: 6222

# Routes are protected, so need to use them with --routes flag

# e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222

# authorization {

# user: ruser

# password: T0pS3cr3t

# timeout: 2

# }

# Routes are actively solicited and connected to from this server.

# This Docker image has none by default, but you can pass a

# flag to the nats-server docker image to create one to an existing server.

# routes = []

#}

集群部署

节点1

server_name=n1-c1

listen=4222

accounts {

$SYS {

users = [

{ user: "admin",

pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"

}

]

}

}

jetstream {

store_dir=/nats/storage

}

cluster {

name: C1

listen: 0.0.0.0:6222

routes: [

nats://host_b:6222

nats://host_c:6222

]

}

节点2

server_name=n2-c1

listen=4222

accounts {

$SYS {

users = [

{ user: "admin",

pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"

}

]

}

}

jetstream {

store_dir=/nats/storage

}

cluster {

name: C1

listen: 0.0.0.0:6222

routes: [

nats://host_a:6222

nats://host_c:6222

]

}

节点3

server_name=n3-c1

listen=4222

accounts {

$SYS {

users = [

{ user: "admin",

pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"

}

]

}

}

jetstream {

store_dir=/nats/storage

}

cluster {

name: C1

listen: 0.0.0.0:6222

routes: [

nats://host_a:6222

nats://host_b:6222

]

}

Nats优点

Nats产品分类和产品选择

产品介绍

Core-Nats核心 Nats-JetStream

两者主要区别:

提供的消息质量:

Nats核心只提供最多交付一次的消息质量。JetStream提供至少一次/恰好一次 QoS

消息存储:

NATS核心不提供消息持久化,JetStream提供消息存储和过期策略和删除策略等。

Nats使用和配置文件讲解

NATS主题-讲解

格式

用"."来分隔主题的层次

time.us

time.us.east

time.us.east.atlanta

time.eu.east

time.eu.east.warsaw

通配符讲解

nats中通配符有两种:"*"和">".

* 通配符用来匹配单个标记,例如:time.*.east 可以监听到这将匹配time.us.east和time.eu.east。不能匹配time.eu.us.east.

>通配符它将匹配一个或多个标记。

例如,time.us.>将匹配time.us.east和time.us.east.atlanta,而time.us.*只会匹配,time.us.east因为它不能匹配多个标记。

混合通配符

通配符*可以在同一主题中出现多次。两种类型都可以使用。例如,*.*.east.>将收到time.us.east.atlanta。

Nats如何解决消息队列中常见问题

订阅

订阅和kafka等常见的消息队列类似,值得强调的是nats中也有类似于kafka中的消费者组的概念

-不同的客户端用相同的消费者组去订阅同一个主图,服务端保证每个消息只被消费者组中的一个消费者消费。(提供的内置负载平衡功能)

确保应用程序容错

工作负载处理可以扩大或缩小

扩大或缩小消费者规模,无需重复发送消息

无需额外配置

队列组由应用程序及其队列订阅者定义,而不是由服务器配置定义

普通订阅 core-nats产品举例

nc, err := nats.Connect("demo.nats.io")

if err != nil {

log.Fatal(err)

}

defer nc.Close()

// Use a WaitGroup to wait for a message to arrive

wg := sync.WaitGroup{}

wg.Add(1)

// 异步订阅

if _, err := nc.Subscribe("updates", func(m *nats.Msg) {

wg.Done()

}); err != nil {

log.Fatal(err)

}

//同步订阅 保证消息顺序

// sub, err := nc.SubscribeSync("updates")

// Wait for a message to come in

wg.Wait()

消费者组订阅实例代码:core-nats产品举例

package main

import "github.com/nats-io/nats.go"

func main() {

connect, err := nats.Connect("")

if err != nil {

}

connect.QueueSubscribeSync() //同步保证消息顺序

connect.QueueSubscribe() //异步不保证顺序

connect.QueueSubscribeSyncWithChan() //同步消息顺序发送到管道里接收

connect.ChanQueueSubscribe() //异步不保证顺序

}

消费者组订阅实例代码:nats-jetStream产品举例(有个Pull 拉去消息的概念)

import (

"github.com/nats-io/nats.go"

"log"

"time"

)

func main() {

nc, err := nats.Connect("")

if err != nil {

}

js, err := nc.JetStream()

if err != nil {

log.Fatal(err)

}

js.QueueSubscribeSync() //顺序

js.QueueSubscribe()

js.ChanQueueSubscribe() // 消息发布到指定管道

}

注意 JetStream这里订阅的有个常用的:通过消费者主动去服务端拉去消息的(拉取消费者在 NATS 服务器上产生的 CPU 负载较少,因此扩展性更好) PullSubscribe

func ExampleJetStream() {

nc, err := nats.Connect("localhost")

if err != nil {

log.Fatal(err)

}

// Use the JetStream context to produce and consumer messages

// that have been persisted.

js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))

if err != nil {

log.Fatal(err)

}

js.AddStream(&nats.StreamConfig{

Name: "FOO",

Subjects: []string{"foo"},

})

js.Publish("foo", []byte("Hello JS!"))

// Publish messages asynchronously.

for i := 0; i < 500; i++ {

js.PublishAsync("foo", []byte("Hello JS Async!"))

}

select {

case <-js.PublishAsyncComplete():

case <-time.After(5 * time.Second):

fmt.Println("Did not resolve in time")

}

// Create Pull based consumer with maximum 128 inflight.

sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

defer cancel()

for {

select {

case <-ctx.Done():

return

default:

}

// Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed.

msgs, _ := sub.Fetch(10, nats.Context(ctx))

for _, msg := range msgs {

msg.Ack()

}

}

}

发布

core-nats产品简单发(核心版本的nats不保证可靠性交付:发送一次就不管了,适用于可容忍消息丢失的情况)

nc, err := nats.Connect("demo.nats.io", nats.Name("API PublishBytes Example"))

if err != nil {

log.Fatal(err)

}

defer nc.Close()

if err := nc.Publish("updates", []byte("All is Well")); err != nil {

log.Fatal(err)

}

关于nats-jetstream的发布需要先穿就“流”,放到下面Jetstream中进行讲解

Nats-JetStream

概念

NATS 有一个内置的持久性引擎,称为JetStream,它允许存储消息并在以后重播。与需要您拥有有效订阅才能在消息发生时处理消息的NATS Core不同,JetStream 允许 NATS 服务器捕获消息并根据需要将其重播给消费者。此功能为您的 NATS 消息提供了不同的服务质量,并实现了容错和高可用性配置。

流创建及配置

用代码简单介绍一下流的创建个关键信息配置

package main

import (

"github.com/nats-io/nats.go"

"log"

"time"

)

func main() {

nc, err := nats.Connect("")

if err != nil {

}

js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))

if err != nil {

log.Fatal(err)

}

_, err = js.AddStream(&nats.StreamConfig{

Name: "device", //流名称

Subjects: []string{"device.*"}, //需要监视并且存储消息的主题名

Storage: nats.FileStorage, //消息存储方式 文件或者内存

Retention: nats.LimitsPolicy, //保留策略

Discard: nats.DiscardOld, //丢弃策略 最老的消息丢弃或者最新的

MaxAge: 48 * time.Hour, //存储消息的最大时间

Duplicates: time.Hour * 24, // 恰好一次消息的检测时间段 在消息头里如果加了消息id,那么24小时内存在相同的id就认为是同一个消息

})

}

//这样就创建了一个名称为device的流

核心配置讲解

// jsm.go

// StreamConfig 用于定义一个流, 大多数参数都有合理的默认值

// 如果 subject 没写, 就会分配一个随机的 subject

type StreamConfig struct {

// 名称

Name string

// 描述

Description string

// 对应的多个 Subject

Subjects []string

// 消息 3 种保留策略

// RetentionPolicy 最大消息数, 最大存储空间或者最大存活时间达到限制, 就可以删除消息

// InterestPolicy 需要所有 consumer 确认可以删除消息

// WorkQueuePolicy 只需要一个 consumer 确认可以删除消息

Retention RetentionPolicy

// 最大 Consumer 数量

MaxConsumers int

// 最大存储 Mgs 数量

MaxMsgs int64

// 最大储存占用

MaxBytes int64

// 消息 2 种淘汰策略

// DiscardOld 消息达到限制后, 丢弃最早的消息

// DiscardNew 消息达到限制后, 信息消息新推送会失败

Discard DiscardPolicy

// 消息存活时间

MaxAge time.Duration

// 每个 subject 最大消息数量

MaxMsgsPerSubject int64

// 每个消息最大大小

MaxMsgSize int32

// 支持文件储存和内存储存 2 种类型

Storage StorageType

// 消息分片数量

Replicas int

// 不需要 ack

NoAck bool

// ...

}

订阅-消费者

消费者关键配置

// jsm.go

type ConsumerConfig struct {

// 名称

Durable string `json:"durable_name,omitempty"`

// 描述

Description string `json:"description,omitempty"`

// 交付 Subject

DeliverSubject string `json:"deliver_subject,omitempty"`

// 交付 Group

DeliverGroup string `json:"deliver_group,omitempty"`

// 交付策略

// 交付所有 (默认), 交付最后一个, 交付最新, 自定义开始序号, 自定义开始时间

DeliverPolicy DeliverPolicy `json:"deliver_policy"`

// 开始序号

OptStartSeq uint64 `json:"opt_start_seq,omitempty"`

// 开始时间

OptStartTime *time.Time `json:"opt_start_time,omitempty"`

// ack 策略

// 不需要ack (默认), 隐式ack All , 每个都需要显示ack

AckPolicy AckPolicy `json:"ack_policy"`

// ack等待时间

AckWait time.Duration `json:"ack_wait,omitempty"`

MaxDeliver int `json:"max_deliver,omitempty"`

BackOff []time.Duration `json:"backoff,omitempty"`

// 过滤的Subject

FilterSubject string `json:"filter_subject,omitempty"`

// 重试策略

// 尽快重试, ReplayOriginalPolicy 相同时间重试

ReplayPolicy ReplayPolicy `json:"replay_policy"`

// 限速

RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec

// 采样频率

SampleFrequency string `json:"sample_freq,omitempty"`

// 最大等待数量

MaxWaiting int `json:"max_waiting,omitempty"`

//最大Pending ack数量

MaxAckPending int `json:"max_ack_pending,omitempty"`

// flow 控制

FlowControl bool `json:"flow_control,omitempty"`

// 心跳时间

Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`

// ...

}

常见文件讲解

nats如何保证消息不重复

nats核心功能 只提供交付一次的消息质量,要向保证发送消息不重复只能在消息发送端做唯一限制。

nats-jetstream 提供恰好一次和至少一次的消息质量,对于发布方,它依赖于发布应用程序在消息头中附加唯一的消息或发布 ID,并依赖于服务器在可配置的滚动时间段内跟踪这些 ID,以便检测发布者是否两次发布同一条消息。对于订阅放可以加必要的外在设计增加接收消息不重复,例如唯一主键等。

如何保证消息不丢失

订阅方订阅每条消息需要主动向nats服务回复ack确认,否则nats则认为丢失,进行消息重播。进而实现可靠交付。

nats服务提供持久的消息保留策略。

发布方每发布一条消息都会收到nats的ack回复。

这保证了消息不丢失。

顺序消费问题

单个分区可以实现顺序消费,同步消费(重要消息放到单个分区)

nats也提供队列消费,Group的概念。进而提供负载均衡。

实战