案例:基于线程池技术的简单Web服务器

线程池技术

ThreadPool (接口类)

  • 客户端可以通过execute(Job)将Job提交入线程池执行,而客户端自身不用等待Job的执行完成
  • 这里的工作者线程代表一个重复执行Job的线程
  • 而每个客户端提交的Job豆浆进入到一个工作队列中等待工作者线程的处理
package test.ThreadPoolDemo;

/**
 * @author xuzhihua
 * @date 2019/2/23 11:58 AM
 * 简单的线程池接口定义
 */
public interface ThreadPool<Job extends Runnable> {
    // 执行一个job,这个job需要实现Runnable
    void execute(Job job);
    // 关闭线程池
    void shutDown();
    // 增加工作组线程
    void addWorker(int num);
    // 减少工作组线程
    void removeWorker(int num);
    // 得到正在等待执行的任务数量
    int getJobSize();
}

DefaultThreadPool (实现类)

package test.ThreadPoolDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author xuzhihua
 * @date 2019/2/23 12:02 PM
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 这是一个工作列表,将会向里面插入工作
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // 工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num);
        initializeWorkers(workerNum);
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            // 添加一个工作者,然后进行通知
            synchronized (job) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutDown() {
        for (Worker worker:workers) {
            worker.shutDown();
        }
    }

    @Override
    public void addWorker(int num) {
        synchronized (jobs) {
            // 限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num >= this.workerNum)
                throw new IllegalArgumentException("beyond workNum");
            // 按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutDown();
                    count ++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }


    // 初始化线程工作者
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    // 工作者,负责消费任务
    class Worker implements Runnable {
        // 是否工作
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    // 如果工作者列表是空的,那么就wait
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // 感知到我补对WorkerThread的终端操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一个Job
                    job = jobs.removeFirst();
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        // 忽略Job执行中的Exception
                    }
                }
            }
        }
        public void shutDown() {
            running = false;
        }
    }
}

SimpleHttpServer (Web服务器)

SimpleHttpServer 时序图

package test.ThreadPoolDemo;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author xuzhihua
 * @date 2019/2/23 12:23 PM
 * 基于线程池技术的简单Web服务器
 */
public class SimpleHttpServer {
    // 处理HttpRequest的线程池
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(1);
    // SimpleHttpServer的根路径
    static String basePath;
    static ServerSocket serverSocket;
    static int port = 8080;
    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }
    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory())
            SimpleHttpServer.basePath = basePath;
    }

    // 启动SimpleHttpServer
    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket = null;
        while ((socket = serverSocket.accept()) != null) {
            // 接收一个客户端Socket,生成一个 HttpRequestHandler, 放入线程池执行
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }



    static class HttpRequestHandler implements Runnable {
        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line = null;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // 由相对路径计算出绝对路径
                String filePath = basePath + header.split(" ")[1];
                out = new PrintWriter(socket.getOutputStream());
                // 如果请求资源的后缀为jpg或ico,则读取资源并输出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i = 0;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: molly");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-length: " + array.length);
                    out.println(" ");
                    socket.getOutputStream().write(array, 0, array.length);
                } else {
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println(" ");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception e) {
                out.println("HTTP/1.1 500");
                out.println(" ");
                out.flush();
            } finally {
                close(br, in, reader, out, socket);
            }
        }
    }
    // 关闭流或者Socket
    private static void close(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    closeable.getClass();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试方法

  1. 准备一个html文件

    <html>
        <head>
            <title>测试页面</title>
        </head>
        <body>
            <h1>第一张图片</h1>
            <img src="1.jpg" />
            <h1>第二张图片</h1>
            <img src="2.jpg" />
            <h1>第三张图片</h1>
            <img src="3.jpg" />
        </body>
    </html>
    
  2. 将SimpleHttpServer的根目录设置到该HTML页面所在目录,并启动SimpleHttpServer

  3. 通过 ab(Apache HTTP Server benchmarking tool) 来测试不同线程数下, SimpleHttpServer的吞吐量表现