2 Copyright (C) 2012-2018 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
39 #include <dcp/locale_convert.h>
40 #include <boost/foreach.hpp>
48 /* OS X strikes again */
60 using boost::shared_ptr;
61 using boost::weak_ptr;
62 using boost::dynamic_pointer_cast;
63 using boost::optional;
66 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
71 , _queued_full_in_memory (0)
72 /* These will be reset to sensible values when J2KEncoder is created */
73 , _maximum_frames_in_memory (8)
74 , _maximum_queue_size (8)
80 shared_ptr<Job> job = _job.lock ();
81 DCPOMATIC_ASSERT (job);
84 list<DCPTimePeriod> const reels = _film->reels ();
85 BOOST_FOREACH (DCPTimePeriod p, reels) {
86 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
89 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
90 and captions arrive to the Writer in sequence. This is not so for video.
92 _audio_reel = _reels.begin ();
93 _subtitle_reel = _reels.begin ();
94 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
95 _caption_reels[i] = _reels.begin ();
98 /* Check that the signer is OK if we need one */
100 if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
101 throw InvalidSignerError (reason);
108 _thread = new boost::thread (boost::bind (&Writer::thread, this));
109 #ifdef DCPOMATIC_LINUX
110 pthread_setname_np (_thread->native_handle(), "writer");
116 terminate_thread (false);
119 /** Pass a video frame to the writer for writing to disk at some point.
120 * This method can be called with frames out of order.
121 * @param encoded JPEG2000-encoded data.
122 * @param frame Frame index within the DCP.
123 * @param eyes Eyes that this frame image is for.
126 Writer::write (Data encoded, Frame frame, Eyes eyes)
128 boost::mutex::scoped_lock lock (_state_mutex);
130 while (_queued_full_in_memory > _maximum_frames_in_memory) {
131 /* There are too many full frames in memory; wake the main writer thread and
132 wait until it sorts everything out */
133 _empty_condition.notify_all ();
134 _full_condition.wait (lock);
138 qi.type = QueueItem::FULL;
139 qi.encoded = encoded;
140 qi.reel = video_reel (frame);
141 qi.frame = frame - _reels[qi.reel].start ();
143 if (_film->three_d() && eyes == EYES_BOTH) {
144 /* 2D material in a 3D DCP; fake the 3D */
146 _queue.push_back (qi);
147 ++_queued_full_in_memory;
148 qi.eyes = EYES_RIGHT;
149 _queue.push_back (qi);
150 ++_queued_full_in_memory;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 /* Now there's something to do: wake anything wait()ing on _empty_condition */
158 _empty_condition.notify_all ();
162 Writer::can_repeat (Frame frame) const
164 return frame > _reels[video_reel(frame)].start();
167 /** Repeat the last frame that was written to a reel as a new frame.
168 * @param frame Frame index within the DCP of the new (repeated) frame.
169 * @param eyes Eyes that this repeated frame image is for.
172 Writer::repeat (Frame frame, Eyes eyes)
174 boost::mutex::scoped_lock lock (_state_mutex);
176 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
177 /* The queue is too big, and the main writer thread can run and fix it, so
178 wake it and wait until it has done.
180 _empty_condition.notify_all ();
181 _full_condition.wait (lock);
185 qi.type = QueueItem::REPEAT;
186 qi.reel = video_reel (frame);
187 qi.frame = frame - _reels[qi.reel].start ();
188 if (_film->three_d() && eyes == EYES_BOTH) {
190 _queue.push_back (qi);
191 qi.eyes = EYES_RIGHT;
192 _queue.push_back (qi);
195 _queue.push_back (qi);
198 /* Now there's something to do: wake anything wait()ing on _empty_condition */
199 _empty_condition.notify_all ();
203 Writer::fake_write (Frame frame, Eyes eyes)
205 boost::mutex::scoped_lock lock (_state_mutex);
207 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
208 /* The queue is too big, and the main writer thread can run and fix it, so
209 wake it and wait until it has done.
211 _empty_condition.notify_all ();
212 _full_condition.wait (lock);
215 size_t const reel = video_reel (frame);
216 Frame const reel_frame = frame - _reels[reel].start ();
218 FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
220 throw ReadFileError (_film->info_file(_reels[reel].period()));
222 dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
226 qi.type = QueueItem::FAKE;
229 qi.frame = reel_frame;
230 if (_film->three_d() && eyes == EYES_BOTH) {
232 _queue.push_back (qi);
233 qi.eyes = EYES_RIGHT;
234 _queue.push_back (qi);
237 _queue.push_back (qi);
240 /* Now there's something to do: wake anything wait()ing on _empty_condition */
241 _empty_condition.notify_all ();
244 /** Write some audio frames to the DCP.
245 * @param audio Audio data.
246 * @param time Time of this data within the DCP.
247 * This method is not thread safe.
250 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
252 DCPOMATIC_ASSERT (audio);
254 int const afr = _film->audio_frame_rate();
256 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
258 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
263 if (_audio_reel == _reels.end ()) {
264 /* This audio is off the end of the last reel; ignore it */
268 if (end <= _audio_reel->period().to) {
269 /* Easy case: we can write all the audio to this reel */
270 _audio_reel->write (audio);
273 /* Split the audio into two and write the first part */
274 DCPTime part_lengths[2] = {
275 _audio_reel->period().to - t,
276 end - _audio_reel->period().to
279 Frame part_frames[2] = {
280 part_lengths[0].frames_ceil(afr),
281 part_lengths[1].frames_ceil(afr)
284 if (part_frames[0]) {
285 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
286 part->copy_from (audio.get(), part_frames[0], 0, 0);
287 _audio_reel->write (part);
290 if (part_frames[1]) {
291 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
292 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
299 t += part_lengths[0];
304 /** This must be called from Writer::thread() with an appropriate lock held */
306 Writer::have_sequenced_image_at_queue_head ()
308 if (_queue.empty ()) {
314 QueueItem const & f = _queue.front();
315 ReelWriter const & reel = _reels[f.reel];
317 /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
319 if (f.eyes == EYES_BOTH) {
321 return f.frame == (reel.last_written_video_frame() + 1);
326 if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
330 if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
343 boost::mutex::scoped_lock lock (_state_mutex);
347 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
348 /* We've got something to do: go and do it */
352 /* Nothing to do: wait until something happens which may indicate that we do */
353 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
354 _empty_condition.wait (lock);
355 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
358 if (_finish && _queue.empty()) {
362 /* We stop here if we have been asked to finish, and if either the queue
363 is empty or we do not have a sequenced image at its head (if this is the
364 case we will never terminate as no new frames will be sent once
367 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
368 /* (Hopefully temporarily) log anything that was not written */
369 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
370 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
371 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
372 if (i->type == QueueItem::FULL) {
373 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
375 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
382 /* Write any frames that we can write; i.e. those that are in sequence. */
383 while (have_sequenced_image_at_queue_head ()) {
384 QueueItem qi = _queue.front ();
386 if (qi.type == QueueItem::FULL && qi.encoded) {
387 --_queued_full_in_memory;
392 ReelWriter& reel = _reels[qi.reel];
395 case QueueItem::FULL:
396 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
398 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
400 reel.write (qi.encoded, qi.frame, qi.eyes);
403 case QueueItem::FAKE:
404 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
405 reel.fake_write (qi.frame, qi.eyes, qi.size);
408 case QueueItem::REPEAT:
409 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
410 reel.repeat_write (qi.frame, qi.eyes);
416 _full_condition.notify_all ();
419 while (_queued_full_in_memory > _maximum_frames_in_memory) {
420 /* Too many frames in memory which can't yet be written to the stream.
421 Write some FULL frames to disk.
424 /* Find one from the back of the queue */
426 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
427 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
431 DCPOMATIC_ASSERT (i != _queue.rend());
433 /* For the log message below */
434 int const awaiting = _reels[_queue.front().reel].last_written_video_frame() + 1;
437 /* i is valid here, even though we don't hold a lock on the mutex,
438 since list iterators are unaffected by insertion and only this
439 thread could erase the last item in the list.
442 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
444 i->encoded->write_via_temp (
445 _film->j2c_path (i->reel, i->frame, i->eyes, true),
446 _film->j2c_path (i->reel, i->frame, i->eyes, false)
451 --_queued_full_in_memory;
452 _full_condition.notify_all ();
462 Writer::terminate_thread (bool can_throw)
464 boost::mutex::scoped_lock lock (_state_mutex);
470 _empty_condition.notify_all ();
471 _full_condition.notify_all ();
474 if (_thread->joinable ()) {
493 LOG_GENERAL_NC ("Terminating writer thread");
495 terminate_thread (true);
497 LOG_GENERAL_NC ("Finishing ReelWriters");
499 BOOST_FOREACH (ReelWriter& i, _reels) {
503 LOG_GENERAL_NC ("Writing XML");
505 dcp::DCP dcp (_film->dir (_film->dcp_name()));
507 shared_ptr<dcp::CPL> cpl (
510 _film->dcp_content_type()->libdcp_kind ()
516 /* Calculate digests for each reel in parallel */
518 shared_ptr<Job> job = _job.lock ();
519 job->sub (_("Computing digests"));
521 boost::asio::io_service service;
522 boost::thread_group pool;
524 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
526 int const threads = max (1, Config::instance()->master_encoding_threads ());
528 for (int i = 0; i < threads; ++i) {
529 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
532 BOOST_FOREACH (ReelWriter& i, _reels) {
533 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
534 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
541 /* Add reels to CPL */
543 BOOST_FOREACH (ReelWriter& i, _reels) {
544 cpl->add (i.create_reel (_reel_assets, _fonts));
547 dcp::XMLMetadata meta;
548 meta.annotation_text = cpl->annotation_text ();
549 meta.creator = Config::instance()->dcp_creator ();
550 if (meta.creator.empty ()) {
551 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
553 meta.issuer = Config::instance()->dcp_issuer ();
554 if (meta.issuer.empty ()) {
555 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
557 meta.set_issue_date_now ();
559 cpl->set_metadata (meta);
561 shared_ptr<const dcp::CertificateChain> signer;
562 if (_film->is_signed ()) {
563 signer = Config::instance()->signer_chain ();
564 /* We did check earlier, but check again here to be on the safe side */
566 if (!signer->valid (&reason)) {
567 throw InvalidSignerError (reason);
571 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
574 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
577 write_cover_sheet ();
581 Writer::write_cover_sheet ()
583 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
584 FILE* f = fopen_boost (cover, "w");
586 throw OpenFileError (cover, errno, false);
589 string text = Config::instance()->cover_sheet ();
590 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
591 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
592 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
593 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
594 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", _film->isdcf_metadata().subtitle_language);
596 boost::uintmax_t size = 0;
598 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
599 i != boost::filesystem::recursive_directory_iterator();
601 if (boost::filesystem::is_regular_file (i->path ())) {
602 size += boost::filesystem::file_size (i->path ());
606 if (size > (1000000000L)) {
607 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
609 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
612 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
613 string description = String::compose("%1.%2", ch.first, ch.second);
615 if (description == "0.0") {
616 description = _("None");
617 } else if (description == "1.0") {
618 description = _("Mono");
619 } else if (description == "2.0") {
620 description = _("Stereo");
622 boost::algorithm::replace_all (text, "$AUDIO", description);
625 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
627 if (h == 0 && m == 0) {
628 length = String::compose("%1s", s);
629 } else if (h == 0 && m > 0) {
630 length = String::compose("%1m%2s", m, s);
631 } else if (h > 0 && m > 0) {
632 length = String::compose("%1h%2m%3s", h, m, s);
635 boost::algorithm::replace_all (text, "$LENGTH", length);
637 checked_fwrite (text.c_str(), text.length(), f, cover);
641 /** @param frame Frame index within the whole DCP.
642 * @return true if we can fake-write this frame.
645 Writer::can_fake_write (Frame frame) const
647 if (_film->encrypted()) {
648 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
652 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
653 parameters in the asset writer.
656 ReelWriter const & reel = _reels[video_reel(frame)];
658 /* Make frame relative to the start of the reel */
659 frame -= reel.start ();
660 return (frame != 0 && frame < reel.first_nonexistant_frame());
663 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
665 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
667 vector<ReelWriter>::iterator* reel = 0;
670 case TEXT_OPEN_SUBTITLE:
671 reel = &_subtitle_reel;
673 case TEXT_CLOSED_CAPTION:
674 DCPOMATIC_ASSERT (track);
675 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
676 reel = &_caption_reels[*track];
679 DCPOMATIC_ASSERT (false);
682 DCPOMATIC_ASSERT (*reel != _reels.end());
683 while ((*reel)->period().to <= period.from) {
685 DCPOMATIC_ASSERT (*reel != _reels.end());
688 (*reel)->write (text, type, track, period);
692 Writer::write (list<shared_ptr<Font> > fonts)
694 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
696 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
698 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
705 _fonts.push_back (i);
711 operator< (QueueItem const & a, QueueItem const & b)
713 if (a.reel != b.reel) {
714 return a.reel < b.reel;
717 if (a.frame != b.frame) {
718 return a.frame < b.frame;
721 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
725 operator== (QueueItem const & a, QueueItem const & b)
727 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
731 Writer::set_encoder_threads (int threads)
733 boost::mutex::scoped_lock lm (_state_mutex);
734 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
735 _maximum_queue_size = threads * 16;
739 Writer::write (ReferencedReelAsset asset)
741 _reel_assets.push_back (asset);
745 Writer::video_reel (int frame) const
747 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
749 while (i < _reels.size() && !_reels[i].period().contains (t)) {
753 DCPOMATIC_ASSERT (i < _reels.size ());
758 Writer::set_digest_progress (Job* job, float progress)
760 /* I believe this is thread-safe */
761 _digest_progresses[boost::this_thread::get_id()] = progress;
763 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
764 float min_progress = FLT_MAX;
765 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
766 min_progress = min (min_progress, i->second);
769 job->set_progress (min_progress);