Skip to content

Commit

Permalink
Merge pull request #15 from schani-rs/revive
Browse files Browse the repository at this point in the history
Revive service
  • Loading branch information
ChristophWurst authored Jan 9, 2018
2 parents 9877428 + 1bb5cd3 commit 95a4770
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 221 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
AMQP_ADDRESS=127.0.0.1
LIBRARY=http://localhost:8002
STORE=http://localhost:8000
25 changes: 11 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
[package]
name = "schani_processor"
version = "0.1.0"
version = "0.2.0"
authors = ["Christoph Wurst <[email protected]>"]

[dependencies]
amq-protocol = "^0.16"
dotenv = "0.8.0"
dotenv = "0.10.1"
env_logger = "0.4.3"
futures = "^0.1"
hyper = "^0.10"
lapin-futures = "^0.8"
log = "^0.3"
futures = "0.1.17"
hyper = "^0.11"
lapin-async = "*"
lapin-futures = "0.10.0"
log = "0.3.8"
resolve = "0.1.2"
rocket = "0.2.8"
rocket_codegen = "0.2.8"
serde = "^0.9"
serde_derive = "^0.9"
serde_json="^0.9"
schani_library_client = { git = "https://github.com/schani-rs/schani_library_client.git" }
schani_store_client = { git = "https://github.com/schani-rs/schani_store_client.git" }
temporary = "0.6.3"
tokio-core = "0.1.7"
tokio-core = "0.1.10"
tokio-io = "^0.1"
tokio-timer = "^0.1"
url = "^1.4"
url = "^1.6"
90 changes: 0 additions & 90 deletions src/bin/processor.rs

This file was deleted.

6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,9 @@ impl From<io::Error> for Error {
Error::IO(orig)
}
}

impl Into<io::Error> for Error {
fn into(self) -> io::Error {
io::Error::new(io::ErrorKind::Other, self)
}
}
2 changes: 1 addition & 1 deletion src/image_recognition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::Read;
use std::io::prelude::*;
use std::path::PathBuf;

use hyper::{self, header, method, Url};
use hyper::{self, header};
use hyper::client::Request;
use serde_json;

Expand Down
182 changes: 155 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,168 @@
extern crate dotenv;
extern crate env_logger;
extern crate futures;
extern crate hyper;
extern crate lapin_async;
extern crate lapin_futures as lapin;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate resolve;
extern crate schani_library_client;
extern crate schani_store_client;
extern crate temporary;
extern crate tokio_core;
extern crate url;

mod image_recognition;
pub mod error;
pub mod rawtherapee;
mod store;
// mod image_recognition;
mod error;
mod rawtherapee;
mod queue;

use std::env;
use std::fs::File;
use std::io;
use std::io::{BufReader, BufWriter};
use std::io::prelude::*;
use std::net;
use std::str;
use std::sync::Arc;

use futures::Future;
use hyper::Uri;
use schani_library_client::{Image, LibraryClient};
use schani_store_client::StoreClient;
use lapin_async::queue::Message;
use tokio_core::reactor::Core;

struct Config {
amqp_addr: String,
library_uri: Uri,
store_uri: Uri,
}

fn get_config() -> Config {
Config {
amqp_addr: env::var("AMQP_ADDRESS").expect("AMQP_ADDRESS must be set"),
library_uri: env::var("LIBRARY")
.expect("LIBRARY must be set")
.parse()
.expect("could not parse library URI"),
store_uri: env::var("STORE")
.expect("STORE must be set")
.parse()
.expect("could not parse store URI"),
}
}

pub fn run() {
let config = get_config();

// create the reactor
let core = Core::new().unwrap();
let host_addr = resolve::resolve_host(&config.amqp_addr)
.expect("could not lookup host")
.last()
.unwrap();
let addr = net::SocketAddr::new(host_addr, 5672);
println!("connecting to AMQP service at {}", host_addr);
let handle = core.handle();
let lib_client = Arc::new(LibraryClient::new(config.library_uri.clone(), &handle));
let store_client = Arc::new(StoreClient::new(config.store_uri.clone(), &handle));

queue::run(&addr, core, &|message: Message| {
info!("got message: {:?}", message);
let image_id = str::from_utf8(&message.data)
.unwrap()
.to_string()
.parse::<i32>()
.unwrap();
info!("image id: {:?}", image_id);

use rawtherapee::process_raw;
use store::{load_raw_file, upload_image_file};
use image_recognition::classify_image;
let store_client2 = store_client.clone();
let lib_client2 = lib_client.clone();
Box::new(
lib_client
.get_image(image_id)
.map_err(|_| io::Error::from(io::ErrorKind::Other))
.and_then(move |image: Image| {
trace!("processing image {} …", image.id);

use temporary::Directory;
// Create a temporary directory.
futures::lazy(move || temporary::Directory::new("raw_images"))
.join(futures::future::ok(image))
})
.and_then(move |(directory, image)| {
let raw_path = directory.join(image.id.to_string() + &".NEF".to_string());
let img_path = directory.join(image.id.to_string() + &".jpg".to_string());
info!("temporary path: {:?}", raw_path);
info!("image path: {:?}", img_path);
let raw_id = image.raw_id.clone().unwrap();
let sidecar_id = image.sidecar_id.clone().unwrap();

pub fn process_raw_image(image_id: i32) -> Result<(), error::Error> {
// Create a temporary directory.
info!("processing image {} …", image_id);
let directory = Directory::new("raw_images").unwrap();
store_client2
.get_raw_image(&raw_id)
.map_err(|_| io::Error::from(io::ErrorKind::Other))
.join(
store_client2
.get_sidecar(&sidecar_id)
.map_err(|_| io::Error::from(io::ErrorKind::Other)),
)
.and_then(move |(raw_data, sidecar)| {
info!(
"loaded raw file {:?} ({} bytes) and sidecar ({} bytes) for image {}",
raw_id,
raw_data.len(),
sidecar.len(),
image.id
);
let target_path = directory.into_path();
let file = File::create(&raw_path)?;
let mut buf_writer = BufWriter::new(file);
buf_writer.write_all(raw_data.as_slice())?;
info!("raw file written to disk");
let file = File::create(&img_path)?;
let mut buf_writer = BufWriter::new(file);
buf_writer.write_all(sidecar.as_slice())?;
info!("sidecar written to disk");

let tmp_path = directory.join(image_id.to_string() + &".NEF".to_string());
println!("{:?}", tmp_path);
let img_path = directory.join(image_id.to_string() + &".jpg".to_string());
let target_path = directory.into_path();
info!("start processing");
rawtherapee::process_raw(&raw_path, &target_path)?;

try!(load_raw_file(image_id, &tmp_path));
info!("loaded image {} …", image_id);
try!(process_raw(&tmp_path, &target_path));
info!("processed image {} …", image_id);
try!(upload_image_file(image_id, &target_path));
info!("uploaded image {} …", image_id);
try!(classify_image(&img_path));
let file = File::open(&img_path)?;
let mut buf_reader = BufReader::new(file);
let mut buf = vec![];
buf_reader.read_to_end(&mut buf)?;
info!("processed JPEG is {} bytes big", buf.len());

Ok(())
Ok((image, buf, store_client2, lib_client2))
})
.and_then(|(mut image, buf, store_client2, lib_client)| {
store_client2
.upload_image(buf)
.map_err(|_| io::Error::from(io::ErrorKind::Other))
.and_then(move |jpeg_id| {
// upload_image_file(image_id, &target_path)?;
info!("uploaded image and got ID {}", jpeg_id);
// update image
info!(
"updated image {} with new image id {}",
image.id, jpeg_id
);
image.image_id = Some(jpeg_id);
Ok((image, lib_client))
})
})
.and_then(|(image, lib_client)| {
lib_client
.update_image(image)
.map_err(|_| io::Error::from(io::ErrorKind::Other))
})
.map_err(|e| {
warn!("processing failed: {}", e);
e
})
})
.and_then(move |_| Ok(message)),
)
});
}
12 changes: 12 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
extern crate dotenv;
extern crate env_logger;
extern crate schani_processor;

use dotenv::dotenv;

fn main() {
env_logger::init().unwrap();
dotenv().ok();

schani_processor::run();
}
Loading

0 comments on commit 95a4770

Please sign in to comment.