1. 程式人生 > >Netty開發redis客戶端,Netty傳送redis命令,netty解析redis訊息

Netty開發redis客戶端,Netty傳送redis命令,netty解析redis訊息

Netty開發redis客戶端,Netty傳送redis命令,netty解析redis訊息, netty redis ,redis RESP協議。redis客戶端,netty redis協議

我們可以使用redis-cli這個客戶端來操作redis,也可以使用window的命令列telnet連線redis。本文,我們的目標是使用netty來實現redis客戶端,實現目標為:

1. 啟動netty程式
2. 在命令列輸入 set mykey hello,由netty傳送給redis伺服器
3. 在命令列輸入 get mykey hello,得到結果:hello
4. 在命令列輸入 quit,程式退出

前言

Redis在TCP埠6379(預設,可修改埠)上監聽到來的連線,客戶端連線到來時,Redis伺服器為此建立一個TCP連線。在客戶端與伺服器端之間傳輸的每個Redis命令或者資料都以\r\n結尾。當redis服務啟動之後,我們可以使用TCP與之連結,連線之後便可以發訊息,也會受到redis伺服器的訊息。而這個訊息是有格式的,這個格式是事先商量好的,我們稱之為協議,redis的協議叫做RESP,比方說我們有一條redis命令set hello 123,這條命令我們知道它是一條設定命令,通過RESP協議“翻譯”一下,他就是這樣的:

*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$3\r\n123\r\n

然後,這條協議通過網路傳輸(二進位制形式),傳到redis伺服器,被redis伺服器解析,最後完成設定。關於RESP洗衣詳細可以看這裡.

思路

上面我們介紹了redis是基於TCP傳輸,並使用了其自己的協議——RESP。RESP其實是資料交換可解析的協議,你可以理解為資料交換的格式,按照此格式組裝好要傳輸的命令,並以二進位制的形式由client端發往redis服務端。服務端接收這個訊息之後,解析訊息,執行命令,並將結果以協議好的格式組裝好,傳輸給client端。client端接收到響應,解釋成人類可以看懂的結果展示。

因此,我們可以整理一下思路:

1. 我們需要連線redis服務端,因此需要編寫一個netty client端(此處聯想一下netty client端的樣板程式碼)。
2. 我們需要向redis服務端傳送redis命令,很簡單,獲取channel,然後write。即channel.write(...)
3. 我們所編寫的直白的命令,如set xx,get xx之類的需要編碼之後才能傳輸給redis伺服器。
   因此,我們需要 **編碼器**。很榮幸netty自帶了,可以直接使用。
   這裡是 【輸出】 所以要有  outbound handler.
4. redis會響應結果給我們,因此我們需要在 chanelRead方法中處理資料。
   這裡是 【輸入】 所以要有  inbound  handler.

編寫程式碼

上的思路整理好了之後,我們可以寫程式碼了。得益於netty的良好設計,我們只需要把netty client的“樣板程式碼”拷貝過來生成一個client端程式碼即可。剩下的就是 handler ,decoder ,encoder 。我們需要編寫的類有:

  • RedisClient 見名知義,我們的主類,包含client bootstrap資訊。 接收使用者控制檯輸入redis命令。
  • RedisClientInitializer 初始化器,在此新增 handler,decoder,encoder
  • RedisClientHandler 核心邏輯,需要處理 inbound ,outbound 兩種型別事件。

RedisClient 程式碼如下:

public class RedisClient {

    String host;    //   目標主機
    int port;       //   目標主機埠

    public RedisClient(String host,int port){
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new RedisClientInitializer());

            Channel channel = bootstrap.connect(host, port).sync().channel();
            System.out.println(" connected to host : " + host + ", port : " + port);
            System.out.println(" type redis's command to communicate with redis-server or type 'quit' to shutdown ");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            ChannelFuture lastWriteFuture = null;
            for (;;) {
                String s = in.readLine();
                if(s.equalsIgnoreCase("quit")) {
                    break;
                }
                System.out.print(">");
                lastWriteFuture = channel.writeAndFlush(s);
                lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.err.print("write failed: ");
                            future.cause().printStackTrace(System.err);
                        }
                    }
                });
            }
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
            System.out.println(" bye ");
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        RedisClient client = new RedisClient("redis-cache2.228",5001);
        client.start();
    }

}

上面程式碼很長,但是,我們要熟悉netty的套路,它的樣板程式碼就是如此。我們只需要看handler(new RedisClientInitializer()); 這一行,下面的就是一個 for(;;)迴圈,用來接收我們在控制檯輸入的redis命令。

RedisClientInitializer程式碼如下:

public class RedisClientInitializer extends ChannelInitializer<Channel>{

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new RedisDecoder());
        pipeline.addLast(new RedisBulkStringAggregator());
        pipeline.addLast(new RedisArrayAggregator());
        pipeline.addLast(new RedisEncoder());
        pipeline.addLast(new RedisClientHandler());
    }
}

這個類,很簡單,上面的幾個addLast方法,除了最後一個外,其他都是netty自帶的redis協議實現相關的編解碼。最後一個是我們自定義的業務邏輯處理器。原始碼如下:

public class RedisClientHandler extends ChannelDuplexHandler {


    // 傳送 redis 命令
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        String[] commands = ((String) msg).split("\\s+");
        List<RedisMessage> children = new ArrayList<>(commands.length);
        for (String cmdString : commands) {
            children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
        }
        RedisMessage request = new ArrayRedisMessage(children);
        ctx.write(request, promise);
    }


    // 接收 redis 響應資料
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RedisMessage redisMessage = (RedisMessage) msg;
        // 列印響應訊息
        printAggregatedRedisResponse(redisMessage);
        // 是否資源
        ReferenceCountUtil.release(redisMessage);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }


    private static void printAggregatedRedisResponse(RedisMessage msg) {
        if (msg instanceof SimpleStringRedisMessage) {
            System.out.println(((SimpleStringRedisMessage) msg).content());
        } else if (msg instanceof ErrorRedisMessage) {
            System.out.println(((ErrorRedisMessage) msg).content());
        } else if (msg instanceof IntegerRedisMessage) {
            System.out.println(((IntegerRedisMessage) msg).value());
        } else if (msg instanceof FullBulkStringRedisMessage) {
            System.out.println(getString((FullBulkStringRedisMessage) msg));
        } else if (msg instanceof ArrayRedisMessage) {
            for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                printAggregatedRedisResponse(child);
            }
        } else {
            throw new CodecException("unknown message type: " + msg);
        }
    }

    private static String getString(FullBulkStringRedisMessage msg) {
        if (msg.isNull()) {
            return "(null)";
        }
        return msg.content().toString(CharsetUtil.UTF_8);
    }

}

注意,上面我們討論過,我們需要兩個handler,分別是inbound handler 和outbound handler 。這裡我們使用的是ChannelDuplexHandler。這個ChannelDuplexHandler 支援處理 inbound 和 outbound,其定義如下:

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
  ....
}

執行演示

按照開篇的思路分析,上面我們已經編寫好了netty redis client所需要的程式碼。下面我們需要執行看看。main函式如下:

    public static void main(String[] args) throws Exception{
        RedisClient client = new RedisClient("your-redis-server-ip",6379);
        client.start();
    }

我在本地運行了一下,演示了一些命令:

  • get ,set 以及 錯誤的命令
  • expire命令設定超時時間,及 ttl 命令檢視超時時間
  • del 命令刪除可以
  • quit退出程式。

結果如下:

 connected to host : 192.168.2.120, port : 6379
 type redis's command to communicate with redis-server or type 'quit' to shutdown 
get hello
>(null)
set hello
>ERR wrong number of arguments for 'set' command
set hello 123
>OK
expire hello 10
>1
ttl hello
>6
ttl hello
>4
get hello
>(null)
set hello world
>OK
get hello
>world
del hello
>1
quit
 bye 

Process finished with exit code 0

如此,我們便用netty實現了redis的client端。程式碼下載


使用Netty實現HTTP伺服器
Netty實現心跳機制
Netty系列

spring如何啟動的?這裡結合spring原始碼描述了啟動過程
SpringMVC是怎麼工作的,SpringMVC的工作原理
spring 異常處理。結合spring原始碼分析400異常處理流程及解決方法

Mybatis Mapper介面是如何找到實現類的-原始碼分析