1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
| package jcache;
import jcache.clients.jcachecluster.base.JCacheClient; import jcache.clients.jcachecluster.common.PropertiesConst; import jcache.clients.jcachecluster.factory.CacheFactorySingle;
import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong;
public class JcacheClusterTPS { private static String HostAndPort = "10.128.31.104:7000;" + "10.128.31.104:7001;" + "10.128.31.105:7000;" + "10.128.31.105:7001;" + "10.128.31.109:7000;" + "10.128.31.109:7001";
public static void main(String[] args) { final JCacheClient cluster = getCluster(); final int nThreads = args.length >= 1 ? Integer.parseInt(args[0]) : 8; final int sendNumOnceTime = args.length >= 2 ? Integer.parseInt(args[1]) : 100; final int keySize = args.length >= 3 ? Integer.parseInt(args[2]) : 10; final int messageSize = args.length >= 4 ? Integer.parseInt(args[3]) : 10; final int times = args.length >= 5 ? Integer.parseInt(args[4]) : 1; final int setOrGet = args.length >= 6 ? Integer.parseInt(args[5]) : 0; final String keyParam = args.length >= 7 ? args[6] : "0"; final AtomicLong atomicSuccessNums = new AtomicLong(0); final List<Long> tpsList = new ArrayList<Long>(); final String msg = buildMessage(messageSize); final List<String> keys = getKeys(keySize, nThreads * sendNumOnceTime, times); doRun(cluster, tpsList, atomicSuccessNums, nThreads, times, sendNumOnceTime, keys, msg, keySize, messageSize, setOrGet, keyParam); }
private static JCacheClient getCluster() { Properties properties = new Properties(); properties.put(PropertiesConst.Keys.HOST_AND_PORT, HostAndPort); properties.put(PropertiesConst.Keys.AUTH_KEY, "13F455A8E9DC2BBEBE1BD906C82B3C0A1"); properties.put(PropertiesConst.Keys.NAMESPACE, "weidian-1"); return CacheFactorySingle.createJCacheClient(properties); }
private static void doRun(final JCacheClient cluster, final List<Long> tpsList, final AtomicLong atomicSuccessNums, final int nThreads, final int times, final int sendNumOnceTime, final List<String> keys, final String msg, final int keySize, final int messageSize, final int setOrGet, final String keyParam) { final AtomicLong atomicFailNum = new AtomicLong(0); for (int time = 0; time < times; time++) { final Object object = new Object(); synchronized (object) { final int t = time + 1; final ExecutorService exec = Executors.newCachedThreadPool(); final long startCurrentTimeMillis = System.currentTimeMillis(); final CyclicBarrier barrier = new CyclicBarrier(nThreads, new Runnable() { public void run() { synchronized (object) { long endCurrentTimeMillis = System.currentTimeMillis(); long sendNums = nThreads * sendNumOnceTime; long escapedTimeMillis = endCurrentTimeMillis - startCurrentTimeMillis; long tps = sendNums * 1000 / escapedTimeMillis; String type = "set"; if (setOrGet != 0) { type = "get"; } tpsList.add(tps); System.out.printf("第 %d 次, 发送完成, 用时 : %d ms, " + "线程大小 : %d , " + "key大小 : %d , " + "msg大小 : %d , " + "发送数量 : %d , " + "成功数量 : %d , " + "失败数量 : %d , " + "统计方式 : %s , " + "TPS : %d !!!", t, escapedTimeMillis, nThreads, keySize, messageSize, sendNums, atomicSuccessNums.intValue(), atomicFailNum.intValue(), type, tps); exec.shutdown(); object.notify(); System.out.println(); } } }); for (int i = 0; i < nThreads; i++) {
final String finalI = "i" + i; exec.execute(new Runnable() { public void run() { try { for (int j = 0; j < sendNumOnceTime; j++) { if (setOrGet == 0) { String key = "ke" + keyParam + finalI + "j" + j; try { String resp = cluster.set(key, msg); if (!"OK".equals(resp)) { atomicFailNum.incrementAndGet(); } else { atomicSuccessNums.incrementAndGet(); } } catch (Exception e) { atomicFailNum.incrementAndGet(); } } else { cluster.get(keys.get(atomicSuccessNums.intValue())); } } barrier.await(); } catch (Exception e) { try { barrier.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } catch (BrokenBarrierException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }); } try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } long sum = 0; for (Long tps : tpsList) { sum += tps; } System.out.printf("全部发送完成, 平均TPS : %d !!!", sum / tpsList.size()); }
private static String buildMessage(final int messageSize) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < messageSize; i += 8) { sb.append("hello baby"); } return sb.toString(); }
private static List<String> getKeys(int keySize, int keys, int times) { System.out.println(">>>>>>>>>>>>>>正在生成Key>>>>>>>>>>>>>>"); List<String> keysList = new ArrayList<String>();
System.out.println(">>>>>>>>>>>>>>生成成功Key>>>>>>>>>>>>>>"); return keysList; }
private static String getUId(int keySize, int i) { return String.format("%0" + keySize + "d", i); }
private static String getUUId() { return UUID.randomUUID().toString(); }
}
|