本文共 4034 字,大约阅读时间需要 13 分钟。
IO复用:同步,复用线程,事件循环驱动
#!/usr/bin/pythonimport osimport selectimport socketimport sysdef relay(sock): poll = select.poll() poll.register(sock, select.POLLIN) poll.register(sys.stdin, select.POLLIN) done = False while not done: events = poll.poll(10000) # 10 seconds for fileno, event in events: if event & select.POLLIN: if fileno == sock.fileno(): data = sock.recv(8192) if data: sys.stdout.write(data) else: done = True else: assert fileno == sys.stdin.fileno() data = os.read(fileno, 8192) if data: sock.sendall(data) else: sock.shutdown(socket.SHUT_WR) poll.unregister(sys.stdin)def main(argv): if len(argv) < 3: binary = argv[0] print "Usage:\n %s -l port\n %s host port" % (argv[0], argv[0]) return port = int(argv[2]) if argv[1] == "-l": # server server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', port)) server_socket.listen(5) (client_socket, client_address) = server_socket.accept() server_socket.close() relay(client_socket) else: # client sock = socket.create_connection((argv[1], port)) relay(sock)if __name__ == "__main__": main(sys.argv)
#include "thread/Atomic.h"#include "datetime/Timestamp.h"#include "Acceptor.h"#include "InetAddress.h"#include "TcpStream.h"#include#include muduo::AtomicInt64 g_bytes;std::string getMessage(){ std::string line; for (int i = 33; i < 127; ++i) { line.push_back(char(i)); } line += line; std::string message; for (size_t i = 0; i < 127-33; ++i) { message += line.substr(i, 72) + '\n'; } return message;}void measure(){ muduo::Timestamp start = muduo::Timestamp::now(); while (true) { struct timespec ts = { 1, 0 }; ::nanosleep(&ts, NULL); // unfortunately, those two assignments are not atomic int64_t bytes = g_bytes.getAndSet(0); muduo::Timestamp end = muduo::Timestamp::now(); double elapsed = timeDifference(end, start); start = end; if (bytes) { printf("%.3f MiB/s\n", bytes / (1024.0 * 1024) / elapsed); } }}void chargen(TcpStreamPtr stream){ std::string message = getMessage(); while (true) { #客户端没有read会卡住 int nw = stream->sendAll(message.data(), message.size()); g_bytes.add(nw); if (nw < static_cast (message.size())) { break; } }}// a thread-per-connection current chargen server and clientint main(int argc, char* argv[]){ if (argc < 3) { printf("Usage:\n %s hostname port\n %s -l port\n", argv[0], argv[0]); return 0; } std::thread(measure).detach(); int port = atoi(argv[2]); if (strcmp(argv[1], "-l") == 0) { InetAddress listenAddr(port); Acceptor acceptor(listenAddr); printf("Accepting... Ctrl-C to exit\n"); int count = 0; while (true) { TcpStreamPtr tcpStream = acceptor.accept(); printf("accepted no. %d client\n", ++count); std::thread thr(chargen, std::move(tcpStream)); thr.detach(); } } else { InetAddress addr; const char* hostname = argv[1]; if (InetAddress::resolve(hostname, port, &addr)) { TcpStreamPtr stream(TcpStream::connect(addr)); if (stream) { chargen(std::move(stream)); } else { printf("Unable to connect %s\n", addr.toIpPort().c_str()); perror(""); } } else { printf("Unable to resolve %s\n", hostname); } }}
服务器:./chargen -l 1234
客户端1:nc localhost 1234 > /dev/null # 测试正常 客户端2:nc localhost 1234 < /dev/zero >/dev/null # 阻塞住,因为客户端阻塞在write上转载地址:http://nfhpn.baihongyu.com/