public static MilletMemcacheClient getIstance(String configPath) throws IOException { try { if (instance == null) { synchronized (MilletMemcacheClient.class) { if (instance == null) { if (null == memEntity) { logger.info("Load path:[ {} ] on etcd!",configPath); ClientConfig config = ServiceDiscovery.getSingleton().read(configPath); memEntity = new MilletMemcacheClientBuilder(AddrUtil.getAddressMap(StringUtils.trim(config.getIps()))); memEntity.setConnectionPoolSize(config.getLinksNum()); memEntity.setSessionLocator(new KetamaMemcachedSessionLocator()); } MemcachedClient client = memEntity.build(); if (null != client) { client.setMergeFactor(MERGE_FACTOR); client.setConnectTimeout(CONNECT_TIMEOUT); } instance = new MilletMemcacheClient(client); } } } } catch (Exception e) { logger.error("Init memcache client failed!", e); throw e; } return instance; }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClientBuilder#build() */ public MemcachedClient build() throws IOException { XMemcachedClient memcachedClient; // kestrel protocol use random session locator. if (this.commandFactory.getProtocol() == Protocol.Kestrel) { if (!(this.sessionLocator instanceof RandomMemcachedSessionLocaltor)) { log.warn("Recommend to use `net.rubyeye.xmemcached.impl.RandomMemcachedSessionLocaltor` as session locator for kestrel protocol."); } } if (this.weights == null) { memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory, this.transcoder, this.addressMap, this.stateListeners, this.authInfoMap, this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode); } else { if (this.addressMap == null) { throw new IllegalArgumentException("Null Address map"); } if (this.addressMap.size() > this.weights.length) { throw new IllegalArgumentException( "Weights Array's length is less than server's number"); } memcachedClient = new XMemcachedClient(this.sessionLocator, this.bufferAllocator, this.configuration, this.socketOptions, this.commandFactory, this.transcoder, this.addressMap, this.weights, this.stateListeners, this.authInfoMap, this.connectionPoolSize, this.connectTimeout, this.name, this.failureMode); } this.configureClient(memcachedClient); return memcachedClient; }
/** <a href="http://www.cpupk.com/decompiler">Eclipse Class Decompiler</a> plugin, Copyright (c) 2017 Chen Chao. */ // Copyright (c) 2006 Dustin Sallings <dustin@spy.net> package net.rubyeye.xmemcached.transcoders; /** * Transcoder is an interface for classes that convert between byte arrays and * objects for storage in the cache. */ public interface Transcoder<T> { /** * Encode the given object for storage. * * @param o * the object * @return the CachedData representing what should be sent */ CachedData encode(T o); /** * Decode the cached object into the object it represents. * * @param d * the data * @return the return value */ T decode(CachedData d); /** * Set whether store primitive type as string. * * @param primitiveAsString */ public void setPrimitiveAsString(boolean primitiveAsString); /** * Set whether pack zeros * * @param primitiveAsString */ public void setPackZeros(boolean packZeros); /** * Set compression threshold in bytes * @param to */ public void setCompressionThreshold(int to); /** * Returns if client stores primitive type as string. * @return */ public boolean isPrimitiveAsString(); /** * Returns if transcoder packs zero. * @return */ public boolean isPackZeros(); /** * Set compress mode,default is ZIP * @see CompressionMode * @param compressMode */ public void setCompressionMode(CompressionMode compressMode); }
public <T> boolean add(String key, int exp, T value, Transcoder<T> transcoder, long timeout) throws MemcachedException { try { return mc.add(key, exp, value, transcoder, timeout); } catch (TimeoutException e) { onTimeoutException(OPERATE_ADD, key, e); } catch (InterruptedException e) { onInterruptedException(OPERATE_ADD, key, e); } return false; }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int, T, * net.rubyeye.xmemcached.transcoders.Transcoder, long) */ public final <T> boolean add(String key, final int exp, final T value, final Transcoder<T> transcoder, final long timeout) throws TimeoutException, InterruptedException, MemcachedException { key = this.preProcessKey(key); return this.add0(key, exp, value, transcoder, timeout); }
private <T> boolean add0(String key, int exp, T value, Transcoder<T> transcoder, long timeout) throws InterruptedException, TimeoutException, MemcachedException { byte[] keyBytes = this.checkStoreArguments(key, exp, value); return this.sendStoreCommand(this.commandFactory.createAddCommand(key, keyBytes, exp, value, false, transcoder), timeout); }
private final <T> boolean sendStoreCommand(Command command, long timeout) throws InterruptedException, TimeoutException, MemcachedException { final Session session = this.sendCommand(command); if (!command.isNoreply()) { this.latchWait(command, timeout, session); command.getIoBuffer().free(); this.checkException(command); if (command.getResult() == null) { throw new MemcachedException( "Operation fail,may be caused by networking or timeout"); } } else { return false; } return (Boolean) command.getResult(); }
private final <T> byte[] checkStoreArguments(final String key, final int exp, final T value) { byte[] keyBytes = ByteUtils.getBytes(key); ByteUtils.checkKey(keyBytes); if (value == null) { throw new IllegalArgumentException("value could not be null"); } if (exp < 0) { throw new IllegalArgumentException( "Expire time must be greater than or equal to 0"); } return keyBytes; }
public <T> T get(String key, long timeout, Transcoder<T> transcoder) throws MemcachedException { try { try { readController.beginTransaction(); } catch (InterruptedException e) { onInterruptedException(OPERATE_GET, key, e); } try { return mc.get(key, timeout, transcoder); } catch (TimeoutException e) { onTimeoutException(OPERATE_GET, key, e); } catch (InterruptedException e) { onInterruptedException(OPERATE_GET, key, e); } } finally { readController.finishTransaction(); } return null; }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String, long, * net.rubyeye.xmemcached.transcoders.Transcoder) */ @SuppressWarnings("unchecked") public final <T> T get(final String key, final long timeout, final Transcoder<T> transcoder) throws TimeoutException, InterruptedException, MemcachedException { return (T) this.get0(key, timeout, CommandType.GET_ONE, transcoder); }
private <T> Object get0(String key, final long timeout, final CommandType cmdType, final Transcoder<T> transcoder) throws TimeoutException, InterruptedException, MemcachedException { key = this.preProcessKey(key); byte[] keyBytes = ByteUtils.getBytes(key); ByteUtils.checkKey(keyBytes); return this.fetch0(key, keyBytes, cmdType, timeout, transcoder); }
@SuppressWarnings("unchecked") private final <T> Object fetch0(final String key, final byte[] keyBytes, final CommandType cmdType, final long timeout, Transcoder<T> transcoder) throws InterruptedException, TimeoutException, MemcachedException, MemcachedException { final Command command = this.commandFactory.createGetCommand(key, keyBytes, cmdType, this.transcoder); this.latchWait(command, timeout, this.sendCommand(command)); command.getIoBuffer().free(); // free buffer this.checkException(command); CachedData data = (CachedData) command.getResult(); if (data == null) { return null; } if (transcoder == null) { transcoder = this.transcoder; } if (cmdType == CommandType.GETS_ONE) { return new GetsResponse<T>(data.getCas(), transcoder.decode(data)); } else { return transcoder.decode(data); } }
public boolean cas(String key, int exp, Object value, long cas) throws MemcachedException { try { return mc.cas(key, exp, value, cas); } catch (TimeoutException e) { onTimeoutException(OPERATE_CAS, key, e); } catch (InterruptedException e) { onInterruptedException(OPERATE_CAS, key, e); } return false; }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, * java.lang.Object, long) */ public final boolean cas(final String key, final int exp, final Object value, final long cas) throws TimeoutException, InterruptedException, MemcachedException { return this.cas(key, exp, value, this.opTimeout, cas); }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, * java.lang.Object, long, long) */ @SuppressWarnings("unchecked") public final boolean cas(final String key, final int exp, final Object value, final long timeout, final long cas) throws TimeoutException, InterruptedException, MemcachedException { return this.cas(key, exp, value, this.transcoder, timeout, cas); }
/* * (non-Javadoc) * * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, T, * net.rubyeye.xmemcached.transcoders.Transcoder, long, long) */ public final <T> boolean cas(String key, final int exp, final T value, final Transcoder<T> transcoder, final long timeout, final long cas) throws TimeoutException, InterruptedException, MemcachedException { key = this.preProcessKey(key); byte[] keyBytes = this.checkStoreArguments(key, 0, value); return this.sendStoreCommand(this.commandFactory.createCASCommand(key, keyBytes, exp, value, cas, false, transcoder), timeout); }
private final <T> boolean sendStoreCommand(Command command, long timeout) throws InterruptedException, TimeoutException, MemcachedException { final Session session = this.sendCommand(command); if (!command.isNoreply()) { this.latchWait(command, timeout, session); command.getIoBuffer().free(); this.checkException(command); if (command.getResult() == null) { throw new MemcachedException( "Operation fail,may be caused by networking or timeout"); } } else { return false; } return (Boolean) command.getResult(); }
public static final void checkKey(final byte[] keyBytes) { if (keyBytes.length > ByteUtils.maxKeyLength) { throw new IllegalArgumentException("Key is too long (maxlen = " + ByteUtils.maxKeyLength + ")"); } // Validate the key if (memcachedProtocol == Protocol.Text || testing) { for (byte b : keyBytes) { if (b == ' ' || b == '\n' || b == '\r' || b == 0) { try { throw new IllegalArgumentException( "Key contains invalid characters: " + new String(keyBytes, "utf-8")); } catch (UnsupportedEncodingException e) { } } } } }
private static int maxKeyLength = 250;
/* * (non-Javadoc) * * @see * net.rubyeye.xmemcached.CommandFactory#createGetCommand(java.lang.String, * byte[], net.rubyeye.xmemcached.command.CommandType) */ @SuppressWarnings("unchecked") public final Command createGetCommand(final String key, final byte[] keyBytes, final CommandType cmdType, Transcoder transcoder) { return new TextGetOneCommand(key, keyBytes, cmdType, new CountDownLatch(1)); }
protected void latchWait(final Command cmd, final long timeout, final Session session) throws InterruptedException, TimeoutException { if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) { AtomicInteger counter = this.getContinuousTimeoutCounter(session); // reset counter. if (counter.get() > 0) { counter.set(0); } } else { cmd.cancel(); AtomicInteger counter = this.getContinuousTimeoutCounter(session); if (counter.incrementAndGet() > this.timeoutExceptionThreshold) { log.warn(session + " exceeded continuous timeout threshold,we will close it."); try { // reset counter. counter.set(0); session.close(); } catch (Exception e) { // ignore it. } } throw new TimeoutException( "Timed out(" + timeout + " milliseconds) waiting for operation while connected to " + session); } } private AtomicInteger getContinuousTimeoutCounter(final Session session) { AtomicInteger counter = (AtomicInteger) session .getAttribute(CONTINUOUS_TIMEOUT_COUNTER); if (counter == null) { counter = new AtomicInteger(0); AtomicInteger oldCounter = (AtomicInteger) session .setAttributeIfAbsent(CONTINUOUS_TIMEOUT_COUNTER, counter); if (oldCounter != null) { counter = oldCounter; } } return counter; }
public void destroy() { Command command = this.currentCommand.get(); if (command != null) { command.setException(new MemcachedException( "Session has been closed")); CountDownLatch latch = command.getLatch(); if (latch != null) { latch.countDown(); } } while ((command = this.commandAlreadySent.poll()) != null) { command.setException(new MemcachedException( "Session has been closed")); CountDownLatch latch = command.getLatch(); if (latch != null) { latch.countDown(); } } }
package com.wulinfeng.memcache.view.service; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Service; import org.springframework.util.ResourceUtils; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; import com.fasterxml.jackson.core.JsonProcessingException; import com.wulinfeng.memcache.view.bean.MemcacheViewBean; import com.wulinfeng.memcache.view.util.HttpClientUtil; import com.wulinfeng.memcache.view.util.PropertiesConfigUtil; /** * memcache展示业务逻辑类 * * @author wulinfeng * @version C10 2017年11月21日 * @since SDP V300R003C10 */ @Service public class MemcacheViewService { private static Logger LOGGER = LogManager.getLogger(MemcacheViewService.class); private static List<MemcacheViewBean> memcacheCategoryList = new ArrayList<>(); private static Map<String, String> cacheDataMap = new HashMap<>(); private static final String CONFIG_FILE = "/memcache_config.xml"; private static final String TEST_URL = PropertiesConfigUtil.getProperty("test_url"); /** * 启动加载缓存信息 * * @author wulinfeng */ public static void init() { // 1、加载缓存分类列表 initializeCacheList(); // 2、加载缓存客户端实例 getMemcacheClient(); } /** * 获取缓存分类列表 * * @author wulinfeng * @return */ public MemcacheViewBean[] getMemcacheCategory() { return (MemcacheViewBean[])(memcacheCategoryList.toArray(new MemcacheViewBean[memcacheCategoryList.size()])); } /** * 针对缓存的查、删、改 * * @author wulinfeng * @param operationType * @param key2 * @return * @throws JsonProcessingException */ public String doMemcached(String operationType, String cacheName, String key) { String result = null; XMemcachedClient client = XMemcachedClient.getMemcachedClient(); switch (operationType) { case "1":// 根据key查缓存 Object obj = client.getObject(key); result = (String)obj; break; case "2":// 根据key删缓存 if (client.deleteObject(key)) { result = "{\"msg\":\"delete sucess\"}"; } break; case "3":// 根据缓存名找到查数据接口名,调用接口获取数据 String dataLoaderName = cacheDataMap.get(cacheName); if (StringUtils.isNotEmpty(dataLoaderName)) { result = HttpClientUtil.sendRequestByGetAsync(TEST_URL + dataLoaderName); } if (result != null) { client.addObject(key, result, 0); } break; default: ; } return result; } /** * 加载缓存客户端 * * @author wulinfeng * @return */ private static void getMemcacheClient() { String mcListStr = PropertiesConfigUtil.getProperty("memcache.server", ""); if (StringUtils.isEmpty(mcListStr)) { LOGGER.error("Get MemcacheClient Error ! mcListStr is null !"); return; } String[] mcList = mcListStr.split(","); try { XMemcachedClient.getInstance(PropertiesConfigUtil.getInt("memcache.linksNum", 1), mcList); } catch (IOException e) { LOGGER.error("Get MemcacheClient Error ! e:" + e); e.printStackTrace(); } } /** * 加载缓存分类列表 * * @author wulinfeng */ private static void initializeCacheList() { memcacheCategoryList.clear(); cacheDataMap.clear(); try { DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder; builder = builderFactory.newDocumentBuilder(); String basePath = ResourceUtils.getFile("classpath:").getPath(); String configPath = basePath + CONFIG_FILE; Document doc = builder.parse(configPath); NodeList itemNodes = doc.getElementsByTagName("item"); for (int itemIndex = 0; itemIndex < itemNodes.getLength(); itemIndex++) { Element itemElem = (Element)itemNodes.item(itemIndex); NodeList childList = itemElem.getChildNodes(); String cacheName = ""; MemcacheViewBean memcacheViewBean = new MemcacheViewBean(); for (int i = 0; i < childList.getLength(); i++) { Node childNode = childList.item(i); if (childNode.getNodeType() == Node.ELEMENT_NODE) { Element element = (Element)childNode; String tagName = element.getTagName(); String textContent = element.getTextContent().trim(); switch (tagName) { case "cacheName": cacheName = textContent; memcacheViewBean.setCacheName(textContent); break; case "keyProfix": memcacheViewBean.setKeyProfix(textContent); break; case "supportRefresh": memcacheViewBean.setSupportRefresh(textContent.equals("1")); break; case "dataLoaderName": cacheDataMap.put(cacheName, textContent); break; default: } } } memcacheCategoryList.add(memcacheViewBean); } } catch (Throwable e) { LOGGER.error("MemcacheViewService.init failed, e: ", e); } } }
package com.wulinfeng.memcache.view.service; import java.io.IOException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import net.rubyeye.xmemcached.GetsResponse; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.XMemcachedClientBuilder; import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator; import net.rubyeye.xmemcached.utils.AddrUtil; /** * memcache缓存客户端 * * @author wulinfeng * @version C10 2017年11月22日 * @since SDP V300R003C10 */ public class XMemcachedClient { private final static Logger LOGGER = LoggerFactory.getLogger(XMemcachedClient.class); /** * 批量请求,每批次请求数量 */ private final static int MERGE_FACTOR = 20; /** * 链接超时时间 */ private final static int CONNECT_TIMEOUT = 500; /** * 超时时间的配置 单位:毫秒 */ protected static final long REQUEST_TIME_OUT = 1000; /** * 步长 */ private final long STEP = 1L; /** * 默认值 */ private final long DEFAULTVALUE = 0; private static XMemcachedClientBuilder memEntity; private static volatile XMemcachedClient instance; private MemcachedClient mc; private XMemcachedClient(MemcachedClient memcachedClient) throws IOException { this.mc = memcachedClient; } /** * 实例化客户端,单例模式 * * @author wulinfeng * @param linksNum * @param memcacheIp * @return * @throws IOException */ public static XMemcachedClient getInstance(int linksNum, String... memcacheIp) throws IOException { try { if (instance == null) { synchronized (XMemcachedClient.class) { if (instance == null) { if (0 != memcacheIp.length) { memEntity = new XMemcachedClientBuilder( AddrUtil.getAddressMap(StringUtils.trim(array2string(memcacheIp)))); if (0 != linksNum) { memEntity.setConnectionPoolSize(linksNum); } } // 节点挂掉返回失败,不会自动路由到其他节点 memEntity.setFailureMode(true); // 选择一致性哈希算法 memEntity.setSessionLocator(new KetamaMemcachedSessionLocator()); MemcachedClient client = memEntity.build(); if (null != client) { client.setMergeFactor(MERGE_FACTOR); client.setConnectTimeout(CONNECT_TIMEOUT); } client.setEnableHeartBeat(true); client.setEnableHealSession(true); // 网络关闭,失败,重试间隔时间间隔,10毫秒 client.setHealSessionInterval(10); instance = new XMemcachedClient(client); } } } } catch (Exception e) { LOGGER.error("Init memcache client failed!", e); throw e; } return instance; } /** * 获取缓存客户端实例给外部 * * @author wulinfeng * @return */ public static XMemcachedClient getMemcachedClient() { return instance; } /** * 根据key值从IMemcachedClient获取缓存对象 * * @param key 缓存的Key * @return 缓存对象 */ public Object getObject(String key) { try { Object result = mc.get(key, REQUEST_TIME_OUT); return result; } catch (Exception e) { LOGGER.error("get memcached faield : ", key, e); } return null; } /** * 根据key值从IMemcachedClient获取缓存对象 * * @param key 缓存的Key * @param obj Object * @param time int * @return 缓存对象 */ public boolean addObject(String key, Object obj, int time) { // 是否添加成功 boolean rst = false; int expire = 0; if (0 != time) { expire = (int)(System.currentTimeMillis() / 1000) + time; } try { rst = mc.add(key, expire, obj, REQUEST_TIME_OUT); } catch (Exception e) { LOGGER.error("add memcached faield : ", key, e); } return rst; } /** * 根据key值从IMemcachedClient获取缓存对象 * * @param key 缓存的Key * @return 缓存对象 */ public Object getsObject(String key) { try { GetsResponse result = mc.gets(key, REQUEST_TIME_OUT); return result; } catch (Exception e) { LOGGER.error("gets memcached faield : ", key, e); } return null; } /** * 根据key值从IMemcachedClient删除缓存对象 * * @param key 缓存的Key * @return 是否成功删除 */ public boolean deleteObject(String key) { try { boolean result = mc.delete(key, REQUEST_TIME_OUT); return result; } catch (Exception e) { LOGGER.error("delete memcached faield : ", key, e); } return false; } /** * 根据key值和更新对象,在Memcached中更新缓存对象 * * @param key 缓存的Key * @param data 缓存对象 * @param time int * @return 是否成功更新 */ public boolean updateObject(String key, Object data, int time) { boolean rst = false; try { GetsResponse getsResponse = mc.gets(key); if (null != getsResponse) { int expire = 0; if (0 != time) { expire = (int)(System.currentTimeMillis() / 1000) + time; } rst = mc.cas(key, expire, data, REQUEST_TIME_OUT, getsResponse.getCas()); } } catch (Exception e) { LOGGER.error("update memcached faield : ", key, e); } return rst; } /** * 根据key值和更新对象,在Memcached中更新缓存对象 与updateObject不同之处在于不需要乐观锁判断,避免并发更新同一个主键对象失败 * * @param key 缓存的Key * @param data 缓存对象 * @param time int * @return 是否成功更新 */ public boolean updateObject2(String key, Object data, int time) { int expire = 0; boolean result = false; if (0 != time) { expire = (int)(System.currentTimeMillis() / 1000) + time; } try { result = mc.set(key, expire, data, REQUEST_TIME_OUT); } catch (Exception e) { LOGGER.error("update memcached faield : ", key, e); } return result; } /** * 缓存累加器 * * @author wulinfeng * @param key 缓存key * @param initValue 初始化的值 * @param invalidDurance 失效时间 * @return */ public long incr(String key, long initValue, int invalidDurance) { int expire = 0; if (0 != invalidDurance) { expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance; } try { long result = mc.incr(key, STEP, initValue, REQUEST_TIME_OUT, expire); return result; } catch (Exception e) { LOGGER.error("incr memcached faield : ", key, e); } return 0; } /** * 递减计数器 * * @author wulinfeng * @param key 缓存key * @param invalidDurance 失效时间 * @return */ public long decr(String key, int invalidDurance) { int expire = 0; if (0 != invalidDurance) { expire = (int)(System.currentTimeMillis() / 1000) + invalidDurance; } try { long result = mc.decr(key, STEP, DEFAULTVALUE, REQUEST_TIME_OUT, expire); return result; } catch (Exception e) { LOGGER.error("incr memcached faield : ", key, e); } return 0; } /** * @param szList * @return */ private static String array2string(String[] szList) { StringBuilder sb = new StringBuilder(); for (String szTmp : szList) { sb.append(szTmp).append(","); } return sb.substring(0, sb.lastIndexOf(",")); } }