<table class="text">
<tr class="li1"><td class="ln"><pre class="de1">1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227 https://www.aliyun.com/product/mns?spm=5176.8142029 * .388261.80.fNnCkg */ public class MnsQueueAppV2 { private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName()); private static MNSClient client = null; // private static AtomicLong totalCount = new AtomicLong(0); private static String endpoint = null; private static String accessId = null; private static String accessKey = null; private static String queueName = "articlepricinglog"; private static int threadNum = 100; private static int clientNum = 10000; // private static int totalSeconds = 180; private static String log4jConfPath = "./log4j.properties"; static { PropertyConfigurator.configureAndWatch(log4jConfPath); } /** * 解析配置文件 * * @return */ @SuppressWarnings("unused") protected static boolean parseConf() { // URL resource = // MnsQueueAppV2.class.getClassLoader().getResource("name.properties"); String confFilePath = SystemUtils.getUserDir() + SystemUtils.FILE_SEPARATOR + "src/main/resources/mns.properties"; URL resource = MnsQueueAppV2.class.getResource("/mns.properties"); URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource( "mns.properties");// 二者等价 BufferedInputStream bis = null; try { bis = new BufferedInputStream(new FileInputStream(confFilePath)); if (bis == null) { LOG.info("ConfFile not opened: " + confFilePath); return false; } } catch (FileNotFoundException e) { LOG.error("ConfFile not found: " + confFilePath,e); return false; } // load file Properties properties = new Properties(); try { properties.load(bis); } catch (IOException e) { LOG.error("Load ConfFile Failed: " + e.getMessage()); return false; } finally { try { bis.close(); } catch (Exception e) { // do nothing } } // init the member parameters endpoint = properties.getProperty("Endpoint"); LOG.info("Endpoint: " + endpoint); accessId = properties.getProperty("AccessId"); LOG.info("AccessId: " + accessId); accessKey = properties.getProperty("AccessKey"); queueName = properties.getProperty("QueueName",queueName); LOG.info("QueueName: " + queueName); threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum))); LOG.info("ThreadNum: 线程数" + threadNum); clientNum = Integer.parseInt(properties.getProperty("ClientNum", String.valueOf(clientNum))); LOG.info("ClientNum: 并发数" + clientNum); // totalSeconds = // Integer.parseInt(properties.getProperty("TotalSeconds", // String.valueOf(totalSeconds))); // LOG.info("TotalSeconds: " + totalSeconds); return true; } /** * 程序入口 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 准备工作 if (!parseConf()) { return; } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setMaxConnections(threadNum); clientConfiguration.setMaxConnectionsPerRoute(threadNum); CloudAccount cloudAccount = new CloudAccount(accessId,accessKey, endpoint,clientConfiguration); client = cloudAccount.getMNSClient(); LOG.info("发送消息"); // 线程池 ExecutorService exec = Executors.newFixedThreadPool(500); /** * Semaphore 一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 * acquire(),然后再获取该许可。每个 release() * 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数 * ,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。 */ final Semaphore semp = new Semaphore(threadNum);// ['seməfɔː] final Semaphore semaphore = new Semaphore(10,true); // 拿到信号量的线程可以进入代码,否则就等待 // Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源 // 辅助理解 :很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯, // 比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯, // 可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路, // 那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。 long startTime = System.currentTimeMillis(); // 开启时间 /** * 原理: * 更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源), * 当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 * 当一个线程调用Wait * (等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作 * ,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。 */ // 开始 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp.acquire();// 获取许可 try { // 获取queue CloudQueue queue = client.getQueueRef(queueName); // 组装消息 Message message = new Message(); message.setMessageBody("Test"); // 发送消息 queue.putMessage(message); } catch (Exception e) { e.printStackTrace(); } semp.release();// 归还许可 } catch (Exception e) { e.printStackTrace(); } } }; exec.submit(task); } long endTime = System.currentTimeMillis(); // 开启时间 exec.shutdown(); LOG.info(clientNum + " 的并发发送消息总耗时:>>>" + (endTime - startTime) + " ms"); LOG.info(clientNum + " 的并发发送消息 QPS为:>>>" + (clientNum * 1000) / (endTime - startTime) + " q/s"); LOG.info("接收消息"); Thread.sleep(3000); ExecutorService exec2 = Executors.newFixedThreadPool(500); final Semaphore semp2 = new Semaphore(threadNum); long startTime2 = System.currentTimeMillis(); // 开启时间 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp2.acquire(); try { // 获取queue CloudQueue queue = client.getQueueRef(queueName); // 获取消息 Message message = queue.popMessage(); // 删掉消息 if (message != null) queue.deleteMessage(message.getReceiptHandle()); } catch (Exception e) { e.printStackTrace(); } semp2.release(); } catch (Exception e) { e.printStackTrace(); } } }; exec2.submit(task); } long endTime2 = System.currentTimeMillis(); // 开启时间 exec2.shutdown(); // 忽略线程切换的耗时 精确的做法? LOG.info(clientNum + " 的并发接收消息总耗时:>>>" + (endTime2 - startTime2) + " ms"); LOG.info(clientNum + " 的并发接收消息 QPS为:>>>" + (clientNum * 1000) / (endTime2 - startTime2) + " q/s"); } } 原文链接:https://www.f2er.com/note/421012.html