/*
 *    Copyright 2004-2006 Intel Corporation
 * 
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 * 
 *        http://www.apache.org/licenses/LICENSE-2.0
 * 
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

#ifdef HAVE_CONFIG_H
#  include <config.h>
#endif

#include <sys/types.h>
#include <sys/stat.h>
#include <oasys/compat/inet_aton.h>
#include <oasys/compat/rpc.h>
#include <oasys/io/FileIOClient.h>
#include <oasys/io/NetUtils.h>
#include <oasys/util/Pointers.h>
#include <oasys/util/ScratchBuffer.h>
#include <oasys/util/XDRUtils.h>

#include "APIServer.h"
#include "bundling/APIBlockProcessor.h"
#include "bundling/Bundle.h"
#include "bundling/BundleEvent.h"
#include "bundling/BundleDaemon.h"
#include "bundling/BundleStatusReport.h"
#include "bundling/SDNV.h"
#include "bundling/GbofId.h"
#include "naming/EndpointID.h"
#include "cmd/APICommand.h"
#include "reg/APIRegistration.h"
#include "reg/RegistrationTable.h"
#include "routing/BundleRouter.h"
#include "storage/GlobalStore.h"

#ifndef MIN
#define MIN(x, y) ((x)<(y) ? (x) : (y))
#endif

namespace dtn {

//----------------------------------------------------------------------
APIServer::APIServer()
    : TCPServerThread("APIServer", "/dtn/apiserver")
{
    enabled_    = true;
    local_addr_ = htonl(INADDR_LOOPBACK);
    local_port_ = DTN_IPC_PORT;

    // override the defaults via environment variables, if given
    char *env;
    if ((env = getenv("DTNAPI_ADDR")) != NULL) {
        if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
        {
            log_err("DTNAPI_ADDR environment variable (%s) "
                    "not a valid ip address, using default of localhost",
                    env);
            // in case inet_aton touched it
            local_addr_ = htonl(INADDR_LOOPBACK);
        } else {
            log_debug("local address set to %s by DTNAPI_ADDR "
                      "environment variable", env);
        }
    }

    if ((env = getenv("DTNAPI_PORT")) != NULL) {
        char *end;
        u_int port = strtoul(env, &end, 10);
        if (*end != '\0' || port > 0xffff)
        {
            log_err("DTNAPI_PORT environment variable (%s) "
                    "not a valid ip port, using default of %d",
                    env, DTN_IPC_PORT);
            port = DTN_IPC_PORT;
        } else {
            log_debug("api port set to %s by DTNAPI_PORT "
                      "environment variable", env);
        }
        local_port_ = (u_int16_t)port;
    }

    if (local_addr_ != INADDR_ANY || local_port_ != 0) {
        log_debug("APIServer init (evironment set addr %s port %d)",
                  intoa(local_addr_), local_port_);
    } else {
        log_debug("APIServer init");
    }

    oasys::TclCommandInterp::instance()->reg(new APICommand(this));
}

//----------------------------------------------------------------------
void
APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
{
    APIClient* c = new APIClient(fd, addr, port);
    c->start();
}

//----------------------------------------------------------------------
APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port)
    : Thread("APIClient", DELETE_ON_EXIT),
      TCPClient(fd, addr, port, "/dtn/apiclient"),
      notifier_(logpath_)
{
    // note that we skip space for the message length and code/status
    xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
    xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);

    bindings_ = new APIRegistrationList();
}

//----------------------------------------------------------------------
APIClient::~APIClient()
{
    log_debug("client destroyed");
    delete_z(bindings_);
}

//----------------------------------------------------------------------
void
APIClient::close_session()
{
    TCPClient::close();

    APIRegistration* reg;
    while (! bindings_->empty()) {
        reg = bindings_->front();
        bindings_->pop_front();
        
        reg->set_active(false);

        if (reg->expired()) {
            log_debug("removing expired registration %d", reg->regid());
            BundleDaemon::post(new RegistrationExpiredEvent(reg));
        }
    }
}

//----------------------------------------------------------------------
int
APIClient::handle_handshake()
{
    u_int32_t handshake;
    u_int16_t message_type, ipc_version;
    
    int ret = readall((char*)&handshake, sizeof(handshake));
    if (ret != sizeof(handshake)) {
        log_err("error reading handshake: (got %d/%zu), \"error\" %s",
                ret, sizeof(handshake), strerror(errno));
        return -1;
    }

    message_type = ntohl(handshake) >> 16;
    ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);

    if (message_type != DTN_OPEN) {
        log_err("handshake (0x%x)'s message type %d != DTN_OPEN (%d)",
                handshake, message_type, DTN_OPEN);
        return -1;
    }
    
    // to handle version mismatch more cleanly, we re-build the
    // handshake word with our own version and send it back to inform
    // the client, then if there's a mismatch, close the channel
    handshake = htonl(DTN_OPEN << 16 | DTN_IPC_VERSION);
    
    ret = writeall((char*)&handshake, sizeof(handshake));
    if (ret != sizeof(handshake)) {
        log_err("error writing handshake: %s", strerror(errno));
        return -1;
    }

    if (ipc_version != DTN_IPC_VERSION) {
        log_err("handshake (0x%x)'s version %d != DTN_IPC_VERSION (%d)",
                handshake, ipc_version, DTN_IPC_VERSION);
        return -1;
    }

    return 0;
}

//----------------------------------------------------------------------
void
APIClient::run()
{
    int ret;
    u_int8_t type;
    u_int32_t len;
    
    log_info("new session %s:%d -> %s:%d",
             intoa(local_addr()), local_port(),
             intoa(remote_addr()), remote_port());

    if (handle_handshake() != 0) {
        close_session();
        return;
    }
    
    while (true) {
        xdr_setpos(&xdr_encode_, 0);
        xdr_setpos(&xdr_decode_, 0);

        // read the incoming message into the fourth byte of the
        // buffer, since the typecode + message length is only five
        // bytes long, but the XDR engines are set to point at the
        // eighth byte of the buffer
        ret = read(&buf_[3], DTN_MAX_API_MSG);
            
        if (ret <= 0) {
            log_warn("client disconnected without calling dtn_close");
            close_session();
            return;
        }
        
        if (ret < 5) {
            log_err("ack!! can't handle really short read...");
            close_session();
            return;
        }

        // NOTE: this protocol is duplicated in the implementation of
        // handle_begin_poll to take care of a cancel_poll request
        // coming in while the thread is waiting for bundles so any
        // modifications must be propagated there
        type = buf_[3];
        memcpy(&len, &buf_[4], sizeof(len));

        len = ntohl(len);

        ret -= 5;
        log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);

        // if we didn't get the whole message, loop to get the rest,
        // skipping the header bytes and the already-read amount
        if (ret < (int)len) {
            int toget = len - ret;
            if (readall(&buf_[8 + ret], toget) != toget) {
                log_err("error reading message remainder: %s",
                        strerror(errno));
                close_session();
                return;
            }
        }

        // dispatch to the handler routine
        switch(type) {
#define DISPATCH(_type, _fn)                    \
        case _type:                             \
            ret = _fn();                        \
            break;
            
            DISPATCH(DTN_LOCAL_EID,         handle_local_eid);
            DISPATCH(DTN_REGISTER,          handle_register);
            DISPATCH(DTN_UNREGISTER,        handle_unregister);
            DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
            DISPATCH(DTN_SEND,              handle_send);
            DISPATCH(DTN_CANCEL,            handle_cancel);
            DISPATCH(DTN_BIND,              handle_bind);
            DISPATCH(DTN_UNBIND,            handle_unbind);
            DISPATCH(DTN_RECV,              handle_recv);
            DISPATCH(DTN_BEGIN_POLL,        handle_begin_poll);
            DISPATCH(DTN_CANCEL_POLL,       handle_cancel_poll);
            DISPATCH(DTN_CLOSE,             handle_close);
#undef DISPATCH

        default:
            log_err("unknown message type code 0x%x", type);
            ret = DTN_EMSGTYPE;
            break;
        }

        // if the handler returned -1, then the session should be
        // immediately terminated
        if (ret == -1) {
            close_session();
            return;
        }
        
        // send the response
        if (send_response(ret) != 0) {
            return;
        }
        // if there was an IPC communication error or unknown message
        // type, close terminate the session
        // XXX/matt we could potentially close on all errors, not just these 2
        if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
            close_session();
            return;
        }
        
    } // while(1)
}

//----------------------------------------------------------------------
int
APIClient::send_response(int ret)
{
    u_int32_t len, msglen;
    
    // make sure the dispatched function returned a valid error
    // code
    ASSERT(ret == DTN_SUCCESS ||
           (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
        
    // fill in the reply message with the status code and the
    // length of the reply. note that if there is no reply, then
    // the xdr position should still be zero
    len = xdr_getpos(&xdr_encode_);
    log_debug("building reply: status %s, length %d",
              dtn_strerror(ret), len);

    msglen = len + 8;
    ret = ntohl(ret);
    len = htonl(len);

    memcpy(buf_,     &ret, sizeof(ret));
    memcpy(&buf_[4], &len, sizeof(len));

    log_debug("sending %d byte reply message", msglen);
    if (writeall(buf_, msglen) != (int)msglen) {
        log_err("error sending reply: %s", strerror(errno));
        close_session();
        return -1;
    }

    return 0;
}
        
//----------------------------------------------------------------------
bool
APIClient::is_bound(u_int32_t regid)
{
    APIRegistrationList::iterator iter;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        if ((*iter)->regid() == regid) {
            return true;
        }
    }

    return false;
}

//----------------------------------------------------------------------
int
APIClient::handle_local_eid()
{
    dtn_service_tag_t service_tag;
    dtn_endpoint_id_t local_eid;
    
    // unpack the request
    if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // build up the response
    EndpointID eid(BundleDaemon::instance()->local_eid());
    if (eid.append_service_tag(service_tag.tag) == false) {
        log_err("error appending service tag");
        return DTN_EINVAL;
    }

    memset(&local_eid, 0, sizeof(local_eid));
    eid.copyto(&local_eid);
    
    // pack the response
    if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
        log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
        return DTN_EXDR;
    }

    log_debug("get_local_eid encoded %d byte response",
              xdr_getpos(&xdr_encode_));
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_register()
{
    APIRegistration* reg;
    Registration::failure_action_t action;
    EndpointIDPattern endpoint;
    std::string script;
    
    dtn_reg_info_t reginfo;

    memset(&reginfo, 0, sizeof(reginfo));
    
    // unpack and parse the request
    if (!xdr_dtn_reg_info_t(&xdr_decode_, &reginfo))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // make sure we free any dynamically-allocated bits in the
    // incoming structure before we exit the proc
    oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)&reginfo);
    
    endpoint.assign(&reginfo.endpoint);

    if (!endpoint.valid()) {
        log_err("invalid endpoint id in register: '%s'",
                reginfo.endpoint.uri);
        return DTN_EINVAL;
    }

    switch (reginfo.failure_action) {
    case DTN_REG_DEFER: action = Registration::DEFER; break;
    case DTN_REG_DROP:  action = Registration::DROP;  break;
    case DTN_REG_EXEC:  action = Registration::EXEC;  break;
    default: {
        log_err("invalid failure action code 0x%x", reginfo.failure_action);
        return DTN_EINVAL;
    }
    }

    if (action == Registration::EXEC) {
        script.assign(reginfo.script.script_val, reginfo.script.script_len);
    }

    u_int32_t regid = GlobalStore::instance()->next_regid();
    reg = new APIRegistration(regid, endpoint, action,
                              reginfo.expiration, script);

    if (! reginfo.init_passive) {
        // store the registration in the list for this session
        bindings_->push_back(reg);
        reg->set_active(true);
    }
    
    BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
                                &notifier_);
    
    // fill the response with the new registration id
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_unregister()
{
    Registration* reg;
    dtn_reg_id_t regid;
    
    // unpack and parse the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    // handle the special case in which we're unregistering a
    // currently bound registration, in which we actually leave it
    // around in the expired state, soit will be cleaned up when the
    // application either calls dtn_unbind() or closes the api socket
    if (is_bound(reg->regid()) && reg->active()) {
        if (reg->expired()) {
            return DTN_EINVAL;
        }
        
        reg->force_expire();
        ASSERT(reg->expired());
        return DTN_SUCCESS;
    }

    // otherwise it's an error to call unregister on a registration
    // that's in-use by someone else
    if (reg->active()) {
        return DTN_EBUSY;
    }

    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
                                &notifier_);
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_find_registration()
{
    Registration* reg;
    EndpointIDPattern endpoint;
    dtn_endpoint_id_t app_eid;

    // unpack and parse the request
    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    endpoint.assign(&app_eid);
    if (!endpoint.valid()) {
        log_err("invalid endpoint id in find_registration: '%s'",
                app_eid.uri);
        return DTN_EINVAL;
    }

    reg = BundleDaemon::instance()->reg_table()->get(endpoint);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    u_int32_t regid = reg->regid();
    
    // fill the response with the new registration id
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_bind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    if (api_reg->active()) {
        log_err("registration %d is already in active mode", regid);
        return DTN_EBUSY;
    }

    // store the registration in the list for this session
    bindings_->push_back(api_reg);
    api_reg->set_active(true);

    log_info("DTN_BIND: bound to registration %d", reg->regid());
    
    return DTN_SUCCESS;
}
    
//----------------------------------------------------------------------
int
APIClient::handle_unbind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    APIRegistrationList::iterator iter;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        if (*iter == api_reg) {
            bindings_->erase(iter);
            ASSERT(api_reg->active());
            api_reg->set_active(false);

            log_info("DTN_UNBIND: unbound from registration %d", regid);
            return DTN_SUCCESS;
        }
    }

    log_err("registration %d not bound to this api client", regid);
    return DTN_ENOTFOUND;
}
    
//----------------------------------------------------------------------
int
APIClient::handle_send()
{
    dtn_bundle_spec_t spec;
    dtn_bundle_payload_t payload;

    memset(&spec, 0, sizeof(spec));
    memset(&payload, 0, sizeof(payload));
    
    /* Unpack the arguments */
    if (!xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
        !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    BundleRef b("APIClient::handle_send");
    b = new Bundle();
    
    // make sure any xdr calls to malloc are cleaned up
    oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
                           (char*)&spec);
    oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
                           (char*)&payload);
    
    // assign the addressing fields
    b->source_.assign(&spec.source);
    b->dest_.assign(&spec.dest);
    if (spec.replyto.uri[0] == '\0') {
        b->replyto_.assign(EndpointID::NULL_EID());
    } else {
        b->replyto_.assign(&spec.replyto);
    }
    b->custodian_.assign(EndpointID::NULL_EID());

    // set the is_singleton bit, first checking if the application
    // specified a value, then seeing if the scheme is known and can
    // therefore determine for itself, and finally, checking the
    // global default
    if (spec.dopts & DOPTS_SINGLETON_DEST)
    {
        b->singleton_dest_ = true;
    }
    else if (spec.dopts & DOPTS_MULTINODE_DEST)
    {
        b->singleton_dest_ = false;
    }
    else 
    {
        EndpointID::singleton_info_t info;
        
        if (b->dest_.known_scheme()) {
            info = b->dest_.is_singleton();

            // all schemes must make a decision one way or the other
            ASSERT(info != EndpointID::UNKNOWN);
        } else {
            info = EndpointID::is_singleton_default_;
        }

        switch (info) {
        case EndpointID::UNKNOWN:
            log_err("bundle destination %s in unknown scheme and "
                    "app did not assert singleton/multipoint",
                    b->dest_.c_str());
            return DTN_EINVAL;

        case EndpointID::SINGLETON:
            b->singleton_dest_ = true;
            break;

        case EndpointID::MULTINODE:
            b->singleton_dest_ = false;
            break;
        }
    }
    
    // the priority code
    switch (spec.priority) {
#define COS(_cos) case _cos: b->priority_ = Bundle::_cos; break;
        COS(COS_BULK);
        COS(COS_NORMAL);
        COS(COS_EXPEDITED);
        COS(COS_RESERVED);
#undef COS
    default:
        log_err("invalid priority level %d", (int)spec.priority);
        return DTN_EINVAL;
    };
    
    // Bundles with a null source EID are not allowed to request reports or
    // custody transfer, and must not be fragmented.
    if (b->source_ == EndpointID::NULL_EID()) {
        if (spec.dopts) {
            log_err("bundle with null source EID requested reports and/or "
                    "custody transfer");
            return DTN_EINVAL;
        }
        
        b->do_not_fragment_ = true;
    }
    
    else {
        // The bundle's source EID must be either dtn:none or an EID registered
        // at this node.
        const RegistrationTable* reg_table = 
                BundleDaemon::instance()->reg_table();
        std::string base_reg_str = b->source_.uri().scheme() + "://" + 
                b->source_.uri().host();
        
        if (!reg_table->get(EndpointIDPattern(base_reg_str)) &&
            !reg_table->get(EndpointIDPattern(b->source_))) {
            log_err("this node is not a member of the bundle's source EID (%s)",
                    b->source_.str().c_str());
            return DTN_EINVAL;
        }
    }

    // delivery options
    if (spec.dopts & DOPTS_CUSTODY)
        b->custody_requested_ = true;
    
    if (spec.dopts & DOPTS_DELIVERY_RCPT)
        b->delivery_rcpt_ = true;

    if (spec.dopts & DOPTS_RECEIVE_RCPT)
        b->receive_rcpt_ = true;

    if (spec.dopts & DOPTS_FORWARD_RCPT)
        b->forward_rcpt_ = true;

    if (spec.dopts & DOPTS_CUSTODY_RCPT)
        b->custody_rcpt_ = true;

    if (spec.dopts & DOPTS_DELETE_RCPT)
        b->deletion_rcpt_ = true;

    if (spec.dopts & DOPTS_DO_NOT_FRAGMENT)
        b->do_not_fragment_ = true;

    // expiration time
    b->expiration_ = spec.expiration;

    // extension blocks
    for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
        dtn_extension_block_t* block = &spec.blocks.blocks_val[i];

        BlockInfo* info =
            b->api_blocks_.append_block(APIBlockProcessor::instance());
        APIBlockProcessor::instance()->
            init_block(info, block->type, block->flags,
                       (u_char*)block->data.data_val,
                       block->data.data_len);
    }

    // metadata blocks
    for (unsigned int i = 0; i < spec.metadata.metadata_len; ++i) {
        dtn_extension_block_t* block = &spec.metadata.metadata_val[i];

        LinkRef null_link("APIServer::handle_send");
        MetadataVec * vec = b->generated_metadata_.find_blocks(null_link);
        if (vec == NULL) {
            vec = b->generated_metadata_.create_blocks(null_link);
        }
        ASSERT(vec != NULL);

        MetadataBlock * meta_block = new MetadataBlock(
                                             (u_int64_t)block->type,
                                             (u_char *)block->data.data_val,
                                             (u_int32_t)block->data.data_len);
        meta_block->set_flags((u_int64_t)block->flags);

        // XXX/demmer currently this block needs to be stuck on the
        // outgoing metadata for the null link (so it's transmit to
        // all destinations) as well as on the recv_metadata vector so
        // it's conveyed to local applications. this should really be
        // cleaned up...
        vec->push_back(meta_block);
        b->recv_metadata_.push_back(meta_block);
    }

    // validate the bundle metadata
    oasys::StringBuffer error;
    if (!b->validate(&error)) {
        log_err("bundle validation failed: %s", error.data());
        return DTN_EINVAL;
    }
    
    // set up the payload, including calculating its length, but don't
    // copy it in yet
    size_t payload_len;
    char filename[PATH_MAX];

    switch (payload.location) {
    case DTN_PAYLOAD_MEM:
        payload_len = payload.buf.buf_len;
        break;
        
    case DTN_PAYLOAD_FILE:
    case DTN_PAYLOAD_TEMP_FILE:
        struct stat finfo;
        sprintf(filename, "%.*s", 
                (int)payload.filename.filename_len,
                payload.filename.filename_val);

        if (stat(filename, &finfo) != 0)
        {
            log_err("payload file %s does not exist!", filename);
            return DTN_EINVAL;
        }
        
        payload_len = finfo.st_size;
        break;

    default:
        log_err("payload.location of %d unknown", payload.location);
        return DTN_EINVAL;
    }
    
    b->payload_.set_length(payload_len);

    // before filling in the payload, we first probe the router to
    // determine if there's sufficient storage for the bundle
    bool result;
    int  reason;
    BundleDaemon::post_and_wait(
        new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
        &notifier_);

    if (!result) {
        log_info("DTN_SEND bundle not accepted: reason %s",
                 BundleStatusReport::reason_to_str(reason));

        switch (reason) {
        case BundleProtocol::REASON_DEPLETED_STORAGE:
            return DTN_ENOSPACE;
        default:
            return DTN_EINTERNAL;
        }
    }

    switch (payload.location) {
    case DTN_PAYLOAD_MEM:
        b->payload_.set_data((u_char*)payload.buf.buf_val,
                             payload.buf.buf_len);
        break;
        
    case DTN_PAYLOAD_FILE:
        FILE* file;
        int r, left;
        u_char buffer[4096];
        size_t offset;

        if ((file = fopen(filename, "r")) == NULL)
        {
            log_err("payload file %s can't be opened!", filename);
            return DTN_EINVAL;
        }
        
        left = payload_len;
        r = 0;
        offset = 0;
        while (left > 0)
        {
            r = fread(buffer, 1, (left>4096)?4096:left, file);
            
            if (r)
            {
                b->payload_.write_data(buffer, offset, r);
                left   -= r;
                offset += r;
            }
            else
            {
                sleep(1); // pause before re-reading
            }
        }

        fclose(file);
        break;
        
    case DTN_PAYLOAD_TEMP_FILE:
        if (! b->payload_.replace_with_file(filename)) {
            log_err("payload file %s can't be linked or copied",
                    filename);
            return DTN_EINVAL;
        }
        
        if (::unlink(filename) != 0) {
            log_err("error unlinking payload temp file: %s",
                    strerror(errno));
            // continue on since this is non-fatal
        }
    }

    //  before posting the received event, fill in the bundle id struct
    dtn_bundle_id_t id;
    memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
    id.creation_ts.secs  = b->creation_ts_.seconds_;
    id.creation_ts.seqno = b->creation_ts_.seqno_;
    id.frag_offset = 0;
    id.orig_length = 0;
    
    log_info("DTN_SEND bundle *%p", b.object());

    // deliver the bundle
    // Note: the bundle state may change once it has been posted
    BundleDaemon::post_and_wait(new BundleReceivedEvent(b.object(), EVENTSRC_APP),
                                &notifier_);

    // return the bundle id struct
    if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
        log_err("internal error in xdr: xdr_dtn_bundle_id_t");
        return DTN_EXDR;
    }
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_cancel()
{
    dtn_bundle_id_t id;

    memset(&id, 0, sizeof(id));
    
    /* Unpack the arguments */
    if (!xdr_dtn_bundle_id_t(&xdr_decode_, &id))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }
    
    GbofId gbof_id;
    gbof_id.source_ = EndpointID( std::string(id.source.uri) );
    gbof_id.creation_ts_.seconds_ = id.creation_ts.secs;
    gbof_id.creation_ts_.seqno_ = id.creation_ts.seqno;
    gbof_id.is_fragment_ = (id.orig_length > 0);
    gbof_id.frag_length_ = id.orig_length;
    gbof_id.frag_offset_ = id.frag_offset;
    
    BundleRef bundle;
    oasys::ScopeLock pending_lock(
        BundleDaemon::instance()->pending_bundles()->lock(), "handle_cancel");
    bundle = BundleDaemon::instance()->pending_bundles()->find(gbof_id);
    
    if (!bundle.object()) {
        log_warn("no bundle matching [%s]; cannot cancel", 
                 gbof_id.str().c_str());
        return DTN_ENOTFOUND;
    }
    
    log_info("DTN_CANCEL bundle *%p", bundle.object());
    
    BundleDaemon::post(new BundleCancelRequest(bundle, std::string()));
    return DTN_SUCCESS;
}

// Size for temporary memory buffer used when delivering bundles
// via files.
#define DTN_FILE_DELIVERY_BUF_SIZE 1000

//----------------------------------------------------------------------
int
APIClient::handle_recv()
{
    dtn_bundle_spec_t             spec;
    dtn_bundle_payload_t          payload;
    dtn_bundle_payload_location_t location;
    dtn_bundle_status_report_t    status_report;
    dtn_timeval_t                 timeout;
    oasys::ScratchBuffer<u_char*> buf;
    APIRegistration*              reg = NULL;
    bool                          sock_ready = false;
    oasys::FileIOClient           tmpfile;

    // unpack the arguments
    if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
        (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }
    
    int err = wait_for_bundle("recv", timeout, &reg, &sock_ready);
    if (err != 0) {
        return err;
    }
    
    // if there's data on the socket, that either means the socket was
    // closed by an exiting application or the app is violating the
    // protocol...
    if (sock_ready) {
        log_debug("handle_recv: api socket ready -- trying to read one byte");
        char b;
        if (read(&b, 1) != 0) {
            log_err("handle_recv: protocol error -- "
                    "data arrived or error while blocked in recv");
            return DTN_ECOMM;
        }

        log_info("IPC socket closed while blocked in read... "
                 "application must have exited");
        return -1;
    }
    
    BundleRef bref("APIClient::handle_recv");
    bref = reg->bundle_list()->pop_front();
    Bundle* b = bref.object();
    ASSERT(b != NULL);
    
    log_debug("handle_recv: popped *%p for registration %d (timeout %d)",
              b, reg->regid(), timeout);
    
    memset(&spec, 0, sizeof(spec));
    memset(&payload, 0, sizeof(payload));
    memset(&status_report, 0, sizeof(status_report));

    // copyto will malloc string buffer space that needs to be freed
    // at the end of the fn
    b->source_.copyto(&spec.source);
    b->dest_.copyto(&spec.dest);
    b->replyto_.copyto(&spec.replyto);

    spec.dopts = 0;
    if (b->custody_requested_) spec.dopts |= DOPTS_CUSTODY;
    if (b->delivery_rcpt_)     spec.dopts |= DOPTS_DELIVERY_RCPT;
    if (b->receive_rcpt_)      spec.dopts |= DOPTS_RECEIVE_RCPT;
    if (b->forward_rcpt_)      spec.dopts |= DOPTS_FORWARD_RCPT;
    if (b->custody_rcpt_)      spec.dopts |= DOPTS_CUSTODY_RCPT;
    if (b->deletion_rcpt_)     spec.dopts |= DOPTS_DELETE_RCPT;

    spec.expiration = b->expiration_;
    spec.creation_ts.secs = b->creation_ts_.seconds_;
    spec.creation_ts.seqno = b->creation_ts_.seqno_;

    // copy extension blocks
    unsigned int blocks_found = 0;
    unsigned int data_len = 0;
    for (unsigned int i = 0; i < b->recv_blocks_.size(); ++i) {
        if ((b->recv_blocks_[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
            (b->recv_blocks_[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
            (b->recv_blocks_[i].type() == BundleProtocol::METADATA_BLOCK)) {
            continue;
        }
        blocks_found++;
        data_len += b->recv_blocks_[i].data_length();
    }

    if (blocks_found > 0) {
        unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
                               data_len;
        void * buf = malloc(buf_len);
        memset(buf, 0, buf_len);

        dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
        char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
        for (unsigned int i = 0; i < b->recv_blocks_.size(); ++i) {
            if ((b->recv_blocks_[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
                (b->recv_blocks_[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
                (b->recv_blocks_[i].type() == BundleProtocol::METADATA_BLOCK)) {
                continue;
            }

            bp->type          = b->recv_blocks_[i].type();
            bp->flags         = b->recv_blocks_[i].flags();
            bp->data.data_len = b->recv_blocks_[i].data_length();
            bp->data.data_val = dp;
            memcpy(dp, b->recv_blocks_[i].data(), bp->data.data_len);

            bp++;
            dp += bp->data.data_len;
        }

        spec.blocks.blocks_len = blocks_found;
        spec.blocks.blocks_val = (dtn_extension_block_t *)buf;
    }

    // copy metadata extension blocks
    blocks_found = 0;
    data_len = 0;
    for (unsigned int i = 0; i < b->recv_metadata_.size(); ++i) {
        blocks_found++;
        data_len += b->recv_metadata_[i]->metadata_len();
    }

    if (blocks_found > 0) {
        unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
                               data_len;
        void * buf = (char *)malloc(buf_len);
        memset(buf, 0, buf_len);

        dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
        char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
        for (unsigned int i = 0; i < b->recv_metadata_.size(); ++i) {
            bp->type          = b->recv_metadata_[i]->ontology();
            bp->flags         = b->recv_metadata_[i]->flags();
            bp->data.data_len = b->recv_metadata_[i]->metadata_len();
            bp->data.data_val = dp;
            memcpy(dp, b->recv_metadata_[i]->metadata(), bp->data.data_len);
            dp += bp->data.data_len;
            bp++;
        }

        spec.metadata.metadata_len = blocks_found;
        spec.metadata.metadata_val = (dtn_extension_block_t *)buf;
    }

    // XXX/demmer verify bundle size constraints
    payload.location = location;
    
    if (location == DTN_PAYLOAD_MEM) {
        // the app wants the payload in memory
        // XXX/demmer verify bounds

        size_t payload_len = b->payload_.length();
        payload.buf.buf_len = payload_len;
        if (payload_len != 0) {
            buf.reserve(payload_len);
            payload.buf.buf_val =
                (char*)b->payload_.read_data(0, payload_len, buf.buf());
        } else {
            payload.buf.buf_val = 0;
        }
        
    } else if (location == DTN_PAYLOAD_FILE) {
        char *tdir, templ[64];

        // XXX/demmer do this with a hard link

        tdir = getenv("TMP");
        if (tdir == NULL) {
            tdir = getenv("TEMP");
        }
        if (tdir == NULL) {
            tdir = "/tmp";
        }

        snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);

        if (tmpfile.mkstemp(templ) == -1) {
            log_err("can't open temporary file to deliver bundle");
            return DTN_EINTERNAL;
        }
        
        if (chmod(tmpfile.path(), 0666) < 0) {
            log_warn("can't set the permission of temp file to 0666: %s",
                     strerror(errno));
        }
        
        if (b->payload_.location() == BundlePayload::MEMORY) {
            tmpfile.writeall((char*)b->payload_.memory_buf()->buf(),
                             b->payload_.length());
            
        } else {
            b->payload_.copy_file(&tmpfile);
        }

        payload.filename.filename_val = (char*)tmpfile.path();
        payload.filename.filename_len = tmpfile.path_len() + 1;
        tmpfile.close();
        
    } else {
        log_err("payload location %d not understood", location);
        return DTN_EINVAL;
    }
    
    /*
     * If the bundle is a status report, parse it and copy out the
     * data into the status report.
     */
    BundleStatusReport::data_t sr_data;
    if (BundleStatusReport::parse_status_report(&sr_data, b))
    {
        payload.status_report = &status_report;
        sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
        status_report.bundle_id.creation_ts.secs =
            sr_data.orig_creation_tv_.seconds_;
        status_report.bundle_id.creation_ts.seqno =
            sr_data.orig_creation_tv_.seqno_;
        status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
        status_report.bundle_id.orig_length = sr_data.orig_frag_length_;

        status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
        status_report.flags =  (dtn_status_report_flags_t)sr_data.status_flags_;

        status_report.receipt_ts.secs     = sr_data.receipt_tv_.seconds_;
        status_report.receipt_ts.seqno    = sr_data.receipt_tv_.seqno_;
        status_report.custody_ts.secs     = sr_data.custody_tv_.seconds_;
        status_report.custody_ts.seqno    = sr_data.custody_tv_.seqno_;
        status_report.forwarding_ts.secs  = sr_data.forwarding_tv_.seconds_;
        status_report.forwarding_ts.seqno = sr_data.forwarding_tv_.seqno_;
        status_report.delivery_ts.secs    = sr_data.delivery_tv_.seconds_;
        status_report.delivery_ts.seqno   = sr_data.delivery_tv_.seqno_;
        status_report.deletion_ts.secs    = sr_data.deletion_tv_.seconds_;
        status_report.deletion_ts.seqno   = sr_data.deletion_tv_.seqno_;
        status_report.ack_by_app_ts.secs  = sr_data.ack_by_app_tv_.seconds_;
        status_report.ack_by_app_ts.seqno = sr_data.ack_by_app_tv_.seqno_;
    }
    
    if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
    {
        log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
        return DTN_EXDR;
    }
    
    if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
    {
        log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
        return DTN_EXDR;
    }

    // prevent xdr_free of non-malloc'd pointer
    payload.status_report = NULL;
    
    log_info("DTN_RECV: "
             "successfully delivered bundle %d to registration %d",
             b->bundleid_, reg->regid());
    
    BundleDaemon::post(new BundleDeliveredEvent(b, reg));

    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_begin_poll()
{
    dtn_timeval_t    timeout;
    APIRegistration* reg = NULL;
    bool             sock_ready = false;
    
    // unpack the arguments
    if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    int err = wait_for_bundle("poll", timeout, &reg, &sock_ready);
    if (err != 0) {
        return err;
    }

    // if there's data on the socket, then the application either quit
    // and closed the socket, or called dtn_poll_cancel
    if (sock_ready) {
        log_debug("handle_begin_poll: "
                  "api socket ready -- trying to read one byte");
        char type;
        
        int ret = read(&type, 1);
        if (ret == 0) {
            log_info("IPC socket closed while blocked in read... "
                     "application must have exited");
            return -1;
        }

        if (ret == -1) {
            log_err("handle_begin_poll: protocol error -- "
                    "error while blocked in poll");
            return DTN_ECOMM;
        }

        if (type != DTN_CANCEL_POLL) {
            log_err("handle_poll: error got unexpected message '%s' "
                    "while blocked in poll", dtnipc_msgtoa(type));
            return DTN_ECOMM;
        }

        // read in the length which must be zero
        u_int32_t len;
        ret = read((char*)&len, 4);
        if (ret != 4 || len != 0) {
            log_err("handle_begin_poll: protocol error -- "
                    "error getting cancel poll length");
            return DTN_ECOMM;
        }

        // immediately send the response to the poll cancel, then
        // we return from the handler which will follow it with the
        // response code to the original poll request
        send_response(DTN_SUCCESS);
    }
    
    return DTN_SUCCESS;
}

//----------------------------------------------------------------------
int
APIClient::handle_cancel_poll()
{
    // the only reason we should get in here is if the call to
    // dtn_begin_poll() returned but the app still called cancel_poll
    // and so the messages crossed. but, since there's nothing wrong
    // with this, we just return success in both cases
    
    return DTN_SUCCESS;
}
        
//----------------------------------------------------------------------
int
APIClient::wait_for_bundle(const char* operation, dtn_timeval_t dtn_timeout,
                           APIRegistration** regp, bool* sock_ready)
{
    APIRegistration* reg;
    
    if (bindings_->empty()) {
        log_err("wait_for_bundle(%s): no bound registrations", operation);
        return DTN_EINVAL;
    }

    int timeout = (int)dtn_timeout;
    if (timeout < -1) {
        log_err("wait_for_bundle(%s): "
                "invalid timeout value %d", operation, timeout);
        return DTN_EINVAL;
    }

    // try to optimize by using a statically sized pollfds array,
    // otherwise we need to malloc the array.
    //
    // XXX/demmer this would be cleaner by tweaking the
    // StaticScratchBuffer class to be handle arrays of arbitrary
    // sized structs
    struct pollfd static_pollfds[64];
    struct pollfd* pollfds;
    oasys::ScopeMalloc pollfd_malloc;
    size_t npollfds = 1 + bindings_->size();
    if (npollfds <= 64) {
        pollfds = &static_pollfds[0];
    } else {
        pollfds = (struct pollfd*)malloc(npollfds * sizeof(struct pollfd));
        pollfd_malloc = pollfds;
    }
    
    struct pollfd* sock_poll = &pollfds[0];
    sock_poll->fd            = TCPClient::fd_;
    sock_poll->events        = POLLIN | POLLERR;
    sock_poll->revents       = 0;

    // loop through all the registrations -- if one has bundles on its
    // list, we don't need to poll, just return it immediately.
    // otherwise we'll need to poll it
    APIRegistrationList::iterator iter;
    int i = 1;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        reg = *iter;
            
        if (! reg->bundle_list()->empty()) {
            log_debug("wait_for_bundle(%s): "
                      "immediately returning bundle for reg %d",
                      operation, reg->regid());
            *regp = reg;
            return 0;
        }
        
        pollfds[i].fd      = reg->bundle_list()->notifier()->read_fd();
        pollfds[i].events  = POLLIN;
        pollfds[i].revents = 0;
        ++i;
    }

    log_debug("wait_for_bundle(%s): "
              "blocking to get bundles from %zu registrations (timeout %d)",
              operation, bindings_->size(), timeout);
    int nready = oasys::IO::poll_multiple(&pollfds[0], npollfds, timeout,
                                          NULL, logpath_);

    if (nready == oasys::IOTIMEOUT) {
        log_debug("wait_for_bundle(%s): timeout waiting for bundle",
                  operation);
        return DTN_ETIMEOUT;

    } else if (nready <= 0) {
        log_err("wait_for_bundle(%s): unexpected error polling for bundle",
                operation);
        return DTN_EINTERNAL;
    }

    // if there's data on the socket, that either means the socket was
    // closed by an exiting application or the app is violating the
    // protocol...
    if (sock_poll->revents != 0) {
        *sock_ready = true;
        return 0;
    }

    // otherwise, there should be data on one (or more) bundle lists, so
    // scan the list to find the first one.
    *regp = NULL;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        reg = *iter;
        if (! reg->bundle_list()->empty()) {
            *regp = reg;
            break;
        }
    }

    if (*regp == NULL) {
        log_err("wait_for_bundle(%s): error -- no lists have any bundles",
                operation);
        return DTN_EINTERNAL;
    }
    
    return 0;
}

//----------------------------------------------------------------------
int
APIClient::handle_close()
{
    log_info("received DTN_CLOSE message; closing API handle");
    // return -1 to force the session to close:
    return -1;
}

} // namespace dtn

