2 Copyright (C) 2012-2017 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
32 #include "audio_buffers.h"
36 #include "reel_writer.h"
38 #include <dcp/locale_convert.h>
39 #include <boost/foreach.hpp>
47 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
48 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
49 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
50 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
52 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
53 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
55 /* OS X strikes again */
66 using boost::shared_ptr;
67 using boost::weak_ptr;
68 using boost::dynamic_pointer_cast;
71 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
76 , _queued_full_in_memory (0)
77 /* These will be reset to sensible values when J2KEncoder is created */
78 , _maximum_frames_in_memory (8)
79 , _maximum_queue_size (8)
85 shared_ptr<Job> job = _job.lock ();
86 DCPOMATIC_ASSERT (job);
89 list<DCPTimePeriod> const reels = _film->reels ();
90 BOOST_FOREACH (DCPTimePeriod p, reels) {
91 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
94 /* We can keep track of the current audio and subtitle reels easily because audio
95 and subs arrive to the Writer in sequence. This is not so for video.
97 _audio_reel = _reels.begin ();
98 _subtitle_reel = _reels.begin ();
100 /* Check that the signer is OK if we need one */
102 if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
103 throw InvalidSignerError (reason);
110 _thread = new boost::thread (boost::bind (&Writer::thread, this));
111 #ifdef DCPOMATIC_LINUX
112 pthread_setname_np (_thread->native_handle(), "writer");
118 terminate_thread (false);
121 /** Pass a video frame to the writer for writing to disk at some point.
122 * This method can be called with frames out of order.
123 * @param encoded JPEG2000-encoded data.
124 * @param frame Frame index within the DCP.
125 * @param eyes Eyes that this frame image is for.
128 Writer::write (Data encoded, Frame frame, Eyes eyes)
130 boost::mutex::scoped_lock lock (_state_mutex);
132 while (_queued_full_in_memory > _maximum_frames_in_memory) {
133 /* There are too many full frames in memory; wait until that is sorted out */
134 _full_condition.wait (lock);
138 qi.type = QueueItem::FULL;
139 qi.encoded = encoded;
140 qi.reel = video_reel (frame);
141 qi.frame = frame - _reels[qi.reel].start ();
143 if (_film->three_d() && eyes == EYES_BOTH) {
144 /* 2D material in a 3D DCP; fake the 3D */
146 _queue.push_back (qi);
147 ++_queued_full_in_memory;
148 qi.eyes = EYES_RIGHT;
149 _queue.push_back (qi);
150 ++_queued_full_in_memory;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 /* Now there's something to do: wake anything wait()ing on _empty_condition */
158 _empty_condition.notify_all ();
162 Writer::can_repeat (Frame frame) const
164 return frame > _reels[video_reel(frame)].start();
167 /** Repeat the last frame that was written to a reel as a new frame.
168 * @param frame Frame index within the DCP of the new (repeated) frame.
169 * @param eyes Eyes that this repeated frame image is for.
172 Writer::repeat (Frame frame, Eyes eyes)
174 boost::mutex::scoped_lock lock (_state_mutex);
176 while (_queue.size() > _maximum_queue_size) {
177 /* The queue is too big; wait until that is sorted out */
178 _full_condition.wait (lock);
182 qi.type = QueueItem::REPEAT;
183 qi.reel = video_reel (frame);
184 qi.frame = frame - _reels[qi.reel].start ();
185 if (_film->three_d() && eyes == EYES_BOTH) {
187 _queue.push_back (qi);
188 qi.eyes = EYES_RIGHT;
189 _queue.push_back (qi);
192 _queue.push_back (qi);
195 /* Now there's something to do: wake anything wait()ing on _empty_condition */
196 _empty_condition.notify_all ();
200 Writer::fake_write (Frame frame, Eyes eyes)
202 boost::mutex::scoped_lock lock (_state_mutex);
204 while (_queue.size() > _maximum_queue_size) {
205 /* The queue is too big; wait until that is sorted out. We're assuming here
206 that it will be sorted out either by time or by a necessary full-written
207 frame being given to us. fake_write() must be called more-or-less in
208 order or this will deadlock due to the main write thread waiting for
209 a frame that never arrives because we're waiting here.
211 _full_condition.wait (lock);
214 size_t const reel = video_reel (frame);
215 Frame const reel_frame = frame - _reels[reel].start ();
217 FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
219 throw ReadFileError (_film->info_file(_reels[reel].period()));
221 dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
225 qi.type = QueueItem::FAKE;
228 qi.frame = reel_frame;
229 if (_film->three_d() && eyes == EYES_BOTH) {
231 _queue.push_back (qi);
232 qi.eyes = EYES_RIGHT;
233 _queue.push_back (qi);
236 _queue.push_back (qi);
239 /* Now there's something to do: wake anything wait()ing on _empty_condition */
240 _empty_condition.notify_all ();
243 /** Write some audio frames to the DCP.
244 * @param audio Audio data.
245 * @param time Time of this data within the DCP.
246 * This method is not thread safe.
249 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
251 DCPOMATIC_ASSERT (audio);
253 int const afr = _film->audio_frame_rate();
255 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
257 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
262 if (_audio_reel == _reels.end ()) {
263 /* This audio is off the end of the last reel; ignore it */
267 if (end <= _audio_reel->period().to) {
268 /* Easy case: we can write all the audio to this reel */
269 _audio_reel->write (audio);
272 /* Split the audio into two and write the first part */
273 DCPTime part_lengths[2] = {
274 _audio_reel->period().to - t,
275 end - _audio_reel->period().to
278 Frame part_frames[2] = {
279 part_lengths[0].frames_ceil(afr),
280 part_lengths[1].frames_ceil(afr)
283 if (part_frames[0]) {
284 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
285 part->copy_from (audio.get(), part_frames[0], 0, 0);
286 _audio_reel->write (part);
289 if (part_frames[1]) {
290 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
291 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
298 t += part_lengths[0];
303 /** This must be called from Writer::thread() with an appropriate lock held */
305 Writer::have_sequenced_image_at_queue_head ()
307 if (_queue.empty ()) {
313 QueueItem const & f = _queue.front();
314 ReelWriter const & reel = _reels[f.reel];
316 /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
318 if (f.eyes == EYES_BOTH) {
320 return f.frame == (reel.last_written_video_frame() + 1);
325 if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
329 if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
342 boost::mutex::scoped_lock lock (_state_mutex);
346 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
347 /* We've got something to do: go and do it */
351 /* Nothing to do: wait until something happens which may indicate that we do */
352 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
353 _empty_condition.wait (lock);
354 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
357 if (_finish && _queue.empty()) {
361 /* We stop here if we have been asked to finish, and if either the queue
362 is empty or we do not have a sequenced image at its head (if this is the
363 case we will never terminate as no new frames will be sent once
366 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
367 /* (Hopefully temporarily) log anything that was not written */
368 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
369 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
370 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
371 if (i->type == QueueItem::FULL) {
372 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
374 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
381 /* Write any frames that we can write; i.e. those that are in sequence. */
382 while (have_sequenced_image_at_queue_head ()) {
383 QueueItem qi = _queue.front ();
385 if (qi.type == QueueItem::FULL && qi.encoded) {
386 --_queued_full_in_memory;
391 ReelWriter& reel = _reels[qi.reel];
394 case QueueItem::FULL:
395 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
397 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
399 reel.write (qi.encoded, qi.frame, qi.eyes);
402 case QueueItem::FAKE:
403 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
404 reel.fake_write (qi.frame, qi.eyes, qi.size);
407 case QueueItem::REPEAT:
408 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
409 reel.repeat_write (qi.frame, qi.eyes);
415 _full_condition.notify_all ();
418 while (_queued_full_in_memory > _maximum_frames_in_memory) {
419 /* Too many frames in memory which can't yet be written to the stream.
420 Write some FULL frames to disk.
423 /* Find one from the back of the queue */
425 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
426 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
430 DCPOMATIC_ASSERT (i != _queue.rend());
432 /* For the log message below */
433 int const awaiting = _reels[_queue.front().reel].last_written_video_frame();
436 /* i is valid here, even though we don't hold a lock on the mutex,
437 since list iterators are unaffected by insertion and only this
438 thread could erase the last item in the list.
441 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
443 i->encoded->write_via_temp (
444 _film->j2c_path (i->reel, i->frame, i->eyes, true),
445 _film->j2c_path (i->reel, i->frame, i->eyes, false)
450 --_queued_full_in_memory;
451 _full_condition.notify_all ();
461 Writer::terminate_thread (bool can_throw)
463 boost::mutex::scoped_lock lock (_state_mutex);
469 _empty_condition.notify_all ();
470 _full_condition.notify_all ();
473 if (_thread->joinable ()) {
492 LOG_GENERAL_NC ("Terminating writer thread");
494 terminate_thread (true);
496 LOG_GENERAL_NC ("Finishing ReelWriters");
498 BOOST_FOREACH (ReelWriter& i, _reels) {
502 LOG_GENERAL_NC ("Writing XML");
504 dcp::DCP dcp (_film->dir (_film->dcp_name()));
506 shared_ptr<dcp::CPL> cpl (
509 _film->dcp_content_type()->libdcp_kind ()
515 /* Calculate digests for each reel in parallel */
517 shared_ptr<Job> job = _job.lock ();
518 job->sub (_("Computing digests"));
520 boost::asio::io_service service;
521 boost::thread_group pool;
523 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
525 int const threads = max (1, Config::instance()->master_encoding_threads ());
527 for (int i = 0; i < threads; ++i) {
528 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
531 BOOST_FOREACH (ReelWriter& i, _reels) {
532 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
533 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
540 /* Add reels to CPL */
542 BOOST_FOREACH (ReelWriter& i, _reels) {
543 cpl->add (i.create_reel (_reel_assets, _fonts));
546 dcp::XMLMetadata meta;
547 meta.annotation_text = cpl->annotation_text ();
548 meta.creator = Config::instance()->dcp_creator ();
549 if (meta.creator.empty ()) {
550 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
552 meta.issuer = Config::instance()->dcp_issuer ();
553 if (meta.issuer.empty ()) {
554 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
556 meta.set_issue_date_now ();
558 cpl->set_metadata (meta);
560 shared_ptr<const dcp::CertificateChain> signer;
561 if (_film->is_signed ()) {
562 signer = Config::instance()->signer_chain ();
563 /* We did check earlier, but check again here to be on the safe side */
565 if (!signer->valid (&reason)) {
566 throw InvalidSignerError (reason);
570 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
573 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
576 write_cover_sheet ();
580 Writer::write_cover_sheet ()
582 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
583 FILE* f = fopen_boost (cover, "w");
585 throw OpenFileError (cover, errno, false);
588 string text = Config::instance()->cover_sheet ();
589 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
590 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
591 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
592 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
593 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", _film->isdcf_metadata().subtitle_language);
595 boost::uintmax_t size = 0;
597 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
598 i != boost::filesystem::recursive_directory_iterator();
600 if (boost::filesystem::is_regular_file (i->path ())) {
601 size += boost::filesystem::file_size (i->path ());
605 if (size > (1000000000L)) {
606 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
608 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
611 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
612 string description = String::compose("%1.%2", ch.first, ch.second);
614 if (description == "0.0") {
615 description = _("None");
616 } else if (description == "1.0") {
617 description = _("Mono");
618 } else if (description == "2.0") {
619 description = _("Stereo");
621 boost::algorithm::replace_all (text, "$AUDIO", description);
624 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
626 if (h == 0 && m == 0) {
627 length = String::compose("%1s", s);
628 } else if (h == 0 && m > 0) {
629 length = String::compose("%1m%2s", m, s);
630 } else if (h > 0 && m > 0) {
631 length = String::compose("%1h%2m%3s", h, m, s);
634 boost::algorithm::replace_all (text, "$LENGTH", length);
636 fwrite (text.c_str(), 1, text.length(), f);
640 /** @param frame Frame index within the whole DCP.
641 * @return true if we can fake-write this frame.
644 Writer::can_fake_write (Frame frame) const
646 if (_film->encrypted()) {
647 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
651 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
652 parameters in the asset writer.
655 ReelWriter const & reel = _reels[video_reel(frame)];
657 /* Make frame relative to the start of the reel */
658 frame -= reel.start ();
659 return (frame != 0 && frame < reel.first_nonexistant_frame());
663 Writer::write (PlayerSubtitles subs, DCPTimePeriod period)
665 if (subs.text.empty ()) {
669 while (_subtitle_reel->period().to <= period.from) {
671 DCPOMATIC_ASSERT (_subtitle_reel != _reels.end());
674 DCPOMATIC_ASSERT (_subtitle_reel != _reels.end());
676 _subtitle_reel->write (subs);
680 Writer::write (list<shared_ptr<Font> > fonts)
682 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
684 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
686 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
693 _fonts.push_back (i);
699 operator< (QueueItem const & a, QueueItem const & b)
701 if (a.reel != b.reel) {
702 return a.reel < b.reel;
705 if (a.frame != b.frame) {
706 return a.frame < b.frame;
709 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
713 operator== (QueueItem const & a, QueueItem const & b)
715 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
719 Writer::set_encoder_threads (int threads)
721 boost::mutex::scoped_lock lm (_state_mutex);
722 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
723 _maximum_queue_size = threads * 16;
727 Writer::write (ReferencedReelAsset asset)
729 _reel_assets.push_back (asset);
733 Writer::video_reel (int frame) const
735 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
737 while (i < _reels.size() && !_reels[i].period().contains (t)) {
741 DCPOMATIC_ASSERT (i < _reels.size ());
746 Writer::set_digest_progress (Job* job, float progress)
748 /* I believe this is thread-safe */
749 _digest_progresses[boost::this_thread::get_id()] = progress;
751 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
752 float min_progress = FLT_MAX;
753 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
754 min_progress = min (min_progress, i->second);
757 job->set_progress (min_progress);