async server in Zig (all platforms)


bmaxa

So, I've been playing around with Zig again and put together a small server that works reasonably well.
It works like this: it accepts up to 100 connections, then waits for all of them to finish.
Naturally, it's easy to knock over: just open one connection among the others
and stay silent, and it will stall. In principle any server can be brought down that way,
which is why they have a read timeout, something this one lacks.
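(For reference, here is what that missing read timeout does, as a minimal Go sketch on an in-memory net.Pipe connection so it runs standalone; the 500 ms value is just an illustration.)
Code:
package main

import (
    "fmt"
    "net"
    "time"
)

// The "client" end stays silent; after the deadline the blocked Read returns
// a timeout error instead of hanging forever, so a server could drop that slot.
func main() {
    serverEnd, clientEnd := net.Pipe()
    defer serverEnd.Close()
    defer clientEnd.Close()

    serverEnd.SetReadDeadline(time.Now().Add(500 * time.Millisecond))

    buf := make([]byte, 16)
    _, err := serverEnd.Read(buf) // the client never sends anything
    fmt.Println("read returned:", err)

    if ne, ok := err.(net.Error); ok && ne.Timeout() {
        fmt.Println("timed out -> disconnect the idle client")
    }
}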
But we're not going to build a bulletproof server now; the point is to show
how async is used in Zig.
I first tried it without threads, but if two async calls are made
on the same function in the same thread, the function's frame gets clobbered,
which is no good. So: one async call to a given function per thread.
That way we don't have to call await right away after async,
which would be pointless, because then it isn't asynchronous :P
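Zig's async/await and Go's goroutines are not the same mechanism, but the scheduling point is the same: start everything first, collect results afterwards. A small Go sketch of the difference (the work function and the timings are made up):
Code:
package main

import (
    "fmt"
    "time"
)

// work starts a "slow request" in the background and returns a channel
// that will receive its result.
func work(id int) <-chan string {
    ch := make(chan string, 1)
    go func() {
        time.Sleep(100 * time.Millisecond) // pretend this is a slow request
        ch <- fmt.Sprintf("task %d done", id)
    }()
    return ch
}

func main() {
    // Sequential in effect: start, then immediately wait (like await right after async).
    start := time.Now()
    for i := 0; i < 3; i++ {
        fmt.Println(<-work(i))
    }
    fmt.Println("start-and-wait-each:", time.Since(start))

    // Concurrent: start everything, then wait (async now, await later).
    start = time.Now()
    var results []<-chan string
    for i := 0; i < 3; i++ {
        results = append(results, work(i))
    }
    for _, ch := range results {
        fmt.Println(<-ch)
    }
    fmt.Println("start-all-then-wait:", time.Since(start))
}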
So here it is:
Code:
bmaxa@Branimirs-Air zigrays % cat test.zig
const std = @import("std");
const net = std.net;
const mem = std.mem;
const testing = std.testing;
pub const log_level: std.log.Level = .debug;
pub const io_mode = .evented;
pub fn main() !void {
    if (!std.io.is_async) return error.SkipZigTest;

    // Ignore sigpipe
    var act = std.os.Sigaction{
        .handler = .{ .handler = std.os.SIG.IGN },
        .mask = std.os.empty_sigset,
        .flags = 0,
    };
    try std.os.sigaction(std.os.SIG.PIPE, &act, null);
    const localhost = try net.Address.parseIp("::", 6666);

    var server = net.StreamServer.init(net.StreamServer.Options{ .reuse_address = true });
    defer server.deinit();
    try server.listen(localhost);

    var server_frame = async testServer(&server);
    var client_frame = async testClient(server.listen_address);

    try await server_frame;
    try await client_frame;
}

fn testClient(addr: net.Address) anyerror!void {
    const socket_file = try net.tcpConnectToAddress(addr);
    defer socket_file.close();

    var buf: [100]u8 = undefined;
    _ = try socket_file.write("abcd");
    const len = try socket_file.read(&buf);
    const msg = buf[0..len];
    _ = try testing.expect(mem.eql(u8, msg, "hello from server\n"));
}
fn testServer(server: *net.StreamServer) anyerror!void {
    var connections:u32 = 0;
    var tasks:std.ArrayList(@Frame(handle)) = std.ArrayList(@Frame(handle)).init(std.heap.c_allocator);
    var threads:std.ArrayList(std.Thread) = std.ArrayList(std.Thread).init(std.heap.c_allocator);
    var clients: std.ArrayList(net.StreamServer.Connection) = std.ArrayList(net.StreamServer.Connection).init(std.heap.c_allocator);
    while (true) {
        // accept a batch of up to 100 clients, one handler frame per thread
        while (connections < 100) : (connections += 1) {
            var client = try server.accept();
            std.log.info("accepted: {}", .{client});
            _ = try tasks.addOne();
            _ = try clients.append(client);
            _ = try threads.append(try std.Thread.spawn(std.Thread.SpawnConfig{}, tf, .{ &tasks.items[tasks.items.len - 1], client }));
        }
        // wait for every handler frame in the batch
        for (tasks.items) |*t, index| {
            std.log.info("waiting {} {}", .{ index, t });
            await t catch |err| {
                std.log.warn("Disconnected {}: {}\n", .{ clients.items[index], err });
            };
        }
        for (threads.items) |*t| {
            t.join();
        }
        tasks.clearRetainingCapacity();
        threads.clearRetainingCapacity();
        clients.clearRetainingCapacity();
        connections = 0;
    }
}

fn handle(client: net.StreamServer.Connection) !void {
    defer client.stream.close();
    const wstream = client.stream.writer();
    const readstream = client.stream.reader();
    var buf: [256]u8 = undefined;
    const len = try readstream.read(buf[0..]);
    std.debug.print("got {s}\n",.{buf[0..len]});
    _ = try wstream.print("hello from server\n", .{});
}
fn tf(frame: *@Frame(handle), client:net.StreamServer.Connection)!void{
    frame.* = async handle(client);
}

If you don't have IPv6, just replace the address in parseIp with some other one (say "0.0.0.0" for any interface).
This compiles with, for example: zig build-exe -fstage1 -O ReleaseSmall test.zig
The -fstage1 is needed for async, because the author started building the self-hosted compiler and hasn't implemented async in it yet.
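For comparison, the same accept-a-batch-of-100-then-wait-for-all structure, condensed into a Go sketch (goroutines instead of frames and threads; it uses the same port 6666, so don't run it alongside the Zig server). It shares the original's weakness: no read timeout, so one silent client stalls the whole batch.
Code:
package main

import (
    "log"
    "net"
    "sync"
)

// Read one message, answer with the same greeting as the Zig server, close.
func handle(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 256)
    n, err := conn.Read(buf)
    if err != nil {
        log.Println("read:", err)
        return
    }
    log.Printf("got %s", buf[:n])
    conn.Write([]byte("hello from server\n"))
}

func main() {
    ln, err := net.Listen("tcp", ":6666")
    if err != nil {
        log.Fatal(err)
    }
    for {
        var wg sync.WaitGroup
        // accept a batch of 100 connections, one goroutine per connection
        for i := 0; i < 100; i++ {
            conn, err := ln.Accept()
            if err != nil {
                log.Fatal(err)
            }
            wg.Add(1)
            go func(c net.Conn) {
                defer wg.Done()
                handle(c)
            }(conn)
        }
        wg.Wait() // wait for the whole batch, like the await/join loops above
    }
}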

Now, a Go client for load-testing the Zig server above.
It's used like this:
Code:
bmaxa@Branimirs-Air tcpclient % ./tcpclient
numcpu: 8
usage : tcpclient conc nreqs address:port
So the arguments are: concurrency, meaning how many connections to hit the server with in parallel,
then the total number of requests,
and the last one is the address:port.
Don't point this program at servers on the internet, because you'll probably get banned; use it only against this server.
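For example, a hypothetical run against the Zig server above:
Code:
./tcpclient 50 1000 localhost:6666
That would keep up to 50 connections open in parallel until 1000 requests in total have been made.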
Code:
bmaxa@Branimirs-Air tcpclient % cat main.go
package main

import (
  "net"
  "io/ioutil"
  "os"
  "fmt"
  "flag"
  "strconv"
  "sync"
  "time"
  "runtime"
)

var exit chan int = make(chan int)
func main() {
  timeout, err := time.ParseDuration("2s")
  runtime.GOMAXPROCS(runtime.NumCPU())
  fmt.Println("numcpu:",runtime.NumCPU())
  flag.Parse()
  if flag.NArg() != 3 {
    fmt.Println("usage : tcpclient conc nreqs address:port");
    return
  }
  ta,err := net.ResolveTCPAddr("tcp",flag.Arg(2))
  if err != nil {
    fmt.Fprintln(os.Stderr,err)
    return
  }
  fmt.Println(flag.Arg(2))
  conc,_ := strconv.Atoi(flag.Arg(0))
  nreqs,_ := strconv.Atoi(flag.Arg(1))
  reqs := float64(nreqs)
  bytes := 0
  l := sync.Mutex{}
  pb := sync.Mutex{}
  cnd := sync.NewCond(&l)
  count := 0
  start := time.Now()
  exit1 := make(chan int)
  go func(){
    var rc,errcnt int
    for reqs:=nreqs;reqs > 0;reqs-- {
      if reqs % 100 == 0 {
        fmt.Println("completed",nreqs-reqs)
      }
      rc = <- exit
      errcnt += rc
    }
    fmt.Println("errors",errcnt)
    exit1 <- 1
  }()
  for nreqs:=reqs;nreqs>0; {
    cnd.L.Lock()
    cu := count
    cnd.L.Unlock()
    if cu < conc {
      nreqs--
      cnd.L.Lock()
      count++
      cnd.L.Unlock()
      go func() {
        conn,e := net.DialTCP("tcp",nil,ta)
        defer func() {
          cnd.L.Lock()
          count--
          cnd.Signal()
          cnd.L.Unlock()
          var rc = 1
          if conn != nil { conn.Close();rc = 0; }
          exit <- rc
        } ()
        if e != nil {
          fmt.Fprintln(os.Stderr,e)
          return
        }
        conn.SetDeadline(time.Now().Add(timeout))
        conn.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
        b,_ := ioutil.ReadAll(conn)
        pb.Lock()
        bytes += len(b)
        pb.Unlock()
      }()
    } else {
      cnd.L.Lock()
      for count >= conc {
        cnd.Wait()
      }
      cnd.L.Unlock()
    }
  }
  <- exit1
  end := time.Since(start)
  fmt.Printf("%f\nbytes %d\n",reqs/end.Seconds(),bytes)
}

So, this works everywhere without changes, go ahead and experiment a bit :P
 
By the way, this is for version 0.10 of the Zig compiler; in 0.11 async was removed completely, so for anyone working with the newest compiler, here is
a version without async:
Code:
const std = @import("std");
const net = std.net;
const mem = std.mem;
const testing = std.testing;
pub const default_level: std.log.Level = .debug;
//pub const io_mode = .evented;
pub fn main() !void {
//    if (!std.io.is_async) return error.SkipZigTest;

    // Ignore sigpipe
    var act = std.os.Sigaction{
        .handler = .{ .handler = std.os.SIG.IGN },
        .mask = std.os.empty_sigset,
        .flags = 0,
    };
    try std.os.sigaction(std.os.SIG.PIPE, &act, null);
    const localhost = try net.Address.parseIp("::", 6666);

    var server = net.StreamServer.init(net.StreamServer.Options{ .reuse_address = true });
    defer server.deinit();
    try server.listen(localhost);

    var server_frame = try std.Thread.spawn(std.Thread.SpawnConfig{}, testServer,.{&server});
    try testClient(server.listen_address);

    server_frame.join();
}

fn testClient(addr: net.Address) anyerror!void {
    const socket_file = try net.tcpConnectToAddress(addr);
    defer socket_file.close();

    var buf: [100]u8 = undefined;
    _ = try socket_file.write("abcd");
    const len = try socket_file.read(&buf);
    const msg = buf[0..len];
    _ = try testing.expect(mem.eql(u8, msg, "hello from server\n"));
}
fn testServer(server: *net.StreamServer) anyerror!void {
    var connections:u32 = 0;
    var threads:std.ArrayList(std.Thread) = std.ArrayList(std.Thread).init(std.heap.c_allocator);
    var clients: std.ArrayList(net.StreamServer.Connection) = std.ArrayList(net.StreamServer.Connection).init(std.heap.c_allocator);
    while (true) {
        while (connections < 100) : (connections += 1) {
            var client = try server.accept();
            std.log.err("accepted: {}", .{client});
            _ = try clients.append(client);
            _ = try threads.append(try std.Thread.spawn(std.Thread.SpawnConfig{}, tf, .{client}));
        }
        for (threads.items) |*t| {
            t.join();
        }
        threads.clearRetainingCapacity();
        clients.clearRetainingCapacity();
        connections = 0;
    }
}

fn handle(client: net.StreamServer.Connection) !void {
    defer client.stream.close();
    const wstream = client.stream.writer();
    const readstream = client.stream.reader();
    var buf: [256]u8 = undefined;
    const len = try readstream.read(buf[0..]);
    std.debug.print("got {s}\n",.{buf[0..len]});
    _ = try wstream.print("hello from server\n", .{});
}
fn tf(client:net.StreamServer.Connection)!void{
    try handle(client);
}
Zig is in active development, so features get added and removed from version
to version.
For instance, the default log doesn't work, so I switched logging to the error level so I wouldn't
have to bother with a scoped log for such a small program.
By the way, non-blocking sockets are needed for this to amount to anything, which
is what async was supposed to provide, but oh well :P
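To show what non-blocking means at the socket level, here is a small Go sketch using a Unix socketpair (macOS/Linux only; the "hi" payload is made up): a read on a non-blocking descriptor with no data returns EAGAIN immediately instead of blocking, and an event loop (kqueue/epoll) then tells you when to retry.
Code:
package main

import (
    "fmt"
    "syscall"
)

func main() {
    // A connected pair of Unix-domain sockets; fds[0] is "our" end.
    fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
    if err != nil {
        panic(err)
    }
    defer syscall.Close(fds[0])
    defer syscall.Close(fds[1])

    if err := syscall.SetNonblock(fds[0], true); err != nil {
        panic(err)
    }

    buf := make([]byte, 16)
    _, err = syscall.Read(fds[0], buf)
    fmt.Println("read with no data queued:", err) // EAGAIN instead of blocking

    // After the peer writes, the same read succeeds immediately.
    syscall.Write(fds[1], []byte("hi"))
    n, _ := syscall.Read(fds[0], buf)
    fmt.Printf("read %d bytes: %q\n", n, buf[:n])
}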
 
And now, for macOS, an example with a kernel queue (kqueue), combined with threads. A pretty bulletproof server, since it has a timeout...
Swift, of course. On Linux this would be epoll, on Windows IOCP, and the common denominator of them all would be select. Combined with threads, even select
would be enough, since it takes at most about 1000 connections per queue...
Anyway, I spin up as many threads as there are processors, and each one handles up to as many connections as there are processors.
If they're all busy, I spin up additional threads as needed, and drop back down to the number of processors once they're
free, with the addition that I release a thread once it has served 100k requests, because of memory fragmentation,
which causes memory usage to keep growing. There you go: in not much code, a serious server, which adds
two numbers from some other exercise :P
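That thread policy, sketched as an elastic worker pool in Go for reference (names and limits are illustrative, not taken from the Swift code below); the actual kqueue implementation follows.
Code:
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Keep about NumCPU workers resident, spawn extras when every worker is busy,
// let extras exit when idle, and retire any worker after maxJobsPerWorker jobs.
const maxJobsPerWorker = 100000

type pool struct {
    mu      sync.Mutex
    cond    *sync.Cond
    queue   []func()
    workers int // currently running workers
    busy    int // workers currently executing a job
    target  int // desired resident worker count (about NumCPU)
}

func newPool() *pool {
    p := &pool{target: runtime.NumCPU()}
    p.cond = sync.NewCond(&p.mu)
    p.mu.Lock()
    for i := 0; i < p.target; i++ {
        p.spawnLocked()
    }
    p.mu.Unlock()
    return p
}

// spawnLocked starts one worker; the caller must hold p.mu.
func (p *pool) spawnLocked() {
    p.workers++
    go func() {
        defer func() {
            p.mu.Lock()
            p.workers--
            p.mu.Unlock()
        }()
        for done := 0; done < maxJobsPerWorker; done++ {
            p.mu.Lock()
            for len(p.queue) == 0 {
                if p.workers > p.target { // idle and above target: shrink
                    p.mu.Unlock()
                    return
                }
                p.cond.Wait()
            }
            job := p.queue[0]
            p.queue = p.queue[1:]
            p.busy++
            p.mu.Unlock()

            job()

            p.mu.Lock()
            p.busy--
            p.mu.Unlock()
        }
    }()
}

func (p *pool) submit(job func()) {
    p.mu.Lock()
    p.queue = append(p.queue, job)
    // Everyone is busy (or we are below target): grow, like spawning an extra Worker.
    if p.busy >= p.workers || p.workers < p.target {
        p.spawnLocked()
    }
    p.cond.Signal()
    p.mu.Unlock()
}

func main() {
    p := newPool()
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        i := i
        wg.Add(1)
        p.submit(func() {
            defer wg.Done()
            time.Sleep(10 * time.Millisecond)
            fmt.Println("job", i, "done")
        })
    }
    wg.Wait()
}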
Swift:
import Foundation
signal(SIGPIPE,SIG_IGN)
let s = Server();
s.start()
var factor = 4
var ende:Bool? = nil
var semacnt:Int? = nil
var occupied:Int? = nil
var timeout:timespec? = nil
var timeoutr:Int? = nil
var timeoutw:Int? = nil
var serviced:Int? = nil
var accarrFD:[Int32]?
var cond:NSCondition?
var lock:NSLock?
var standardError:FileHandle? = nil
class Server {
  init() {
      let task = Process()
      task.executableURL = URL(fileURLWithPath: "/usr/sbin/sysctl")
      task.arguments = ["hw.ncpu"]
      let out = Pipe()
      task.standardOutput = out
      let res: ()? = try? task.run()
      if let _ = res {
          let outputData = out.fileHandleForReading.readDataToEndOfFile()
          let s = String(decoding: outputData, as: UTF8.self)
          print("got from sysctl ",s)
          let sep = CharacterSet(charactersIn: " \n")
          let components = s.components(separatedBy: sep)
          factor = Int(components[1]) ?? 4
      }
    if (CommandLine.argc > 1) {
      tmpSvcPort = Int(CommandLine.arguments[1]) ?? 0
      if (tmpSvcPort > 0){ servicePort = String(tmpSvcPort) }
    }
    semacnt = 0
    occupied = 0
    ende = false
    timeoutr = 0
    timeoutw = 0
    serviced = 0
    timeout = timespec(tv_sec:5,tv_nsec:0)
    standardError = FileHandle.standardError
    cond = NSCondition()
    lock = NSLock()
    accarrFD = Array(repeating:Int32(0),count:0)
  }
  deinit {
  }
  var tmpSvcPort: Int = 0
  var servicePort = "1234"
  class Worker:Thread{
    override init(){
      super.init()
      semacnt! += 1
    }
    deinit {
      print("deinit")
      semacnt! -= 1
    }
    override func main(){
      let MTU = 65536
      let buffer = UnsafeMutablePointer<CChar>.allocate(capacity:MTU)
      defer {
        buffer.deallocate()
      }
      let sockKqueue = kqueue()
      if sockKqueue == -1 {
          print("Error creating kqueue",to:&standardError!)
      }
      var servicedone = 0
      defer {close(sockKqueue)}
      while(semacnt! <= factor && servicedone<100000){
        cond!.lock()
        while (accarrFD!.isEmpty){
          cond!.wait()
        }
        var oneFD:[Int32:kevent]=[:]
        var count = 0
        while (!accarrFD!.isEmpty && count<factor){
          /// collect rest of fds
          let fd = accarrFD![accarrFD!.count-1]
          oneFD[fd]=kevent(
            ident: UInt(fd),
            filter: Int16(EVFILT_READ),
            flags: UInt16(EV_ADD | EV_ENABLE),
            fflags: 0,
            data: 0,
            udata: nil)
          accarrFD!.remove(at:accarrFD!.count-1)
          count+=1
        }

        cond!.unlock()
        lock!.lock()
        occupied! += 1
        serviced!+=oneFD.count
        servicedone+=oneFD.count
        lock!.unlock()
        while !oneFD.isEmpty{
        let toproc = oneFD
        var arrFD = Array(repeating:kevent(),count:0)
        for (fd,value) in toproc{
          let flags = fcntl(fd,F_GETFL)
          let _ = fcntl(fd,F_SETFL,flags | O_NONBLOCK)
          arrFD.append(value)
        }

        kevent(sockKqueue, &arrFD, Int32(arrFD.count), nil, 0, nil)
        var event = Array(repeating:kevent(),count:arrFD.count)
        let status = kevent(sockKqueue, nil, 0, &event, Int32(event.count), &timeout!)
        if  status == 0 {
          print("Timeout")
          timeoutr! += 1
          for i in oneFD {
            close(i.key)
          }
          oneFD.removeAll()
          continue
        } else if status > 0 {
            for i in 0..<status {
              let i:Int = Int(i)
              let fd:Int32 = Int32(event[i].ident)
              defer {close(fd);oneFD.removeValue(forKey:fd)}
              if (event[i].flags & UInt16(EV_EOF)) == EV_EOF {
                print("The socket (\(event[i].ident)) has been closed.")
                continue
              }
              print("File descriptor: \(event[i].ident) - has \(event[i].data) characters for reading")
              let readResult = read(fd, buffer, event[i].data>MTU ? MTU : event[i].data)

              if (readResult == 0) {
                continue;  // end of file
              } else if (readResult == -1) {
                print("Error reading form client(\(fd)) - \(errno)",to:&standardError!)
                continue;  // error
              } else {
                buffer[event[i].data]=0
                let strResult =
                  String(cString: buffer)
                let cset = CharacterSet(charactersIn: " \r\n")
                let numbers = strResult.components(separatedBy: cset)
                var sum = 0
                if numbers.count > 1 {
                  if numbers[0].lowercased() == "quit" {
                    ende! = true
                    continue
                  } else {
                    let num1 = Int(numbers[0]) ?? 0
                    let num2 = Int(numbers[1]) ?? 0
                    sum = num1 + num2
                  }
                }
                print("Received form client(\(fd)): \(strResult)")
                let out = String(format:"suma %d\n",sum)
                let _ = out.withCString {
                  var sockKevent = kevent(
                    ident: UInt(fd),
                    filter: Int16(EVFILT_WRITE),
                    flags: UInt16(EV_ADD | EV_ENABLE),
                    fflags: 0,
                    data: 0,
                    udata: nil)
                  kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
                  var event = kevent()
                  let status = kevent(sockKqueue, nil, 0, &event, 1, &timeout!)
                  if  status == 0 {
                    print("Timeout")
                    timeoutw!+=1
                  } else if status > 0 {
                      if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                        print("The socket (\(fd)) has been closed.")
                      }
                      print("File descriptor: \(fd) - can write")
                      write(fd, $0, out.count)
                    }
                      else {
                        print("Error reading kevent")
                    }
                  }
                }
            }
          }
          else {
            print("Error reading kevent",to:&standardError!)
            continue
          }
        }
        print("release occupied",occupied!,semacnt!,timeoutr!,timeoutw!,serviced!)
        lock!.lock()
        occupied! -= 1
        lock!.unlock()
      }
    }
  }
  func start() {
    print("Server starting...")

    let socketFD = socket(AF_INET6,    //Domain [AF_INET,AF_INET6, AF_UNIX]
                          SOCK_STREAM, //Type [SOCK_STREAM, SOCK_DGRAM,
                                       // SOCK_SEQPACKET, SOCK_RAW]
                          IPPROTO_TCP  //Protocol [IPPROTO_TCP, IPPROTO_SCTP,
                                       // IPPROTO_UDP, IPPROTO_DCCP]
                          )            //Return a FileDescriptor -1 = error
    if socketFD == -1 {
      print("Error creating BSD Socket",to:&standardError!)
      return
    }

    var hints = addrinfo(
      ai_flags: AI_PASSIVE,    // Assign the address of the local host to the
                               // socket structures
      ai_family: AF_UNSPEC,    // Either IPv4 or IPv6
      ai_socktype: SOCK_STREAM,// TCP
      ai_protocol: 0,
      ai_addrlen: 0,
      ai_canonname: nil,
      ai_addr: nil,
      ai_next: nil)

    var servinfo: UnsafeMutablePointer<addrinfo>? = nil
    let addrInfoResult = getaddrinfo(
      nil,                        // Any interface
      servicePort,                // The port on which will be listenend
      &hints,                     // Protocol configuration as per above
      &servinfo)

    if addrInfoResult != 0 {
      print("Error getting address info: \(errno)",to:&standardError!)
      return
    }
    setsockopt(socketFD,SOL_SOCKET,SO_REUSEADDR,servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))
    let bindResult = bind(socketFD, servinfo!.pointee.ai_addr, socklen_t(servinfo!.pointee.ai_addrlen))

    if bindResult == -1 {
      print("Error binding socket to Address: \(errno)",to:&standardError!)
      return
    }

    let listenResult = listen(socketFD, //Socket File descriptor
                              128         // adequate
                                        // The backlog argument defines the
                                        // maximum length the queue of pending
                                        // connections may grow to
    )

    let flags = fcntl(socketFD,F_GETFL)
    let _ = fcntl(socketFD,F_SETFL,flags | O_NONBLOCK)

    let sockKqueue = kqueue()
    if sockKqueue == -1 {
        print("Error creating kqueue",to:&standardError!)
      }
      var sockKevent = kevent(
        ident: UInt(socketFD),
        filter: Int16(EVFILT_READ),
        flags: UInt16(EV_ADD | EV_ENABLE),
        fflags: 0,
        data: 0,
        udata: nil)
      kevent(sockKqueue, &sockKevent, 1, nil, 0, nil)
      var event = kevent()
      if listenResult == -1 {
      print("Error setting our socket to listen",to:&standardError!)
      return
    }
    while (!ende!) {
      var addr = sockaddr()
      var addr_len :socklen_t = 0
      while (!ende!) {
          print("About to accept")
          let status = kevent(sockKqueue, nil, 0, &event, 1, nil)
          if  status == 0 {
            print("Timeout")
            continue
          } else if status > 0 {
              if (event.flags & UInt16(EV_EOF)) == EV_EOF {
                print("The socket (\(socketFD)) has been closed.shutdown.",to:&standardError!)
                exit(0)
              }
              print("File descriptor: \(socketFD) - has \(event.data) characters for reading")
            }
            else {
              print("Error reading kevent",to:&standardError!)
              continue
            }
          for _ in 0..<event.data {
          let accFD = accept(socketFD, &addr, &addr_len)
          if (accFD > 0){
              print("Accepted new client with file descriptor: \(accFD)")
              cond!.lock()
              accarrFD!.append(accFD)
              if (semacnt! < factor || semacnt! <= occupied!) {
                let handle = Worker();
                handle.start()
              }
              cond!.unlock()
            } else {
                print("Error accepting connection",to:&standardError!);break
            }
          }
          cond!.broadcast()
        }
    }
    exit(EXIT_SUCCESS)
  }

}

extension FileHandle: TextOutputStream {
  public func write(_ string: String) {
    let data = Data(string.utf8)
    self.write(data)
  }
}
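A tiny Go client for poking at the sum server by hand, assuming the default port 1234 from the code above (adjust if you passed a different port on the command line):
Code:
package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
)

// Send two numbers, print the "suma ..." reply.
func main() {
    conn, err := net.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    if _, err := fmt.Fprintf(conn, "2 3\n"); err != nil {
        log.Fatal(err)
    }
    reply, err := bufio.NewReader(conn).ReadString('\n')
    if err != nil {
        log.Fatal(err)
    }
    fmt.Print(reply) // expected: "suma 5"
}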
 
