X7ROOT File Manager
Current Path:
/opt/alt/php85/usr/include/php/ext/swoole/include
opt
/
alt
/
php85
/
usr
/
include
/
php
/
ext
/
swoole
/
include
/
??
..
??
swoole.h
(26.96 KB)
??
swoole_api.h
(2.02 KB)
??
swoole_asm_context.h
(2.31 KB)
??
swoole_async.h
(3.29 KB)
??
swoole_atomic.h
(2.3 KB)
??
swoole_base64.h
(1.3 KB)
??
swoole_buffer.h
(2.24 KB)
??
swoole_channel.h
(2.37 KB)
??
swoole_client.h
(11.75 KB)
??
swoole_config.h
(8.79 KB)
??
swoole_coroutine.h
(8.92 KB)
??
swoole_coroutine_api.h
(5.12 KB)
??
swoole_coroutine_channel.h
(4.17 KB)
??
swoole_coroutine_context.h
(2.75 KB)
??
swoole_coroutine_socket.h
(17.64 KB)
??
swoole_coroutine_system.h
(3.78 KB)
??
swoole_dtls.h
(2.42 KB)
??
swoole_error.h
(6.46 KB)
??
swoole_file.h
(4.88 KB)
??
swoole_file_hook.h
(2.76 KB)
??
swoole_hash.h
(1.24 KB)
??
swoole_heap.h
(1.9 KB)
??
swoole_http.h
(7.24 KB)
??
swoole_http2.h
(8.55 KB)
??
swoole_iouring.h
(5.52 KB)
??
swoole_llhttp.h
(2.02 KB)
??
swoole_lock.h
(3.25 KB)
??
swoole_log.h
(15.71 KB)
??
swoole_lru_cache.h
(3.18 KB)
??
swoole_memory.h
(2.83 KB)
??
swoole_message_bus.h
(5.18 KB)
??
swoole_mime_type.h
(1.42 KB)
??
swoole_mqtt.h
(2.16 KB)
??
swoole_msg_queue.h
(1.96 KB)
??
swoole_pipe.h
(2.6 KB)
??
swoole_process_pool.h
(13.17 KB)
??
swoole_protocol.h
(5.03 KB)
??
swoole_proxy.h
(3.3 KB)
??
swoole_reactor.h
(12.4 KB)
??
swoole_redis.h
(1.83 KB)
??
swoole_server.h
(49.72 KB)
??
swoole_signal.h
(2.99 KB)
??
swoole_socket.h
(18.51 KB)
??
swoole_socket_hook.h
(2.31 KB)
??
swoole_socket_impl.h
(1.43 KB)
??
swoole_ssl.h
(5.22 KB)
??
swoole_static_handler.h
(5.21 KB)
??
swoole_string.h
(7.32 KB)
??
swoole_table.h
(6.5 KB)
??
swoole_thread.h
(1.99 KB)
??
swoole_timer.h
(4.37 KB)
??
swoole_uring_socket.h
(3.84 KB)
??
swoole_util.h
(7.49 KB)
??
swoole_version.h
(1.74 KB)
??
swoole_websocket.h
(5.11 KB)
Editing: swoole_process_pool.h
/* +----------------------------------------------------------------------+ | Swoole | +----------------------------------------------------------------------+ | This source file is subject to version 2.0 of the Apache license, | | that is bundled with this package in the file LICENSE, and is | | available through the world-wide-web at the following url: | | http://www.apache.org/licenses/LICENSE-2.0.html | | If you did not receive a copy of the Apache2.0 license and are unable| | to obtain it through the world-wide-web, please send a note to | | license@swoole.com so we can mail you a copy immediately. | +----------------------------------------------------------------------+ | Author: Tianfeng Han <rango@swoole.com> | | Twosee <twose@qq.com> | +----------------------------------------------------------------------+ */ #pragma once #include "swoole_signal.h" #include "swoole_lock.h" #include "swoole_pipe.h" #include "swoole_channel.h" #include "swoole_msg_queue.h" #include "swoole_message_bus.h" #include <sys/wait.h> #include <csignal> #include <unordered_map> #include <unordered_set> #include <queue> enum swWorkerStatus { SW_WORKER_BUSY = 1, SW_WORKER_IDLE = 2, SW_WORKER_EXIT = 3, }; enum swWorkerType { SW_MASTER = 1, SW_WORKER = 2, SW_MANAGER = 3, SW_EVENT_WORKER = 2, SW_TASK_WORKER = 4, SW_USER_WORKER = 5, }; enum swIPCMode { SW_IPC_NONE = 0, SW_IPC_UNIXSOCK = 1, SW_IPC_MSGQUEUE = 2, SW_IPC_SOCKET = 3, }; SW_API swoole::WorkerId swoole_get_worker_id(); SW_API pid_t swoole_get_worker_pid(); SW_API int swoole_get_worker_type(); SW_API void swoole_set_worker_pid(pid_t pid); SW_API void swoole_set_worker_id(swoole::WorkerId worker_id); SW_API void swoole_set_worker_type(int type); SW_API char swoole_get_worker_symbol(); namespace swoole { enum WorkerMessageType { SW_WORKER_MESSAGE_STOP = 1, }; enum ProtocolType { SW_PROTOCOL_TASK = 1, SW_PROTOCOL_STREAM, SW_PROTOCOL_MESSAGE, }; struct WorkerStopMessage { pid_t pid; uint16_t worker_id; }; class ExitStatus { pid_t pid_; int status_; public: ExitStatus(pid_t _pid, int _status) : pid_(_pid), status_(_status) {} pid_t get_pid() const { return pid_; } int get_status() const { return status_; } int get_code() const { return WEXITSTATUS(status_); } int get_signal() const { return WTERMSIG(status_); } bool is_normal_exit() const { return WIFEXITED(status_); } }; static inline ExitStatus wait_process() { int status = 0; pid_t pid = ::wait(&status); return {pid, status}; } static inline ExitStatus wait_process(pid_t _pid, int options) { int status = 0; pid_t pid = ::waitpid(_pid, &status, options); return {pid, status}; } struct ProcessPool; struct Worker; struct WorkerGlobal { WorkerId id; uint8_t type; pid_t pid; bool shutdown; bool running; uint32_t max_request; /** * worker is shared memory, visible in other work processes. * When a worker process restarts, it may be held by both the old and new processes simultaneously, * necessitating careful handling of the state. */ Worker *worker; /** * worker_copy is a copy of worker, * but it must be local memory and only used within the current process or thread. * It is not visible to other worker processes. */ Worker *worker_copy; time_t exit_time; }; struct Worker { pid_t pid; WorkerId id; ProcessPool *pool; MsgQueue *queue; bool shared; bool redirect_stdout; bool redirect_stdin; bool redirect_stderr; /** * worker status, IDLE or BUSY */ uint8_t status; uint8_t type; uint8_t msgqueue_mode; uint8_t child_process; uint32_t concurrency; time_t start_time; sw_atomic_long_t dispatch_count; sw_atomic_long_t request_count; sw_atomic_long_t response_count; size_t coroutine_num; Mutex *lock; UnixSocket *pipe_object; network::Socket *pipe_master; network::Socket *pipe_worker; network::Socket *pipe_current; void *ptr; ssize_t send_pipe_message(const void *buf, size_t n, int flags) const; bool has_exceeded_max_request() const; void set_max_request(uint32_t max_request, uint32_t max_request_grace); void report_error(const ExitStatus &exit_status) const; /** * Init global state for worker process. * Must be called after the process is spawned and before the main loop is executed. */ void init(); void shutdown(); bool is_shutdown(); static bool is_running(); void set_status(swWorkerStatus _status) { status = _status; } void set_status_to_idle() { set_status(SW_WORKER_IDLE); } void set_status_to_busy() { set_status(SW_WORKER_BUSY); } void add_request_count() { request_count++; } bool is_busy() const { return status == SW_WORKER_BUSY; } bool is_idle() const { return status == SW_WORKER_IDLE; } }; struct StreamInfo { network::Socket *socket; network::Socket *last_connection; char *socket_file; int socket_port; String *response_buffer; }; struct ReloadTask { std::unordered_map<pid_t, Worker *> workers; std::queue<pid_t> kill_queue; TimerNode *timer = nullptr; size_t count() const { return workers.size(); } bool is_completed() const { return workers.empty(); } bool exists(pid_t pid) { return workers.find(pid) != workers.end(); } ReloadTask() = default; ~ReloadTask(); void kill_one(int signal_number = SIGTERM); void kill_all(int signal_number = SIGKILL); void add_workers(Worker *list, size_t n); void add_timeout_killer(int timeout); bool remove(pid_t pid); void clear_queue(); }; struct ProcessPool { bool running; bool reload_init; bool read_message; bool started; bool schedule_by_sysvmsg; bool async; uint8_t ipc_mode; ProtocolType protocol_type_; pid_t master_pid; uint32_t max_wait_time; uint64_t reload_count; time_t reload_last_time; /** * process type */ uint8_t type; /** * worker->id = start_id + i */ uint16_t start_id; /** * use message queue IPC */ uint8_t use_msgqueue; /** * use stream socket IPC */ uint8_t use_socket; char *packet_buffer; uint32_t max_packet_size_; /** * message queue key */ key_t msgqueue_key; uint32_t worker_num; uint32_t max_request; uint32_t max_request_grace; /** * No idle task work process is available. */ uint8_t scheduler_warning; time_t warning_time; void (*onStart)(ProcessPool *pool); void (*onShutdown)(ProcessPool *pool); void (*onBeforeReload)(ProcessPool *pool); void (*onAfterReload)(ProcessPool *pool); int (*onTask)(ProcessPool *pool, Worker *worker, EventData *task); void (*onWorkerStart)(ProcessPool *pool, Worker *worker); void (*onMessage)(ProcessPool *pool, RecvData *msg); void (*onWorkerExit)(ProcessPool *pool, Worker *worker); void (*onWorkerStop)(ProcessPool *pool, Worker *worker); void (*onWorkerError)(ProcessPool *pool, Worker *worker, const ExitStatus &exit_status); void (*onWorkerMessage)(ProcessPool *pool, EventData *msg); int (*onWorkerNotFound)(ProcessPool *pool, const ExitStatus &exit_status); int (*main_loop)(ProcessPool *pool, Worker *worker); sw_atomic_t round_id; Worker *workers; std::vector<std::shared_ptr<UnixSocket>> *pipes; std::unordered_map<pid_t, Worker *> *map_; MsgQueue *queue; StreamInfo *stream_info_; Channel *message_box = nullptr; MessageBus *message_bus = nullptr; ReloadTask *reload_task = nullptr; void *ptr; Worker *get_worker(WorkerId worker_id) const { return &(workers[worker_id - start_id]); } static TaskId get_task_id(const EventData *task) { return task->info.fd; } static WorkerId get_task_src_worker_id(const EventData *task) { return task->info.reactor_id; } void set_max_packet_size(uint32_t _max_packet_size) { max_packet_size_ = _max_packet_size; } bool is_master() const { return swoole_get_worker_type() == SW_MASTER; } bool is_worker() const { return swoole_get_worker_type() == SW_WORKER; } /** * SW_PROTOCOL_TASK * ================================================================== * The `EventData` structure must be sent as a single message and cannot be split into multiple transmissions. * If the length of the message content exceeds the size limit of the data field in EventData, * it should be written to a temporary file. * In this case, set the SW_TASK_TMPFILE flag in info.ext_flags. * Only the path to the temporary file will be transmitted, * and the receiving end should retrieve the actual message content from this temporary file. * Reference: Server::task_pack() * * SW_PROTOCOL_MESSAGE * ================================================================== * When sending the `EventData` structure, the message can be split into multiple transmissions. * When sending data in multiple parts, you must set a unique `info.msg_id`. * For the first slice, set the `info.flags` with the SW_EVENT_DATA_CHUNK | SW_EVENT_DATA_BEGIN flag, * and for the last slice, set the `info.flags` with the SW_EVENT_DATA_CHUNK | SW_EVENT_DATA_END flag. * The receiving end will place the data into a memory cache table, merge the data, * and only execute the onMessage callback once the complete message has been received. * * Reference: MessageBus::write() and MessageBus::read() * * SW_PROTOCOL_STREAM * ================================================================== * +-------------------------------+-------------------------------+ * | Payload Length ( 4 byte, network byte order) | * | Payload Data ... ( Payload Length byte ) | * +-------------------------------- - - - - - - - - - - - - - - - + * * The packet consists of a 4 byte length header followed by the data payload. * The receiving end should first use `socket.recv(&payload_len, 4)` to obtain the length of the data payload. * Then, execute `socket.recv(payload, payload_len)` to receive the complete data. * Please note that sufficient memory space must be allocated for the payload, * for example, `payload = malloc(payload_len)`. */ void set_protocol(ProtocolType _protocol_type); void set_type(int _type); void set_start_id(int _start_id); void set_max_request(uint32_t _max_request, uint32_t _max_request_grace); bool detach(); int wait(); int start_check(); int start(); bool shutdown(); bool reload(); void reopen_logger(); void trigger_read_message_event() { read_message = true; } pid_t spawn(Worker *worker); void stop(Worker *worker); void kill_all_workers(int signo = SIGKILL); swResultCode dispatch(EventData *data, int *worker_id); int response(const char *data, uint32_t length) const; swResultCode dispatch_sync(EventData *data, int *dst_worker_id); swResultCode dispatch_sync(const char *data, uint32_t len) const; void add_worker(Worker *worker) const; bool del_worker(const Worker *worker) const; Worker *get_worker_by_pid(pid_t pid) const; void destroy(); int create(uint32_t worker_num, key_t msgqueue_key = 0, swIPCMode ipc_mode = SW_IPC_NONE); int create_message_box(size_t memory_size); int create_message_bus(); int push_message(uint8_t _type, const void *data, size_t length) const; int push_message(const EventData *msg) const; bool send_message(WorkerId worker_id, const char *message, size_t l_message) const; int pop_message(void *data, size_t size) const; int listen(const char *socket_file, int backlog) const; int listen(const char *host, int port, int backlog) const; int schedule(); bool is_worker_running(Worker *worker) const; private: static int recv_packet(Reactor *reactor, Event *event); static int recv_message(Reactor *reactor, Event *event); static int run_with_task_protocol(ProcessPool *pool, Worker *worker); static int run_with_stream_protocol(ProcessPool *pool, Worker *worker); static int run_with_message_protocol(ProcessPool *pool, Worker *worker); static int run_async(ProcessPool *pool, Worker *worker); void at_worker_enter(Worker *worker) const; void at_worker_exit(Worker *worker); bool wait_detached_worker(std::unordered_set<pid_t> &detached_workers, pid_t pid); }; }; // namespace swoole static sw_inline int swoole_kill(pid_t _pid, int _sig) { return kill(_pid, _sig); } typedef swoole::ProtocolType swProtocolType; extern SW_THREAD_LOCAL swoole::WorkerGlobal SwooleWG; static inline swoole::Worker *sw_worker() { return SwooleWG.worker; }
Upload File
Create Folder