用Java实现计数器

  |   0 评论   |   28 浏览

    • Test:
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    
    public class Test {
        private static final ConcurrentHashMap> tpCountMap = new ConcurrentHashMap>();
        public static void main(String[] args) {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
            scheduler.scheduleAtFixedRate(new Test().new WriteTPLogTask(), 0, 1, java.util.concurrent.TimeUnit.SECONDS);
    
            while (true) {
    
    			handle("自定义key1",3);
                handle("自定义key1",3);
                handle("自定义key1",3);
                handle("自定义key1",3);
                handle("自定义key1",3);
    
                handle("自定义key2",1);
                handle("自定义key2",3);
    
            }
        }
    
    
        /**
     * * @param key
      * @param elapsedTime 耗时
      */
      public static void handle(String key,long elapsedTime){
            ConcurrentHashMultiSet newElapsedTimeCounter = new ConcurrentHashMultiSet();
            ConcurrentHashMultiSet elapsedTimeCounter = tpCountMap.putIfAbsent(key, newElapsedTimeCounter);
            if (elapsedTimeCounter == null) {
                // put成功,集合里面原来确实没有该统计集合,然后使用新的统计集合
    			elapsedTimeCounter = newElapsedTimeCounter;
            }
            // 增加一次该响应时间的计数
    		elapsedTimeCounter.add((int) elapsedTime);
        }
    
        private class WriteTPLogTask implements Runnable {
            private final StringBuilder FULL_LOG_BUFFER = new StringBuilder(2048);
            private final StringBuilder DETAIL_LOG_BUFFER = new StringBuilder(2048);
            /**
    		 * 合并的次数和时间的分隔符
    		  */
    		  private final static String TIME_COUNT_SEP = ",";
    				/**
    		 * 日志输出模板
    		  */
    		  private final static String TP_LOG_TEMPLATE_NEW = "{\"k\":\"{}\",\"e\":\"{}\"}";
    				public void run() {
    					// + 5mill 是为了防止临界值
    		  long currentTimePoint = (SystemClock.millTime() + 5) / 1000;
    					for (Map.Entry> entry : tpCountMap.entrySet()) {
    						final String key = entry.getKey();
    						final ConcurrentHashMultiSet value = entry.getValue();
    						//几秒一个点,单位是毫秒
    		  int period = 5000;
    						period /= 1000;
    						// 没到这个频率输出的时间点
    		  if (currentTimePoint % period != 0) {
    							continue;
    						}
    						for (Integer elapsedTime : value.elementSet()) {
    		//                    System.out.println("key:"+key + ";耗时:" + elapsedTime+";次数:"+value.count(elapsedTime));
    		  DETAIL_LOG_BUFFER.append(elapsedTime).append(TIME_COUNT_SEP)
    									.append(value.count(elapsedTime)).append(TIME_COUNT_SEP);
    						}
    						if (!DETAIL_LOG_BUFFER.isEmpty()) {
    							DETAIL_LOG_BUFFER.deleteCharAt(DETAIL_LOG_BUFFER.length() - 1);
    						}
    		//                System.out.println(DETAIL_LOG_BUFFER.toString());
    		  doWrite(key,DETAIL_LOG_BUFFER.toString());
    						tpCountMap.put(key, new ConcurrentHashMultiSet());
    					}
    
    				}
    
    				private void doWrite(String key, String time) {
    		//            FULL_LOG_BUFFER.append(time);
    		  FULL_LOG_BUFFER.append(LogFormatter.format(TP_LOG_TEMPLATE_NEW, key, DETAIL_LOG_BUFFER.toString()));
    					try {
    		//                reporter.report(LogType.TP, FULL_LOG_BUFFER.toString());
    		  System.out.println(FULL_LOG_BUFFER.toString());
    					} finally {
    						FULL_LOG_BUFFER.delete(0, FULL_LOG_BUFFER.length());
    						DETAIL_LOG_BUFFER.delete(0, DETAIL_LOG_BUFFER.length());
    					}
    				}
        }
    
        private static final java.util.Comparator HISTOGRAM_COMPARATOR_BY_TIMESTAMP = new java.util.Comparator() {
            @Override
      public int compare(Long o1, Long o2) {
                return o1.compareTo(o2);
            }
        };
    
    • ConcurrentHashMultiSet:
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ConcurrentHashMultiSet<E> {
        // 统计用的map
      private final transient ConcurrentMap<E, AtomicInteger> counterMap;
    
        public ConcurrentHashMultiSet() {
            counterMap = new ConcurrentHashMap<E, AtomicInteger>();
        }
    
        /**
     * 往集合里面增加元素
      *
     * @param element 要增加的元素
      * @return
      */
      public int add(E element) {
            if (element == null) {
                return 0;
            }
    
            AtomicInteger existingCounter = counterMap.get(element);
            if (existingCounter == null) {
                // 如果该元素不存在,需要用线程安全的方式往集合里面put计数器
      AtomicInteger newCounter = new AtomicInteger();
                existingCounter = counterMap.putIfAbsent(element, newCounter);
                if (existingCounter == null) {
                    // put成功,集合里面原来确实没有该元素,然后使用新元素的计数器
      existingCounter = newCounter;
                }
            }
    
            // 返回原子增量计算的count值
      return existingCounter.incrementAndGet();
        }
    
        /**
     * 获取所有的元素对象集合
      *
     * @return
      */
      public Set<E> elementSet() {
            return counterMap.keySet();
        }
    
        /**
     * 获取指定元素的计算值
      *
     * @param element
      * @return
      */
      public int count(E element) {
            if (element == null) {
                return 0;
            } else {
                AtomicInteger existingCounter = counterMap.get(element);
                return (existingCounter == null) ? 0 : existingCounter.get();
            }
        }
    
        public int size() {
            return counterMap.size();
        }
    }
    

    评论

    发表评论

    validate