diff options
author | wm4 <wm4@nowhere> | 2014-08-24 23:50:43 +0200 |
---|---|---|
committer | wm4 <wm4@nowhere> | 2014-08-25 01:00:21 +0200 |
commit | 740f0f61d840255a02efcf392fece51486a59183 (patch) | |
tree | 55b91801f9c23badec307a21a945fda8725d8ad2 /input | |
parent | cae22ae3b6e6d4c40ef6fc153a9538f8fea6b0e5 (diff) |
input: redo how --input-file is handled
Abandon the "old" infrastructure for --input-file (mp_input_add_fd(),
select() loop, non-blocking reads). Replace it with something that
starts a reader thread, using blocking input.
This is for the sake of Windows. Windows is a truly insane operating
system, and there's not even a way to read a pipe in a non-blocking
way, or to wait for new input in an interruptible way (like with
poll()). And unfortunately, some want to use pipe to send input to
mpv. There are probably (slightly) better IPC mechanisms available
on Windows, but for the sake of platform uniformity, make this work
again for now.
On Vista+, CancelIoEx() could probably be used. But there's no way on
XP. Also, that function doesn't work on wine, making development
harder. We could forcibly terminate the thread, which might work, but
is unsafe. So what we do is starting a thread, and if we don't want
the pipe input anymore, we just abandon the thread. The thread might
remain blocked forever, but if we exit the process, the kernel will
forcibly kill it. On Unix, just use poll() to handle this.
Unfortunately the code is pretty crappy, but it's ok, because it's late
and I wanted to stop working on this an hour ago.
Tested on wine; might not work on a real Windows.
Diffstat (limited to 'input')
-rw-r--r-- | input/input.c | 118 | ||||
-rw-r--r-- | input/input.h | 34 | ||||
-rw-r--r-- | input/pipe.c | 133 |
3 files changed, 261 insertions, 24 deletions
diff --git a/input/input.c b/input/input.c index b82c1e4850..1dd029219f 100644 --- a/input/input.c +++ b/input/input.c @@ -177,6 +177,8 @@ struct input_ctx { struct input_fd fds[MP_MAX_FDS]; unsigned int num_fds; + struct mp_input_src *sources[MP_MAX_FDS]; + int num_sources; struct cmd_queue cmd_queue; @@ -189,6 +191,7 @@ int async_quit_request; static int parse_config(struct input_ctx *ictx, bool builtin, bstr data, const char *location, const char *restrict_section); +static void close_input_sources(struct input_ctx *ictx); #define OPT_BASE_STRUCT struct input_opts struct input_opts { @@ -1512,11 +1515,6 @@ done: return r; } -static int close_fd(void *ctx, int fd) -{ - return close(fd); -} - #ifndef __MINGW32__ static int read_wakeup(void *ctx, int fd) { @@ -1614,24 +1612,8 @@ struct input_ctx *mp_input_init(struct mpv_global *global) ictx->win_drag = global->opts->allow_win_drag; - if (input_conf->in_file) { - int mode = O_RDONLY; -#ifndef __MINGW32__ - // Use RDWR for FIFOs to ensure they stay open over multiple accesses. - // Note that on Windows due to how the API works, using RDONLY should - // be ok. - struct stat st; - if (stat(input_conf->in_file, &st) == 0 && S_ISFIFO(st.st_mode)) - mode = O_RDWR; - mode |= O_NONBLOCK; -#endif - int in_file_fd = open(input_conf->in_file, mode); - if (in_file_fd >= 0) - mp_input_add_fd(ictx, in_file_fd, 1, input_default_read_cmd, NULL, close_fd, NULL); - else - MP_ERR(ictx, "Can't open %s: %s\n", input_conf->in_file, - strerror(errno)); - } + if (input_conf->in_file && input_conf->in_file[0]) + mp_input_add_pipe(ictx, input_conf->in_file); return ictx; } @@ -1664,6 +1646,7 @@ void mp_input_uninit(struct input_ctx *ictx) if (ictx->fds[i].close_func) ictx->fds[i].close_func(ictx->fds[i].ctx, ictx->fds[i].fd); } + close_input_sources(ictx); for (int i = 0; i < 2; i++) { if (ictx->wakeup_pipe[i] != -1) close(ictx->wakeup_pipe[i]); @@ -1739,3 +1722,92 @@ void mp_input_run_cmd(struct input_ctx *ictx, int def_flags, const char **cmd, mp_cmd_t *cmdt = mp_input_parse_cmd_strv(ictx->log, def_flags, cmd, location); mp_input_queue_cmd(ictx, cmdt); } + +struct mp_input_src *mp_input_add_src(struct input_ctx *ictx) +{ + input_lock(ictx); + if (ictx->num_sources == MP_MAX_FDS) { + input_unlock(ictx); + return NULL; + } + + char name[80]; + snprintf(name, sizeof(name), "#%d", ictx->num_sources + 1); + struct mp_input_src *src = talloc_ptrtype(NULL, src); + *src = (struct mp_input_src){ + .global = ictx->global, + .log = mp_log_new(src, ictx->log, name), + .input_ctx = ictx, + }; + + ictx->sources[ictx->num_sources++] = src; + + input_unlock(ictx); + return src; +} + +static void close_input_sources(struct input_ctx *ictx) +{ + // To avoid lock-order issues, we first remove each source from the context, + // and then destroy it. + while (1) { + input_lock(ictx); + struct mp_input_src *src = ictx->num_sources ? ictx->sources[0] : NULL; + input_unlock(ictx); + if (!src) + break; + mp_input_src_kill(src); + } +} + +void mp_input_src_kill(struct mp_input_src *src) +{ + if (!src) + return; + struct input_ctx *ictx = src->input_ctx; + input_lock(ictx); + for (int n = 0; n < ictx->num_sources; n++) { + if (ictx->sources[n] == src) { + MP_TARRAY_REMOVE_AT(ictx->sources, ictx->num_sources, n); + input_unlock(ictx); + if (src->close) + src->close(src); + talloc_free(src); + return; + } + } + abort(); +} + +#define CMD_BUFFER (4 * 4096) + +void mp_input_src_feed_cmd_text(struct mp_input_src *src, char *buf, size_t len) +{ + if (!src->cmd_buffer) + src->cmd_buffer = talloc_size(src, CMD_BUFFER); + while (len) { + char *next = memchr(buf, '\n', len); + bool term = !!next; + next = next ? next + 1 : buf + len; + size_t copy = next - buf; + bool overflow = copy > CMD_BUFFER - src->cmd_buffer_size; + if (overflow || src->drop) { + src->cmd_buffer_size = 0; + src->drop = overflow || !term; + MP_WARN(src, "Dropping overlong line.\n"); + } else { + memcpy(src->cmd_buffer + src->cmd_buffer_size, buf, copy); + src->cmd_buffer_size += copy; + buf += copy; + len -= copy; + if (term) { + bstr s = {src->cmd_buffer, src->cmd_buffer_size}; + s = bstr_strip(s); + struct mp_cmd *cmd= mp_input_parse_cmd_(src->log, s, "<>"); + if (cmd) + mp_input_queue_cmd(src->input_ctx, cmd); + src->cmd_buffer_size = 0; + } + } + } +} diff --git a/input/input.h b/input/input.h index de363cf33b..a8341d7773 100644 --- a/input/input.h +++ b/input/input.h @@ -94,7 +94,37 @@ typedef struct mp_cmd { const struct mp_cmd_def *def; } mp_cmd_t; -/* Add a new command input source. +struct mp_input_src { + struct mpv_global *global; + struct mp_log *log; + struct input_ctx *input_ctx; + + char *cmd_buffer; + size_t cmd_buffer_size; + bool drop; + + // If not-NULL: called before destroying the input_src. Should close the + // underlying device, and free all memory. + void (*close)(struct mp_input_src *src); + + // For free use by the implementer. + void *priv; +}; + +/* Add a new input source. The input code can create a new thread, which feeds + * keys or commands to input_ctx. mp_input_src.close must be set. + */ +struct mp_input_src *mp_input_add_src(struct input_ctx *ictx); + +// Remove and free the source. You can call this only while the input_ctx +// exists; otherwise there would be a race condition when another thread +// destroys input_ctx. +void mp_input_src_kill(struct mp_input_src *src); + +// Feed text data, which will be split into lines of commands. +void mp_input_src_feed_cmd_text(struct mp_input_src *src, char *buf, size_t len); + +/* Add a new command input source. (Old version.) * "fd" is a file descriptor (use -1 if you don't use any fd) * "select" tells whether to use select() on the fd to determine when to * try reading. @@ -226,6 +256,8 @@ bool mp_input_use_alt_gr(struct input_ctx *ictx); void mp_input_run_cmd(struct input_ctx *ictx, int def_flags, const char **cmd, const char *location); +void mp_input_add_pipe(struct input_ctx *ictx, const char *filename); + void mp_input_set_main_thread(struct input_ctx *ictx); extern int async_quit_request; diff --git a/input/pipe.c b/input/pipe.c new file mode 100644 index 0000000000..a961debba7 --- /dev/null +++ b/input/pipe.c @@ -0,0 +1,133 @@ +#include <pthread.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#ifndef __MINGW32__ +#include <poll.h> +#endif + +#include "common/msg.h" +#include "bstr/bstr.h" +#include "osdep/io.h" +#include "input.h" +#include "cmd_parse.h" + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + +struct priv { + struct mp_log *log; + char *filename; + struct mp_input_src *src; + int wakeup_pipe[2]; +}; + +static void *reader_thread(void *ctx) +{ + struct priv *p = ctx; + pthread_detach(pthread_self()); + + int mode = O_RDONLY; +#ifndef __MINGW32__ + // Use RDWR for FIFOs to ensure they stay open over multiple accesses. + // Note that on Windows due to how the API works, using RDONLY should + // be ok. + struct stat st; + if (stat(p->filename, &st) == 0 && S_ISFIFO(st.st_mode)) + mode = O_RDWR; +#endif + int fd = -1; + bool close_fd = true; + if (strcmp(p->filename, "/dev/stdin") == 0) { // mainly for win32 + fd = 1; + close_fd = false; + } + if (fd < 0) + fd = open(p->filename, mode); + if (fd < 0) { + MP_ERR(p, "Can't open %s.\n", p->filename); + goto done; + } + + while (1) { +#ifndef __MINGW32__ + struct pollfd fds[2] = { + { .fd = fd, .events = POLLIN }, + { .fd = p->wakeup_pipe[0], .events = POLLIN }, + }; + poll(fds, 2, -1); + if (!(fds[0].revents & POLLIN)) + break; +#endif + char buffer[128]; + int r = read(fd, buffer, sizeof(buffer)); + if (r <= 0) + break; + + pthread_mutex_lock(&lock); + if (!p->src) { + pthread_mutex_unlock(&lock); + break; + } + mp_input_src_feed_cmd_text(p->src, buffer, r); + pthread_mutex_unlock(&lock); + } + + if (close_fd) + close(fd); + +done: + pthread_mutex_lock(&lock); + if (p->src) + p->src->priv = NULL; + pthread_mutex_unlock(&lock); + close(p->wakeup_pipe[0]); + close(p->wakeup_pipe[1]); + talloc_free(p); + return NULL; +} + +static void close_pipe(struct mp_input_src *src) +{ + pthread_mutex_lock(&lock); + struct priv *p = src->priv; + // Windows pipe have a severe problem: they can't be made non-blocking (not + // after creation), and you can't wait on them. The only things that work + // are cancellation (Vista+, broken in wine) or forceful thread termination. + // So don't bother with "correct" termination, and just abandon the reader + // thread. + // On Unix, we interrupt it using the wakeup pipe. + if (p) { +#ifndef __MINGW32__ + write(p->wakeup_pipe[1], &(char){0}, 1); +#endif + p->src = NULL; + } + pthread_mutex_unlock(&lock); +} + +void mp_input_add_pipe(struct input_ctx *ictx, const char *filename) +{ + struct mp_input_src *src = mp_input_add_src(ictx); + if (!src) + return; + + struct priv *p = talloc_zero(NULL, struct priv); + src->priv = p; + p->filename = talloc_strdup(p, filename); + p->src = src; + p->log = mp_log_new(p, src->log, NULL); + mp_make_wakeup_pipe(p->wakeup_pipe); + + pthread_t thread; + if (pthread_create(&thread, NULL, reader_thread, p)) { + close(p->wakeup_pipe[0]); + close(p->wakeup_pipe[1]); + talloc_free(p); + mp_input_src_kill(src); + } else { + src->close = close_pipe; + } +} |