diff options
-rw-r--r-- | DOCS/man/options.rst | 18 | ||||
-rw-r--r-- | demux/demux.c | 710 | ||||
-rw-r--r-- | demux/demux.h | 49 | ||||
-rw-r--r-- | demux/demux_disc.c | 11 | ||||
-rw-r--r-- | demux/demux_lavf.c | 3 | ||||
-rw-r--r-- | demux/stheader.h | 1 | ||||
-rw-r--r-- | options/options.c | 6 | ||||
-rw-r--r-- | options/options.h | 3 | ||||
-rw-r--r-- | player/command.c | 91 | ||||
-rw-r--r-- | player/core.h | 2 | ||||
-rw-r--r-- | player/discnav.c | 31 | ||||
-rw-r--r-- | player/loadfile.c | 64 | ||||
-rw-r--r-- | player/misc.c | 10 | ||||
-rw-r--r-- | player/playloop.c | 24 | ||||
-rw-r--r-- | stream/cache.c | 23 |
15 files changed, 743 insertions, 303 deletions
diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst index b554fecc62..84300eac47 100644 --- a/DOCS/man/options.rst +++ b/DOCS/man/options.rst @@ -778,6 +778,24 @@ OPTIONS ``--demuxer-rawvideo-size=<value>`` Frame size in bytes when using ``--demuxer=rawvideo``. +``--demuxer-thread=<yes|no>`` + Run the demuxer in a separate thread, and let it prefetch a certain amount + of packets (default: yes). + +``--demuxer-readahead-packets=N`` + If ``--demuxer-thread`` is enabled, this controls how much the demuxer + should buffer ahead. If the number of packets in the packet queue exceeds + ``--demuxer-readahead-packets``, or the total number of bytes exceeds + ``--demuxer-readahead-bytes``, the thread stops reading ahead. + + Note that if you set these options near the maximum, you might get a + packet queue overflow warning. + + See ``--list-options`` for defaults and value range. + +``--demuxer-readahead-bytes=N`` + See ``--demuxer-readahead-packets``. + ``--dump-stats=<filename>`` Write certain statistics to the given file. The file is truncated on opening. The file will contain raw samples, each with a timestamp. To diff --git a/demux/demux.c b/demux/demux.c index e716194614..599894ed82 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -21,6 +21,7 @@ #include <string.h> #include <assert.h> #include <unistd.h> +#include <pthread.h> #include <math.h> @@ -79,19 +80,66 @@ const demuxer_desc_t *const demuxer_list[] = { NULL }; +struct demux_internal { + struct mp_log *log; + + // The demuxer runs potentially in another thread, so we keep two demuxer + // structs; the real demuxer can access the shadow struct only. + // Since demuxer and user threads both don't use locks, a third demuxer + // struct d_buffer is used to copy data between them in a synchronized way. + struct demuxer *d_thread; // accessed by demuxer impl. (producer) + struct demuxer *d_user; // accessed by player (consumer) + struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread + + // The lock protects the packet queues (struct demux_stream), d_buffer, + // and some minor fields like thread_paused. + pthread_mutex_t lock; + pthread_cond_t wakeup; + pthread_t thread; + + // -- All the following fields are protected by lock. + + bool thread_paused; + int thread_request_pause; // counter, if >0, make demuxer thread pause + bool thread_terminate; + bool threading; + void (*wakeup_cb)(void *ctx); + void *wakeup_cb_ctx; + + bool warned_queue_overflow; + bool eof; // last global EOF status + bool autoselect; + int min_packs; + int min_bytes; + + // Cached state. + double time_length; + struct mp_tags *stream_metadata; + int64_t stream_size; + int64_t stream_cache_size; + int64_t stream_cache_fill; + int stream_cache_idle; +}; + struct demux_stream { - struct demuxer *demuxer; - int selected; // user wants packets from this stream - int eof; // end of demuxed stream? (true if all buffer empty) - int packs; // number of packets in buffer - int bytes; // total bytes of packets in buffer + struct demux_internal *in; + enum stream_type type; + // all fields are protected by in->lock + bool selected; // user wants packets from this stream + bool active; // try to keep at least 1 packet queued + bool eof; // end of demuxed stream? (true if all buffer empty) + size_t packs; // number of packets in buffer + size_t bytes; // total bytes of packets in buffer struct demux_packet *head; struct demux_packet *tail; }; -void demuxer_sort_chapters(demuxer_t *demuxer); +static void demuxer_sort_chapters(demuxer_t *demuxer); +static void *demux_thread(void *pctx); +static void update_cache(struct demux_internal *in); -static void ds_free_packs(struct demux_stream *ds) +// called locked +static void ds_flush(struct demux_stream *ds) { demux_packet_t *dp = ds->head; while (dp) { @@ -100,13 +148,16 @@ static void ds_free_packs(struct demux_stream *ds) dp = dn; } ds->head = ds->tail = NULL; - ds->packs = 0; // !!!!! + ds->packs = 0; ds->bytes = 0; - ds->eof = 0; + ds->eof = false; + ds->active = false; } struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) { + assert(demuxer == demuxer->in->d_thread); + if (demuxer->num_streams > MAX_SH_STREAMS) { MP_WARN(demuxer, "Too many streams.\n"); return NULL; @@ -121,13 +172,15 @@ struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) struct sh_stream *sh = talloc_ptrtype(demuxer, sh); *sh = (struct sh_stream) { .type = type, - .demuxer = demuxer, .index = demuxer->num_streams, .demuxer_id = demuxer_id, // may be overwritten by demuxer - .ds = talloc_zero(sh, struct demux_stream), + .ds = talloc(sh, struct demux_stream), + }; + *sh->ds = (struct demux_stream) { + .in = demuxer->in, + .type = sh->type, + .selected = demuxer->in->autoselect, }; - sh->ds->demuxer = demuxer; - sh->ds->selected = demuxer->stream_select_default; MP_TARRAY_APPEND(demuxer, demuxer->streams, demuxer->num_streams, sh); switch (sh->type) { case STREAM_VIDEO: sh->video = talloc_zero(demuxer, struct sh_video); break; @@ -142,49 +195,84 @@ void free_demuxer(demuxer_t *demuxer) { if (!demuxer) return; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + demux_stop_thread(demuxer); + if (demuxer->desc->close) - demuxer->desc->close(demuxer); - // free streams: + demuxer->desc->close(in->d_thread); for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); + ds_flush(demuxer->streams[n]->ds); + pthread_mutex_destroy(&in->lock); + pthread_cond_destroy(&in->wakeup); talloc_free(demuxer); } -const char *stream_type_name(enum stream_type type) +// Start the demuxer thread, which reads ahead packets on its own. +void demux_start_thread(struct demuxer *demuxer) { - switch (type) { - case STREAM_VIDEO: return "video"; - case STREAM_AUDIO: return "audio"; - case STREAM_SUB: return "sub"; - default: return "unknown"; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (!in->threading) { + in->threading = true; + if (pthread_create(&in->thread, NULL, demux_thread, in)) + in->threading = false; } } -static int count_packs(struct demuxer *demux, enum stream_type type) +void demux_stop_thread(struct demuxer *demuxer) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->packs : 0; - return c; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (in->threading) { + pthread_mutex_lock(&in->lock); + in->thread_terminate = true; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + pthread_join(in->thread, NULL); + in->threading = false; + in->thread_terminate = false; + } } -static int count_bytes(struct demuxer *demux, enum stream_type type) +// The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached. +void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->bytes : 0; - return c; + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + in->wakeup_cb = cb; + in->wakeup_cb_ctx = ctx; + pthread_mutex_unlock(&in->lock); +} + +const char *stream_type_name(enum stream_type type) +{ + switch (type) { + case STREAM_VIDEO: return "video"; + case STREAM_AUDIO: return "audio"; + case STREAM_SUB: return "sub"; + default: return "unknown"; + } } // Returns the same value as demuxer->fill_buffer: 1 ok, 0 EOF/not selected. int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) { struct demux_stream *ds = stream ? stream->ds : NULL; - if (!dp || !ds || !ds->selected) { + if (!dp || !ds) { + talloc_free(dp); + return 0; + } + struct demux_internal *in = ds->in; + pthread_mutex_lock(&in->lock); + if (!ds->selected) { + pthread_mutex_unlock(&in->lock); talloc_free(dp); return 0; } - struct demuxer *demuxer = ds->demuxer; dp->stream = stream->index; dp->next = NULL; @@ -200,75 +288,131 @@ int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) ds->head = ds->tail = dp; } // obviously not true anymore - ds->eof = 0; + ds->eof = false; // For video, PTS determination is not trivial, but for other media types // distinguishing PTS and DTS is not useful. if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE) dp->pts = dp->dts; - if (mp_msg_test(demuxer->log, MSGL_DEBUG)) { - MP_DBG(demuxer, "DEMUX: Append packet to %s, len=%d pts=%5.3f pos=" - "%"PRIi64" [A=%d V=%d S=%d]\n", stream_type_name(stream->type), - dp->len, dp->pts, dp->pos, count_packs(demuxer, STREAM_AUDIO), - count_packs(demuxer, STREAM_VIDEO), count_packs(demuxer, STREAM_SUB)); - } + MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" " + "[num=%zd size=%zd]\n", stream_type_name(stream->type), + dp->len, dp->pts, dp->pts, dp->pos, ds->packs, ds->bytes); + + if (ds->in->wakeup_cb) + ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); return 1; } -static bool demux_check_queue_full(demuxer_t *demux) +// Returns true if there was "progress" (lock was released temporarily). +static bool read_packet(struct demux_internal *in) { - for (int n = 0; n < demux->num_streams; n++) { - struct sh_stream *sh = demux->streams[n]; - if (sh->ds->packs > MAX_PACKS || sh->ds->bytes > MAX_PACK_BYTES) - goto overflow; + in->eof = false; + + // Check if we need to read a new packet. We do this if all queues are below + // the minimum, or if a stream explicitly needs new packets. Also includes + // safe-guards against packet queue overflow. + bool active = false, read_more = false; + size_t packs = 0, bytes = 0; + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + active |= ds->selected; + read_more |= ds->active && !ds->head; + packs += ds->packs; + bytes += ds->bytes; } - return false; - -overflow: - - if (!demux->warned_queue_overflow) { - MP_ERR(demux, "Too many packets in the demuxer " - "packet queue (video: %d packets in %d bytes, audio: %d " - "packets in %d bytes, sub: %d packets in %d bytes).\n", - count_packs(demux, STREAM_VIDEO), count_bytes(demux, STREAM_VIDEO), - count_packs(demux, STREAM_AUDIO), count_bytes(demux, STREAM_AUDIO), - count_packs(demux, STREAM_SUB), count_bytes(demux, STREAM_SUB)); - MP_INFO(demux, "Maybe you are playing a non-" - "interleaved stream/file or the codec failed?\n"); + MP_DBG(in, "packets=%zd, bytes=%zd, active=%d, more=%d\n", + packs, bytes, active, read_more); + if (packs >= MAX_PACKS || bytes >= MAX_PACK_BYTES) { + if (!in->warned_queue_overflow) { + in->warned_queue_overflow = true; + MP_ERR(in, "Too many packets in the demuxer packet queues:\n"); + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + if (ds->selected) { + MP_ERR(in, " %s/%d: %zd packets, %zd bytes\n", + stream_type_name(ds->type), n, ds->packs, ds->bytes); + } + } + } + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof |= !ds->head; + } + pthread_cond_signal(&in->wakeup); + return false; + } + if (packs < in->min_packs && bytes < in->min_bytes) + read_more |= active; + + if (!read_more) + return false; + + // Actually read a packet. Drop the lock while doing so, because waiting + // for disk or network I/O can take time. + pthread_mutex_unlock(&in->lock); + struct demuxer *demux = in->d_thread; + bool eof = !demux->desc->fill_buffer || demux->desc->fill_buffer(demux) <= 0; + pthread_mutex_lock(&in->lock); + + update_cache(in); + + in->eof = eof; + if (in->eof) { + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof = true; + } + pthread_cond_signal(&in->wakeup); + MP_VERBOSE(in, "EOF reached.\n"); } - demux->warned_queue_overflow = true; return true; } -// return value: -// 0 = EOF or no stream found or invalid type -// 1 = successfully read a packet - -static int demux_fill_buffer(demuxer_t *demux) +// must be called locked; may temporarily unlock +static void ds_get_packets(struct demux_stream *ds) { - return demux->desc->fill_buffer ? demux->desc->fill_buffer(demux) : 0; + const char *t = stream_type_name(ds->type); + struct demux_internal *in = ds->in; + MP_DBG(in, "reading packet for %s\n", t); + in->eof = false; // force retry + ds->eof = false; + while (ds->selected && !ds->head && !ds->eof) { + ds->active = true; + // Note: the following code marks EOF if it can't continue + if (in->threading) { + MP_VERBOSE(in, "waiting for demux thread (%s)\n", t); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + } else { + read_packet(in); + } + } } -static void ds_get_packets(struct sh_stream *sh) +static void *demux_thread(void *pctx) { - struct demux_stream *ds = sh->ds; - demuxer_t *demux = sh->demuxer; - MP_TRACE(demux, "ds_get_packets (%s) called\n", - stream_type_name(sh->type)); - while (1) { - if (ds->head) - return; - - if (demux_check_queue_full(demux)) - break; - - if (!demux_fill_buffer(demux)) - break; // EOF + struct demux_internal *in = pctx; + pthread_mutex_lock(&in->lock); + while (!in->thread_terminate) { + in->thread_paused = in->thread_request_pause > 0; + if (in->thread_paused) { + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + continue; + } + if (!in->eof) { + if (read_packet(in)) + continue; // read_packet unlocked, so recheck conditions + } + update_cache(in); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); } - MP_VERBOSE(demux, "ds_get_packets: EOF reached (stream: %s)\n", - stream_type_name(sh->type)); - ds->eof = 1; + pthread_mutex_unlock(&in->lock); + return NULL; } // Read a packet from the given stream. The returned packet belongs to the @@ -277,10 +421,12 @@ static void ds_get_packets(struct sh_stream *sh) struct demux_packet *demux_read_packet(struct sh_stream *sh) { struct demux_stream *ds = sh ? sh->ds : NULL; + struct demux_packet *pkt = NULL; if (ds) { - ds_get_packets(sh); - struct demux_packet *pkt = ds->head; - if (pkt) { + pthread_mutex_lock(&ds->in->lock); + ds_get_packets(ds); + if (ds->head) { + pkt = ds->head; ds->head = pkt->next; pkt->next = NULL; if (!ds->head) @@ -288,13 +434,15 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) ds->bytes -= pkt->len; ds->packs--; + // This implies this function is actually called from "the" user + // thread. if (pkt && pkt->pos >= 0) - sh->demuxer->filepos = pkt->pos; - - return pkt; + ds->in->d_user->filepos = pkt->pos; } + pthread_cond_signal(&ds->in->wakeup); // possibly read more + pthread_mutex_unlock(&ds->in->lock); } - return NULL; + return pkt; } // Return the pts of the next packet that demux_read_packet() would return. @@ -302,37 +450,55 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) // packets from the queue. double demux_get_next_pts(struct sh_stream *sh) { - if (sh && sh->ds->selected) { - ds_get_packets(sh); + double res = MP_NOPTS_VALUE; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + ds_get_packets(sh->ds); if (sh->ds->head) - return sh->ds->head->pts; + res = sh->ds->head->pts; + pthread_mutex_unlock(&sh->ds->in->lock); } - return MP_NOPTS_VALUE; + return res; } // Return whether a packet is queued. Never blocks, never forces any reads. bool demux_has_packet(struct sh_stream *sh) { - return sh && sh->ds->head; + bool has_packet = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + has_packet = sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return has_packet; } // Return whether EOF was returned with an earlier packet read. bool demux_stream_eof(struct sh_stream *sh) { - return !sh || sh->ds->eof; + bool eof = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + eof = sh->ds->eof && !sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return eof; } // Read and return any packet we find. struct demux_packet *demux_read_any_packet(struct demuxer *demuxer) { + assert(!demuxer->in->threading); // doesn't work with threading for (int retry = 0; retry < 2; retry++) { for (int n = 0; n < demuxer->num_streams; n++) { struct sh_stream *sh = demuxer->streams[n]; - if (sh->ds->head) + if (demux_has_packet(sh)) return demux_read_packet(sh); } // retry after calling this - demux_fill_buffer(demuxer); + pthread_mutex_lock(&demuxer->in->lock); + read_packet(demuxer->in); + pthread_mutex_unlock(&demuxer->in->lock); } return NULL; } @@ -440,6 +606,80 @@ static void demux_export_replaygain(demuxer_t *demuxer) } } +// Copy all fields from src to dst, depending on event flags. +static void demux_copy(struct demuxer *dst, struct demuxer *src) +{ + if (src->events & DEMUX_EVENT_INIT) { + // Note that we do as shallow copies as possible. We expect the date + // that is not-copied (only referenced) to be immutable. + // This implies e.g. that no chapters are added after initialization. + dst->chapters = src->chapters; + dst->num_chapters = src->num_chapters; + dst->editions = src->editions; + dst->num_editions = src->num_editions; + dst->edition = src->edition; + dst->attachments = src->attachments; + dst->num_attachments = src->num_attachments; + dst->matroska_data = src->matroska_data; + dst->file_contents = src->file_contents; + dst->playlist = src->playlist; + dst->seekable = src->seekable; + dst->filetype = src->filetype; + dst->ts_resets_possible = src->ts_resets_possible; + dst->start_time = src->start_time; + } + if (src->events & DEMUX_EVENT_STREAMS) { + // The stream structs themselves are immutable. + for (int n = dst->num_streams; n < src->num_streams; n++) + MP_TARRAY_APPEND(dst, dst->streams, dst->num_streams, src->streams[n]); + } + if (src->events & DEMUX_EVENT_METADATA) { + talloc_free(dst->metadata); + dst->metadata = mp_tags_dup(dst, src->metadata); + } + dst->events |= src->events; + src->events = 0; +} + +// This is called by demuxer implementations if certain parameters change +// at runtime. +// events is one of DEMUX_EVENT_* +// The code will copy the fields references by the events to the user-thread. +void demux_changed(demuxer_t *demuxer, int events) +{ + assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only + struct demux_internal *in = demuxer->in; + + demuxer->events |= events; + + pthread_mutex_lock(&in->lock); + + update_cache(in); + + if (demuxer->events & DEMUX_EVENT_INIT) + demuxer_sort_chapters(demuxer); + if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS)) + demux_export_replaygain(demuxer); + + demux_copy(in->d_buffer, demuxer); + + pthread_mutex_unlock(&in->lock); +} + +// Called by the user thread (i.e. player) to update metadata and other things +// from the demuxer thread. +void demux_update(demuxer_t *demuxer) +{ + assert(demuxer == demuxer->in->d_user); + struct demux_internal *in = demuxer->in; + + pthread_mutex_lock(&in->lock); + demux_copy(demuxer, in->d_buffer); + if (in->stream_metadata && (demuxer->events & DEMUX_EVENT_METADATA)) + mp_tags_merge(demuxer->metadata, in->stream_metadata); + pthread_mutex_unlock(&in->lock); +} + static struct demuxer *open_given_type(struct mpv_global *global, struct mp_log *log, const struct demuxer_desc *desc, @@ -459,32 +699,50 @@ static struct demuxer *open_given_type(struct mpv_global *global, .log = mp_log_new(demuxer, log, desc->name), .glog = log, .filename = talloc_strdup(demuxer, stream->url), - .metadata = talloc_zero(demuxer, struct mp_tags), - .events = DEMUX_EVENT_METADATA, + .events = DEMUX_EVENT_ALL, + }; + struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in); + *in = (struct demux_internal){ + .log = demuxer->log, + .d_thread = talloc(demuxer, struct demuxer), + .d_buffer = talloc(demuxer, struct demuxer), + .d_user = demuxer, + .min_packs = demuxer->opts->demuxer_min_packs, + .min_bytes = demuxer->opts->demuxer_min_bytes, }; - demuxer->params = params; // temporary during open() + pthread_mutex_init(&in->lock, NULL); + pthread_cond_init(&in->wakeup, NULL); + + *in->d_thread = *demuxer; + *in->d_buffer = *demuxer; + + in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags); + in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags); + in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags); + int64_t start_pos = stream_tell(stream); mp_verbose(log, "Trying demuxer: %s (force-level: %s)\n", desc->name, d_level(check)); - int ret = demuxer->desc->open(demuxer, check); + in->d_thread->params = params; // temporary during open() + + int ret = demuxer->desc->open(in->d_thread, check); if (ret >= 0) { - demuxer->params = NULL; - if (demuxer->filetype) + in->d_thread->params = NULL; + if (in->d_thread->filetype) mp_verbose(log, "Detected file format: %s (%s)\n", - demuxer->filetype, desc->desc); + in->d_thread->filetype, desc->desc); else mp_verbose(log, "Detected file format: %s\n", desc->desc); - demuxer_sort_chapters(demuxer); - demux_info_update(demuxer); - demux_export_replaygain(demuxer); // Pretend we can seek if we can't seek, but there's a cache. - if (!demuxer->seekable && stream->uncached_stream) { + if (!in->d_thread->seekable && stream->uncached_stream) { mp_warn(log, "File is not seekable, but there's a cache: enabling seeking.\n"); - demuxer->seekable = true; + in->d_thread->seekable = true; } + demux_changed(in->d_thread, DEMUX_EVENT_ALL); + demux_update(demuxer); return demuxer; } @@ -552,9 +810,12 @@ done: void demux_flush(demuxer_t *demuxer) { + pthread_mutex_lock(&demuxer->in->lock); for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); - demuxer->warned_queue_overflow = false; + ds_flush(demuxer->streams[n]->ds); + demuxer->in->warned_queue_overflow = false; + demuxer->in->eof = false; + pthread_mutex_unlock(&demuxer->in->lock); } int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) @@ -567,29 +828,17 @@ int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) if (rel_seek_secs == MP_NOPTS_VALUE && (flags & SEEK_ABSOLUTE)) return 0; + demux_pause(demuxer); + // clear the packet queues demux_flush(demuxer); if (demuxer->desc->seek) - demuxer->desc->seek(demuxer, rel_seek_secs, flags); + demuxer->desc->seek(demuxer->in->d_thread, rel_seek_secs, flags); - return 1; -} + demux_unpause(demuxer); -static int demux_info_print(demuxer_t *demuxer) -{ - struct mp_tags *info = demuxer->metadata; - int n; - - if (!info || !info->num_keys) - return 0; - - mp_info(demuxer->glog, "File tags:\n"); - for (n = 0; n < info->num_keys; n++) { - mp_info(demuxer->glog, " %s: %s\n", info->keys[n], info->values[n]); - } - - return 0; + return 1; } char *demux_info_get(demuxer_t *demuxer, const char *opt) @@ -597,35 +846,6 @@ char *demux_info_get(demuxer_t *demuxer, const char *opt) return mp_tags_get_str(demuxer->metadata, opt); } -bool demux_info_update(struct demuxer *demuxer) -{ - bool r = false; - // Take care of stream metadata as well - struct mp_tags *s_meta = NULL; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_METADATA, &s_meta) > 0) { - talloc_free(demuxer->stream_metadata); - demuxer->stream_metadata = talloc_steal(demuxer, s_meta); - demuxer->events |= DEMUX_EVENT_METADATA; - } - if (demuxer->events & DEMUX_EVENT_METADATA) { - demuxer->events &= ~DEMUX_EVENT_METADATA; - if (demuxer->stream_metadata) - mp_tags_merge(demuxer->metadata, demuxer->stream_metadata); - demux_info_print(demuxer); - r = true; - } - return r; -} - -int demux_control(demuxer_t *demuxer, int cmd, void *arg) -{ - - if (demuxer->desc->control) - return demuxer->desc->control(demuxer, cmd, arg); - - return DEMUXER_CTRL_NOTIMPL; -} - struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id) { @@ -653,16 +873,34 @@ void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream, bool selected) { // don't flush buffers if stream is already selected / unselected + pthread_mutex_lock(&demuxer->in->lock); + bool update = false; if (stream->ds->selected != selected) { stream->ds->selected = selected; - ds_free_packs(stream->ds); - demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); + stream->ds->active = false; + ds_flush(stream->ds); + update = true; } + pthread_mutex_unlock(&demuxer->in->lock); + if (update) + demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); +} + +void demux_set_stream_autoselect(struct demuxer *demuxer, bool autoselect) +{ + assert(!demuxer->in->threading); // laziness + demuxer->in->autoselect = autoselect; } bool demux_stream_is_selected(struct sh_stream *stream) { - return stream && stream->ds->selected; + if (!stream) + return false; + bool r = false; + pthread_mutex_lock(&stream->ds->in->lock); + r = stream->ds->selected; + pthread_mutex_unlock(&stream->ds->in->lock); + return r; } int demuxer_add_attachment(demuxer_t *demuxer, struct bstr name, @@ -696,7 +934,7 @@ static int chapter_compare(const void *p1, const void *p2) return c1->original_index > c2->original_index ? 1 :-1; // never equal } -void demuxer_sort_chapters(demuxer_t *demuxer) +static void demuxer_sort_chapters(demuxer_t *demuxer) { qsort(demuxer->chapters, demuxer->num_chapters, sizeof(struct demux_chapter), chapter_compare); @@ -725,3 +963,151 @@ double demuxer_get_time_length(struct demuxer *demuxer) return len; return -1; } + +// must be called locked +static void update_cache(struct demux_internal *in) +{ + struct demuxer *demuxer = in->d_thread; + struct stream *stream = demuxer->stream; + + in->time_length = -1; + if (demuxer->desc->control) { + demuxer->desc->control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH, + &in->time_length); + } + + struct mp_tags *s_meta = NULL; + stream_control(stream, STREAM_CTRL_GET_METADATA, &s_meta); + if (s_meta) { + talloc_free(in->stream_metadata); + in->stream_metadata = talloc_steal(in, s_meta); + in->d_buffer->events |= DEMUX_EVENT_METADATA; + } + + in->stream_size = -1; + stream_control(stream, STREAM_CTRL_GET_SIZE, &in->stream_size); + in->stream_cache_size = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_SIZE, &in->stream_cache_size); + in->stream_cache_fill = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_FILL, &in->stream_cache_fill); + in->stream_cache_idle = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_IDLE, &in->stream_cache_idle); +} + +// must be called locked +static int cached_stream_control(struct demux_internal *in, int cmd, void *arg) +{ + switch (cmd) { + case STREAM_CTRL_GET_CACHE_SIZE: + if (in->stream_cache_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_size; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_FILL: + if (in->stream_cache_fill < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_fill; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_IDLE: + if (in->stream_cache_idle < 0) + return STREAM_UNSUPPORTED; + *(int *)arg = in->stream_cache_idle; + return STREAM_OK; + case STREAM_CTRL_GET_SIZE: + if (in->stream_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_size; + return STREAM_OK; + } + return STREAM_ERROR; +} + +// must be called locked +static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) +{ + switch (cmd) { + case DEMUXER_CTRL_GET_TIME_LENGTH: + if (in->time_length < 0) + return DEMUXER_CTRL_NOTIMPL; + *(double *)arg = in->time_length; + return DEMUXER_CTRL_OK; + case DEMUXER_CTRL_STREAM_CTRL: { + struct demux_ctrl_stream_ctrl *c = arg; + int r = cached_stream_control(in, c->ctrl, c->arg); + if (r == STREAM_ERROR) + break; + c->res = r; + return DEMUXER_CTRL_OK; + } + } + return DEMUXER_CTRL_DONTKNOW; +} + +int demux_control(demuxer_t *demuxer, int cmd, void *arg) +{ + struct demux_internal *in = demuxer->in; + + pthread_mutex_lock(&in->lock); + if (!in->threading) + update_cache(in); + int cr = cached_demux_control(in, cmd, arg); + if (cr != DEMUXER_CTRL_DONTKNOW) { + pthread_mutex_unlock(&in->lock); + return cr; + } + pthread_mutex_unlock(&in->lock); + + int r = DEMUXER_CTRL_NOTIMPL; + demux_pause(demuxer); + if (cmd == DEMUXER_CTRL_STREAM_CTRL) { + struct demux_ctrl_stream_ctrl *c = arg; + MP_VERBOSE(demuxer, "blocking for STREAM_CTRL %d\n", c->ctrl); + c->res = stream_control(demuxer->stream, c->ctrl, c->arg); + if (c->res != STREAM_UNSUPPORTED) + r = DEMUXER_CTRL_OK; + } + if (r != DEMUXER_CTRL_OK) { + MP_VERBOSE(demuxer, "blocking for DEMUXER_CTRL %d\n", cmd); + if (demuxer->desc->control) + r = demuxer->desc->control(demuxer->in->d_thread, cmd, arg); + } + demux_unpause(demuxer); + return r; +} + +int demux_stream_control(demuxer_t *demuxer, int ctrl, void *arg) +{ + struct demux_ctrl_stream_ctrl c = {ctrl, arg, STREAM_UNSUPPORTED}; + demux_control(demuxer, DEMUXER_CTRL_STREAM_CTRL, &c); + return c.res; +} + +// Make the demuxer thread stop doing anything. +// demux_unpause() wakes up the thread again. +// Can be nested with other calls, but trying to read packets may deadlock. +void demux_pause(demuxer_t *demuxer) +{ + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + MP_VERBOSE(in, "pause demux thread\n"); + + pthread_mutex_lock(&in->lock); + in->thread_request_pause++; + pthread_cond_signal(&in->wakeup); + while (in->threading && !in->thread_paused) + pthread_cond_wait(&in->wakeup, &in->lock); + pthread_mutex_unlock(&in->lock); +} + +void demux_unpause(demuxer_t *demuxer) +{ + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + pthread_mutex_lock(&in->lock); + assert(in->thread_request_pause > 0); + in->thread_request_pause--; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); +} diff --git a/demux/demux.h b/demux/demux.h index 58da35c796..4ec259d6dc 100644 --- a/demux/demux.h +++ b/demux/demux.h @@ -31,10 +31,15 @@ #include "packet.h" #include "stheader.h" -struct MPOpts; - -#define MAX_PACKS 4096 -#define MAX_PACK_BYTES 0x8000000 // 128 MiB +// Maximum total size of packets queued - if larger, no new packets are read, +// and the demuxer pretends EOF was reached. +#define MAX_PACKS 16000 +#define MAX_PACK_BYTES (400 * 1024 * 1024) +// Minimum total size of packets queued - the demuxer thread will read more +// packets, until either number or total size of the packets exceed the minimum. +// This can actually be configured with command line options. +#define MIN_PACKS 64 +#define MIN_PACK_BYTES (5 * 1024 * 1024) enum demuxer_type { DEMUXER_TYPE_GENERIC = 0, @@ -48,14 +53,14 @@ enum demuxer_type { #define DEMUXER_CTRL_NOTIMPL -1 #define DEMUXER_CTRL_DONTKNOW 0 #define DEMUXER_CTRL_OK 1 -#define DEMUXER_CTRL_GUESS 2 enum demux_ctrl { DEMUXER_CTRL_SWITCHED_TRACKS = 1, DEMUXER_CTRL_GET_TIME_LENGTH, DEMUXER_CTRL_RESYNC, DEMUXER_CTRL_IDENTIFY_PROGRAM, - DEMUXER_CTRL_STREAM_CTRL, // stupid workaround for legacy TV code + DEMUXER_CTRL_STREAM_CTRL, + DEMUXER_CTRL_STREAM_AUTOSELECT, }; struct demux_ctrl_stream_ctrl { @@ -87,7 +92,10 @@ enum demux_check { }; enum demux_event { - DEMUX_EVENT_METADATA = (1 << 0), + DEMUX_EVENT_INIT = 1 << 0, // complete (re-)initialization + DEMUX_EVENT_STREAMS = 1 << 1, // a stream was added + DEMUX_EVENT_METADATA = 1 << 2, // metadata or stream_metadata changed + DEMUX_EVENT_ALL = 0xFFFF, }; #define MAX_SH_STREAMS 256 @@ -172,15 +180,12 @@ typedef struct demuxer { const demuxer_desc_t *desc; ///< Demuxer description structure const char *filetype; // format name when not identified by demuxer (libavformat) int64_t filepos; // input stream current pos. - struct stream *stream; char *filename; // same as stream->url enum demuxer_type type; int seekable; // flag double start_time; // File format allows PTS resets (even if the current file is without) bool ts_resets_possible; - bool warned_queue_overflow; - bool stream_select_default; // initial selection status of a new stream // Bitmask of DEMUX_EVENT_* int events; @@ -207,13 +212,19 @@ typedef struct demuxer { struct mp_tags *metadata; - struct mp_tags *stream_metadata; - void *priv; // demuxer-specific internal data struct MPOpts *opts; struct mpv_global *global; struct mp_log *log, *glog; struct demuxer_params *params; + + struct demux_internal *in; // internal to demux.c + + // Since the demuxer can run in its own thread, and the stream is not + // thread-safe, only the demuxer is allowed to access the stream directly. + // You can freely use demux_stream_control() to send STREAM_CTRLs, or use + // demux_pause() to get exclusive access to the stream. + struct stream *stream; } demuxer_t; typedef struct { @@ -238,11 +249,14 @@ struct demuxer *demux_open(struct stream *stream, char *force_format, struct demuxer_params *params, struct mpv_global *global); +void demux_start_thread(struct demuxer *demuxer); +void demux_stop_thread(struct demuxer *demuxer); +void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx); + void demux_flush(struct demuxer *demuxer); int demux_seek(struct demuxer *demuxer, float rel_seek_secs, int flags); char *demux_info_get(struct demuxer *demuxer, const char *opt); -bool demux_info_update(struct demuxer *demuxer); int demux_control(struct demuxer *demuxer, int cmd, void *arg); @@ -250,6 +264,7 @@ void demuxer_switch_track(struct demuxer *demuxer, enum stream_type type, struct sh_stream *stream); void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream, bool selected); +void demux_set_stream_autoselect(struct demuxer *demuxer, bool autoselect); void demuxer_help(struct mp_log *log); @@ -260,6 +275,14 @@ int demuxer_add_chapter(struct demuxer *demuxer, struct bstr name, double demuxer_get_time_length(struct demuxer *demuxer); +int demux_stream_control(demuxer_t *demuxer, int ctrl, void *arg); + +void demux_pause(demuxer_t *demuxer); +void demux_unpause(demuxer_t *demuxer); + +void demux_changed(demuxer_t *demuxer, int events); +void demux_update(demuxer_t *demuxer); + struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id); diff --git a/demux/demux_disc.c b/demux/demux_disc.c index 06cea65d1a..dcb0762cb5 100644 --- a/demux/demux_disc.c +++ b/demux/demux_disc.c @@ -205,6 +205,8 @@ static int d_fill_buffer(demuxer_t *demuxer) if (!pkt) return 0; + demux_update(p->slave); + if (p->seek_reinit) reset_pts(demuxer); @@ -285,6 +287,13 @@ static int d_open(demuxer_t *demuxer, enum demux_check check) if (demuxer->stream->uncached_type == STREAMTYPE_CDDA) demux = "+rawaudio"; + char *t = NULL; + stream_control(demuxer->stream, STREAM_CTRL_GET_DISC_NAME, &t); + if (t) { + mp_tags_set_bstr(demuxer->metadata, bstr0("TITLE"), bstr0(t)); + talloc_free(t); + } + // Initialize the playback time. We need to read _some_ data to get the // correct stream-layer time (at least with libdvdnav). stream_peek(demuxer->stream, 1); @@ -295,7 +304,7 @@ static int d_open(demuxer_t *demuxer, enum demux_check check) return -1; // So that we don't miss initial packets of delayed subtitle streams. - p->slave->stream_select_default = true; + demux_set_stream_autoselect(p->slave, true); // Can be seekable even if the stream isn't. demuxer->seekable = true; diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c index 4688bf9db7..762d7f571f 100644 --- a/demux/demux_lavf.c +++ b/demux/demux_lavf.c @@ -591,6 +591,7 @@ static void handle_stream(demuxer_t *demuxer, int i) } select_tracks(demuxer, i); + demux_changed(demuxer, DEMUX_EVENT_STREAMS); } // Add any new streams that might have been added @@ -615,7 +616,7 @@ static void update_metadata(demuxer_t *demuxer, AVPacket *pkt) mp_tags_clear(demuxer->metadata); mp_tags_copy_from_av_dictionary(demuxer->metadata, dict); av_dict_free(&dict); - demuxer->events |= DEMUX_EVENT_METADATA; + demux_changed(demuxer, DEMUX_EVENT_METADATA); } } #endif diff --git a/demux/stheader.h b/demux/stheader.h index 1771f75ff5..082fffa5e2 100644 --- a/demux/stheader.h +++ b/demux/stheader.h @@ -31,7 +31,6 @@ struct demuxer; struct sh_stream { enum stream_type type; - struct demuxer *demuxer; // Index into demuxer->streams. int index; // Demuxer/format specific ID. Corresponds to the stream IDs as encoded in diff --git a/options/options.c b/options/options.c index 84eda8db64..fd37d63197 100644 --- a/options/options.c +++ b/options/options.c @@ -214,6 +214,9 @@ const m_option_t mp_opts[] = { OPT_STRING("demuxer", demuxer_name, 0), OPT_STRING("audio-demuxer", audio_demuxer_name, 0), OPT_STRING("sub-demuxer", sub_demuxer_name, 0), + OPT_FLAG("demuxer-thread", demuxer_thread, 0), + OPT_INTRANGE("demuxer-readahead-packets", demuxer_min_packs, 0, 0, MAX_PACKS), + OPT_INTRANGE("demuxer-readahead-bytes", demuxer_min_bytes, 0, 0, MAX_PACK_BYTES), OPT_DOUBLE("mf-fps", mf_fps, 0), OPT_STRING("mf-type", mf_type, 0), @@ -591,6 +594,9 @@ const struct MPOpts mp_default_opts = { }, .stream_cache_pause = 50, .stream_cache_unpause = 100, + .demuxer_thread = 0, + .demuxer_min_packs = MIN_PACKS, + .demuxer_min_bytes = MIN_PACK_BYTES, .network_rtsp_transport = 2, .chapterrange = {-1, -1}, .edition_id = -1, diff --git a/options/options.h b/options/options.h index 100fded4a3..81c7e394a2 100644 --- a/options/options.h +++ b/options/options.h @@ -183,6 +183,9 @@ typedef struct MPOpts { char **audio_files; char *demuxer_name; + int demuxer_thread; + int demuxer_min_packs; + int demuxer_min_bytes; char *audio_demuxer_name; char *sub_demuxer_name; int mkv_subtitle_preroll; diff --git a/player/command.c b/player/command.c index fffa63bb70..6b29fdc735 100644 --- a/player/command.c +++ b/player/command.c @@ -205,11 +205,11 @@ static int mp_property_file_size(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - if (!mpctx->stream) + if (!mpctx->demuxer) return M_PROPERTY_UNAVAILABLE; int64_t size; - if (stream_control(mpctx->stream, STREAM_CTRL_GET_SIZE, &size) != STREAM_OK) + if (demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_SIZE, &size) < 1) return M_PROPERTY_UNAVAILABLE; if (action == M_PROPERTY_PRINT) { @@ -232,13 +232,6 @@ static int mp_property_media_title(void *ctx, struct m_property *prop, name = demux_info_get(mpctx->master_demuxer, "title"); if (name && name[0]) return m_property_strdup_ro(action, arg, name); - struct stream *stream = mpctx->master_demuxer->stream; - if (stream_control(stream, STREAM_CTRL_GET_DISC_NAME, &name) > 0 - && name) { - int r = m_property_strdup_ro(action, arg, name); - talloc_free(name); - return r; - } } return mp_property_filename(ctx, prop, action, arg); } @@ -247,7 +240,8 @@ static int mp_property_stream_path(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - struct stream *stream = mpctx->stream; + // demuxer->stream as well as stream->url are immutable -> ok to access + struct stream *stream = mpctx->demuxer ? mpctx->demuxer->stream : NULL; if (!stream || !stream->url) return M_PROPERTY_UNAVAILABLE; return m_property_strdup_ro(action, arg, stream->url); @@ -257,12 +251,14 @@ static int mp_property_stream_capture(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - if (!mpctx->stream) + if (!mpctx->demuxer) return M_PROPERTY_UNAVAILABLE; if (action == M_PROPERTY_SET) { char *filename = *(char **)arg; - stream_set_capture_file(mpctx->stream, filename); + demux_pause(mpctx->demuxer); + stream_set_capture_file(mpctx->demuxer->stream, filename); + demux_unpause(mpctx->demuxer); // fall through to mp_property_generic_option } return mp_property_generic_option(mpctx, prop, action, arg); @@ -284,14 +280,19 @@ static int mp_property_stream_pos(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - struct stream *stream = mpctx->stream; - if (!stream) + struct demuxer *demuxer = mpctx->demuxer; + if (!demuxer) return M_PROPERTY_UNAVAILABLE; + demux_pause(demuxer); + int r; if (action == M_PROPERTY_SET) { - stream_seek(stream, *(int64_t *) arg); - return M_PROPERTY_OK; + stream_seek(demuxer->stream, *(int64_t *) arg); + r = M_PROPERTY_OK; + } else { + r = m_property_int64_ro(action, arg, stream_tell(demuxer->stream)); } - return m_property_int64_ro(action, arg, stream_tell(stream)); + demux_unpause(demuxer); + return r; } /// Stream end offset (RO) @@ -490,14 +491,13 @@ static int mp_property_disc_title(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - struct demuxer *demuxer = mpctx->master_demuxer; - if (!demuxer || !demuxer->stream) + struct demuxer *d = mpctx->master_demuxer; + if (!d) return M_PROPERTY_UNAVAILABLE; - struct stream *stream = demuxer->stream; unsigned int title = -1; switch (action) { case M_PROPERTY_GET: - if (stream_control(stream, STREAM_CTRL_GET_CURRENT_TITLE, &title) <= 0) + if (demux_stream_control(d, STREAM_CTRL_GET_CURRENT_TITLE, &title) < 0) return M_PROPERTY_UNAVAILABLE; *(int*)arg = title; return M_PROPERTY_OK; @@ -510,7 +510,7 @@ static int mp_property_disc_title(void *ctx, struct m_property *prop, return M_PROPERTY_OK; case M_PROPERTY_SET: title = *(int*)arg; - if (stream_control(stream, STREAM_CTRL_SET_CURRENT_TITLE, &title) <= 0) + if (demux_stream_control(d, STREAM_CTRL_SET_CURRENT_TITLE, &title) < 0) return M_PROPERTY_NOT_IMPLEMENTED; return M_PROPERTY_OK; } @@ -813,8 +813,8 @@ static int mp_property_disc_titles(void *ctx, struct m_property *prop, MPContext *mpctx = ctx; struct demuxer *demuxer = mpctx->master_demuxer; unsigned int num_titles; - if (!demuxer || stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_TITLES, - &num_titles) < 1) + if (!demuxer || demux_stream_control(demuxer, STREAM_CTRL_GET_NUM_TITLES, + &num_titles) < 1) return M_PROPERTY_UNAVAILABLE; return m_property_int_ro(action, arg, num_titles); } @@ -853,11 +853,11 @@ static int mp_property_angle(void *ctx, struct m_property *prop, int ris, angles = -1, angle = 1; - ris = stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_ANGLES, &angles); + ris = demux_stream_control(demuxer, STREAM_CTRL_GET_NUM_ANGLES, &angles); if (ris == STREAM_UNSUPPORTED) return M_PROPERTY_UNAVAILABLE; - ris = stream_control(demuxer->stream, STREAM_CTRL_GET_ANGLE, &angle); + ris = demux_stream_control(demuxer, STREAM_CTRL_GET_ANGLE, &angle); if (ris == STREAM_UNSUPPORTED) return -1; @@ -878,7 +878,7 @@ static int mp_property_angle(void *ctx, struct m_property *prop, return M_PROPERTY_ERROR; demux_flush(demuxer); - ris = stream_control(demuxer->stream, STREAM_CTRL_SET_ANGLE, &angle); + ris = demux_stream_control(demuxer, STREAM_CTRL_SET_ANGLE, &angle); if (ris != STREAM_OK) return M_PROPERTY_ERROR; @@ -1111,13 +1111,14 @@ static int mp_property_cache_size(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - if (!mpctx->stream) + struct demuxer *demuxer = mpctx->demuxer; + if (!demuxer) return M_PROPERTY_UNAVAILABLE; switch (action) { case M_PROPERTY_GET: case M_PROPERTY_PRINT: { int64_t size = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_SIZE, &size); + demux_stream_control(demuxer, STREAM_CTRL_GET_CACHE_SIZE, &size); if (size <= 0) break; return property_int_kb_size(size / 1024, action, arg); @@ -1131,7 +1132,7 @@ static int mp_property_cache_size(void *ctx, struct m_property *prop, return M_PROPERTY_OK; case M_PROPERTY_SET: { int64_t size = *(int *)arg * 1024LL; - int r = stream_control(mpctx->stream, STREAM_CTRL_SET_CACHE_SIZE, &size); + int r = demux_stream_control(demuxer, STREAM_CTRL_SET_CACHE_SIZE, &size); if (r == STREAM_UNSUPPORTED) break; if (r == STREAM_OK) @@ -1146,11 +1147,11 @@ static int mp_property_cache_used(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - if (!mpctx->stream) + if (!mpctx->demuxer) return M_PROPERTY_UNAVAILABLE; int64_t size = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_FILL, &size); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_FILL, &size); if (size < 0) return M_PROPERTY_UNAVAILABLE; return property_int_kb_size(size / 1024, action, arg); @@ -1160,16 +1161,16 @@ static int mp_property_cache_free(void *ctx, struct m_property *prop, int action, void *arg) { MPContext *mpctx = ctx; - if (!mpctx->stream) + if (!mpctx->demuxer) return M_PROPERTY_UNAVAILABLE; int64_t size_used = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_FILL, &size_used); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_FILL, &size_used); if (size_used < 0) return M_PROPERTY_UNAVAILABLE; int64_t size = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_SIZE, &size); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_SIZE, &size); if (size <= 0) return M_PROPERTY_UNAVAILABLE; @@ -2308,22 +2309,11 @@ static int mp_property_sub_pos(void *ctx, struct m_property *prop, return property_osd_helper(mpctx, prop, action, arg); } -static int demux_stream_control(struct MPContext *mpctx, int ctrl, void *arg) -{ - int r = STREAM_UNSUPPORTED; - if (mpctx->stream) - r = stream_control(mpctx->stream, ctrl, arg); - if (r == STREAM_UNSUPPORTED && mpctx->demuxer) { - struct demux_ctrl_stream_ctrl c = {ctrl, arg, STREAM_UNSUPPORTED}; - demux_control(mpctx->demuxer, DEMUXER_CTRL_STREAM_CTRL, &c); - r = c.res; - } - return r; -} - static int prop_stream_ctrl(struct MPContext *mpctx, int ctrl, void *arg) { - int r = demux_stream_control(mpctx, ctrl, arg); + if (!mpctx->demuxer) + return M_PROPERTY_UNAVAILABLE; + int r = demux_stream_control(mpctx->demuxer, ctrl, arg); switch (r) { case STREAM_OK: return M_PROPERTY_OK; case STREAM_UNSUPPORTED: return M_PROPERTY_UNAVAILABLE; @@ -3703,7 +3693,8 @@ int run_command(MPContext *mpctx, mp_cmd_t *cmd) break; case MP_CMD_TV_LAST_CHANNEL: { - demux_stream_control(mpctx, STREAM_CTRL_TV_LAST_CHAN, NULL); + if (mpctx->demuxer) + demux_stream_control(mpctx->demuxer, STREAM_CTRL_TV_LAST_CHAN, NULL); break; } diff --git a/player/core.h b/player/core.h index 84b6123c1c..039f5c26cb 100644 --- a/player/core.h +++ b/player/core.h @@ -292,7 +292,6 @@ typedef struct MPContext { double audio_delay; double last_heartbeat; - double last_metadata_update; double last_idle_tick; double mouse_timer; @@ -401,6 +400,7 @@ struct playlist_entry *mp_next_file(struct MPContext *mpctx, int direction, bool force); void mp_set_playlist_entry(struct MPContext *mpctx, struct playlist_entry *e); void mp_play_files(struct MPContext *mpctx); +void update_demuxer_properties(struct MPContext *mpctx); // main.c int mpv_main(int argc, char *argv[]); diff --git a/player/discnav.c b/player/discnav.c index 0b52fed479..2c24fc9a42 100644 --- a/player/discnav.c +++ b/player/discnav.c @@ -26,6 +26,7 @@ #include "common/common.h" #include "input/input.h" +#include "demux/demux.h" #include "stream/discnav.h" #include "sub/dec_sub.h" @@ -90,6 +91,18 @@ int mp_nav_in_menu(struct MPContext *mpctx) return mpctx->nav_state ? mpctx->nav_state->nav_menu : -1; } +// If a demuxer is accessing the stream, we have to use demux_stream_control() +// to avoid synchronization issues; otherwise access it directly. +static int run_stream_control(struct MPContext *mpctx, int cmd, void *arg) +{ + if (mpctx->demuxer) { + return demux_stream_control(mpctx->demuxer, cmd, arg); + } else if (mpctx->stream) { + return stream_control(mpctx->stream, cmd, arg); + } + return STREAM_ERROR; +} + // Allocate state and enable navigation features. Must happen before // initializing cache, because the cache would read data. Since stream_dvdnav is // in a mode which skips all transitions on reading data (before enabling @@ -103,7 +116,7 @@ void mp_nav_init(struct MPContext *mpctx) return; struct mp_nav_cmd inp = {MP_NAV_CMD_ENABLE}; - if (stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp) < 1) + if (run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp) < 1) return; mpctx->nav_state = talloc_zero(NULL, struct mp_nav_state); @@ -125,14 +138,14 @@ void mp_nav_reset(struct MPContext *mpctx) if (!nav) return; struct mp_nav_cmd inp = {MP_NAV_CMD_RESUME}; - stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp); + run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp); osd_set_nav_highlight(mpctx->osd, NULL); nav->hi_visible = 0; nav->nav_menu = false; nav->nav_draining = false; nav->nav_still_frame = 0; mp_input_disable_section(mpctx->input, "discnav-menu"); - stream_control(mpctx->stream, STREAM_CTRL_RESUME_CACHE, NULL); + run_stream_control(mpctx, STREAM_CTRL_RESUME_CACHE, NULL); update_state(mpctx); } @@ -164,11 +177,11 @@ void mp_nav_user_input(struct MPContext *mpctx, char *command) osd_coords_to_video(mpctx->osd, vid.w, vid.h, &x, &y); inp.u.mouse_pos.x = x; inp.u.mouse_pos.y = y; - stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp); + run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp); } else { struct mp_nav_cmd inp = {MP_NAV_CMD_MENU}; inp.u.menu.action = command; - stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp); + run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp); } } @@ -179,7 +192,7 @@ void mp_handle_nav(struct MPContext *mpctx) return; while (1) { struct mp_nav_event *ev = NULL; - stream_control(mpctx->stream, STREAM_CTRL_GET_NAV_EVENT, &ev); + run_stream_control(mpctx, STREAM_CTRL_GET_NAV_EVENT, &ev); if (!ev) break; switch (ev->event) { @@ -261,15 +274,15 @@ void mp_handle_nav(struct MPContext *mpctx) nav->nav_still_frame = -2; } else if (nav->nav_still_frame == -2) { struct mp_nav_cmd inp = {MP_NAV_CMD_SKIP_STILL}; - stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp); + run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp); } } if (nav->nav_draining && mpctx->stop_play == AT_END_OF_FILE) { MP_VERBOSE(nav, "execute drain\n"); struct mp_nav_cmd inp = {MP_NAV_CMD_DRAIN_OK}; - stream_control(mpctx->stream, STREAM_CTRL_NAV_CMD, &inp); + run_stream_control(mpctx, STREAM_CTRL_NAV_CMD, &inp); nav->nav_draining = false; - stream_control(mpctx->stream, STREAM_CTRL_RESUME_CACHE, NULL); + run_stream_control(mpctx, STREAM_CTRL_RESUME_CACHE, NULL); } // E.g. keep displaying still frames if (mpctx->stop_play == AT_END_OF_FILE && !nav->nav_eof) diff --git a/player/loadfile.c b/player/loadfile.c index e2a1e6e145..e1ffa5d783 100644 --- a/player/loadfile.c +++ b/player/loadfile.c @@ -221,10 +221,15 @@ static void print_stream(struct MPContext *mpctx, struct track *t) MP_INFO(mpctx, "%s\n", b); } -static void print_file_properties(struct MPContext *mpctx) +void update_demuxer_properties(struct MPContext *mpctx) { struct demuxer *demuxer = mpctx->master_demuxer; - if (demuxer->num_editions > 1) { + if (!demuxer) + return; + demux_update(demuxer); + int events = demuxer->events; + demuxer->events = 0; + if ((events & DEMUX_EVENT_INIT) && demuxer->num_editions > 1) { for (int n = 0; n < demuxer->num_editions; n++) { struct demux_edition *edition = &demuxer->editions[n]; char b[128] = {0}; @@ -238,10 +243,20 @@ static void print_file_properties(struct MPContext *mpctx) MP_INFO(mpctx, "%s\n", b); } } - for (int t = 0; t < STREAM_TYPE_COUNT; t++) { - for (int n = 0; n < mpctx->num_tracks; n++) - if (mpctx->tracks[n]->type == t) - print_stream(mpctx, mpctx->tracks[n]); + if (events & DEMUX_EVENT_STREAMS) { + add_demuxer_tracks(mpctx, demuxer); + for (int t = 0; t < STREAM_TYPE_COUNT; t++) { + for (int n = 0; n < mpctx->num_tracks; n++) + if (mpctx->tracks[n]->type == t) + print_stream(mpctx, mpctx->tracks[n]); + } + } + struct mp_tags *info = demuxer->metadata; + if ((events & DEMUX_EVENT_METADATA) && info->num_keys) { + MP_INFO(mpctx, "File tags:\n"); + for (int n = 0; n < info->num_keys; n++) + MP_INFO(mpctx, " %s: %s\n", info->keys[n], info->values[n]); + mp_notify(mpctx, MPV_EVENT_METADATA_UPDATE, NULL); } } @@ -311,6 +326,11 @@ bool timeline_set_part(struct MPContext *mpctx, int i, bool force) uninit_player(mpctx, INITIALIZED_VCODEC | (mpctx->opts->fixed_vo ? 0 : INITIALIZED_VO) | (mpctx->opts->gapless_audio ? 0 : INITIALIZED_AO) | INITIALIZED_ACODEC | INITIALIZED_SUB | INITIALIZED_SUB2); mpctx->stop_play = orig_stop_play; + if (mpctx->demuxer) { + demux_stop_thread(mpctx->demuxer); + demux_flush(mpctx->demuxer); + } + mpctx->demuxer = n->source; mpctx->stream = mpctx->demuxer->stream; @@ -333,6 +353,9 @@ bool timeline_set_part(struct MPContext *mpctx, int i, bool force) } reselect_demux_streams(mpctx); + if (mpctx->demuxer && mpctx->opts->demuxer_thread) + demux_start_thread(mpctx->demuxer); + return true; } @@ -364,6 +387,7 @@ static int find_new_tid(struct MPContext *mpctx, enum stream_type t) } static struct track *add_stream_track(struct MPContext *mpctx, + struct demuxer *demuxer, struct sh_stream *stream, bool under_timeline) { @@ -383,7 +407,7 @@ static struct track *add_stream_track(struct MPContext *mpctx, .attached_picture = stream->attached_picture != NULL, .lang = stream->lang, .under_timeline = under_timeline, - .demuxer = stream->demuxer, + .demuxer = demuxer, .stream = stream, }; MP_TARRAY_APPEND(mpctx, mpctx->tracks, mpctx->num_tracks, track); @@ -398,7 +422,7 @@ static struct track *add_stream_track(struct MPContext *mpctx, void add_demuxer_tracks(struct MPContext *mpctx, struct demuxer *demuxer) { for (int n = 0; n < demuxer->num_streams; n++) - add_stream_track(mpctx, demuxer->streams[n], !!mpctx->timeline); + add_stream_track(mpctx, demuxer, demuxer->streams[n], !!mpctx->timeline); } // Result numerically higher => better match. 0 == no match. @@ -651,9 +675,11 @@ static void open_subtitles_from_options(struct MPContext *mpctx) void *tmp = talloc_new(NULL); char *base_filename = mpctx->filename; char *stream_filename = NULL; - if (stream_control(mpctx->stream, STREAM_CTRL_GET_BASE_FILENAME, - &stream_filename) > 0) - base_filename = talloc_steal(tmp, stream_filename); + if (mpctx->demuxer) { + if (demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_BASE_FILENAME, + &stream_filename) > 0) + base_filename = talloc_steal(tmp, stream_filename); + } struct subfn *list = find_text_subtitles(mpctx->global, base_filename); talloc_steal(tmp, list); for (int i = 0; list && list[i].fname; i++) { @@ -703,7 +729,7 @@ static struct track *open_external_file(struct MPContext *mpctx, char *filename, for (int n = 0; n < demuxer->num_streams; n++) { struct sh_stream *sh = demuxer->streams[n]; if (sh->type == filter) { - struct track *t = add_stream_track(mpctx, sh, false); + struct track *t = add_stream_track(mpctx, demuxer, sh, false); t->is_external = true; t->title = talloc_strdup(t, disp_filename); t->external_filename = talloc_strdup(t, filename); @@ -1160,6 +1186,11 @@ goto_reopen_demuxer: ; } reselect_demux_streams(mpctx); + update_demuxer_properties(mpctx); + + if (mpctx->demuxer && opts->demuxer_thread) + demux_start_thread(mpctx->demuxer); + if (mpctx->current_track[0][STREAM_VIDEO] && mpctx->current_track[0][STREAM_VIDEO]->attached_picture) { @@ -1167,9 +1198,6 @@ goto_reopen_demuxer: ; "Displaying attached picture. Use --no-audio-display to prevent this.\n"); } - demux_info_update(mpctx->master_demuxer); - print_file_properties(mpctx); - #if HAVE_ENCODING if (mpctx->encode_lavc_ctx && mpctx->current_track[0][STREAM_VIDEO]) encode_lavc_expect_stream(mpctx->encode_lavc_ctx, AVMEDIA_TYPE_VIDEO); @@ -1189,11 +1217,11 @@ goto_reopen_demuxer: ; //==================== START PLAYING ======================= if (!mpctx->d_video && !mpctx->d_audio) { - struct stream *s = mpctx->stream; + struct demuxer *d = mpctx->demuxer; MP_FATAL(mpctx, "No video or audio streams selected.\n"); - if (s->uncached_type == STREAMTYPE_DVB) { + if (d->stream->uncached_type == STREAMTYPE_DVB) { int dir = mpctx->last_dvb_step; - if (stream_control(s, STREAM_CTRL_DVB_STEP_CHANNEL, &dir) > 0) + if (demux_stream_control(d, STREAM_CTRL_DVB_STEP_CHANNEL, &dir) > 0) mpctx->stop_play = PT_RELOAD_DEMUXER; } goto terminate_playback; diff --git a/player/misc.c b/player/misc.c index abaf5b208a..e39175a158 100644 --- a/player/misc.c +++ b/player/misc.c @@ -121,11 +121,11 @@ double get_start_time(struct MPContext *mpctx) float mp_get_cache_percent(struct MPContext *mpctx) { - if (mpctx->stream) { + if (mpctx->demuxer) { int64_t size = -1; int64_t fill = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_SIZE, &size); - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_FILL, &fill); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_SIZE, &size); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_FILL, &fill); if (size > 0 && fill >= 0) return fill / (size / 100.0); } @@ -135,8 +135,8 @@ float mp_get_cache_percent(struct MPContext *mpctx) bool mp_get_cache_idle(struct MPContext *mpctx) { int idle = 0; - if (mpctx->stream) - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_IDLE, &idle); + if (mpctx->demuxer) + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_IDLE, &idle); return idle; } diff --git a/player/playloop.c b/player/playloop.c index 7e9b63995b..9ce4c39ed3 100644 --- a/player/playloop.c +++ b/player/playloop.c @@ -448,10 +448,9 @@ double get_current_pos_ratio(struct MPContext *mpctx, bool use_range) if (len > 0 && !demuxer->ts_resets_possible) { ans = MPCLAMP((pos - start) / len, 0, 1); } else { - struct stream *s = demuxer->stream; int64_t size; - if (stream_control(s, STREAM_CTRL_GET_SIZE, &size) > 0 && size > 0) { - if (demuxer->filepos >= 0) + if (demux_stream_control(demuxer, STREAM_CTRL_GET_SIZE, &size) > 0) { + if (size > 0 && demuxer->filepos >= 0) ans = MPCLAMP(demuxer->filepos / (double)size, 0, 1); } } @@ -613,22 +612,13 @@ static bool handle_osd_redraw(struct MPContext *mpctx) return true; } -static void handle_metadata_update(struct MPContext *mpctx) -{ - if (mp_time_sec() > mpctx->last_metadata_update + 2) { - if (demux_info_update(mpctx->demuxer)) - mp_notify(mpctx, MPV_EVENT_METADATA_UPDATE, NULL); - mpctx->last_metadata_update = mp_time_sec(); - } -} - static void handle_pause_on_low_cache(struct MPContext *mpctx) { struct MPOpts *opts = mpctx->opts; - if (!mpctx->stream) + if (!mpctx->demuxer) return; int64_t fill = -1; - stream_control(mpctx->stream, STREAM_CTRL_GET_CACHE_FILL, &fill); + demux_stream_control(mpctx->demuxer, STREAM_CTRL_GET_CACHE_FILL, &fill); int cache_kb = fill > 0 ? (fill + 1023) / 1024 : -1; bool idle = mp_get_cache_idle(mpctx); if (mpctx->paused && mpctx->paused_for_cache) { @@ -927,9 +917,7 @@ void run_playloop(struct MPContext *mpctx) } #endif - // Add tracks that were added by the demuxer later (e.g. MPEG) - if (!mpctx->timeline && mpctx->demuxer) - add_demuxer_tracks(mpctx, mpctx->demuxer); + update_demuxer_properties(mpctx); if (mpctx->timeline) { double end = mpctx->timeline[mpctx->timeline_part + 1].start; @@ -1271,8 +1259,6 @@ void run_playloop(struct MPContext *mpctx) } } - handle_metadata_update(mpctx); - handle_pause_on_low_cache(mpctx); handle_input_and_seek_coalesce(mpctx); diff --git a/stream/cache.c b/stream/cache.c index 197f7d3391..b1b3c9301f 100644 --- a/stream/cache.c +++ b/stream/cache.c @@ -110,11 +110,7 @@ struct priv { // Cached STREAM_CTRLs double stream_time_length; int64_t stream_size; - unsigned int stream_num_chapters; - int stream_cache_idle; - int stream_cache_fill; struct mp_tags *stream_metadata; - char *disc_name; double start_pts; }; @@ -352,26 +348,16 @@ static int resize_cache(struct priv *s, int64_t size) static void update_cached_controls(struct priv *s) { - unsigned int ui; int64_t i64; double d; struct mp_tags *tags; - char *t; s->stream_time_length = 0; if (stream_control(s->stream, STREAM_CTRL_GET_TIME_LENGTH, &d) == STREAM_OK) s->stream_time_length = d; - s->stream_num_chapters = 0; - if (stream_control(s->stream, STREAM_CTRL_GET_NUM_CHAPTERS, &ui) == STREAM_OK) - s->stream_num_chapters = ui; if (stream_control(s->stream, STREAM_CTRL_GET_METADATA, &tags) == STREAM_OK) { talloc_free(s->stream_metadata); s->stream_metadata = talloc_steal(s, tags); } - if (stream_control(s->stream, STREAM_CTRL_GET_DISC_NAME, &t) == STREAM_OK) - { - talloc_free(s->disc_name); - s->disc_name = talloc_steal(s, t); - } s->stream_size = -1; if (stream_control(s->stream, STREAM_CTRL_GET_SIZE, &i64) == STREAM_OK) s->stream_size = i64; @@ -399,9 +385,6 @@ static int cache_get_cached_control(stream_t *cache, int cmd, void *arg) return STREAM_UNSUPPORTED; *(int64_t *)arg = s->stream_size; return STREAM_OK; - case STREAM_CTRL_GET_NUM_CHAPTERS: - *(unsigned int *)arg = s->stream_num_chapters; - return STREAM_OK; case STREAM_CTRL_GET_CURRENT_TIME: { if (s->start_pts == MP_NOPTS_VALUE) return STREAM_UNSUPPORTED; @@ -417,12 +400,6 @@ static int cache_get_cached_control(stream_t *cache, int cmd, void *arg) } return STREAM_UNSUPPORTED; } - case STREAM_CTRL_GET_DISC_NAME: { - if (!s->disc_name) - return STREAM_UNSUPPORTED; - *(char **)arg = talloc_strdup(NULL, s->disc_name); - return STREAM_OK; - } case STREAM_CTRL_RESUME_CACHE: s->idle = s->eof = false; pthread_cond_signal(&s->wakeup); |