Java编程思想 - 第二十一章、并发(七)

| 分类 Java  | 标签 Java编程思想 

###21.8 仿真

这个小节是综合运用前面7个小节的知识来完成一个小的需求。虽然很简单,但是也有地方值得自己学习,所以我直接上代码了。

####1. 银行出纳问题

模拟一个银行,有顾客会来银行办理业务,而处理业务的是出纳员,出纳员很多,又需要管理人员。所以抽象了三个 Entity,分别是顾客,出纳员,出纳员经理。抓住这三个核心 Entity 就可以理解其他了,嗯,代码我已经注释的非常清楚了。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package concurrency.bidsimulation;

import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// 去银行办事的顾客
class Customer {
    // 该顾客办理业务需要的时间
    private final int serviceTime;

    public Customer(int serviceTime) {
        this.serviceTime = serviceTime;
    }

    public int getServiceTime() {
        return serviceTime;
    }

    @Override
    public String toString() {
        return "[" + serviceTime + "] ";
    }
}

// 每个柜台都会排队,这个就模拟用户排队
class CustomerLine extends ArrayBlockingQueue<Customer> {
    /**
     * 
     */
    private static final long serialVersionUID = 7788436315954397218L;

    public CustomerLine(int maxSize) {
        super(maxSize);
    }

    public String toString() {
        if (this.size() == 0) {
            return "[柜台目前无人办理业务,空闲中...]";
        }
        StringBuilder sb = new StringBuilder();
        for (Customer customer : this) {
            sb.append(customer);
        }
        return sb.toString();
    }
}

/**
 * 模拟来银行办理业务的顾客:<br>
 * 
 * 1. 它们来的时间完全随机<br>
 * 2. 顾客会随机选择要排队的柜台(一般都是选择人少的...)<br>
 * 3. 办理业务的时间也是随机
 * 
 */
class CustomerGenerator implements Runnable {
    private CustomerLine customerLine;
    private static Random random = new Random(47);

    public CustomerGenerator(CustomerLine customerLine) {
        this.customerLine = customerLine;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 某位顾客来银行办理业务
                TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                // 在哪个柜台?用多少时间?
                customerLine.put(new Customer(random.nextInt(1000)));
            }
        } catch (InterruptedException e) {
            System.out.println("门坏了,,,,顾客没法来了= =" + e);
        }
        System.out.println("下班,顾客不能再来办理业务了。");
    }
}

/**
 * 出纳员
 * 
 */
class Teller implements Runnable, Comparable<Teller> {
    private static int counter = 0;
    // 出纳员编号通过 counter 来获得,所以 counter 是隐藏的
    private final int id = counter++;
    // 负责的顾客队伍(也就是服务的柜台)
    private CustomerLine customerLine;
    // 记录出纳员服务的顾客数,和后面经理分配活儿有关
    private int customerServed = 0;
    // 出纳员服务柜台的状态。如果出纳员有事,就设置为 false,代表当前柜台不能服务
    private boolean isServingCustomerLine = true;

    public Teller(CustomerLine customerLine) {
        this.customerLine = customerLine;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 这个是阻塞的哦
                Customer customer = customerLine.take();
                TimeUnit.MILLISECONDS.sleep(customer.getServiceTime());
                // TODO 这个有疑问,为啥要加同步控制块,没有临界资源啊
                synchronized (this) {
                    customerServed++;
                    while (isServingCustomerLine == false) {
                        wait();
                    }

                }
            }
        } catch (InterruptedException e) {
            System.out.println(this + "被打断,有其他事情需要处理...");
        }
        System.out.println("出纳员下班了...");
    }

    // 出纳员有事情,自己或者有其他紧急任务
    public synchronized void doSomethingElse() {
        customerServed = 0;
        isServingCustomerLine = false;
    }

    // 出纳员处理任务完毕,回到工作岗位
    public synchronized void comebackWorking() {
        if (isServingCustomerLine == false) {
            System.out.println(this + "负责的柜台继续提供服务...");
            isServingCustomerLine = true;
            notifyAll(); // 和 wait()遥相呼应
        }
    }

    @Override
    public String toString() {
        return "出纳员" + id + " ";
    }

    // 经理需要用优先队列安排工作,所以服务人数少的人优先干活(以服务人数不按工作量,呵呵呵)
    public synchronized int compareTo(Teller teller) {
        return this.customerServed - teller.customerServed;
    }
}

// 虽然实现了 Runnable,但是其实只有一个经理。实现是为了让 ExecutorService 统一管理
class TellerManager implements Runnable {

    // 管理所有的出纳员
    private ExecutorService exec;
    // 同时管理所有的柜台
    private CustomerLine customerLine;
    // 按照服务顾客数目排序,少的分配活儿
    private PriorityQueue<Teller> workingTellers = new PriorityQueue<Teller>();
    // 分配去干其他事情的出纳员
    private Queue<Teller> tellersDoingOtherThings = new LinkedList<Teller>();
    // 调整算法因子
    private int adjustmentPeriod;

    public TellerManager(ExecutorService exec, CustomerLine customerLine, int adjustmentPeriod) {
        this.exec = exec;
        this.customerLine = customerLine;
        this.adjustmentPeriod = adjustmentPeriod;

        // 经理下面最起码带个人不是。。。
        Teller teller = new Teller(customerLine);
        exec.execute(teller);
        workingTellers.add(teller);
    }

    // 经理会根据自己的经验安排工作
    public void adjustTellerNumber() {
        // 如果队伍很长(顾客数目是出纳员数目的2倍多)
        if (customerLine.size() / workingTellers.size() > 2) {
            // 如果有在做其他事情的出纳员,要事优先原则
            if (tellersDoingOtherThings.size() > 0) {
                Teller teller = tellersDoingOtherThings.remove();
                teller.comebackWorking();
                workingTellers.add(teller);
                return;
            }
            // 人确实不够了,通知 hr 赶紧去招人...
            Teller teller = new Teller(customerLine);
            exec.execute(teller);
            workingTellers.add(teller);
            return;
        }

        // 队伍很短,不能让出纳员闲着...(老板都是这想法吧= =)
        if (workingTellers.size() > 1 && customerLine.size() / workingTellers.size() < 2) {
            reassignOneTeller();
        }
        // 队伍压根没人,留一个出纳员工作即可,其他都去干别的活儿,不能让出纳员闲着...
        if (customerLine.size() == 0) {
            while (workingTellers.size() > 1) {
                reassignOneTeller();
            }
        }
    }

    // 分配出纳员去干别的活儿
    private void reassignOneTeller() {
        Teller teller = workingTellers.poll();
        // poll() 在队列为空的时候返回 null,不用判断 teller为 null 是因为上面肯定留了一个出纳员在 woerkingTellers
        teller.doSomethingElse();
        tellersDoingOtherThings.offer(teller);
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
                adjustTellerNumber();
                System.out.print("{排队中的顾客:" + customerLine + "}----");
                System.out.print("{目前工作中的出纳员:[");
                for (Teller teller : workingTellers) {
                    System.out.print(teller);
                }
                System.out.println("]}");
            }
        } catch (InterruptedException e) {
            System.out.println("经理工作被打断");
        }
        System.out.println("经理下班了...");
    }

    @Override
    public String toString() {
        return "我是所有出纳员的经理...";
    }
}

public class BankTellerSimulation {
    static final int MAX_SIZE = 50;
    static final int ADJUSTMENT_PERIOD = 1000;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        CustomerLine customerLine = new CustomerLine(MAX_SIZE);
        exec.execute(new CustomerGenerator(customerLine));
        exec.execute(new TellerManager(exec, customerLine, ADJUSTMENT_PERIOD));
        // 结束模拟:带结束时间或者按下 Enter
        if (args.length > 0) {
            TimeUnit.SECONDS.sleep(new Integer(args[0]));
        } else {
            System.out.println("Press 'Enter' to quit.");
            System.in.read();
        }
        exec.shutdownNow();
    }
}

####2. 饭店仿真

这个例子感觉不咋地,就跳过去了= =

####3. 分发问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package concurrency;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 第一步:Chassis 创建一个 new MyCar,然后装底盘<br>
 * 第二步:Assembler 装配,从 RobotPool 中取得机器人资源,组装引擎、动力系统、轮胎<br>
 * 第三步:组装完成后会有一个记录系统,并将完成的车辆放入 finishingQueue<br>
 */
class MyCar {
    private final int id;
    private boolean engine;
    private boolean driveTrain;
    private boolean wheels;

    public MyCar(int id) {
        this.id = id;
    }

    public MyCar() {
        id = -1;
    }

    public synchronized int getId() {
        return id;
    }

    public synchronized void addEngine() {
        engine = true;
    }

    public synchronized void addDriveTrain() {
        driveTrain = true;
    }

    public synchronized void addWheels() {
        wheels = true;
    }

    public synchronized String toString() {
        return "Car " + id + " [" + " engine: " + engine + " driveTrain: " + driveTrain + " wheels: " + wheels + " ]";
    }
}

// 模拟工厂的流水线
class CarQueue extends LinkedBlockingQueue<MyCar> {
    private static final long serialVersionUID = -8022590210916666885L;
}

// 底盘环节
class ChassisBuilder implements Runnable {
    // 底盘的 carQueue 是一辆车的第一个入口
    private CarQueue carQueue;
    private int counter = 0;

    public ChassisBuilder(CarQueue carQueue) {
        this.carQueue = carQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 创建新车 & 安装底盘
                MyCar myCar = new MyCar(counter++);
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println(myCar + "底盘已经搞定,其余部件开始装配:");
                carQueue.put(myCar);
            }
        } catch (InterruptedException e) {
            System.out.println("底盘制造被终止!");
        }
        System.out.println("底盘装配线关闭...");
    }
}

// 组装引擎、动力系统、轮胎环节
class Assembler implements Runnable {
    // 底盘之后的第二道工序
    private CarQueue chassisQueue;
    private CarQueue finishingQueue;
    private MyCar myCar;
    // 不用 CountDownLatch 是因为要复用
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
    private RobotPool robotPool;

    public Assembler(CarQueue chassisQueue, CarQueue finishingQueue, RobotPool robotPool) {
        this.chassisQueue = chassisQueue;
        this.finishingQueue = finishingQueue;
        this.robotPool = robotPool;
    }

    public MyCar car() {
        return myCar;
    }

    public CyclicBarrier barrier() {
        return cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 阻塞直到底盘装配线传来待组装的车
                myCar = chassisQueue.take();

                // 使用机器人装配
                robotPool.consume(EngineRobot.class, this);
                robotPool.consume(DriveTrainRobot.class, this);
                robotPool.consume(WheelRobot.class, this);

                // 第四个 barrier,表示车组装完成
                cyclicBarrier.await();
                finishingQueue.put(myCar);
            }
        } catch (InterruptedException e) {
            System.out.println("装配引擎、动力系统、轮胎被终止!");
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }

        System.out.println("组装装配线关闭...");
    }
}

// 通报系统
class Reporter implements Runnable {
    private CarQueue finishingQueue;

    public Reporter(CarQueue finishingQueue) {
        this.finishingQueue = finishingQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println(finishingQueue.take());
            }
        } catch (InterruptedException e) {
            System.out.println("通报系统被终止!");
        }
        System.out.println("通报系统关闭...");
    }
}

// 抽象组装过程中的三个机器人
abstract class Robot implements Runnable {
    private RobotPool robotPool;
    protected Assembler assembler;
    private boolean isWorking = false;

    public Robot(RobotPool robotPool) {
        this.robotPool = robotPool;
    }

    public Robot assignAssembler(Assembler assembler) {
        this.assembler = assembler;
        return this;
    }

    // 开始干活
    public synchronized void engage() {
        isWorking = true;
        notifyAll();
    }

    abstract protected void performService();

    @Override
    public void run() {
        try {
            // wait until needed
            powerDown();
            while (!Thread.interrupted()) {
                performService();
                assembler.barrier().await();
                // 已经做完了
                powerDown();
            }
        } catch (InterruptedException e) {
            System.out.println("机器人自动装配被终止!");
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        System.out.println(this + "关闭...");
    }

    // 消除状态,停止工作。直到有可用的机器人
    private synchronized void powerDown() throws InterruptedException {
        isWorking = false;
        assembler = null;
        robotPool.releaser(this);
        while (isWorking == false) {
            wait();
        }
    }

    @Override
    public String toString() {
        return getClass().getName();
    }
}

// 安装引擎机器人
class EngineRobot extends Robot {
    public EngineRobot(RobotPool robotPool) {
        super(robotPool);
    }

    @Override
    protected void performService() {
        System.out.println(this + " 安装引擎...");
        assembler.car().addEngine();
    }
}

// 安装动力系统机器人
class DriveTrainRobot extends Robot {
    public DriveTrainRobot(RobotPool robotPool) {
        super(robotPool);
    }

    @Override
    protected void performService() {
        System.out.println(this + " 安装动力系统...");
        assembler.car().addDriveTrain();
    }
}

// 安装轮胎机器人
class WheelRobot extends Robot {
    public WheelRobot(RobotPool robotPool) {
        super(robotPool);
    }

    @Override
    protected void performService() {
        System.out.println(this + " 安装轮胎...");
        assembler.car().addWheels();
    }
}

// 机器人池
class RobotPool {
    private Set<Robot> pool = new HashSet<Robot>();

    // 使用完毕的机器人重新放入资源池,然后通知等待该机器人的任务
    public synchronized void produce(Robot robot) {
        pool.add(robot);
        notifyAll();
    }

    // 机器人消费者
    public synchronized void consume(Class<? extends Robot> robotType, Assembler assembler) throws InterruptedException {
        for (Robot robot : pool) {
            if (robot.getClass().equals(robotType)) {
                pool.remove(robot);
                robot.assignAssembler(assembler);
                robot.engage();
                return;
            }
        }
        wait();
        consume(robotType, assembler);
    }

    // 释放资源,本质就是重新加入到机器人池
    public synchronized void releaser(Robot robot) {
        produce(robot);
    }
}

public class CarBuilder {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();

        // 初始化机器人池,每个类型只有一个机器人,所以是临界资源
        RobotPool robotPool = new RobotPool();
        exec.execute(new EngineRobot(robotPool));
        exec.execute(new DriveTrainRobot(robotPool));
        exec.execute(new WheelRobot(robotPool));

        /**
         * 生产线正式启动。<br>
         * 故意把底盘装配(第一个环节)放在最后一个启动,反正 Assembler 会自动阻塞
         */
        CarQueue chassisQueue = new CarQueue();
        CarQueue finishingQueue = new CarQueue();
        exec.execute(new Assembler(chassisQueue, finishingQueue, robotPool));
        exec.execute(new Reporter(finishingQueue));
        exec.execute(new ChassisBuilder(chassisQueue));

        // 模拟程序在7秒后自动结束
        TimeUnit.SECONDS.sleep(7);
        exec.shutdownNow();
    }
}

上一篇     下一篇