资讯专栏INFORMATION COLUMN

NATS--NATS Streaming持久化

qiangdada / 1005人阅读

摘要:持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。运行基于的持久化示例你将会看到如下的输出可以看出默认的是基于内存的持久化。

前言

最近项目中需要使用到一个消息队列,主要用来将原来一些操作异步化。根据自己的使用场景和熟悉程度,选择了NATS Streaming。之所以,选择NATS Streaming。一,因为我选型一些中间件,我会优先选取一些自己熟悉的语言编写的,这样方便排查问题和进一步的深究。二,因为自己一直做k8s等云原生这块,偏向于cncf基金会管理的项目,毕竟这些项目从一开始就考虑了如何部署在k8s当中。三,是评估项目在不断发展过程中,引入的组件是否能够依旧满足需求。

消息队列的使用场景

如果问为什么这么做,需要说一下消息队列的使用场景。之前看知乎的时候,看到一些回答比较认同,暂时拿过来,更能形象表达。感谢ScienJus同学的精彩解答。

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。

使用场景的话,举个例子:

假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:

校验用户名等信息,如果没问题会在数据库中添加一个用户记录

如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信

分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他

发送给用户一个包含操作指南的系统通知等等……

但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。

或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。

所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。

其实,总结一下消息队列的作用

削峰,形象点的话,可以比喻为蓄水池。比如elk日志收集系统中的kafka,主要在日志高峰期的时候,在牺牲实时性的同时,保证了整个系统的安全。

同步系统异构化。原先一个同步操作里的诸多步骤,可以考虑将一些不影响主线发展的步骤,通过消息队列异步处理。比如,电商行业,一个订单完成之后,一般除了直接返回给客户购买成功的消息,还要通知账户组进行扣费,通知处理库存变化,通知物流进行派送等,通知一些用户组做一些增加会员积分等操作等。

NATS Streaming 简介

NATS Streaming是一个由NATS驱动的数据流系统,用Go编程语言编写。 NATS Streaming服务器的可执行文件名是nats-streaming-server。 NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作。 NATS Streaming服务器作为Apache-2.0许可下的开源软件提供。 Synadia积极维护和支持NATS Streaming服务器。

特点

除了核心NATS平台的功能外,NATS Streaming还提供以下功能:

增强消息协议

NATS Streaming使用谷歌协议缓冲区实现自己的增强型消息格式。这些消息通过二进制数据流在NATS核心平台进行传播,因此不需要改变NATS的基本协议。NATS Streaming信息包含以下字段:

  - 序列 - 一个全局顺序序列号为主题的通道
  - 主题 - 是NATS Streaming 交付对象
  - 答复内容 - 对应"reply-to"对应的对象内容
  - 数据 - 真是数据内容
  - 时间戳 - 接收的时间戳,单位是纳秒
  - 重复发送 - 标志这条数据是否需要服务再次发送
  - CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通讯领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法

 - 消息/事件的持久性
  NATS Streaming提供了可配置的消息持久化,持久目的地可以为内存或者文件。另外,对应的存储子系统使用了一个公共接口允许我们开发自己自定义实现来持久化对应的消息

 - 至少一次的发送
  NATS Streaming提供了发布者和服务器之间的消息确认(发布操作) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其他外部存储器)用来为需要重新接受消息的订阅者进行重发消息。

 - 发布者发送速率限定
  NATS Streaming提供了一个连接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任何时候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。

- 每个订阅者的速率匹配/限制
  NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止

以主题重发的历史数据

  新订阅的可以在已经存储起来的订阅的主题频道指定起始位置消息流。通过使用这个选项,消息就可以开始发送传递了:

  1. 订阅的主题存储的最早的信息
  2. 与当前订阅主题之前的最近存储的数据,这通常被认为是 "最后的值" 或 "初值" 对应的缓存
  3. 一个以纳秒为基准的 日期/时间
  4. 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒
  5. 一个特定的消息序列号

持久订阅

  订阅也可以指定一个“持久化的名称”可以在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者重新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最早的未被确认的消息处恢复。

docker 运行NATS Streaming

在运行之前,前面已经讲过NATS Streaming 相比nats,多了持久化的一个future。所以我们在接下来的demo演示中,会重点说这点。

运行基于memory的持久化示例:
docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

你将会看到如下的输出:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

可以看出默认的是基于内存的持久化。

运行基于file的持久化示例:
docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

你将会看到如下的输出:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------
PS

如果部署在k8s当中,那么就可以采取基于file的持久化,通过挂载一个块存储来保证,数据可靠。比如,aws的ebs或是ceph的rbd。

4222为客户端连接的端口。8222为监控端口。

启动以后访问:localhost:8222,可以看到如下的网页:

启动参数解析
Streaming Server Options:
    -cid, --cluster_id           Cluster ID (default: test-cluster)
    -st,  --store                Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir                  For FILE store type, this is the root directory
    -mc,  --max_channels            Max number of channels (0 for unlimited)
    -msu, --max_subs                Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs                Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes              Max messages total size per channel (0 for unlimited)
    -ma,  --max_age            Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity     Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server          Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config          Streaming server configuration file
    -hbi, --hb_interval        Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout         How long server waits for a heartbeat response
    -hbf, --hb_fail_count           Number of failed heartbeats before server closes the client connection
          --ft_group             Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal [=]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt                Specify if server should use encryption at rest
          --encryption_cipher    Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key        Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered                    Run the server in a clustered configuration (default: false)
    --cluster_node_id            ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap            Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers              List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path           Directory to store log replication data
    --cluster_log_cache_size        Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots         Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs         Number of log entries to leave after a snapshot and compaction
    --cluster_sync                 Do a file sync after every write to the replication log and message store
    --cluster_raft_logging         Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled         Enable file compaction
    --file_compact_frag             File fragmentation threshold for compaction
    --file_compact_interval         Minimum interval (in seconds) between file compactions
    --file_compact_min_size        Minimum file size for compaction
    --file_buffer_size             File buffer size (in bytes)
    --file_crc                     Enable file CRC-32 checksum
    --file_crc_poly                 Polynomial used to make the table used for CRC-32 checksum
    --file_sync                    Enable File.Sync on Flush
    --file_slice_max_msgs           Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes         Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age       Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script  Path to script to use if you want to archive a file slice being removed
    --file_fds_limit                Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery        On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof        Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver             Name of the SQL Driver ("mysql" or "postgres")
    --sql_source             Datasource used when opening an SQL connection to the database
    --sql_no_caching           Enable/Disable caching for improved performance
    --sql_max_open_conns        Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure                    Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key          Client key for the streaming server
    -tls_client_cert         Client certificate for the streaming server
    -tls_client_cacert       Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=         Enable STAN debugging output
    -SV, --stan_trace=         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr               Bind to host address (default: 0.0.0.0)
    -p, --port                  Use port for clients (default: 4222)
    -P, --pid                File to store PID
    -m, --http_port             Use port for http monitoring
    -ms,--https_port            Use port for https monitoring
    -c, --config             Configuration file

Logging Options:
    -l, --log                File to redirect log output
    -T, --logtime=             Timestamp log entries (default: true)
    -s, --syslog             Enable syslog as log method
    -r, --remote_syslog      Syslog server addr (udp://localhost:514)
    -D, --debug=               Enable debugging output
    -V, --trace=               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user               User required for connections
        --pass               Password required for connections
        --auth               Authorization token required for connections

TLS Options:
        --tls=                 Enable TLS, do not verify clients (default: false)
        --tlscert            Server certificate file
        --tlskey             Private key for server certificate
        --tlsverify=           Enable TLS, verify client certificates
        --tlscacert          Client certificate CA for verification

NATS Clustering Options:
        --routes        Routes to solicit and connect
        --cluster            Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.
源码简单分析NATS Streaming 持久化

目前NATS Streaming支持以下4种持久化方式:

MEMORY

FILE

SQL

RAFT

其实看源码可以知道:NATS Streaming的store基于接口实现,很容易扩展到更多的持久化方式。具体的接口如下:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don"t apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server"s information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PeChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

官方也提供了mysql和pgsql两种数据的支持:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;
总结

后续会详细解读一下代码实现和一些集群部署。当然肯定少不了如何部署高可用的集群在k8s当中。

参阅文章:

NATS Streaming详解

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/27690.html

相关文章

  • Spark Streaming学习笔记

    摘要:输入和接收器输入代表从某种流式数据源流入的数据流。文件数据流可以从任何兼容包括等的文件系统,创建方式如下将监视该目录,并处理该目录下任何新建的文件目前还不支持嵌套目录。会被一个个依次推入队列,而则会依次以数据流形式处理这些的数据。 特点: Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。 Spark Streaming支持从多种数...

    陆斌 评论0 收藏0
  • Spark Streaming遇到问题分析

    摘要:遇到问题分析之后搞了个还没仔细了解可参考的与的有区别及并发控制先看看的,与的这几个概念。一个可以认为就是会最终输出一个结果的一条由组织而成的计算。在中,我们通过使用新极大地增强对状态流处理的支持。 Spark Streaming遇到问题分析 1、Spark2.0之后搞了个Structured Streaming 还没仔细了解,可参考:https://github.com/lw-lin/...

    stormzhang 评论0 收藏0
  • TBSSQL 的那些事 | TiDB Hackathon 2018 优秀项目分享

    摘要:当我们正准备做前期调研和设计的时候,主办方把唐长老拉去做现场导师,参赛规则规定导师不能下场比赛,囧,于是就这样被被动放了鸽子。川总早早来到现场。 本文作者是来自 TiBoys 队的崔秋同学,他们的项目 TBSSQL 在 TiDB Hackathon 2018 中获得了一等奖。TiDB Batch and Streaming SQL(简称 TBSSQL)扩展了 TiDB 的 SQL 引擎...

    Profeel 评论0 收藏0
  • TBSSQL 的那些事 | TiDB Hackathon 2018 优秀项目分享

    摘要:当我们正准备做前期调研和设计的时候,主办方把唐长老拉去做现场导师,参赛规则规定导师不能下场比赛,囧,于是就这样被被动放了鸽子。川总早早来到现场。 本文作者是来自 TiBoys 队的崔秋同学,他们的项目 TBSSQL 在 TiDB Hackathon 2018 中获得了一等奖。TiDB Batch and Streaming SQL(简称 TBSSQL)扩展了 TiDB 的 SQL 引擎...

    KnewOne 评论0 收藏0
  • 阿里巴巴为什么选择Apache Flink?

    摘要:从长远来看,阿里决定用做一个统一的通用的大数据引擎作为未来的选型。在阿里的现状基于在阿里巴巴搭建的平台于年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于搭建的实时计算平台。 本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。 合抱之木,生于毫末 随着人工智能时代的降临,数据量的爆发,在典型的大数据的业...

    CoderBear 评论0 收藏0

发表评论

0条评论

qiangdada

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<