末尾大家再介绍redis.py的socket交互作用,redis合计在以下几点之间做出了妥胁

 public String set(final String key, String value) { StringBuilder sb = new StringBuilder(); //虽然输出的时候,会被转义,然而我们传送的时候还是要带上rn sb.append("*3").append("rn"); sb.append("$3").append("rn"); sb.append("SET").append("rn"); sb.append("$").append(key.length()).append("rn"); sb.append(key).append("rn"); sb.append("$").append(value.length()).append("rn"); sb.append(value).append("rn"); byte[] bytes= new byte[1024]; try { outputStream.write(sb.toString().getBytes()); inputStream.read(bytes); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } return new String(bytes); }

3、请求

Redis接收由不同参数组成的命令。一旦收到命令,将会立刻被处理,并回复给客户端

创建事件轮询

Redis.c:920

  server.el = aeCreateEventLoop(); Ae.c:55 aeEventLoop *aeCreateEventLoop(void) {     aeEventLoop *eventLoop;     int i;      eventLoop = zmalloc(sizeof(*eventLoop));     if (!eventLoop) return NULL;     eventLoop->timeEventHead = NULL;     eventLoop->timeEventNextId = 0;     eventLoop->stop = 0;     eventLoop->maxfd = -1;     eventLoop->beforesleep = NULL;     if (aeApiCreate(eventLoop) == -1) {         zfree(eventLoop);         return NULL;     } /* Events with mask == AE_NONE are not set. So let's initialize  * the vector with it. */     for (i = 0; i < AE_SETSIZE; i++)         eventLoop->events[i].mask = AE_NONE;     return eventLoop; }

数字回复

数字回复和状态回复类似,只不过回复的token类型以:开头,其他过程和状态回复类似。不同在于客户端的解析要转换成数字类型。

*3 数组包含3个元素,分别是SET、eat、I want to eat$3 是一个字符串,且字符串长度为3SET 字符串的内容$3 是一个字符串,且字符串长度为3eat 字符串的内容$13 是一个字符串,且字符串长度为13I want to eat 字符串的内容

2、协议介绍

redis协议规范(Redis Protocol
specification)。

redis协议在以下几点之间做出了折衷:

  (1)简单的实现

  (2)快速地被计算机解析

  (3)简单得可以能被人工解析

  (4)网络层,Redis在TCP端口6379上监听到来的连接(本质就是socket),客户端连接到来时,Redis服务器为此创建一个TCP连接。在客户端与服务器端之间传输的每个Redis命令或者数据都以rn结尾。

初始化服务器配置

先来看看redis 的main函数的入口

Redis.c:1694

int main(int argc, char **argv) {     time_t start;      initServerConfig();     if (argc == 2) {         if (strcmp(argv[1], "-v") == 0 ||             strcmp(argv[1], "--version") == 0) version();         if (strcmp(argv[1], "--help") == 0) usage();         resetServerSaveParams();         loadServerConfig(argv[1]);     } else if ((argc > 2)) {         usage();     } else {         ...     }     if (server.daemonize) daemonize();     initServer();     ...
  • initServerConfig初始化全局变量 server 的属性为默认值。
  • 如果命令行指定了配置文件,
    resetServerSaveParams重置对落地备份的配置(即重置为默认值)并读取配置文件的内容对全局变量
    server 再进行初始化 ,没有在配置文件中配置的将使用默认值。
  • 如果服务器配置成后台执行,则对服务器进行 daemonize。
  • initServer初始化服务器,主要是设置信号处理函数,初始化事件轮询,起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,初始化虚拟内存和log等等。
  • 创建服务器监听端口。

Redis.c:923

    if (server.port != 0) {         server.ipfd= anetTcpServer(server.neterr,server.port,server.bindaddr);         if (server.ipfd == ANET_ERR) {             redisLog(REDIS_WARNING, "Opening port %d: %s",                 server.port, server.neterr);             exit(1);         }     }
  • anetTcpServer创建一个socket并进行监听,然后把返回的socket
    fd赋值给server.ipfd。

read_response

redispy中,调用PythonParser类的read_response方法来读取redis的数据。该方法又会相继调用_buffer对象的readline和read方法。后两者分别调用SocketBuffer类的_read_from_socket方法来读取socket。为了模拟从socket中读取数据,我们会修改_read_from_socket方法,使其读socket的数据改成从我们假设的缓冲区变量读取。

class Socket(object):

    def __init__(self, data):
        self.data = data

    def recv(self, length):
        data = self.data[:length]
        self.data=self.data[length:]
        return data

用我们定义的Socket类模拟网络数据流,其中recv方法则从data中返回数据。为了简化学习,我们暂时把所有错误的处理都忽略。

执行结果:

1、官网文档

准备写的数据内容

addReply函数一进来后就绑定写数据的回调函数,接下来就是准备写的数据内容

Networking.c:190

void addReply(redisClient *c, robj *obj) {     if (_installWriteEvent(c) != REDIS_OK) return;     redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);      /* This is an important place where we can avoid copy-on-write      * when there is a saving child running, avoiding touching the      * refcount field of the object if it's not needed.      *      * If the encoding is RAW and there is room in the static buffer      * we'll be able to send the object to the client without      * messing with its page. */     if (obj->encoding == REDIS_ENCODING_RAW) {         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)             _addReplyObjectToList(c,obj);     } else {         /* FIXME: convert the long into string and use _addReplyToBuffer()          * instead of calling getDecodedObject. As this place in the          * code is too performance critical. */         obj = getDecodedObject(obj);         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)             _addReplyObjectToList(c,obj);         decrRefCount(obj);     } }
  • 先尝试把要返回的内容添加到发送数据缓冲区中(redisClient->buf),如果该缓冲区的大小已经放不下这次想放进去的数据,或者已经有数据在排队(redisClient->reply
    链表不为空),则把数据添加到发送链表的尾部。

编码发送数据到redis服务,客户端完成了第一个交互过程,即请求的过程。接下来客户端还要接受并解析服务端的响应回复。这个过程我们需要将RESP协议编码的字节串解析成python的字串。

redis协议

4、新的统一请求协议

在这个统一协议里,发送给Redis服务端的所有参数都是二进制安全的。以下是通用形式:

*后面数量表示存在几个$
$后面数量表示字符串的长度

例子:

*3
$3
SET
$5
mykey
$7
myvalue

上面的命令看上去像是单引号字符串,所以可以在查询中看到每个字节的准确值:

"*3rn$3rnSETrn$5rnmykeyrn$7rnmyvaluern"

在Redis的回复中也使用这样的格式。批量回复时,这种格式用于每个参数$6rnmydatarn。
实际的统一请求协议是Redis用于返回列表项,并调用 Multi-bulk回复。
仅仅是N个以以*rn为前缀的不同批量回复,是紧随的参数(批量回复)数目。

接收请求

SocketBuffer

SocketBuffer类的主要职能就是把从socket中读取的数据,以bytes的方式存储到内存中。然后从内存中解析该数据。通过控制buffer的写入和写出的值,可以精确的设置什么时候从socket中读数据。

class SocketBuffer(object):
    def __init__(self, socket, socket_read_size):
        self._sock = socket
        self.socket_read_size = socket_read_size
        self._buffer = BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    @property
    def length(self):
        return self.bytes_written - self.bytes_read

    def _read_from_socket(self, length=None):
        pass

    def purge(self):
        pass

    def read(self, length):
        pass

    def readline(self):
        pass

该类实例化的时候会初始化socket对象和_buffer对象,后者是BytesIO的实例,用于读取写入内存字节数据。

回到我们的测试代码中,一旦调用了on_connect方法,下面就是调用read_response方法。在该方法中,首先会调用_buffer对象的readline方法:

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        # 处理包结束
        while not data.endswith(SYM_CRLF):
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()

        self.bytes_read += len(data)
        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

readline方法的主要功能就是从socket中读取一行数据。首先将bytes的指针seek到起始的位置。然后判断是否以CRLF结尾,即表示是否读取了redis的一个编码单位。如果尚未读取,就会调用_read_from_socket方法从socket缓冲区读取数据到内存缓冲区中。最后再从内存中读取一行数据到data变量中。

例如我们的例子中,redis返回的数据是b'+OKrn',此时会将所有数据都读取到BytesIO中,然后从BytesIO读取到data,最后返回+OK

下面再看read_response方法:

    def read_response(self):
        response = self._buffer.readline()

        byte, response = byte_to_chr(response[0]), response[1:]

        if byte not in ('-', '+', ':', '$', '*'):
            raise RedisError

        # server returned an error
        if byte == '-':
            response = nativestr(response)
            # 处理错误
            return response
        # single value
        elif byte == '+':
            pass
        # int value
        elif byte == ':':
            response = int(response)
        # bulk response
        elif byte == '$':
            length = int(response)
            if length == -1:
                return None
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]
        if isinstance(response, bytes) and self.encoding:
            response = response.decode(self.encoding)
        return response

该方法会读取stocketbuffer对象的返回,即上面的+OK。通过判断第一个字节的类型来判断回复的类型。此时比较简单,直接返回OK。错误回复也类似,直接把错误类型和错误信息返回即可。

 RedisClient redisClient = new RedisClient("127.0.0.1", 6379); String result = redisClient.set("eat", "please eat"); System.out.println(result);

6、模拟Redis服务和客户端通讯,实现RESP协议通信

枚举类

public enum CommandRedis {
    SET, GET, SETNX
}

实现类

package com.protocol;

import org.junit.Test;

import java.io.IOException;
import java.net.Proxy;
import java.net.Socket;

/**
 * Created by wxh on 2018/1/29.
 */
public class RedisClientByResp {

    private Socket socket;

    public RedisClientByResp() {
        try {
            socket = new Socket("192.168.56.180", 6379);
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("连接失败" + e.getMessage());
        }
    }

    /**
     * 设置值
     * @param key
     * @param value
     * @return
     * @throws IOException
     */
    public String set(String key, String value) throws IOException {
        StringBuilder sb = new StringBuilder();
        sb.append("*3").append("rn");
        sb.append("$").append(CommandRedis.SET.name().length()).append("rn");
        sb.append(CommandRedis.SET.name()).append("rn");
        // 注意中文汉字。一个汉字两个字节,长度为2
        sb.append("$").append(key.getBytes().length).append("rn");
        sb.append(key).append("rn");
        sb.append("$").append(value.getBytes().length).append("rn");
        sb.append(value).append("rn");
        System.out.println(sb.toString());

        socket.getOutputStream().write(sb.toString().getBytes());
        byte[] b = new byte[2048];
        socket.getInputStream().read(b);
        return new String(b);
    }

    /**
     * 获取值
     * @param key
     * @return
     * @throws Exception
     */
    public String get(String key) throws Exception {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append("*2").append("rn");
        stringBuffer.append("$").append(CommandRedis.GET.name().length()).append("rn");
        stringBuffer.append(CommandRedis.GET).append("rn");
        stringBuffer.append("$").append(key.getBytes().length).append("rn");
        stringBuffer.append(key).append("rn");

        socket.getOutputStream().write(stringBuffer.toString().getBytes());
        byte[] b = new byte[2048];
        socket.getInputStream().read(b);
        return new String(b);
    }

    /**
     * 设置值:不会覆盖存在的值
     * @param key
     * @param value
     * @return
     * @throws Exception
     */
    public String setnx(String key, String value) throws Exception {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append("*3").append("rn");
        stringBuffer.append("$").append(CommandRedis.SETNX.name().length()).append("rn");
        stringBuffer.append(CommandRedis.SETNX.name()).append("rn");
        stringBuffer.append("$").append(key.getBytes().length).append("rn");
        stringBuffer.append(key).append("rn");
        stringBuffer.append("$").append(value.getBytes().length).append("rn");
        stringBuffer.append(value).append("rn");

        socket.getOutputStream().write(stringBuffer.toString().getBytes());
        byte[] b = new byte[2048];
        socket.getInputStream().read(b);
        return new String(b);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new RedisClientByResp().set("mykey" ,"myvalue"));
        System.out.println(new RedisClientByResp().get("mykey"));
    }
}

 

本文分析源码基于
Redis 2.4.7
stable 版本。

多批量回复

多批量回复以*开头,这个编码格式和请求的命令一样。多个字节串分别编码,然后再和*参数数结合。例如下面一个回复样式:

*3rn$3rn777rn$6rnxe4xbdxa0xe5xa5xbdrn$5rnhellorn

再看read_response中解析多批量回复的代码:

elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]

一旦是多批量回复,因为*后跟着返回的参数个数,而这些参数个数的编码和批量回复的一模一样。既然如此,那么递归调用read_response,再解析出来的批量回复组合起来即可。

set协议:

5、回复

Redis用不同的回复类型回复命令。它可能从服务器发送的第一个字节开始校验回复类型:

  (1)用单行回复,回复的第一个字节将是“+”

  (2)错误消息,回复的第一个字节将是“-”

  (3)整型数字,回复的第一个字节将是“:”

  (4)批量回复,回复的第一个字节将是“$”

  (5)多个批量回复,回复的第一个字节将是“*”

例子:状态回复(或者单行回复)

以“+”开始以“rn”结尾的单行字符串形式。例如:

"+OKrn"

客户端库将在“+”后面返回所有数据,正如上例中字符串“OK”一样。

有关与其他的操作请查看官方文档。

Multibulk请求协议

Multibulk协议比inline协议复杂,它是二进制安全的,即传送数据可以包含不安全字符。Inline协议不是二进制安全的,比如,如果
set key
value命令中的key或value包含空白字符,那么inline协议解析时将会失败,因为解析出来的参数个数与命令需要的的参数个数会不一致。

协议格式

*<number of arguments> CR LF $<number of bytes of argument 1> CR LF <argument data> CR LF ... $<number of bytes of argument N> CR LF <argument data> CR LF

协议举例

*3 $3 SET $5 mykey $7 myvalue

具体解析代码位于

Networking.c:731

int processMultibulkBuffer(redisClient *c) { ... }

详细解析见协议详解

特殊类型回复

RESP的回复我们都介绍了,所谓的特殊。是数据情况特别的时候,比如返回空字符串的时候,token会是0,返回Nil值的时候,token可能是-1。具体这些情况,可以参考官方文档的案例。

 public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(6379); Socket socket = server.accept(); byte[] chars = new byte[64]; socket.getInputStream().read(chars); System.out.println(new String(chars)); }

总结

框架从启动,接收请求,读取客户端数据,请求协议解析,处理命令,回复请求,退出对redis运行的整个流程做了一个梳理。对整个redis的运作和框架有了一个初步的了解。

状况回复

从前面的RESP协议可以得知,状态回复以+开头,后面跟着状态消息,最后以CRLF结束。

测试的代码如下:

    data = b'+OKrn'
    pp = PythonParser(socket_read_size=65536)
    pp.on_connect(data)
    print(pp.read_response())

打印的结果为b'OK'。我们先看下PythonParser类的定义。

class PythonParser(object):
    encoding = None

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None

PythonParser类定义了读取socket的数据大小,已经socket对象和buffer对象。

再看on_connect方法,主要是初始化了我们假定的Socket对象和SocketBuffer对象。

    def on_connect(self, data):
        self._sock = Socket(data)
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)

相关文章

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图