Notes on Using XMemcached

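  XMemcached is one of the Java clients for Memcached, and we use it in our current project. It is built on NIO and is said to perform somewhat better than the other Java clients. Let's start with how a Memcached client is instantiated: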

    public static MilletMemcacheClient getIstance(String configPath) throws IOException {
        try {
            if (instance == null) {
                synchronized (MilletMemcacheClient.class) {
                    if (instance == null) {
                        if (null == memEntity) {
                            logger.info("Load path:[ {} ] on etcd!",configPath);
                            ClientConfig config = ServiceDiscovery.getSingleton().read(configPath);                            
                            memEntity = new MilletMemcacheClientBuilder(AddrUtil.getAddressMap(StringUtils.trim(config.getIps())));
                            memEntity.setConnectionPoolSize(config.getLinksNum());
                            memEntity.setSessionLocator(new KetamaMemcachedSessionLocator());
                        }
                        MemcachedClient client = memEntity.build();
                        if (null != client) {
                            client.setMergeFactor(MERGE_FACTOR);
                            client.setConnectTimeout(CONNECT_TIMEOUT);
                        }
                        instance = new MilletMemcacheClient(client);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Init memcache client failed!", e);
            throw e;
        }
        return instance;
    }
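
  The listing above relies on double-checked locking to build the client only once. Here is a minimal sketch of that pattern on its own, using only the JDK. FakeClient and BUILD_COUNT are hypothetical stand-ins invented for this example and are not part of the project or of XMemcached; the point is the volatile field plus the second null check inside the lock.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyClientHolder {
    /** Hypothetical stand-in for the real cache client. */
    public static final class FakeClient {
        public final String servers;

        FakeClient(String servers) {
            this.servers = servers;
        }
    }

    /** Counts constructions so the single-instance guarantee can be observed. */
    public static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    // volatile makes the fully constructed object visible to threads
    // that read the field without taking the lock.
    private static volatile FakeClient instance;

    public static FakeClient getInstance(String servers) {
        FakeClient local = instance;          // first check, no lock
        if (local == null) {
            synchronized (LazyClientHolder.class) {
                local = instance;             // second check, under the lock
                if (local == null) {
                    BUILD_COUNT.incrementAndGet();
                    local = new FakeClient(servers);
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> getInstance("127.0.0.1:11211"));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("clients built: " + BUILD_COUNT.get());
    }
}
```

  Note that, as in the original method, the arguments of every call after the first one are ignored, because the instance already exists.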

  Here memEntity is a MilletMemcacheClientBuilder, which extends net.rubyeye.xmemcached.XMemcachedClientBuilder. The build method it inherits is implemented like this:

    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClientBuilder#build()
     */
    public MemcachedClient build() throws IOException {
        XMemcachedClient memcachedClient;
        // kestrel protocol use random session locator.
        if (this.commandFactory.getProtocol() == Protocol.Kestrel) {
            if (!(this.sessionLocator instanceof RandomMemcachedSessionLocaltor)) {
                log.warn("Recommend to use `net.rubyeye.xmemcached.impl.RandomMemcachedSessionLocaltor` as session locator for kestrel protocol.");
            }
        }
        if (this.weights == null) {
            memcachedClient = new XMemcachedClient(this.sessionLocator,
                    this.bufferAllocator, this.configuration,
                    this.socketOptions, this.commandFactory, this.transcoder,
                    this.addressMap, this.stateListeners, this.authInfoMap,
                    this.connectionPoolSize, this.connectTimeout, this.name,
                    this.failureMode); 
        } else {
            if (this.addressMap == null) {
                throw new IllegalArgumentException("Null Address map");
            }
            if (this.addressMap.size() > this.weights.length) {
                throw new IllegalArgumentException(
                        "Weights Array's length is less than server's number");
            }
            memcachedClient = new XMemcachedClient(this.sessionLocator,
                    this.bufferAllocator, this.configuration,
                    this.socketOptions, this.commandFactory, this.transcoder,
                    this.addressMap, this.weights, this.stateListeners,
                    this.authInfoMap, this.connectionPoolSize,
                    this.connectTimeout, this.name, this.failureMode);
        }
        this.configureClient(memcachedClient);
        return memcachedClient;
    }

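  As we can see, the memcached client is instantiated from a set of configuration parameters, most of which have default values. The essential ones here are the memcached server addresses and the connection pool size.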
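
  One of the parameters set earlier is the session locator, KetamaMemcachedSessionLocator, which picks a server by consistent hashing. The sketch below shows the general idea with a hash ring built on a TreeMap. It is a simplified illustration, not the library's actual Ketama implementation: the class name HashRingSketch, the virtual node count, and the way the MD5 digest is folded into a number are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class HashRingSketch {
    // Assumed number of points per server; more points spread keys more evenly.
    private static final int VIRTUAL_NODES = 160;

    private final TreeMap<Long, String> ring = new TreeMap<>();

    public HashRingSketch(String... servers) {
        for (String server : servers) {
            addServer(server);
        }
    }

    public void addServer(String server) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(server + "#" + i), server);
        }
    }

    public void removeServer(String server) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            // Remove the point only if it still belongs to this server.
            ring.remove(hash(server + "#" + i), server);
        }
    }

    /** Returns the server owning the first ring point at or after the key's hash. */
    public String locate(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no servers on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Folds the first four bytes of an MD5 digest into an unsigned 32-bit value. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch("a:11211", "b:11211", "c:11211");
        System.out.println("user:42 -> " + ring.locate("user:42"));
        ring.removeServer("c:11211");
        System.out.println("user:42 -> " + ring.locate("user:42"));
    }
}
```

  The property that matters for a cache is that removing one server only remaps the keys that lived on it; keys on the remaining servers stay where they were.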

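  Before looking at the actual operations, consider the serialization interface. Objects are stored in and read from memcached as bytes, so they have to be serialized. XMemcached gives us an interface for this: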

// Copyright (c) 2006  Dustin Sallings <dustin@spy.net>
package net.rubyeye.xmemcached.transcoders;

/**
 * Transcoder is an interface for classes that convert between byte arrays and
 * objects for storage in the cache.
 */
public interface Transcoder<T> {

    /**
     * Encode the given object for storage.
     * 
     * @param o
     *            the object
     * @return the CachedData representing what should be sent
     */
    CachedData encode(T o);

    /**
     * Decode the cached object into the object it represents.
     * 
     * @param d
     *            the data
     * @return the return value
     */
    T decode(CachedData d);

    /**
     * Set whether store primitive type as string.
     * 
     * @param primitiveAsString
     */
    public void setPrimitiveAsString(boolean primitiveAsString);

    /**
     * Set whether pack zeros
     * 
     * @param primitiveAsString
     */
    public void setPackZeros(boolean packZeros);
    /**
     * Set compression threshold in bytes
     * @param to
     */
    public void setCompressionThreshold(int to);

    /**
     * Returns if client stores primitive type as string.
     * @return
     */
    public boolean isPrimitiveAsString();

    /**
     * Returns if transcoder packs zero.
     * @return
     */
    public boolean isPackZeros();
    
    /**
     * Set compress mode,default is ZIP
     * @see CompressionMode
     * @param compressMode
     */
    public void setCompressionMode(CompressionMode compressMode);
}

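  Since this is an interface, we are free to provide our own implementation; if we don't, XMemcached's default implementation is used. Now let's see how storing and fetching are implemented: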
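
  To make the extension point more concrete, here is a rough sketch of what a custom transcoder does, written against the JDK only so that it runs without the library. Payload and SimpleTranscoder are hypothetical, reduced stand-ins for CachedData and Transcoder (only the encode/decode pair is modelled), and the flag value is arbitrary. A real implementation would implement net.rubyeye.xmemcached.transcoders.Transcoder and all of its methods.

```java
import java.nio.charset.StandardCharsets;

public class TranscoderSketch {
    /** Hypothetical stand-in for CachedData: a flag word plus the raw bytes. */
    public static final class Payload {
        public final int flags;
        public final byte[] data;

        public Payload(int flags, byte[] data) {
            this.flags = flags;
            this.data = data;
        }
    }

    /** Hypothetical, reduced version of the Transcoder contract. */
    public interface SimpleTranscoder<T> {
        Payload encode(T value);

        T decode(Payload payload);
    }

    /** Stores strings as UTF-8 bytes and tags them with a flag. */
    public static final class Utf8StringTranscoder implements SimpleTranscoder<String> {
        // Arbitrary flag chosen for this example.
        static final int FLAG_UTF8_STRING = 1;

        @Override
        public Payload encode(String value) {
            return new Payload(FLAG_UTF8_STRING, value.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String decode(Payload payload) {
            if (payload.flags != FLAG_UTF8_STRING) {
                throw new IllegalArgumentException("unexpected flags: " + payload.flags);
            }
            return new String(payload.data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        SimpleTranscoder<String> transcoder = new Utf8StringTranscoder();
        Payload stored = transcoder.encode("hello cache");
        System.out.println(stored.data.length + " bytes, decoded: " + transcoder.decode(stored));
    }
}
```

  The flag is what lets the decoding side know how the bytes were produced, which is why the stored data carries it alongside the byte array.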

    public <T> boolean add(String key, int exp, T value, Transcoder<T> transcoder, long timeout)
        throws MemcachedException
    {
        try
        {
            return mc.add(key, exp, value, transcoder, timeout);
        }
        catch (TimeoutException e)
        {
            onTimeoutException(OPERATE_ADD, key, e);
        }
        catch (InterruptedException e)
        {
            onInterruptedException(OPERATE_ADD, key, e);
        }

        return false;
    }

  Here mc is an interface that wraps the XMemcached operations in one place; the call ends up in MemcachedClient's add method:

    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int, T,
     * net.rubyeye.xmemcached.transcoders.Transcoder, long)
     */
    public final <T> boolean add(String key, final int exp, final T value,
            final Transcoder<T> transcoder, final long timeout)
            throws TimeoutException, InterruptedException, MemcachedException {
        key = this.preProcessKey(key);
        return this.add0(key, exp, value, transcoder, timeout);
    }
    private <T> boolean add0(String key, int exp, T value,
            Transcoder<T> transcoder, long timeout)
            throws InterruptedException, TimeoutException, MemcachedException {
        byte[] keyBytes = this.checkStoreArguments(key, exp, value);
        return this.sendStoreCommand(this.commandFactory.createAddCommand(key,
                keyBytes, exp, value, false, transcoder), timeout);
    }
    private final <T> boolean sendStoreCommand(Command command, long timeout)
            throws InterruptedException, TimeoutException, MemcachedException {

        final Session session = this.sendCommand(command);
        if (!command.isNoreply()) {
            this.latchWait(command, timeout, session);
            command.getIoBuffer().free();
            this.checkException(command);
            if (command.getResult() == null) {
                throw new MemcachedException(
                        "Operation fail,may be caused by networking or timeout");
            }
        } else {
            return false;
        }
        return (Boolean) command.getResult();
    }
    private final <T> byte[] checkStoreArguments(final String key,
            final int exp, final T value) {
        byte[] keyBytes = ByteUtils.getBytes(key);
        ByteUtils.checkKey(keyBytes);
        if (value == null) {
            throw new IllegalArgumentException("value could not be null");
        }
        if (exp < 0) {
            throw new IllegalArgumentException(
                    "Expire time must be greater than or equal to 0");
        }
        return keyBytes;
    }

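  As we can see, the serializer is passed in as the transcoder parameter, the key is validated by checkKey, and latchWait blocks the calling thread until the server replies or the timeout expires. A store returns a boolean, while a get returns the object directly: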

    public <T> T get(String key, long timeout, Transcoder<T> transcoder) throws MemcachedException
    {
        try
        {
            try
            {
                readController.beginTransaction();
            }
            catch (InterruptedException e)
            {
                onInterruptedException(OPERATE_GET, key, e);
            }

            try
            {
                return mc.get(key, timeout, transcoder);
            }
            catch (TimeoutException e)
            {
                onTimeoutException(OPERATE_GET, key, e);
            }
            catch (InterruptedException e)
            {
                onInterruptedException(OPERATE_GET, key, e);
            }
        }
        finally
        {
            readController.finishTransaction();
        }
        return null;
    }
    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String, long,
     * net.rubyeye.xmemcached.transcoders.Transcoder)
     */
    @SuppressWarnings("unchecked")
    public final <T> T get(final String key, final long timeout,
            final Transcoder<T> transcoder) throws TimeoutException,
            InterruptedException, MemcachedException {
        return (T) this.get0(key, timeout, CommandType.GET_ONE, transcoder);
    }
    private <T> Object get0(String key, final long timeout,
            final CommandType cmdType, final Transcoder<T> transcoder)
            throws TimeoutException, InterruptedException, MemcachedException {
        key = this.preProcessKey(key);
        byte[] keyBytes = ByteUtils.getBytes(key);
        ByteUtils.checkKey(keyBytes);
        return this.fetch0(key, keyBytes, cmdType, timeout, transcoder);
    }
    @SuppressWarnings("unchecked")
    private final <T> Object fetch0(final String key, final byte[] keyBytes,
            final CommandType cmdType, final long timeout,
            Transcoder<T> transcoder) throws InterruptedException,
            TimeoutException, MemcachedException, MemcachedException {
        final Command command = this.commandFactory.createGetCommand(key,
                keyBytes, cmdType, this.transcoder);
        this.latchWait(command, timeout, this.sendCommand(command));
        command.getIoBuffer().free(); // free buffer
        this.checkException(command);
        CachedData data = (CachedData) command.getResult();
        if (data == null) {
            return null;
        }
        if (transcoder == null) {
            transcoder = this.transcoder;
        }
        if (cmdType == CommandType.GETS_ONE) {
            return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
        } else {
            return transcoder.decode(data);
        }
    }

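  After the memcached command has fetched the data, it is deserialized, which turns the bytes back into an object. Our wrapper also adds transaction control around the read: XMemcached has no transaction support, so we have to implement it ourselves. As with stores, the get path validates the key with checkKey and waits for the reply in latchWait. Finally, let's look at how CAS is implemented: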

    public boolean cas(String key, int exp, Object value, long cas) throws MemcachedException
    {
        try
        {
            return mc.cas(key, exp, value, cas);
        }
        catch (TimeoutException e)
        {
            onTimeoutException(OPERATE_CAS, key, e);
        }
        catch (InterruptedException e)
        {
            onInterruptedException(OPERATE_CAS, key, e);
        }

        return false;
    }
    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
     * java.lang.Object, long)
     */
    public final boolean cas(final String key, final int exp,
            final Object value, final long cas) throws TimeoutException,
            InterruptedException, MemcachedException {
        return this.cas(key, exp, value, this.opTimeout, cas);
    }
    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
     * java.lang.Object, long, long)
     */
    @SuppressWarnings("unchecked")
    public final boolean cas(final String key, final int exp,
            final Object value, final long timeout, final long cas)
            throws TimeoutException, InterruptedException, MemcachedException {
        return this.cas(key, exp, value, this.transcoder, timeout, cas);
    }
    /*
     * (non-Javadoc)
     * 
     * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, T,
     * net.rubyeye.xmemcached.transcoders.Transcoder, long, long)
     */
    public final <T> boolean cas(String key, final int exp, final T value,
            final Transcoder<T> transcoder, final long timeout, final long cas)
            throws TimeoutException, InterruptedException, MemcachedException {
        key = this.preProcessKey(key);
        byte[] keyBytes = this.checkStoreArguments(key, 0, value);
        return this.sendStoreCommand(this.commandFactory.createCASCommand(key,
                keyBytes, exp, value, cas, false, transcoder), timeout);
    }
    private final <T> boolean sendStoreCommand(Command command, long timeout)
            throws InterruptedException, TimeoutException, MemcachedException {

        final Session session = this.sendCommand(command);
        if (!command.isNoreply()) {
            this.latchWait(command, timeout, session);
            command.getIoBuffer().free();
            this.checkException(command);
            if (command.getResult() == null) {
                throw new MemcachedException(
                        "Operation fail,may be caused by networking or timeout");
            }
        } else {
            return false;
        }
        return (Boolean) command.getResult();
    }

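  There is nothing special about CAS on the client side: like ADD, it returns whether the operation succeeded, and it merely carries an extra cas parameter that acts as a version number. As with the other operations, the key is validated by checkKey and the caller waits for the reply in latchWait.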
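
  The version number idea is easier to see in isolation. The sketch below simulates gets and cas with an in-memory map instead of a memcached server, so every class and method in it (CasSketch, Versioned, update) is hypothetical and only mirrors the semantics: a write succeeds only if the entry still carries the version that was read.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

public class CasSketch {
    /** A value together with the version (cas) it was stored under. */
    public static final class Versioned {
        public final long cas;
        public final String value;

        Versioned(long cas, String value) {
            this.cas = cas;
            this.value = value;
        }
    }

    private final ConcurrentHashMap<String, Versioned> store = new ConcurrentHashMap<>();
    private final AtomicLong nextCas = new AtomicLong(1);

    /** Unconditional write; always assigns a fresh version. */
    public void set(String key, String value) {
        store.put(key, new Versioned(nextCas.getAndIncrement(), value));
    }

    /** Read the value together with its version. */
    public Versioned gets(String key) {
        return store.get(key);
    }

    /** Write only if nobody changed the entry since expectedCas was read. */
    public boolean cas(String key, String value, long expectedCas) {
        Versioned current = store.get(key);
        if (current == null || current.cas != expectedCas) {
            return false;
        }
        return store.replace(key, current, new Versioned(nextCas.getAndIncrement(), value));
    }

    /** Read, modify, and retry when another writer got in first. */
    public boolean update(String key, UnaryOperator<String> change, int maxTries) {
        for (int i = 0; i < maxTries; i++) {
            Versioned seen = gets(key);
            if (seen == null) {
                return false;
            }
            if (cas(key, change.apply(seen.value), seen.cas)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        CasSketch cache = new CasSketch();
        cache.set("counter", "0");
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < 100; n++) {
                    cache.update("counter",
                            s -> String.valueOf(Integer.parseInt(s) + 1), Integer.MAX_VALUE);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("counter = " + cache.gets("counter").value);
    }
}
```

  A single cas call that returns false is not an error; it only means the value changed in between, so the caller decides whether to re-read and retry.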

    public static final void checkKey(final byte[] keyBytes) {

        if (keyBytes.length > ByteUtils.maxKeyLength) {
            throw new IllegalArgumentException("Key is too long (maxlen = "
                    + ByteUtils.maxKeyLength + ")");
        }
        // Validate the key
        if (memcachedProtocol == Protocol.Text || testing) {
            for (byte b : keyBytes) {
                if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
                    try {
                        throw new IllegalArgumentException(
                                "Key contains invalid characters:  "
                                        + new String(keyBytes, "utf-8"));

                    } catch (UnsupportedEncodingException e) {
                    }
                }

            }
        }
    }
    private static int maxKeyLength = 250;
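
  One detail that is easy to miss in checkKey is that the limit applies to bytes, not characters. The following sketch mirrors the two checks with plain JDK code; KeyCheckSketch is a hypothetical class written for this example, and it assumes UTF-8 encoding of the key.

```java
import java.nio.charset.StandardCharsets;

public class KeyCheckSketch {
    static final int MAX_KEY_LENGTH = 250;

    /** Returns the key bytes, or throws if the key is too long or has forbidden bytes. */
    public static byte[] validate(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_KEY_LENGTH) {
            throw new IllegalArgumentException(
                    "Key is too long (maxlen = " + MAX_KEY_LENGTH + ")");
        }
        for (byte b : bytes) {
            // Space, newline, carriage return and NUL would break the text protocol.
            if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
                throw new IllegalArgumentException("Key contains invalid characters: " + key);
            }
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(validate("user:42").length + " bytes");

        // 84 copies of a CJK character (U+7F13) take 3 bytes each in UTF-8.
        StringBuilder cjk = new StringBuilder();
        for (int i = 0; i < 84; i++) {
            cjk.append('\u7f13');
        }
        try {
            validate(cjk.toString());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

  So a key of 84 Chinese characters is rejected even though it is far shorter than 250 characters.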

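  A cache key must not exceed 250 bytes. The handoff between request and response is done with a latch (CountDownLatch): the latch is created together with the Command object, the calling thread waits on it after the command has been sent, and it is released when the command completes or when the Session is destroyed:

  TextCommandFactory: creates the command together with a latch whose count is 1, so a single countDown releases the waiting thread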

    /*
     * (non-Javadoc)
     * 
     * @see
     * net.rubyeye.xmemcached.CommandFactory#createGetCommand(java.lang.String,
     * byte[], net.rubyeye.xmemcached.command.CommandType)
     */
    @SuppressWarnings("unchecked")
    public final Command createGetCommand(final String key,
            final byte[] keyBytes, final CommandType cmdType,
            Transcoder transcoder) {
        return new TextGetOneCommand(key, keyBytes, cmdType,
                new CountDownLatch(1));
    }

   XMemcachedClient: the calling thread blocks here until the command has completed; on timeout the command is cancelled

    protected void latchWait(final Command cmd, final long timeout,
            final Session session) throws InterruptedException,
            TimeoutException {
        if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
            AtomicInteger counter = this.getContinuousTimeoutCounter(session);
            // reset counter.
            if (counter.get() > 0) {
                counter.set(0);
            }
        } else {
            cmd.cancel();
            AtomicInteger counter = this.getContinuousTimeoutCounter(session);
            if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
                log.warn(session
                        + " exceeded continuous timeout threshold,we will close it.");
                try {
                    // reset counter.
                    counter.set(0);
                    session.close();
                } catch (Exception e) {
                    // ignore it.
                }
            }
            throw new TimeoutException(
                    "Timed out("
                            + timeout
                            + " milliseconds) waiting for operation while connected to "
                            + session);
        }
    }

    private AtomicInteger getContinuousTimeoutCounter(final Session session) {
        AtomicInteger counter = (AtomicInteger) session
                .getAttribute(CONTINUOUS_TIMEOUT_COUNTER);
        if (counter == null) {
            counter = new AtomicInteger(0);
            AtomicInteger oldCounter = (AtomicInteger) session
                    .setAttributeIfAbsent(CONTINUOUS_TIMEOUT_COUNTER, counter);
            if (oldCounter != null) {
                counter = oldCounter;
            }
        }
        return counter;
    }

   MemcachedHandler: when the session is destroyed, every pending command is failed with an exception and its latch is released

    public void destroy() {
        Command command = this.currentCommand.get();    
        if (command != null) {
            command.setException(new MemcachedException(
                    "Session has been closed"));
            CountDownLatch latch = command.getLatch();
            if (latch != null) {
                latch.countDown();
            }
        }
        while ((command = this.commandAlreadySent.poll()) != null) {
            command.setException(new MemcachedException(
                    "Session has been closed"));
            CountDownLatch latch = command.getLatch();
            if (latch != null) {
                latch.countDown();
            }
        }

    }
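
   Putting the three pieces together, the latch turns an asynchronous reply into a blocking call with a timeout. The sketch below reproduces that lifecycle with JDK classes only. PendingCommand and roundTrip are hypothetical names for this example, and a single-thread executor stands in for the NIO thread that would deliver the reply.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LatchWaitSketch {
    /** Hypothetical command: one latch, released once when the reply arrives. */
    public static final class PendingCommand {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String result;
        private volatile boolean cancelled;

        /** Called by the I/O side when the reply has been decoded. */
        void complete(String reply) {
            result = reply;
            latch.countDown();
        }

        /** Called by the caller thread; blocks until the reply or the timeout. */
        String await(long timeoutMillis) throws InterruptedException, TimeoutException {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                cancelled = true;
                throw new TimeoutException(
                        "Timed out(" + timeoutMillis + " milliseconds) waiting for operation");
            }
            return result;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    /** Simulates sending a command whose reply arrives after replyDelayMillis. */
    public static String roundTrip(long replyDelayMillis, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            PendingCommand cmd = new PendingCommand();
            io.submit(() -> {
                Thread.sleep(replyDelayMillis);
                cmd.complete("STORED");
                return null;
            });
            return cmd.await(timeoutMillis);
        } finally {
            io.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fast reply: " + roundTrip(10, 1000));
        try {
            roundTrip(1000, 50);
        } catch (TimeoutException e) {
            System.out.println("slow reply: " + e.getMessage());
        }
    }
}
```

   Each command has its own latch, so the latch does not limit how many threads may issue commands at once; it only makes each caller wait for its own reply.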

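   Finally, here is an example that instantiates a memcache client and operates on the cache. The listing contains two source files: MemcacheViewService, followed by the project's own XMemcachedClient wrapper (not to be confused with the library class net.rubyeye.xmemcached.XMemcachedClient):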

package com.wulinfeng.memcache.view.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.ResourceUtils;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.wulinfeng.memcache.view.bean.MemcacheViewBean;
import com.wulinfeng.memcache.view.util.HttpClientUtil;
import com.wulinfeng.memcache.view.util.PropertiesConfigUtil;

/**
 * Business logic for the memcache viewer
 *
 * @author wulinfeng
 * @version C10 2017-11-21
 * @since SDP V300R003C10
 */
@Service
public class MemcacheViewService
{
    private static Logger LOGGER = LogManager.getLogger(MemcacheViewService.class);
    
    private static List<MemcacheViewBean> memcacheCategoryList = new ArrayList<>();
    
    private static Map<String, String> cacheDataMap = new HashMap<>();
    
    private static final String CONFIG_FILE = "/memcache_config.xml";
    
    private static final String TEST_URL = PropertiesConfigUtil.getProperty("test_url");
    
    /**
     * Loads the cache information at startup
     *
     * @author wulinfeng
     */
    public static void init()
    {
        // 1. load the list of cache categories
        initializeCacheList();
        
        // 2. create the cache client instance
        getMemcacheClient();
    }
    
    /**
     * Returns the list of cache categories
     *
     * @author wulinfeng
     * @return
     */
    public MemcacheViewBean[] getMemcacheCategory()
    {
        return (MemcacheViewBean[])(memcacheCategoryList.toArray(new MemcacheViewBean[memcacheCategoryList.size()]));
    }
    
    /**
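     * Looks up, deletes, or refreshes a cache entry
     *
     * @author wulinfeng
     * @param operationType "1" = look up, "2" = delete, "3" = reload from the data loader and add
     * @param cacheName cache category name, used to find the data loader for operation "3"
     * @param key cache key
     * @return the cached or loaded value, a JSON status message after a delete, or null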
     */
    public String doMemcached(String operationType, String cacheName, String key)
    {
        String result = null;
        XMemcachedClient client = XMemcachedClient.getMemcachedClient();
        switch (operationType)
        {
            case "1":// look up the cache entry by key
                Object obj = client.getObject(key);
                result = (String)obj;
                break;
            case "2":// delete the cache entry by key
                if (client.deleteObject(key))
                {
                    result = "{\"msg\":\"delete success\"}";
                }
                break;
            case "3":// find the data loader endpoint for this cache name and call it to fetch the data
                String dataLoaderName = cacheDataMap.get(cacheName);
                if (StringUtils.isNotEmpty(dataLoaderName))
                {
                    result = HttpClientUtil.sendRequestByGetAsync(TEST_URL + dataLoaderName);
                }
                if (result != null)
                {
                    client.addObject(key, result, 0);
                }
                break;
            default:
                break;
        }
        
        return result;
    }
    
    /**
     * Creates the cache client
     *
     * @author wulinfeng
     * @return
     */
    private static void getMemcacheClient()
    {
        String mcListStr = PropertiesConfigUtil.getProperty("memcache.server", "");
        
        if (StringUtils.isEmpty(mcListStr))
        {
            LOGGER.error("Get MemcacheClient Error ! mcListStr is null !");
            return;
        }
        String[] mcList = mcListStr.split(",");
        try
        {
            XMemcachedClient.getInstance(PropertiesConfigUtil.getInt("memcache.linksNum", 1), mcList);
        }
        catch (IOException e)
        {
            // pass the exception to the logger so the stack trace goes to the log
            LOGGER.error("Get MemcacheClient Error !", e);
        }
    }
    
    /**
     * Loads the list of cache categories from the XML config file
     *
     * @author wulinfeng
     */
    private static void initializeCacheList()
    {
        memcacheCategoryList.clear();
        cacheDataMap.clear();
        try
        {
            DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder;
            builder = builderFactory.newDocumentBuilder();
            String basePath = ResourceUtils.getFile("classpath:").getPath();
            String configPath = basePath + CONFIG_FILE;
            Document doc = builder.parse(configPath);
            NodeList itemNodes = doc.getElementsByTagName("item");
            
            for (int itemIndex = 0; itemIndex < itemNodes.getLength(); itemIndex++)
            {
                Element itemElem = (Element)itemNodes.item(itemIndex);
                NodeList childList = itemElem.getChildNodes();
                String cacheName = "";
                
                MemcacheViewBean memcacheViewBean = new MemcacheViewBean();
                for (int i = 0; i < childList.getLength(); i++)
                {
                    Node childNode = childList.item(i);
                    
                    if (childNode.getNodeType() == Node.ELEMENT_NODE)
                    {
                        Element element = (Element)childNode;
                        String tagName = element.getTagName();
                        String textContent = element.getTextContent().trim();
                        
                        switch (tagName)
                        {
                            case "cacheName":
                                cacheName = textContent;
                                memcacheViewBean.setCacheName(textContent);
                                break;
                            case "keyProfix":
                                memcacheViewBean.setKeyProfix(textContent);
                                break;
                            case "supportRefresh":
                                memcacheViewBean.setSupportRefresh(textContent.equals("1"));
                                break;
                            case "dataLoaderName":
                                cacheDataMap.put(cacheName, textContent);
                                break;
                            default:
                            
                        }
                    }
                }
                
                memcacheCategoryList.add(memcacheViewBean);
            }
        }
        catch (Throwable e)
        {
            LOGGER.error("MemcacheViewService.init failed, e: ", e);
        }
    }
}
package com.wulinfeng.memcache.view.service;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.rubyeye.xmemcached.GetsResponse;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.utils.AddrUtil;

/**
 * memcache cache client
 *
 * @author wulinfeng
 * @version C10 2017-11-22
 * @since SDP V300R003C10
 */
public class XMemcachedClient
{
    
    private final static Logger LOGGER = LoggerFactory.getLogger(XMemcachedClient.class);
    
    /**
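     * Merge factor: maximum number of requests merged into one batch
     */
    private final static int MERGE_FACTOR = 20;
    
    /**
     * Connect timeout, in milliseconds
     */
    private final static int CONNECT_TIMEOUT = 500;
    
    /**
     * Request timeout, in milliseconds
     */
    protected static final long REQUEST_TIME_OUT = 1000;
    
    /**
     * Step used by incr and decr
     */
    private final long STEP = 1L;
    
    /**
     * Default value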
     */
    private final long DEFAULTVALUE = 0;
    
    private static XMemcachedClientBuilder memEntity;
    
    private static volatile XMemcachedClient instance;
    
    private MemcachedClient mc;
    
    private XMemcachedClient(MemcachedClient memcachedClient)
        throws IOException
    {
        this.mc = memcachedClient;
    }
    
    /**
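     * Creates the client (singleton)
     *
     * @author wulinfeng
     * @param linksNum connection pool size; 0 keeps the builder default
     * @param memcacheIp memcached server addresses
     * @return the shared client instance
     * @throws IOException if the client cannot be built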
     */
    public static XMemcachedClient getInstance(int linksNum, String... memcacheIp)
        throws IOException
    {
        try
        {
            if (instance == null)
            {
                synchronized (XMemcachedClient.class)
                {
                    if (instance == null)
                    {
                        if (0 == memcacheIp.length)
                        {
                            // without an address there is no builder to configure
                            throw new IllegalArgumentException("No memcached server address given");
                        }
                        memEntity = new XMemcachedClientBuilder(
                            AddrUtil.getAddressMap(StringUtils.trim(array2string(memcacheIp))));
                        if (0 != linksNum)
                        {
                            memEntity.setConnectionPoolSize(linksNum);
                        }
                        
                        // failure mode: when a node is down the request fails instead of being routed to another node
                        memEntity.setFailureMode(true);
                        
                        // use the consistent hashing session locator
                        memEntity.setSessionLocator(new KetamaMemcachedSessionLocator());
                        MemcachedClient client = memEntity.build();
                        client.setMergeFactor(MERGE_FACTOR);
                        client.setConnectTimeout(CONNECT_TIMEOUT);
                        client.setEnableHeartBeat(true);
                        client.setEnableHealSession(true);
                        
                        // interval between reconnect attempts after a connection is closed or fails: 10 ms
                        client.setHealSessionInterval(10);
                        instance = new XMemcachedClient(client);
                    }
                }
            }
        }
        catch (Exception e)
        {
            LOGGER.error("Init memcache client failed!", e);
            throw e;
        }
        return instance;
    }
    
    /**
     * Exposes the cache client instance to callers
     *
     * @author wulinfeng
     * @return
     */
    public static XMemcachedClient getMemcachedClient()
    {
        return instance;
    }
    
    /**
     * Gets the cached object for the given key
     *
     * @param key cache key
     * @return the cached object, or null if it is missing or the call fails
     */
    public Object getObject(String key)
    {
        try
        {
            Object result = mc.get(key, REQUEST_TIME_OUT);
            return result;
        }
        catch (Exception e)
        {
            LOGGER.error("get memcached failed, key: {}", key, e);
        }
        return null;
    }
    
    /**
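     * Adds an object to the cache under the given key; fails if the key already exists
     *
     * @param key cache key
     * @param obj object to cache
     * @param time time to live in seconds; 0 means the entry never expires
     * @return whether the object was added
     */
    public boolean addObject(String key, Object obj, int time)
    {
        // whether the add succeeded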
        boolean rst = false;
        int expire = 0;
        if (0 != time)
        {
            expire = (int)(System.currentTimeMillis() / 1000) + time;
        }
        try
        {
            rst = mc.add(key, expire, obj, REQUEST_TIME_OUT);
        }
        catch (Exception e)
        {
            LOGGER.error("add memcached failed, key: {}", key, e);
        }
        return rst;
    }
    
    /**
     * Gets the cached object for the given key together with its CAS value
     *
     * @param key cache key
     * @return a GetsResponse holding the value and its CAS value, or null
     */
    public Object getsObject(String key)
    {
        try
        {
            GetsResponse result = mc.gets(key, REQUEST_TIME_OUT);
            return result;
        }
        catch (Exception e)
        {
            LOGGER.error("gets memcached failed, key: {}", key, e);
        }
        return null;
    }
    
    /**
     * Deletes the cached object for the given key
     *
     * @param key cache key
     * @return whether the object was deleted
     */
    public boolean deleteObject(String key)
    {
        try
        {
            boolean result = mc.delete(key, REQUEST_TIME_OUT);
            return result;
        }
        catch (Exception e)
        {
            LOGGER.error("delete memcached failed, key: {}", key, e);
        }
        return false;
    }
    
    /**
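     * Updates the cached object for the given key using CAS (optimistic locking)
     *
     * @param key cache key
     * @param data object to cache
     * @param time time to live in seconds; 0 means the entry never expires
     * @return whether the object was updated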
     */
    public boolean updateObject(String key, Object data, int time)
    {
        boolean rst = false;
        try
        {
            GetsResponse getsResponse = mc.gets(key);
            if (null != getsResponse)
            {
                int expire = 0;
                if (0 != time)
                {
                    expire = (int)(System.currentTimeMillis() / 1000) + time;
                }
                rst = mc.cas(key, expire, data, REQUEST_TIME_OUT, getsResponse.getCas());
            }
        }
        catch (Exception e)
        {
            LOGGER.error("update memcached failed, key: {}", key, e);
        }
        return rst;
    }
    
    /**
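     * Updates the cached object for the given key with a plain set. Unlike updateObject, no optimistic lock check
     * is made, so concurrent updates to the same key do not fail
     *
     * @param key cache key
     * @param data object to cache
     * @param time time to live in seconds; 0 means the entry never expires
     * @return whether the object was updated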
     */
    public boolean updateObject2(String key, Object data, int time)
    {
        int expire = 0;
        boolean result = false;
        if (0 != time)
        {
            expire = (int)(System.currentTimeMillis() / 1000) + time;
        }
        try
        {
            result = mc.set(key, expire, data, REQUEST_TIME_OUT);
        }
        catch (Exception e)
        {
            LOGGER.error("update memcached failed, key: {}", key, e);
        }
        return result;
    }
    
    /**
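     * Increments a cached counter
     *
     * @author wulinfeng
     * @param key cache key
     * @param initValue initial value used when the key does not exist yet
     * @param invalidDurance time to live in seconds; 0 means the entry never expires
     * @return the counter value after the increment, or 0 if the call fails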
     */
    public long incr(String key, long initValue, int invalidDurance)
    {
        int expire = 0;
        if (0 != invalidDurance)
        {
            expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance;
        }
        try
        {
            long result = mc.incr(key, STEP, initValue, REQUEST_TIME_OUT, expire);
            return result;
        }
        catch (Exception e)
        {
            LOGGER.error("incr memcached failed, key: {}", key, e);
        }
        return 0;
    }
    
    /**
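     * Decrements a cached counter
     *
     * @author wulinfeng
     * @param key cache key
     * @param invalidDurance time to live in seconds; 0 means the entry never expires
     * @return the counter value after the decrement, or 0 if the call fails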
     */
    public long decr(String key, int invalidDurance)
    {
        int expire = 0;
        if (0 != invalidDurance)
        {
            expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance;
        }
        try
        {
            long result = mc.decr(key, STEP, DEFAULTVALUE, REQUEST_TIME_OUT, expire);
            return result;
        }
        catch (Exception e)
        {
            LOGGER.error("decr memcached failed, key: {}", key, e);
        }
        return 0;
    }
    
    /**
     * @param szList
     * @return
     */
    private static String array2string(String[] szList)
    {
        StringBuilder sb = new StringBuilder();
        for (String szTmp : szList)
        {
            sb.append(szTmp).append(",");
        }
        return sb.substring(0, sb.lastIndexOf(","));
    }
}
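
  A note on the expiry arithmetic used throughout the wrapper: it always turns a time to live into an absolute Unix timestamp. Memcached accepts that, because it treats expiration values larger than 30 days (2592000 seconds) as absolute timestamps, but the result then depends on the client and server clocks agreeing. The sketch below is one possible alternative, not what the wrapper does: it sends short lifetimes as relative seconds and only falls back to an absolute timestamp above the 30-day limit. ExpirySketch and toExpiry are hypothetical names.

```java
public class ExpirySketch {
    /** Above this many seconds memcached reads the value as an absolute Unix timestamp. */
    static final int THIRTY_DAYS_SECONDS = 60 * 60 * 24 * 30;

    /**
     * Converts a time to live into the expiration value sent to memcached.
     *
     * @param ttlSeconds lifetime in seconds; 0 or less means never expire
     * @param nowMillis current time in milliseconds, passed in to keep the method testable
     */
    public static int toExpiry(int ttlSeconds, long nowMillis) {
        if (ttlSeconds <= 0) {
            return 0;                       // never expire
        }
        if (ttlSeconds <= THIRTY_DAYS_SECONDS) {
            return ttlSeconds;              // relative: measured by the server clock
        }
        // absolute timestamp; toIntExact fails loudly instead of overflowing
        return Math.toIntExact(nowMillis / 1000 + ttlSeconds);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("one hour   -> " + toExpiry(3600, now));
        System.out.println("forty days -> " + toExpiry(40 * 24 * 3600, now));
    }
}
```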

 

  

  

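    Original author: memcached
    Original URL: https://www.cnblogs.com/wuxun1997/p/7751704.html
    This article was reposted from the web for the purpose of sharing knowledge only; if it infringes your rights, please contact the blogger to have it removed.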