问题

在看[Redis in Action]这本书的时候,官方虽然提供了java代码,但是他是用jedis实现的。本着练手和学习的目的打算在spring boot中使用spring-boot-starter-data-redis重新写一遍。然而在进行到第四章讲到multiexec的时候就出现了问题,举个简单的例子:

redisTemplate.opsForHash().put("joker", "age", "27");
redisTemplate.watch("joker");
redisTemplate.multi();
redisTemplate.opsForHash().put("joker", "pet", "beibei");
redisTemplate.exec();

运行这段代码,程序就会给出Caused by: org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR EXEC without MULTI错误,但是我明明执行multi()了呀~

原因

遇到问题,第一部当然是去问google,但是现在搜出来的结果很多都是抄的,而且很多抄的还是驴唇不对马嘴~
也不知道咋回事,我记得以前google的搜索结果不是这样的~

我们一层一层的剥开,可以找到这么一个干实事的函数:

    /**
     * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can
     * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios).
     *
     * @param <T> return type
     * @param action callback object to execute
     * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
     * @param pipeline whether to pipeline or not the connection for the execution
     * @return object returned by the action
     */
    @Nullable
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) &#123;

        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = null;
        try &#123;
            // 1
            if (enableTransactionSupport) &#123;
                // only bind resources in case of potential transaction synchronization
                conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
            &#125; else &#123;
                conn = RedisConnectionUtils.getConnection(factory);
            &#125;

            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) &#123;
                connToUse.openPipeline();
            &#125;

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            T result = action.doInRedis(connToExpose);

            // close pipeline
            if (pipeline && !pipelineStatus) &#123;
                connToUse.closePipeline();
            &#125;

            // TODO: any other connection processing?
            return postProcessResult(result, connToUse, existingConnection);
        &#125; finally &#123;
            RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
        &#125;
    &#125;

在代码1处,可以看到有enableTransactionSupport这么一个参数,看一下他的值是false的话,那么会重新拿一个连接(而且他的默认值还就是false),这也就解释了为啥我们明明执行multi了,但是还没说我们在exec前没有multi~
但是,如果enableTransactionSupport的值是true呢,他又干了啥呢?我们一路点进去,找到了这么一个函数:

    /**
     * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current
     * thread, for example when using a transaction manager. Will create a new Connection otherwise, if
     * &#123;@code allowCreate&#125; is <tt>true</tt>.
     *
     * @param factory connection factory for creating the connection.
     * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the
     *          current thread.
     * @param bind binds the connection to the thread, in case one was created-
     * @param transactionSupport whether transaction support is enabled.
     * @return an active Redis connection.
     */
    public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean transactionSupport) &#123;

        Assert.notNull(factory, "No RedisConnectionFactory specified");
        // 1
        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (connHolder != null) &#123; // 2
            if (transactionSupport) &#123;
                potentiallyRegisterTransactionSynchronisation(connHolder, factory); // 3
            &#125;
            return connHolder.getConnection();
        &#125;

        if (!allowCreate) &#123;
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        &#125;

        if (log.isDebugEnabled()) &#123;
            log.debug("Opening RedisConnection");
        &#125;

        RedisConnection conn = factory.getConnection(); // 4

        if (bind) &#123;

            RedisConnection connectionToBind = conn;
            if (transactionSupport && isActualNonReadonlyTransactionActive()) &#123;
                connectionToBind = createConnectionProxy(conn, factory);
            &#125;

            connHolder = new RedisConnectionHolder(connectionToBind); 

            TransactionSynchronizationManager.bindResource(factory, connHolder);// 5
            if (transactionSupport) &#123; 
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            &#125;

            return connHolder.getConnection(); // 8
        &#125;

        return conn;
    &#125;

说明:

  1. 这里有一个新的东西:TransactionSynchronizationManager,这是由spring提供的,他里面有一个叫resources的成员,他是一个ThreadLocal。所以这一行代码,就很清楚了,他是去拿到跟当前线程绑定的连接。
  2. 这里就是判断啊,当前线程是否绑定了这么一个连接。
  3. 如果拿到了跟当前线程绑定的连接,且enableTransactionSupport的值是true,那么需要做一些操作~ 不过这些操作是同spring的事务相关的,在我们的代码中,不会执行~
  4. 但是,我们第一次执行啊,好像没有给当前线程绑定过连接,所以上一步是执行不到的~ 这里创建一个连接~
  5. 然后,在这里,我们把当前线程和连接绑定起来~

所以,综上,为啥我们的代码不对呢,因为RedisTemplate默认是不开启事务支持的,而且在执行exec方法时,会重新创建一个连接对象(或者从当前线程的ThreadLocal中拿到上一次绑定的连接)。所以,我们在不开启事务的情况下,自己在外面执行的multi方法时完全不会生效的(因为连接对象都换了)~

解决

看到这,原因既然已经知道了,那么自然就迎刃而解了~
最简单的方式,既然默认是不开启事务支持的,那么我们手动把他打开不就好了~
执行: redisTemplate.setEnableTransactionSupport(true);即可~

可能有些地方描述的不是很清楚,我们还是拿我们的例子来说,还是上面那段代码:

redisTemplate.opsForHash().put("joker", "age", "27"); // 1
redisTemplate.setEnableTransactionSupport(true); // 2
redisTemplate.watch("joker"); // 3
redisTemplate.multi(); // 4
redisTemplate.opsForHash().put("joker", "pet", "beibei"); // 5
redisTemplate.exec(); // 6

说明:

  1. 初始化一条数据~
  2. 开始事务支持
  3. watch一个key,同时在这一步执行时,会创建一个新的连接并与当前线程绑定~
  4. 执行multi,这里会拿到上一步与当前线程绑定的连接,并通过该连接调用multi方法~
  5. 再加一条数据~
  6. 执行exec方法,同样是拿到与线程绑定的连接后,通过该连接执行exec方法~ 因为该连接已经执行了watchmulti,所以在此之前,对应的key如果发生变化,那么,不会执行成功,我们的目的也就达到了~

不过,这种方法还有一个问题,大家可以顺着源代码继续往下捋~ 会发现,与当前线程绑定的连接不会解绑,更不会被close~
所以,感觉RedisTemplate提供的SessionCallback才是正解~

redisTemplate.execute(new SessionCallback<List<Object>>() &#123;
    public List<Object> execute(RedisOperations operations) throws DataAccessException &#123;
        operations.watch("joker");
        operations.multi();
        operations.opsForHash().put("joker", "pet", "beibei");
        return operations.exec();
    &#125;
&#125;);

RedisTemplatepublic <T> T execute(SessionCallback<T> session)方法,会在finally中调用RedisConnectionUtils.unbindConnection(factory);来解除执行过程中与当前线程绑定的连接,并在随后关闭连接。