2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
49 /* OS X strikes again */
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
66 using namespace dcpomatic;
// Constructor.  Creates one ReelWriter per reel period of the film, points the
// audio, subtitle, closed-caption and Atmos reel iterators at the first reel,
// and throws InvalidSignerError if the configured signer chain is not valid.
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72 , _queued_full_in_memory (0)
73 /* These will be reset to sensible values when J2KEncoder is created */
74 , _maximum_frames_in_memory (8)
75 , _maximum_queue_size (8)
// The Job must still exist: it is passed to every ReelWriter created below.
81 shared_ptr<Job> job = _job.lock ();
82 DCPOMATIC_ASSERT (job);
// One ReelWriter per reel period; each is told its index and the total reel count.
85 list<DCPTimePeriod> const reels = _film->reels ();
86 BOOST_FOREACH (DCPTimePeriod p, reels) {
87 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
// One last-written record per reel (indexed by reel in have_sequenced_image_at_queue_head).
90 _last_written.resize (reels.size());
92 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
93 and captions arrive to the Writer in sequence. This is not so for video.
95 _audio_reel = _reels.begin ();
96 _subtitle_reel = _reels.begin ();
// Each closed-caption track gets its own current-reel iterator.
97 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
98 _caption_reels[i] = _reels.begin ();
100 _atmos_reel = _reels.begin ();
102 /* Check that the signer is OK */
104 if (!Config::instance()->signer_chain()->valid(&reason)) {
105 throw InvalidSignerError (reason);
// Start the writer thread, which runs Writer::thread().
112 _thread = boost::thread (boost::bind(&Writer::thread, this));
// On Linux builds, give the thread a name.
113 #ifdef DCPOMATIC_LINUX
114 pthread_setname_np (_thread.native_handle(), "writer");
// Stop the writer thread, passing can_throw = false.
120 terminate_thread (false);
123 /** Pass a video frame to the writer for writing to disk at some point.
124 * This method can be called with frames out of order.
125 * @param encoded JPEG2000-encoded data.
126 * @param frame Frame index within the DCP.
127 * @param eyes Eyes that this frame image is for.
130 Writer::write (Data encoded, Frame frame, Eyes eyes)
// Hold _state_mutex while the queue and its counters are touched.
132 boost::mutex::scoped_lock lock (_state_mutex);
134 while (_queued_full_in_memory > _maximum_frames_in_memory) {
135 /* There are too many full frames in memory; wake the main writer thread and
136 wait until it sorts everything out */
137 _empty_condition.notify_all ();
138 _full_condition.wait (lock);
// Build a FULL item carrying the encoded data.  Its frame index is made
// relative to the start of the reel that contains the frame.
142 qi.type = QueueItem::FULL;
143 qi.encoded = encoded;
144 qi.reel = video_reel (frame);
145 qi.frame = frame - _reels[qi.reel].start ();
147 if (_film->three_d() && eyes == EYES_BOTH) {
148 /* 2D material in a 3D DCP; fake the 3D */
// The item is queued twice (the second copy marked EYES_RIGHT) and each copy
// is counted in _queued_full_in_memory.
150 _queue.push_back (qi);
151 ++_queued_full_in_memory;
152 qi.eyes = EYES_RIGHT;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 _queue.push_back (qi);
158 ++_queued_full_in_memory;
161 /* Now there's something to do: wake anything wait()ing on _empty_condition */
162 _empty_condition.notify_all ();
166 Writer::can_repeat (Frame frame) const
168 return frame > _reels[video_reel(frame)].start();
171 /** Repeat the last frame that was written to a reel as a new frame.
172 * @param frame Frame index within the DCP of the new (repeated) frame.
173 * @param eyes Eyes that this repeated frame image is for.
176 Writer::repeat (Frame frame, Eyes eyes)
// Hold _state_mutex while the queue is touched.
178 boost::mutex::scoped_lock lock (_state_mutex);
180 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
181 /* The queue is too big, and the main writer thread can run and fix it, so
182 wake it and wait until it has done.
184 _empty_condition.notify_all ();
185 _full_condition.wait (lock);
// Build a REPEAT item: no encoded data is attached, only the reel and the
// frame index relative to the start of that reel.
189 qi.type = QueueItem::REPEAT;
190 qi.reel = video_reel (frame);
191 qi.frame = frame - _reels[qi.reel].start ();
// For EYES_BOTH in a 3D film the item is queued twice, the second copy marked EYES_RIGHT.
192 if (_film->three_d() && eyes == EYES_BOTH) {
194 _queue.push_back (qi);
195 qi.eyes = EYES_RIGHT;
196 _queue.push_back (qi);
199 _queue.push_back (qi);
202 /* Now there's something to do: wake anything wait()ing on _empty_condition */
203 _empty_condition.notify_all ();
// Queue a FAKE item for a frame.  Rather than carrying encoded data, the item
// records the size obtained from the film's info file for that frame.
// frame is an index within the whole DCP (it is converted to a reel-relative
// index below); eyes says which eye(s) the frame is for.
207 Writer::fake_write (Frame frame, Eyes eyes)
// Hold _state_mutex while the queue is touched.
209 boost::mutex::scoped_lock lock (_state_mutex);
211 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
212 /* The queue is too big, and the main writer thread can run and fix it, so
213 wake it and wait until it has done.
215 _empty_condition.notify_all ();
216 _full_condition.wait (lock);
// Work out which reel the frame is in, and its index relative to that reel's start.
219 size_t const reel = video_reel (frame);
220 Frame const frame_in_reel = frame - _reels[reel].start ();
223 qi.type = QueueItem::FAKE;
// Look up the size recorded for this frame in the info file for the reel's period.
// NOTE(review): the meaning of the `true` argument to info_file_handle is not visible here
// (presumably "open for reading") - confirm against Film::info_file_handle.
226 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
227 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
231 qi.frame = frame_in_reel;
// For EYES_BOTH in a 3D film the item is queued twice, the second copy marked EYES_RIGHT.
232 if (_film->three_d() && eyes == EYES_BOTH) {
234 _queue.push_back (qi);
235 qi.eyes = EYES_RIGHT;
236 _queue.push_back (qi);
239 _queue.push_back (qi);
242 /* Now there's something to do: wake anything wait()ing on _empty_condition */
243 _empty_condition.notify_all ();
246 /** Write some audio frames to the DCP.
247 * @param audio Audio data.
248 * @param time Time of this data within the DCP.
249 * This method is not thread safe.
252 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
254 DCPOMATIC_ASSERT (audio);
// afr is the film's audio frame rate; it converts between frame counts and DCPTime below.
256 int const afr = _film->audio_frame_rate();
// end is the DCP time just after the last frame of this audio.
258 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
260 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
265 if (_audio_reel == _reels.end ()) {
266 /* This audio is off the end of the last reel; ignore it */
270 if (end <= _audio_reel->period().to) {
271 /* Easy case: we can write all the audio to this reel */
272 _audio_reel->write (audio);
274 } else if (_audio_reel->period().to <= t) {
275 /* This reel is entirely before the start of our audio; just skip the reel */
278 /* This audio is over a reel boundary; split the audio into two and write the first part */
// part_lengths[0] is the time from t up to the end of the current reel,
// part_lengths[1] is the time from the end of the current reel up to the end of the audio.
279 DCPTime part_lengths[2] = {
280 _audio_reel->period().to - t,
281 end - _audio_reel->period().to
// The same two lengths as frame counts, rounded up.
284 Frame part_frames[2] = {
285 part_lengths[0].frames_ceil(afr),
286 part_lengths[1].frames_ceil(afr)
// NOTE(review): the AudioBuffers constructor arguments look like
// (source, number of frames, start offset) - confirm against AudioBuffers.
289 if (part_frames[0]) {
290 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
291 _audio_reel->write (part);
// Replace `audio` with the second part, which begins part_frames[0] frames in.
294 if (part_frames[1]) {
295 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
// What remains of the audio now starts where the first part ended.
301 t += part_lengths[0];
// Write one Atmos frame, with its metadata, to the current Atmos reel.
// time is compared against the end of the current reel's period to detect a reel boundary.
308 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
// This frame starts exactly where the current Atmos reel's period ends.
310 if (_atmos_reel->period().to == time) {
// There must be a reel for this frame to go into.
312 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
315 /* We assume that we get a video frame's worth of data here */
316 _atmos_reel->write (atmos, metadata);
320 /** Caller must hold a lock on _state_mutex */
322 Writer::have_sequenced_image_at_queue_head ()
// An empty queue is handled separately, before front() is used.
324 if (_queue.empty ()) {
// Ask the last-written record of the head item's reel whether that item is next in sequence.
329 QueueItem const & f = _queue.front();
330 return _last_written[f.reel].next(f);
// Sequence check: decide whether qi is the item that should follow the one
// recorded in this LastWritten (_frame / _eyes).
335 Writer::LastWritten::next (QueueItem qi) const
// EYES_BOTH item: it is next if its frame directly follows the last written frame.
337 if (qi.eyes == EYES_BOTH) {
339 return qi.frame == (_frame + 1);
// Otherwise: after a left eye, check for the right eye of the same frame.
344 if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
// After a right eye, check for the left eye of the following frame.
348 if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
357 Writer::LastWritten::update (QueueItem qi)
// Writer thread body.  Under _state_mutex it waits for work, writes the queue
// items that are in sequence to their ReelWriters, and, when too many FULL
// frames are held in memory, writes some of them to disk via j2c_path.
370 boost::mutex::scoped_lock lock (_state_mutex);
374 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
375 /* We've got something to do: go and do it */
379 /* Nothing to do: wait until something happens which may indicate that we do */
380 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
381 _empty_condition.wait (lock);
382 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
385 if (_finish && _queue.empty()) {
389 /* We stop here if we have been asked to finish, and if either the queue
390 is empty or we do not have a sequenced image at its head (if this is the
391 case we will never terminate as no new frames will be sent once
394 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
395 /* (Hopefully temporarily) log anything that was not written */
396 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
397 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
398 BOOST_FOREACH (QueueItem const& i, _queue) {
399 if (i.type == QueueItem::FULL) {
400 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
// NOTE(review): every item that is not FULL appears to be logged as "type FAKE" here,
// which would include REPEAT items - confirm whether that is intended.
402 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
409 /* Write any frames that we can write; i.e. those that are in sequence. */
410 while (have_sequenced_image_at_queue_head ()) {
// Take a copy of the head item and record it as the last written for its reel.
411 QueueItem qi = _queue.front ();
412 _last_written[qi.reel].update (qi);
// Only FULL items which still have their data attached count as "in memory".
414 if (qi.type == QueueItem::FULL && qi.encoded) {
415 --_queued_full_in_memory;
420 ReelWriter& reel = _reels[qi.reel];
423 case QueueItem::FULL:
424 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
// Load the frame's data from its j2c_path file before writing it to the reel.
426 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
428 reel.write (qi.encoded, qi.frame, qi.eyes);
431 case QueueItem::FAKE:
432 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
433 reel.fake_write (qi.size);
436 case QueueItem::REPEAT:
437 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
438 reel.repeat_write (qi.frame, qi.eyes);
// Wake anything waiting on _full_condition (write(), repeat() and fake_write() wait on it).
444 _full_condition.notify_all ();
447 while (_queued_full_in_memory > _maximum_frames_in_memory) {
448 /* Too many frames in memory which can't yet be written to the stream.
449 Write some FULL frames to disk.
452 /* Find one from the back of the queue */
454 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
// Skip anything that is not a FULL item with its data still attached.
455 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
459 DCPOMATIC_ASSERT (i != _queue.rend());
461 /* For the log message below */
462 int const awaiting = _last_written[_queue.front().reel].frame() + 1;
465 /* i is valid here, even though we don't hold a lock on the mutex,
466 since list iterators are unaffected by insertion and only this
467 thread could erase the last item in the list.
470 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
// Write the data out via the two j2c_path names (final argument true, then false).
472 i->encoded->write_via_temp (
473 _film->j2c_path (i->reel, i->frame, i->eyes, true),
474 _film->j2c_path (i->reel, i->frame, i->eyes, false)
// One fewer frame is counted as in memory; wake anything waiting on _full_condition.
479 --_queued_full_in_memory;
480 _full_condition.notify_all ();
// Stop the writer thread.
// NOTE(review): can_throw presumably controls whether errors may propagate to the
// caller (the destructor passes false) - confirm against the full function body.
490 Writer::terminate_thread (bool can_throw)
492 boost::mutex::scoped_lock lock (_state_mutex);
// Check whether there is a joinable writer thread at all.
493 if (!_thread.joinable()) {
// Wake everything waiting on either condition so that it re-examines the shared state.
498 _empty_condition.notify_all ();
499 _full_condition.notify_all ();
// Finish the DCP: terminate the writer thread, finish the ReelWriters, compute
// digests for each reel in parallel, assemble the CPL with its metadata, write
// the DCP XML (signed with the configured signer chain) and the cover sheet.
// Throws InvalidSignerError if the signer chain is not valid.
512 if (!_thread.joinable()) {
516 LOG_GENERAL_NC ("Terminating writer thread");
// can_throw = true on this path.
518 terminate_thread (true);
520 LOG_GENERAL_NC ("Finishing ReelWriters");
522 BOOST_FOREACH (ReelWriter& i, _reels) {
526 LOG_GENERAL_NC ("Writing XML");
// The DCP lives in the film directory named after the DCP.
528 dcp::DCP dcp (_film->dir (_film->dcp_name()));
530 shared_ptr<dcp::CPL> cpl (
533 _film->dcp_content_type()->libdcp_kind ()
539 /* Calculate digests for each reel in parallel */
541 shared_ptr<Job> job = _job.lock ();
542 job->sub (_("Computing digests"));
// A pool of threads runs the io_service; digest jobs are posted to the service below.
544 boost::asio::io_service service;
545 boost::thread_group pool;
// The work object keeps io_service::run() from returning while it exists.
547 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
// Use at least one thread, whatever the configured value is.
549 int const threads = max (1, Config::instance()->master_encoding_threads ());
551 for (int i = 0; i < threads; ++i) {
552 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
// One digest job per reel; each reports its progress through set_digest_progress.
555 BOOST_FOREACH (ReelWriter& i, _reels) {
556 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
557 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
564 /* Add reels to CPL */
566 BOOST_FOREACH (ReelWriter& i, _reels) {
567 cpl->add (i.create_reel (_reel_assets, _fonts));
570 dcp::XMLMetadata meta;
571 meta.annotation_text = cpl->annotation_text ();
// Creator and issuer fall back to a DCP-o-matic version string when not configured.
572 meta.creator = Config::instance()->dcp_creator ();
573 if (meta.creator.empty ()) {
574 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
576 meta.issuer = Config::instance()->dcp_issuer ();
577 if (meta.issuer.empty ()) {
578 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
580 meta.set_issue_date_now ();
582 cpl->set_metadata (meta);
583 cpl->set_ratings (vector_to_list(_film->ratings()));
584 cpl->set_content_version_label_text (_film->content_version());
586 shared_ptr<const dcp::CertificateChain> signer;
587 signer = Config::instance()->signer_chain ();
588 /* We did check earlier, but check again here to be on the safe side */
590 if (!signer->valid (&reason)) {
591 throw InvalidSignerError (reason);
// The standard written is Interop or SMPTE according to the film's setting.
594 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
597 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
600 write_cover_sheet ();
604 Writer::write_cover_sheet ()
606 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
607 FILE* f = fopen_boost (cover, "w");
609 throw OpenFileError (cover, errno, OpenFileError::WRITE);
612 string text = Config::instance()->cover_sheet ();
613 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
614 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
615 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
616 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
618 optional<string> subtitle_language;
619 BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
620 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
621 if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
622 subtitle_language = j->language ();
626 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
628 boost::uintmax_t size = 0;
630 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
631 i != boost::filesystem::recursive_directory_iterator();
633 if (boost::filesystem::is_regular_file (i->path ())) {
634 size += boost::filesystem::file_size (i->path ());
638 if (size > (1000000000L)) {
639 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
641 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
644 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
645 string description = String::compose("%1.%2", ch.first, ch.second);
647 if (description == "0.0") {
648 description = _("None");
649 } else if (description == "1.0") {
650 description = _("Mono");
651 } else if (description == "2.0") {
652 description = _("Stereo");
654 boost::algorithm::replace_all (text, "$AUDIO", description);
657 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
659 if (h == 0 && m == 0) {
660 length = String::compose("%1s", s);
661 } else if (h == 0 && m > 0) {
662 length = String::compose("%1m%2s", m, s);
663 } else if (h > 0 && m > 0) {
664 length = String::compose("%1h%2m%3s", h, m, s);
667 boost::algorithm::replace_all (text, "$LENGTH", length);
669 checked_fwrite (text.c_str(), text.length(), f, cover);
673 /** @param frame Frame index within the whole DCP.
674 * @return true if we can fake-write this frame.
677 Writer::can_fake_write (Frame frame) const
679 if (_film->encrypted()) {
680 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
684 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
685 parameters in the asset writer.
688 ReelWriter const & reel = _reels[video_reel(frame)];
690 /* Make frame relative to the start of the reel */
691 frame -= reel.start ();
// Not the first frame of the reel, and before the reel's first_nonexistant_frame().
692 return (frame != 0 && frame < reel.first_nonexistant_frame());
// Write some text (open subtitle or closed caption) covering `period` to the
// reel that is current for that kind of text.
695 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
697 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
// Pointer to whichever member iterator tracks the current reel for this kind of text.
699 vector<ReelWriter>::iterator* reel = 0;
702 case TEXT_OPEN_SUBTITLE:
703 reel = &_subtitle_reel;
705 case TEXT_CLOSED_CAPTION:
// Closed captions need a track, and that track must have had an iterator set up.
706 DCPOMATIC_ASSERT (track);
707 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
708 reel = &_caption_reels[*track];
// Any other text type is a programming error.
711 DCPOMATIC_ASSERT (false);
714 DCPOMATIC_ASSERT (*reel != _reels.end());
// While the current reel ends at or before the start of this text's period...
715 while ((*reel)->period().to <= period.from) {
717 DCPOMATIC_ASSERT (*reel != _reels.end());
720 (*reel)->write (text, type, track, period);
// Take note of some fonts, adding them to _fonts.
724 Writer::write (list<shared_ptr<Font> > fonts)
726 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
// Each incoming font is considered against the fonts already held in _fonts.
728 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
730 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
737 _fonts.push_back (i);
743 operator< (QueueItem const & a, QueueItem const & b)
745 if (a.reel != b.reel) {
746 return a.reel < b.reel;
749 if (a.frame != b.frame) {
750 return a.frame < b.frame;
753 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
757 operator== (QueueItem const & a, QueueItem const & b)
759 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
// Set the queue limits from the number of encoder threads.
// threads: number of encoder threads in use.
763 Writer::set_encoder_threads (int threads)
// The limits are read by other code under _state_mutex, so change them under it too.
765 boost::mutex::scoped_lock lm (_state_mutex);
// Frames allowed in memory: threads times the configured multiplier, rounded by lrint.
766 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
// Queue size limit: 16 items per thread.
767 _maximum_queue_size = threads * 16;
// Remember a referenced reel asset; _reel_assets is later passed to ReelWriter::create_reel.
771 Writer::write (ReferencedReelAsset asset)
773 _reel_assets.push_back (asset);
// Find the index in _reels of the reel whose period contains a video frame.
// frame: frame index within the whole DCP, converted to a time using the film's video frame rate.
// Asserts that some reel contains the frame.
777 Writer::video_reel (int frame) const
779 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
// Look through the reels in order for one whose period contains t.
781 while (i < _reels.size() && !_reels[i].period().contains (t)) {
785 DCPOMATIC_ASSERT (i < _reels.size ());
// Record the calling thread's digest progress and report the lowest progress
// of any thread recorded so far to the job.
// job: job to report progress to.  progress: the calling thread's progress.
790 Writer::set_digest_progress (Job* job, float progress)
// _digest_progresses is written from several digest threads, so guard it.
792 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
// Progress is stored per calling thread, keyed by thread id.
794 _digest_progresses[boost::this_thread::get_id()] = progress;
795 float min_progress = FLT_MAX;
796 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
797 min_progress = min (min_progress, i->second);
// The job is told the minimum, i.e. the progress of the thread that is furthest behind.
800 job->set_progress (min_progress);